Source code for pharmpy.modeling.odes

"""
:meta private:
"""

from __future__ import annotations

from collections.abc import Mapping
from typing import Literal, Union

from pharmpy.basic import Expr, TExpr
from pharmpy.deps import sympy
from pharmpy.internals.unicode import bracket
from pharmpy.model import (
    Assignment,
    Bolus,
    Compartment,
    CompartmentalSystem,
    CompartmentalSystemBuilder,
    ExecutionSteps,
    Infusion,
    Model,
    ModelError,
    Parameter,
    Parameters,
    Statements,
    get_and_check_odes,
    output,
)
from pharmpy.modeling.help_functions import _as_integer

from .common import remove_unused_parameters_and_rvs, rename_symbols
from .data import get_observations
from .expressions import create_symbol, is_real
from .parameters import (
    add_population_parameter,
    fix_parameters,
    fix_parameters_to,
    set_initial_estimates,
    set_upper_bounds,
    unfix_parameters,
)


def _extract_params_from_symb(
    statements: Statements, symbol_name: str, pset: Parameters
) -> Parameter:
    terms = {
        symb.name
        for symb in statements.before_odes.full_expression(Expr.symbol(symbol_name)).free_symbols
    }
    theta_name = terms.intersection(pset.names).pop()
    return pset[theta_name]


def _find_noncov_theta(model, paramsymb, full=False):
    # Deterministic function to find the main theta in the expression for parasymb
    # Set full to True if already providing a full expression
    # Find subexpression with as few thetas and covariates as possible
    # Stop if one theta found with no covs.
    if full:
        start_expr = paramsymb
    else:
        start_expr = model.statements.before_odes.full_expression(paramsymb)

    exprs = [start_expr]
    all_popparams = set(model.parameters.symbols)
    all_covs = set(Expr.symbol(name) for name in model.datainfo.names)

    while exprs:
        nthetas = float("Inf")
        ncovs = float("Inf")
        next_expr = None
        for expr in exprs:
            if isinstance(expr, tuple):
                # This is for Piecewise pairs
                symbs = set()
                for e in expr:
                    symbs |= e.free_symbols
            else:
                symbs = expr.free_symbols
            curthetas = symbs & all_popparams
            ncurthetas = len(curthetas)
            curcovs = symbs & all_covs
            ncurcovs = len(curcovs)
            if ncurthetas != 0 and (
                ncurthetas < nthetas or (ncurthetas == nthetas and ncurcovs < ncovs)
            ):
                next_expr = expr
                thetas = curthetas
                nthetas = ncurthetas
                ncovs = ncurcovs

        if next_expr is None:
            break
        else:
            if nthetas == 1 and ncovs == 0:
                return next(iter(thetas))
            else:
                if isinstance(next_expr, tuple):
                    exprs = next_expr
                else:
                    exprs = next_expr.args
    raise ValueError(f"Could not find theta connected to {paramsymb}")


