from __future__ import annotations
import warnings
from abc import ABC, abstractmethod
from collections.abc import Iterable, Mapping, Sequence
from typing import Any, Optional, Union, overload
import pharmpy.internals.unicode as unicode
from pharmpy.basic import BooleanExpr, Expr, Matrix, TExpr, TSymbol
from pharmpy.deps import networkx as nx
from pharmpy.deps import symengine, sympy
from pharmpy.internals.expr.assumptions import assume_all
from pharmpy.internals.expr.leaves import free_images, free_images_and_symbols
from pharmpy.internals.expr.ode import canonical_ode_rhs
from pharmpy.internals.immutable import Immutable, cache_method
[docs]
class Statement(Immutable):
"""Abstract base class for all types of statements"""
def __add__(self, other: Union[Statement, Statements, Iterable[Statement]]) -> Statements:
if isinstance(other, Statements):
return Statements((self,) + other._statements)
elif isinstance(other, Statement):
return Statements((self, other))
elif isinstance(other, Iterable):
return Statements.create((self,) + tuple(other))
else:
return NotImplemented
def __radd__(self, other: Union[Statement, Iterable[Statement]]) -> Statements:
if isinstance(other, Iterable):
return Statements.create(tuple(other) + (self,))
else:
return NotImplemented
[docs]
@abstractmethod
def subs(self, substitutions: Mapping[Expr, Expr]) -> Statement:
pass
@property
@abstractmethod
def free_symbols(self) -> set[Expr]:
pass
@property
@abstractmethod
def rhs_symbols(self) -> set[Expr]:
pass
[docs]
class Assignment(Statement):
"""Representation of variable assignment
This class represents an assignment of an expression to a variable. Multiple assignments
are combined together into a Statements object.
Parameters
----------
symbol : Expr
Symbol of assignment
expression : Expr
Expression of assignment
"""
def __init__(self, symbol: Expr, expression: Expr):
self._symbol = symbol
self._expression = expression
[docs]
@classmethod
def create(cls, symbol: TExpr, expression: TExpr) -> Assignment:
symbol = Expr(symbol)
if not symbol.is_symbol():
raise TypeError("symbol of Assignment must be a Symbol or str representing a symbol")
expression = Expr(expression)
# To avoid nested piecewises
expression = expression.piecewise_fold()
return cls(symbol, expression)
[docs]
def replace(self, **kwargs) -> Assignment:
symbol = kwargs.get('symbol', self._symbol)
expression = kwargs.get('expression', self._expression)
return Assignment.create(symbol, expression)
@property
def symbol(self) -> Expr:
"""Symbol of statement"""
return self._symbol
@property
def expression(self) -> Expr:
"""Expression of assignment"""
return self._expression
[docs]
def subs(self, substitutions: Mapping[Expr, Expr]) -> Assignment:
"""Substitute expressions or symbols in assignment
Parameters
----------
substitutions : dict
old-new pairs
Return
------
Assignment
Updated assignment object
Examples
--------
>>> from pharmpy.model import Assignment
>>> a = Assignment.create('CL', 'POP_CL + ETA_CL')
>>> a
CL = ETA_CL + POP_CL
>>> b = a.subs({'ETA_CL' : 'ETA_CL * WGT'})
>>> b
CL = ETA_CL⋅WGT + POP_CL
"""
symbol = self.symbol.subs(substitutions)
expression = self.expression.subs(substitutions)
expression = expression.piecewise_fold()
return Assignment(symbol, expression)
@property
def free_symbols(self) -> set[Expr]:
"""Get set of all free symbols in the assignment
Note that the left hand side symbol will be in the set
Examples
--------
>>> from pharmpy.model import Assignment
>>> a = Assignment.create('CL', 'POP_CL + ETA_CL')
>>> a.free_symbols # doctest: +SKIP
{CL, ETA_CL, POP_CL}
"""
symbols = {self.symbol}
symbols |= self.expression.free_symbols
return symbols
@property
def rhs_symbols(self) -> set[Expr]:
"""Get set of all free symbols in the right hand side expression
Examples
--------
>>> from pharmpy.model import Assignment
>>> a = Assignment.create('CL', 'POP_CL + ETA_CL')
>>> a.rhs_symbols # doctest: +SKIP
{ETA_CL, POP_CL}
"""
prefuncs = self._expression._sympy_().atoms(sympy.Function)
from sympy.core.function import AppliedUndef
# Allow applied undefined functions
funcs = {Expr(f) for f in prefuncs if isinstance(f, AppliedUndef)}
symbols = self._expression.free_symbols
return funcs | symbols
def __eq__(self, other: Any):
if not isinstance(other, Assignment):
return NotImplemented
if hash(self) != hash(other):
return False
return self.symbol == other.symbol and self.expression == other.expression
@cache_method
def __hash__(self):
return hash((self._symbol, self._expression))
[docs]
def to_dict(self) -> dict[str, Any]:
return {
'class': 'Assignment',
'symbol': self._symbol.serialize(),
'expression': self._expression.serialize(),
}
[docs]
@classmethod
def from_dict(cls, d: dict[str, Any]) -> Assignment:
return cls(
symbol=Expr.deserialize(d['symbol']), expression=Expr.deserialize(d['expression'])
)
def __repr__(self):
expression = self._expression.unicode()
lines = [line.rstrip() for line in expression.split('\n')]
definition = f'{self._symbol.unicode()} = '
s = ''
for line in lines:
if line == lines[-1]:
s += definition + line + '\n'
else:
s += len(definition) * ' ' + line + '\n'
return s.rstrip()
def _repr_latex_(self) -> str:
sym = self._symbol.latex()
expr = self._expression.latex()
return f'${sym} = {expr}$'
class CompartmentBase(Immutable):
pass
class Output(CompartmentBase):
def __new__(cls):
# Singleton class
if not hasattr(cls, 'instance'):
cls.instance = super(Output, cls).__new__(cls)
return cls.instance
def __repr__(self):
return "Output()"
def __hash__(self):
return 5267
def to_dict(self) -> dict[str, str]:
return {'class': 'Output'}
@classmethod
def from_dict(cls, d) -> Output:
return cls.instance
def __eq__(self, other: Any):
if not isinstance(other, CompartmentBase):
return NotImplemented
return self is other
output = Output()
[docs]
class CompartmentalSystemBuilder:
"""Builder for CompartmentalSystem"""
def __init__(self, cs: Optional[CompartmentalSystem] = None):
if cs:
self._g = cs._g.copy()
else:
self._g = nx.DiGraph()
self._g.add_node(output) # Single output "compartment"
[docs]
def add_compartment(self, compartment: Compartment) -> None:
"""Add compartment to system
The compartment will be added without any flows to other compartments.
Use the add_flow method to add flows to and from the newly added compartment.
Parameters
----------
compartment : Compartment
Compartment to add
Examples
--------
>>> from pharmpy.model import CompartmentalSystemBuilder
>>> cb = CompartmentalSystemBuilder()
>>> central = cb.add_compartment("CENTRAL")
"""
self._g.add_node(compartment)
[docs]
def remove_compartment(self, compartment: Compartment) -> None:
"""Remove compartment from system
Parameters
----------
compartment : Compartment
Compartment object to remove from system
Examples
--------
>>> from pharmpy.model import Compartment, CompartmentalSystemBuilder
>>> cb = CompartmentalSystemBuilder()
>>> central = Compartment.create("CENTRAL")
>>> cb.add_compartment(central)
>>> cb.remove_compartment(central)
"""
self._g.remove_node(compartment)
[docs]
def add_flow(self, source: Compartment, destination: CompartmentBase, rate: TExpr) -> None:
"""Add flow between two compartments
Parameters
----------
source : Compartment
Source compartment
destination : Compartment
Destination compartment
rate : Expression
Symbolic rate of flow
Examples
--------
>>> from pharmpy.model import Compartment, CompartmentalSystemBuilder
>>> cb = CompartmentalSystemBuilder()
>>> depot = Compartment.create("DEPOT")
>>> cb.add_compartment(depot)
>>> central = Compartment.create("CENTRAL")
>>> cb.add_compartment(central)
>>> cb.add_flow(depot, central, "KA")
"""
self._g.add_edge(source, destination, rate=Expr(rate))
[docs]
def remove_flow(self, source: Compartment, destination: Compartment) -> None:
"""Remove flow between two compartments
Parameters
----------
source : Compartment
Source compartment
destination : Compartment
Destination compartment
Examples
--------
>>> from pharmpy.model import Compartment, CompartmentalSystemBuilder
>>> cb = CompartmentalSystemBuilder()
>>> depot = Compartment.create("DEPOT")
>>> cb.add_compartment(depot)
>>> central = Compartment.create("CENTRAL")
>>> cb.add_compartment(central)
>>> cb.add_flow(depot, central, "KA")
>>> cb.remove_flow(depot, central)
"""
self._g.remove_edge(source, destination)
[docs]
def move_dose(
self,
source: Compartment,
destination: Compartment,
admid: Optional[int] = None,
) -> tuple[Compartment, Compartment]:
"""Move a dose input from one compartment to another
Parameters
----------
source : Compartment
Source compartment
destination : Compartment
Destination compartment
admid : int
Move dose with specified admid, move all if None. Default is None.
"""
if source is None:
raise ValueError('Option `source` cannot be None')
if destination is None:
raise ValueError('Option `destination` cannot be None')
if not source.doses:
raise ValueError(f'No doses to move in compartment: `{source.name}`')
if destination.doses:
new_dest_dose = destination.doses
else:
new_dest_dose = tuple()
if admid:
new_source_dose = tuple([d for d in source.doses if d.admid != admid])
new_dest_dose += tuple([d for d in source.doses if d.admid == admid])
else:
new_source_dose = tuple()
new_dest_dose += source.doses
new_source = source.replace(doses=new_source_dose)
new_dest = destination.replace(doses=new_dest_dose)
mapping = {source: new_source, destination: new_dest}
nx.relabel_nodes(self._g, mapping, copy=False)
return new_source, new_dest
[docs]
def set_dose(
self,
compartment: Compartment,
dose: Optional[Union[Dose, tuple[Dose, ...]]],
) -> Compartment:
"""Set dose of compartment, replacing the previous.
Parameters
----------
compartment : Compartment
Compartment for which to set dose
dose : Dose
New dose
Returns
-------
Compartment
The new updated compartment
"""
if compartment is None:
raise ValueError('Option `compartment` cannot be None')
if dose is None:
dose = tuple()
elif isinstance(dose, Dose):
dose = (dose,)
new_comp = compartment.replace(doses=dose)
mapping = {compartment: new_comp}
nx.relabel_nodes(self._g, mapping, copy=False)
return new_comp
[docs]
def add_dose(self, compartment: Compartment, dose: Union[Dose, tuple[Dose, ...]]):
"""Add dose to compartment.
Parameters
----------
compartment : Compartment
Compartment for which to add dose
dose : Dose
New dose
Returns
-------
Compartment
The new updated compartment
"""
if compartment is None:
raise ValueError('Option `compartment` cannot be None')
if dose is None:
raise ValueError('Option `dose` cannot be None')
elif isinstance(dose, Dose):
dose = (dose,)
new_comp = compartment.replace(doses=compartment.doses + dose)
mapping = {compartment: new_comp}
nx.relabel_nodes(self._g, mapping, copy=False)
return new_comp
[docs]
def remove_dose(self, compartment: Compartment, admid: Optional[int] = None):
"""Remove dose of compartment.
Removes dose(s) of compartment. If admid is specified, only doses of that
admid will be removed.
Parameters
----------
compartment : Compartment
Compartment for which to remove dose
admid : int
Remove dose of compartment with specified admid, remove all if None.
Default is None.
Returns
-------
Compartment
The new updated compartment
"""
if compartment is None:
raise ValueError('Option `compartment` cannot be None')
mapping = dict()
if admid:
doses = tuple(dose for dose in compartment.doses if dose.admid != admid)
else:
doses = tuple()
new_comp = compartment.replace(doses=doses)
mapping[compartment] = new_comp
nx.relabel_nodes(self._g, mapping, copy=False)
return new_comp
[docs]
def set_lag_time(self, compartment: Compartment, lag_time: TExpr) -> Compartment:
"""Set lag time of compartment
Parameters
----------
compartment : Compartment
Compartment for which to change lag time
lag_time : expr
New lag time
Returns
-------
Compartment
The new updated compartment
"""
new_comp = compartment.replace(lag_time=lag_time)
mapping = {compartment: new_comp}
nx.relabel_nodes(self._g, mapping, copy=False)
return new_comp
[docs]
def set_bioavailability(self, compartment: Compartment, bioavailability: TExpr) -> Compartment:
"""Set bioavailability of compartment
Parameters
----------
compartment : Compartment
Compartment for which to change bioavailability
bioavailability : expr
New bioavailability
Returns
-------
Compartment
The new updated compartment
"""
new_comp = compartment.replace(bioavailability=bioavailability)
mapping = {compartment: new_comp}
nx.relabel_nodes(self._g, mapping, copy=False)
return new_comp
[docs]
def find_compartment(self, name: str) -> Optional[Compartment]:
for comp in self._g.nodes:
if not isinstance(comp, Output) and comp.name == name:
return comp
return None
def _is_positive(expr: sympy.Expr) -> bool:
return (
sympy.ask(
sympy.Q.positive(expr), assume_all(sympy.Q.positive, free_images_and_symbols(expr))
)
is True
)
def _comps(graph):
return {comp for comp in graph.nodes if not isinstance(comp, Output)}
[docs]
def to_compartmental_system(names, eqs: Sequence[sympy.Eq]) -> CompartmentalSystem:
"""Convert an list of odes to a compartmental system
names : func to compartment name map
"""
eqs = [sympy.sympify(eq) for eq in eqs]
cb = CompartmentalSystemBuilder()
compartments = {}
concentrations = set()
for eq in eqs:
A = eq.lhs.args[0]
concentrations.add(A)
name = names[Expr(A)]
comp = Compartment.create(name)
cb.add_compartment(comp)
compartments[name] = comp
neweqs = list(eqs) # Remaining flows
for eq in eqs:
rhs = eq.rhs
assert isinstance(rhs, sympy.Expr)
for comp_func in concentrations.intersection(free_images(rhs)):
dep = rhs.as_independent(comp_func, as_Add=True)[1]
terms = sympy.Add.make_args(dep.expand())
for term in terms:
assert isinstance(term, sympy.Expr)
from_comp = None
to_comp = None
if len(concentrations.intersection(free_images(term))) >= 2:
# This means second order absorption -> find matching term
# to determine flow
if _is_positive(term):
for second_comp in concentrations.intersection(free_images(term)):
for eq_2 in eqs:
if eq_2.lhs.args[0].name == second_comp.name: # pyright: ignore
# If this is False, then input to compartment is of second order
if -term in sympy.Add.make_args(
eq_2.rhs.expand() # pyright: ignore
):
from_comp = compartments[names[Expr(second_comp)]]
to_comp = compartments[names[Expr(eq.lhs.args[0])]]
else:
# Find matching term to determine if flow is between
# compartments or not
if _is_positive(term):
for eq_2 in eqs:
if -term in sympy.Add.make_args(eq_2.rhs.expand()): # pyright: ignore
from_comp = compartments[names[Expr(eq_2.lhs.args[0])]]
to_comp = compartments[names[Expr(eq.lhs.args[0])]]
if from_comp is not None and to_comp is not None:
# FIXME: Get current flow from builder instead?
cs = CompartmentalSystem(cb)
current_flow = cs.get_flow(from_comp, to_comp)
if current_flow == 0:
cb.add_flow(from_comp, to_comp, term / comp_func)
else:
new_flow = term / comp_func + current_flow
cb.add_flow(from_comp, to_comp, new_flow)
for i, neweq in enumerate(neweqs):
xrhs = neweq.rhs
assert isinstance(xrhs, sympy.Expr)
if neweq.lhs.args[0].name == eq.lhs.args[0].name: # pyright: ignore
neweqs[i] = sympy.Eq(
neweq.lhs, sympy.expand(xrhs - term) # pyright: ignore
)
elif neweq.lhs.args[0].name == comp_func.name: # pyright: ignore
neweqs[i] = sympy.Eq(
neweq.lhs, sympy.expand(xrhs + term) # pyright: ignore
)
for eq in neweqs:
if eq.rhs != 0:
i = sympy.Integer(0)
o = sympy.Integer(0)
for term in sympy.Add.make_args(eq.rhs):
assert isinstance(term, sympy.Expr)
if _is_positive(term):
i = i + term # pyright: ignore
else:
o = o + term # pyright: ignore
comp_func = eq.lhs.args[0]
from_comp = compartments[names[Expr(comp_func)]]
if o != 0:
cb.add_flow(from_comp, output, -o / comp_func) # pyright: ignore
if i != 0:
cb.set_input(from_comp, i)
cs = CompartmentalSystem(cb)
return cs
[docs]
class CompartmentalSystem(Statement):
"""System of ODEs descibed as a compartmental system
Examples
--------
>>> from pharmpy.model import Bolus, Compartment, output
>>> from pharmpy.model import CompartmentalSystemBuilder, CompartmentalSystem
>>> cb = CompartmentalSystemBuilder()
>>> dose = Bolus.create("AMT")
>>> central = Compartment.create("CENTRAL", doses=(dose,))
>>> cb.add_compartment(central)
>>> peripheral = Compartment.create("PERIPHERAL")
>>> cb.add_compartment(peripheral)
>>> cb.add_flow(central, peripheral, "K12")
>>> cb.add_flow(peripheral, central, "K21")
>>> cb.add_flow(central, output, "CL / V")
>>> CompartmentalSystem(cb) # doctest: +SKIP
Bolus(AMT)
┌──────────┐
│PERIPHERAL│
└──────────┘
↑ │
K12 K21
│ ↓
┌───────┐ ┌──────────┐ ┌──────────┐
│CENTRAL│──K12→│PERIPHERAL│──K21→│ CENTRAL │──CL/V→
└───────┘ └──────────┘ └──────────┘
"""
def __init__(
self,
builder: CompartmentalSystemBuilder,
t: Expr = Expr.symbol('t'),
):
self._g = nx.freeze(builder._g.copy())
self._t = t
[docs]
@classmethod
def create(
cls,
builder: CompartmentalSystemBuilder,
t: Optional[Union[Expr, str]] = Expr.symbol('t'),
) -> CompartmentalSystem:
if builder is None:
raise TypeError('Argument `builder` cannot be None`')
elif not isinstance(builder, CompartmentalSystemBuilder):
raise TypeError(
f'Argument `builder` must be of type CompartmentalSystemBuilder: got `{type(builder)}`'
)
if not isinstance(t, Expr) or isinstance(t, str):
raise TypeError(f'Argument `t` must be of type str or Expr: got `{type(t)}`')
t = Expr(t)
return cls(builder=builder, t=t)
[docs]
def replace(self, **kwargs) -> CompartmentalSystem:
t = kwargs.get('t', self._t)
builder = kwargs.get('builder', None)
if builder is None:
builder = CompartmentalSystemBuilder(self)
return CompartmentalSystem.create(builder=builder, t=t)
@property
def t(self) -> Expr:
"""Independent variable of CompartmentalSystem"""
return self._t
@property
def eqs(self) -> tuple[BooleanExpr, ...]:
"""Tuple of equations"""
amount_funcs = Matrix(list(self.amounts))
derivatives = Matrix([Expr.derivative(fn, self.t) for fn in amount_funcs])
inputs = self.zero_order_inputs
a = self.compartmental_matrix @ amount_funcs + inputs
eqs = [
BooleanExpr(sympy.Eq(lhs, canonical_ode_rhs(rhs._sympy_())))
for lhs, rhs in zip(derivatives, a)
]
return tuple(eqs)
@property
def free_symbols(self) -> set[Expr]:
"""Get set of all free symbols in the compartmental system
Returns
-------
set
Set of symbols
Examples
--------
>>> from pharmpy.modeling import load_example_model
>>> model = load_example_model("pheno")
>>> model.statements.ode_system.free_symbols # doctest: +SKIP
{AMT, CL, V, t}
"""
free = {Expr.symbol('t')}
for _, _, rate in self._g.edges.data('rate'):
free |= rate.free_symbols
for node in _comps(self._g):
free |= node.free_symbols
return free
@property
def rhs_symbols(self) -> set[Expr]:
"""Get set of all free symbols in the right hand side expressions
Returns
-------
set
Set of symbols
Examples
--------
>>> from pharmpy.modeling import load_example_model
>>> model = load_example_model("pheno")
>>> model.statements.ode_system.rhs_symbols # doctest: +SKIP
{AMT, CL, V, t}
"""
return self.free_symbols # This works currently
[docs]
def subs(self, substitutions: Mapping[Expr, Expr]) -> CompartmentalSystem:
"""Substitute expressions or symbols in ODE system
Examples
--------
>>> from pharmpy.modeling import load_example_model
>>> model = load_example_model("pheno")
>>> model.statements.ode_system.subs({'AMT': 'DOSE'})
Bolus(DOSE, admid=1) → CENTRAL
┌───────┐
│CENTRAL│──CL/V→
└───────┘
"""
cb = CompartmentalSystemBuilder(self)
for u, v, rate in cb._g.edges.data('rate'):
rate_sub = rate.subs(substitutions)
cb._g.edges[u, v]['rate'] = rate_sub
mapping = {comp: comp.subs(substitutions) for comp in _comps(self._g)}
nx.relabel_nodes(cb._g, mapping, copy=False)
return CompartmentalSystem(cb)
def __eq__(self, other):
if other is self:
return True
if not isinstance(other, CompartmentalSystem):
return NotImplemented
return (
self._t == other._t
and nx.to_dict_of_dicts(self._g) == nx.to_dict_of_dicts(other._g)
and self.dosing_compartments == other.dosing_compartments
)
def __hash__(self):
return hash((self._t, self._g))
[docs]
def to_dict(self) -> dict[str, Any]:
comps = [comp for comp in self._g.nodes]
comps_dicts = tuple(comp.to_dict() for comp in comps)
edges = []
for from_comp, to_comp, rate in self._g.edges.data('rate'):
from_n = comps.index(from_comp)
to_n = comps.index(to_comp)
edge = (from_n, to_n, rate.serialize())
edges.append(edge)
d = {
'class': 'CompartmentalSystem',
'compartments': comps_dicts,
'rates': edges,
't': self._t.serialize(),
}
return d
[docs]
@classmethod
def from_dict(cls, d: dict[str, Any]) -> CompartmentalSystem:
cb = CompartmentalSystemBuilder()
comps = []
for comp_dict in d['compartments']:
if comp_dict['class'] == 'Output':
compartment = output
else:
compartment = Compartment.from_dict(comp_dict)
cb.add_compartment(compartment)
comps.append(compartment)
for from_n, to_n, rate in d['rates']:
from_comp = comps[from_n]
to_comp = comps[to_n]
cb.add_flow(from_comp, to_comp, Expr.deserialize(rate))
return cls(cb, t=Expr.deserialize(d['t']))
[docs]
def get_flow(
self, source: Optional[CompartmentBase], destination: Optional[CompartmentBase]
) -> Expr:
"""Get the rate of flow between two compartments
Parameters
----------
source : Compartment
Source compartment
destination : Compartment
Destination compartment
Returns
-------
Expression
Symbolic rate
Examples
--------
>>> from pharmpy.model import Compartment, CompartmentalSystemBuilder
>>> from pharmpy.model import CompartmentalSystem
>>> cb = CompartmentalSystemBuilder()
>>> depot = Compartment.create("DEPOT")
>>> cb.add_compartment(depot)
>>> central = Compartment.create("CENTRAL")
>>> cb.add_compartment(central)
>>> cb.add_flow(depot, central, "KA")
>>> odes = CompartmentalSystem(cb)
>>> odes.get_flow(depot, central)
KA
>>> odes.get_flow(central, depot)
0
"""
try:
rate = self._g.edges[source, destination]['rate']
except KeyError:
rate = Expr.integer(0)
return rate
[docs]
def get_compartment_outflows(
self, compartment: Union[str, CompartmentBase]
) -> list[tuple[CompartmentBase, Expr]]:
"""Get list of all flows going out from a compartment
Parameters
----------
compartment : Compartment or str
Get outflows for this compartment
Returns
-------
list
Pairs of compartments and symbolic rates
Examples
--------
>>> from pharmpy.modeling import load_example_model
>>> model = load_example_model("pheno")
>>> model.statements.ode_system.get_compartment_outflows("CENTRAL")
[(Output(), CL/V)]
"""
compartment = self.find_compartment_or_raise(compartment)
flows = []
for node in self._g.successors(compartment):
flow = self.get_flow(compartment, node)
flows.append((node, flow))
return flows
[docs]
def get_compartment_inflows(
self, compartment: Union[CompartmentBase, str]
) -> list[tuple[Compartment, Expr]]:
"""Get list of all flows going in to a compartment
Parameters
----------
compartment : Compartment or str
Get inflows to this compartment
Returns
-------
list
Pairs of compartments and symbolic rates
Examples
--------
>>> from pharmpy.modeling import load_example_model
>>> from pharmpy.model import output
>>> model = load_example_model("pheno")
>>> model.statements.ode_system.get_compartment_inflows(output)
[(Compartment(CENTRAL, amount=A_CENTRAL(t), doses=Bolus(AMT, admid=1)), CL/V)]
"""
if isinstance(compartment, str):
destination = self.find_compartment(compartment)
if destination is None:
raise ValueError(f"Cannot find compartment {compartment}")
else:
destination = compartment
flows = []
for node in self._g.predecessors(destination):
flow = self.get_flow(node, destination)
flows.append((node, flow))
return flows
[docs]
def get_bidirectionals(self, compartment: Union[CompartmentBase, str]) -> list[Compartment]:
"""Get list of all compartments with bidirectional flow from/to a compartment
Parameters
----------
compartment : Compartment or str
Compartment of interest
Returns
-------
list
Compartments with bidirectional flow
Examples
--------
>>> from pharmpy.modeling import load_example_model
>>> model = load_example_model("pheno")
>>> central = model.statements.ode_system.central_compartment
>>> model.statements.ode_system.get_bidirectionals(central)
[]
"""
compartment = self.find_compartment_or_raise(compartment)
comps = []
for node in self._g.predecessors(compartment):
if self._g.has_edge(compartment, node):
comps.append(node)
return comps
[docs]
def find_compartment(self, name: str) -> Optional[Compartment]:
"""Find a compartment using its name
Parameters
----------
name : str
Name of compartment to find
Returns
-------
Compartment
Compartment named name or None if not found
Examples
--------
>>> from pharmpy.modeling import load_example_model
>>> model = load_example_model("pheno")
>>> central = model.statements.ode_system.find_compartment("CENTRAL")
>>> central
Compartment(CENTRAL, amount=A_CENTRAL(t), doses=Bolus(AMT, admid=1))
"""
for comp in _comps(self._g):
if comp.name == name:
return comp
else:
return None
[docs]
def find_compartment_or_raise(self, comp: Union[str, CompartmentBase]) -> Compartment:
if isinstance(comp, CompartmentBase):
return comp # pyright: ignore
found_comp = self.find_compartment(comp)
if found_comp is None:
raise ValueError(f"No compartment named {comp}")
return found_comp
[docs]
def get_n_connected(self, comp: Compartment) -> int:
"""Get the number of compartments connected to a compartment
Parameters
----------
comp : Compartment
The compartment
Returns
-------
int
Number of compartments connected to comp
Examples
--------
>>> from pharmpy.modeling import load_example_model
>>> model = load_example_model("pheno")
>>> model.statements.ode_system.get_n_connected("CENTRAL")
0
"""
out_comps = {c for c, _ in self.get_compartment_outflows(comp)}
in_comps = {c for c, _ in self.get_compartment_inflows(comp)}
return len((out_comps | in_comps) - {output})
@property
def dosing_compartments(self) -> tuple[Compartment, ...]:
"""The dosing compartment(s)
A dosing compartment is a compartment that receives an input dose. Multiple
dose compartments are supported. The order of dose compartments is defined
to put the central compartment last.
Returns
-------
tuple
A tuple of dose compartments
Examples
--------
>>> from pharmpy.modeling import load_example_model
>>> model = load_example_model("pheno")
>>> model.statements.ode_system.dosing_compartments
(Compartment(CENTRAL, amount=A_CENTRAL(t), doses=Bolus(AMT, admid=1)),)
"""
dosing_comps = tuple()
comps = sorted(list(_comps(self._g)), key=lambda comp: comp.name)
for node in comps:
if node.doses:
if node.name != self.central_compartment.name:
if len(dosing_comps) >= 2:
dosing_comps = dosing_comps[:-1] + (node,) + dosing_comps[-1:]
else:
dosing_comps = (node,) + dosing_comps
else:
dosing_comps = dosing_comps + (node,)
if len(dosing_comps) != 0:
return dosing_comps
raise ValueError('No dosing compartment exists')
@property
def central_compartment(self) -> Compartment:
"""The central compartment
The central compartment is defined to be the compartment that has an outward flow
to the output compartment. Only one central compartment is supported.
Returns
-------
Compartment
Central compartment
Examples
--------
>>> from pharmpy.modeling import load_example_model
>>> model = load_example_model("pheno")
>>> model.statements.ode_system.central_compartment
Compartment(CENTRAL, amount=A_CENTRAL(t), doses=Bolus(AMT, admid=1))
"""
try:
# E.g. TMDD models have more than one output
# FIXME: Relying heavily on compartment naming for drug metabolite
central = list(self._g.predecessors(output))[-1]
if central.name in ["METABOLITE", "EFFECT", "COMPLEX", "RESPONSE"]:
central = self.find_compartment("CENTRAL")
if central is None:
raise ValueError('Cannot find central compartment')
except IndexError:
raise ValueError('Cannot find central compartment')
return central
[docs]
def find_peripheral_compartments(self, name: Optional[str] = None) -> list[Compartment]:
"""Find perihperal compartments
A peripheral compartment is defined as having one flow to the central compartment and
one flow from the central compartment.
If name is set, peripheral compartments connected to the compartment
with the associated name is returned.
Returns
-------
list of compartments
Peripheral compartments
Examples
--------
>>> from pharmpy.modeling import load_example_model
>>> model = load_example_model("pheno")
>>> model.statements.ode_system.find_peripheral_compartments()
[]
"""
if name is not None:
central = self.find_compartment(name)
if central is None:
raise ValueError(f"{name} is not a name of an existing compartment")
else:
central = self.central_compartment
oneout = {node for node, out_degree in self._g.out_degree() if out_degree == 1}
onein = {node for node, in_degree in self._g.in_degree() if in_degree == 1}
cout = {comp for comp in oneout if self.get_flow(comp, central) != 0}
cin = {comp for comp in onein if self.get_flow(central, comp) != 0}
peripherals = list(cout & cin)
# Return in deterministic order
peripherals = sorted(peripherals, key=lambda comp: comp.name)
return peripherals
[docs]
def find_transit_compartments(self, statements: Statements) -> list[Compartment]:
"""Find all transit compartments
Transit compartments are a chain of compartments with the same out rate starting from
the dose compartment. Because one single transit compartment cannot be distinguished
from one depot compartment such compartment will be defined to be a depot and not
a transit compartment.
Returns
-------
list of compartments
Transit compartments
Examples
--------
>>> from pharmpy.modeling import load_example_model
>>> model = load_example_model("pheno")
>>> model.statements.ode_system.find_transit_compartments(model.statements)
[]
"""
transits = []
comp = self.dosing_compartments[0]
if len(self.get_compartment_inflows(comp)) != 0:
return transits
outflows = self.get_compartment_outflows(comp)
if len(outflows) != 1:
return transits
transits.append(comp)
comp, rate = outflows[0]
rate = statements.before_odes.full_expression(rate)
while True:
if len(self.get_compartment_inflows(comp)) != 1:
break
outflows = self.get_compartment_outflows(comp)
if len(outflows) != 1:
break
next_comp, next_rate = outflows[0]
next_rate = statements.before_odes.full_expression(next_rate)
if rate != next_rate:
break
transits.append(comp)
comp = next_comp
# Special case of one transit directly into central is not defined as a transit
# Also not central itself
central = self.central_compartment
if len(transits) == 1 and (
self.get_flow(transits[0], central) != 0 or transits[0] == central
):
return []
else:
return transits
[docs]
def find_depot(self, statements: Statements) -> Optional[Compartment]:
"""Find the depot compartment
The depot compartment is defined to be the compartment that only has out flow to the
central compartment, but no flow from the central compartment.
For drug metabolite models however, it is possible to have outflow with
unidirectional flow to both the central and metabolite compartment. In this case,
the central compartment is found based on name.
Returns
-------
Compartment
Depot compartment
Examples
--------
>>> from pharmpy.modeling import load_example_model, set_first_order_absorption
>>> model = load_example_model("pheno")
>>> model = set_first_order_absorption(model)
>>> model.statements.ode_system.find_depot(model.statements)
Compartment(DEPOT, amount=A_DEPOT(t), doses=Bolus(AMT, admid=1))
"""
transits = self.find_transit_compartments(statements)
depot = self._find_depot()
if depot in transits:
depot = None
return depot
def _find_depot(self):
central = self.central_compartment
depot = None
for to_central, _ in self.get_compartment_inflows(central):
outflows = self.get_compartment_outflows(to_central)
assert len(outflows) == 1 or len(outflows) == 2
if len(outflows) == 2:
# FIXME: Use another method than compartment name
metabolite = self.find_compartment("METABOLITE")
if metabolite is None or not self.get_flow(to_central, metabolite):
break
inflows = self.get_compartment_inflows(to_central)
for in_comp, _ in inflows:
if in_comp == central:
break
else:
depot = to_central
break
return depot
@property
def compartmental_matrix(self) -> Matrix:
"""Compartmental matrix of the compartmental system
Examples
--------
>>> from pharmpy.modeling import load_example_model, set_first_order_absorption
>>> model = load_example_model("pheno")
>>> model.statements.ode_system.compartmental_matrix
⎡-CL ⎤
⎢────⎥
⎣ V ⎦
"""
nodes = self._order_compartments()
size = len(nodes)
f = symengine.zeros(size)
for i in range(0, size):
from_comp = nodes[i]
diagsum = 0
for j in range(0, size):
to_comp = nodes[j]
rate = self.get_flow(from_comp, to_comp)
if i != j:
f[j, i] = rate
diagsum -= rate
outrate = self.get_flow(from_comp, output)
f[i, i] = diagsum - outrate
return Matrix(f)
@property
def amounts(self) -> Matrix:
"""Column vector of amounts for all compartments
Examples
--------
>>> from pharmpy.modeling import load_example_model
>>> import sympy
>>> model = load_example_model("pheno")
>>> sympy.pprint(model.statements.ode_system.amounts)
[A_CENTRAL(t)]
"""
ordered_cmts = self._order_compartments()
amts = [cmt.amount for cmt in ordered_cmts]
return Matrix(amts)
@property
def compartment_names(self) -> list[str]:
"""Names of all compartments
Examples
--------
>>> from pharmpy.modeling import load_example_model
>>> model = load_example_model("pheno")
>>> model.statements.ode_system.compartment_names
['CENTRAL']
"""
ordered_cmts = self._order_compartments()
names = [cmt.name for cmt in ordered_cmts]
return names
def _order_compartments(self):
"""Return list of all compartments in canonical order"""
try:
dosecmt = self.dosing_compartments[0]
except ValueError:
# Fallback for cases where no dose is available (yet)
comps = list(_comps(self._g))
return sorted(comps, key=lambda comp: comp.name)
# Order compartments
def sortfunc(x):
a = list(x)
if output in a:
a.remove(output)
a = sorted(a, key=lambda x: x.name)
return iter(a)
nodes = list(nx.bfs_tree(self._g, dosecmt, sort_neighbors=sortfunc))
remaining_unsorted = set(self._g.nodes) - {output} - set(nodes)
remaining_with_input = {comp for comp in remaining_unsorted if comp.input != 0}
remaining_without_input = remaining_unsorted - remaining_with_input
remaining = sorted(remaining_with_input, key=lambda x: x.name) + sorted(
remaining_without_input, key=lambda x: x.name
)
while remaining: # Disjoint or upstream of dosing
comp = remaining.pop(0)
connected = list(nx.bfs_tree(self._g, comp, sort_neighbors=sortfunc))
for c in connected:
if c not in nodes:
nodes.append(c)
if c != comp:
remaining.remove(c)
return nodes
@property
def zero_order_inputs(self) -> Matrix:
"""Vector of all zero order inputs to each compartment
Example
-------
>>> from pharmpy.modeling import load_example_model, set_zero_order_absorption
>>> model = load_example_model("pheno")
>>> model.statements.ode_system.zero_order_inputs
[0]
"""
inputs = [node.input for node in self._order_compartments()]
return Matrix(inputs)
def __len__(self):
"""The number of compartments"""
return len(self._g.nodes) - 1
def _repr_html_(self):
# Use Unicode art for now. There should be ways of drawing networkx
s = str(self)
return f'<pre>{s}</pre>'
def __repr__(self):
def lag_string(comp):
return f"\ntlag={comp.lag_time}" if comp.lag_time != 0 else ""
def f_string(comp):
return f"\nF={comp.bioavailability}" if comp.bioavailability != 1 else ""
def comp_string(comp):
return comp.name + lag_string(comp) + f_string(comp)
current = self.dosing_compartments[0]
comp_height = 0
comp_width = 0
while True:
bidirects = self.get_bidirectionals(current)
outflows = self.get_compartment_outflows(current)
comp_height = max(comp_height, len(bidirects) + 1)
comp_width += 1
for comp, rate in outflows:
if comp not in bidirects and comp != output:
current = comp
break
else:
break
noutput = len(self.get_compartment_inflows(output))
have_zo_input = int(not self.zero_order_inputs.is_zero_matrix())
comp_nrows = comp_height * 2 - 1 + (1 if noutput > 1 else 0)
nrows = comp_nrows + have_zo_input
ncols = comp_width * 2
grid = unicode.Grid(nrows, ncols)
current = self.dosing_compartments[0]
col = 0
if comp_nrows == 1 or comp_nrows == 2:
main_row = 0 + have_zo_input
else:
main_row = 2 + have_zo_input
if have_zo_input:
# Assuming a fully linear layout
for i, zo in enumerate(self.zero_order_inputs):
if zo != 0:
try:
grid.set(0, i * 2, unicode.VerticalArrow(str(zo)))
except IndexError:
warnings.warn(
"""The ODE system cannot be printed. Try statements.before_odes
and statements.after_odes instead."""
)
while True:
bidirects = self.get_bidirectionals(current)
outflows = self.get_compartment_outflows(current)
comp_box = unicode.Box(comp_string(current))
grid.set(main_row, col, comp_box)
if bidirects:
grid.set(0, col, unicode.Box(comp_string(bidirects[0])))
grid.set(
1,
col,
unicode.DualVerticalArrows(
str(self.get_flow(current, bidirects[0])),
str(self.get_flow(bidirects[0], current)),
),
)
if len(bidirects) > 1:
grid.set(4, col, unicode.Box(comp_string(bidirects[1])))
grid.set(
3,
col,
unicode.DualVerticalArrows(
str(self.get_flow(bidirects[1], current)),
str(self.get_flow(current, bidirects[1])),
),
)
for comp, rate in outflows:
if comp not in bidirects and comp != output:
next_comp = comp
break
else:
next_comp = None
if next_comp:
rate = self.get_flow(current, next_comp)
else:
rate = self.get_flow(current, output)
arrow = unicode.Arrow(str(rate))
grid.set(main_row, col + 1, arrow)
if next_comp and noutput > 1:
rate = self.get_flow(current, output)
arrow = unicode.VerticalArrow(str(rate))
grid.set(main_row + 1, col, arrow)
col += 2
current = next_comp
if not current:
break
all_doses = ""
for dose_comp in self.dosing_compartments:
for dose in dose_comp.doses:
all_doses += f'{str(dose)} → {dose_comp.name} \n'
s = all_doses + str(grid).rstrip()
return s
class Dose(ABC):
"""Abstract base class for different types of doses"""
def __init__(self, amount: Expr, admid: int):
self._admid = admid
self._amount = amount
@property
def admid(self) -> int:
"""Administration ID of dose"""
return self._admid
@property
def amount(self) -> Expr:
"""Symbolic amount of dose"""
return self._amount
@abstractmethod
def subs(self, substitutions) -> Dose: ...
@property
@abstractmethod
def free_symbols(self) -> set[Expr]: ...
@abstractmethod
def to_dict(self) -> dict[str, Any]: ...
[docs]
class Bolus(Dose, Immutable):
"""A Bolus dose
Parameters
----------
amount : symbol
Symbolic amount of dose
admid : int
Administration ID
Examples
--------
>>> from pharmpy.model import Bolus
>>> dose = Bolus.create("AMT")
>>> dose
Bolus(AMT, admid=1)
"""
def __init__(self, amount: Expr, admid: int = 1):
super().__init__(amount, admid)
[docs]
@classmethod
def create(cls, amount: TExpr, admid: int = 1) -> Bolus:
return cls(Expr(amount), admid=admid)
[docs]
def replace(self, **kwargs) -> Bolus:
amount = kwargs.get("amount", self._amount)
admid = kwargs.get("admid", self._admid)
return Bolus.create(amount=amount, admid=admid)
@property
def amount(self) -> Expr:
"""Symbolic amount of dose"""
return self._amount
@property
def free_symbols(self) -> set[Expr]:
"""Get set of all free symbols in the dose
Examples
--------
>>> from pharmpy.model import Bolus
>>> dose = Bolus.create("AMT")
>>> dose.free_symbols
{AMT}
"""
return {self._amount}
[docs]
def subs(self, substitutions: Mapping[Expr, Expr]) -> Bolus:
"""Substitute expressions or symbols in dose
Parameters
----------
substitutions : dict
Dictionary of from, to pairs
Examples
--------
>>> from pharmpy.model import Bolus
>>> dose = Bolus.create("AMT")
>>> dose.subs({'AMT': 'DOSE'})
Bolus(DOSE, admid=1)
"""
return Bolus(self._amount.subs(substitutions), admid=self._admid)
def __eq__(self, other):
if not isinstance(other, Bolus):
return NotImplemented
return self._amount == other._amount and self._admid == other._admid
def __hash__(self):
return hash((self._amount, self._admid))
[docs]
def to_dict(self) -> dict[str, Any]:
return {'class': 'Bolus', 'amount': self._amount.serialize(), 'admid': self._admid}
[docs]
@classmethod
def from_dict(cls, d: dict[str, Any]) -> Bolus:
return cls(amount=Expr.deserialize(d['amount']), admid=d['admid'])
def __repr__(self):
return f'Bolus({self._amount}, admid={self._admid})'
[docs]
class Infusion(Dose, Immutable):
"""An infusion dose
Parameters
----------
amount : expression
Symbolic amount of dose
admid : int
Administration ID
rate : expression
Symbolic rate. Mutually exclusive with duration
duration : expression
Symbolic duration. Mutually excluseive with rate
Examples
--------
>>> from pharmpy.model import Infusion
>>> dose = Infusion.create("AMT", duration="D1")
>>> dose
Infusion(AMT, admid=1, duration=D1)
>>> dose = Infusion.create("AMT", rate="R1")
>>> dose
Infusion(AMT, admid=1, rate=R1)
"""
def __init__(
self,
amount: Expr,
admid: int = 1,
rate: Optional[Expr] = None,
duration: Optional[Expr] = None,
):
super().__init__(amount, admid)
self._rate = rate
self._duration = duration
[docs]
@classmethod
def create(
cls,
amount: TExpr,
admid: int = 1,
rate: Optional[TExpr] = None,
duration: Optional[TExpr] = None,
) -> Infusion:
if rate is None and duration is None:
raise ValueError('Need rate or duration for Infusion')
if rate is not None and duration is not None:
raise ValueError('Cannot have both rate and duration for Infusion')
if rate is not None:
rate = Expr(rate)
if duration is not None:
duration = Expr(duration)
return cls(Expr(amount), admid=admid, rate=rate, duration=duration)
[docs]
def replace(self, **kwargs) -> Infusion:
amount = kwargs.get("amount", self._amount)
admid = kwargs.get("admid", self._admid)
rate = kwargs.get("rate", self._rate)
duration = kwargs.get("duration", self._duration)
return Infusion.create(amount=amount, admid=admid, rate=rate, duration=duration)
@property
def rate(self) -> Optional[Expr]:
"""Symbolic rate
Mutually exclusive with duration.
"""
return self._rate
@property
def duration(self) -> Optional[Expr]:
"""Symbolc duration
Mutually exclusive with rate.
"""
return self._duration
@property
def free_symbols(self) -> set[Expr]:
"""Get set of all free symbols in the dose
Examples
--------
>>> from pharmpy.model import Infusion
>>> dose = Infusion.create("AMT", rate="RATE")
>>> dose.free_symbols # doctest: +SKIP
{AMT, RATE}
"""
if self._rate is not None:
symbs = self._rate.free_symbols
else:
assert self._duration is not None
symbs = self._duration.free_symbols
return symbs | self._amount.free_symbols
[docs]
def subs(self, substitutions: Mapping[Expr, Expr]) -> Infusion:
"""Substitute expressions or symbols in dose
Parameters
----------
substitutions : dict
Dictionary of from, to pairs
Examples
--------
>>> from pharmpy.model import Infusion
>>> dose = Infusion.create("AMT", duration="DUR")
>>> dose.subs({'DUR': 'D1'})
Infusion(AMT, admid=1, duration=D1)
"""
amount = self._amount.subs(substitutions)
if self._rate is not None:
rate = self._rate.subs(substitutions)
duration = None
else:
rate = None
assert self._duration is not None
duration = self._duration.subs(substitutions)
return Infusion(amount, admid=self._admid, rate=rate, duration=duration)
def __eq__(self, other):
if not isinstance(other, Infusion):
return NotImplemented
return (
self._admid == other._admid
and self._rate == other._rate
and self._duration == other._duration
and self._amount == other._amount
)
def __hash__(self):
return hash((self._admid, self._rate, self._duration, self._amount))
[docs]
def to_dict(self) -> dict[str, Any]:
return {
'class': 'Infusion',
'amount': self._amount.serialize(),
'rate': self._rate.serialize() if self._rate is not None else None,
'duration': self._duration.serialize() if self._duration is not None else None,
'admid': self._admid,
}
[docs]
@classmethod
def from_dict(cls, d: dict[str, Any]) -> Infusion:
return cls(
amount=Expr.deserialize(d['amount']),
admid=d['admid'],
rate=None if d['rate'] is None else Expr.deserialize(d['rate']),
duration=None if d['duration'] is None else Expr.deserialize(d['duration']),
)
def __repr__(self):
if self.rate is not None:
arg = f'rate={self._rate}'
else:
arg = f'duration={self._duration}'
return f'Infusion({self._amount}, admid={self._admid}, {arg})'
[docs]
class Compartment(CompartmentBase):
"""Compartment for a compartmental system
Parameters
----------
name : str
Compartment name
doses : tuple(Dose)
Dose object for dose into this compartment. Default None for no dose.
input : Expression
Expression for other inputs to the compartment
lag_time : Expression
Lag time for doses entering this compartment. Default 0
bioavailability : Expression
Bioavailability fraction for doses entering this compartment. Default 1
Examples
--------
>>> from pharmpy.model import Bolus, Compartment
>>> comp = Compartment.create("CENTRAL")
>>> comp
Compartment(CENTRAL, amount=A_CENTRAL(t))
>>> comp = Compartment.create("DEPOT", lag_time="ALAG")
>>> comp
Compartment(DEPOT, amount=A_DEPOT(t), lag_time=ALAG)
>>> dose = Bolus.create("AMT")
>>> comp = Compartment.create("DEPOT", doses=(dose,))
>>> comp
Compartment(DEPOT, amount=A_DEPOT(t), doses=Bolus(AMT, admid=1))
"""
def __init__(
self,
name: str,
amount: Expr,
doses: tuple[Dose, ...] = tuple(),
input: Expr = Expr.integer(0),
lag_time: Expr = Expr.integer(0),
bioavailability: Expr = Expr.integer(1),
):
self._name = name
self._amount = amount
self._doses = doses
self._input = input
self._lag_time = lag_time
self._bioavailability = bioavailability
[docs]
@classmethod
def create(
cls,
name: str,
amount: Optional[TExpr] = None,
doses: tuple[Dose, ...] = tuple(),
input: TExpr = Expr.integer(0),
lag_time: TExpr = Expr.integer(0),
bioavailability: TExpr = Expr.integer(1),
):
if not isinstance(name, str):
raise TypeError("Name of a Compartment must be of string type")
if amount is not None:
amount_expr = Expr(amount)
else:
# NOTE: Uses a default idv
amount_expr = Expr.function(f'A_{name}', 't')
if not isinstance(doses, tuple):
try:
doses = tuple(doses)
except TypeError:
raise TypeError("dose(s) need to be given as a sequence")
for d in doses:
if not isinstance(d, Dose):
raise TypeError("All doses need to be of type Dose")
input = Expr(input)
lag_time = Expr(lag_time)
bioavailability = Expr(bioavailability)
return cls(
name=name,
amount=amount_expr,
doses=doses,
input=input,
lag_time=lag_time,
bioavailability=bioavailability,
)
[docs]
def replace(self, **kwargs):
name = kwargs.get("name", self._name)
amount = kwargs.get("amount", self._amount)
doses = kwargs.get("doses", self._doses)
input = kwargs.get("input", self._input)
lag_time = kwargs.get("lag_time", self._lag_time)
bioavailability = kwargs.get("bioavailability", self._bioavailability)
return Compartment.create(
name=name,
amount=amount,
doses=doses,
input=input,
lag_time=lag_time,
bioavailability=bioavailability,
)
@property
def name(self) -> str:
"""Compartment name"""
return self._name
@property
def amount(self) -> Expr:
"""Compartment amount symbol"""
return self._amount
@property
def doses(self) -> tuple[Dose, ...]:
# FIXME: Return in defined order with oral doses coming first!
# Only do this for multiple doses.
if len(self._doses) > 1:
return tuple(sorted(self._doses, key=lambda d: isinstance(d, Infusion), reverse=True))
else:
return self._doses
@property
def input(self) -> Expr:
return self._input
@property
def lag_time(self) -> Expr:
"""Lag time for doses into compartment"""
return self._lag_time
@property
def bioavailability(self) -> Expr:
"""Bioavailability fraction for doses into compartment"""
return self._bioavailability
@property
def free_symbols(self) -> set[Expr]:
"""Get set of all free symbols in the compartment
Examples
--------
>>> from pharmpy.model import Bolus, Compartment
>>> dose = Bolus.create("AMT")
>>> comp = Compartment.create("CENTRAL", doses=(dose,), lag_time="ALAG")
>>> comp.free_symbols # doctest: +SKIP
{A_CENTRAL, ALAG, AMT}
"""
symbs = set()
for d in self.doses:
symbs |= d.free_symbols
symbs |= self.input.free_symbols
symbs |= self.lag_time.free_symbols
symbs |= self.bioavailability.free_symbols
return symbs
[docs]
def subs(self, substitutions: Mapping[Expr, Expr]) -> Compartment:
"""Substitute expressions or symbols in compartment
Examples
--------
>>> from pharmpy.model import Bolus, Compartment
>>> dose = Bolus.create("AMT")
>>> comp = Compartment.create("CENTRAL", doses=(dose,))
>>> comp.subs({"AMT": "DOSE"})
Compartment(CENTRAL, amount=A_CENTRAL(t), doses=Bolus(DOSE, admid=1))
"""
if self.doses:
new_doses = tuple()
for d in self.doses:
new_doses = new_doses + (d.subs(substitutions),)
else:
new_doses = tuple()
return Compartment(
self.name,
amount=self._amount.subs(substitutions),
doses=new_doses,
input=self._input.subs(substitutions),
lag_time=self._lag_time.subs(substitutions),
bioavailability=self._bioavailability.subs(substitutions),
)
def __eq__(self, other):
if not isinstance(other, Compartment):
return NotImplemented
return (
self._name == other._name
and self._amount == other._amount
and self._doses == other._doses
and self._input == other._input
and self._lag_time == other._lag_time
and self._bioavailability == other._bioavailability
)
def __hash__(self):
return hash(
(
self._name,
self._amount,
self._doses,
self._input,
self._lag_time,
self._bioavailability,
)
)
[docs]
def to_dict(self) -> dict[str, Any]:
if not self._doses:
all_doses = None
else:
all_doses = tuple(dose.to_dict() for dose in self._doses)
return {
'class': 'Compartment',
'name': self._name,
'amount': self._amount.serialize(),
'doses': all_doses,
'input': self._input.serialize(),
'lag_time': self._lag_time.serialize(),
'bioavailability': self._bioavailability.serialize(),
}
[docs]
@classmethod
def from_dict(cls, d: dict[str, Any]) -> Compartment:
if d['doses'] is None:
all_doses = tuple()
else:
all_doses = tuple()
for dose in d['doses']:
if dose['class'] == 'Bolus':
all_doses += (Bolus.from_dict(dose),)
else:
all_doses += (Infusion.from_dict(dose),)
return cls(
name=d['name'],
amount=Expr.deserialize(d['amount']),
doses=all_doses,
input=Expr.deserialize(d['input']),
lag_time=Expr.deserialize(d['lag_time']),
bioavailability=Expr.deserialize(d['bioavailability']),
)
def __repr__(self):
lag = '' if self._lag_time == 0 else f', lag_time={self._lag_time}'
doses = (
''
if self._doses is tuple()
else f', doses={self._doses[0] if len(self._doses) == 1 else self._doses}'
)
input = '' if self._input == 0 else f', input={self._input}'
bioavailability = (
'' if self._bioavailability == 1 else f', bioavailability={self._bioavailability}'
)
return (
f'Compartment({self._name}, amount={self._amount}{doses}{input}{lag}{bioavailability})'
)
[docs]
class Statements(Sequence, Immutable):
"""A sequence of symbolic statements describing the model
Two types of statements are supported: Assignment and CompartmentalSystem.
A Statements object can have 0 or 1 CompartmentalSystem. The order of
the statements is significant and the same symbol can be assigned
to multiple times.
Parameters
----------
statements : list or Statements
A list of Statement or another Statements to populate this object
"""
def __init__(self, statements: Union[Statements, Iterable[Statement]] = ()):
if not isinstance(statements, tuple):
statements = tuple(statements)
self._statements = statements
[docs]
@classmethod
def create(cls, statements: Optional[Union[Statements, Iterable[Statement]]] = None):
if isinstance(statements, Statements):
statements = statements
elif statements is None:
statements = ()
elif isinstance(statements, Iterable):
if any(not isinstance(s, Statement) for s in statements):
raise TypeError('`statements` must consist of only type Statement')
statements = tuple(statements)
else:
raise TypeError(
f'`statements` must be of type Statements or an iterable of Statement: got `{type(statements)}`'
)
return cls(statements)
@overload
def __getitem__(self, ind: slice) -> Statements: ...
@overload
def __getitem__(self, ind: int) -> Statement: ...
def __getitem__(self, ind): # pyright: ignore
if isinstance(ind, slice):
return Statements(self._statements[ind])
else:
return self._statements[ind]
def __len__(self):
return len(self._statements)
def __add__(self, other: Union[Statements, Statement, Iterable[Statement]]) -> Statements:
if isinstance(other, Statements):
return Statements(self._statements + other._statements)
elif isinstance(other, Statement):
return Statements(self._statements + (other,))
elif isinstance(other, Iterable):
return Statements.create(self._statements + tuple(other))
else:
return NotImplemented
def __radd__(self, other: Union[Statement, Iterable[Statement]]) -> Statements:
if isinstance(other, Iterable):
return Statements.create(tuple(other) + self._statements)
else:
return NotImplemented
@property
def free_symbols(self) -> set[Expr]:
"""Get a set of all free symbols
Examples
--------
>>> from pharmpy.modeling import load_example_model
>>> model = load_example_model("pheno")
>>> model.statements.free_symbols # doctest: +SKIP
{AMT, APGR, A_CENTRAL, BTIME, CL, DV, EPS(1), ETA(1), ETA(2), F, IPRED, IRES, IWRES, S1,
TAD, THETA(1), THETA(2), THETA(3), TIME, TVCL, TVV, V, W, WGT, Y, t}
"""
symbols = set()
for assignment in self:
symbols |= assignment.free_symbols
return symbols
def _get_ode_system_index(self):
return next(
map(
lambda t: t[0],
filter(lambda t: isinstance(t[1], CompartmentalSystem), enumerate(self)),
),
-1,
)
@property
def ode_system(self) -> Optional[CompartmentalSystem]:
"""Returns the ODE system of the model or None if the model doesn't have an ODE system
Examples
--------
>>> from pharmpy.modeling import load_example_model
>>> model = load_example_model("pheno")
>>> model.statements.ode_system
Bolus(AMT, admid=1) → CENTRAL
┌───────┐
│CENTRAL│──CL/V→
└───────┘
"""
i = self._get_ode_system_index()
if i == -1:
return None
else:
cs = self[i]
assert isinstance(cs, CompartmentalSystem)
return cs
@property
def before_odes(self) -> Statements:
"""All statements before the ODE system
Examples
--------
>>> from pharmpy.modeling import load_example_model
>>> model = load_example_model("pheno")
>>> model.statements.before_odes
TVCL = POP_CL⋅WGT
TVV = POP_VC⋅WGT
⎧TVV⋅(COVAPGR + 1) for APGR < 5
⎨
TVV = ⎩ TVV otherwise
ETA_CL
CL = TVCL⋅ℯ
ETA_VC
VC = TVV⋅ℯ
V = VC
S₁ = VC
"""
i = self._get_ode_system_index()
return self if i == -1 else self[:i]
@property
def after_odes(self) -> Statements:
"""All statements after the ODE system
Examples
--------
>>> from pharmpy.modeling import load_example_model
>>> model = load_example_model("pheno")
>>> model.statements.after_odes
A_CENTRAL(t)
────────────
F = S₁
Y = EPS₁⋅F + F
"""
i = self._get_ode_system_index()
return Statements() if i == -1 else self[i + 1 :]
@property
def error(self) -> Statements:
"""All statements after the ODE system or the whole model if no ODE system
Examples
--------
>>> from pharmpy.modeling import load_example_model
>>> model = load_example_model("pheno")
>>> model.statements.error
A_CENTRAL(t)
────────────
F = S₁
Y = EPS₁⋅F + F
"""
i = self._get_ode_system_index()
return self if i == -1 else self[i + 1 :]
[docs]
def subs(self, substitutions: Mapping[Expr, Expr]) -> Statements:
"""Substitute symbols in all statements.
Parameters
----------
substitutions : dict
Old-new pairs(can be type str or symbol)
Examples
--------
>>> from pharmpy.modeling import load_example_model
>>> model = load_example_model("pheno")
>>> stats = model.statements.subs({'WGT': 'WT'})
>>> stats.before_odes
TVCL = POP_CL⋅WT
TVV = POP_VC⋅WT
⎧TVV⋅(COVAPGR + 1) for APGR < 5
⎨
TVV = ⎩ TVV otherwise
ETA_CL
CL = TVCL⋅ℯ
ETA_VC
VC = TVV⋅ℯ
V = VC
S₁ = VC
"""
return Statements(s.subs(substitutions) for s in self)
def _lookup_last_assignment(
self, symbol: TSymbol
) -> tuple[Optional[int], Optional[Assignment]]:
if isinstance(symbol, str):
symbol = Expr.symbol(symbol)
ind = None
assignment = None
for i, statement in enumerate(self):
if isinstance(statement, Assignment):
if statement.symbol == symbol:
ind = i
assignment = statement
return ind, assignment
[docs]
def find_assignment(self, symbol: TSymbol) -> Optional[Assignment]:
"""Returns last assignment of symbol
Parameters
----------
symbol : Symbol or str
Symbol to look for
Returns
-------
Assignment or None
An Assignment or None if no assignment to symbol exists
Examples
--------
>>> from pharmpy.modeling import load_example_model
>>> model = load_example_model("pheno")
>>> model.statements.find_assignment("CL")
ETA_CL
CL = TVCL⋅ℯ
"""
return self._lookup_last_assignment(symbol)[1]
[docs]
def find_assignment_index(self, symbol: TSymbol) -> Optional[int]:
"""Returns index of last assignment of symbol
Parameters
----------
symbol : Symbol or str
Symbol to look for
Returns
-------
int or None
Index of Assignment or None if no assignment to symbol exists
Examples
--------
>>> from pharmpy.modeling import load_example_model
>>> model = load_example_model("pheno")
>>> model.statements.find_assignment_index("CL")
3
"""
return self._lookup_last_assignment(symbol)[0]
[docs]
def reassign(self, symbol: TSymbol, expression: TExpr) -> Statements:
"""Reassign symbol to expression
Set symbol to be expression and remove all previous assignments of symbol
Parameters
----------
symbol : sympy.Symbol or str
Symbol to reassign
expression : Expr or str
The new expression to assign to symbol
Return
------
Statements
Updated statements
Examples
--------
>>> from pharmpy.modeling import load_example_model
>>> model = load_example_model("pheno")
>>> model.statements.reassign("CL", "TVCL + eta") # doctest: +SKIP
"""
symbol = Expr(symbol)
expression = Expr(expression)
last = True
new = list(self._statements)
for i, stat in zip(range(len(new) - 1, -1, -1), reversed(new)):
if isinstance(stat, Assignment) and stat.symbol == symbol:
if last:
new[i] = Assignment(symbol, expression)
last = False
else:
del new[i]
return Statements(new)
def _create_dependency_graph(self):
"""Create a graph of dependencies between statements"""
graph = nx.DiGraph()
for i in range(len(self) - 1, -1, -1):
rhs = self[i].rhs_symbols
for j in range(i - 1, -1, -1):
statement = self[j]
if isinstance(statement, Assignment):
if statement.symbol in rhs:
graph.add_edge(i, j)
else:
assert isinstance(statement, CompartmentalSystem)
amts = set(statement.amounts)
if not rhs.isdisjoint(amts):
graph.add_edge(i, j)
return graph
[docs]
def direct_dependencies(self, statement: Statement) -> Statements:
"""Find all direct dependencies of a statement
Parameters
----------
statement : Statement
Input statement
Returns
-------
Statements
Direct dependency statements
Examples
--------
>>> from pharmpy.modeling import load_example_model
>>> model = load_example_model("pheno")
>>> odes = model.statements.ode_system
>>> model.statements.direct_dependencies(odes)
ETA_CL
CL = TVCL⋅ℯ
V = VC
"""
g = self._create_dependency_graph()
index = self.index(statement)
succ = sorted(list(g.successors(index)))
stats = Statements()
stats._statements = [self[i] for i in succ]
return stats
[docs]
def dependencies(self, symbol_or_statement: Union[TSymbol, Statement]) -> set[Expr]:
"""Find all dependencies of a symbol or statement
Parameters
----------
symbol : Symbol, str or Statement
Input symbol or statement
Returns
-------
set
Set of symbols
Examples
--------
>>> from pharmpy.modeling import load_example_model
>>> model = load_example_model("pheno")
>>> model.statements.dependencies("CL") # doctest: +SKIP
{ETA(1), THETA(1), WGT}
"""
if isinstance(symbol_or_statement, Statement):
i = self.index(symbol_or_statement)
else:
symbol = Expr(symbol_or_statement)
for i in range(len(self) - 1, -1, -1):
statement = self[i]
if (
isinstance(statement, Assignment)
and statement.symbol == symbol
or isinstance(statement, CompartmentalSystem)
and symbol in statement.amounts
):
break
else:
raise KeyError(f"Could not find symbol {symbol}")
g = self._create_dependency_graph()
symbs = self[i].rhs_symbols
if i == 0 or not g:
# Special case for models with only one statement or no dependent statements
return symbs
for j, _ in nx.bfs_predecessors(g, i, sort_neighbors=lambda x: reversed(sorted(x))):
statement = self[j]
if isinstance(statement, Assignment):
symbs -= {statement.symbol}
else:
assert isinstance(statement, CompartmentalSystem)
symbs -= set(statement.amounts)
symbs |= statement.rhs_symbols
return symbs
[docs]
def remove_symbol_definitions(
self, symbols: Sequence[Expr], statement: Statement
) -> Statements:
"""Remove symbols and dependencies not used elsewhere
If the statement no longer depends on the specified
symbols, this method will make sure that the definitions
of these symbols will be removed unless they are dependencies
of other statements.
Parameters
----------
symbols : iterable
Iterable of symbols no longer used in the statement
statement : Statement
Statement from which the symbols were removed
"""
graph = self._create_dependency_graph()
removed_ind = self._statements.index(statement)
# Statements defining symbols and dependencies
candidates = set()
symbols_set = set(symbols)
for i in range(removed_ind - 1, -1, -1):
stat = self[i]
if isinstance(stat, Assignment) and stat.symbol in symbols_set:
candidates.add(i)
for i in candidates.copy():
if i in graph:
candidates |= set(nx.dfs_preorder_nodes(graph, i))
# All statements needed for removed_ind
if removed_ind in graph:
keep = {down for _, down in nx.dfs_edges(graph, removed_ind)}
else:
keep = set()
candidates -= keep
# Other dependencies after removed_ind
additional = {down for up, down in graph.edges if up > removed_ind and down in candidates}
for add in additional.copy():
additional |= set(nx.dfs_preorder_nodes(graph, add))
remove = candidates - additional
return Statements(tuple(self[i] for i in range(len(self)) if i not in remove))
[docs]
def full_expression(self, expression: TExpr) -> Expr:
"""Expand an expression into its full definition
Parameters
----------
expression : expression or str
Expression to expand. A string will be converted to an expression.
Return
------
expression
Expanded expression
Examples
--------
>>> from pharmpy.modeling import load_example_model
>>> model = load_example_model("pheno")
>>> model.statements.before_odes.full_expression("CL")
POP_CL*WGT*exp(ETA_CL)
"""
expression = Expr(expression)
for statement in reversed(self):
if isinstance(statement, CompartmentalSystem):
raise ValueError(
"CompartmentalSystem not supported by full_expression. Use the properties before_odes "
"or after_odes."
)
expression = expression.subs({statement.symbol: statement.expression})
return expression
def __eq__(self, other):
if self is other:
return True
if not isinstance(other, Statements):
return NotImplemented
if len(self) != len(other):
return False
else:
for first, second in zip(self, other):
if first != second:
return False
return True
def __hash__(self):
return hash(self._statements)
[docs]
def to_dict(self) -> dict[str, Any]:
stats = tuple(s.to_dict() for s in self)
return {'statements': stats}
[docs]
@classmethod
def from_dict(cls, d: dict[str, Any]) -> Statements:
statements = []
for sdict in d['statements']:
if sdict['class'] == 'Assignment':
s = Assignment.from_dict(sdict)
else:
s = CompartmentalSystem.from_dict(sdict)
statements.append(s)
return cls(tuple(statements))
def __repr__(self):
return '\n'.join([repr(statement) for statement in self])
def _repr_html_(self):
html = r'\begin{align*}'
for statement in self:
if hasattr(statement, '_repr_html_'):
html += '\\end{align*}'
s = statement._repr_html_()
html += s + '\\begin{align*}'
else:
s = f'${statement._repr_latex_()}$'
s = s.replace('=', '&=')
s = s.replace('$', '')
s = s + r'\\'
html += s
return html + '\\end{align*}'