Source code for pharmpy.data.data_frame

import enum
from pathlib import Path

import numpy as np
import pandas as pd

import pharmpy.data


class DatasetError(Exception):
    pass


class DatasetWarning(Warning):
    pass

class ColumnType(enum.Enum):
    """The type of the data in a column"""

    UNKNOWN = enum.auto()
    ID = enum.auto()
    IDV = enum.auto()
    DV = enum.auto()
    COVARIATE = enum.auto()
    DOSE = enum.auto()
    EVENT = enum.auto()

    def __repr__(self):
        return f'{self.__class__.__name__}.{self.name}'

    @property
    def max_one(self):
        """Can this ColumnType only be assigned to at most one column?"""
        return self in (ColumnType.ID, ColumnType.IDV, ColumnType.DOSE, ColumnType.EVENT)

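A quick illustration of the enum (a sketch, not part of the module source): max_one distinguishes types that may label at most one column from types that may label several.

>>> ColumnType.ID.max_one
True
>>> ColumnType.COVARIATE.max_one
False
>>> ColumnType.DV
ColumnType.DV
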
class PharmDataFrame(pd.DataFrame):
    """A DataFrame with additional metadata.

    Each column can have a ColumnType. The default ColumnType is UNKNOWN.

    ============ =============
    ColumnType   Description
    ============ =============
    ID           Individual identifier. Max one per DataFrame. All values have to be unique
    IDV          Independent variable. Max one per DataFrame.
    DV           Dependent variable
    COVARIATE    Covariate
    DOSE         Dose amount
    EVENT        0 = observation
    UNKNOWN      Unknown type. This is the default for columns that have not been assigned a type
    ============ =============
    """

    _metadata = ['_column_types', 'name']

    @property
    def _constructor(self):
        return PharmDataFrame

    @property
    def _constructor_sliced(self):
        return pd.Series

    def __deepcopy__(self, memo):
        return self.copy()
    def copy(self, *args, **kwargs):
        """ """
        # FIXME: The empty docstring avoids inheriting the documentation from the base
        #        class. It would be better if sphinx could do this so that the in-object
        #        docstring is kept.
        new_df = super().copy(*args, **kwargs)
        try:
            new_df._column_types = self._column_types.copy()
        except AttributeError:
            pass
        return new_df
    def to_json(self, **kwargs):
        """ """
        # FIXME: Same docstring issue as for copy
        # FIXME: Directly using to_json on PharmDataFrame doesn't work
        return pd.DataFrame(self).to_json(**kwargs)

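A minimal usage sketch (the column names are invented for the example). Column types are set through the pharmpy accessor defined further down in this module and survive copy() via the _metadata mechanism:

>>> df = PharmDataFrame({'ID': [1, 1, 2], 'TIME': [0.0, 1.0, 0.0], 'DV': [0.0, 10.2, 0.0]})
>>> df.pharmpy.column_type['ID'] = ColumnType.ID
>>> df.copy().pharmpy.column_type['ID']
ColumnType.ID
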
class ColumnTypeIndexer:
    """Indexing a PharmDataFrame to get or set column types

    An instance of ColumnTypeIndexer can be retrieved by df.pharmpy.column_type
    """

    def __init__(self, df):
        self._obj = df

    def _set_one_column_type(self, label, tp):
        if label not in self._obj.columns:
            raise KeyError(str(label))
        if (
            tp.max_one
            and hasattr(self._obj, '_column_types')
            and tp in self._obj._column_types.values()
        ):
            raise KeyError(f'Only one column of type {tp} is allowed in a PharmDataFrame.')
        try:
            self._obj._column_types[label] = tp
        except AttributeError:
            self._obj._column_types = {}
            self._obj._column_types[label] = tp

    def __setitem__(self, ind, tp):
        if isinstance(ind, str):
            self._set_one_column_type(ind, tp)
        else:
            if hasattr(tp, '__len__') and not len(tp) == 1:
                if len(ind) == len(tp):
                    for label, one_tp in zip(ind, tp):
                        self._set_one_column_type(label, one_tp)
                else:
                    raise ValueError(
                        f'Cannot set {len(ind)} columns using {len(tp)} column types'
                    )
            else:
                # Broadcasting of tp
                try:
                    len(ind)
                except TypeError:
                    ind = [ind]
                for label in ind:
                    self._set_one_column_type(label, tp)

    def _get_one_column_type(self, label):
        """Get the column type of one column"""
        if label in self._obj.columns:
            try:
                d = self._obj._column_types
            except AttributeError:
                return ColumnType.UNKNOWN
            try:
                return d[label]
            except KeyError:
                return ColumnType.UNKNOWN
        else:
            raise KeyError(str(label))

    def __getitem__(self, ind):
        if isinstance(ind, str):
            return self._get_one_column_type(ind)
        else:
            try:
                return [self._get_one_column_type(label) for label in ind]
            except TypeError:
                return self._get_one_column_type(ind)

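A sketch of getting and setting column types through the indexer (made-up column names). A list of labels paired with a list of types sets them element-wise, a single type is broadcast, and unassigned columns report UNKNOWN:

>>> df = PharmDataFrame({'ID': [1, 2], 'TIME': [0.0, 0.0], 'WGT': [70, 65]})
>>> df.pharmpy.column_type[['ID', 'TIME']] = [ColumnType.ID, ColumnType.IDV]
>>> df.pharmpy.column_type[['ID', 'WGT']]
[ColumnType.ID, ColumnType.UNKNOWN]
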
class LabelsByTypeIndexer:
    """Indexing a PharmDataFrame to get labels from ColumnTypes"""

    def __init__(self, acc):
        self._acc = acc

    def _get_one_label(self, tp):
        labels = self._get_many_labels(tp)
        if len(labels) == 0:
            raise KeyError(str(tp))
        elif len(labels) > 1:
            raise KeyError('Did not expect more than one ' + str(tp))
        else:
            return labels

    def _get_many_labels(self, column_type):
        """Get all labels of a given column type

        Always returns a list of labels (possibly empty)
        """
        return [
            label
            for label in self._acc._obj.columns
            if self._acc.column_type[label] == column_type
        ]

    def __getitem__(self, tp):
        try:
            len(tp)
            labels = []
            for t in tp:
                if t.max_one:
                    labels.extend(self._get_one_label(t))
                else:
                    labels.extend(self._get_many_labels(t))
            return labels
        except TypeError:
            if tp.max_one:
                return self._get_one_label(tp)
            else:
                return self._get_many_labels(tp)

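Going the other way, from a ColumnType to column labels (again with invented column names):

>>> df = PharmDataFrame({'ID': [1, 2], 'WGT': [70, 65], 'AGE': [41, 38]})
>>> df.pharmpy.column_type['ID'] = ColumnType.ID
>>> df.pharmpy.column_type[['WGT', 'AGE']] = ColumnType.COVARIATE
>>> df.pharmpy.labels_by_type[ColumnType.COVARIATE]
['WGT', 'AGE']
>>> df.pharmpy.labels_by_type[ColumnType.ID]
['ID']
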
@pd.api.extensions.register_dataframe_accessor('pharmpy')
class DataFrameAccessor:
    def __init__(self, obj):
        self._obj = obj

    @property
    def column_type(self):
        return ColumnTypeIndexer(self._obj)

    @property
    def labels_by_type(self):
        return LabelsByTypeIndexer(self)

    @property
    def id_label(self):
        """Return the label of the id column"""
        return self.labels_by_type[ColumnType.ID][0]

    @property
    def idv_label(self):
        """Return the label of the idv column"""
        return self.labels_by_type[ColumnType.IDV][0]

    @property
    def dv_label(self):
        """Return the label of the dv column"""
        return self.labels_by_type[ColumnType.DV][0]

    @property
    def ids(self):
        """Return the ids in the dataset"""
        return self._obj[self.id_label].unique()

    @property
    def time_varying_covariates(self):
        """Return a list of labels for all time varying covariates"""
        cov_labels = self.labels_by_type[ColumnType.COVARIATE]
        if len(cov_labels) == 0:
            return []
        else:
            time_var = (
                self._obj.groupby(by=self.id_label)[cov_labels].nunique().gt(1).any()
            )
            return list(time_var.index[time_var])

    @property
    def baselines(self):
        """Baselines for each id.

        Baseline is taken to be the first row even if that has a missing value.
        """
        idlab = self.id_label
        return self._obj.groupby(idlab).nth(0)

    @property
    def covariate_baselines(self):
        """Return a dataframe with baselines of all covariates for each id.

        Baseline is taken to be the first row even if that has a missing value.
        """
        covariates = self.labels_by_type[ColumnType.COVARIATE]
        idlab = self.id_label
        df = self._obj[covariates + [idlab]]
        df.set_index(idlab, inplace=True)
        return df.groupby(idlab).nth(0)

    @property
    def observations(self):
        """Return a series with observations. Indexed with ID and TIME"""
        try:
            label = self.labels_by_type[ColumnType.EVENT]
        except KeyError:
            try:
                label = self.labels_by_type[ColumnType.DOSE]
            except KeyError:
                raise DatasetError('Could not identify observation rows in dataset')
        label = label[0]
        idcol = self.id_label
        idvcol = self.idv_label
        df = self._obj.query(f'{label} == 0')
        if df.empty:
            df = self._obj.astype({label: 'float'})
            df = df.query(f'{label} == 0')
        df = df[[idcol, idvcol, self.dv_label]]
        df = df.astype({idvcol: np.float64})
        df.set_index([idcol, idvcol], inplace=True)
        return df.squeeze()

    @property
    def ninds(self):
        return len(self.ids)

    @property
    def nobs(self):
        return len(self.observations)

    @property
    def nobsi(self):
        return self.observations.groupby(self.id_label).count()

    @property
    def doses(self):
        """Return a series with all doses. Indexed with ID and TIME"""
        try:
            label = self.labels_by_type[ColumnType.DOSE]
        except KeyError:
            raise DatasetError('Could not identify dosing rows in dataset')
        label = label[0]
        idcol = self.id_label
        idvcol = self.idv_label
        df = self._obj.query(f'{label} != 0')
        df = df[[idcol, idvcol, label]]
        df.set_index([idcol, idvcol], inplace=True)
        return df.squeeze()
    def add_doseid(self):
        """Add a column DOSEID with id of each dose period starting from 1"""
        try:
            dose = self.labels_by_type[ColumnType.DOSE]
        except KeyError:
            raise DatasetError('Could not identify dosing rows in dataset')
        df = self._obj
        df['DOSEID'] = df[dose]
        df.loc[df['DOSEID'] > 0, 'DOSEID'] = 1
        df['DOSEID'] = df['DOSEID'].astype(int)
        df['DOSEID'] = df.groupby(self.id_label)['DOSEID'].cumsum()
    def add_time_after_dose(self):
        """Calculate and add a TAD column to the dataset"""
        # FIXME: Should not rely on name here. Use coltypes for TAD and DOSEID
        # FIXME: TIME is converted to float. Should be handled when reading in dataset
        df = self._obj
        if 'DOSEID' in df.columns:
            had_doseid = True
        else:
            self.add_doseid()
            had_doseid = False
        idv = self.idv_label
        idlab = self.id_label
        df[idv] = df[idv].astype(np.float64)
        df['TAD'] = df.groupby([idlab, 'DOSEID'])[idv].diff().fillna(0)
        df['TAD'] = df.groupby([idlab, 'DOSEID'])['TAD'].cumsum()
        if not had_doseid:
            df.drop('DOSEID', axis=1, inplace=True)
    def concentration_parameters(self):
        """Create a dataframe with concentration parameters"""
        df = self._obj.copy()
        df.pharmpy.add_doseid()
        df.pharmpy.add_time_after_dose()
        idlab = self.id_label
        dv = self.dv_label
        noobs = df.groupby([idlab, 'DOSEID']).size() == 1
        idx = df.groupby([idlab, 'DOSEID'])[dv].idxmax()
        params = df.loc[idx].set_index([idlab, 'DOSEID'])
        params = params[[dv, 'TAD']]
        params.rename(columns={dv: 'Cmax', 'TAD': 'Tmax'}, inplace=True)
        params.loc[noobs] = np.nan

        grpind = df.groupby([idlab, 'DOSEID']).indices
        keep = []
        for ind, rows in grpind.items():
            index = idx.loc[ind]
            p = params.loc[ind]
            if not np.isnan(p['Tmax']):
                keep += [row for row in rows if row > index]
        minidx = df.iloc[keep].groupby([idlab, 'DOSEID'])[dv].idxmin()
        params2 = df.loc[minidx].set_index([idlab, 'DOSEID'])
        params2 = params2[[dv, 'TAD']]
        params2.rename(columns={dv: 'Cmin', 'TAD': 'Tmin'}, inplace=True)
        res = params.join(params2)
        return res
    def generate_path(self, path=None, force=False):
        """Generate a path to use if the dataframe is written to disk.

        If no path is supplied, or the path does not contain a filename, a name is
        created from the name property of the PharmDataFrame.
        Will not overwrite unless forced.
        """
        if path is None:
            path = Path("")
        else:
            path = Path(path)
        if not path or path.is_dir():
            try:
                filename = f'{self._obj.name}.csv'
            except AttributeError:
                raise ValueError(
                    'Cannot name data file as no path argument was supplied and '
                    'the DataFrame has no name property.'
                )
            path /= filename
        if not force and path.exists():
            raise FileExistsError(f'File at {path} already exists.')
        return path
    def write_csv(self, path=None, force=False):
        """Write the PharmDataFrame to a csv file

        Returns the path of the written file
        """
        path = self.generate_path(path, force)
        self._obj.to_csv(path, na_rep=pharmpy.data.conf.na_rep, index=False)
        return path
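A rough end-to-end sketch of the accessor on a tiny invented dataset (one subject, a single dose in an AMT column; all names and numbers are made up for illustration):

>>> df = PharmDataFrame({'ID': [1, 1, 1], 'TIME': [0.0, 1.0, 2.0],
...                      'AMT': [100.0, 0.0, 0.0], 'DV': [0.0, 5.1, 3.2]})
>>> types = [ColumnType.ID, ColumnType.IDV, ColumnType.DOSE, ColumnType.DV]
>>> df.pharmpy.column_type[['ID', 'TIME', 'AMT', 'DV']] = types
>>> df.pharmpy.nobs
2
>>> df.pharmpy.add_time_after_dose()   # adds a TAD column in place
>>> df['TAD'].tolist()
[0.0, 1.0, 2.0]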