from __future__ import annotations
from collections.abc import Iterable, Mapping, Sequence
from typing import Any, Optional, Union, overload
from pharmpy import DEFAULT_SEED
from pharmpy.basic import Expr
from pharmpy.deps import pandas as pd
from pharmpy.internals.immutable import Immutable, frozenmapping
SUPPORTED_SOLVERS = frozenset(('CVODES', 'DGEAR', 'DVERK', 'IDA', 'LSODA', 'LSODI'))
class ExecutionStep(Immutable):
def __init__(
self,
solver: Optional[str] = None,
solver_rtol: Optional[int] = None,
solver_atol: Optional[int] = None,
tool_options: frozenmapping[str, Any] = frozenmapping({}),
):
self._solver = solver
self._solver_rtol = solver_rtol
self._solver_atol = solver_atol
self._tool_options = tool_options
@staticmethod
def _canonicalize_solver(solver):
if solver is not None:
solver = solver.upper()
if not (solver is None or solver in SUPPORTED_SOLVERS):
raise ValueError(
f"Unknown solver {solver}. Recognized solvers are {sorted(SUPPORTED_SOLVERS)}."
)
return solver
@staticmethod
def _canonicalize_tool_options(tool_options):
tool_options = frozenmapping(tool_options)
return tool_options
@property
def solver(self) -> Optional[str]:
"""Numerical solver to use when numerically solving the ODE system
Supported solvers and their corresponding NONMEM ADVAN
+----------------------------+------------------+
| Solver | NONMEM ADVAN |
+============================+==================+
| CVODES | ADVAN14 |
+----------------------------+------------------+
| DGEAR | ADVAN8 |
+----------------------------+------------------+
| DVERK | ADVAN6 |
+----------------------------+------------------+
| IDA | ADVAN15 |
+----------------------------+------------------+
| LSODA | ADVAN13 |
+----------------------------+------------------+
| LSODI | ADVAN9 |
+----------------------------+------------------+
"""
return self._solver
@property
def solver_rtol(self) -> Optional[int]:
"""Relative tolerance for numerical ODE system solver"""
return self._solver_rtol
@property
def solver_atol(self) -> Optional[int]:
"""Absolute tolerance for numerical ODE system solver"""
return self._solver_atol
@property
def tool_options(self) -> frozenmapping[str, Any]:
"""Dictionary of tool specific options"""
return self._tool_options
def _add_to_dict(self, d):
tool_options = dict(self._tool_options)
d['solver'] = self._solver
d['solver_rtol'] = self._solver_rtol
d['solver_atol'] = self._solver_atol
d['tool_options'] = tool_options
@staticmethod
def _adjust_dict(d):
del d['class']
if isinstance(d['tool_options'], dict):
d['tool_options'] = frozenmapping(d['tool_options'])
def _partial_repr(self):
solver = f"'{self.solver}'" if self.solver is not None else self.solver
return (
f"solver={solver}, "
f"solver_rtol={self.solver_rtol}, solver_atol={self.solver_atol}, "
f"tool_options={self.tool_options}"
)
def __eq__(self, other):
# NOTE: No need to test class here
return (
self.solver == other.solver
and self.solver_rtol == other.solver_rtol
and self.solver_atol == other.solver_atol
and self.tool_options == other.tool_options
)
def __hash__(self):
return hash(
(
self._solver,
self._solver_rtol,
self._solver_atol,
self._tool_options,
)
)
[docs]
class EstimationStep(ExecutionStep):
"""Definition of one estimation operation"""
"""Supported estimation methods
"""
supported_methods = frozenset(('FO', 'FOCE', 'ITS', 'IMPMAP', 'IMP', 'SAEM', 'BAYES'))
supported_parameter_uncertainty_methods = frozenset(('SANDWICH', 'SMAT', 'RMAT', 'EFIM'))
def __init__(
self,
method: str,
interaction: bool = False,
parameter_uncertainty_method: Optional[str] = None,
evaluation: bool = False,
maximum_evaluations: Optional[int] = None,
laplace: bool = False,
isample: Optional[int] = None,
niter: Optional[int] = None,
auto: Optional[bool] = None,
keep_every_nth_iter: Optional[int] = None,
residuals: Sequence[str] = (),
predictions: Sequence[str] = (),
solver: Optional[str] = None,
solver_rtol: Optional[int] = None,
solver_atol: Optional[int] = None,
tool_options: frozenmapping[str, Any] = frozenmapping({}),
derivatives: Sequence[Sequence[Expr]] = (),
individual_eta_samples: bool = False,
):
self._method = method
self._interaction = interaction
self._parameter_uncertainty_method = parameter_uncertainty_method
self._evaluation = evaluation
self._maximum_evaluations = maximum_evaluations
self._laplace = laplace
self._isample = isample
self._niter = niter
self._auto = auto
self._keep_every_nth_iter = keep_every_nth_iter
self._residuals = residuals
self._predictions = predictions
self._derivatives = derivatives
self._individual_eta_samples = individual_eta_samples
super().__init__(
solver=solver,
solver_rtol=solver_rtol,
solver_atol=solver_atol,
tool_options=tool_options,
)
[docs]
@classmethod
def create(
cls,
method: str,
interaction: bool = False,
parameter_uncertainty_method: Optional[str] = None,
evaluation: bool = False,
maximum_evaluations: Optional[int] = None,
laplace: bool = False,
isample: Optional[int] = None,
niter: Optional[int] = None,
auto: Optional[bool] = None,
keep_every_nth_iter: Optional[int] = None,
residuals: Sequence[str] = (),
predictions: Sequence[str] = (),
solver: Optional[str] = None,
solver_rtol: Optional[int] = None,
solver_atol: Optional[int] = None,
tool_options: Mapping[str, Any] = frozenmapping({}),
derivatives: Sequence[Sequence[Expr]] = (),
individual_eta_samples: bool = False,
):
method = EstimationStep._canonicalize_and_check_method(method)
if maximum_evaluations is not None and maximum_evaluations < 1:
raise ValueError(
'Number of maximum evaluations must be more than one, use '
'evaluation=True or tool_options for special cases (e.g. 0 and -1'
'in NONMEM)'
)
try:
residuals = tuple(sorted(residuals))
except TypeError:
raise TypeError(
f"Residuals could not be converted to tuple. Recieved type '{type(residuals)}'"
)
try:
predictions = tuple(sorted(predictions))
except TypeError:
raise TypeError(
f"Predictions could not be converted to tuple. Recieved type '{type(predictions)}'"
)
if derivatives:
derivatives = EstimationStep._canonicalize_derivatives(derivatives)
if parameter_uncertainty_method is not None:
parameter_uncertainty_method = parameter_uncertainty_method.upper()
if not (
parameter_uncertainty_method is None
or parameter_uncertainty_method
in EstimationStep.supported_parameter_uncertainty_methods
):
raise ValueError(
f"Unknown parameter uncertainty method {parameter_uncertainty_method}. "
f"Recognized methods are {sorted(EstimationStep.supported_parameter_uncertainty_methods)}."
)
solver = ExecutionStep._canonicalize_solver(solver)
tool_options = ExecutionStep._canonicalize_tool_options(tool_options)
return cls(
method=method,
interaction=interaction,
parameter_uncertainty_method=parameter_uncertainty_method,
evaluation=evaluation,
maximum_evaluations=maximum_evaluations,
laplace=laplace,
isample=isample,
niter=niter,
auto=auto,
keep_every_nth_iter=keep_every_nth_iter,
residuals=residuals,
predictions=predictions,
solver=solver,
solver_rtol=solver_rtol,
solver_atol=solver_atol,
tool_options=tool_options,
derivatives=derivatives,
individual_eta_samples=bool(individual_eta_samples),
)
[docs]
def replace(self, **kwargs) -> EstimationStep:
"""Derive a new EstimationStep with new properties"""
d = {key[1:]: value for key, value in self.__dict__.items()}
d.update(kwargs)
new = EstimationStep.create(**d)
return new
@staticmethod
def _canonicalize_and_check_method(method: str) -> str:
method = method.upper()
if method not in EstimationStep.supported_methods:
raise ValueError(
f'EstimationStep: {method} not recognized. Use any of {sorted(EstimationStep.supported_methods)}.'
)
return method
@staticmethod
def _canonicalize_derivatives(derivatives: Sequence[Sequence[Expr]]) -> tuple:
try:
derivatives = tuple(tuple(d) for d in derivatives)
except TypeError:
raise TypeError(
"Given derivatives cannot be converted to tuple of tuples. "
" Check type of input arguments."
)
for d in derivatives:
for d_arg in d:
if not (isinstance(d_arg, Expr) and d_arg.is_symbol()):
raise TypeError(
f"Each derivative argument must be a symbol of type 'Expr'. "
f"Found '{d_arg}' of type {type(d_arg)}"
)
derivatives = tuple(
sorted([tuple(sorted(d, key=str)) for d in derivatives], key=lambda der: str(der[0]))
)
return derivatives
@property
def method(self) -> str:
"""Name of the estimation method"""
return self._method
@property
def maximum_evaluations(self) -> Optional[int]:
"""Maximum allowable number of evaluations of the objective function"""
return self._maximum_evaluations
@property
def interaction(self) -> bool:
"""Preserve eta-epsilon interaction in the computation of the objective function"""
return self._interaction
@property
def evaluation(self) -> bool:
"""Only perform model evaluation"""
return self._evaluation
@property
def parameter_uncertainty_method(self) -> Optional[str]:
"""Method to use when estimating parameter uncertainty.
Supported methods and their corresponding NMTRAN code:
+-------------------------------+-----------------------+
| Method | NMTRAN |
+===============================+=======================+
| Sandwich | $COVARIANCE |
+-------------------------------+-----------------------+
| Cross-product gradient (SMAT) | $COVARIANCE MATRIX=S |
+-------------------------------+-----------------------+
| Observed FIM (RMAT) | $COVARIANCE MATRIX=R |
+-------------------------------+-----------------------+
| Expected FIM (EFIM) | $DESIGN |
+-------------------------------+-----------------------+
By default the following options are appended:
UNCONDITIONAL: The uncertainty step is implemented regardless of how the estimation step terminates.
PRINT=E: Print the eigenvalues of the correlation matrix.
PRECOND=1: Perform up to 1 preconditioning cycle on the R matrix.
"""
return self._parameter_uncertainty_method
@property
def laplace(self) -> bool:
"""Use the laplacian method"""
return self._laplace
@property
def isample(self) -> Optional[int]:
"""Number of samples per subject (or similar) for EM methods"""
return self._isample
@property
def niter(self) -> Optional[int]:
"""Number of iterations for EM methods"""
return self._niter
@property
def auto(self) -> Optional[bool]:
"""Let estimation tool automatically add settings"""
return self._auto
@property
def keep_every_nth_iter(self) -> Optional[int]:
"""Keep results for every nth iteration"""
return self._keep_every_nth_iter
@property
def residuals(self) -> Sequence[str]:
"""List of residuals to calculate"""
return self._residuals
@property
def predictions(self) -> Sequence[str]:
"""List of predictions to estimate"""
return self._predictions
@property
def derivatives(self) -> Sequence[Sequence[Expr]]:
"""List of derivates to calculate when running"""
return self._derivatives
@property
def individual_eta_samples(self) -> bool:
"""Should individual eta samples be generated"""
return self._individual_eta_samples
@property
def tool_options(self) -> frozenmapping[str, Any]:
"""Dictionary of tool specific options"""
return self._tool_options
def __eq__(self, other):
if self is other:
return True
if not isinstance(other, EstimationStep):
return NotImplemented
return (
self._method == other._method
and self._interaction == other._interaction
and self._parameter_uncertainty_method == other._parameter_uncertainty_method
and self._evaluation == other._evaluation
and self._maximum_evaluations == other._maximum_evaluations
and self._laplace == other._laplace
and self._isample == other._isample
and self._niter == other._niter
and self._auto == other._auto
and self._keep_every_nth_iter == other._keep_every_nth_iter
and self._derivatives == other._derivatives
and self._predictions == other._predictions
and self._residuals == other._residuals
and self._individual_eta_samples == other._individual_eta_samples
and super().__eq__(other)
)
def __hash__(self):
return hash(
(
self._method,
self._interaction,
self._parameter_uncertainty_method,
self._evaluation,
self._maximum_evaluations,
self._laplace,
self._isample,
self._niter,
self._auto,
self._keep_every_nth_iter,
self._individual_eta_samples,
super().__hash__(),
)
)
[docs]
def to_dict(self) -> dict[str, Any]:
d = {
'class': 'EstimationStep',
'method': self._method,
'interaction': self._interaction,
'parameter_uncertainty_method': self._parameter_uncertainty_method,
'evaluation': self._evaluation,
'maximum_evaluations': self._maximum_evaluations,
'laplace': self._laplace,
'isample': self._isample,
'niter': self._niter,
'auto': self._auto,
'keep_every_nth_iter': self._keep_every_nth_iter,
'derivatives': tuple(str(d) for d in self._derivatives),
'predictions': self._predictions,
'residuals': self._residuals,
'individual_eta_samples': self._individual_eta_samples,
}
super()._add_to_dict(d)
return d
[docs]
@classmethod
def from_dict(cls, d: dict[str, Any]) -> EstimationStep:
d = dict(d)
ExecutionStep._adjust_dict(d)
return cls(**d)
def __repr__(self):
parameter_uncertainty_method = (
f"'{self.parameter_uncertainty_method}'"
if self.parameter_uncertainty_method is not None
else self.parameter_uncertainty_method
)
return (
f"EstimationStep('{self.method}', interaction={self.interaction}, "
f"parameter_uncertainty_method={parameter_uncertainty_method}, evaluation={self.evaluation}, "
f"maximum_evaluations={self.maximum_evaluations}, laplace={self.laplace}, "
f"isample={self.isample}, niter={self.niter}, auto={self.auto}, "
f"keep_every_nth_iter={self.keep_every_nth_iter}, "
f"individual_eta_samples={self.individual_eta_samples}, {super()._partial_repr()})"
)
[docs]
class SimulationStep(ExecutionStep):
"""Definition of one simulation operation"""
def __init__(
self,
n: int = 1,
seed: int = DEFAULT_SEED,
solver: Optional[str] = None,
solver_rtol: Optional[int] = None,
solver_atol: Optional[int] = None,
tool_options: frozenmapping[str, Any] = frozenmapping({}),
):
self._n = n
self._seed = seed
super().__init__(
solver=solver,
solver_rtol=solver_rtol,
solver_atol=solver_atol,
tool_options=tool_options,
)
[docs]
@classmethod
def create(
cls,
n: int = 1,
seed: int = DEFAULT_SEED,
solver: Optional[str] = None,
solver_rtol: Optional[int] = None,
solver_atol: Optional[int] = None,
tool_options: Mapping[str, Any] = frozenmapping({}),
):
if n < 1:
raise ValueError("Need at least one replicate in SimulationStep")
return cls(n=n, seed=seed)
[docs]
def replace(self, **kwargs) -> SimulationStep:
"""Derive a new SimulationStep with new properties"""
d = {key[1:]: value for key, value in self.__dict__.items()}
d.update(kwargs)
new = SimulationStep.create(**d)
return new
@property
def n(self) -> int:
"""Number of simulation replicates"""
return self._n
@property
def seed(self) -> int:
"""Random seed"""
return self._seed
def __eq__(self, other):
if self is other:
return True
if not isinstance(other, SimulationStep):
return NotImplemented
return self._n == other._n and self._seed == other._seed and super().__eq__(other)
def __hash__(self):
return hash((self._n, self._seed, super().__hash__()))
[docs]
def to_dict(self) -> dict[str, Any]:
d = {
'class': 'SimulationStep',
'n': self._n,
'seed': self._seed,
}
super()._add_to_dict(d)
return d
[docs]
@classmethod
def from_dict(cls, d: dict[str, Any]) -> SimulationStep:
d = dict(d)
ExecutionStep._adjust_dict(d)
return cls(**d)
def __repr__(self):
return f"SimulationStep(n={self._n}, seed={self._seed}, {super()._partial_repr()})"
[docs]
class ExecutionSteps(Sequence, Immutable):
"""A sequence of estimation steps
Parameters
----------
steps : iterable or None
Used for initialization
"""
def __init__(self, steps: tuple[Union[EstimationStep, SimulationStep], ...] = ()):
self._steps = steps
[docs]
@classmethod
def create(cls, steps: Optional[Sequence[Union[EstimationStep, SimulationStep]]] = None):
if steps is None:
steps = ()
else:
steps = tuple(steps)
return ExecutionSteps(steps=steps)
[docs]
def replace(self, **kwargs) -> ExecutionSteps:
steps = kwargs.get('steps', self._steps)
return ExecutionSteps.create(steps)
@overload
def __getitem__(self, i: int) -> EstimationStep: ...
@overload
def __getitem__(self, i: slice) -> ExecutionSteps: ...
def __getitem__(
self, i: Union[int, slice]
) -> Union[EstimationStep, SimulationStep, ExecutionSteps]:
if isinstance(i, slice):
return ExecutionSteps(self._steps[i])
return self._steps[i]
def __add__(self, other: Union[EstimationStep, ExecutionSteps, Iterable]) -> ExecutionSteps:
if isinstance(other, ExecutionSteps):
return ExecutionSteps(self._steps + other._steps)
elif isinstance(other, EstimationStep) or isinstance(other, SimulationStep):
return ExecutionSteps(self._steps + (other,))
else:
return ExecutionSteps(self._steps + tuple(other))
def __radd__(self, other: Union[EstimationStep, Iterable]) -> ExecutionSteps:
if isinstance(other, EstimationStep) or isinstance(other, SimulationStep):
return ExecutionSteps((other,) + self._steps)
else:
return ExecutionSteps(tuple(other) + self._steps)
def __len__(self):
return len(self._steps)
def __eq__(self, other: Any):
if self is other:
return True
if not isinstance(other, ExecutionSteps):
return NotImplemented
if len(self) != len(other):
return False
for s1, s2 in zip(self, other):
if s1 != s2:
return False
return True
def __hash__(self):
return hash(self._steps)
[docs]
def to_dict(self) -> dict[str, Any]:
return {'steps': tuple(step.to_dict() for step in self._steps)}
[docs]
@classmethod
def from_dict(cls, d: dict[str, Any]) -> ExecutionSteps:
steps = []
for sdict in d['steps']:
if sdict['class'] == 'EstimationStep':
s = EstimationStep.from_dict(sdict)
else:
s = SimulationStep.from_dict(sdict)
steps.append(s)
return cls(steps=tuple(steps))
[docs]
def to_dataframe(self) -> pd.DataFrame:
"""Convert to DataFrame
Use this to create an overview of all estimation steps
Returns
-------
pd.DataFrame
DataFrame overview
>>> from pharmpy.modeling import load_example_model
>>> model = load_example_model("pheno")
>>> model.execution_steps.to_dataframe() # doctest: +ELLIPSIS
method interaction parameter_uncertainty_method ... auto keep_every_nth_iter tool_options
0 FOCE True SANDWICH ... None None {}
"""
steps = [s for s in self._steps if isinstance(s, EstimationStep)]
method = [s.method for s in steps]
interaction = [s.interaction for s in steps]
parameter_uncertainty_method = [s.parameter_uncertainty_method for s in steps]
evaluation = [s.evaluation for s in steps]
maximum_evaluations = [s.maximum_evaluations for s in steps]
laplace = [s.laplace for s in steps]
isample = [s.isample for s in steps]
niter = [s.niter for s in steps]
auto = [s.auto for s in steps]
keep_every_nth_iter = [s.keep_every_nth_iter for s in steps]
tool_options = [dict(s.tool_options) if s.tool_options else dict() for s in steps]
df = pd.DataFrame(
{
'method': method,
'interaction': interaction,
'parameter_uncertainty_method': parameter_uncertainty_method,
'evaluation': evaluation,
'maximum_evaluations': maximum_evaluations,
'laplace': laplace,
'isample': isample,
'niter': niter,
'auto': auto,
'keep_every_nth_iter': keep_every_nth_iter,
'tool_options': tool_options,
}
)
return df
def __repr__(self) -> str:
if len(self) == 0:
return "ExecutionSteps()"
return self.to_dataframe().to_string()
def _repr_html_(self) -> str:
if len(self) == 0:
return "ExecutionSteps()"
else:
return self.to_dataframe().to_html()