Source code for pharmpy.modeling.pd

"""
:meta private:
"""

from __future__ import annotations

from typing import Literal, Optional

from pharmpy.basic import Expr
from pharmpy.model import (
    Assignment,
    Compartment,
    CompartmentalSystem,
    CompartmentalSystemBuilder,
    Model,
    Statements,
    get_and_check_odes,
    output,
)
from pharmpy.modeling import create_symbol, get_central_volume_and_clearance, set_initial_condition

from .error import set_proportional_error_model
from .odes import add_individual_parameter, set_initial_estimates

PDTypes = Literal['linear', 'emax', 'sigmoid', 'step', 'loglin']


[docs] def add_effect_compartment(model: Model, expr: PDTypes): r"""Add an effect compartment. Implemented PD models are: * Linear: .. math:: E = B \cdot (1 + \text{slope} \cdot C) * Emax: .. math:: E = B \cdot \Bigg(1 + \frac {E_{max} \cdot C } { EC_{50} + C} \Bigg) * Step effect: .. math:: E = \Biggl \lbrace {B \quad \text{if C} \leq 0 \atop B \cdot (1+ E_{max}) \quad \text{else}} * Sigmoidal: .. math:: E=\Biggl \lbrace {B \cdot \Bigl(1+\frac{E_{max} \cdot C^n}{EC_{50}^n+C^n}\Bigl) \quad \ \text{if C}>0 \atop B \quad \text{else}} * Log-linear: .. math:: E = \text{slope} \cdot \text{log}(C + C_0) :math:`B` is the baseline effect Parameters ---------- model : Model Pharmpy model expr : {'linear', 'emax', 'sigmoid', 'step', 'loglin'} Name of the PD effect function. Return ------ Model Updated Pharmpy model Examples -------- >>> from pharmpy.modeling import * >>> model = load_example_model("pheno") >>> model = add_effect_compartment(model, "linear") >>> model.statements.ode_system.find_compartment("EFFECT") Compartment(EFFECT, amount=A_EFFECT(t), input=KE0*A_CENTRAL(t)/VC) """ vc, cl = get_central_volume_and_clearance(model) odes = get_and_check_odes(model) central = odes.central_compartment cb = CompartmentalSystemBuilder(odes) ke0 = Expr.symbol("KE0") met = Expr.symbol('MET') model = add_individual_parameter(model, met.name) ke0_ass = Assignment.create(ke0, 1 / met) effect = Compartment.create("EFFECT", input=ke0 * central.amount / vc) cb.add_compartment(effect) cb.add_flow(effect, output, ke0) model = model.replace( statements=Statements( model.statements.before_odes + ke0_ass + CompartmentalSystem(cb) + model.statements.after_odes ) ) model = _add_effect(model, expr, effect.amount) return model.update_source()
[docs] def set_direct_effect(model: Model, expr: PDTypes, variable: Optional[str] = None): r"""Add an effect to a model. Effects are by default using concentratrion, but any user specified variable in the model can be used. Implemented PD models are: * Linear: .. math:: E = B \cdot (1 + \text{slope} \cdot C) * Emax: .. math:: E = B \cdot \Bigg(1 + \frac {E_{max} \cdot C } { EC_{50} + C} \Bigg) * Step effect: .. math:: E=\Biggl \lbrace {B \cdot (1+ E_{max}) \quad \text{if C}>0 \atop B \quad \text{else}} * Sigmoidal: .. math:: E=\Biggl \lbrace {B \cdot \Bigl(1+\frac{E_{max} \cdot C^n}{EC_{50}^n+C^n}\Bigl) \quad \ \text{if C}>0 \atop B \quad \text{else}} * Log-linear: .. math:: E = \text{slope} \cdot \text{log}(C + C_0) :math:`B` is the baseline effect Parameters ---------- model : Model Pharmpy model expr : {'linear', 'emax', 'sigmoid', 'step', 'loglin'} Name of PD effect function. variable : str Name of variable to use (if None concentration will be used) Return ------ Model Updated Pharmpy model Examples -------- >>> from pharmpy.modeling import * >>> model = load_example_model("pheno") >>> model = set_direct_effect(model, "linear") >>> model.statements.find_assignment("E") SLOPE⋅A_CENTRAL(t) ────────────────── E = VC """ if variable is None: vc, cl = get_central_volume_and_clearance(model) odes = get_and_check_odes(model) conc_expr = odes.central_compartment.amount / vc variable_symb = conc_expr for s in model.statements.after_odes: if s.expression == conc_expr: variable_symb = s.symbol break concentration = True else: variable_symb = Expr.symbol(variable) if ( variable not in model.datainfo.names and variable_symb not in model.statements.free_symbols ): raise ValueError(f'Variable {variable} could not be found') concentration = False model = _add_effect(model, expr, variable_symb, concentration) return model.update_source()
def _add_baseline_effect(model: Model): b = model.statements.find_assignment("B") if b is None: model = add_individual_parameter(model, "B") return model def _handle_zero(model: Model, expr: Expr): idv = model.datainfo.idv_column.symbol if expr.is_piecewise(): args = expr.piecewise_args non_zero_arg = args[0] if args[0][0] != 0 else args[1] condition = (idv <= 0) | (~non_zero_arg[1]) expr = non_zero_arg[0] expr = Expr.piecewise((Expr.integer(0), condition), (expr, True)) else: expr = Expr.piecewise((Expr.integer(0), idv <= 0), (expr, True)) return expr def _add_drug_effect(model: Model, expr: str, conc, zero_handled=True): if expr == "linear": s = create_symbol(model, "SLOPE") model = add_individual_parameter(model, s.name, lower=-float("inf")) effect = s * conc if not zero_handled: effect = _handle_zero(model, effect) elif expr == "emax": emax = Expr.symbol("E_MAX") model = add_individual_parameter(model, emax.name, lower=-1.0) x50 = _get_x50(model, conc) model = add_individual_parameter(model, x50.name) effect = emax * conc / (x50 + conc) if not zero_handled: effect = _handle_zero(model, effect) elif expr == "step": emax = Expr.symbol("E_MAX") model = add_individual_parameter(model, emax.name, lower=-1.0) effect = Expr.piecewise((emax, conc > 0), (Expr.integer(0), True)) if not zero_handled: effect = _handle_zero(model, effect) elif expr == "sigmoid": emax = Expr.symbol("E_MAX") model = add_individual_parameter(model, emax.name, lower=-1.0) x50 = _get_x50(model, conc) model = add_individual_parameter(model, x50.name) n = Expr.symbol("N") # Hill coefficient model = add_individual_parameter(model, n.name) model = set_initial_estimates(model, {"POP_N": 1}) effect = Expr.piecewise( ((emax * conc**n / (x50**n + conc**n)), conc > 0), (Expr.integer(0), True) ) if not zero_handled: effect = _handle_zero(model, effect) elif expr == "loglin": s = Expr.symbol("SLOPE") e0 = Expr.symbol("B") model = add_individual_parameter(model, s.name, lower=-float("inf")) effect = s * (conc + (e0 / s).exp()).log() if not zero_handled: effect = _handle_zero(model, effect) else: raise ValueError(f'Unknown model "{expr}".') E = Assignment(Expr.symbol("E"), effect) e_index = model.statements.find_assignment_index("E") if e_index is None: r_index = model.statements.find_assignment_index("R") if r_index is None: model = model.replace(statements=model.statements + E) else: statements = model.statements[0:r_index] + E + model.statements[r_index:] model = model.replace(statements=statements) else: statements = model.statements[0:e_index] + E + model.statements[e_index + 1 :] model = model.replace(statements=statements) return model def _get_x50(model, conc): x50_name = "EC_50" conc_assign = model.statements.find_assignment(conc) if conc_assign: central = model.statements.ode_system.central_compartment elimination_rate = model.statements.ode_system.get_flow(central, output) amount = central.amount if conc_assign.expression == amount * elimination_rate: x50_name = "EDK_50" elif conc_assign.expression == amount: x50_name = "A_50" else: x50_name = "EC_50" return Expr.symbol(x50_name) def _add_response(model: Model, expr: str): r_index = model.statements.find_assignment_index("R") if expr != "loglin": if r_index is not None: assignment = model.statements[r_index] assert isinstance(assignment, Assignment) expression = assignment.expression * (Expr.integer(1) + Expr.symbol("E")) assignment = Assignment(Expr.symbol("R"), expression) statements = model.statements[0:r_index] + assignment + model.statements[r_index + 1 :] else: b = Expr.symbol("B") assignment = Assignment(Expr.symbol("R"), b * (Expr.integer(1) + Expr.symbol("E"))) statements = model.statements + assignment model = model.replace(statements=statements) return model def _add_dependent_variable(model: Model, expr: str): dv, *_ = model.dependent_variables a = model.statements.get_assignment(dv) R = Expr.symbol("R") if R not in a.expression.free_symbols: # Add dependent variable Y_2 y_2 = Expr.symbol('Y_2') if expr != 'loglin': y = Assignment.create(y_2, R) else: y = Assignment.create(y_2, Expr.symbol("E")) dvs = model.dependent_variables.replace(y_2, 2) model = model.replace(statements=model.statements + y, dependent_variables=dvs) # Add error model model = set_proportional_error_model(model, dv=2, zero_protection=False) return model def _add_effect(model: Model, expr: str, conc, zero_handled=True): model = _add_baseline_effect(model) model = _add_drug_effect(model, expr, conc, zero_handled) model = _add_response(model, expr) model = _add_dependent_variable(model, expr) return model
[docs] def add_indirect_effect( model: Model, expr: Literal['linear', 'emax', 'sigmoid', 'step'], prod: bool = True, ): r"""Add indirect (turnover) effect The concentration :math:`C_c` has an impact on the production or degradation rate of the response R: * Production: .. math:: \frac {dR}{dt} = k_{in} \cdot (1 + f(C_c)) - k_{out} \cdot R * Degradation: .. math:: \frac {dR}{dt} = k_{in} - k_{out} \cdot (1 + f(C_c)) \cdot R :math:`k_{in}` and :math:`k_{out}` can either be inhibited or stimulated. Baseline :math:`B = R(0) = R_0 = k_{in}/k_{out}`. Models: * Linear: .. math:: f(C_c) = \text{slope} \cdot C_c * Emax: .. math:: f(C_c) = \frac {E_{max} \cdot C_c } { EC_{50} + C_c } * Sigmoidal: .. math:: f(C_c) = \frac{E_{max} \cdot C_c^n}{EC_{50}^n+C^n} Parameters ---------- model : Model Pharmpy model prod : bool Production (True) (default) or degradation (False) expr : {'linear', 'emax', 'sigmoid', 'step'} Name of PD effect function. Return ------ Model Updated Pharmpy model Examples -------- >>> from pharmpy.modeling import * >>> model = load_example_model("pheno") >>> model = add_indirect_effect(model, expr='linear', prod=True) """ vc, cl = get_central_volume_and_clearance(model) odes = get_and_check_odes(model) central = odes.central_compartment conc_c = central.amount / vc response = Compartment.create("RESPONSE") a_response = response.amount kin = Expr.symbol("K_IN") kout = Expr.symbol("K_OUT") met = Expr.symbol('MET') model = add_individual_parameter(model, met.name) b = Expr.symbol("B") # baseline model = add_individual_parameter(model, b.name) kout_ass = Assignment.create(kout, 1 / met) kin_ass = Assignment.create(kin, kout * b) if expr == 'linear': s = Expr.symbol("SLOPE") model = add_individual_parameter(model, s.name, lower=-float("inf")) R = Expr.symbol("SLOPE") * conc_c elif expr == 'emax': emax = Expr.symbol("E_MAX") model = add_individual_parameter(model, emax.name, lower=-1.0) ec50 = Expr.symbol("EC_50") model = add_individual_parameter(model, ec50.name) R = emax * conc_c / (ec50 + conc_c) elif expr == 'sigmoid': emax = Expr.symbol("E_MAX") ec50 = Expr.symbol("EC_50") n = Expr.symbol("N") model = add_individual_parameter(model, n.name) model = set_initial_estimates(model, {"POP_N": 1}) model = add_individual_parameter(model, ec50.name) model = add_individual_parameter(model, emax.name, lower=-1.0) R = emax * conc_c**n / (ec50**n + conc_c**n) else: raise ValueError(f'Unknown model "{expr}".') cb = CompartmentalSystemBuilder(odes) if prod: response = Compartment.create("RESPONSE", input=kin * (1 + R)) cb.add_compartment(response) cb.add_flow(response, output, kout) elif not prod: response = Compartment.create("RESPONSE", input=kin) cb.add_compartment(response) cb.add_flow(response, output, kout * (1 + R)) model = model.replace( statements=Statements( model.statements.before_odes + kout_ass + kin_ass + CompartmentalSystem(cb) + model.statements.after_odes ) ) model = set_initial_condition(model, "RESPONSE", b) # Add dependent variable Y_2 y_2 = Expr.symbol('Y_2') y = Assignment(y_2, a_response) dvs = model.dependent_variables.replace(y_2, 2) model = model.replace(statements=model.statements + y, dependent_variables=dvs) # Add error model model = set_proportional_error_model(model, dv=2, zero_protection=False) return model.update_source()
[docs] def set_baseline_effect(model: Model, expr: str = 'const'): r"""Create baseline effect model. Currently implemented baseline effects are: Constant baseline effect (const): .. math:: E = B Parameters ---------- model : Model Pharmpy model expr : str Name of baseline effect function. Return ------ Model Updated Pharmpy model Examples -------- >>> from pharmpy.modeling import * >>> model = load_example_model("pheno") >>> model = set_baseline_effect(model, expr='const') >>> model.statements.find_assignment("E") E = B """ e0 = Expr.symbol("B") model = add_individual_parameter(model, e0.name) E = Assignment(Expr.symbol('E'), e0) # Add dependent variable Y_2 y_2 = Expr.symbol('Y_2') y = Assignment.create(y_2, E.symbol) dvs = model.dependent_variables.replace(y_2, 2) model = model.replace(statements=model.statements + E + y, dependent_variables=dvs) # Add error model model = set_proportional_error_model(model, dv=2, zero_protection=False) return model
[docs] def add_placebo_model( model: Model, expr: Literal['linear', 'exp', 'hyperbolic'], operator: Literal['*', '+', 'prop'] = '*', ): r"""Add a placebo or disease progression effect to a model. .. warning:: This function is under development. * linear .. math:: R = B + \text{slope} \cdot \text{TIME} * exp .. math:: R = B \cdot e^{\frac{-t}{t_D}} * hyperbolic .. math:: R = B \cdot \frac{t_{50}}{t + t_{50}} :math:`B` is the baseline effect Parameters ---------- model : Model Pharmpy model expr : str Name of placebo/disease progression effect function. operator : str Operator to use for combining the baseline with the placebo/disease progression Return ------ Model Updated Pharmpy model Examples -------- >>> from pharmpy.modeling import * >>> model = create_basic_pd_model() >>> model = add_placebo_model(model, "linear") >>> model.statements.find_assignment("PDP") PDP = SLOPE⋅TIME """ r_index = model.statements.find_assignment_index("R") if r_index is None: raise ValueError("Cannot find response variable R. Is this a PD model?") P = Expr.symbol("PDP") p_index = model.statements.find_assignment_index("PDP") if p_index is not None: raise ValueError("PDP already in the model. Not yet supported") idv = Expr.symbol(model.datainfo.idv_column.name) old_rassign = model.statements.get_assignment("R") def operate(lhs, rhs, operator): if operator == '*': return lhs * rhs elif operator == '+': return lhs + rhs elif operator == 'prop': return lhs * (1 + rhs) else: raise ValueError(f"Unknown operator {operator}") if expr == 'linear': slope = create_symbol(model, "SLOPE") model = add_individual_parameter(model, slope.name, lower=-float("inf")) passign_expr = slope * idv rassign_expr = operate(old_rassign.expression, P, operator) elif expr == 'exp': if operator != '*': raise ValueError('Only * is supported for exp') td = create_symbol(model, "TD") model = add_individual_parameter(model, td.name) passign_expr = (-idv / td).exp() rassign_expr = old_rassign.expression * P elif expr == 'hyperbolic': t50 = create_symbol(model, "T50") model = add_individual_parameter(model, t50.name) passign_expr = t50 / (idv + t50) rassign_expr = operate(old_rassign.expression, P, operator) else: raise ValueError(f"Unknown placebo model {expr}") passign = Assignment(P, passign_expr) new_rassign = Assignment(old_rassign.symbol, rassign_expr) r_index = model.statements.get_assignment_index("R") statements = ( model.statements[:r_index] + passign + new_rassign + model.statements[r_index + 1 :] ) model = model.replace(statements=statements) model = model.update_source() return model