Source code for pharmpy.model.execution_steps

from __future__ import annotations

from collections.abc import Iterable, Mapping, Sequence
from typing import Any, Optional, Union, overload

from pharmpy import DEFAULT_SEED
from pharmpy.basic import Expr
from pharmpy.deps import pandas as pd
from pharmpy.internals.immutable import Immutable, frozenmapping

SUPPORTED_SOLVERS = frozenset(('CVODES', 'DGEAR', 'DVERK', 'IDA', 'LSODA', 'LSODI'))


class ExecutionStep(Immutable):
    def __init__(
        self,
        solver: Optional[str] = None,
        solver_rtol: Optional[int] = None,
        solver_atol: Optional[int] = None,
        tool_options: frozenmapping[str, Any] = frozenmapping({}),
    ):
        self._solver = solver
        self._solver_rtol = solver_rtol
        self._solver_atol = solver_atol
        self._tool_options = tool_options

    @staticmethod
    def _canonicalize_solver(solver):
        if solver is not None:
            solver = solver.upper()
        if not (solver is None or solver in SUPPORTED_SOLVERS):
            raise ValueError(
                f"Unknown solver {solver}. Recognized solvers are {sorted(SUPPORTED_SOLVERS)}."
            )
        return solver

    @staticmethod
    def _canonicalize_tool_options(tool_options):
        tool_options = frozenmapping(tool_options)
        return tool_options

    @property
    def solver(self) -> Optional[str]:
        """Numerical solver to use when numerically solving the ODE system
        Supported solvers and their corresponding NONMEM ADVAN

        +----------------------------+------------------+
        | Solver                     | NONMEM ADVAN     |
        +============================+==================+
        | CVODES                     | ADVAN14          |
        +----------------------------+------------------+
        | DGEAR                      | ADVAN8           |
        +----------------------------+------------------+
        | DVERK                      | ADVAN6           |
        +----------------------------+------------------+
        | IDA                        | ADVAN15          |
        +----------------------------+------------------+
        | LSODA                      | ADVAN13          |
        +----------------------------+------------------+
        | LSODI                      | ADVAN9           |
        +----------------------------+------------------+
        """
        return self._solver

    @property
    def solver_rtol(self) -> Optional[int]:
        """Relative tolerance for numerical ODE system solver"""
        return self._solver_rtol

    @property
    def solver_atol(self) -> Optional[int]:
        """Absolute tolerance for numerical ODE system solver"""
        return self._solver_atol

    @property
    def tool_options(self) -> frozenmapping[str, Any]:
        """Dictionary of tool specific options"""
        return self._tool_options

    def _add_to_dict(self, d):
        tool_options = dict(self._tool_options)
        d['solver'] = self._solver
        d['solver_rtol'] = self._solver_rtol
        d['solver_atol'] = self._solver_atol
        d['tool_options'] = tool_options

    @staticmethod
    def _adjust_dict(d):
        del d['class']
        if isinstance(d['tool_options'], dict):
            d['tool_options'] = frozenmapping(d['tool_options'])

    def _partial_repr(self):
        solver = f"'{self.solver}'" if self.solver is not None else self.solver
        return (
            f"solver={solver}, "
            f"solver_rtol={self.solver_rtol}, solver_atol={self.solver_atol}, "
            f"tool_options={self.tool_options}"
        )

    def __eq__(self, other):
        # NOTE: No need to test class here
        return (
            self.solver == other.solver
            and self.solver_rtol == other.solver_rtol
            and self.solver_atol == other.solver_atol
            and self.tool_options == other.tool_options
        )

    def __hash__(self):
        return hash(
            (
                self._solver,
                self._solver_rtol,
                self._solver_atol,
                self._tool_options,
            )
        )


