"""
:meta private:
"""
from collections import Counter
from functools import reduce
from itertools import chain, combinations
from operator import add, mul
from typing import List, Optional, Union
from pharmpy.deps import numpy as np
from pharmpy.deps import sympy
from pharmpy.internals.expr.parse import parse as parse_expr
from pharmpy.internals.expr.subs import subs
from pharmpy.model import (
Assignment,
JointNormalDistribution,
Model,
NormalDistribution,
Parameter,
Parameters,
)
from .expressions import create_symbol, get_pk_parameters, has_random_effect
from .help_functions import _format_input_list, _format_options, _get_etas
[docs]def add_iiv(
model: Model,
list_of_parameters: Union[List[str], str],
expression: Union[List[str], str],
operation: str = '*',
initial_estimate: float = 0.09,
eta_names: Optional[List[str]] = None,
):
"""Adds IIVs to :class:`pharmpy.model`.
Effects that currently have templates are:
- Additive (*add*)
- Proportional (*prop*)
- Exponential (*exp*)
- Logit (*log*)
For all except exponential the operation input is not needed. Otherwise user specified
input is supported. Initial estimates for new etas are 0.09.
Parameters
----------
model : Model
Pharmpy model to add new IIVs to.
list_of_parameters : str, list
Name/names of parameter to add new IIVs to.
expression : str, list
Effect/effects on eta. Either abbreviated (see above) or custom.
operation : str, list, optional
Whether the new IIV should be added or multiplied (default).
initial_estimate : float
Value of initial estimate of parameter. Default is 0.09
eta_names : str, list, optional
Custom name/names of new eta
Return
------
Model
Pharmpy model object
Example
-------
>>> from pharmpy.modeling import *
>>> model = load_example_model("pheno")
>>> model = remove_iiv(model, "CL")
>>> model = add_iiv(model, "CL", "add")
>>> model.statements.find_assignment("CL")
CL = ETA_CL + TVCL
See also
--------
add_pk_iiv
add_iov
remove_iiv
remove_iov
"""
rvs, pset, sset = (
model.random_variables,
list(model.parameters),
model.statements,
)
list_of_parameters = _format_input_list(list_of_parameters)
list_of_options = _format_options([expression, operation, eta_names], len(list_of_parameters))
expression, operation, eta_names = list_of_options
if all(eta_name is None for eta_name in eta_names):
eta_names = None
if not all(len(opt) == len(list_of_parameters) for opt in list_of_options if opt):
raise ValueError(
'The number of provided expressions and operations must either be equal '
'to the number of parameters or equal to 1'
)
for i in range(len(list_of_parameters)):
omega = sympy.Symbol(f'IIV_{list_of_parameters[i]}')
if not eta_names:
eta_name = f'ETA_{list_of_parameters[i]}'
else:
eta_name = eta_names[i]
eta = NormalDistribution.create(eta_name, 'iiv', 0, omega)
rvs = rvs + eta
pset.append(Parameter(str(omega), init=initial_estimate))
index = sset.find_assignment_index(list_of_parameters[i])
if index is None:
raise ValueError(f'Could not find parameter: {list_of_parameters[i]}')
statement = sset[index]
eta_addition = _create_template(expression[i], operation[i])
eta_addition.apply(statement.expression, eta.names[0])
sset = (
sset[0:index] + Assignment(statement.symbol, eta_addition.template) + sset[index + 1 :]
)
model = model.replace(random_variables=rvs, parameters=Parameters.create(pset), statements=sset)
return model.update_source()
ADD_IOV_DISTRIBUTION = frozenset(('disjoint', 'joint', 'explicit', 'same-as-iiv'))
[docs]def add_iov(
model: Model,
occ: str,
list_of_parameters: Optional[Union[List[str], str]] = None,
eta_names: Optional[Union[List[str], str]] = None,
distribution: str = 'disjoint',
):
"""Adds IOVs to :class:`pharmpy.model`.
Initial estimate of new IOVs are 10% of the IIV eta it is based on.
Parameters
----------
model : Model
Pharmpy model to add new IOVs to.
occ : str
Name of occasion column.
list_of_parameters : str, list
List of names of parameters and random variables. Accepts random variable names, parameter
names, or a mix of both.
eta_names : str, list
Custom names of new etas. Must be equal to the number of input etas times the number of
categories for occasion.
distribution : str
The distribution that should be used for the new etas. Options are
'disjoint' for disjoint normal distributions, 'joint' for joint normal
distribution, 'explicit' for an explicit mix of joint and disjoint
distributions, and 'same-as-iiv' for copying the distribution of IIV etas.
Return
------
Model
Pharmpy model object
Example
-------
>>> from pharmpy.modeling import *
>>> model = load_example_model("pheno")
>>> model = add_iov(model, "TIME", "CL")
>>> model.statements.find_assignment("CL") # doctest: +SKIP
CL = ETA_CL + TVCL
See also
--------
add_iiv
add_pk_iiv
remove_iiv
remove_iov
"""
if distribution not in ADD_IOV_DISTRIBUTION:
raise ValueError(f'"{distribution}" is not a valid value for distribution')
list_of_parameters = _format_input_list(list_of_parameters)
if distribution == 'explicit':
if list_of_parameters is None or not all(
map(lambda x: isinstance(x, list), list_of_parameters)
):
raise ValueError(
'distribution == "explicit" requires parameters to be given as lists of lists'
)
else:
if list_of_parameters is not None and not all(
map(lambda x: isinstance(x, str), list_of_parameters)
):
raise ValueError(
'distribution != "explicit" requires parameters to be given as lists of strings'
)
if list_of_parameters is None:
if distribution == 'disjoint':
etas = list(map(lambda x: [x], _get_etas(model, None, include_symbols=True)))
else:
etas = [_get_etas(model, None, include_symbols=True)]
else:
if distribution == 'disjoint':
list_of_parameters = list(map(lambda x: [x], list_of_parameters))
elif distribution == 'joint' or distribution == 'same-as-iiv':
list_of_parameters = [list_of_parameters]
if not all(
map(
lambda x: isinstance(x, list) and all(map(lambda y: isinstance(y, str), x)),
list_of_parameters,
)
):
raise ValueError('not all parameters are strings')
etas = [_get_etas(model, grp, include_symbols=True) for grp in list_of_parameters]
for dist, grp in zip(etas, list_of_parameters):
assert len(dist) <= len(grp)
categories = get_occasion_levels(model.dataset, occ)
if eta_names and len(eta_names) != sum(map(len, etas)) * len(categories):
raise ValueError(
'Number of given eta names is incorrect, '
f'need {sum(map(len,etas)) * len(categories)} names.'
)
if len(categories) == 1:
raise ValueError(f'Only one value in {occ} column.')
# NOTE This declares the ETAS and their corresponding OMEGAs
if distribution == 'same-as-iiv':
# NOTE We filter existing IIV distributions for selected ETAs and then
# let the explicit distribution logic handle the rest
assert len(etas) == 1
etas_set = set(etas[0])
etas = []
for dist in model.random_variables:
intersection = list(filter(etas_set.__contains__, dist.names))
if not intersection:
continue
etas.append(intersection)
first_iov_name = create_symbol(model, 'IOV_', force_numbering=True).name
first_iov_number = int(first_iov_name.split('_')[-1])
def eta_name(i, k):
return (
eta_names[(i - 1) * len(categories) + k - 1] if eta_names else f'ETA_{iov_name(i)}_{k}'
)
def omega_iov_name(i, j):
return f'OMEGA_{iov_name(i)}' if i == j else f'OMEGA_{iov_name(i)}_{j}'
def iov_name(i):
return f'IOV_{first_iov_number + i - 1}'
def etai_name(i):
return f'ETAI{first_iov_number + i - 1}'
rvs, pset, iovs = _add_iov_explicit(
model, occ, etas, categories, iov_name, etai_name, eta_name, omega_iov_name
)
model = model.replace(random_variables=rvs, parameters=Parameters.create(pset), statements=iovs)
return model.update_source()
def _add_iov_explicit(model, occ, etas, categories, iov_name, etai_name, eta_name, omega_iov_name):
assert all(map(bool, etas))
ordered_etas = list(chain.from_iterable(etas))
eta, count = next(iter(Counter(ordered_etas).most_common()))
if count >= 2:
raise ValueError(f'{eta} was given twice.')
distributions = [
range(i, i + len(grp))
for i, grp in zip(reduce(lambda acc, x: acc + [acc[-1] + x], map(len, etas), [1]), etas)
]
rvs, pset, sset = (
model.random_variables,
list(model.parameters),
model.statements,
)
iovs, etais, sset = _add_iov_declare_etas(
sset,
occ,
ordered_etas,
range(1, len(ordered_etas) + 1),
categories,
eta_name,
iov_name,
etai_name,
)
for dist in distributions:
assert dist
_add_iov_etas = _add_iov_etas_disjoint if len(dist) == 1 else _add_iov_etas_joint
to_add = _add_iov_etas(
rvs,
pset,
ordered_etas,
dist,
categories,
omega_iov_name,
eta_name,
)
rvs = rvs + list(to_add)
return rvs, pset, iovs + etais + sset
def _add_iov_declare_etas(sset, occ, etas, indices, categories, eta_name, iov_name, etai_name):
iovs, etais = [], []
for i in indices:
eta = etas[i - 1]
# NOTE This declares IOV-ETA case assignments and replaces the existing
# ETA with its sum with the new IOV ETA
iov = sympy.Symbol(iov_name(i))
expression = sympy.Piecewise(
*(
(sympy.Symbol(eta_name(i, k)), sympy.Eq(cat, sympy.Symbol(occ)))
for k, cat in enumerate(categories, 1)
)
)
iovs.append(Assignment(iov, parse_expr(0)))
iovs.append(Assignment(iov, expression))
etai = sympy.Symbol(etai_name(i))
etais.append(Assignment(etai, sympy.Symbol(eta) + iov))
sset = sset.subs({eta: etai})
return iovs, etais, sset
def _add_iov_etas_disjoint(rvs, pset, etas, indices, categories, omega_iov_name, eta_name):
_add_iov_declare_diagonal_omegas(rvs, pset, etas, indices, omega_iov_name)
for i in indices:
omega_iov = sympy.Symbol(omega_iov_name(i, i))
for k in range(1, len(categories) + 1):
yield NormalDistribution.create(eta_name(i, k), 'iov', 0, omega_iov)
def _add_iov_etas_joint(rvs, pset, etas, indices, categories, omega_iov_name, eta_name):
_add_iov_declare_diagonal_omegas(rvs, pset, etas, indices, omega_iov_name)
# NOTE Declare off-diagonal OMEGAs
for i, j in combinations(indices, r=2):
omega_iov = sympy.Symbol(omega_iov_name(i, j))
omega_iiv = rvs.get_covariance(etas[i - 1], etas[j - 1])
paramset = Parameters.create(pset) # FIXME!
init = paramset[omega_iiv].init * 0.1 if omega_iiv != 0 and omega_iiv in paramset else 0.001
pset.append(Parameter(str(omega_iov), init=init))
mu = [0] * len(indices)
sigma = [[sympy.Symbol(omega_iov_name(min(i, j), max(i, j))) for i in indices] for j in indices]
for k in range(1, len(categories) + 1):
names = list(map(lambda i: eta_name(i, k), indices))
yield JointNormalDistribution.create(names, 'iov', mu, sigma)
def _add_iov_declare_diagonal_omegas(rvs, pset, etas, indices, omega_iov_name):
for i in indices:
eta = etas[i - 1]
omega_iiv = rvs[eta].get_variance(eta)
omega_iov = sympy.Symbol(omega_iov_name(i, i))
paramset = Parameters.create(pset) # FIXME!
init = paramset[omega_iiv].init * 0.1 if omega_iiv in paramset else 0.01
pset.append(Parameter(str(omega_iov), init=init))
[docs]def add_pk_iiv(model: Model, initial_estimate: float = 0.09):
"""Adds IIVs to all PK parameters in :class:`pharmpy.model`.
Will add exponential IIVs to all parameters that are included in the ODE.
Parameters
----------
model : Model
Pharmpy model to add new IIVs to.
initial_estimate : float
Value of initial estimate of parameter. Default is 0.09
Return
------
Model
Pharmpy model object
Example
-------
>>> from pharmpy.modeling import *
>>> model = load_example_model("pheno")
>>> model = set_first_order_absorption(model)
>>> model.statements.find_assignment("MAT")
MAT = POP_MAT
>>> model = add_pk_iiv(model)
>>> model.statements.find_assignment("MAT")
ETA_MAT
MAT = POP_MAT⋅ℯ
See also
--------
add_iiv
add_iov
remove_iiv
remove_iov
"""
params_to_add_etas = [
param for param in get_pk_parameters(model) if not has_random_effect(model, param, 'iiv')
]
if params_to_add_etas:
model = add_iiv(model, params_to_add_etas, 'exp', initial_estimate=initial_estimate)
return model.update_source()
def _create_template(expression, operation):
operation_func = _get_operation_func(operation)
if expression == 'add':
return EtaAddition.additive()
elif expression == 'prop':
return EtaAddition.proportional()
elif expression == 'exp':
return EtaAddition.exponential(operation_func)
elif expression == 'log':
return EtaAddition.logit()
else:
expression = parse_expr(f'original {operation} {expression}')
return EtaAddition(expression)
def _get_operation_func(operation):
"""Gets sympy operation based on string"""
if operation == '*':
return mul
elif operation == '+':
return add
def get_occasion_levels(df, occ):
levels = df[occ].unique()
return _canonicalize_categories(levels)
def _canonicalize_categories(categories):
return sorted(map(_canonicalize_category, categories))
def _canonicalize_category(c: Union[int, float, str]):
if isinstance(c, int):
return c
if isinstance(c, float):
return int(c)
if isinstance(c, (np.int32, np.int64)):
return int(c)
if isinstance(c, str):
return _canonicalize_category(float(c) if '.' in c else int(c))
raise ValueError(f'Cannot canonicalize category "{type(c)}({c})"')
class EtaAddition:
"""
Eta addition consisting of expression.
Attributes
----------
template
Expression consisting of original statement +/* new eta with effect.
:meta private:
"""
def __init__(self, template):
self.template = template
def apply(self, original, eta):
self.template = subs(self.template, {'original': original, 'eta_new': eta})
@classmethod
def additive(cls):
template = sympy.Symbol('original') + sympy.Symbol('eta_new')
return cls(template)
@classmethod
def proportional(cls):
template = sympy.Symbol('original') * sympy.Symbol('eta_new')
return cls(template)
@classmethod
def exponential(cls, operation):
template = operation(sympy.Symbol('original'), sympy.exp(sympy.Symbol('eta_new')))
return cls(template)
@classmethod
def logit(cls):
template = sympy.Symbol('original') * (
sympy.exp(sympy.Symbol('eta_new')) / (1 + sympy.exp(sympy.Symbol('eta_new')))
)
return cls(template)