"""
:meta private:
"""
from __future__ import annotations
import math
import re
import warnings
from collections import defaultdict
from operator import add, mul
from typing import Literal, Union
from pharmpy.basic.expr import BooleanExpr, Expr
from pharmpy.deps import numpy as np
from pharmpy.deps import sympy
from pharmpy.internals.expr.parse import parse as parse_expr
from pharmpy.internals.expr.subs import subs
from pharmpy.model import Assignment, Model, Parameter, Parameters, Statement, Statements
from .common import get_model_covariates
from .data import get_baselines
from .expressions import (
depends_on,
get_individual_parameters,
get_mu_connected_to_parameter,
has_mu_reference,
remove_covariate_effect_from_statements,
simplify_model,
)
from .parameters import get_thetas
EffectType = Union[Literal['lin', 'cat', 'cat2', 'piece_lin', 'exp', 'pow'], str]
OperationType = Literal['*', '+']
[docs]
def get_covariate_effects(model: Model) -> dict[list]:
"""Return a dictionary of all used covariates within a model
The dictionary will have parameter name as key with a connected value as
a list of tuple(s) with (covariate, effect type, operator)
Parameters
----------
model : Model
Model to extract covariates from.
Returns
-------
Dictionary : Dictionary of parameters and connected covariate(s)
"""
parameters = get_individual_parameters(model)
covariates = get_model_covariates(model)
param_w_cov = defaultdict(list)
for p in parameters:
for c in covariates:
if has_covariate_effect(model, str(p), str(c)):
param_w_cov[p].append(c)
res = defaultdict(list)
for param, covariates in param_w_cov.items():
for cov in covariates:
coveffect, op = _get_covariate_effect(model, param, cov)
if coveffect and op:
res[(param, cov)].append((coveffect, op))
return res
def _get_covariate_effect(model: Model, symbol, covariate):
param_expr = model.statements.before_odes.full_expression(symbol)
param_expr = sympy.sympify(param_expr)
covariate = sympy.sympify(covariate)
etas = tuple(sympy.sympify(e) for e in model.random_variables.etas.symbols)
thetas = tuple(sympy.sympify(t) for t in get_thetas(model).symbols)
if isinstance(param_expr, sympy.Mul):
op = "*"
elif isinstance(param_expr, sympy.Add):
op = "+"
else:
# Do nothing ?
pass
cov_expression = None
perform_matching = False
for arg in param_expr.args:
check_covariate = False
free_symbols = arg.free_symbols
if any(eta in free_symbols for eta in etas):
if Expr(arg).is_exp() and covariate in free_symbols:
skip = False
expression = arg
exp_terms = arg.args[0]
for a in exp_terms.args:
if covariate not in a.free_symbols:
expression = expression.subs({a: Expr(0)})
else:
# If both covariate and ETA in same term -> SKIP
if any(eta in a.free_symbols for eta in etas):
skip = True
if not skip:
check_covariate = True
else:
check_covariate = True
expression = arg
if check_covariate:
if covariate in free_symbols:
cov_expression = expression
# Need at least one theta to perform matching
cov_effect = "CUSTOM"
if any(theta in free_symbols for theta in thetas):
perform_matching = True
if perform_matching:
for effect in ['lin', 'cat', 'cat2', 'piece_lin', 'exp', 'pow']:
template = _create_template(effect, model, str(covariate))
template = template.template.expression
template = sympy.sympify(template)
wild_dict = defaultdict(list)
for s in template.free_symbols:
wild_symbol = sympy.Wild(str(s))
template = template.subs({s: wild_symbol})
if str(s).startswith("theta"):
wild_dict["theta"].append(wild_symbol)
elif str(s).startswith("cov"):
wild_dict["cov"].append(wild_symbol)
elif str(s).startswith("median"):
wild_dict["median"].append(wild_symbol)
cov_expression = sympy.sympify(cov_expression)
match = cov_expression.match(template)
if match:
if _assert_cov_effect_match(wild_dict, match, model, str(covariate), effect):
return effect, op
if cov_expression:
return cov_effect, op
return None, None
def _assert_cov_effect_match(symbols, match, model, covariate, effect):
if effect == "pow":
if (
sympy.Wild("cov") in match.keys()
and match[sympy.Wild("cov")].is_number
and sympy.Wild("median") in match.keys()
and match[sympy.Wild("median")].is_Pow
):
temp = match[sympy.Wild("cov")]
match[sympy.Wild("cov")] = match[sympy.Wild("median")]
match[sympy.Wild("median")] = temp
for key, values in symbols.items():
if key == "theta":
thetas = get_thetas(model).symbols
if all(value in match.keys() for value in values):
if any(match[value] not in thetas for value in values):
return False
if key == "cov":
covariate = sympy.Symbol(covariate)
if all(value in match.keys() for value in values):
if all(match[value] not in [covariate, 1 / covariate] for value in values):
return False
if key == "median":
if all(value in match.keys() for value in values):
if not all(match[value].is_number for value in values):
return False
return True
[docs]
def has_covariate_effect(model: Model, parameter: str, covariate: str):
"""Tests if an instance of :class:`pharmpy.model` has a given covariate
effect.
Parameters
----------
model : Model
Pharmpy model to check for covariate effect.
parameter : str
Name of parameter.
covariate : str
Name of covariate.
Return
------
bool
Whether input model has a covariate effect of the input covariate on
the input parameter.
Examples
--------
>>> from pharmpy.modeling import *
>>> model = load_example_model("pheno")
>>> has_covariate_effect(model, "CL", "APGR")
False
"""
return depends_on(model, parameter, covariate)
[docs]
def remove_covariate_effect(model: Model, parameter: str, covariate: str):
"""Remove a covariate effect from an instance of :class:`pharmpy.model`.
Parameters
----------
model : Model
Pharmpy model from which to remove the covariate effect.
parameter : str
Name of parameter.
covariate : str
Name of covariate.
Return
------
Model
Pharmpy model object
Examples
--------
>>> from pharmpy.modeling import *
>>> model = load_example_model("pheno")
>>> has_covariate_effect(model, "CL", "WGT")
True
>>> model = remove_covariate_effect(model, "CL", "WGT")
>>> has_covariate_effect(model, "CL", "WGT")
False
"""
if not has_covariate_effect(model, parameter, covariate):
return model
kept_thetas, before_odes = simplify_model(
model,
model.statements.before_odes,
remove_covariate_effect_from_statements(
model, model.statements.before_odes, parameter, covariate
),
)
ode_system: list[Statement] = (
[] if model.statements.ode_system is None else [model.statements.ode_system]
)
after_odes = list(model.statements.after_odes)
statements = Statements(before_odes + ode_system + after_odes)
kept_parameters = model.random_variables.free_symbols.union(
kept_thetas, model.statements.after_odes.free_symbols
)
parameters = Parameters.create((p for p in model.parameters if p.symbol in kept_parameters))
model = model.replace(statements=statements, parameters=parameters)
return model.update_source()
[docs]
def add_covariate_effect(
model: Model,
parameter: str,
covariate: str,
effect: EffectType,
operation: OperationType = '*',
allow_nested: bool = False,
):
"""Adds covariate effect to :class:`pharmpy.model`.
The following effects have templates:
- Linear function for continuous covariates (*lin*)
- Function:
.. math::
\\text{coveff} = 1 + \\text{theta} * (\\text{cov} - \\text{median})
- Init: 0.001
- Upper:
- If median of covariate equals minimum: 100,000
- Otherwise: :math:`\\frac{1}{\\text{median} - \\text{min}}`
- Lower:
- If median of covariate equals maximum: -100,000
- Otherwise: :math:`\\frac{1}{\\text{median} - \\text{max}}`
- Linear function for categorical covariates (*cat*)
- Function:
- If covariate is the most common category:
.. math::
\\text{coveff} = 1
- For each additional category:
.. math::
\\text{coveff} = 1 + \\text{theta}
- Init: 0.001
- Upper: 5
- Lower: -1
- (alternative) Linear function for categorical covariates (*cat2*)
- Function:
- If covariate is the most common category:
.. math::
\\text{coveff} = 1
- For each additional category:
.. math::
\\text{coveff} = \\text{theta}
- Init: 0.001
- Upper: 6
- Lower: 0
- Piecewise linear function/"hockey-stick", continuous covariates only (*piece_lin*)
- Function:
- If cov <= median:
.. math::
\\text{coveff} = 1 + \\text{theta1} * (\\text{cov} - \\text{median})
- If cov > median:
.. math::
\\text{coveff} = 1 + \\text{theta2} * (\\text{cov} - \\text{median})
- Init: 0.001
- Upper:
- For first state: :math:`\\frac{1}{\\text{median} - \\text{min}}`
- Otherwise: 100,000
- Lower:
- For first state: -100,000
- Otherwise: :math:`\\frac{1}{\\text{median} - \\text{max}}`
- Exponential function, continuous covariates only (*exp*)
- Function:
.. math::
\\text{coveff} = \\exp(\\text{theta} * (\\text{cov} - \\text{median}))
- Init:
- If lower > 0.001 or upper < 0.001: :math:`\\frac{\\text{upper} - \\text{lower}}{2}`
- If estimated init is 0: :math:`\\frac{\\text{upper}}{2}`
- Otherwise: 0.001
- Upper:
- If min - median = 0 or max - median = 0: 100
- Otherwise:
.. math::
\\min(\\frac{\\log(0.01)}{\\text{min} - \\text{median}},
\\frac{\\log(100)}{\\text{max} - \\text{median}})
- Lower:
- If min - median = 0 or max - median = 0: 0.01
- Otherwise:
.. math::
\\max(\\frac{\\log(0.01)}{\\text{max} - \\text{median}},
\\frac{\\log(100)}{\\text{min} - \\text{median}})
- Power function, continuous covariates only (*pow*)
- Function:
.. math::
\\text{coveff} = (\\frac{\\text{cov}}{\\text{median}})^\\text{theta}
- Init: 0.001
- Upper: 100,000
- Lower: -100
Parameters
----------
model : Model
Pharmpy model to add covariate effect to.
parameter : str
Name of parameter to add covariate effect to.
covariate : str
Name of covariate.
effect : str
Type of covariate effect. May be abbreviated covariate effect (see above) or custom.
operation : str, optional
Whether the covariate effect should be added or multiplied (default).
allow_nested : bool, optional
Whether to allow adding a covariate effect when one already exists for
the input parameter-covariate pair.
Return
------
Model
Pharmpy model object
Examples
--------
>>> from pharmpy.modeling import *
>>> model = load_example_model("pheno")
>>> model = add_covariate_effect(model, "CL", "APGR", "exp")
>>> model.statements.before_odes.full_expression("CL")
POP_CL*WGT*exp(ETA_CL + POP_CLAPGR*(APGR - 7.0))
"""
sset = model.statements
if not allow_nested and depends_on(model, parameter, covariate):
warnings.warn(f'Covariate effect of {covariate} on {parameter} already exists')
return model
statistics = {}
statistics['mean'] = _calculate_mean(model.dataset, covariate)
statistics['median'] = _calculate_median(model, covariate)
statistics['std'] = _calculate_std(model, covariate)
covariate_effect = _create_template(effect, model, covariate)
pset, thetas = _create_thetas(model, parameter, effect, covariate, covariate_effect.template)
covariate_effect.apply(parameter, covariate, thetas, statistics)
# NOTE: We hoist the statistic statements to avoid referencing variables
# before declaring them. We also avoid duplicate statements.
sset = [s for s in covariate_effect.statistic_statements if s not in sset] + sset
if has_mu_reference(model):
mu_symbol = get_mu_connected_to_parameter(model, parameter)
last_existing_parameter_assignment = sset.find_assignment(mu_symbol)
else:
last_existing_parameter_assignment = sset.find_assignment(parameter)
assert last_existing_parameter_assignment is not None
insertion_index = sset.index(last_existing_parameter_assignment) + 1
# NOTE: We can use any assignment to the parameter since we currently only
# use its symbol to create the new effect statement.
effect_statement = covariate_effect.create_effect_statement(
operation, last_existing_parameter_assignment
)
statements = []
statements.append(covariate_effect.template)
statements.append(effect_statement)
cov_possible = {Expr.symbol(parameter)} | {
Expr.symbol(f'{parameter}{col_name}') for col_name in model.datainfo.names
}
if has_mu_reference(model):
mu_assignment = sset.find_assignment(mu_symbol)
parameter_assignment = sset.find_assignment(parameter)
index = {Expr.symbol(eta): i for i, eta in enumerate(model.random_variables.etas.names, 1)}
etas = set(index)
eta = next(iter(etas.intersection(parameter_assignment.expression.free_symbols)))
old_def = parameter_assignment.subs(
{mu_assignment.symbol: mu_assignment.expression}
).expression._sympy_()
remove_iiv_def = old_def.as_independent(eta)[0]
new_mu_expression = sympy.solve(
subs(old_def, {remove_iiv_def: remove_iiv_def * statements[0].symbol})
- parameter_assignment.expression,
mu_assignment.symbol,
)[0]
statements[-1] = Assignment.create(effect_statement.symbol, new_mu_expression)
sset = sset[0 : insertion_index - 1] + sset[insertion_index:]
insertion_index -= 1
# NOTE: This is a heuristic that simplifies the NONMEM statements by
# grouping multiple effect statements in a single statement.
if last_existing_parameter_assignment.expression.args and all(
map(cov_possible.__contains__, last_existing_parameter_assignment.expression.args)
):
statements[-1] = Assignment.create(
effect_statement.symbol,
effect_statement.expression.subs(
{parameter: last_existing_parameter_assignment.expression},
),
)
sset = sset[0 : insertion_index - 1] + sset[insertion_index:]
insertion_index -= 1
sset = sset[0:insertion_index] + statements + sset[insertion_index:]
model = model.replace(parameters=pset, statements=sset)
return model.update_source()
def natural_order(string, _nsre=re.compile(r'([0-9]+)')):
# From https://stackoverflow.com/a/16090640
return [int(key) if key.isdigit() else (key.lower(), key) for key in _nsre.split(string)]
def _create_thetas(model, parameter, effect, covariate, template, _ctre=re.compile(r'theta\d*')):
"""Creates theta parameters and adds to parameter set of model.
Number of parameters depends on how many thetas have been declared."""
new_thetas = {sym for sym in map(str, template.expression.free_symbols) if _ctre.match(sym)}
no_of_thetas = len(new_thetas)
pset = model.parameters
theta_names = {}
if no_of_thetas == 1:
inits = _choose_param_inits(effect, model, covariate)
theta_name = f'POP_{parameter}{covariate}'
pset = Parameters.create(
list(pset) + [Parameter(theta_name, inits['init'], inits['lower'], inits['upper'])]
)
theta_names['theta'] = theta_name
else:
for i, new_theta in enumerate(sorted(new_thetas, key=natural_order), 1):
inits = _choose_param_inits(effect, model, covariate, i)
theta_name = f'POP_{parameter}{covariate}_{i}'
pset = Parameters.create(
list(pset) + [Parameter(theta_name, inits['init'], inits['lower'], inits['upper'])]
)
theta_names[new_theta] = theta_name
return pset, theta_names
def _count_categorical(model, covariate):
"""Gets the number of individuals that has a level of categorical covariate."""
idcol = model.datainfo.id_column.name
df = model.dataset.set_index(idcol)
allcounts = df[covariate].groupby('ID').value_counts()
allcounts.name = None # To avoid collisions when resetting index
counts = allcounts.reset_index().iloc[:, 1].value_counts()
counts.sort_index(inplace=True) # To make deterministic in case of multiple modes
if model.dataset[covariate].isna().any():
counts[np.nan] = 0
return counts
def _calculate_mean(df, covariate, baselines=False):
"""Calculate mean. Can be set to use baselines, otherwise it is
calculated first per individual, then for the group."""
if baselines:
return df[str(covariate)].mean()
else:
return df.groupby('ID')[str(covariate)].mean().mean()
def _calculate_median(model, covariate, baselines=False):
"""Calculate median. Can be set to use baselines, otherwise it is
calculated first per individual, then for the group."""
if baselines:
return get_baselines(model)[str(covariate)].median()
else:
df = model.dataset
return df.groupby('ID')[str(covariate)].median().median()
def _calculate_std(model, covariate, baselines=False):
"""Calculate median. Can be set to use baselines, otherwise it is
calculated first per individual, then for the group."""
if baselines:
return get_baselines(model)[str(covariate)].std()
else:
df = model.dataset
return df.groupby('ID')[str(covariate)].mean().std()
def _choose_param_inits(effect, model, covariate, index=None):
"""Chooses inits for parameters. If the effect is exponential, the
bounds need to be dynamic."""
df = model.dataset
init_default = 0.001
inits = {}
cov_median = _calculate_median(model, covariate)
cov_min = df[str(covariate)].min()
cov_max = df[str(covariate)].max()
lower, upper = _choose_bounds(effect, cov_median, cov_min, cov_max, index)
if effect == 'exp':
if lower > init_default or init_default > upper:
init = (upper + lower) / 2
if init == 0:
init = upper / 5
else:
init = init_default
elif effect == 'pow':
init = init_default
elif effect == "cat2":
init = 1.01
else:
init = init_default
inits['init'] = init
inits['lower'] = lower
inits['upper'] = upper
return inits
def _choose_bounds(effect, cov_median, cov_min, cov_max, index=None):
if effect == 'exp':
min_diff = cov_min - cov_median
max_diff = cov_max - cov_median
lower_expected = 0.01
upper_expected = 100
if min_diff == 0 or max_diff == 0:
return lower_expected, upper_expected
else:
log_base = 10
lower = max(
math.log(lower_expected, log_base) / max_diff,
math.log(upper_expected, log_base) / min_diff,
)
upper = min(
math.log(lower_expected, log_base) / min_diff,
math.log(upper_expected, log_base) / max_diff,
)
elif effect == 'lin':
if cov_median == cov_min:
upper = 100000
else:
upper = 1 / (cov_median - cov_min)
if cov_median == cov_max:
lower = -100000
else:
lower = 1 / (cov_median - cov_max)
elif effect == 'piece_lin':
if cov_median == cov_min or cov_median == cov_max:
raise Exception(
'Median cannot be same as min or max, cannot use '
'piecewise-linear parameterization.'
)
if index == 0:
lower = -100000
upper = 1 / (cov_median - cov_min)
else:
lower = 1 / (cov_median - cov_max)
upper = 100000
elif effect == 'pow':
lower = -100
upper = 100000
elif effect == 'cat':
lower = -1
upper = 5
elif effect == 'cat2':
lower = 0
upper = 6
else:
lower = -100000
upper = 100000
return round(lower, 4), round(upper, 4)
def _create_template(effect, model, covariate):
"""Creates Covariate class objects with effect template."""
if effect == 'lin':
return CovariateEffect.linear()
elif effect == 'cat':
counts = _count_categorical(model, covariate)
return CovariateEffect.categorical(counts)
elif effect == 'cat2':
counts = _count_categorical(model, covariate)
return CovariateEffect.categorical(counts, alternative=True)
elif effect == 'piece_lin':
return CovariateEffect.piecewise_linear()
elif effect == 'exp':
return CovariateEffect.exponential()
elif effect == 'pow':
return CovariateEffect.power()
else:
symbol = Expr.symbol('symbol')
expression = parse_expr(effect)
return CovariateEffect(Assignment.create(symbol, expression))
class CovariateEffect:
"""
Covariate effect consisting of new assignments.
Attributes
----------
template
Assignment based on covariate effect
statistic_statements
Dict with mean, median and standard deviation
:meta private:
"""
def __init__(self, template):
self.template = template
self.statistic_statements = []
def apply(self, parameter, covariate, thetas, statistics):
effect_name = f'{parameter}{covariate}'
theta_subs = self.template.expression.subs(thetas)
cov_subs = theta_subs.subs({'cov': covariate})
self.template = Assignment.create(
Expr(effect_name),
cov_subs,
)
template_str = [str(symbol) for symbol in self.template.free_symbols]
if 'mean' in template_str:
self.template = self.template.subs({'mean': f'{covariate}_MEAN'})
s = Assignment.create(Expr.symbol(f'{covariate}_MEAN'), statistics['mean'])
self.statistic_statements.append(s)
if 'median' in template_str:
self.template = self.template.subs({'median': f'{covariate}_MEDIAN'})
s = Assignment.create(Expr.symbol(f'{covariate}_MEDIAN'), statistics['median'])
self.statistic_statements.append(s)
if 'std' in template_str:
self.template = self.template.subs({'std': f'{covariate}_STD'})
s = Assignment.create(Expr.symbol(f'{covariate}_STD'), statistics['std'])
self.statistic_statements.append(s)
def create_effect_statement(self, operation_str, statement_original):
"""Creates statement for addition or multiplication of covariate
to parameter, e.g. (if parameter is CL and covariate is WGT):
CL = CLWGT + TVCL*EXP(ETA(1))"""
operation = self._get_operation(operation_str)
symbol = statement_original.symbol
expression = statement_original.symbol
statement_new = Assignment.create(symbol, operation(expression, self.template.symbol))
return statement_new
@staticmethod
def _get_operation(operation_str):
"""Gets sympy operation based on string"""
if operation_str == '*':
return mul
elif operation_str == '+':
return add
raise NotImplementedError(f'Can only handle + or *, got {operation_str}.')
@classmethod
def linear(cls):
"""Linear continuous template (for continuous covariates)."""
symbol = Expr.symbol('symbol')
expression = 1 + Expr.symbol('theta') * (Expr.symbol('cov') - Expr.symbol('median'))
template = Assignment.create(symbol, expression)
return cls(template)
@classmethod
def categorical(cls, counts, alternative=False):
"""Linear categorical template (for categorical covariates)."""
symbol = Expr.symbol('symbol')
most_common = counts.idxmax()
categories = list(counts.index)
values = [1]
conditions = [BooleanExpr.eq(Expr.symbol('cov'), most_common)]
for i, cat in enumerate(categories, 1):
if cat != most_common:
if np.isnan(cat):
conditions += [BooleanExpr.eq(Expr.symbol('cov'), Expr.symbol('NaN'))]
values += [1]
else:
conditions += [BooleanExpr.eq(Expr.symbol('cov'), cat)]
if len(categories) == 2:
if alternative:
values += [Expr.symbol('theta')]
else:
values += [1 + Expr.symbol('theta')]
else:
if alternative:
values += [Expr.symbol(f'theta{i}')]
else:
values += [1 + Expr.symbol(f'theta{i}')]
expression = Expr.piecewise(*zip(values, conditions))
template = Assignment.create(symbol, expression)
return cls(template)
@classmethod
def piecewise_linear(cls):
"""Piecewise linear ("hockey-stick") template (for continuous
covariates)."""
symbol = Expr.symbol('symbol')
values = [
1 + Expr.symbol('theta1') * (Expr.symbol('cov') - Expr.symbol('median')),
1 + Expr.symbol('theta2') * (Expr.symbol('cov') - Expr.symbol('median')),
]
conditions = [
BooleanExpr.le(Expr.symbol('cov'), Expr.symbol('median')),
BooleanExpr.gt(Expr.symbol('cov'), Expr.symbol('median')),
]
expression = Expr.piecewise((values[0], conditions[0]), (values[1], conditions[1]))
template = Assignment.create(symbol, expression)
return cls(template)
@classmethod
def exponential(cls):
"""Exponential template (for continuous covariates)."""
symbol = Expr.symbol('symbol')
expression = Expr.exp(Expr.symbol('theta') * (Expr.symbol('cov') - Expr.symbol('median')))
template = Assignment.create(symbol, expression)
return cls(template)
@classmethod
def power(cls):
"""Power template (for continuous covariates)."""
symbol = Expr.symbol('symbol')
expression = (Expr.symbol('cov') / Expr.symbol('median')) ** Expr.symbol('theta')
template = Assignment.create(symbol, expression)
return cls(template)
def __str__(self):
"""String representation of class."""
return str(self.template)
def get_covariates_allowed_in_covariate_effect(model: Model) -> set[str]:
try:
di_covariate = model.datainfo.typeix['covariate'].names
except IndexError:
di_covariate = []
try:
di_admid = model.datainfo.typeix['admid'].names
except IndexError:
di_admid = []
try:
di_unknown = model.datainfo.typeix['unknown'].names
except IndexError:
di_unknown = []
return set(di_covariate).union(di_unknown, di_admid)