from __future__ import annotations
import re
import warnings
from pathlib import Path
from typing import Literal, Optional, Union
from pharmpy.basic import Expr, Unit
from pharmpy.deps import numpy as np
from pharmpy.deps import pandas as pd
from pharmpy.deps import sympy
from pharmpy.deps.rich import box as rich_box
from pharmpy.deps.rich import console as rich_console
from pharmpy.deps.rich import table as rich_table
from pharmpy.internals.fs.path import normalize_user_given_path, path_absolute
from pharmpy.model import ColumnInfo, CompartmentalSystem, DataInfo, DatasetError, Model
from pharmpy.model.model import update_datainfo
from .iterators import resample_data
[docs]
def get_ids(model: Model) -> list[int]:
"""Retrieve a list of all subject ids of the dataset
Parameters
----------
model : Model
Pharmpy model
Returns
-------
list
All subject ids
Example
-------
>>> from pharmpy.modeling import load_example_model, get_ids
>>> model = load_example_model("pheno")
>>> get_ids(model) # doctest: +ELLIPSIS
[1, 2, 3, ..., 57, 58, 59]
"""
idcol = model.datainfo.id_column.name
ids = list(int(x) for x in model.dataset[idcol].unique())
return ids
[docs]
def get_number_of_individuals(model: Model):
"""Retrieve the number of individuals in the model dataset
Parameters
----------
model : Model
Pharmpy model
Returns
-------
int
Number of individuals in the model dataset
Examples
--------
>>> from pharmpy.modeling import get_number_of_individuals, load_example_model
>>> model = load_example_model("pheno")
>>> get_number_of_individuals(model)
59
Notes
-----
For NONMEM models this is the number of individuals of the active dataset, i.e. after filtering
of IGNORE and ACCEPT and removal of individuals with no observations.
See also
--------
get_number_of_observations : Get the number of observations in a dataset
get_number_of_observations_per_individual : Get the number of observations per individual in a
dataset
"""
return len(get_ids(model))
[docs]
def get_number_of_observations(model: Model):
"""Retrieve the total number of observations in the model dataset
Parameters
----------
model : Model
Pharmpy model
Returns
-------
int
Number of observations in the model dataset
Examples
--------
>>> from pharmpy.modeling import get_number_of_observations, load_example_model
>>> model = load_example_model("pheno")
>>> get_number_of_observations(model)
155
Notes
-----
For NONMEM models this is the number of observations of the active dataset, i.e. after filtering
of IGNORE and ACCEPT and removal of individuals with no observations.
See also
--------
get_number_of_individuals : Get the number of individuals in a dataset
get_number_of_observations_per_individual : Get the number of observations per individual in a
dataset
"""
return len(get_observations(model))
[docs]
def get_number_of_observations_per_individual(model: Model):
"""Number of observations for each individual
Parameters
----------
model : Model
Pharmpy model
Returns
-------
pd.Series
Number of observations in the model dataset
Examples
--------
>>> from pharmpy.modeling import get_number_of_observations_per_individual, load_example_model
>>> model = load_example_model("pheno")
>>> get_number_of_observations_per_individual(model)
ID
1 2
2 3
3 3
4 3
5 3
6 3
7 3
8 3
9 4
10 3
11 1
12 3
13 2
14 4
15 2
16 3
17 3
18 4
19 3
20 3
21 3
22 2
23 3
24 3
25 6
26 2
27 2
28 1
29 1
30 2
31 1
32 3
33 2
34 2
35 2
36 3
37 2
38 4
39 3
40 2
41 3
42 2
43 1
44 3
45 3
46 1
47 1
48 5
49 3
50 4
51 3
52 3
53 2
54 4
55 1
56 1
57 2
58 3
59 3
Name: observation_count, dtype: int64
Notes
-----
For NONMEM models this is the individuals and number of observations of the active dataset, i.e.
after filtering of IGNORE and ACCEPT and removal of individuals with no observations.
See also
--------
get_number_of_individuals : Get the number of individuals in a dataset
get_number_of_observations_per_individual : Get the number of observations per individual in a
dataset
"""
ser = get_observations(model).groupby(model.datainfo.id_column.name).count()
ser.name = "observation_count"
return ser
[docs]
def get_observations(model: Model, keep_index: bool = False) -> pd.Series:
"""Get observations from dataset
Parameters
----------
model : Model
Pharmpy model
keep_index : bool
Set to True if the original index should be kept.
Otherwise a new index using ID and idv will be created.
Returns
-------
pd.Series
Observations indexed over ID and TIME
Examples
--------
>>> from pharmpy.modeling import get_observations, load_example_model
>>> model = load_example_model("pheno")
>>> get_observations(model)
ID TIME
1 2.0 17.3
112.5 31.0
2 2.0 9.7
63.5 24.6
135.5 33.0
...
58 47.5 27.9
131.8 31.0
59 1.8 22.6
73.8 34.3
146.8 40.2
Name: DV, Length: 155, dtype: float64
See also
--------
get_number_of_observations : get the number of observations
get_number_of_observations_per_individual : get the number of observations per individual
"""
try:
label = model.datainfo.typeix['mdv'][0].name
except IndexError:
try:
label = model.datainfo.typeix['event'][0].name
except IndexError:
try:
label = model.datainfo.typeix['dose'][0].name
except IndexError:
label = None # All data records are observations
idcol = model.datainfo.id_column.name
idvcol = model.datainfo.idv_column.name
dvcol = model.datainfo.dv_column.name
if label:
df = model.dataset.query(f'{label} == 0')
if df.empty:
df = model.dataset.astype({label: 'float'})
df = df.query(f'{label} == 0')
else:
df = model.dataset.copy()
if not keep_index:
df = df[[idcol, idvcol, dvcol]]
try:
# FIXME: This shouldn't be needed
df = df.astype({idvcol: np.float64})
except ValueError:
# TIME could not be converted to float (e.g. 10:15)
pass
df.set_index([idcol, idvcol], inplace=True)
df = df.squeeze()
else:
df = df[dvcol]
return df
[docs]
def get_baselines(model: Model):
"""Baselines for each subject.
Baseline is taken to be the first row even if that has a missing value.
Parameters
----------
model : Model
Pharmpy model
Returns
-------
pd.DataFrame
Dataset with the baselines
Examples
--------
>>> from pharmpy.modeling import load_example_model, get_baselines
>>> model = load_example_model("pheno")
>>> get_baselines(model)
TIME AMT WGT APGR DV FA1 FA2
ID
1 0.0 25.0 1.4 7.0 0.0 1.0 1.0
2 0.0 15.0 1.5 9.0 0.0 1.0 1.0
3 0.0 30.0 1.5 6.0 0.0 1.0 1.0
4 0.0 18.6 0.9 6.0 0.0 1.0 1.0
5 0.0 27.0 1.4 7.0 0.0 1.0 1.0
6 0.0 24.0 1.2 5.0 0.0 1.0 1.0
7 0.0 19.0 1.0 5.0 0.0 1.0 1.0
8 0.0 24.0 1.2 7.0 0.0 1.0 1.0
9 0.0 27.0 1.4 8.0 0.0 1.0 1.0
10 0.0 27.0 1.4 7.0 0.0 1.0 1.0
11 0.0 24.0 1.2 7.0 0.0 1.0 1.0
12 0.0 26.0 1.3 6.0 0.0 1.0 1.0
13 0.0 11.0 1.1 6.0 0.0 1.0 1.0
14 0.0 22.0 1.1 7.0 0.0 1.0 1.0
15 0.0 26.0 1.3 7.0 0.0 1.0 1.0
16 0.0 12.0 1.2 9.0 0.0 1.0 1.0
17 0.0 22.0 1.1 5.0 0.0 1.0 1.0
18 0.0 20.0 1.0 5.0 0.0 1.0 1.0
19 0.0 10.0 1.0 1.0 0.0 1.0 1.0
20 0.0 24.0 1.2 6.0 0.0 1.0 1.0
21 0.0 17.5 1.8 7.0 0.0 1.0 1.0
22 0.0 15.0 1.5 8.0 0.0 1.0 1.0
23 0.0 60.0 3.1 3.0 0.0 1.0 1.0
24 0.0 63.0 3.2 2.0 0.0 1.0 1.0
25 0.0 15.0 0.7 1.0 0.0 1.0 1.0
26 0.0 70.0 3.5 9.0 0.0 1.0 1.0
27 0.0 35.0 1.9 5.0 0.0 1.0 1.0
28 0.0 60.0 3.2 9.0 0.0 1.0 1.0
29 0.0 20.0 1.0 7.0 0.0 1.0 1.0
30 0.0 18.0 1.8 8.0 0.0 1.0 1.0
31 0.0 30.0 1.4 8.0 0.0 1.0 1.0
32 0.0 70.0 3.6 9.0 0.0 1.0 1.0
33 0.0 17.0 1.7 8.0 0.0 1.0 1.0
34 0.0 34.0 1.7 4.0 0.0 1.0 1.0
35 0.0 25.0 2.5 5.0 0.0 1.0 1.0
36 0.0 30.0 1.5 5.0 0.0 1.0 1.0
37 0.0 24.0 1.2 9.0 0.0 1.0 1.0
38 0.0 26.0 1.3 8.0 0.0 1.0 1.0
39 0.0 56.0 1.9 10.0 0.0 1.0 1.0
40 0.0 19.0 1.1 3.0 0.0 1.0 1.0
41 0.0 34.0 1.7 7.0 0.0 1.0 1.0
42 0.0 28.0 2.8 9.0 0.0 1.0 1.0
43 0.0 18.0 0.9 1.0 0.0 1.0 1.0
44 0.0 14.0 1.4 7.0 0.0 1.0 1.0
45 0.0 16.0 0.8 2.0 0.0 1.0 1.0
46 0.0 11.0 1.1 8.0 0.0 1.0 1.0
47 0.0 40.0 2.6 9.0 0.0 1.0 1.0
48 0.0 14.0 0.7 8.0 0.0 1.0 1.0
49 0.0 26.0 1.3 8.0 0.0 1.0 1.0
50 0.0 20.0 1.1 6.0 0.0 1.0 1.0
51 0.0 18.0 0.9 9.0 0.0 1.0 1.0
52 0.0 9.5 0.9 7.0 0.0 1.0 1.0
53 0.0 17.0 1.7 8.0 0.0 1.0 1.0
54 0.0 18.0 1.8 8.0 0.0 1.0 1.0
55 0.0 25.0 1.1 4.0 0.0 1.0 1.0
56 0.0 12.0 0.6 4.0 0.0 1.0 1.0
57 0.0 20.0 2.1 6.0 0.0 1.0 1.0
58 0.0 14.0 1.4 8.0 0.0 1.0 1.0
59 0.0 22.8 1.1 6.0 0.0 1.0 1.0
"""
idlab = model.datainfo.id_column.name
baselines = model.dataset.groupby(idlab).nth(0).set_index(idlab)
return baselines
[docs]
def set_covariates(model: Model, covariates: list[str]):
"""Set columns in the dataset to be covariates in the datainfo
Parameters
----------
model : Model
Pharmpy model
covariates : list
List of column names
Returns
-------
Model
Pharmpy model object
"""
di = model.datainfo
newcols = []
for col in di:
if col.name in covariates:
newcol = col.replace(type='covariate')
newcols.append(newcol)
else:
newcols.append(col)
model = model.replace(datainfo=di.replace(columns=newcols))
return model.update_source()
[docs]
def set_dvid(model: Model, name: str):
"""Set a column to act as DVID. Replace DVID if one is already set.
Parameters
----------
model : Model
Pharmpy model
name : str
Name of DVID column
Returns
-------
Model
Pharmpy model object
"""
di = model.datainfo
col = di[name]
if col.type == 'dvid':
return model
try:
curdvid = di.typeix['dvid'][0]
except IndexError:
pass
else:
curdvid = curdvid.replace(type='unknown')
di = di.set_column(curdvid)
col = col.replace(
type='dvid',
unit=1,
scale='nominal',
continuous=False,
drop=False,
descriptor='observation identifier',
)
df = model.dataset
if not col.is_integer():
ser = df[name]
converted = pd.to_numeric(ser, downcast='integer')
if not pd.api.types.is_integer_dtype(converted):
raise ValueError(
f"Could not use column {name} as DVID because it contains non-integral values"
)
df = df.assign(**{name: converted})
col = col.replace(datatype=ColumnInfo.convert_pd_dtype_to_datatype(converted.dtype))
new_dataset = True
else:
new_dataset = False
col = col.replace(categories=sorted(df[name].unique()))
di = di.set_column(col)
if new_dataset:
model = model.replace(datainfo=di, dataset=df)
else:
model = model.replace(datainfo=di)
return model.update_source()
[docs]
def get_covariate_baselines(model: Model):
"""Return a dataframe with baselines of all covariates for each id.
Baseline is taken to be the first row even if that has a missing value.
Parameters
----------
model : Model
Pharmpy model
Returns
-------
pd.DataFrame
covariate baselines
See also
--------
get_baselines : baselines for all data columns
Example
-------
>>> from pharmpy.modeling import load_example_model, get_covariate_baselines, set_covariates
>>> model = load_example_model("pheno")
>>> model = set_covariates(model, ["WGT", "APGR"])
>>> get_covariate_baselines(model)
WGT APGR
ID
1 1.4 7.0
2 1.5 9.0
3 1.5 6.0
4 0.9 6.0
5 1.4 7.0
6 1.2 5.0
7 1.0 5.0
8 1.2 7.0
9 1.4 8.0
10 1.4 7.0
11 1.2 7.0
12 1.3 6.0
13 1.1 6.0
14 1.1 7.0
15 1.3 7.0
16 1.2 9.0
17 1.1 5.0
18 1.0 5.0
19 1.0 1.0
20 1.2 6.0
21 1.8 7.0
22 1.5 8.0
23 3.1 3.0
24 3.2 2.0
25 0.7 1.0
26 3.5 9.0
27 1.9 5.0
28 3.2 9.0
29 1.0 7.0
30 1.8 8.0
31 1.4 8.0
32 3.6 9.0
33 1.7 8.0
34 1.7 4.0
35 2.5 5.0
36 1.5 5.0
37 1.2 9.0
38 1.3 8.0
39 1.9 10.0
40 1.1 3.0
41 1.7 7.0
42 2.8 9.0
43 0.9 1.0
44 1.4 7.0
45 0.8 2.0
46 1.1 8.0
47 2.6 9.0
48 0.7 8.0
49 1.3 8.0
50 1.1 6.0
51 0.9 9.0
52 0.9 7.0
53 1.7 8.0
54 1.8 8.0
55 1.1 4.0
56 0.6 4.0
57 2.1 6.0
58 1.4 8.0
59 1.1 6.0
"""
covariates = model.datainfo.typeix['covariate'].names
idlab = model.datainfo.id_column.name
df = model.dataset[covariates + [idlab]]
df = df.set_index(idlab)
return df.groupby(idlab).nth(0)
[docs]
def list_time_varying_covariates(model: Model):
"""Return a list of names of all time varying covariates
Parameters
----------
model : Model
Pharmpy model
Returns
-------
list
Names of all time varying covariates
See also
--------
get_covariate_baselines : get baselines for all covariates
Example
-------
>>> from pharmpy.modeling import load_example_model, list_time_varying_covariates
>>> model = load_example_model("pheno")
>>> list_time_varying_covariates(model)
[]
"""
cov_labels = model.datainfo.typeix['covariate'].names
if len(cov_labels) == 0:
return []
else:
time_var = (
model.dataset.groupby(by=model.datainfo.id_column.name)[cov_labels]
.nunique()
.gt(1)
.any()
)
return list(time_var.index[time_var])
[docs]
def get_doses(model: Model):
"""Get a series of all doses
Indexed with ID and TIME
Parameters
----------
model : Model
Pharmpy model
Returns
-------
pd.Series
doses
Example
-------
>>> from pharmpy.modeling import load_example_model, get_doses
>>> model = load_example_model("pheno")
>>> get_doses(model)
ID TIME
1 0.0 25.0
12.5 3.5
24.5 3.5
37.0 3.5
48.0 3.5
...
59 96.0 3.0
108.3 3.0
120.5 3.0
132.3 3.0
144.8 3.0
Name: AMT, Length: 589, dtype: float64
"""
try:
label = model.datainfo.typeix['dose'][0].name
except IndexError:
raise DatasetError('Could not identify dosing rows in dataset')
idcol = model.datainfo.id_column.name
idvcol = model.datainfo.idv_column.name
df = model.dataset.query(f'{label} != 0')
df = df[[idcol, idvcol, label]]
try:
# FIXME: This shouldn't be needed
df = df.astype({idvcol: np.float64})
except ValueError:
# TIME could not be converted to float (e.g. 10:15)
pass
df.set_index([idcol, idvcol], inplace=True)
return df.squeeze()
[docs]
def expand_additional_doses(model: Model, flag: bool = False):
"""Expand additional doses into separate dose records
Parameters
----------
model : Model
Pharmpy model object
flag : bool
True to add a boolean EXPANDED column to mark added records. In this case all
columns in the original dataset will be kept. Care needs to be taken to handle
the new dataset.
Returns
-------
Model
Pharmpy model object
"""
try:
addl = model.datainfo.typeix['additional'][0].name
ii = model.datainfo.typeix['ii'][0].name
except IndexError:
return model
idv = model.datainfo.idv_column.name
idcol = model.datainfo.id_column.name
df = model.dataset.copy()
try:
event = model.datainfo.typeix['event'][0].name
except IndexError:
df['_RESETGROUP'] = 1.0
else:
df['_FLAG'] = df[event] >= 3
df['_RESETGROUP'] = df.groupby('ID')['_FLAG'].cumsum()
df.drop('_FLAG', axis=1, inplace=True)
def fn(a):
if a[addl] == 0:
times = [a[idv]]
expanded = [False]
else:
length = int(a[addl])
times = [a[ii] * x + a[idv] for x in range(length + 1)]
expanded = [False] + [True] * length
a['_TIMES'] = times
a['_EXPANDED'] = expanded
return a
df = df.apply(fn, axis=1)
df = df.apply(lambda x: x.explode() if x.name in ['_TIMES', '_EXPANDED'] else x)
df = df.astype({'_EXPANDED': np.bool_})
df = df.groupby([idcol, '_RESETGROUP'], group_keys=False)[df.columns].apply(
lambda x: x.sort_values(by='_TIMES', kind='stable')
)
df[idv] = df['_TIMES'].astype(np.float64)
df.drop(['_TIMES', '_RESETGROUP'], axis=1, inplace=True)
if flag:
df.rename(columns={'_EXPANDED': 'EXPANDED'}, inplace=True)
else:
df.drop([addl, ii, '_EXPANDED'], axis=1, inplace=True)
model = model.replace(dataset=df.reset_index(drop=True))
return model.update_source()
[docs]
def get_doseid(model: Model):
"""Get a DOSEID series from the dataset with an id of each dose period starting from 1
If a a dose and observation exist at the same time point the observation will be counted
towards the previous dose.
Parameters
----------
model : Model
Pharmpy model
Returns
-------
pd.Series
DOSEIDs
Examples
--------
>>> from pharmpy.modeling import load_example_model, get_doseid
>>> model = load_example_model("pheno")
>>> get_doseid(model) # doctest: +ELLIPSIS
0 1
1 1
2 2
3 3
4 4
..
739 10
740 11
741 12
742 13
743 13
Name: DOSEID, Length: 744, dtype: int...
"""
try:
dose = model.datainfo.typeix['dose'][0].name
except IndexError:
raise DatasetError('Could not identify dosing rows in dataset')
df = model.dataset.copy()
df['DOSEID'] = df[dose]
df.loc[df['DOSEID'] > 0, 'DOSEID'] = 1
df['DOSEID'] = df['DOSEID'].astype(int)
idcol = model.datainfo.id_column.name
df['DOSEID'] = df.groupby(idcol)['DOSEID'].cumsum()
# Adjust for dose and observation at the same time point
# Observation is moved to previous dose group
# Except for steady state dose where the dose group is kept
try:
eventcol = model.datainfo.typeix['event'][0].name
except IndexError:
df['_RESETGROUP'] = 1.0
else:
df['_FLAG'] = df[eventcol] >= 3
df['_RESETGROUP'] = df.groupby('ID')['_FLAG'].cumsum()
try:
ss = model.datainfo.typeix['ss'][0].name
except IndexError:
ss = None
idvcol = model.datainfo.idv_column.name
ser = df.groupby([idcol, idvcol, '_RESETGROUP']).size()
nonunique = ser[ser > 1]
for i, time, _ in nonunique.index:
groupind = df[(df[idcol] == i) & (df[idvcol] == time)].index
obsind = df[(df[idcol] == i) & (df[idvcol] == time) & (df[dose] == 0)].index
doseind = set(groupind) - set(obsind)
if not doseind:
continue
maxind = max(doseind)
for index in obsind:
if 0 in groupind: # This is the first dose
continue
if maxind > index: # Dose record is after the observation
continue
if ss and df.loc[maxind, ss] > 0: # No swap for SS dosing
continue
curdoseid = df.loc[index, 'DOSEID']
df.loc[index, 'DOSEID'] = curdoseid - 1
return df['DOSEID'].copy()
[docs]
def get_mdv(model: Model):
"""Get MDVs from dataset
Parameters
----------
model : Model
Pharmpy model
Returns
-------
pd.Series
MDVs
"""
found = False
for key in ['mdv', 'event', 'dose']:
try:
label = model.datainfo.typeix[key][0].name
found = True
break
except IndexError:
pass
else:
label = model.datainfo.dv_column.name
data = model.dataset[label].astype('float64').squeeze()
series = data.where(data == 0, other=1) if found else pd.Series(np.zeros(len(data)))
return series.astype('int32').rename('MDV')
[docs]
def get_evid(model: Model):
"""Get the evid from model dataset
If an event column is present this will be extracted otherwise
an evid column will be created.
Parameters
----------
model : Model
Pharmpy model
Returns
-------
pd.Series
EVID
"""
di = model.datainfo
try:
eventcols = di.typeix['event']
except IndexError:
pass
else:
return model.dataset[eventcols[0].name]
mdv = get_mdv(model)
return mdv.rename('EVID')
[docs]
def get_admid(model: Model):
"""Get the admid from model dataset
If an administration column is present this will be extracted otherwise
an admid column will be created based on the admids of the present doses.
This is dependent on the presence of a CMT column to be generated correctly.
When generated, admids of events in between doses is set to the last used
admid.
Parameters
----------
model : Model
Pharmpy model
Returns
-------
pd.Series
ADMID
"""
di = model.datainfo
try:
admidcols = di.typeix["admid"]
except IndexError:
pass
else:
return model.dataset[admidcols[0].name]
odes = model.statements.ode_system
names = odes.compartment_names
remap = {}
if isinstance(odes, CompartmentalSystem):
for dosing in odes.dosing_compartments:
remap[names.index(dosing.name) + 1] = dosing.doses[0].admid
adm = get_cmt(model)
adm = adm.replace(remap)
adm.name = "ADMID"
# Replace all observations with the previous admid type
current_admin = adm[0]
current_subject = model.dataset["ID"][0]
for i, data in enumerate(zip(get_evid(model), adm, model.dataset["ID"])):
event = data[0]
admin = data[1]
subject = data[2]
if current_subject == subject:
if event == 1:
current_admin = admin
if event != 1:
if current_admin is not None:
adm[i] = current_admin
else:
current_subject = subject
current_admin = admin
return adm
[docs]
def add_admid(model: Model):
"""
Add an admid column to the model dataset and datainfo. Dependent on the
presence of a CMT column in order to add admid correctly.
When generated, admids of events in between doses is set to the last used
admid.
Parameters
----------
model : Model
Pharmpy model
Returns
-------
model : Model
Pharmpy model
See also
--------
get_admid : Get or create an admid column
get_cmt : Get or create a cmt column
"""
di = model.datainfo
if "admid" not in di.types:
adm = get_admid(model)
dataset = model.dataset
dataset["ADMID"] = adm
di = update_datainfo(model.datainfo, dataset)
colinfo = di['ADMID'].replace(type='admid')
model = model.replace(datainfo=di.set_column(colinfo), dataset=dataset)
return model.update_source()
def set_admid(model: Model, column_name: str):
"""
Set the specified column in the dataset to the admid data type.
Parameters
----------
model : Model
Pharmpy model
column_name : str
name of column to set as admid
Returns
-------
model : Model
Pharmpy model
See also
--------
get_admid : Get or create an admid column
add_admid : Add an admid column to the dataset
"""
di = model.datainfo
colinfo = di[column_name].replace(type="admid")
model = model.replace(datainfo=di.set_column(colinfo))
return model.update_source()
[docs]
def get_cmt(model: Model):
"""Get the cmt (compartment) column from the model dataset
If a cmt column is present this will be extracted otherwise
a cmt column will be created. If created, multiple dose compartments are
dependent on the presence of an admid type column, otherwise, dose/non-dose
will be considered.
Parameters
----------
model : Model
Pharmpy model
Returns
-------
pd.Series
CMT
"""
di = model.datainfo
try:
cmtcols = di.typeix['compartment']
except IndexError:
pass
else:
return model.dataset[cmtcols[0].name]
# See if admid exist
try:
admidcols = di.typeix["admid"]
except IndexError:
# No admid found --> Assume dose/non-dose
odes = model.statements.ode_system
if isinstance(odes, CompartmentalSystem):
dosing = odes.dosing_compartments[0]
names = odes.compartment_names
dose_cmt = names.index(dosing.name) + 1
else:
dose_cmt = 1
cmt = get_evid(model)
cmt = cmt.replace({1: dose_cmt, 2: 0, 3: 0, 4: dose_cmt}) # Only consider dose/non-dose
cmt.name = "CMT"
return cmt
else:
admidcols = model.dataset[admidcols[0].name]
# Admid found -> convert to CMT based on doses
odes = model.statements.ode_system
names = odes.compartment_names
remap = {}
if isinstance(odes, CompartmentalSystem):
for dosing in odes.dosing_compartments:
if dosing == odes.central_compartment:
remap[2] = names.index(dosing.name) + 1
central_number = names.index(dosing.name) + 1
else:
remap[1] = names.index(dosing.name) + 1
admidcols = admidcols.replace(remap)
admidcols.loc[get_evid(model) == 0] = central_number
admidcols.name = "ADMID"
return admidcols
[docs]
def add_cmt(model: Model):
"""Add a CMT column to the model dataset and datainfo if not existed
In case of multiple doses, this method is dependent on the presence of an
admid column to correctly number each dose.
NOTE : Existing CMT is based on datainfo type being set to 'compartment'
and a column named 'CMT' can be replaced
Parameters
----------
model : Model
Pharmpy model
Returns
-------
model : Model
Pharmpy model
See also
--------
get_admid : Get or create an admid column
get_cmt : Get or create a cmt column
"""
di = model.datainfo
if "compartment" not in di.types:
cmt_name = "CMT"
cmt = get_cmt(model)
dataset = model.dataset
dataset[cmt_name] = cmt
di = update_datainfo(model.datainfo, dataset)
colinfo = di[cmt_name].replace(type='compartment')
model = model.replace(datainfo=di.set_column(colinfo), dataset=dataset)
return model.update_source()
[docs]
def add_time_after_dose(model: Model):
"""Calculate and add a TAD column to the dataset
Parameters
----------
model : Model
Pharmpy model
Returns
-------
Model
Pharmpy model object
Examples
--------
>>> from pharmpy.modeling import load_example_model, add_time_after_dose
>>> model = load_example_model("pheno")
>>> model = add_time_after_dose(model)
"""
try:
model.datainfo.descriptorix['time after dose']
except IndexError:
pass
else:
# Already have time after dose
return model
temp = translate_nmtran_time(model)
idv = temp.datainfo.idv_column.name
idlab = temp.datainfo.id_column.name
df = model.dataset.copy()
df['_NEWTIME'] = temp.dataset[idv]
try:
addl = temp.datainfo.typeix['additional'][0].name
except IndexError:
addl = None
else:
# FIXME: Temp workaround, should be canonicalized in Model.replace
di = update_datainfo(temp.datainfo, df)
new_idvcol = di.idv_column.replace(type='unknown')
new_timecol = di['_NEWTIME'].replace(type='idv')
di = di.set_column(new_idvcol).set_column(new_timecol)
temp = temp.replace(datainfo=di, dataset=df)
temp = expand_additional_doses(temp, flag=True)
df = temp.dataset
df['_DOSEID'] = get_doseid(temp)
# Sort in case DOSEIDs are non-increasing
df = (
df.groupby(idlab)[df.columns]
.apply(lambda x: x.sort_values(by=['_DOSEID'], kind='stable', ignore_index=True))
.reset_index(drop=True)
)
df['TAD'] = df.groupby([idlab, '_DOSEID'])['_NEWTIME'].diff().fillna(0.0)
df['TAD'] = df.groupby([idlab, '_DOSEID'])['TAD'].cumsum()
if addl:
df = df[~df['EXPANDED']].reset_index(drop=True)
df.drop(columns=['EXPANDED'], inplace=True)
df.drop(columns=['_NEWTIME', '_DOSEID'], inplace=True)
# FIXME: Temp workaround, should be canonicalized in Model.replace
di = update_datainfo(model.datainfo, df)
colinfo = di['TAD'].replace(descriptor='time after dose', unit=di[idv].unit)
model = model.replace(datainfo=di.set_column(colinfo), dataset=df)
return model.update_source()
[docs]
def get_concentration_parameters_from_data(model: Model):
"""Create a dataframe with concentration parameters
Note that all values are directly calculated from the dataset
Parameters
----------
model : Model
Pharmpy model object
Returns
-------
pd.DataFrame
Concentration parameters
Examples
--------
>>> from pharmpy.modeling import load_example_model, get_concentration_parameters_from_data
>>> model = load_example_model("pheno")
>>> get_concentration_parameters_from_data(model)
Cmax Tmax Cmin Tmin
ID DOSEID
1 1 17.3 2.0 NaN NaN
2 NaN NaN NaN NaN
3 NaN NaN NaN NaN
4 NaN NaN NaN NaN
5 NaN NaN NaN NaN
... ... ... ... ...
59 9 NaN NaN NaN NaN
10 NaN NaN NaN NaN
11 NaN NaN NaN NaN
12 NaN NaN NaN NaN
13 40.2 2.0 NaN NaN
<BLANKLINE>
[589 rows x 4 columns]
"""
model = add_time_after_dose(model)
doseid = get_doseid(model)
df = model.dataset.copy()
df['DOSEID'] = doseid
idlab = model.datainfo.id_column.name
dv = model.datainfo.dv_column.name
noobs = df.groupby([idlab, 'DOSEID']).size() == 1
idx = df.groupby([idlab, 'DOSEID'])[dv].idxmax()
params = df.loc[idx].set_index([idlab, 'DOSEID'])
params = params[[dv, 'TAD']]
params.rename(columns={dv: 'Cmax', 'TAD': 'Tmax'}, inplace=True)
params.loc[noobs] = np.nan
grpind = df.groupby(['ID', 'DOSEID']).indices
keep = []
for ind, rows in grpind.items():
index = idx.loc[ind]
p = params.loc[ind]
if not np.isnan(p['Tmax']):
keep += [row for row in rows if row > index]
minidx = df.iloc[keep].groupby([idlab, 'DOSEID'])[dv].idxmin()
params2 = df.loc[minidx].set_index([idlab, 'DOSEID'])
params2 = params2[[dv, 'TAD']]
params2.rename(columns={dv: 'Cmin', 'TAD': 'Tmin'}, inplace=True)
res = params.join(params2)
return res
[docs]
def drop_dropped_columns(model: Model):
"""Drop columns marked as dropped from the dataset
NM-TRAN date columns will not be dropped by this function
even if marked as dropped.
Columns not specified in the datainfo ($INPUT for NONMEM)
will also be dropped from the dataset.
Parameters
----------
model : Model
Pharmpy model object
Returns
-------
Model
Pharmpy model object
Example
-------
>>> from pharmpy.modeling import *
>>> model = load_example_model("pheno")
>>> model = drop_dropped_columns(model)
>>> list(model.dataset.columns)
['ID', 'TIME', 'AMT', 'WGT', 'APGR', 'DV', 'FA1', 'FA2']
See also
--------
drop_columns : Drop specific columns or mark them as drop
"""
datainfo = model.datainfo
todrop = [
colname
for colname in datainfo.names
if datainfo[colname].drop and datainfo[colname].datatype != 'nmtran-date'
]
todrop += list(set(model.dataset.columns) - set(datainfo.names))
model = drop_columns(model, todrop)
return model.update_source()
[docs]
def drop_columns(model: Model, column_names: Union[list[str], str], mark: bool = False):
"""Drop columns from the dataset or mark as dropped
Parameters
----------
model : Model
Pharmpy model object
column_names : list or str
List of column names or one column name to drop or mark as dropped
mark : bool
Default is to remove column from dataset. Set this to True to only mark as dropped
Returns
-------
Model
Pharmpy model object
Example
-------
>>> from pharmpy.modeling import *
>>> model = load_example_model("pheno")
>>> model = drop_columns(model, ['WGT', 'APGR'])
>>> list(model.dataset.columns)
['ID', 'TIME', 'AMT', 'DV', 'FA1', 'FA2']
See also
--------
drop_dropped_columns : Drop all columns marked as drop
undrop_columns : Undrop columns of model
"""
if isinstance(column_names, str):
column_names = [column_names]
di = model.datainfo
newcols, to_drop = [], []
for col in di:
if col.name in column_names:
if mark:
newcol = col.replace(drop=True)
newcols.append(newcol)
else:
to_drop.append(col.name)
else:
newcols.append(col)
replace_dict = {'datainfo': di.replace(columns=newcols)}
if to_drop:
df = model.dataset.copy()
replace_dict['dataset'] = df.drop(to_drop, axis=1)
model = model.replace(**replace_dict)
return model.update_source()
[docs]
def undrop_columns(model: Model, column_names: Union[list[str], str]):
"""Undrop columns of model
Parameters
----------
model : Model
Pharmpy model object
column_names : list or str
List of column names or one column name to undrop
Returns
-------
Model
Pharmpy model object
Example
-------
>>> from pharmpy.modeling import *
>>> model = load_example_model("pheno")
>>> model = drop_columns(model, ['WGT', 'APGR'], mark=True)
>>> model = undrop_columns(model, 'WGT')
See also
--------
drop_dropped_columns : Drop all columns marked as drop
drop_columns : Drop or mark columns as dropped
"""
if isinstance(column_names, str):
column_names = [column_names]
di = model.datainfo
newcols = []
for col in di:
if col.name in column_names:
newcol = col.replace(drop=False)
newcols.append(newcol)
else:
newcols.append(col)
model = model.replace(datainfo=di.replace(columns=newcols))
return model.update_source()
def _translate_nonmem_time_value(time):
if ':' in time:
components = time.split(':')
if len(components) != 2:
raise DatasetError(f'Bad TIME format: {time}')
hours = float(components[0]) + float(components[1]) / 60
return hours
else:
return float(time)
def _translate_time_column(df, timecol, idcol):
if df[timecol].dtype != np.float64:
df[timecol] = df[timecol].apply(_translate_nonmem_time_value)
df[timecol] = df[timecol] - df.groupby(idcol)[timecol].transform('first')
return df
def _translate_nonmem_time_and_date_value(ser, timecol, datecol):
timeval = _translate_nonmem_time_value(ser[timecol])
date = ser[datecol]
a = re.split(r'[^0-9]', date)
if date.startswith('-') or len(a) == 1:
return timeval + float(date) * 24
elif len(a) == 2:
year = 2001 # Non leap year
month = a[1]
day = a[0]
elif len(a) == 3:
if datecol.endswith('E'):
month = a[0]
day = a[1]
year = a[2]
elif datecol.endswith('1'):
day = a[0]
month = a[1]
year = a[2]
elif datecol.endswith('3'):
year = a[0]
day = a[1]
month = a[2]
else: # Let DAT2 be default if other name
year = a[0]
month = a[1]
day = a[2]
if len(year) < 3:
year = int(year)
if year > 50:
year += 1900
else:
year += 2000
else:
year = int(year)
month = int(month)
day = int(day)
hour = int(timeval)
timeval = (timeval - hour) * 60
minute = int(timeval)
timeval = (timeval - minute) * 60
second = int(timeval)
timeval = (timeval - second) * 1000000
microsecond = int(timeval)
timeval = (timeval - microsecond) * 1000
nanosecond = int(timeval)
ts = pd.Timestamp(
year=year,
month=month,
day=day,
hour=hour,
minute=minute,
second=second,
microsecond=microsecond,
nanosecond=nanosecond,
)
return ts
else:
raise DatasetError(f'Bad DATE value: {date}')
def _translate_time_and_date_columns(df, timecol, datecol, idcol):
df[timecol] = df.apply(
_translate_nonmem_time_and_date_value, axis=1, timecol=timecol, datecol=datecol
)
timediff = df[timecol] - df.groupby(idcol)[timecol].transform('first')
if df[timecol].dtype != np.float64:
df[timecol] = timediff.dt.total_seconds() / 3600
return df
def _find_time_and_date_columns(model):
# Both time and date can be None. If date is None time must be not None
time = None
date = None
di = model.datainfo
for col in di:
if col.datatype == 'nmtran-time' and not col.drop:
if time is None:
time = col
else:
raise ValueError(f"Multiple time columns found {time} and {col.name}")
elif col.datatype == 'nmtran-date' and not col.drop:
if date is None:
date = col
else:
raise ValueError(f"Multiple date columns found {date} and {col.name}")
if time is None and date is not None:
raise ValueError(f"Found date column {date}, but no time column")
return time, date
[docs]
def translate_nmtran_time(model: Model):
"""Translate NM-TRAN TIME and DATE column into one TIME column
If dataset of model have special NM-TRAN TIME and DATE columns these
will be translated into one single time column with time in hours.
Warnings
--------
Use this function with caution. For example reset events are currently not taken into account.
Parameters
----------
model : Model
Pharmpy model object
Returns
-------
Model
Pharmpy model object
"""
if model.dataset is None:
return model
timecol, datecol = _find_time_and_date_columns(model)
df = model.dataset.copy()
di = model.datainfo
idname = di.id_column.name
if datecol is None:
if timecol is None:
return model
else:
df = _translate_time_column(df, timecol.name, idname)
else:
assert timecol is not None
df = _translate_time_and_date_columns(df, timecol.name, datecol.name, idname)
model = drop_columns(model, datecol.name)
timecol = timecol.replace(unit='h')
timecol = timecol.replace(datatype='float64')
di = di.set_column(timecol)
model = model.replace(datainfo=di, dataset=df)
return model.update_source()
def _loq_mask(
model: Model,
lloq: Optional[Union[float, str]] = None,
uloq: Optional[Union[float, str]] = None,
blq: Optional[str] = None,
alq: Optional[str] = None,
):
"""Boolean series with False for lloq records and True for non-lloq
Options as remove_loq_data
"""
if blq and lloq:
raise ValueError("Cannot specify blq and lloq at the same time")
if alq and uloq:
raise ValueError("Cannot specify alq and uloq at the same time")
df = model.dataset
if lloq is not None or uloq is not None:
dv = model.datainfo.dv_column.name
mdv = get_mdv(model)
which_keep = pd.Series(True, index=df.index)
if isinstance(lloq, str):
lloq = df[lloq]
if isinstance(uloq, str):
uloq = df[uloq]
if lloq is not None:
which_keep &= (df[dv] > lloq) | mdv
elif blq is not None:
which_keep &= (df[blq] == 0) | mdv
if uloq is not None:
which_keep &= (df[dv] < uloq) | mdv
elif alq is not None:
which_keep &= (df[alq] == 0) | mdv
return which_keep
[docs]
def remove_loq_data(
model: Model,
lloq: Optional[Union[float, str]] = None,
uloq: Optional[Union[float, str]] = None,
blq: Optional[str] = None,
alq: Optional[str] = None,
keep: Optional[int] = 0,
):
"""Remove loq data records from the dataset
Does nothing if none of the limits are specified.
Parameters
----------
model : Model
Pharmpy model object
lloq : float or str
Value or column name for lower limit of quantification.
uloq : float or str
Value or column name for upper limit of quantification.
blq : str
Column name for below limit of quantification indicator.
alq : str
Column name for above limit of quantification indicator.
keep : int
Number of loq records to keep for each run of consecutive loq records.
Returns
-------
Model
Pharmpy model object
Examples
--------
>>> from pharmpy.modeling import *
>>> model = load_example_model("pheno")
>>> model = remove_loq_data(model, lloq=10, uloq=40)
>>> len(model.dataset)
736
See also
--------
set_lloq_data
transform_blq
"""
which_keep = _loq_mask(model, lloq=lloq, uloq=uloq, blq=blq, alq=alq)
df = model.dataset
if keep > 0:
idcol = model.datainfo.id_column.name
keep_df = pd.DataFrame(
{'ID': df[idcol], 'consec': (~which_keep).diff().ne(0).cumsum(), 'remove': ~which_keep}
)
obj = keep_df.groupby([idcol, 'consec']).cumsum().le(keep)['remove']
which_keep = obj | which_keep
model = model.replace(dataset=df[which_keep])
return model.update_source()
[docs]
def set_lloq_data(
model: Model,
value: Union[str, float, Expr],
lloq: Optional[Union[float, str]] = None,
blq: Optional[str] = None,
):
"""Set a dv value for lloq data records
Parameters
----------
model : Model
Pharmpy model object
value : float or Expr
The new dv value
lloq : float or str
Value or column name for lower limit of quantification.
blq : str
Column name for below limit of quantification indicator.
Returns
-------
Model
Pharmpy model object
Examples
--------
>>> from pharmpy.modeling import *
>>> model = load_example_model("pheno")
>>> model = set_lloq_data(model, 0, lloq=10)
See also
--------
remove_loq_data
transform_blq
"""
which_keep = _loq_mask(model, lloq=lloq, blq=blq)
df = model.dataset.copy()
dv = model.datainfo.dv_column.name
if isinstance(value, Expr) or isinstance(value, str):
value = df.eval(str(value))
df[dv] = df[dv].where(which_keep, value)
model = model.replace(dataset=df)
return model
[docs]
def set_reference_values(model: Model, refs: dict[str, Union[int, float]]):
"""Set reference values for selected columns
All values for each selected column will be replaced. For dose columns
only the values for dosing events will be replaced.
Parameters
----------
model : Model
Pharmpy model object
refs : dict
Pairs of column names and reference values
Returns
-------
Model
Pharmpy model object
Examples
--------
>>> from pharmpy.modeling import *
>>> model = load_example_model("pheno")
>>> model = set_reference_values(model, {'WGT': 0.5, 'AMT': 4.0})
>>> model.dataset
ID TIME AMT WGT APGR DV FA1 FA2
0 1 0.0 4.0 0.5 7.0 0.0 1.0 1.0
1 1 2.0 0.0 0.5 7.0 17.3 0.0 0.0
2 1 12.5 4.0 0.5 7.0 0.0 1.0 1.0
3 1 24.5 4.0 0.5 7.0 0.0 1.0 1.0
4 1 37.0 4.0 0.5 7.0 0.0 1.0 1.0
.. .. ... ... ... ... ... ... ...
739 59 108.3 4.0 0.5 6.0 0.0 1.0 1.0
740 59 120.5 4.0 0.5 6.0 0.0 1.0 1.0
741 59 132.3 4.0 0.5 6.0 0.0 1.0 1.0
742 59 144.8 4.0 0.5 6.0 0.0 1.0 1.0
743 59 146.8 0.0 0.5 6.0 40.2 0.0 0.0
<BLANKLINE>
[744 rows x 8 columns]
"""
df = model.dataset
di = model.datainfo
newcols = dict()
dtypes = dict()
for colname, value in refs.items():
if di[colname].type == 'dose':
newdose = df[colname].mask(df[colname] > 0, value)
newcols[colname] = newdose
else:
newcols[colname] = value
datatype = ColumnInfo.convert_datatype_to_pd_dtype(di[colname].datatype)
dtypes[colname] = datatype
df = df.assign(**newcols).astype(dtypes)
model = model.replace(dataset=df)
return model
class Checker:
_all_checks = (
('A1', 'Body weight has unit'),
('A2', 'Body weight has mass unit'),
('A3', 'Body weight >0 and <700kg'),
('A4', 'Age has unit'),
('A5', 'Age has time unit'),
('A6', 'Age >=0 and <130 years'),
('A7', 'Lean body mass has unit'),
('A8', 'Lean body mass has mass unit'),
('A9', 'Lean body mass >0 and <700kg'),
('A10', 'Fat free mass has unit'),
('A11', 'Fat free mass has mass unit'),
('A12', 'Fat free mass >0 and <700kg'),
('D1', 'Time after dose has unit'),
('D2', 'Time after dose has time unit'),
('D3', 'Time after dose >=0'),
('D4', 'Plasma concentration has unit'),
('D5', 'Plasma concentration has mass/volume unit'),
('D6', 'Plasma concentration >= 0'),
('I1', 'Subject identifier is unitless'),
)
def __init__(self, datainfo, dataset, verbose=False):
self.datainfo = datainfo
self.dataset = dataset
self.verbose = verbose
self.check_results = {}
self.violations = []
def set_result(self, code, test=False, violation=None, skip=False, warn=False):
if skip:
result = "SKIP"
elif test:
result = "OK"
else:
if warn:
result = "WARN"
else:
result = "FAIL"
if code not in self.check_results or (
code in self.check_results
and (
self.check_results[code] == 'SKIP'
or self.check_results[code] == 'OK'
and result in ('WARN', 'FAIL')
or self.check_results[code] == 'WARN'
and result == 'FAIL'
)
):
self.check_results[code] = result
if result in ('WARN', 'FAIL'):
self.violations.append((code, result, violation))
def check_has_unit(self, code, col):
has_unit = col.unit is not None
self.set_result(code, test=has_unit, violation=col.name, warn=True)
return has_unit
def check_is_unitless(self, code, col):
is_unitless = col.unit == Unit.unitless()
self.set_result(code, test=is_unitless, violation=col.name, warn=True)
def check_dimension(self, code, column, dim):
if column.unit is None:
self.set_result(code, skip=True)
return False
else:
dim2 = sympy.physics.units.Dimension(
sympy.physics.units.si.SI.get_dimensional_expr(column.unit._expr)
)
self.set_result(
code,
test=dim == dim2,
violation=f"Unit {column.unit} of {column.name} is not a {dim} unit",
)
return dim == dim2
def check_range(self, code, col, lower, upper, unit, lower_included=True, upper_included=True):
name = col.name
if lower == 0:
scaled_lower = lower
else:
scaled_lower = float(
sympy.physics.units.convert_to(lower * unit, col.unit._expr) / col.unit._expr
)
if upper == 0:
scaled_upper = upper
else:
scaled_upper = float(
sympy.physics.units.convert_to(upper * unit, col.unit._expr) / col.unit._expr
)
if lower_included:
lower_viol = self.dataset[name] < scaled_lower
else:
lower_viol = self.dataset[name] <= scaled_lower
if upper_included:
upper_viol = self.dataset[name] > scaled_upper
else:
upper_viol = self.dataset[name] >= scaled_upper
all_viol = lower_viol | upper_viol
violations = all_viol[all_viol]
if not violations.empty:
for i in violations.index:
self.set_result(
code,
test=False,
violation=f"{col.name} index={i} value={self.dataset[name].loc[i]}",
)
else:
self.set_result(code, test=True)
def get_dataframe(self):
codes = []
checks = []
results = []
violations = []
for code, msg in Checker._all_checks:
if code not in self.check_results:
self.check_results[code] = "SKIP"
if self.check_results[code] in ['OK', 'SKIP']:
if self.verbose:
codes.append(code)
checks.append(msg)
results.append(self.check_results[code])
violations.append(None)
else:
for viol in self.violations:
if (
viol[0] == code
and viol[1] == "FAIL"
or (viol[1] == "WARN" and self.verbose)
):
codes.append(code)
checks.append(msg)
results.append(viol[1])
violations.append(viol[2])
df = pd.DataFrame(
{'code': codes, 'check': checks, 'result': results, 'violation': violations}
)
return df
def print(self):
table = rich_table.Table(title="Dataset checks", box=rich_box.SQUARE)
table.add_column("Code")
table.add_column("Check")
table.add_column("Result")
table.add_column("Violation")
for code, msg in Checker._all_checks:
if code not in self.check_results:
self.check_results[code] = "SKIP"
if self.check_results[code] in ['OK', 'SKIP']:
if self.verbose:
table.add_row(code, msg, f'[bold green]{self.check_results[code]}', "")
else:
for viol in self.violations:
if (
viol[0] == code
and viol[1] == "FAIL"
or (viol[1] == "WARN" and self.verbose)
):
result = viol[1]
if result == "FAIL":
result = f"[bold red]{result}"
else:
result = f"[bold yellow]{result}"
table.add_row(code, msg, result, viol[2])
if table.rows: # Do not print an empty table
console = rich_console.Console()
console.print(table)
[docs]
def check_dataset(model: Model, dataframe: bool = False, verbose: bool = False):
"""Check dataset for consistency across a set of rules
Parameters
----------
model : Model
Pharmpy model object
dataframe : bool
True to return a DataFrame instead of printing to the console
verbose : bool
Print out all rules checked if True else print only failed rules
Returns
-------
pd.DataFrame
Only returns a DataFrame is dataframe=True
"""
di = model.datainfo
df = model.dataset
checker = Checker(di, df, verbose=verbose)
for col in di:
if col.descriptor == "body weight":
checker.check_has_unit("A1", col)
samedim = checker.check_dimension("A2", col, sympy.physics.units.mass)
if samedim:
checker.check_range("A3", col, 0, 700, sympy.physics.units.kg, False, False)
if col.descriptor == "age":
checker.check_has_unit("A4", col)
samedim = checker.check_dimension("A5", col, sympy.physics.units.time)
if samedim:
checker.check_range("A6", col, 0, 130, sympy.physics.units.year, True, False)
if col.descriptor == "lean body mass":
checker.check_has_unit("A7", col)
samedim = checker.check_dimension("A8", col, sympy.physics.units.mass)
if samedim:
checker.check_range("A9", col, 0, 700, sympy.physics.units.kg, False, False)
if col.descriptor == "fat free mass":
checker.check_has_unit("A10", col)
samedim = checker.check_dimension("A11", col, sympy.physics.units.mass)
if samedim:
checker.check_range("A12", col, 0, 700, sympy.physics.units.kg, False, False)
if col.descriptor == "time after dose":
checker.check_has_unit("D1", col)
samedim = checker.check_dimension("D2", col, sympy.physics.units.time)
if samedim:
checker.check_range(
"D3", col, 0, float('inf'), sympy.physics.units.second, True, False
)
if col.descriptor == "plasma concentration":
checker.check_has_unit("D4", col)
samedim = checker.check_dimension(
"D5", col, sympy.physics.units.mass / sympy.physics.units.length**3
)
if samedim:
checker.check_range(
"D6",
col,
0,
float('inf'),
sympy.physics.units.kg / sympy.physics.units.L,
True,
False,
)
if col.descriptor == "subject identifier":
checker.check_is_unitless("I1", col)
if dataframe:
return checker.get_dataframe()
else:
checker.print()
[docs]
def read_dataset_from_datainfo(
datainfo: Union[DataInfo, Path, str], datatype: Optional[str] = None
):
"""Read a dataset given a datainfo object or path to a datainfo file
Parameters
----------
datainfo : DataInfo | Path | str
A datainfo object or a path to a datainfo object
datatype : str
A string to specify dataset type
Returns
-------
pd.DataFrame
The dataset
"""
if not isinstance(datainfo, DataInfo):
datainfo = DataInfo.read_json(datainfo)
if datainfo.path is None:
raise ValueError('datainfo.path is None')
from pharmpy.model.external.nonmem.dataset import read_nonmem_dataset
from pharmpy.model.external.nonmem.parsing import filter_observations
if datatype == 'nonmem':
drop = [col.drop for col in datainfo]
df = read_nonmem_dataset(
datainfo.path,
ignore_character='@',
drop=drop,
colnames=datainfo.names,
dtype=datainfo.get_dtype_dict(),
)
# This assumes a PK model
df = filter_observations(df, datainfo)
else:
df = pd.read_csv(
datainfo.path,
sep=datainfo.separator,
dtype=datainfo.get_dtype_dict(),
float_precision='round_trip',
)
return df
def create_default_datainfo(path_or_df):
if not isinstance(path_or_df, pd.DataFrame):
path = path_absolute(path_or_df)
datainfo_path = path.with_suffix('.datainfo')
if datainfo_path.is_file():
di = DataInfo.read_json(datainfo_path)
di = di.replace(path=path)
return di
else:
with open(path) as file:
first_line = file.readline()
if ',' not in first_line:
colnames = list(pd.read_csv(path, nrows=0, sep=r'\s+'))
separator = r'\s+'
else:
colnames = list(pd.read_csv(path, nrows=0))
separator = ','
else:
colnames = path_or_df.columns
separator = None
path = None
column_info = []
for colname in colnames:
if colname == 'ID' or colname == 'L1':
info = ColumnInfo.create(colname, type='id', scale='nominal', datatype='int32')
elif colname == 'DV':
info = ColumnInfo.create(colname, type='dv')
elif colname == 'TIME':
if not set(colnames).isdisjoint({'DATE', 'DAT1', 'DAT2', 'DAT3'}):
datatype = 'nmtran-time'
else:
datatype = 'float64'
info = ColumnInfo.create(colname, type='idv', scale='ratio', datatype=datatype)
elif colname == 'EVID':
info = ColumnInfo.create(colname, type='event', scale='nominal')
elif colname == 'MDV':
if 'EVID' in colnames:
info = ColumnInfo.create(colname, type='mdv')
else:
info = ColumnInfo.create(colname, type='event', scale='nominal', datatype='int32')
elif colname == 'AMT':
info = ColumnInfo.create(colname, type='dose', scale='ratio')
elif colname == 'BLQ':
info = ColumnInfo.create(colname, type='blq', scale='nominal', datatype='int32')
elif colname == 'LLOQ':
info = ColumnInfo.create(colname, type='lloq', scale='ratio')
elif colname == 'DVID':
info = ColumnInfo.create(colname, type='dvid', scale='nominal', datatype='int32')
elif colname == 'SS':
info = ColumnInfo.create(colname, type='ss', scale='nominal', datatype='int32')
elif colname == 'II':
info = ColumnInfo.create(colname, type='ii', scale='ratio')
else:
info = ColumnInfo.create(colname)
column_info.append(info)
di = DataInfo.create(column_info, path=path, separator=separator)
return di
[docs]
def deidentify_data(
df: pd.DataFrame, id_column: str = 'ID', date_columns: Optional[list[str]] = None
):
"""Deidentify a dataset
Two operations are performed on the dataset:
1. All ID numbers are randomized from the range 1 to n
2. All columns containing dates will have the year changed
The year change is done by letting the earliest year in the dataset
be used as a reference and by maintaining leap years. The reference year
will either be 1901, 1902, 1903 or 1904 depending on its distance to the closest
preceeding leap year.
Parameters
----------
df : pd.DataFrame
A dataset
id_column : str
Name of the id column
date_columns : list
Names of all date columns
Returns
-------
pd.DataFrame
Deidentified dataset
"""
df = df.copy()
df[id_column] = pd.to_numeric(df[id_column])
resampler = resample_data(df, id_column)
df, _ = next(resampler)
if date_columns is None:
return df
for datecol in date_columns:
if pd.api.types.is_datetime64_any_dtype(df[datecol]):
pass
elif df[datecol].dtype == 'object':
# assume string
df[datecol] = pd.to_datetime(df[datecol])
else:
raise ValueError(f"Column {datecol} does not seem to contain a date")
earliest_date = df[date_columns].min().min()
# Handle leap year modulo
earliest_year_modulo = earliest_date.year % 4
reference_offset = 4 if earliest_year_modulo == 0 else earliest_year_modulo
reference_year = 1900 + reference_offset
delta = earliest_date.year - reference_year
def convert(x):
new = x.replace(year=x.year - delta)
return new
for datecol in date_columns:
df[datecol] = df[datecol].transform(convert)
return df
[docs]
def unload_dataset(model: Model):
"""Unload the dataset from a model
Parameters
----------
model : Model
Pharmpy model
Returns
-------
Model
Pharmpy model with dataset removed
Example
-------
>>> from pharmpy.modeling import load_example_model, unload_dataset
>>> model = load_example_model("pheno")
>>> model = unload_dataset(model)
>>> model.dataset is None
True
"""
model = model.replace(dataset=None)
return model
[docs]
def load_dataset(model: Model):
"""Load the dataset given datainfo
Parameters
----------
model : Model
Pharmpy model
Returns
-------
Model
Pharmpy model with dataset removed
Example
-------
>>> from pharmpy.modeling import load_example_model, load_dataset, unload_dataset
>>> model = load_example_model("pheno")
>>> model = unload_dataset(model)
>>> model.dataset is None
True
>>> model = load_dataset(model)
>>> model.dataset
ID TIME AMT WGT APGR DV FA1 FA2
0 1 0.0 25.0 1.4 7.0 0.0 1.0 1.0
1 1 2.0 0.0 1.4 7.0 17.3 0.0 0.0
2 1 12.5 3.5 1.4 7.0 0.0 1.0 1.0
3 1 24.5 3.5 1.4 7.0 0.0 1.0 1.0
4 1 37.0 3.5 1.4 7.0 0.0 1.0 1.0
.. .. ... ... ... ... ... ... ...
739 59 108.3 3.0 1.1 6.0 0.0 1.0 1.0
740 59 120.5 3.0 1.1 6.0 0.0 1.0 1.0
741 59 132.3 3.0 1.1 6.0 0.0 1.0 1.0
742 59 144.8 3.0 1.1 6.0 0.0 1.0 1.0
743 59 146.8 0.0 1.1 6.0 40.2 0.0 0.0
<BLANKLINE>
[744 rows x 8 columns]
"""
df = read_dataset_from_datainfo(model.datainfo)
model = model.replace(dataset=df)
return model
[docs]
def set_dataset(
model: Model, path_or_df: Union[str, Path, pd.DataFrame], datatype: Optional[str] = None
):
"""Load the dataset given datainfo
Parameters
----------
model : Model
Pharmpy model
path_or_df : str, Path, or pd.DataFrame
Dataset path or dataframe
datatype : str
Type of dataset (optional)
Returns
-------
Model
Pharmpy model with new dataset and updated datainfo
Example
-------
>>> from pharmpy.modeling import load_example_model, load_dataset, unload_dataset, set_dataset
>>> model = load_example_model("pheno")
>>> model = unload_dataset(model)
>>> dataset_path = model.datainfo.path
>>> model.dataset is None
True
>>> model = set_dataset(model, dataset_path, datatype='nonmem')
>>> model.dataset
ID TIME AMT WGT APGR DV FA1 FA2
0 1 0.0 25.0 1.4 7.0 0.0 1.0 1.0
1 1 2.0 0.0 1.4 7.0 17.3 0.0 0.0
2 1 12.5 3.5 1.4 7.0 0.0 1.0 1.0
3 1 24.5 3.5 1.4 7.0 0.0 1.0 1.0
4 1 37.0 3.5 1.4 7.0 0.0 1.0 1.0
.. .. ... ... ... ... ... ... ...
739 59 108.3 3.0 1.1 6.0 0.0 1.0 1.0
740 59 120.5 3.0 1.1 6.0 0.0 1.0 1.0
741 59 132.3 3.0 1.1 6.0 0.0 1.0 1.0
742 59 144.8 3.0 1.1 6.0 0.0 1.0 1.0
743 59 146.8 0.0 1.1 6.0 40.2 0.0 0.0
<BLANKLINE>
[744 rows x 8 columns]
"""
if isinstance(path_or_df, pd.DataFrame):
df = path_or_df
if datatype == 'nonmem':
di = create_default_datainfo(path_or_df)
else:
di = DataInfo.create(columns=list(df.columns.values), path=None)
else:
path = normalize_user_given_path(path_or_df)
if datatype == 'nonmem':
di = create_default_datainfo(path)
else:
di = DataInfo.create(path=path)
df = read_dataset_from_datainfo(di, datatype=datatype)
di = update_datainfo(di, df).replace(path=path)
if len(df.columns) == 1:
warnings.warn('Could only find one column, should this be another datatype?')
model = model.replace(dataset=df, datainfo=di)
return model.update_source()
[docs]
def bin_observations(
model: Model, method: Literal["equal_width", "equal_number"], nbins: int
) -> pd.Series:
"""Bin all observations on the independent variable
Available binning methods:
+---------------+-------------------------------------------------+
| Method | Description |
+===============+=================================================+
| equal_width | Bins with equal width based on the idv |
+---------------+-------------------------------------------------+
| equal_number | Bins containing an equal number of observations |
+---------------+-------------------------------------------------+
Parameters
----------
model : Model
Pharmpy model
method : str
Name of the binning method to use
nbins : int
The number of bins wanted
Returns
-------
pd.Series
A series of bin ids indexed on the original record index of the dataset
list
A list of bin edges
Example
-------
>>> from pharmpy.modeling import load_example_model, bin_observations
>>> model = load_example_model("pheno")
>>> bins, boundaries = bin_observations(model, method="equal_width", nbins=10)
>>> bins
1 0
11 2
13 0
19 1
26 3
..
719 1
727 3
729 0
736 1
743 3
Name: TIME, Length: 155, dtype: int64
>>> boundaries
array([ 0. , 39.88, 78.76, 117.64, 156.52, 195.4 , 234.28, 273.16,
312.04, 350.92, 389.8 ])
"""
observations = get_observations(model, keep_index=True)
obs = model.dataset.loc[observations.index]
idv = model.datainfo.idv_column.name
sorted_idvs = obs[idv].sort_values()
method_lower = method.lower()
if method_lower == "equal_width":
bincol, boundaries = pd.cut(sorted_idvs, nbins, labels=False, retbins=True)
boundaries[0] = 0
elif method_lower == "equal_number":
bin_edges = _get_bin_edges_psn(sorted_idvs, nbins)
bincol, boundaries = pd.cut(
sorted_idvs, bin_edges, labels=False, retbins=True, include_lowest=True
)
else:
raise ValueError(f"Unknown binning method {method}")
sorted_bincol = bincol.sort_index()
return sorted_bincol, boundaries
def _get_bin_edges_psn(data, n_bins):
"""Similar to function "get_bin_ceilings_from_count" from PsN
Divide a list of data points into bins of equal count.
"""
# Create a dictionary with unique values and their indices
unique_values, value_indices = np.unique(data, return_inverse=True)
value_dict = {i: val for i, val in enumerate(unique_values)}
# Count occurrences of each unique value
obs_count = pd.Series(value_indices).value_counts().sort_index().tolist()
# Calculate the number of unique values
n_values = len(value_dict)
# Calculate the ideal count for each bin
count_per_bin = len(data) / n_bins
ideal_count = [count_per_bin] * n_bins
bin_ceilings = [0]
global_error = 0
bin_index = 0
local_error = -ideal_count[bin_index]
for value_index, obs in enumerate(obs_count):
if bin_index == len(ideal_count) - 1:
bin_ceilings.append(unique_values[n_values - 1])
break
elif local_error == -ideal_count[bin_index]:
local_error += obs
elif obs == 0:
continue
elif abs(global_error + local_error) > abs(global_error + local_error + obs) and (
n_values - value_index - 1
) > (len(ideal_count) - bin_index - 1):
local_error += obs
else:
bin_ceilings.append(unique_values[value_index - 1])
global_error += local_error
bin_index += 1
local_error = -ideal_count[bin_index] + obs
return bin_ceilings