Source code for pharmpy.modeling.iterators

"""
Iterators generating new datasets from a dataset. The dataset can either be standalone
or connected to a model. If a model is used, a model with the new dataset is produced
for each iteration.

Currently contains:

1. Omit - can be used for case deletion diagnostics (cdd)
2. Resample - can be used for bootstrapping
"""

import warnings
from collections.abc import Mapping
from typing import Optional, Union

from pharmpy.deps import numpy as np
from pharmpy.deps import pandas as pd
from pharmpy.internals.math import round_and_keep_sum
from pharmpy.model import Model


class DatasetIterator:
    """Base class for iterator classes that generate new datasets from an input dataset

    The __next__ method could return either a DataFrame or a tuple where the first
    element is the main DataFrame.
    """

    def __init__(self, iterations, name_pattern='dataset_{}'):
        """Initialization of the base class
        :param iterations: Number of iterations
        :param name_pattern: Name pattern to use for generated datasets.
             A number starting from 1 will be put in the placeholder.
        """
        self._next = 1
        self._iterations = iterations
        self._name_pattern = name_pattern

    def _check_exhausted(self):
        """Check if the iterator is exhausted. Raise StopIteration in that case"""
        if self._next > self._iterations:
            raise StopIteration

    def _prepare_next(self, df):
        df.name = self._name_pattern.format(self._next)
        self._next += 1

    def _retrieve_dataset(self, df_or_model):
        """Extract dataset from model and remember the model.
        If the input is a dataset, simply pass it through and set the model to None
        """
        try:
            dataset = df_or_model.dataset
            self._model = df_or_model
            return dataset
        except AttributeError:
            self._model = None
            return df_or_model

    def _combine_dataset(self, df):
        """If we are working with a model set the dataset and return model
        else simply pass the dataset through
        """
        if self._model is None:
            return df
        else:
            return self._model.replace(name=df.name, dataset=df)

    def __iter__(self):
        return self
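

# A minimal sketch of how a subclass is expected to use the DatasetIterator base class:
# _retrieve_dataset() accepts either a DataFrame or a Model, _check_exhausted() stops the
# iteration, _prepare_next() names and counts the generated datasets, and
# _combine_dataset() returns either a DataFrame or a model carrying the new dataset.
# The class below is illustrative only.
class _IdentityExample(DatasetIterator):
    """Yield unmodified copies of the input dataset a fixed number of times."""

    def __init__(self, dataset_or_model, iterations=3, name_pattern='copy_{}'):
        self._df = self._retrieve_dataset(dataset_or_model)
        super().__init__(iterations, name_pattern=name_pattern)

    def __next__(self):
        self._check_exhausted()
        new_df = self._df.copy()
        self._prepare_next(new_df)
        return self._combine_dataset(new_df)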


class Omit(DatasetIterator):
    """Iterate over omissions of a certain group in a dataset. One group is omitted at a time.

    :param dataset_or_model: DataFrame to iterate over or a model from which to use the dataset
    :param colname group: Name of the column to use for grouping
    :param name_pattern: Name to use for generated datasets. A number starting from 1 will
        be put in the placeholder.
    :returns: Tuple of the new DataFrame (or model) and the omitted group
    """

    def __init__(self, dataset_or_model, group, name_pattern='omitted_{}'):
        df = self._retrieve_dataset(dataset_or_model)
        self._unique_groups = df[group].unique()
        if len(self._unique_groups) == 1:
            raise ValueError("Cannot create an Omit iterator as the number of unique groups is 1.")
        self._df = df
        self._group = group
        super().__init__(len(self._unique_groups), name_pattern=name_pattern)

    def __next__(self):
        self._check_exhausted()
        df = self._df
        next_group = self._unique_groups[self._next - 1]
        new_df = df[df[self._group] != next_group]
        self._prepare_next(new_df)
        return self._combine_dataset(new_df), next_group


def omit_data(
    dataset_or_model: Union[pd.DataFrame, Model],
    group: str,
    name_pattern: str = 'omitted_{}',
):
    """Iterate over omissions of a certain group in a dataset. One group is omitted at a time.

    Parameters
    ----------
    dataset_or_model : pd.DataFrame or Model
        Dataset or model for which to omit records
    group : str
        Name of the column to use for grouping
    name_pattern : str
        Name to use for generated datasets. A number starting from 1 will be put in the
        placeholder.

    Returns
    -------
    iterator
        Iterator yielding tuples of models/dataframes and the omitted group
    """
    return Omit(dataset_or_model, group, name_pattern)
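

# A minimal usage sketch of omit_data on a plain DataFrame, assuming an 'ID' column for
# grouping; the column names and values are illustrative only. Each iteration yields the
# dataset with one ID left out together with the omitted ID, which is the basic building
# block of case deletion diagnostics.
if __name__ == '__main__':
    example_df = pd.DataFrame(
        {
            'ID': [1, 1, 2, 2, 3, 3],
            'DV': [0.1, 0.2, 0.3, 0.4, 0.5, 0.6],
        }
    )
    for kept, omitted in omit_data(example_df, 'ID'):
        print(f'omitted ID {omitted}: {len(kept)} rows left, name {kept.name}')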


class Resample(DatasetIterator):
    """Iterate over resamples of a dataset.

    The dataset will be grouped on the group column then groups will be selected
    randomly with or without replacement to form a new dataset.
    The groups will be renumbered from 1 and upwards to keep them separated in the new
    dataset.

    Stratification will make sure that the sampling is done separately within each
    stratum, by default in proportion to the size of the stratum in the dataset.

    :param dataset_or_model: DataFrame to iterate over or a model from which to use the dataset
    :param colname group: Name of column to group by
    :param Int resamples: Number of resamples (iterations) to make
    :param colname stratify: Name of column to use for stratification.
        The values in the stratification column must be equal within a group so that the group
        can be uniquely determined. A ValueError exception will be raised otherwise.
    :param Int sample_size: The number of groups that should be sampled. The default is
        the number of groups. If using stratification the default is to sample using the
        proportion of the strata in the dataset. A dictionary of specific sample sizes
        for each stratum can also be supplied.
    :param bool replace: A boolean controlling whether sampling should be done with or
        without replacement
    :param name_pattern: Name to use for generated datasets. A number starting from 1 will
        be put in the placeholder.
    :param name: Name to use for the generated dataset instead of the name pattern.
        Only used when a single resample is made.
    :returns: A tuple of a resampled DataFrame and a list of resampled groups in order
    """

    def __init__(
        self,
        dataset_or_model,
        group,
        resamples=1,
        stratify=None,
        sample_size=None,
        replace=False,
        name_pattern='resample_{}',
        name=None,
    ):
        df = self._retrieve_dataset(dataset_or_model)
        unique_groups = df[group].unique()
        numgroups = len(unique_groups)

        if sample_size is None:
            sample_size = numgroups

        if stratify:
            # Default is to use proportions in dataset
            stratas = df.groupby(stratify)[group].unique()
            have_mult_sample_sizes = isinstance(sample_size, Mapping)
            if not have_mult_sample_sizes:
                non_rounded_sample_sizes = stratas.apply(
                    lambda x: (len(x) / numgroups) * sample_size
                )
                rounded_sample_sizes = round_and_keep_sum(non_rounded_sample_sizes, sample_size)
                sample_size_dict = dict(rounded_sample_sizes)  # strata: numsamples
            else:
                sample_size_dict = sample_size

            stratas = dict(stratas)  # strata: list of groups
        else:
            sample_size_dict = {1: sample_size}
            stratas = {1: unique_groups}

        # Check that we will not run out of samples without replacement.
        if not replace:
            for strata in sample_size_dict:
                if sample_size_dict[strata] > len(stratas[strata]):
                    if stratify:
                        raise ValueError(
                            f'The sample size ({sample_size_dict[strata]}) for strata {strata} is '
                            f'larger than the number of groups ({len(stratas[strata])}) in that '
                            f'strata, which is impossible without replacement.'
                        )
                    else:
                        raise ValueError(
                            f'The sample size ({sample_size_dict[strata]}) is larger than the '
                            f'number of groups ({len(stratas[strata])}), which is impossible '
                            f'without replacement.'
                        )

        self._df = df
        self._group = group
        self._replace = replace
        self._stratas = stratas
        self._sample_size_dict = sample_size_dict

        if resamples > 1 and name:
            warnings.warn(
                f'One name was provided despite having multiple resamples, falling back to '
                f'name pattern: {name_pattern}'
            )
            self._name = None
        else:
            self._name = name

        super().__init__(resamples, name_pattern=name_pattern)

    def __next__(self):
        self._check_exhausted()

        random_groups = []
        for strata in self._sample_size_dict:
            random_groups += np.random.choice(
                self._stratas[strata],
                size=self._sample_size_dict[strata],
                replace=self._replace,
            ).tolist()

        new_df = pd.DataFrame()
        # Build the dataset given the random_groups list
        for grp_id, new_grp in zip(random_groups, range(1, len(random_groups) + 1)):
            sub = self._df.loc[self._df[self._group] == grp_id].copy()
            sub[self._group] = new_grp
            new_df = pd.concat([new_df, sub])

        new_df.reset_index(inplace=True, drop=True)
        if self._name:
            new_df.name = self._name
        else:
            self._prepare_next(new_df)
        return self._combine_dataset(new_df), random_groups


def resample_data(
    dataset_or_model: Union[pd.DataFrame, Model],
    group: str,
    resamples: int = 1,
    stratify: Optional[str] = None,
    sample_size: Optional[int] = None,
    replace: bool = False,
    name_pattern: str = 'resample_{}',
    name: Optional[str] = None,
):
    """Iterate over resamples of a dataset.

    The dataset will be grouped on the group column then groups will be selected
    randomly with or without replacement to form a new dataset.
    The groups will be renumbered from 1 and upwards to keep them separated in the new
    dataset.

    Parameters
    ----------
    dataset_or_model : pd.DataFrame or Model
        Dataset or Model to use
    group : str
        Name of column to group by
    resamples : int
        Number of resamples (iterations) to make
    stratify : str
        Name of column to use for stratification.
        The values in the stratification column must be equal within a group so that the group
        can be uniquely determined. A ValueError exception will be raised otherwise.
    sample_size : int
        The number of groups that should be sampled. The default is the number of groups.
        If using stratification the default is to sample using the proportion of the strata
        in the dataset. A dictionary of specific sample sizes for each stratum can also be
        supplied.
    replace : bool
        A boolean controlling whether sampling should be done with or without replacement
    name_pattern : str
        Name to use for generated datasets. A number starting from 1 will be put in the
        placeholder.
    name : str
        Name to use for the generated dataset instead of the name pattern. Only used when a
        single resample is made.

    Returns
    -------
    iterator
        An iterator yielding tuples of a resampled DataFrame and a list of resampled groups
        in order
    """
    return Resample(
        dataset_or_model,
        group,
        resamples=resamples,
        stratify=stratify,
        sample_size=sample_size,
        replace=replace,
        name_pattern=name_pattern,
        name=name,
    )
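

# A minimal usage sketch of resample_data for a non-stratified, bootstrap-style resample,
# assuming an 'ID' column for grouping; the column names and values are illustrative only.
# Sampling is done with replacement and the sampled IDs are renumbered from 1 in each
# returned dataset.
if __name__ == '__main__':
    example_df = pd.DataFrame(
        {
            'ID': [1, 1, 2, 2, 3, 3],
            'DV': [0.1, 0.2, 0.3, 0.4, 0.5, 0.6],
        }
    )
    for resampled, sampled_groups in resample_data(example_df, 'ID', resamples=2, replace=True):
        print(f'{resampled.name}: sampled original IDs {sampled_groups}')
        print(resampled)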