[docs] class EstimationStep(ExecutionStep): """Definition of one estimation operation""" """Supported estimation methods """ supported_methods = frozenset(('FO', 'FOCE', 'ITS', 'IMPMAP', 'IMP', 'SAEM', 'BAYES')) supported_parameter_uncertainty_methods = frozenset(('SANDWICH', 'SMAT', 'RMAT', 'EFIM')) def __init__( self, method: str, interaction: bool = False, parameter_uncertainty_method: Optional[str] = None, evaluation: bool = False, maximum_evaluations: Optional[int] = None, laplace: bool = False, isample: Optional[int] = None, niter: Optional[int] = None, auto: Optional[bool] = None, keep_every_nth_iter: Optional[int] = None, residuals: Sequence[str] = (), predictions: Sequence[str] = (), solver: Optional[str] = None, solver_rtol: Optional[int] = None, solver_atol: Optional[int] = None, tool_options: frozenmapping[str, Any] = frozenmapping({}), derivatives: Sequence[Sequence[Expr]] = (), individual_eta_samples: bool = False, ): self._method = method self._interaction = interaction self._parameter_uncertainty_method = parameter_uncertainty_method self._evaluation = evaluation self._maximum_evaluations = maximum_evaluations self._laplace = laplace self._isample = isample self._niter = niter self._auto = auto self._keep_every_nth_iter = keep_every_nth_iter self._residuals = residuals self._predictions = predictions self._derivatives = derivatives self._individual_eta_samples = individual_eta_samples super().__init__( solver=solver, solver_rtol=solver_rtol, solver_atol=solver_atol, tool_options=tool_options, )
[docs] @classmethod def create( cls, method: str, interaction: bool = False, parameter_uncertainty_method: Optional[str] = None, evaluation: bool = False, maximum_evaluations: Optional[int] = None, laplace: bool = False, isample: Optional[int] = None, niter: Optional[int] = None, auto: Optional[bool] = None, keep_every_nth_iter: Optional[int] = None, residuals: Sequence[str] = (), predictions: Sequence[str] = (), solver: Optional[str] = None, solver_rtol: Optional[int] = None, solver_atol: Optional[int] = None, tool_options: Mapping[str, Any] = frozenmapping({}), derivatives: Sequence[Sequence[Expr]] = (), individual_eta_samples: bool = False, ): method = EstimationStep._canonicalize_and_check_method(method) if maximum_evaluations is not None and maximum_evaluations < 1: raise ValueError( 'Number of maximum evaluations must be more than one, use ' 'evaluation=True or tool_options for special cases (e.g. 0 and -1' 'in NONMEM)' ) try: residuals = tuple(sorted(residuals)) except TypeError: raise TypeError( f"Residuals could not be converted to tuple. Recieved type '{type(residuals)}'" ) try: predictions = tuple(sorted(predictions)) except TypeError: raise TypeError( f"Predictions could not be converted to tuple. Recieved type '{type(predictions)}'" ) if derivatives: derivatives = EstimationStep._canonicalize_derivatives(derivatives) if parameter_uncertainty_method is not None: parameter_uncertainty_method = parameter_uncertainty_method.upper() if not ( parameter_uncertainty_method is None or parameter_uncertainty_method in EstimationStep.supported_parameter_uncertainty_methods ): raise ValueError( f"Unknown parameter uncertainty method {parameter_uncertainty_method}. " f"Recognized methods are {sorted(EstimationStep.supported_parameter_uncertainty_methods)}." ) solver = ExecutionStep._canonicalize_solver(solver) tool_options = ExecutionStep._canonicalize_tool_options(tool_options) return cls( method=method, interaction=interaction, parameter_uncertainty_method=parameter_uncertainty_method, evaluation=evaluation, maximum_evaluations=maximum_evaluations, laplace=laplace, isample=isample, niter=niter, auto=auto, keep_every_nth_iter=keep_every_nth_iter, residuals=residuals, predictions=predictions, solver=solver, solver_rtol=solver_rtol, solver_atol=solver_atol, tool_options=tool_options, derivatives=derivatives, individual_eta_samples=bool(individual_eta_samples), )
[docs] def replace(self, **kwargs) -> EstimationStep: """Derive a new EstimationStep with new properties""" d = {key[1:]: value for key, value in self.__dict__.items()} d.update(kwargs) new = EstimationStep.create(**d) return new
@staticmethod def _canonicalize_and_check_method(method: str) -> str: method = method.upper() if method not in EstimationStep.supported_methods: raise ValueError( f'EstimationStep: {method} not recognized. Use any of {sorted(EstimationStep.supported_methods)}.' ) return method @staticmethod def _canonicalize_derivatives(derivatives: Sequence[Sequence[Expr]]) -> tuple: try: derivatives = tuple(tuple(d) for d in derivatives) except TypeError: raise TypeError( "Given derivatives cannot be converted to tuple of tuples. " " Check type of input arguments." ) for d in derivatives: for d_arg in d: if not (isinstance(d_arg, Expr) and d_arg.is_symbol()): raise TypeError( f"Each derivative argument must be a symbol of type 'Expr'. " f"Found '{d_arg}' of type {type(d_arg)}" ) derivatives = tuple( sorted([tuple(sorted(d, key=str)) for d in derivatives], key=lambda der: str(der[0])) ) return derivatives @property def method(self) -> str: """Name of the estimation method""" return self._method @property def maximum_evaluations(self) -> Optional[int]: """Maximum allowable number of evaluations of the objective function""" return self._maximum_evaluations @property def interaction(self) -> bool: """Preserve eta-epsilon interaction in the computation of the objective function""" return self._interaction @property def evaluation(self) -> bool: """Only perform model evaluation""" return self._evaluation @property def parameter_uncertainty_method(self) -> Optional[str]: """Method to use when estimating parameter uncertainty. Supported methods and their corresponding NMTRAN code: +-------------------------------+-----------------------+ | Method | NMTRAN | +===============================+=======================+ | Sandwich | $COVARIANCE | +-------------------------------+-----------------------+ | Cross-product gradient (SMAT) | $COVARIANCE MATRIX=S | +-------------------------------+-----------------------+ | Observed FIM (RMAT) | $COVARIANCE MATRIX=R | +-------------------------------+-----------------------+ | Expected FIM (EFIM) | $DESIGN | +-------------------------------+-----------------------+ By default the following options are appended: UNCONDITIONAL: The uncertainty step is implemented regardless of how the estimation step terminates. PRINT=E: Print the eigenvalues of the correlation matrix. PRECOND=1: Perform up to 1 preconditioning cycle on the R matrix. """ return self._parameter_uncertainty_method @property def laplace(self) -> bool: """Use the laplacian method""" return self._laplace @property def isample(self) -> Optional[int]: """Number of samples per subject (or similar) for EM methods""" return self._isample @property def niter(self) -> Optional[int]: """Number of iterations for EM methods""" return self._niter @property def auto(self) -> Optional[bool]: """Let estimation tool automatically add settings""" return self._auto @property def keep_every_nth_iter(self) -> Optional[int]: """Keep results for every nth iteration""" return self._keep_every_nth_iter @property def residuals(self) -> Sequence[str]: """List of residuals to calculate""" return self._residuals @property def predictions(self) -> Sequence[str]: """List of predictions to estimate""" return self._predictions @property def derivatives(self) -> Sequence[Sequence[Expr]]: """List of derivates to calculate when running""" return self._derivatives @property def individual_eta_samples(self) -> bool: """Should individual eta samples be generated""" return self._individual_eta_samples @property def tool_options(self) -> frozenmapping[str, Any]: """Dictionary of tool specific options""" return self._tool_options def __eq__(self, other): if self is other: return True if not isinstance(other, EstimationStep): return NotImplemented return ( self._method == other._method and self._interaction == other._interaction and self._parameter_uncertainty_method == other._parameter_uncertainty_method and self._evaluation == other._evaluation and self._maximum_evaluations == other._maximum_evaluations and self._laplace == other._laplace and self._isample == other._isample and self._niter == other._niter and self._auto == other._auto and self._keep_every_nth_iter == other._keep_every_nth_iter and self._derivatives == other._derivatives and self._predictions == other._predictions and self._residuals == other._residuals and self._individual_eta_samples == other._individual_eta_samples and super().__eq__(other) ) def __hash__(self): return hash( ( self._method, self._interaction, self._parameter_uncertainty_method, self._evaluation, self._maximum_evaluations, self._laplace, self._isample, self._niter, self._auto, self._keep_every_nth_iter, self._individual_eta_samples, super().__hash__(), ) )
[docs] def to_dict(self) -> dict[str, Any]: d = { 'class': 'EstimationStep', 'method': self._method, 'interaction': self._interaction, 'parameter_uncertainty_method': self._parameter_uncertainty_method, 'evaluation': self._evaluation, 'maximum_evaluations': self._maximum_evaluations, 'laplace': self._laplace, 'isample': self._isample, 'niter': self._niter, 'auto': self._auto, 'keep_every_nth_iter': self._keep_every_nth_iter, 'derivatives': tuple(str(d) for d in self._derivatives), 'predictions': self._predictions, 'residuals': self._residuals, 'individual_eta_samples': self._individual_eta_samples, } super()._add_to_dict(d) return d
[docs] @classmethod def from_dict(cls, d: dict[str, Any]) -> EstimationStep: d = dict(d) ExecutionStep._adjust_dict(d) return cls(**d)
def __repr__(self): parameter_uncertainty_method = ( f"'{self.parameter_uncertainty_method}'" if self.parameter_uncertainty_method is not None else self.parameter_uncertainty_method ) return ( f"EstimationStep('{self.method}', interaction={self.interaction}, " f"parameter_uncertainty_method={parameter_uncertainty_method}, evaluation={self.evaluation}, " f"maximum_evaluations={self.maximum_evaluations}, laplace={self.laplace}, " f"isample={self.isample}, niter={self.niter}, auto={self.auto}, " f"keep_every_nth_iter={self.keep_every_nth_iter}, " f"individual_eta_samples={self.individual_eta_samples}, {super()._partial_repr()})" )
[docs] class SimulationStep(ExecutionStep): """Definition of one simulation operation""" def __init__( self, n: int = 1, seed: int = DEFAULT_SEED, solver: Optional[str] = None, solver_rtol: Optional[int] = None, solver_atol: Optional[int] = None, tool_options: frozenmapping[str, Any] = frozenmapping({}), ): self._n = n self._seed = seed super().__init__( solver=solver, solver_rtol=solver_rtol, solver_atol=solver_atol, tool_options=tool_options, )
[docs] @classmethod def create( cls, n: int = 1, seed: int = DEFAULT_SEED, solver: Optional[str] = None, solver_rtol: Optional[int] = None, solver_atol: Optional[int] = None, tool_options: Mapping[str, Any] = frozenmapping({}), ): if n < 1: raise ValueError("Need at least one replicate in SimulationStep") return cls(n=n, seed=seed)
[docs] def replace(self, **kwargs) -> SimulationStep: """Derive a new SimulationStep with new properties""" d = {key[1:]: value for key, value in self.__dict__.items()} d.update(kwargs) new = SimulationStep.create(**d) return new
@property def n(self) -> int: """Number of simulation replicates""" return self._n @property def seed(self) -> int: """Random seed""" return self._seed def __eq__(self, other): if self is other: return True if not isinstance(other, SimulationStep): return NotImplemented return self._n == other._n and self._seed == other._seed and super().__eq__(other) def __hash__(self): return hash((self._n, self._seed, super().__hash__()))
[docs] def to_dict(self) -> dict[str, Any]: d = { 'class': 'SimulationStep', 'n': self._n, 'seed': self._seed, } super()._add_to_dict(d) return d
[docs] @classmethod def from_dict(cls, d: dict[str, Any]) -> SimulationStep: d = dict(d) ExecutionStep._adjust_dict(d) return cls(**d)
def __repr__(self): return f"SimulationStep(n={self._n}, seed={self._seed}, {super()._partial_repr()})"
[docs] class ExecutionSteps(Sequence, Immutable): """A sequence of estimation steps Parameters ---------- steps : iterable or None Used for initialization """ def __init__(self, steps: tuple[Union[EstimationStep, SimulationStep], ...] = ()): self._steps = steps
[docs] @classmethod def create(cls, steps: Optional[Sequence[Union[EstimationStep, SimulationStep]]] = None): if steps is None: steps = () else: steps = tuple(steps) return ExecutionSteps(steps=steps)
[docs] def replace(self, **kwargs) -> ExecutionSteps: steps = kwargs.get('steps', self._steps) return ExecutionSteps.create(steps)
@overload def __getitem__(self, i: int) -> EstimationStep: ... @overload def __getitem__(self, i: slice) -> ExecutionSteps: ... def __getitem__( self, i: Union[int, slice] ) -> Union[EstimationStep, SimulationStep, ExecutionSteps]: if isinstance(i, slice): return ExecutionSteps(self._steps[i]) return self._steps[i] def __add__(self, other: Union[EstimationStep, ExecutionSteps, Iterable]) -> ExecutionSteps: if isinstance(other, ExecutionSteps): return ExecutionSteps(self._steps + other._steps) elif isinstance(other, EstimationStep) or isinstance(other, SimulationStep): return ExecutionSteps(self._steps + (other,)) else: return ExecutionSteps(self._steps + tuple(other)) def __radd__(self, other: Union[EstimationStep, Iterable]) -> ExecutionSteps: if isinstance(other, EstimationStep) or isinstance(other, SimulationStep): return ExecutionSteps((other,) + self._steps) else: return ExecutionSteps(tuple(other) + self._steps) def __len__(self): return len(self._steps) def __eq__(self, other: Any): if self is other: return True if not isinstance(other, ExecutionSteps): return NotImplemented if len(self) != len(other): return False for s1, s2 in zip(self, other): if s1 != s2: return False return True def __hash__(self): return hash(self._steps)
[docs] def to_dict(self) -> dict[str, Any]: return {'steps': tuple(step.to_dict() for step in self._steps)}
[docs] @classmethod def from_dict(cls, d: dict[str, Any]) -> ExecutionSteps: steps = [] for sdict in d['steps']: if sdict['class'] == 'EstimationStep': s = EstimationStep.from_dict(sdict) else: s = SimulationStep.from_dict(sdict) steps.append(s) return cls(steps=tuple(steps))
[docs] def to_dataframe(self) -> pd.DataFrame: """Convert to DataFrame Use this to create an overview of all estimation steps Returns ------- pd.DataFrame DataFrame overview >>> from pharmpy.modeling import load_example_model >>> model = load_example_model("pheno") >>> model.execution_steps.to_dataframe() # doctest: +ELLIPSIS method interaction parameter_uncertainty_method ... auto keep_every_nth_iter tool_options 0 FOCE True SANDWICH ... None None {} """ steps = [s for s in self._steps if isinstance(s, EstimationStep)] method = [s.method for s in steps] interaction = [s.interaction for s in steps] parameter_uncertainty_method = [s.parameter_uncertainty_method for s in steps] evaluation = [s.evaluation for s in steps] maximum_evaluations = [s.maximum_evaluations for s in steps] laplace = [s.laplace for s in steps] isample = [s.isample for s in steps] niter = [s.niter for s in steps] auto = [s.auto for s in steps] keep_every_nth_iter = [s.keep_every_nth_iter for s in steps] tool_options = [dict(s.tool_options) if s.tool_options else dict() for s in steps] df = pd.DataFrame( { 'method': method, 'interaction': interaction, 'parameter_uncertainty_method': parameter_uncertainty_method, 'evaluation': evaluation, 'maximum_evaluations': maximum_evaluations, 'laplace': laplace, 'isample': isample, 'niter': niter, 'auto': auto, 'keep_every_nth_iter': keep_every_nth_iter, 'tool_options': tool_options, } ) return df
def __repr__(self) -> str: if len(self) == 0: return "ExecutionSteps()" return self.to_dataframe().to_string() def _repr_html_(self) -> str: if len(self) == 0: return "ExecutionSteps()" else: return self.to_dataframe().to_html()