sampler Module

get_bins(record_set, dimension)

Creates bins of records based on the chosen dimension.

Parameters:
  • record_set (list) – The dataset whose records are to be split into bins (should be a list of lists with the inner lists representing rows).

  • dimension (int) – The index of the column along which to make the split (starts at 0).

Returns:
  • list – A list of lists, with each inner list representing one bin of records that share a value in the chosen dimension.

Source code in samrand/sampler.py
def get_bins(record_set, dimension):
    """Creates bins of records based on the chosen dimension.

    Args:
        record_set (list): The dataset whose records are to be split into bins (should be a list of lists with the inner lists representing rows).
        dimension (int): The index of the column along which to make the split (starts at 0).

    Returns:
        list: A list of lists, with each inner list representing one bin of records that share a value in the chosen dimension.
    """
    bin_dict = dict()
    result = []
    for entry in record_set:
        if entry[dimension] not in bin_dict:
            bin_dict[entry[dimension]] = []
        bin_dict[entry[dimension]].append(entry)
    for key in bin_dict:
        result.append(bin_dict[key])
    return result
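
Example usage (a minimal sketch with hypothetical data; it assumes the package is installed and the function is importable as samrand.sampler.get_bins):

from samrand.sampler import get_bins

records = [
    ["alice", "red", 3],
    ["bob", "blue", 5],
    ["carol", "red", 7],
]

# Group the rows by the value in column 1.
bins = get_bins(records, 1)
# bins == [[["alice", "red", 3], ["carol", "red", 7]], [["bob", "blue", 5]]]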

get_least_diverse_dimension(record_set)

Returns the dimension with the least variability as calculated by statistical variance.

Parameters:
  • record_set (list) – The dataset in which the dimension with the least variability is to be identified (should be a list of lists with the inner lists representing rows).

Returns:
  • int – An integer representing the dimension's index (starting from 0).

Source code in samrand/sampler.py
def get_least_diverse_dimension(record_set):
    """Returns the dimension with the least variability as calculated by statistical variance.

    Args:
        record_set (list): The dataset in which the dimension with the least variability is to be identified (should be a list of lists with the inner lists representing rows).

    Returns:
        int: An integer representing the dimension's index (starting from 0).
    """
    dimensions = []
    min_variance = float("inf")  # Start above any possible variance so the first dimension always becomes the initial minimum
    min_dimension = -1
    for dimension in record_set[0]:
        dimensions.append([dimension])
    for entry in record_set[1:]:
        for dim_index, dim_value in enumerate(entry):
            dimensions[dim_index].append(dim_value)
    for index, dimension in enumerate(dimensions):
        processed_dim = []
        # Check to see if we're dealing with numbers, text, or textual numbers
        if type(dimension[0]) is str:
            try:
                float(dimension[0])  # It's a number
                processed_dim = [float(x) for x in dimension]
            except ValueError:  # It's not a number, we need to encode it
                uniques = list(np.unique(dimension))
                uniques_map = dict()  # Build our own little ordinal encoder
                counter = 1
                for unique in uniques:
                    uniques_map[unique] = counter
                    counter += 1
                processed_dim = [uniques_map[x] for x in dimension]
        else:  # It's already numbers
            processed_dim = dimension
        dim_variance = np.var(processed_dim)
        if dim_variance < min_variance:
            min_variance = dim_variance
            min_dimension = index
    return min_dimension
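
Example usage (a minimal sketch with hypothetical data; it assumes NumPy is available and the function is importable from samrand.sampler):

from samrand.sampler import get_least_diverse_dimension

records = [
    [1.0, 10],
    [1.1, 50],
    [0.9, 90],
]

# Column 0 varies far less than column 1, so its index is returned.
print(get_least_diverse_dimension(records))  # 0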

get_random_subset(entry_set, n, replacement)

Extracts a random sample from a set with or without replacement.

Parameters:
  • entry_set (list) – The set from which the sample is to be extracted. Expects a list of lists where the inner lists represent rows.

  • n (int) – The number of records to draw for the sample (values larger than the set are capped at its size).

  • replacement (bool) – A flag to indicate whether to extract the sample with replacement or not.

Returns:
  • list – A list of lists containing the extracted sample.

Source code in samrand/sampler.py
def get_random_subset(entry_set, n, replacement):
    """Extracts a random sample from a set with or without replacement.

    Args:
        entry_set (list): The set from which the sample is to be extracted. Expects a list of lists where the inner lists represent rows.
        n (int): The number of records to draw for the sample (values larger than the set are capped at its size).
        replacement (bool): A flag to indicate whether to extract the sample with replacement or not.

    Returns:
        list: A list of lists containing the extracted sample.
    """
    random.seed()
    entry_set_copy = deepcopy(entry_set)
    result = []
    if n > len(entry_set):
        n = len(entry_set)
    while len(result) < n:
        index = random.randint(0, len(entry_set_copy) - 1)
        result.append(deepcopy(entry_set_copy[index]))
        if not replacement:
            del entry_set_copy[index]
    return result
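
Example usage (a minimal sketch with hypothetical data; it assumes the function is importable from samrand.sampler). Because the input is deep-copied, the original list is never modified:

from samrand.sampler import get_random_subset

rows = [["a", 1], ["b", 2], ["c", 3], ["d", 4]]

# Two distinct rows, drawn without replacement.
print(get_random_subset(rows, 2, False))

# Three rows that may contain repeats, drawn with replacement.
print(get_random_subset(rows, 3, True))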

sample(dataset, size, stratify=False, strata=[], replacement=False)

Extracts a random sample of a given size from the dataset.

Parameters:
  • dataset (tuple) – The dataset from which the sample is to be extracted. Expects a tuple of ([[entry 1], [entry 2], ...], [header row]) if the dataset has a header, or ([[entry 1], [entry 2], ...], None) if there is no header.

  • size (int) – The size of the expected sample.

  • stratify (bool) – A flag to indicate whether to stratify the sample.

  • strata (list) – A list of column indices (starting from 0) to indicate which columns to use for stratification.

  • replacement (bool) – A flag to indicate whether to sample with or without replacement. Default is without.

Returns:
  • list – A list of lists with each list representing a row in the sample.

Source code in samrand/sampler.py
def sample(dataset, size, stratify=False, strata=[], replacement=False):
    """Extracts a random sample of a given size from the dataset.

    Args:
        dataset (tuple): The dataset from which the sample is to be extracted. Expects a tuple of ([[entry 1], [entry 2], ...], [header row]) if the dataset has a header, or ([[entry 1], [entry 2], ...], None) if there is no header.
        size (int): The size of the expected sample.
        stratify (bool): A flag to indicate whether to stratify the sample.
        strata (list): A list of column indices (starting from 0) to indicate which columns to use for stratification.
        replacement (bool): A flag to indicate whether to sample with or without replacement. Default is without.

    Returns:
        list: A list of lists with each list representing a row in the sample.
    """
    header = dataset[1]
    entries = dataset[0]
    sample_result = []
    final_result = []

    # Add a header if there is one
    if header:
        final_result.append(header)

    if stratify and len(strata) == 0:
        # If we need stratification but without dimensions, we use single-stage cluster sampling, and create strata from the dimension with the least variance to ensure diverse strata
        dim_index = get_least_diverse_dimension(entries)
        sample_bins = get_bins(entries, dim_index)
        for sample_bin in sample_bins:
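            # Proportional allocation: the +1 and rounding guarantee at least one record per bin; any excess is trimmed at random below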
            proportion_size = 1 + round((len(sample_bin) / len(entries)) * size)
            representative_bin = get_random_subset(sample_bin, proportion_size, replacement)
            sample_result.extend(representative_bin)
    elif stratify and len(strata) == 1:
        # Similar to the previous branch, but we have a specific dimension in mind
        dim_index = strata[0]
        sample_bins = get_bins(entries, dim_index)
        for sample_bin in sample_bins:
            proportion_size = 1 + round((len(sample_bin) / len(entries)) * size)
            representative_bin = get_random_subset(sample_bin, proportion_size, replacement)
            sample_result.extend(representative_bin)
    elif stratify and len(strata) > 1:
        # We have several specific dimensions, so we use multi-stage cluster sampling and create strata based on the given dimensions
        bin_levels = dict()
        current_level = 0
        bin_levels[current_level] = get_bins(entries, strata[0])
        while current_level < len(strata):
            for grouped_bin in bin_levels[current_level]:
                lower_bins = get_bins(grouped_bin, strata[current_level])
                for lower_bin in lower_bins:
                    if (current_level + 1) not in bin_levels.keys():
                        bin_levels[current_level + 1] = []
                    bin_levels[current_level + 1].append(lower_bin)
            current_level += 1
        for sample_bin in bin_levels[len(strata)]:
            proportion_size = 1 + round((len(sample_bin) / len(entries)) * size)
            representative_bin = get_random_subset(sample_bin, proportion_size, replacement)
            sample_result.extend(representative_bin)
    else:
        # If we don't need stratification, we just sample randomly for a uniformly distributed sample
        sample_result.extend(get_random_subset(entries, size, replacement))
    # Drop extra rows at random if the result size is larger than the sample size
    result_sample_size = len(sample_result)
    if result_sample_size > size:
        difference = result_sample_size - size
        random.seed()
        for i in range(difference):
            random_index = random.randint(0, len(sample_result) - 1)
            del sample_result[random_index]
    final_result.extend(sample_result)
    return final_result
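
Example usage (a minimal sketch with hypothetical data; it assumes the function is importable from samrand.sampler):

from samrand.sampler import sample

header = ["name", "team", "score"]
rows = [
    ["alice", "red", 3],
    ["bob", "blue", 5],
    ["carol", "red", 7],
    ["dave", "blue", 2],
    ["erin", "red", 9],
    ["frank", "blue", 4],
]

# Simple random sample of three rows; the header is carried over as the first row of the result.
simple = sample((rows, header), 3)

# Stratified sample of four rows, with the "team" column (index 1) represented roughly in proportion to its frequencies.
stratified = sample((rows, header), 4, stratify=True, strata=[1])

# A dataset with no header passes None as the second element of the tuple.
no_header = sample((rows, None), 3, replacement=True)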