from __future__ import annotations
from collections.abc import Iterable, Mapping, Sequence
from typing import Any, Optional, Union, overload
from pharmpy.basic import Expr
from pharmpy.deps import pandas as pd
from pharmpy.internals.immutable import Immutable, frozenmapping
SUPPORTED_SOLVERS = frozenset(('CVODES', 'DGEAR', 'DVERK', 'IDA', 'LSODA', 'LSODI'))
class ExecutionStep(Immutable):
def __init__(
self,
solver: Optional[str] = None,
solver_rtol: Optional[int] = None,
solver_atol: Optional[int] = None,
tool_options: frozenmapping[str, Any] = frozenmapping({}),
):
self._solver = solver
self._solver_rtol = solver_rtol
self._solver_atol = solver_atol
self._tool_options = tool_options
@staticmethod
def _canonicalize_solver(solver):
if solver is not None:
solver = solver.upper()
if not (solver is None or solver in SUPPORTED_SOLVERS):
raise ValueError(
f"Unknown solver {solver}. Recognized solvers are {sorted(SUPPORTED_SOLVERS)}."
)
return solver
@staticmethod
def _canonicalize_tool_options(tool_options):
tool_options = frozenmapping(tool_options)
return tool_options
@property
def solver(self) -> Optional[str]:
"""Numerical solver to use when numerically solving the ODE system
Supported solvers and their corresponding NONMEM ADVAN
+----------------------------+------------------+
| Solver | NONMEM ADVAN |
+============================+==================+
| CVODES | ADVAN14 |
+----------------------------+------------------+
| DGEAR | ADVAN8 |
+----------------------------+------------------+
| DVERK | ADVAN6 |
+----------------------------+------------------+
| IDA | ADVAN15 |
+----------------------------+------------------+
| LSODA | ADVAN13 |
+----------------------------+------------------+
| LSODI | ADVAN9 |
+----------------------------+------------------+
"""
return self._solver
@property
def solver_rtol(self) -> Optional[int]:
"""Relative tolerance for numerical ODE system solver"""
return self._solver_rtol
@property
def solver_atol(self) -> Optional[int]:
"""Absolute tolerance for numerical ODE system solver"""
return self._solver_atol
@property
def tool_options(self) -> frozenmapping[str, Any]:
"""Dictionary of tool specific options"""
return self._tool_options
def _add_to_dict(self, d):
tool_options = dict(self._tool_options)
d['solver'] = self._solver
d['solver_rtol'] = self._solver_rtol
d['solver_atol'] = self._solver_atol
d['tool_options'] = tool_options
@staticmethod
def _adjust_dict(d):
del d['class']
if isinstance(d['tool_options'], dict):
d['tool_options'] = frozenmapping(d['tool_options'])
def _partial_repr(self):
solver = f"'{self.solver}'" if self.solver is not None else self.solver
return (
f"solver={solver}, "
f"solver_rtol={self.solver_rtol}, solver_atol={self.solver_atol}, "
f"tool_options={self.tool_options}"
)
def __eq__(self, other):
# NOTE: No need to test class here
return (
self.solver == other.solver
and self.solver_rtol == other.solver_rtol
and self.solver_atol == other.solver_atol
and self.tool_options == other.tool_options
)
def __hash__(self):
return hash(
(
self._solver,
self._solver_rtol,
self._solver_atol,
self._tool_options,
)
)
[docs]
class EstimationStep(ExecutionStep):
"""Definition of one estimation operation"""
"""Supported estimation methods
"""
supported_methods = frozenset(('FO', 'FOCE', 'ITS', 'IMPMAP', 'IMP', 'SAEM', 'BAYES'))
supported_parameter_uncertainty_methods = frozenset(('SANDWICH', 'SMAT', 'RMAT', 'EFIM'))
def __init__(
self,
method: str,
interaction: bool = False,
parameter_uncertainty_method: Optional[str] = None,
evaluation: bool = False,
maximum_evaluations: Optional[int] = None,
laplace: bool = False,
isample: Optional[int] = None,
niter: Optional[int] = None,
auto: Optional[bool] = None,
keep_every_nth_iter: Optional[int] = None,
residuals: Sequence[str] = (),
predictions: Sequence[str] = (),
solver: Optional[str] = None,
solver_rtol: Optional[int] = None,
solver_atol: Optional[int] = None,
tool_options: frozenmapping[str, Any] = frozenmapping({}),
derivatives: Sequence[Sequence[Expr]] = (),
individual_eta_samples: bool = False,
):
self._method = method
self._interaction = interaction
self._parameter_uncertainty_method = parameter_uncertainty_method
self._evaluation = evaluation
self._maximum_evaluations = maximum_evaluations
self._laplace = laplace
self._isample = isample
self._niter = niter
self._auto = auto
self._keep_every_nth_iter = keep_every_nth_iter
self._residuals = residuals
self._predictions = predictions
self._derivatives = derivatives
self._individual_eta_samples = individual_eta_samples
super().__init__(
solver=solver,
solver_rtol=solver_rtol,
solver_atol=solver_atol,
tool_options=tool_options,
)
[docs]
@classmethod
def create(
cls,
method: str,
interaction: bool = False,
parameter_uncertainty_method: Optional[str] = None,
evaluation: bool = False,
maximum_evaluations: Optional[int] = None,
laplace: bool = False,
isample: Optional[int] = None,
niter: Optional[int] = None,
auto: Optional[bool] = None,
keep_every_nth_iter: Optional[int] = None,
residuals: Sequence[str] = (),
predictions: Sequence[str] = (),
solver: Optional[str] = None,
solver_rtol: Optional[int] = None,
solver_atol: Optional[int] = None,
tool_options: Mapping[str, Any] = frozenmapping({}),
derivatives: Sequence[Sequence[Expr]] = (),
individual_eta_samples: bool = False,
):
method = EstimationStep._canonicalize_and_check_method(method)
if maximum_evaluations is not None and maximum_evaluations < 1:
raise ValueError(
'Number of maximum evaluations must be more than one, use '
'evaluation=True or tool_options for special cases (e.g. 0 and -1'
'in NONMEM)'
)
try:
residuals = tuple(sorted(residuals))
except TypeError:
raise TypeError(
f"Residuals could not be converted to tuple. Recieved type '{type(residuals)}'"
)
try:
predictions = tuple(sorted(predictions))
except TypeError:
raise TypeError(
f"Predictions could not be converted to tuple. Recieved type '{type(predictions)}'"
)
if derivatives:
derivatives = EstimationStep._canonicalize_derivatives(derivatives)
if parameter_uncertainty_method is not None:
parameter_uncertainty_method = parameter_uncertainty_method.upper()
if not (
parameter_uncertainty_method is None
or parameter_uncertainty_method
in EstimationStep.supported_parameter_uncertainty_methods
):
raise ValueError(
f"Unknown parameter uncertainty method {parameter_uncertainty_method}. "
f"Recognized methods are {sorted(EstimationStep.supported_parameter_uncertainty_methods)}."
)
solver = ExecutionStep._canonicalize_solver(solver)
tool_options = ExecutionStep._canonicalize_tool_options(tool_options)
return cls(
method=method,
interaction=interaction,
parameter_uncertainty_method=parameter_uncertainty_method,
evaluation=evaluation,
maximum_evaluations=maximum_evaluations,
laplace=laplace,
isample=isample,
niter=niter,
auto=auto,
keep_every_nth_iter=keep_every_nth_iter,
residuals=residuals,
predictions=predictions,
solver=solver,
solver_rtol=solver_rtol,
solver_atol=solver_atol,
tool_options=tool_options,
derivatives=derivatives,
individual_eta_samples=bool(individual_eta_samples),
)
[docs]
def replace(self, **kwargs) -> EstimationStep:
"""Derive a new EstimationStep with new properties"""
d = {key[1:]: value for key, value in self.__dict__.items()}
d.update(kwargs)
new = EstimationStep.create(**d)
return new
@staticmethod
def _canonicalize_and_check_method(method: str) -> str:
method = method.upper()
if method not in EstimationStep.supported_methods:
raise ValueError(
f'EstimationStep: {method} not recognized. Use any of {sorted(EstimationStep.supported_methods)}.'
)
return method
@staticmethod
def _canonicalize_derivatives(derivatives: Sequence[Sequence[Expr]]) -> tuple:
try:
derivatives = tuple(tuple(d) for d in derivatives)
except TypeError:
raise TypeError(
"Given derivatives cannot be converted to tuple of tuples. "
" Check type of input arguments."
)
for d in derivatives:
for d_arg in d:
if not (isinstance(d_arg, Expr) and d_arg.is_symbol()):
raise TypeError(
f"Each derivative argument must be a symbol of type 'Expr'. "
f"Found '{d_arg}' of type {type(d_arg)}"
)
derivatives = tuple(
sorted([tuple(sorted(d, key=str)) for d in derivatives], key=lambda der: str(der[0]))
)
return derivatives
@property
def method(self) -> str:
"""Name of the estimation method"""
return self._method
@property
def maximum_evaluations(self) -> Optional[int]:
"""Maximum allowable number of evaluations of the objective function"""
return self._maximum_evaluations
@property
def interaction(self) -> bool:
"""Preserve eta-epsilon interaction in the computation of the objective function"""
return self._interaction
@property
def evaluation(self) -> bool:
"""Only perform model evaluation"""
return self._evaluation
@property
def parameter_uncertainty_method(self) -> Optional[str]:
"""Method to use when estimating parameter uncertainty.
Supported methods and their corresponding NMTRAN code:
+-------------------------------+-----------------------+
| Method | NMTRAN |
+===============================+=======================+
| Sandwich | $COVARIANCE |
+-------------------------------+-----------------------+
| Cross-product gradient (SMAT) | $COVARIANCE MATRIX=S |
+-------------------------------+-----------------------+
| Observed FIM (RMAT) | $COVARIANCE MATRIX=R |
+-------------------------------+-----------------------+
| Expected FIM (EFIM) | $DESIGN |
+-------------------------------+-----------------------+
By default the following options are appended:
UNCONDITIONAL: The uncertainty step is implemented regardless of how the estimation step terminates.
PRINT=E: Print the eigenvalues of the correlation matrix.
PRECOND=1: Perform up to 1 preconditioning cycle on the R matrix.
"""
return self._parameter_uncertainty_method
@property
def laplace(self) -> bool:
"""Use the laplacian method"""
return self._laplace
@property
def isample(self) -> Optional[int]:
"""Number of samples per subject (or similar) for EM methods"""
return self._isample
@property
def niter(self) -> Optional[int]:
"""Number of iterations for EM methods"""
return self._niter
@property
def auto(self) -> Optional[bool]:
"""Let estimation tool automatically add settings"""
return self._auto
@property
def keep_every_nth_iter(self) -> Optional[int]:
"""Keep results for every nth iteration"""
return self._keep_every_nth_iter
@property
def residuals(self) -> Sequence[str]:
"""List of residuals to calculate"""
return self._residuals
@property
def predictions(self) -> Sequence[str]:
"""List of predictions to estimate"""
return self._predictions
@property
def derivatives(self) -> Sequence[Sequence[Expr]]:
"""List of derivates to calculate when running"""
return self._derivatives
@property
def individual_eta_samples(self) -> bool:
"""Should individual eta samples be generated"""
return self._individual_eta_samples
@property
def tool_options(self) -> frozenmapping[str, Any]:
"""Dictionary of tool specific options"""
return self._tool_options
def __eq__(self, other):
if self is other:
return True
if not isinstance(other, EstimationStep):
return NotImplemented
return (
self._method == other._method
and self._interaction == other._interaction
and self._parameter_uncertainty_method == other._parameter_uncertainty_method
and self._evaluation == other._evaluation
and self._maximum_evaluations == other._maximum_evaluations
and self._laplace == other._laplace
and self._isample == other._isample
and self._niter == other._niter
and self._auto == other._auto
and self._keep_every_nth_iter == other._keep_every_nth_iter
and self._derivatives == other._derivatives
and self._predictions == other._predictions
and self._residuals == other._residuals
and self._individual_eta_samples == other._individual_eta_samples
and super().__eq__(other)
)
def __hash__(self):
return hash(
(
self._method,
self._interaction,
self._parameter_uncertainty_method,
self._evaluation,
self._maximum_evaluations,
self._laplace,
self._isample,
self._niter,
self._auto,
self._keep_every_nth_iter,
self._individual_eta_samples,
super().__hash__(),
)
)
[docs]
def to_dict(self) -> dict[str, Any]:
d = {
'class': 'EstimationStep',
'method': self._method,
'interaction': self._interaction,
'parameter_uncertainty_method': self._parameter_uncertainty_method,
'evaluation': self._evaluation,
'maximum_evaluations': self._maximum_evaluations,
'laplace': self._laplace,
'isample': self._isample,
'niter': self._niter,
'auto': self._auto,
'keep_every_nth_iter': self._keep_every_nth_iter,
'derivatives': tuple(str(d) for d in self._derivatives),
'predictions': self._predictions,
'residuals': self._residuals,
'individual_eta_samples': self._individual_eta_samples,
}
super()._add_to_dict(d)
return d
[docs]
@classmethod
def from_dict(cls, d: dict[str, Any]) -> EstimationStep:
d = dict(d)
ExecutionStep._adjust_dict(d)
return cls(**d)
def __repr__(self):
parameter_uncertainty_method = (
f"'{self.parameter_uncertainty_method}'"
if self.parameter_uncertainty_method is not None
else self.parameter_uncertainty_method
)
return (
f"EstimationStep('{self.method}', interaction={self.interaction}, "
f"parameter_uncertainty_method={parameter_uncertainty_method}, evaluation={self.evaluation}, "
f"maximum_evaluations={self.maximum_evaluations}, laplace={self.laplace}, "
f"isample={self.isample}, niter={self.niter}, auto={self.auto}, "
f"keep_every_nth_iter={self.keep_every_nth_iter}, "
f"individual_eta_samples={self.individual_eta_samples}, {super()._partial_repr()})"
)
[docs]
class SimulationStep(ExecutionStep):
"""Definition of one simulation operation"""
def __init__(
self,
n: int = 1,
seed: int = 64206,
solver: Optional[str] = None,
solver_rtol: Optional[int] = None,
solver_atol: Optional[int] = None,
tool_options: frozenmapping[str, Any] = frozenmapping({}),
):
self._n = n
self._seed = seed
super().__init__(
solver=solver,
solver_rtol=solver_rtol,
solver_atol=solver_atol,
tool_options=tool_options,
)
[docs]
@classmethod
def create(
cls,
n: int = 1,
seed: int = 64206,
solver: Optional[str] = None,
solver_rtol: Optional[int] = None,
solver_atol: Optional[int] = None,
tool_options: Mapping[str, Any] = frozenmapping({}),
):
if n < 1:
raise ValueError("Need at least one replicate in SimulationStep")
return cls(n=n, seed=seed)
[docs]
def replace(self, **kwargs) -> SimulationStep:
"""Derive a new SimulationStep with new properties"""
d = {key[1:]: value for key, value in self.__dict__.items()}
d.update(kwargs)
new = SimulationStep.create(**d)
return new
@property
def n(self) -> int:
"""Number of simulation replicates"""
return self._n
@property
def seed(self) -> int:
"""Random seed"""
return self._seed
def __eq__(self, other):
if self is other:
return True
if not isinstance(other, SimulationStep):
return NotImplemented
return self._n == other._n and self._seed == other._seed and super().__eq__(other)
def __hash__(self):
return hash((self._n, self._seed, super().__hash__()))
[docs]
def to_dict(self) -> dict[str, Any]:
d = {
'class': 'SimulationStep',
'n': self._n,
'seed': self._seed,
}
super()._add_to_dict(d)
return d
[docs]
@classmethod
def from_dict(cls, d: dict[str, Any]) -> SimulationStep:
d = dict(d)
ExecutionStep._adjust_dict(d)
return cls(**d)
def __repr__(self):
return f"SimulationStep(n={self._n}, seed={self._seed}, {super()._partial_repr()})"
[docs]
class ExecutionSteps(Sequence, Immutable):
"""A sequence of estimation steps
Parameters
----------
steps : iterable or None
Used for initialization
"""
def __init__(self, steps: tuple[Union[EstimationStep, SimulationStep], ...] = ()):
self._steps = steps
[docs]
@classmethod
def create(cls, steps: Optional[Sequence[Union[EstimationStep, SimulationStep]]] = None):
if steps is None:
steps = ()
else:
steps = tuple(steps)
return ExecutionSteps(steps=steps)
[docs]
def replace(self, **kwargs) -> ExecutionSteps:
steps = kwargs.get('steps', self._steps)
return ExecutionSteps.create(steps)
@overload
def __getitem__(self, i: int) -> EstimationStep: ...
@overload
def __getitem__(self, i: slice) -> ExecutionSteps: ...
def __getitem__(
self, i: Union[int, slice]
) -> Union[EstimationStep, SimulationStep, ExecutionSteps]:
if isinstance(i, slice):
return ExecutionSteps(self._steps[i])
return self._steps[i]
def __add__(self, other: Union[EstimationStep, ExecutionSteps, Iterable]) -> ExecutionSteps:
if isinstance(other, ExecutionSteps):
return ExecutionSteps(self._steps + other._steps)
elif isinstance(other, EstimationStep) or isinstance(other, SimulationStep):
return ExecutionSteps(self._steps + (other,))
else:
return ExecutionSteps(self._steps + tuple(other))
def __radd__(self, other: Union[EstimationStep, Iterable]) -> ExecutionSteps:
if isinstance(other, EstimationStep) or isinstance(other, SimulationStep):
return ExecutionSteps((other,) + self._steps)
else:
return ExecutionSteps(tuple(other) + self._steps)
def __len__(self):
return len(self._steps)
def __eq__(self, other: Any):
if self is other:
return True
if not isinstance(other, ExecutionSteps):
return NotImplemented
if len(self) != len(other):
return False
for s1, s2 in zip(self, other):
if s1 != s2:
return False
return True
def __hash__(self):
return hash(self._steps)
[docs]
def to_dict(self) -> dict[str, Any]:
return {'steps': tuple(step.to_dict() for step in self._steps)}
[docs]
@classmethod
def from_dict(cls, d: dict[str, Any]) -> ExecutionSteps:
steps = []
for sdict in d['steps']:
if sdict['class'] == 'EstimationStep':
s = EstimationStep.from_dict(sdict)
else:
s = SimulationStep.from_dict(sdict)
steps.append(s)
return cls(steps=tuple(steps))
[docs]
def to_dataframe(self) -> pd.DataFrame:
"""Convert to DataFrame
Use this to create an overview of all estimation steps
Returns
-------
pd.DataFrame
DataFrame overview
>>> from pharmpy.modeling import load_example_model
>>> model = load_example_model("pheno")
>>> model.execution_steps.to_dataframe() # doctest: +ELLIPSIS
method interaction parameter_uncertainty_method ... auto keep_every_nth_iter tool_options
0 FOCE True SANDWICH ... None None {}
"""
steps = [s for s in self._steps if isinstance(s, EstimationStep)]
method = [s.method for s in steps]
interaction = [s.interaction for s in steps]
parameter_uncertainty_method = [s.parameter_uncertainty_method for s in steps]
evaluation = [s.evaluation for s in steps]
maximum_evaluations = [s.maximum_evaluations for s in steps]
laplace = [s.laplace for s in steps]
isample = [s.isample for s in steps]
niter = [s.niter for s in steps]
auto = [s.auto for s in steps]
keep_every_nth_iter = [s.keep_every_nth_iter for s in steps]
tool_options = [dict(s.tool_options) if s.tool_options else dict() for s in steps]
df = pd.DataFrame(
{
'method': method,
'interaction': interaction,
'parameter_uncertainty_method': parameter_uncertainty_method,
'evaluation': evaluation,
'maximum_evaluations': maximum_evaluations,
'laplace': laplace,
'isample': isample,
'niter': niter,
'auto': auto,
'keep_every_nth_iter': keep_every_nth_iter,
'tool_options': tool_options,
}
)
return df
def __repr__(self) -> str:
if len(self) == 0:
return "ExecutionSteps()"
return self.to_dataframe().to_string()
def _repr_html_(self) -> str:
if len(self) == 0:
return "ExecutionSteps()"
else:
return self.to_dataframe().to_html()