[docs] def add_individual_parameter(model: Model, name: str): """Add an individual or pk parameter to a model Parameters ---------- model : Model Pharmpy model name : str Name of individual/pk parameter Return ------ Model Pharmpy model object Examples -------- >>> from pharmpy.modeling import * >>> model = load_example_model("pheno") >>> model = add_individual_parameter(model, "KA") >>> model.statements.find_assignment("KA") KA = POP_KA """ model, _ = _add_parameter(model, name) model = model.update_source() return model
def _add_parameter( model: Model, name: str, init: float = 0.1, lower: float = 0, upper: Union[float, None] = None ): pops = create_symbol(model, f'POP_{name}') model = add_population_parameter(model, pops.name, init, lower=lower, upper=upper) symb = create_symbol(model, name) ass = Assignment.create(symb, pops) model = model.replace(statements=ass + model.statements) return model, symb
[docs] def set_first_order_elimination(model: Model): """Sets elimination to first order Parameters ---------- model : Model Pharmpy model Return ------ Model Pharmpy model object Examples -------- >>> from pharmpy.modeling import * >>> model = load_example_model("pheno") >>> model = set_first_order_elimination(model) >>> model.statements.ode_system Bolus(AMT, admid=1) → CENTRAL ┌───────┐ │CENTRAL│──CL/V→ └───────┘ See also -------- set_zero_order_elimination set_michaelis_menten_elimination """ if has_first_order_elimination(model): pass elif has_zero_order_elimination(model) or has_michaelis_menten_elimination(model): model = rename_symbols(model, {'POP_CLMM': 'POP_CL', 'IIV_CLMM': 'IIV_CL'}) ind = model.statements.find_assignment_index('CLMM') assert ind is not None clmm_assignment = model.statements[ind] assert isinstance(clmm_assignment, Assignment) cl_ass = Assignment.create(Expr.symbol('CL'), clmm_assignment.expression) statements = model.statements[0:ind] + cl_ass + model.statements[ind + 1 :] odes = statements.ode_system assert isinstance(odes, CompartmentalSystem) central = odes.central_compartment v = Expr.symbol('V') rate = odes.get_flow(central, output) if v not in rate.free_symbols: # take first parameter that starts with 'V' and is no longer than 2 characters v = [ idx for idx in list(map(lambda x: str(x), rate.free_symbols)) if idx[0] == 'V' and len(idx) <= 2 ] v = Expr.symbol(v[0]) cb = CompartmentalSystemBuilder(odes) cb.remove_flow(central, output) cb.add_flow(central, output, Expr.symbol('CL') / v) statements = statements.before_odes + CompartmentalSystem(cb) + statements.after_odes statements = statements.remove_symbol_definitions( {Expr.symbol('KM')}, statements.ode_system ) model = model.replace(statements=statements) model = remove_unused_parameters_and_rvs(model) elif has_mixed_mm_fo_elimination(model): odes = model.statements.ode_system assert odes is not None central = odes.central_compartment v = Expr.symbol('V') rate = odes.get_flow(central, output) if v not in rate.free_symbols: v = [ idx for idx in list(map(lambda x: str(x), rate.free_symbols)) if idx[0] == 'V' and len(idx) <= 2 ] v = Expr.symbol(v[0]) cb = CompartmentalSystemBuilder(odes) cb.remove_flow(central, output) cb.add_flow(central, output, Expr.symbol('CL') / v) statements = ( model.statements.before_odes + CompartmentalSystem(cb) + model.statements.after_odes ) statements = statements.remove_symbol_definitions( {Expr.symbol('KM'), Expr.symbol('CLMM')}, statements.ode_system ) model = model.replace(statements=statements) model = remove_unused_parameters_and_rvs(model) return model
[docs] def add_bioavailability(model: Model, add_parameter: bool = True, logit_transform: bool = False): """Add bioavailability statement for the first dose compartment of the model. Can be added as a new parameter or otherwise it will be set to 1. If added as a parameter, a logit transformation can also be applied. Parameters ---------- model : Model Pharmpy model add_parameter : bool Add new parameter representing bioavailability or not logit_transform : bool Logit transform the added bioavailability parameter. Return ------ Model Pharmpy model object Examples -------- >>> from pharmpy.modeling import * >>> model = load_example_model("pheno") >>> model = add_bioavailability(model) See also -------- remove_bioavailability """ odes = get_and_check_odes(model) dose_comp = odes.dosing_compartments[0] bio = dose_comp.bioavailability if bio.is_number(): # Bio not defined if add_parameter: model, bio_symb = _add_parameter(model, 'BIO', init=float(bio), upper=1.0) if logit_transform: model = model.replace( statements=model.statements.reassign( bio_symb, (Expr.symbol("POP_BIO") / (1 - Expr.symbol("POP_BIO"))).log() ) ) f_ass = Assignment(Expr.symbol('F_BIO'), 1 / (1 + (-bio_symb).exp())) else: f_ass = Assignment(Expr.symbol('F_BIO'), bio_symb) new_before_odes = model.statements.before_odes + f_ass else: # Add as a number bio_ass = Assignment(Expr.symbol("BIO"), Expr.integer(1)) f_ass = Assignment(Expr.symbol("F_BIO"), bio_ass.symbol) new_before_odes = bio_ass + model.statements.before_odes + f_ass # Add statement to code cb = CompartmentalSystemBuilder(odes) cb.set_bioavailability(dose_comp, f_ass.symbol) model = model.replace( statements=(new_before_odes + CompartmentalSystem(cb) + model.statements.after_odes) ) else: # BIO already defined, leave it alone? pass return model.update_source()
[docs] def remove_bioavailability(model: Model): """Remove bioavailability from the first dose compartment of model. Parameters ---------- model : Model Pharmpy model Return ------ Model Pharmpy model object Examples -------- >>> from pharmpy.modeling import * >>> model = load_example_model("pheno") >>> model = remove_bioavailability(model) See also -------- set_bioavailability """ odes = get_and_check_odes(model) dosing_comp = odes.dosing_compartments[0] bio = dosing_comp.bioavailability if bio: symbols = bio.free_symbols cb = CompartmentalSystemBuilder(odes) cb.set_bioavailability(dosing_comp, Expr.integer(1)) statements = ( model.statements.before_odes + CompartmentalSystem(cb) + model.statements.after_odes ) statements = statements.remove_symbol_definitions(symbols, statements.ode_system) model = model.replace(statements=statements) model = remove_unused_parameters_and_rvs(model) return model
[docs] def set_zero_order_elimination(model: Model): """Sets elimination to zero order. Initial estimate for KM is set to 1% of smallest observation. Parameters ---------- model : Model Pharmpy model Return ------ Model Pharmpy model object Examples -------- >>> from pharmpy.modeling import * >>> model = load_example_model("pheno") >>> model = set_zero_order_elimination(model) >>> model.statements.ode_system Bolus(AMT, admid=1) → CENTRAL ┌───────┐ │CENTRAL│──CLMM*KM/(V*(KM + A_CENTRAL(t)/V))→ └───────┘ See also -------- set_first_order_elimination set_michaelis_menten_elimination """ if has_zero_order_elimination(model): pass elif has_michaelis_menten_elimination(model): model = fix_parameters(model, 'POP_KM') elif has_mixed_mm_fo_elimination(model): model = fix_parameters(model, 'POP_KM') odes = get_and_check_odes(model) central = odes.central_compartment rate = odes.get_flow(central, output) rate = rate.subs({'CL': 0}) cb = CompartmentalSystemBuilder(odes) cb.remove_flow(central, output) cb.add_flow(central, output, rate) statements = ( model.statements.before_odes + CompartmentalSystem(cb) + model.statements.after_odes ) statements = statements.remove_symbol_definitions( {Expr.symbol('CL')}, statements.ode_system ) model = model.replace(statements=statements) model = remove_unused_parameters_and_rvs(model) else: model = _do_michaelis_menten_elimination(model) if model.dataset is not None: obs = get_observations(model) init = obs.min() / 100 # 1% of smallest observation else: init = 0.01 if init < 0: init = 0.01 model = fix_parameters_to(model, {'POP_KM': init}) return model
[docs] def has_michaelis_menten_elimination(model: Model): """Check if the model describes Michaelis-Menten elimination This function relies on heuristics and will not be able to detect all possible ways of coding the Michaelis-Menten elimination. Parameters ---------- model : Model Pharmpy model Return ------ bool True if model has describes Michaelis-Menten elimination Examples -------- >>> from pharmpy.modeling import * >>> model = load_example_model("pheno") >>> has_michaelis_menten_elimination(model) False >>> model = set_michaelis_menten_elimination(model) >>> has_michaelis_menten_elimination(model) True """ odes = model.statements.ode_system if odes is None: return False central = odes.central_compartment rate = odes.get_flow(central, output) is_nonlinear = odes.t in rate.free_symbols is_zero_order = 'POP_KM' in model.parameters and model.parameters['POP_KM'].fix could_be_mixed = Expr.symbol('CL') in rate.free_symbols return is_nonlinear and not is_zero_order and not could_be_mixed
[docs] def has_zero_order_elimination(model: Model): """Check if the model describes zero-order elimination This function relies on heuristics and will not be able to detect all possible ways of coding the zero-order elimination. Parameters ---------- model : Model Pharmpy model Return ------ bool True if model has describes zero order elimination Examples -------- >>> from pharmpy.modeling import * >>> model = load_example_model("pheno") >>> has_zero_order_elimination(model) False >>> model = set_zero_order_elimination(model) >>> has_zero_order_elimination(model) True """ odes = model.statements.ode_system if odes is None: return False central = odes.central_compartment rate = odes.get_flow(central, output) is_nonlinear = odes.t in rate.free_symbols is_zero_order = 'POP_KM' in model.parameters and model.parameters['POP_KM'].fix could_be_mixed = Expr.symbol('CL') in rate.free_symbols return is_nonlinear and is_zero_order and not could_be_mixed
[docs] def has_mixed_mm_fo_elimination(model: Model): """Check if the model describes mixed Michaelis-Menten and first order elimination This function relies on heuristics and will not be able to detect all possible ways of coding the mixed Michalis-Menten and first order elimination. Parameters ---------- model : Model Pharmpy model Return ------ bool True if model has describes Michaelis-Menten elimination Examples -------- >>> from pharmpy.modeling import * >>> model = load_example_model("pheno") >>> has_mixed_mm_fo_elimination(model) False >>> model = set_mixed_mm_fo_elimination(model) >>> has_mixed_mm_fo_elimination(model) True """ odes = model.statements.ode_system if odes is None: return False central = odes.central_compartment rate = odes.get_flow(central, output) is_nonlinear = odes.t in rate.free_symbols is_zero_order = 'POP_KM' in model.parameters and model.parameters['POP_KM'].fix could_be_mixed = Expr.symbol('CL') in rate.free_symbols return is_nonlinear and not is_zero_order and could_be_mixed
[docs] def has_first_order_elimination(model: Model): """Check if the model describes first order elimination This function relies on heuristics and will not be able to detect all possible ways of coding the first order elimination. Parameters ---------- model : Model Pharmpy model Return ------ bool True if model has describes first order elimination Examples -------- >>> from pharmpy.modeling import * >>> model = load_example_model("pheno") >>> has_first_order_elimination(model) True """ odes = model.statements.ode_system if odes is None: return False central = odes.central_compartment rate = odes.get_flow(central, output) is_nonlinear = odes.t in rate.free_symbols return not is_nonlinear
[docs] def set_michaelis_menten_elimination(model: Model): """Sets elimination to Michaelis-Menten. Note that the parametrization is not the usual, but is instead using a CLMM parameter. Initial estimate for CLMM is set to CL and KM is set to :math:`max(DV)/2`. Parameters ---------- model : Model Pharmpy model Return ------ Model Pharmpy model object Examples -------- >>> from pharmpy.modeling import * >>> model = load_example_model("pheno") >>> model = set_michaelis_menten_elimination(model) >>> model.statements.ode_system Bolus(AMT, admid=1) → CENTRAL ┌───────┐ │CENTRAL│──CLMM*KM/(V*(KM + A_CENTRAL(t)/V))→ └───────┘ See also -------- set_first_order_elimination set_zero_order_elimination """ if has_michaelis_menten_elimination(model): pass elif has_zero_order_elimination(model): model = unfix_parameters(model, 'POP_KM') elif has_mixed_mm_fo_elimination(model): odes = get_and_check_odes(model) central = odes.central_compartment rate = odes.get_flow(central, output) rate = rate.subs({'CL': 0}) cb = CompartmentalSystemBuilder(odes) cb.remove_flow(central, output) cb.add_flow(central, output, rate) statements = ( model.statements.before_odes + CompartmentalSystem(cb) + model.statements.after_odes ) statements = statements.remove_symbol_definitions( {Expr.symbol('CL')}, statements.ode_system ) model = model.replace(statements=statements) model = remove_unused_parameters_and_rvs(model) else: model = _do_michaelis_menten_elimination(model) return model
[docs] def set_mixed_mm_fo_elimination(model: Model): """Sets elimination to mixed Michaelis-Menten and first order. Initial estimate for CLMM is set to CL/2 and KM is set to :math:`max(DV)/2`. Parameters ---------- model : Model Pharmpy model Return ------ Model Pharmpy model object Examples -------- >>> from pharmpy.modeling import * >>> model = load_example_model("pheno") >>> model = set_mixed_mm_fo_elimination(model) >>> model.statements.ode_system Bolus(AMT, admid=1) → CENTRAL ┌───────┐ │CENTRAL│──(CL + CLMM*KM/(KM + A_CENTRAL(t)/V))/V→ └───────┘ See also -------- set_first_order_elimination set_zero_order_elimination set_michaelis_menten_elimination """ if has_mixed_mm_fo_elimination(model): pass elif has_michaelis_menten_elimination(model) or has_zero_order_elimination(model): model = unfix_parameters(model, 'POP_KM') odes = get_and_check_odes(model) central = odes.central_compartment model = add_individual_parameter(model, 'CL') rate = odes.get_flow(central, output) assert rate is not None cb = CompartmentalSystemBuilder(odes) cb.remove_flow(central, output) v = Expr.symbol('V') if v not in rate.free_symbols: v = Expr.symbol('VC') cb.add_flow(central, output, Expr.symbol('CL') / v + rate) statements = ( model.statements.before_odes + CompartmentalSystem(cb) + model.statements.after_odes ) model = model.replace(statements=statements) model = model.update_source() else: model = _do_michaelis_menten_elimination(model, combined=True) return model
def _do_michaelis_menten_elimination(model: Model, combined: bool = False): sset = model.statements odes = sset.ode_system assert isinstance(odes, CompartmentalSystem) central = odes.central_compartment old_rate = odes.get_flow(central, output) assert old_rate is not None numer, denom = old_rate.as_numer_denom() km_init, clmm_init = _get_mm_inits(model, numer, combined) model, km = _add_parameter(model, 'KM', init=km_init) if model.dataset is not None and 'idv' in model.datainfo.types and 'dv' in model.datainfo.types: maxobs = get_observations(model).max() else: maxobs = 1.0 model = set_upper_bounds(model, {'POP_KM': 1.5 * maxobs}) if denom != 1: if combined: cl = numer model, clmm = _add_parameter(model, 'CLMM', init=clmm_init) else: model = _rename_parameter(model, 'CL', 'CLMM') clmm = Expr.symbol('CLMM') cl = 0 vc = denom else: if combined: if model.statements.find_assignment('CL'): assignment = model.statements.find_assignment('CL') assert assignment is not None cl = assignment.symbol else: model, cl = _add_parameter(model, 'CL', clmm_init) else: cl = 0 if model.statements.find_assignment('VC'): assignment = sset.find_assignment('VC') assert assignment is not None vc = assignment.symbol else: model, vc = _add_parameter(model, 'VC') # FIXME: decide better initial estimate if not combined and model.statements.find_assignment('CL'): model = _rename_parameter(model, 'CL', 'CLMM') assignment = model.statements.find_assignment('CLMM') assert assignment is not None clmm = assignment.symbol else: model, clmm = _add_parameter(model, 'CLMM', init=clmm_init) rate = (clmm * km / (km + central.amount / vc) + cl) / vc cb = CompartmentalSystemBuilder(odes) cb.add_flow(central, output, rate) statements = ( model.statements.before_odes + CompartmentalSystem(cb) + model.statements.after_odes ) statements = statements.remove_symbol_definitions(numer.free_symbols, statements.ode_system) model = model.replace(statements=statements) model = remove_unused_parameters_and_rvs(model) return model def _rename_parameter(model: Model, old_name, new_name): statements = model.statements rvs = model.random_variables a = statements.find_assignment(old_name) assert a is not None d = {} for s in a.rhs_symbols: if s in model.parameters: old_par = s d[model.parameters[s].symbol] = f'POP_{new_name}' new_par = Expr.symbol(f'POP_{new_name}') statements = statements.subs({old_par: new_par}) break for s in a.rhs_symbols: iivs = rvs.iiv if s.name in iivs.names: cov = iivs.covariance_matrix ind = iivs.names.index(s.name) pars = [] for i in range(cov.rows): e = cov[ind, i] if e.is_symbol(): pars.append(e) diag = cov[ind, ind] d[diag] = f'IIV_{new_name}' for p in pars: if p != diag: if p.name.startswith('IIV'): d[p] = p.name.replace(f'IIV_{old_name}', f'IIV_{new_name}') rvs = rvs.subs(d) break new = [] for p in model.parameters: if p.symbol in d: newparam = Parameter( name=d[p.symbol], init=p.init, upper=p.upper, lower=p.lower, fix=p.fix ) else: newparam = p new.append(newparam) parameters = Parameters.create(new) statements = statements.subs({old_name: new_name}) model = model.replace(statements=statements, parameters=parameters, random_variables=rvs) return model def _get_mm_inits(model: Model, rate_numer, combined): pset, sset = model.parameters, model.statements parameter = _extract_params_from_symb(sset, rate_numer.name, pset) assert parameter is not None clmm_init = parameter.init if combined: clmm_init /= 2 if model.dataset is not None and 'idv' in model.datainfo.types and 'dv' in model.datainfo.types: dv_max = get_observations(model).max() else: dv_max = 1.0 km_init = dv_max / 2 # FIXME: Cap initial estimate, this is NONMEM specific and should be handled more generally # (https://github.com/pharmpy/pharmpy/issues/1395) if km_init >= 10**6: km_init = 5 * 10**5 return km_init, clmm_init
[docs] def set_transit_compartments(model: Model, n: int, keep_depot: bool = True): """Set the number of transit compartments of model. Initial estimate for absorption rate is set the previous rate if available, otherwise it is set to the time of first observation/2. Parameters ---------- model : Model Pharmpy model n : int Number of transit compartments keep_depot : bool False to convert depot compartment into a transit compartment Return ------ Model Pharmpy model object Examples -------- >>> from pharmpy.modeling import * >>> model = load_example_model("pheno") >>> model = set_transit_compartments(model, 3) >>> model.statements.ode_system Bolus(AMT, admid=1) → TRANSIT1 ┌────────┐ ┌────────┐ ┌────────┐ ┌───────┐ │TRANSIT1│──K12→│TRANSIT2│──K23→│TRANSIT3│──K34→│CENTRAL│──K40→ └────────┘ └────────┘ └────────┘ └───────┘ See also -------- add_lag_time """ statements = model.statements cs = get_and_check_odes(model) transits = cs.find_transit_compartments(statements) try: n = _as_integer(n) except ValueError: raise ValueError(f'Number of compartments must be integer: {n}') model = remove_lag_time(model) # Handle keep_depot option depot = cs.find_depot(statements) mdt_init = None mdt_assign = None if not keep_depot and depot: central = cs.central_compartment rate = cs.get_flow(depot, central) assert rate is not None if not rate.is_symbol(): num, den = rate.as_numer_denom() if num == 1 and den.is_symbol(): symbol = den else: symbol = None else: symbol = rate if symbol: mdt_init = _extract_params_from_symb(statements, symbol.name, model.parameters).init inflows = cs.get_compartment_inflows(depot) cb = CompartmentalSystemBuilder(cs) if len(inflows) == 1: innode, inflow = inflows[0] cb.add_flow(innode, central, inflow) else: cb.set_dose(central, depot.doses[0]) if statements.find_assignment('MAT'): model = _rename_parameter(model, 'MAT', 'MDT') statements = model.statements mdt_assign = statements.find_assignment('MDT') cb.remove_compartment(depot) statements = statements.before_odes + CompartmentalSystem(cb) + statements.after_odes statements = statements.remove_symbol_definitions(rate.free_symbols, statements.ode_system) if mdt_assign and not statements.find_assignment('MDT'): statements = mdt_assign + statements model = model.replace(statements=statements) model = remove_unused_parameters_and_rvs(model) odes = statements.ode_system assert odes is not None # Since update_source() is used after removing the depot and statements are immutable, we need to # reset to get the correct rate names cs = model.statements.ode_system if len(transits) == n: return model elif n == 1 and has_instantaneous_absorption(model): raise ValueError( "Cannot set the number of transits to 1 for model with instantaneous " "absorption. The resulting model cannot be distinguished from first order absorption" ) elif len(transits) == 0: if mdt_assign: mdt_symb = mdt_assign.symbol else: if mdt_init is not None: init = mdt_init else: init = _get_absorption_init(model, 'MDT') model, mdt_symb = _add_parameter(model, 'MDT', init=init) rate = n / mdt_symb dosing_comp = cs.dosing_compartments[0] comp = dosing_comp cb = CompartmentalSystemBuilder(cs) while n > 0: new_comp = Compartment.create(f'TRANSIT{n}') cb.add_compartment(new_comp) n -= 1 cb.add_flow(new_comp, comp, rate) comp = new_comp comp = cb.set_bioavailability(comp, dosing_comp.bioavailability) dosing_comp = cb.set_bioavailability(dosing_comp, Expr.integer(1)) if len(dosing_comp.doses) == 1 and dosing_comp.doses[0].admid == 2: dosing_comp = cb.set_dose(dosing_comp, dosing_comp.doses[0].replace(admid=1)) cb.move_dose(dosing_comp, comp, admid=1) if len(dosing_comp.doses) == 1: cb.set_dose(comp, dosing_comp.doses) else: dose = _sorted_doses(dosing_comp, model)[0] cb.add_dose(comp, dose) cb.set_dose(dosing_comp, _sorted_doses(dosing_comp, model)[1:]) statements = ( model.statements.before_odes + CompartmentalSystem(cb) + model.statements.after_odes ) model = model.replace(statements=statements) model = model.update_source() elif len(transits) > n: nremove = len(transits) - n removed_symbols = set() remaining = set(transits) trans, destination, flow = _find_last_transit(cs, remaining) cb = CompartmentalSystemBuilder(cs) while nremove > 0: try: from_comp, from_flow = cs.get_compartment_inflows(trans)[0] except IndexError: # Final remaining transit final_transit = True else: cb.add_flow(from_comp, destination, from_flow) final_transit = False cb.remove_compartment(trans) remaining.remove(trans) removed_symbols |= flow.free_symbols if not final_transit: trans = from_comp flow = from_flow nremove -= 1 if n == 0: dose = cs.dosing_compartments[0].doses[0] cb.set_dose(destination, dose) statements = ( model.statements.before_odes + CompartmentalSystem(cb) + model.statements.after_odes ) model = model.replace(statements=statements) model = _update_numerators(model) statements = model.statements.remove_symbol_definitions( removed_symbols, model.statements.ode_system ) model = model.replace(statements=statements) model = remove_unused_parameters_and_rvs(model) else: nadd = n - len(transits) last, destination, rate = _find_last_transit(cs, set(transits)) cb = CompartmentalSystemBuilder(cs) cb.remove_flow(last, destination) while nadd > 0: new_comp = Compartment.create(f'TRANSIT{n - nadd + 1}') cb.add_compartment(new_comp) cb.add_flow(last, new_comp, rate) if rate.is_symbol(): ass = statements.find_assignment(rate.name) if ass is not None: rate = ass.expression last = new_comp nadd -= 1 cb.add_flow(last, destination, rate) statements = ( model.statements.before_odes + CompartmentalSystem(cb) + model.statements.after_odes ) model = model.replace(statements=statements) model = _update_numerators(model) model = model.update_source() return model
def _find_last_transit(odes: CompartmentalSystem, transits: set[Compartment]): for trans in transits: destination, flow = odes.get_compartment_outflows(trans)[0] if destination not in transits: return trans, destination, flow raise ValueError('Could not find last transit') def _update_numerators(model: Model): # update numerators for transit compartment rates statements = model.statements odes = statements.ode_system assert odes is not None transits = odes.find_transit_compartments(statements) new_numerator = Expr.integer(len(transits)) cb = CompartmentalSystemBuilder(odes) for comp in transits: to_comp, rate = odes.get_compartment_outflows(comp)[0] numer, denom = rate.as_numer_denom() if numer.is_integer() and numer != new_numerator: new_rate = new_numerator / denom cb.add_flow(comp, to_comp, new_rate) elif numer.is_symbol(): ass = statements.find_assignment(numer.name) if ass is not None: ass_numer, ass_denom = ass.expression.as_numer_denom() if ass_numer.is_integer() and ass_numer != new_numerator: new_rate = new_numerator / ass_denom statements = statements.reassign(numer, new_rate) model = model.replace( statements=statements.before_odes + CompartmentalSystem(cb) + statements.after_odes ) return model
[docs] def add_lag_time(model: Model): """Add lag time to the dose compartment of model. Initial estimate for lag time is set the previous lag time if available, otherwise it is set to the time of first observation/2. Parameters ---------- model : Model Pharmpy model Return ------ Model Pharmpy model object Examples -------- >>> from pharmpy.modeling import * >>> model = load_example_model("pheno") >>> model = add_lag_time(model) See also -------- set_transit_compartments remove_lag_time """ odes = get_and_check_odes(model) dosing_comp = odes.dosing_compartments[0] old_lag_time = dosing_comp.lag_time model, mdt_symb = _add_parameter(model, 'MDT', init=_get_absorption_init(model, 'MDT')) cb = CompartmentalSystemBuilder(odes) dosing_comp = cb.set_lag_time(dosing_comp, mdt_symb) # FIXME: Very temporary until new zo absorption logic is implemented if len(dosing_comp.doses) > 1: cb.set_lag_time(dosing_comp, Expr.symbol("lag_time")) doses = _sorted_doses(dosing_comp, model) oral_admid = doses[0].admid admid = Expr.symbol("ADMID") model = model.replace( statements=( model.statements.before_odes + Assignment.create( Expr.symbol("lag_time"), Expr.piecewise((mdt_symb, sympy.Eq(admid, oral_admid)), (0, sympy.true)), ) + CompartmentalSystem(cb) + model.statements.after_odes ) ) else: model = model.replace( statements=( model.statements.before_odes + CompartmentalSystem(cb) + model.statements.after_odes ) ) if old_lag_time: model = model.replace( statements=model.statements.remove_symbol_definitions( old_lag_time.free_symbols, model.statements.ode_system ) ) model = remove_unused_parameters_and_rvs(model) else: model = model.update_source() return model
[docs] def remove_lag_time(model: Model): """Remove lag time from the dose compartment of model. Parameters ---------- model : Model Pharmpy model Return ------ Model Pharmpy model object Examples -------- >>> from pharmpy.modeling import * >>> model = load_example_model("pheno") >>> model = remove_lag_time(model) See also -------- set_transit_compartments add_lag_time """ odes = get_and_check_odes(model) dosing_comp = odes.dosing_compartments[0] lag_time = dosing_comp.lag_time if lag_time: symbols = lag_time.free_symbols cb = CompartmentalSystemBuilder(odes) cb.set_lag_time(dosing_comp, Expr.integer(0)) statements = ( model.statements.before_odes + CompartmentalSystem(cb) + model.statements.after_odes ) statements = statements.remove_symbol_definitions(symbols, statements.ode_system) model = model.replace(statements=statements) model = remove_unused_parameters_and_rvs(model) return model
[docs] def set_zero_order_absorption(model: Model): """Set or change to zero order absorption rate. Initial estimate for absorption rate is set the previous rate if available, otherwise it is set to the time of first observation/2. Parameters ---------- model : Model Model to set or change to first order absorption rate Return ------ Model Pharmpy model object Examples -------- >>> from pharmpy.modeling import * >>> model = load_example_model("pheno") >>> model = set_zero_order_absorption(model) >>> model.statements.ode_system Infusion(AMT, admid=1, duration=D1) → CENTRAL ┌───────┐ │CENTRAL│──CL/V→ └───────┘ See also -------- set_instantaneous_absorption set_first_order_absorption """ statements = model.statements odes = get_and_check_odes(model) _disallow_infusion(model, odes) assert isinstance(odes, CompartmentalSystem) if has_zero_order_absorption(model) and not has_seq_zo_fo_absorption(model): pass else: _disallow_infusion(model, odes) depot = odes.find_depot(statements) dose_comp = odes.dosing_compartments[0] symbols = dose_comp.free_symbols dose = _sorted_doses(dose_comp, model)[0] lag_time = dose_comp.lag_time if depot: to_comp, _ = odes.get_compartment_outflows(depot)[0] ka = odes.get_flow(depot, odes.central_compartment) assert ka is not None cb = CompartmentalSystemBuilder(odes) cb.remove_compartment(depot) to_comp = cb.add_dose(to_comp, dose) to_comp = cb.set_lag_time(to_comp, depot.lag_time) cb.set_bioavailability(to_comp, depot.bioavailability) statements = statements.before_odes + CompartmentalSystem(cb) + statements.after_odes symbols = ka.free_symbols new_statements = statements.remove_symbol_definitions(symbols, statements.ode_system) mat_idx = statements.find_assignment_index('MAT') if mat_idx is not None: # FIXME : Causes issue if mat_assign statement is dependent on previously # removed parameters/statements mat_assign = statements[mat_idx] new_statements = new_statements[0:mat_idx] + mat_assign + new_statements[mat_idx:] model = model.replace(statements=new_statements) model = remove_unused_parameters_and_rvs(model) if not has_zero_order_absorption(model): odes = model.statements.ode_system assert odes is not None model = _add_zero_order_absorption( model, dose, odes.dosing_compartments[0], 'MAT', lag_time ) model = model.update_source() # FIXME : Very temporary until new zo absorption logic is implemented if lag_time != 0 and len(model.statements.ode_system.dosing_compartments[0].doses) > 1: model = remove_lag_time(model) model = add_lag_time(model) return model
[docs] def set_first_order_absorption(model: Model): """Set or change to first order absorption rate. Initial estimate for absorption rate is set to the previous rate if available, otherwise it is set to the time of first observation/2. If multiple doses is set to the affected compartment, currently only iv+oral doses (one of each) is supported Parameters ---------- model : Model Model to set or change to use first order absorption rate Return ------ Model Pharmpy model object Examples -------- >>> from pharmpy.modeling import * >>> model = load_example_model("pheno") >>> model = set_first_order_absorption(model) >>> model.statements.ode_system Bolus(AMT, admid=1) → DEPOT ┌─────┐ ┌───────┐ │DEPOT│──KA→│CENTRAL│──CL/V→ └─────┘ └───────┘ See also -------- set_instantaneous_absorption set_zero_order_absorption """ statements = model.statements cs = get_and_check_odes(model) if has_first_order_absorption(model) and not has_seq_zo_fo_absorption(model): pass else: depot = cs.find_depot(statements) dose_comp = cs.dosing_compartments[0] amount = dose_comp.doses[0].amount symbols = dose_comp.free_symbols lag_time = dose_comp.lag_time bio = dose_comp.bioavailability cb = CompartmentalSystemBuilder(cs) if depot and depot == dose_comp: dose = dose_comp.doses[0] dose_comp = cb.remove_dose(dose_comp, admid=dose.admid) dose_comp = cb.set_dose(dose_comp, Bolus(dose.amount)) dose_comp = cb.set_lag_time(dose_comp, Expr.integer(0)) if not depot: # TODO : Add another way of removing dependencies dose_admid = _sorted_doses(dose_comp, model)[0].admid if len(dose_comp.doses) == 1: dose_comp = cb.set_dose(dose_comp, Bolus(amount)) remove_dose = True else: dose_comp = cb.set_dose(dose_comp, _sorted_doses(dose_comp, model)[1:]) remove_dose = False statements = statements.before_odes + CompartmentalSystem(cb) + statements.after_odes new_statements = statements.remove_symbol_definitions(symbols, statements.ode_system) mat_idx = statements.find_assignment_index('MAT') if mat_idx is not None: mat_assign = statements[mat_idx] new_statements = new_statements[0:mat_idx] + mat_assign + new_statements[mat_idx:] model = model.replace(statements=new_statements) model = remove_unused_parameters_and_rvs(model) if not depot: # The new dose is created here model, _ = _add_first_order_absorption( model, Bolus(amount, admid=dose_admid), dose_comp, lag_time, bio, remove_dose=remove_dose, ) model = model.update_source() return model
[docs] def set_instantaneous_absorption(model: Model): """Set or change to instantaneous absorption rate. Currently lagtime together with instantaneous absorption is not supported. Parameters ---------- model : Model Model to set or change absorption rate Return ------ Model Pharmpy model object Examples -------- >>> from pharmpy.modeling import * >>> model = load_example_model("pheno") >>> model = set_instantaneous_absorption(model) >>> model.statements.ode_system Bolus(AMT, admid=1) → CENTRAL ┌───────┐ │CENTRAL│──CL/V→ └───────┘ See also -------- set_zero_order_absorption set_first_order_absorption """ statements = model.statements cs = get_and_check_odes(model) if has_instantaneous_absorption(model): pass else: depot = cs.find_depot(statements) if depot: to_comp, _ = cs.get_compartment_outflows(depot)[0] cb = CompartmentalSystemBuilder(cs) cb.set_dose(to_comp, depot.doses[0]) ka = cs.get_flow(depot, cs.central_compartment) cb.remove_compartment(depot) symbols = ka.free_symbols statements = statements.before_odes + CompartmentalSystem(cb) + statements.after_odes model = model.replace( statements=statements.remove_symbol_definitions(symbols, statements.ode_system) ) model = remove_unused_parameters_and_rvs(model) if has_zero_order_absorption(model): dose_comp = cs.dosing_compartments[0] old_symbols = dose_comp.free_symbols cb = CompartmentalSystemBuilder(cs) new_dose = Bolus(_sorted_doses(dose_comp, model)[0].amount) if len(dose_comp.doses) > 1: cb.set_dose(dose_comp, (new_dose,) + _sorted_doses(dose_comp, model)[1:]) else: cb.set_dose(dose_comp, new_dose) unneeded_symbols = old_symbols - new_dose.free_symbols statements = statements.before_odes + CompartmentalSystem(cb) + statements.after_odes model = model.replace( statements=statements.remove_symbol_definitions( unneeded_symbols, statements.ode_system ) ) model = remove_unused_parameters_and_rvs(model) return model
[docs] def set_seq_zo_fo_absorption(model: Model): """Set or change to sequential zero order first order absorption rate. Initial estimate for absorption rate is set the previous rate if available, otherwise it is set to the time of first observation/2. Currently lagtime together with sequential zero order first order absorption is not supported. Parameters ---------- model : Model Model to set or change absorption rate Return ------ Model Pharmpy model object Examples -------- >>> from pharmpy.modeling import * >>> model = load_example_model("pheno") >>> model = set_seq_zo_fo_absorption(model) >>> model.statements.ode_system Infusion(AMT, admid=1, duration=D1) → DEPOT ┌─────┐ ┌───────┐ │DEPOT│──KA→│CENTRAL│──CL/V→ └─────┘ └───────┘ See also -------- set_instantaneous_absorption set_zero_order_absorption set_first_order_absorption """ statements = model.statements cs = get_and_check_odes(model) if has_seq_zo_fo_absorption(model): pass else: _disallow_infusion(model, cs) depot = cs.find_depot(statements) dose_comp = cs.dosing_compartments[0] have_ZO = has_zero_order_absorption(model) if depot and not have_ZO: model = _add_zero_order_absorption(model, dose_comp.doses[0], depot, 'MDT') elif not depot and have_ZO: if len(dose_comp.doses) == 1: fo_dose = dose_comp.doses[0] remove_dose = True else: fo_dose = _sorted_doses(dose_comp, model)[0] cb = CompartmentalSystemBuilder(model.statements.ode_system) dose_comp = cb.set_dose(dose_comp, _sorted_doses(dose_comp, model)[1:]) model = model.replace( statements=model.statements.before_odes + CompartmentalSystem(cb) + model.statements.after_odes ) remove_dose = False model, _ = _add_first_order_absorption( model, fo_dose, dose_comp, remove_dose=remove_dose ) elif not depot and not have_ZO: model = set_first_order_absorption(model) depot = model.statements.ode_system.find_depot(model.statements) model = _add_zero_order_absorption( model, Bolus(dose_comp.doses[0].amount), depot, 'MDT' ) model = model.update_source() return model
def _disallow_infusion(model, odes): dose_comp = odes.dosing_compartments[0] if isinstance(dose_comp.doses[0], Infusion): if dose_comp.doses[0].rate is not None: ex = dose_comp.doses[0].rate else: ex = dose_comp.doses[0].duration assert ex is not None for s in ex.free_symbols: if s.name in model.datainfo.names: raise ModelError("Model already has an infusion given in the dataset")
[docs] def has_zero_order_absorption(model: Model): """Check if ode system describes a zero order absorption currently defined as having Infusion dose with rate not in dataset Parameters ---------- model : Model Pharmpy model Return ------ bool Whether the model has zero order absorption or not Examples -------- >>> from pharmpy.modeling import * >>> model = load_example_model("pheno") >>> has_zero_order_absorption(model) False """ odes = model.statements.ode_system if odes is None: return False dosing = odes.dosing_compartments[0] dose = dosing.doses[0] return _dose_zo(model, dose)
def _dose_zo(model, dose): if isinstance(dose, Infusion): if dose.rate is None: value = dose.duration else: value = dose.rate if isinstance(value, str) or isinstance(value, Expr) and value.is_symbol(): name = str(value) if name not in model.datainfo.names: return True elif isinstance(value, Expr): assert value is not None names = {symb.name for symb in value.free_symbols} if not all(name in model.datainfo.names for name in names): return True return False def _sorted_doses(comp, model): """Return doses to compartment where oral doses are located first""" if len(comp.doses) > 1: return tuple(sorted(comp.doses, key=lambda d: _dose_zo(model, d), reverse=True)) else: return comp.doses
[docs] def has_first_order_absorption(model: Model): """Check if ode system describes a first order absorption Currently defined as the central compartment having a unidirectional input flow from another compartment (such as depot or transit) Parameters ---------- model : Model Pharmpy model Return ------- Bool : True if model has first order absorption """ odes = model.statements.ode_system if odes is None: return False dosing = odes.dosing_compartments[0] central = odes.central_compartment if dosing == central: return False in_flow = [flow[0] for flow in odes.get_compartment_inflows(central) if flow[0] != output] out_flow = [flow[0] for flow in odes.get_compartment_outflows(central) if flow[0] != output] unidirectional_flow = [uni_flow for uni_flow in in_flow if uni_flow not in out_flow] if len(unidirectional_flow) == 1: return True return False
[docs] def has_instantaneous_absorption(model: Model): """Check if ode system describes a instantaneous absorption Defined as being a instantaneous dose directly into the central compartment Parameters ---------- model : Model Pharmpy model Return ------- Bool : True if model has instantaneous absorption """ odes = model.statements.ode_system if odes is None: return False dosing = odes.dosing_compartments[0] central = odes.central_compartment if dosing != central: return False if isinstance(dosing.doses[0], Bolus): return True return False
[docs] def has_seq_zo_fo_absorption(model: Model): """Check if ode system describes a sequential zero-order, first-order absorption Defined as the model having both zero- and first-order absorption. Parameters ---------- model : Model DPharmpy model See also -------- has_zero_order_absorption has_first_order_absorption """ if has_zero_order_absorption(model) and has_first_order_absorption(model): return True else: return False
[docs] def get_number_of_peripheral_compartments(model: Model): """Return the number of peripherals compartments connected to the central compartment Parameters ---------- model : Model Pharmpy model Returns ------- int Number of peripherals compartments """ # Redundant function ? odes = get_and_check_odes(model) return len(odes.find_peripheral_compartments())
[docs] def get_number_of_transit_compartments(model: Model): """Return the number of transit compartments in the model Parameters ---------- model : Model Pharmpy model Returns ------- int Number of transit compartments """ # Redundant function ? odes = get_and_check_odes(model) return len(odes.find_transit_compartments(model.statements))
def has_lag_time(model: Model): """Check if ode system is defined with absorption lag time Parameters ---------- model : Model Pharmpy model Return ------- Bool : True if model is defined with lagtime """ odes = model.statements.ode_system if odes is None: return False dosing = odes.dosing_compartments[0] if dosing.lag_time: return True return False def _add_zero_order_absorption( model, old_dose, to_comp, parameter_name, lag_time=None, replace=True ): """Add zero order absorption to a compartment. Initial estimate for absorption rate is set the previous rate if available, otherwise it is set to the time of first observation/2 is used. Disregards what is currently in the model. """ mat_assign = model.statements.find_assignment(parameter_name) if mat_assign: mat_symb = mat_assign.symbol else: model, mat_symb = _add_parameter( model, parameter_name, init=_get_absorption_init(model, parameter_name) ) new_dose = Infusion(old_dose.amount, admid=old_dose.admid, duration=mat_symb * 2) cb = CompartmentalSystemBuilder(model.statements.ode_system) dose_list = [new_dose] + list(to_comp.doses) dose_list.remove(old_dose) if replace: cb.set_dose(to_comp, tuple(dose_list)) else: cb.add_dose(to_comp, tuple(dose_list)) if lag_time is not None and lag_time != 0: cb.set_lag_time(model.statements.ode_system.dosing_compartments[0], lag_time) model = model.replace( statements=model.statements.before_odes + CompartmentalSystem(cb) + model.statements.after_odes ) return model def _add_first_order_absorption( model, dose, to_comp, lag_time=None, bioavailability=None, remove_dose=True ): """Add first order absorption Disregards what is currently in the model. """ odes = model.statements.ode_system cb = CompartmentalSystemBuilder(odes) depot = Compartment.create( 'DEPOT', doses=(dose,), lag_time=Expr.integer(0) if lag_time is None else lag_time, bioavailability=Expr.integer(1) if bioavailability is None else bioavailability, ) cb.add_compartment(depot) if remove_dose: to_comp = cb.set_dose(to_comp, tuple()) to_comp = cb.set_lag_time(to_comp, Expr.integer(0)) to_comp = cb.set_bioavailability(to_comp, Expr.integer(1)) mat_assign = model.statements.find_assignment('MAT') if mat_assign: mat_symb = mat_assign.symbol else: model, mat_symb = _add_parameter(model, 'MAT', _get_absorption_init(model, 'MAT')) cb.add_flow(depot, to_comp, 1 / mat_symb) model = model.replace( statements=model.statements.before_odes + CompartmentalSystem(cb) + model.statements.after_odes ) return model, depot def _get_absorption_init(model, param_name) -> float: try: if param_name == 'MDT': param_prev = model.statements.lag_time else: param_prev = _extract_params_from_symb(model.statements, param_name, model.parameters) return param_prev.init except (AttributeError, KeyError): pass time_label = model.datainfo.idv_column.name if model.dataset is None: time_min = 1.0 else: obs = get_observations(model) time = obs.index.get_level_values(level=time_label) time_min = time[time > 0].min() if param_name == 'MDT': init = float(time_min) / 2 elif param_name == 'MAT': init = float(time_min) * 2 else: raise NotImplementedError('param_name must be MDT or MAT') if init < 0: init = 0.1 return init
[docs] def set_peripheral_compartments(model: Model, n: int, name: str = None): """Sets the number of peripheral compartments for central compartment to a specified number. If name is set, the peripheral compartment will be added to the compartment with the specified name instead. Parameters ---------- model : Model Pharmpy model n : int Number of transit compartments name : str Name of compartment to add peripheral to. Return ------ Model Pharmpy model object Examples -------- >>> from pharmpy.modeling import * >>> model = load_example_model("pheno") >>> model = set_peripheral_compartments(model, 2) >>> model.statements.ode_system Bolus(AMT, admid=1) → CENTRAL ┌───────────┐ │PERIPHERAL1│ └───────────┘ ↑ │ Q2/V1 Q2/V2 │ ↓ ┌───────────┐ │ CENTRAL │──CL/V1→ └───────────┘ ↑ │ Q3/V3 Q3/V1 │ ↓ ┌───────────┐ │PERIPHERAL2│ └───────────┘ See also -------- add_peripheral_compartment remove_peripheral_compartment """ odes = model.statements.ode_system try: n = _as_integer(n) except TypeError: raise TypeError(f'Number of compartments must be integer: {n}') per = len(odes.find_peripheral_compartments()) if per < n: for _ in range(n - per): model = add_peripheral_compartment(model, name=name) elif per > n: for _ in range(per - n): model = remove_peripheral_compartment(model, name=name) return model
[docs] def add_peripheral_compartment(model: Model, name: str = None): r"""Add a peripheral distribution compartment to model The rate of flow from the central to the peripheral compartment will be parameterized as QPn / VC where VC is the volume of the central compartment. The rate of flow from the peripheral to the central compartment will be parameterized as QPn / VPn where VPn is the volumne of the added peripheral compartment. If name is set, the peripheral compartment will be added to the compartment with the specified name instead. Initial estimates: == =================================================== n == =================================================== 1 :math:`\mathsf{CL} = \mathsf{CL'}`, :math:`\mathsf{VC} = \mathsf{VC'}`, :math:`\mathsf{QP1} = \mathsf{CL'}` and :math:`\mathsf{VP1} = \mathsf{VC'} \cdot 0.05` 2 :math:`\mathsf{QP1} = \mathsf{QP1' * 0.1}`, :math:`\mathsf{VP1} = \mathsf{VP1'}`, :math:`\mathsf{QP2} = \mathsf{QP1' * 0.9}` and :math:`\mathsf{VP2} = \mathsf{VP1'}` == =================================================== Parameters ---------- model : Model Pharmpy model name : str Name of compartment to add peripheral to. Return ------ Model Pharmpy model object Examples -------- >>> from pharmpy.modeling import * >>> model = load_example_model("pheno") >>> model = add_peripheral_compartment(model) >>> model.statements.ode_system Bolus(AMT, admid=1) → CENTRAL ┌───────────┐ │PERIPHERAL1│ └───────────┘ ↑ │ Q/V1 Q/V2 │ ↓ ┌───────────┐ │ CENTRAL │──CL/V1→ └───────────┘ See also -------- set_peripheral_compartment remove_peripheral_compartment """ statements = model.statements odes = get_and_check_odes(model) if name: other_compartment = odes.find_compartment(name) if other_compartment is None: raise ValueError(f'{name} is not a compartment name.') per = odes.find_peripheral_compartments(name) central = other_compartment else: per = odes.find_peripheral_compartments() central = odes.central_compartment n = len(per) + 1 elimination_rate = odes.get_flow(central, output) assert elimination_rate is not None if has_mixed_mm_fo_elimination(model): elimination_rate = Expr(sympy.expand(elimination_rate).args[0]) cl, vc = elimination_rate.as_numer_denom() if cl.is_symbol() and vc == 1: # If K = CL / V s = statements.find_assignment(cl.name) assert s is not None cl, vc = s.expression.as_numer_denom() # Heuristic to handle the MM case if vc.is_mul(): vc = vc.args[0] cb = CompartmentalSystemBuilder(odes) # NOTE: Only used if vc != 1 qp_init = 0.1 vp_init = 0.1 if n == 1: if vc != 1: # Heuristic to handle the Mixed MM-FO case if cl.is_add(): cl1 = cl.args[0] if cl1.is_mul(): cl = cl1.args[0] pop_cl = _find_noncov_theta(model, cl) pop_vc = _find_noncov_theta(model, vc) pop_cl_init = model.parameters[pop_cl].init pop_vc_init = model.parameters[pop_vc].init qp_init = pop_cl_init vp_init = pop_vc_init * 0.05 elif n == 2: if vc != 1: per1 = per[0] from_rate = odes.get_flow(per1, central) assert from_rate is not None qp1, vp1 = from_rate.as_numer_denom() if qp1.is_symbol() and vc == 1: # If K = CL / V s = statements.find_assignment(qp1.name) assert s is not None qp1, vp1 = s.expression.as_numer_denom() full_qp1 = statements.before_odes.full_expression(qp1) full_vp1 = statements.before_odes.full_expression(vp1) if full_vp1 == 1: full_qp1, full_vp1 = full_qp1.as_numer_denom() pop_qp1 = _find_noncov_theta(model, full_qp1, full=True) pop_vp1 = _find_noncov_theta(model, full_vp1, full=True) pop_qp1_init = model.parameters[pop_qp1].init pop_vp1_init = model.parameters[pop_vp1].init model = set_initial_estimates(model, {pop_qp1.name: pop_qp1_init * 0.10}) qp_init = pop_qp1_init * 0.90 vp_init = pop_vp1_init if vc != 1: model, qp = _add_parameter(model, f'QP{n}', init=qp_init) model, vp = _add_parameter(model, f'VP{n}', init=vp_init) if name: peripheral = Compartment.create(f'{name}_PERIPHERAL{n}') else: peripheral = Compartment.create(f'PERIPHERAL{n}') cb.add_compartment(peripheral) cb.add_flow(central, peripheral, qp / vc) cb.add_flow(peripheral, central, qp / vp) elif vc == 1: model, kpc = _add_parameter(model, f'KPC{n}', init=0.1) model, kcp = _add_parameter(model, f'KCP{n}', init=0.1) if name: peripheral = Compartment.create(f'{name}_PERIPHERAL{n}') else: peripheral = Compartment.create(f'PERIPHERAL{n}') cb.add_compartment(peripheral) cb.add_flow(central, peripheral, kcp) cb.add_flow(peripheral, central, kpc) model = model.replace( statements=Statements( model.statements.before_odes + CompartmentalSystem(cb) + model.statements.after_odes ) ) return model.update_source()
[docs] def remove_peripheral_compartment(model: Model, name: str = None): r"""Remove a peripheral distribution compartment from model If name is set, a peripheral compartment will be removed from the compartment with the specified name. Initial estimates: == =================================================== n == =================================================== 2 :math:`\mathsf{CL} = \mathsf{CL'}`, :math:`\mathsf{QP1} = \mathsf{CL'}` and :math:`\mathsf{VP1} = \mathsf{VC'} \cdot 0.05` 3 :math:`\mathsf{QP1} = (\mathsf{QP1'} + \mathsf{QP2'}) / 2`, :math:`\mathsf{VP1} = \mathsf{VP1'} + \mathsf{VP2'}` == =================================================== Parameters ---------- model : Model Pharmpy model name : str Name of compartment to remove peripheral compartment from. Return ------ Model Pharmpy model object Examples -------- >>> from pharmpy.modeling import * >>> model = load_example_model("pheno") >>> model = set_peripheral_compartments(model, 2) >>> model = remove_peripheral_compartment(model) >>> model.statements.ode_system Bolus(AMT, admid=1) → CENTRAL ┌───────────┐ │PERIPHERAL1│ └───────────┘ ↑ │ Q/V1 Q/V2 │ ↓ ┌───────────┐ │ CENTRAL │──CL/V1→ └───────────┘ See also -------- set_peripheral_compartment add_peripheral_compartment """ odes = get_and_check_odes(model) peripherals = odes.find_peripheral_compartments(name) if peripherals: last_peripheral = peripherals[-1] if name: central = odes.find_compartment(name) else: central = odes.central_compartment if len(peripherals) == 1: # TODO: Elimination can be zero (drug metabolite) elimination_rate = odes.get_flow(central, output) if elimination_rate == 0: pass else: cl, vc = elimination_rate.as_numer_denom() if vc != 1: from_rate = odes.get_flow(last_peripheral, central) assert from_rate is not None qp1, vp1 = from_rate.as_numer_denom() pop_cl = _find_noncov_theta(model, cl) pop_vc = _find_noncov_theta(model, vc) pop_qp1 = _find_noncov_theta(model, qp1) pop_vp1 = _find_noncov_theta(model, vp1) pop_vc_init = model.parameters[pop_vc].init pop_cl_init = model.parameters[pop_cl].init pop_qp1_init = model.parameters[pop_qp1].init pop_vp1_init = model.parameters[pop_vp1].init new_vc_init = pop_vc_init + pop_qp1_init / pop_cl_init * pop_vp1_init model = set_initial_estimates(model, {pop_vc.name: new_vc_init}) elif len(peripherals) == 2: first_peripheral = peripherals[0] from1_rate = odes.get_flow(first_peripheral, central) assert from1_rate is not None qp1, vp1 = from1_rate.as_numer_denom() from2_rate = odes.get_flow(last_peripheral, central) assert from2_rate is not None qp2, vp2 = from2_rate.as_numer_denom() pop_qp2 = _find_noncov_theta(model, qp2) pop_vp2 = _find_noncov_theta(model, vp2) pop_qp1 = _find_noncov_theta(model, qp1) pop_vp1 = _find_noncov_theta(model, vp1) pop_qp2_init = model.parameters[pop_qp2].init pop_vp2_init = model.parameters[pop_vp2].init pop_qp1_init = model.parameters[pop_qp1].init pop_vp1_init = model.parameters[pop_vp1].init new_qp1_init = (pop_qp1_init + pop_qp2_init) / 2 new_vp1_init = pop_vp1_init + pop_vp2_init model = set_initial_estimates( model, {pop_qp1.name: new_qp1_init, pop_vp1.name: new_vp1_init} ) rate1 = odes.get_flow(central, last_peripheral) assert rate1 is not None rate2 = odes.get_flow(last_peripheral, central) assert rate2 is not None symbols = rate1.free_symbols | rate2.free_symbols cb = CompartmentalSystemBuilder(odes) cb.remove_compartment(last_peripheral) model = model.replace( statements=( model.statements.before_odes + CompartmentalSystem(cb) + model.statements.after_odes ) ) model = model.replace( statements=model.statements.remove_symbol_definitions( symbols, model.statements.ode_system ) ) model = remove_unused_parameters_and_rvs(model) return model
[docs] def set_ode_solver( model: Model, solver: Literal['CVODES', 'DGEAR', 'DVERK', 'IDA', 'LSODA', 'LSODI'] ): """Sets ODE solver to use for model Recognized solvers and their corresponding NONMEM advans: +----------------------------+------------------+ | Solver | NONMEM ADVAN | +============================+==================+ | CVODES | ADVAN14 | +----------------------------+------------------+ | DGEAR | ADVAN8 | +----------------------------+------------------+ | DVERK | ADVAN6 | +----------------------------+------------------+ | IDA | ADVAN15 | +----------------------------+------------------+ | LSODA | ADVAN13 | +----------------------------+------------------+ | LSODI | ADVAN9 | +----------------------------+------------------+ Parameters ---------- model : Model Pharmpy model solver : {'CVODES', 'DGEAR', 'DVERK', 'IDA', 'LSODA', 'LSODI'} Solver to use or None for no preference Return ------ Model Pharmpy model object Examples -------- >>> from pharmpy.modeling import * >>> model = load_example_model("pheno") >>> model = set_ode_solver(model, 'LSODA') """ new_steps = [] for step in model.execution_steps: new = step.replace(solver=solver) new_steps.append(new) newsteps = ExecutionSteps.create(new_steps) model = model.replace(execution_steps=newsteps).update_source() return model
[docs] def find_clearance_parameters(model: Model): """Find clearance parameters in model Parameters ---------- model : Model Pharmpy model Return ------ list A list of clearance parameters Examples -------- >>> from pharmpy.modeling import * >>> model = load_example_model("pheno") >>> find_clearance_parameters(model) [CL] """ cls = set() sset = model.statements odes = get_and_check_odes(model) t = odes.t rate_list = _find_rate(model, sset) for rate in rate_list: if rate.is_symbol(): assignment = sset.find_assignment(rate) assert assignment is not None rate = assignment.expression a, b = map(lambda x: x.free_symbols, rate.as_numer_denom()) if b: clearance_symbols = a - b - {t} for clearance in clearance_symbols: clearance = _find_real_symbol(sset, clearance) if str(clearance) not in ['LAFREE', 'KINT', 'KON']: # exclude TMDD parameters cls.add(clearance) return sorted(cls, key=str)
[docs] def find_volume_parameters(model: Model): """Find volume parameters in model Parameters ---------- model : Model Pharmpy model Return ------ list A list of volume parameters Examples -------- >>> from pharmpy.modeling import * >>> model = load_example_model("pheno") >>> find_volume_parameters(model) [VC] """ vcs = set() sset = model.statements odes = sset.ode_system if odes is None: raise ValueError(f'Model {model.name} has no ODE system') t = odes.t rate_list = _find_rate(model, sset) for rate in rate_list: if rate.is_symbol(): assignment = sset.find_assignment(rate) assert assignment is not None rate = assignment.expression rate = Expr(sympy.cancel(rate)) a, b = map(lambda x: x.free_symbols, rate.as_numer_denom()) volume_symbols = b - a - {t} for volume in volume_symbols: volume = _find_real_symbol(sset, volume) vcs.add(volume) return sorted(vcs, key=str)
def _find_rate(model: Model, sset: Statements): rate_list = [] odes = sset.ode_system assert isinstance(odes, CompartmentalSystem) central = odes.central_compartment # FIXME : How to specify metabolite compartment? metacomp = odes.find_compartment("METABOLITE") if metacomp: central_to_metabolite = odes.get_flow(central, metacomp) rate_list.append(central_to_metabolite) elimination_rate = odes.get_flow(metacomp, output) rate_list.append(elimination_rate) else: elimination_rate = odes.get_flow(central, output) rate_list.append(elimination_rate) if has_mixed_mm_fo_elimination(model): elimination_rate = Expr(sympy.expand(elimination_rate)) rate_list.append(elimination_rate.args[0]) rate_list.append(elimination_rate.args[1]) comp_peripherals = {} comp_peripherals[central] = odes.find_peripheral_compartments() if metacomp: comp_peripherals[metacomp] = odes.find_peripheral_compartments(name=metacomp.name) for comp, peripherals in comp_peripherals.items(): for periph in peripherals: rate1 = odes.get_flow(comp, periph) rate_list.append(rate1) rate2 = odes.get_flow(periph, comp) rate_list.append(rate2) return rate_list def _find_real_symbol(sset, symbol): assign = sset.find_assignment(symbol) if len(assign.rhs_symbols) == 1: dep_assigns = _get_dependent_assignments(sset, assign) for dep_assign in dep_assigns: symbol = dep_assign.symbol return symbol def _get_dependent_assignments(sset, assignment): """Finds dependent assignments one layer deep""" return list( filter( None, # NOTE: filters out falsy values (sset.find_assignment(symb) for symb in assignment.expression.free_symbols), ) )
[docs] def has_odes(model: Model) -> bool: """Check if model has an ODE system Parameters ---------- model : Model Pharmpy model Return ------ bool True if model has an ODE system See also -------- has_linear_odes has_linear_odes_with_real_eigenvalues Examples -------- >>> from pharmpy.modeling import has_odes, load_example_model >>> model = load_example_model("pheno") >>> has_odes(model) True """ odes = model.statements.ode_system return bool(odes)
[docs] def has_linear_odes(model: Model) -> bool: """Check if model has a linear ODE system Parameters ---------- model : Model Pharmpy model Return ------ bool True if model has an ODE system that is linear See also -------- has_odes has_linear_odes_with_real_eigenvalues Examples -------- >>> from pharmpy.modeling import has_linear_odes, load_example_model >>> model = load_example_model("pheno") >>> has_linear_odes(model) True """ if not has_odes(model): return False odes = model.statements.ode_system assert isinstance(odes, CompartmentalSystem) symbs = odes.compartmental_matrix.free_symbols | odes.zero_order_inputs.free_symbols return odes.t not in symbs
[docs] def has_linear_odes_with_real_eigenvalues(model: Model): """Check if model has a linear ode system with real eigenvalues Parameters ---------- model : Model Pharmpy model Return ------ bool True if model has an ODE system that is linear See also -------- has_odes has_linear_odes Examples -------- >>> from pharmpy.modeling import has_linear_odes_with_real_eigenvalues, load_example_model >>> model = load_example_model("pheno") >>> has_linear_odes_with_real_eigenvalues(model) True """ odes = model.statements.ode_system if odes is None: return False if not has_linear_odes(model): return False M = odes.compartmental_matrix eigs = M.eigenvals().keys() for eig in eigs: real = is_real(model, eig) if real is None or not real: return real return True
[docs] def get_initial_conditions(model: Model, dosing: bool = False) -> Mapping[Expr, Expr]: """Get initial conditions for the ode system Default initial conditions at t=0 for amounts is 0 Parameters ---------- model : Model Pharmpy model dosing : bool Set to True to add dosing as initial conditions Return ------ dict Initial conditions Examples -------- >>> from pharmpy.modeling import get_initial_conditions, load_example_model >>> model = load_example_model("pheno") >>> get_initial_conditions(model) {A_CENTRAL(0): 0} >>> get_initial_conditions(model, dosing=True) {A_CENTRAL(0): AMT} """ d = {} odes = model.statements.ode_system if odes is None: return d assert isinstance(odes, CompartmentalSystem) for amt in odes.amounts: d[Expr.function(amt.name, 0)] = Expr.integer(0) for s in model.statements: if isinstance(s, Assignment): if s.symbol.is_function() and not (s.symbol.args[0].free_symbols): d[s.symbol] = s.expression if dosing: for name in odes.compartment_names: comp = odes.find_compartment(name) if comp.doses and isinstance(comp.doses[0], Bolus): if comp.lag_time: time = comp.lag_time else: time = 0 d[Expr.function(comp.amount.name, time)] = comp.doses[0].amount return d
[docs] def set_initial_condition( model: Model, compartment: str, expression: TExpr, time: TExpr = Expr.integer(0), ) -> Model: """Set an initial condition for the ode system If the initial condition is already set it will be updated. If the initial condition is set to zero at time zero it will be removed (since the default is 0). Parameters ---------- model : Model Pharmpy model compartment : str Name of the compartment expression : Union[str, Expr] The expression of the initial condition time : Union[str, Expr] Time point. Default 0 Return ------ model Pharmpy model object Examples -------- >>> from pharmpy.modeling import * >>> model = load_example_model("pheno") >>> model = set_initial_condition(model, "CENTRAL", 10) >>> get_initial_conditions(model) {A_CENTRAL(0): 10} """ odes = model.statements.ode_system if odes is None: raise ValueError("Model has no system of ODEs") comp = odes.find_compartment(compartment) if comp is None: raise ValueError(f"Model has no compartment named {compartment}") expr = Expr(expression) time = Expr(time) amount = Expr.function(comp.amount.name, time) assignment = Assignment.create(amount, expr) for i, s in enumerate(model.statements.before_odes): if s.symbol == amount: if time == 0 and expr == 0: statements = model.statements[:i] + model.statements[i + 1 :] else: statements = model.statements[:i] + assignment + model.statements[i + 1 :] break else: if not (time == 0 and expr == 0): statements = ( model.statements.before_odes + assignment + odes + model.statements.after_odes ) model = model.replace(statements=statements) return model.update_source()
[docs] def get_zero_order_inputs(model: Model) -> sympy.Matrix: """Get zero order inputs for all compartments Parameters ---------- model : Model Pharmpy model Return ------ sympy.Matrix Vector of inputs Examples -------- >>> from pharmpy.modeling import get_zero_order_inputs, load_example_model >>> model = load_example_model("pheno") >>> get_zero_order_inputs(model) [0] """ odes = model.statements.ode_system if odes is None: return sympy.Matrix() return odes.zero_order_inputs
[docs] def set_zero_order_input(model: Model, compartment: str, expression: Union[TExpr]) -> Model: """Set a zero order input for the ode system If the zero order input is already set it will be updated. Parameters ---------- model : Model Pharmpy model compartment : str Name of the compartment expression : Union[str, Expr] The expression of the zero order input Return ------ model Pharmpy model object Examples -------- >>> from pharmpy.modeling import * >>> model = load_example_model("pheno") >>> model = set_zero_order_input(model, "CENTRAL", 10) >>> get_zero_order_inputs(model) [10] """ odes = model.statements.ode_system if odes is None: raise ValueError("Model has no system of ODEs") comp = odes.find_compartment(compartment) if comp is None: raise ValueError(f"Model has no compartment named {compartment}") expr = Expr(expression) cb = CompartmentalSystemBuilder(odes) cb.set_input(comp, expr) cs = CompartmentalSystem(cb) statements = model.statements.before_odes + cs + model.statements.after_odes model = model.replace(statements=statements) return model.update_source()
class ODEDisplayer: def __init__(self, eqs, ics): self._eqs = eqs self._ics = ics def __repr__(self): if self._eqs is None: return "" a = [] for ode in self._eqs: ode_str = ode.unicode() a += ode_str.split('\n') for key, value in self._ics.items(): ics_str = sympy.pretty(sympy.Eq(key, value)) a += ics_str.split('\n') return bracket(a) def _repr_latex_(self): if self._eqs is None: return "" rows = [] for ode in self._eqs: ode_repr = sympy.latex(ode, mul_symbol='dot') rows.append(ode_repr) for k, v in self._ics.items(): ics_eq = sympy.Eq(k, v) ics_repr = sympy.latex(ics_eq, mul_symbol='dot') rows.append(ics_repr) return r'\begin{cases} ' + r' \\ '.join(rows) + r' \end{cases}'
[docs] def display_odes(model: Model): """Displays the ordinary differential equation system Parameters ---------- model : Model Pharmpy model Return ------ ODEDisplayer A displayable object Example ------- >>> from pharmpy.modeling import * >>> model = load_example_model("pheno") >>> display_odes(model) ⎧d -CL⋅A_CENTRAL(t) ⎨──(A_CENTRAL(t)) = ───────────────── ⎩dt V <BLANKLINE> """ odes = model.statements.ode_system if odes is not None: eqs = odes.eqs ics = {key: val for key, val in get_initial_conditions(model).items() if val != 0} else: eqs = None ics = None return ODEDisplayer(eqs, ics)
[docs] def solve_ode_system(model: Model): """Replace ODE system with analytical solution if possible Warnings -------- This function can currently only handle the most simple of ODE systems. Parameters ---------- model : Model Pharmpy model object Returns ------- Model Pharmpy model object Example ------- >>> from pharmpy.modeling import * >>> model = load_example_model("pheno") >>> model.statements.ode_system Bolus(AMT, admid=1) → CENTRAL ┌───────┐ │CENTRAL│──CL/V→ └───────┘ >>> model = solve_ode_system(model) """ odes = model.statements.ode_system if odes is None: return model ics = get_initial_conditions(model, dosing=True) # FIXME: Should set assumptions on symbols before solving # FIXME: Need a way to handle systems with no explicit solutions sol = sympy.dsolve(sympy.sympify(odes.eqs), ics=sympy.sympify(ics)) new = [] for s in model.statements: if isinstance(s, CompartmentalSystem): for eq in sol: ass = Assignment.create(eq.lhs, eq.rhs) new.append(ass) else: new.append(s) model = model.replace(statements=Statements(new)).update_source() return model
[docs] def get_central_volume_and_clearance(model: Model): """Get the volume and clearance parameters Parameters ---------- model : Model Pharmpy model Returns ------- sympy.Symbol Volume symbol sympy.Symbol Clearance symbol Example ------- >>> from pharmpy.modeling import * >>> model = load_example_model("pheno") >>> get_central_volume_and_clearance(model) (VC, CL) """ vcs = set() cls = set() sset = model.statements odes = get_and_check_odes(model) t = odes.t central_comp = odes.central_compartment from .metabolite import has_presystemic_metabolite # Circular import issue if has_presystemic_metabolite(model): metabolite = odes.find_compartment("METABOLITE") rate = odes.get_flow(central_comp, metabolite) else: rate = odes.get_flow(central_comp, output) rate = sympy.expand(rate) if has_mixed_mm_fo_elimination(model): rate = rate.args[0] if isinstance(rate, sympy.Symbol): assignment = sset.find_assignment(rate) assert assignment is not None rate = assignment.expression rate = Expr(sympy.cancel(rate)) a, b = map(lambda x: x.free_symbols, rate.as_numer_denom()) if b: # Get volume parameter volume_symbols = b - a - {t} for volume in volume_symbols: volume = _find_real_symbol(sset, volume) vcs.add(volume) # Get clearance parameter clearance_symbols = a - b - {t} for clearance in clearance_symbols: clearance = _find_real_symbol(sset, clearance) cls.add(clearance) else: raise ValueError('Model is not suitable') return list(vcs)[0], list(cls)[0]