"""
===================
Generic Model class
===================
**Base class of all implementations.**
Inherit to *implement*, i.e. to define support for a specific model type.
Definitions
-----------
"""
from __future__ import annotations
import dataclasses
import json
import warnings
from collections.abc import Mapping
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Any, Optional, Union
import pharmpy
from pharmpy.basic import Expr, TExpr, TSymbol
from pharmpy.internals.df import hash_df_runtime
from pharmpy.internals.immutable import Immutable, cache_method, frozenmapping
from pharmpy.model.external import detect_model
from .datainfo import ColumnInfo, DataInfo
from .execution_steps import ExecutionSteps
from .parameters import Parameters
from .random_variables import RandomVariables
from .statements import CompartmentalSystem, Statements
if TYPE_CHECKING:
import pandas as pd
else:
from pharmpy.deps import pandas as pd
[docs]
class ModelError(Exception):
"""Exception for errors in model object"""
pass
[docs]
class ModelSyntaxError(ModelError):
"""Exception for Syntax errors in model code"""
def __init__(self, msg='model syntax error'):
super().__init__(msg)
[docs]
class ModelfitResultsError(ModelError):
"""Exception for issues with ModelfitResults"""
pass
@dataclass(frozen=True)
class ModelInternals:
def __init__(self):
pass
def replace(self, **kwargs):
return dataclasses.replace(self, **kwargs)
[docs]
class Model(Immutable):
"""The Pharmpy model class"""
filename_extension = '.ppmod'
def __init__(
self,
name: str = '',
parameters: Parameters = Parameters(),
random_variables: RandomVariables = RandomVariables.create(()),
statements: Statements = Statements(),
dataset: Optional[pd.DataFrame] = None,
datainfo: DataInfo = DataInfo(),
dependent_variables: frozenmapping[Expr, int] = frozenmapping({Expr.symbol('y'): 1}),
observation_transformation: Optional[frozenmapping[Expr, Expr]] = None,
execution_steps: ExecutionSteps = ExecutionSteps(),
parent_model: Optional[str] = None,
initial_individual_estimates: Optional[pd.DataFrame] = None,
value_type: str = 'PREDICTION',
description: str = '',
internals: Optional[ModelInternals] = None,
):
self._name = name
self._datainfo = datainfo
self._dataset = dataset
self._random_variables = random_variables
self._parameters = parameters
self._statements = statements
self._dependent_variables = dependent_variables
if observation_transformation is None:
observation_transformation = frozenmapping(
{dv: dv for dv in dependent_variables.keys()}
)
self._observation_transformation = observation_transformation
self._execution_steps = execution_steps
self._parent_model = parent_model
self._initial_individual_estimates = initial_individual_estimates
self._value_type = value_type
self._description = description
self._internals = internals
[docs]
@classmethod
def create(
cls,
name: str,
parameters: Optional[Parameters] = None,
random_variables: Optional[RandomVariables] = None,
statements: Optional[Statements] = None,
dataset: Optional[pd.DataFrame] = None,
datainfo: DataInfo = DataInfo(),
dependent_variables: Optional[Mapping[TSymbol, int]] = None,
observation_transformation: Optional[Mapping[TSymbol, TExpr]] = None,
execution_steps: Optional[ExecutionSteps] = None,
parent_model: Optional[str] = None,
initial_individual_estimates: Optional[pd.DataFrame] = None,
value_type: str = 'PREDICTION',
description: str = '',
internals: Optional[ModelInternals] = None,
):
Model._canonicalize_name(name)
dvs = Model._canonicalize_dependent_variables(dependent_variables)
obs_transformation = Model._canonicalize_observation_transformation(
observation_transformation, dvs
)
parameters = Model._canonicalize_parameters(parameters)
random_variables = Model._canonicalize_random_variables(random_variables)
parameters = Model._canonicalize_parameter_estimates(parameters, random_variables)
execution_steps = Model._canonicalize_execution_steps(execution_steps)
value_type = Model._canonicalize_value_type(value_type)
if not isinstance(datainfo, DataInfo):
raise TypeError("model.datainfo must be of DataInfo type")
if dataset is not None:
datainfo = update_datainfo(datainfo, dataset)
statements = Model._canonicalize_statements(
statements, parameters, random_variables, datainfo
)
return cls(
name=name,
dependent_variables=dvs,
observation_transformation=obs_transformation,
parameters=parameters,
random_variables=random_variables,
execution_steps=execution_steps,
statements=statements,
description=description,
parent_model=parent_model,
value_type=value_type,
internals=internals,
initial_individual_estimates=initial_individual_estimates,
dataset=dataset,
datainfo=datainfo,
)
@staticmethod
def _canonicalize_value_type(value: str) -> str:
allowed_strings = ('PREDICTION', 'LIKELIHOOD', '-2LL')
if isinstance(value, str):
if value.upper() not in allowed_strings:
raise ValueError(
f"Cannot set value_type to {value}. Must be one of {allowed_strings} "
f"or a symbol"
)
value = value.upper()
elif not (isinstance(value, Expr) and value.is_symbol()):
raise ValueError(f"Can only set value_type to one of {allowed_strings} or a symbol")
return value
@staticmethod
def _canonicalize_parameter_estimates(params, rvs) -> Parameters:
inits = params.inits
if not rvs.validate_parameters(inits):
nearest = rvs.nearest_valid_parameters(inits)
before, after = compare_before_after_params(inits, nearest)
warnings.warn(
f"Adjusting initial estimates to create positive semidefinite "
f"omega/sigma matrices.\nBefore adjusting: {before}.\n"
f"After adjusting: {after}"
)
params = params.set_initial_estimates(nearest)
return params
@staticmethod
def _canonicalize_random_variables(rvs: Optional[RandomVariables]) -> RandomVariables:
if isinstance(rvs, RandomVariables):
return rvs
elif rvs is None:
return RandomVariables.create()
else:
raise TypeError("model.random_variables must be of RandomVariables type")
@staticmethod
def _canonicalize_statements(
statements: Optional[Statements],
params: Parameters,
rvs: RandomVariables,
datainfo: DataInfo,
) -> Statements:
if statements is None:
return Statements()
if not isinstance(statements, Statements):
raise TypeError("model.statements must be of Statements type")
colnames = {Expr.symbol(colname) for colname in datainfo.names}
symbs_all = rvs.free_symbols.union(params.symbols).union(colnames)
if statements.ode_system is not None:
symbs_all = symbs_all.union({statements.ode_system.t})
for i, statement in enumerate(statements):
if isinstance(statement, CompartmentalSystem):
continue
symbs = statement.expression.free_symbols
if not symbs.issubset(symbs_all):
# E.g. after solve_ode_system
if statement.symbol.is_function():
symbs_all.add(Expr.symbol(statement.symbol.name))
for arg in statement.symbol.args:
if arg.is_symbol():
symbs_all.add(arg)
for symb in symbs:
if symb in symbs_all:
continue
if str(symb) == 'NaN':
continue
if statements.find_assignment(symb) is None:
raise ValueError(f'Symbol {symb} is not defined')
if statements[:i].find_assignment_index(symb) is None:
raise ValueError(f'Symbol {symb} defined after being used')
return statements
@staticmethod
def _canonicalize_name(name: str) -> None:
if not isinstance(name, str):
raise TypeError("Name of a model has to be of string type")
@staticmethod
def _canonicalize_dependent_variables(
dvs: Optional[Mapping[TSymbol, int]]
) -> frozenmapping[Expr, int]:
if dvs is None:
dvs = {Expr.symbol('y'): 1}
for key, value in dvs.items():
if isinstance(key, str):
key = Expr.symbol(key)
if not (isinstance(key, Expr) and key.is_symbol()):
raise TypeError("Dependent variable keys must be a string or a symbol")
if not isinstance(value, int):
raise TypeError("Dependent variable values must be of int type")
return frozenmapping(dvs)
@staticmethod
def _canonicalize_observation_transformation(
obs: Optional[Mapping[TSymbol, TExpr]],
dvs: frozenmapping[Expr, int],
) -> frozenmapping[Expr, Expr]:
if obs is None:
obs = {dv: dv for dv in dvs.keys()}
for key, value in obs.items():
key = Expr(key)
value = Expr(value)
return frozenmapping(obs)
@staticmethod
def _canonicalize_parameters(params: Optional[Parameters]) -> Parameters:
if params is None:
return Parameters()
else:
if not isinstance(params, Parameters):
raise TypeError("parameters must be of Parameters type")
return params
@staticmethod
def _canonicalize_execution_steps(steps: Optional[ExecutionSteps]) -> ExecutionSteps:
if steps is None:
return ExecutionSteps()
else:
if not isinstance(steps, ExecutionSteps):
raise TypeError("model.execution_steps must be of ExecutionSteps type")
return steps
[docs]
def replace(self, **kwargs) -> Model:
name = kwargs.get('name', self.name)
Model._canonicalize_name(name)
description = kwargs.get('description', self.description)
internals = kwargs.get('internals', self._internals)
parent_model = kwargs.get('parent_model', self.parent_model)
initial_individual_estimates = kwargs.get(
'initial_individual_estimates', self.initial_individual_estimates
)
for key_name in (
'name',
'description',
'internals',
'parent_model',
'initial_individual_estimates',
):
try:
kwargs.pop(key_name)
except KeyError:
pass
if 'dependent_variables' in kwargs:
dependent_variables = Model._canonicalize_dependent_variables(
kwargs['dependent_variables']
)
kwargs.pop('dependent_variables')
else:
dependent_variables = self.dependent_variables
if 'observation_transformation' in kwargs:
observation_transformation = Model._canonicalize_observation_transformation(
kwargs['observation_transformation'], dependent_variables
)
kwargs.pop('observation_transformation')
else:
observation_transformation = self.observation_transformation
if 'parameters' in kwargs:
parameters = Model._canonicalize_parameters(kwargs['parameters'])
kwargs.pop('parameters')
else:
parameters = self.parameters
if 'random_variables' in kwargs:
random_variables = Model._canonicalize_random_variables(kwargs['random_variables'])
kwargs.pop('random_variables')
else:
random_variables = self.random_variables
parameters = Model._canonicalize_parameter_estimates(parameters, random_variables)
if 'dataset' in kwargs:
dataset = kwargs['dataset']
new_dataset = True
kwargs.pop('dataset')
else:
dataset = self._dataset
new_dataset = False
if 'datainfo' in kwargs:
datainfo = kwargs['datainfo']
if not isinstance(datainfo, DataInfo):
raise TypeError("model.datainfo must be of DataInfo type")
new_datainfo = True
kwargs.pop('datainfo')
else:
datainfo = self._datainfo
new_datainfo = False
if new_dataset and dataset is not None:
datainfo = update_datainfo(datainfo, dataset)
if not new_datainfo:
datainfo = datainfo.replace(path=None)
# Has to be checked after datainfo is updated since it looks for symbols in datainfo as well
if 'statements' in kwargs:
statements = Model._canonicalize_statements(
kwargs['statements'], parameters, random_variables, datainfo
)
kwargs.pop('statements')
else:
statements = self.statements
if 'execution_steps' in kwargs:
execution_steps = Model._canonicalize_execution_steps(kwargs['execution_steps'])
kwargs.pop('execution_steps')
else:
execution_steps = self.execution_steps
if 'value_type' in kwargs:
value_type = Model._canonicalize_value_type(kwargs['value_type'])
kwargs.pop('value_type')
else:
value_type = self.value_type
if len(kwargs) != 0:
raise ValueError(f'Invalid keywords given : {[key for key in kwargs.keys()]}')
return self.__class__(
name=name,
dependent_variables=dependent_variables,
parameters=parameters,
random_variables=random_variables,
statements=statements,
dataset=dataset,
datainfo=datainfo,
execution_steps=execution_steps,
parent_model=parent_model,
initial_individual_estimates=initial_individual_estimates,
value_type=value_type,
description=description,
internals=internals,
observation_transformation=observation_transformation,
)
def __eq__(self, other: Any) -> bool:
"""Compare two models for equality
Tests whether a model is equal to another model. This ignores
implementation-specific details such as NONMEM $DATA and FILE
pointers, or certain $TABLE printing options.
Parameters
----------
other : Model
Other model to compare this one with
Examples
--------
>>> from pharmpy.model import Model
>>> from pharmpy.modeling import load_example_model
>>> a = load_example_model("pheno")
>>> a == a
True
>>> a == 0
Traceback (most recent call last):
...
NotImplementedError: Cannot compare Model with <class 'int'>
>>> a == None
Traceback (most recent call last):
...
NotImplementedError: Cannot compare Model with <class 'NoneType'>
>>> b = load_example_model("pheno")
>>> b == a
True
>>> a = a.replace(name='a')
>>> b = b.replace(name='b')
>>> a == b
True
"""
if self is other:
return True
if not isinstance(other, Model):
raise NotImplementedError(f'Cannot compare Model with {type(other)}')
if self.parameters != other.parameters:
return False
if self.random_variables != other.random_variables:
return False
if self.statements != other.statements:
return False
if self.dependent_variables != other.dependent_variables:
return False
if self.observation_transformation != other.observation_transformation:
return False
if self.execution_steps != other.execution_steps:
return False
if self.initial_individual_estimates is None:
if other.initial_individual_estimates is not None:
return False
else:
if other.initial_individual_estimates is None:
return False
elif not self.initial_individual_estimates.equals(other.initial_individual_estimates):
return False
if self.datainfo != other.datainfo:
return False
if self.value_type != other.value_type:
return False
return True
@cache_method
def __hash__(self):
dataset_hash = hash_df_runtime(self._dataset) if self._dataset is not None else None
return hash(
(
self._parameters,
self._random_variables,
self._statements,
self._dependent_variables,
self._observation_transformation,
self._execution_steps,
self._initial_individual_estimates,
self._datainfo,
dataset_hash,
self._value_type,
)
)
[docs]
def to_dict(self) -> dict[str, Any]:
if self._initial_individual_estimates is not None:
ie = self._initial_individual_estimates.to_dict()
else:
ie = None
depvars = {str(key): val for key, val in self._dependent_variables.items()}
obstrans = {
key.serialize(): val.serialize()
for key, val in self._observation_transformation.items()
}
return {
'parameters': self._parameters.to_dict(),
'random_variables': self._random_variables.to_dict(),
'statements': self._statements.to_dict(),
'execution_steps': self._execution_steps.to_dict(),
'datainfo': self._datainfo.to_dict(),
'value_type': self._value_type,
'dependent_variables': depvars,
'observation_transformation': obstrans,
'initial_individual_estimates': ie,
}
[docs]
@classmethod
def from_dict(cls, d: dict[str, Any]) -> Model:
ie_dict = d['initial_individual_estimates']
if ie_dict is None:
ie = None
else:
ie = pd.DataFrame.from_dict(ie_dict)
depvars = {Expr.symbol(key): value for key, value in d['dependent_variables'].items()}
obstrans = {
Expr.deserialize(key): Expr.deserialize(val)
for key, val in d['observation_transformation'].items()
}
return cls(
parameters=Parameters.from_dict(d['parameters']),
random_variables=RandomVariables.from_dict(d['random_variables']),
statements=Statements.from_dict(d['statements']),
execution_steps=ExecutionSteps.from_dict(d['execution_steps']),
datainfo=DataInfo.from_dict(d['datainfo']),
value_type=d['value_type'],
dependent_variables=frozenmapping(depvars),
observation_transformation=frozenmapping(obstrans),
initial_individual_estimates=ie,
)
def __repr__(self):
return f'<Pharmpy model object {self.name}>'
def _repr_html_(self) -> str:
stat = self.statements._repr_html_()
rvs = self.random_variables._repr_latex_()
return f'<hr>{stat}<hr>${rvs}$<hr>{self.parameters._repr_html_()}<hr>'
@property
def name(self) -> str:
"""Name of the model"""
return self._name
@property
def dependent_variables(self) -> frozenmapping[Expr, int]:
"""The dependent variables of the model mapped to the corresponding DVIDs"""
return self._dependent_variables
@property
def value_type(self) -> str:
"""The type of the model value (dependent variables)
By default this is set to 'PREDICTION' to mean that the model outputs a prediction.
It could optionally be set to 'LIKELIHOOD' or '-2LL' to let the model output the likelihood
or -2*log(likelihood) of the prediction. If set to a symbol this variable can be used to
change the type for different records. The model would then set this symbol to 0 for
a prediction value, 1 for likelihood and 2 for -2ll.
"""
return self._value_type
@property
def observation_transformation(self) -> frozenmapping[Expr, Expr]:
"""Transformation to be applied to the observation data"""
return self._observation_transformation
@property
def parameters(self) -> Parameters:
"""Definitions of population parameters
See :class:`pharmpy.Parameters`
"""
return self._parameters
@property
def random_variables(self) -> RandomVariables:
"""Definitions of random variables
See :class:`pharmpy.RandomVariables`
"""
return self._random_variables
@property
def statements(self) -> Statements:
"""Definitions of model statements
See :class:`pharmpy.Statements`
"""
return self._statements
@property
def execution_steps(self) -> ExecutionSteps:
"""Definitions of estimation steps
See :class:`pharmpy.ExecutionSteps`
"""
return self._execution_steps
@property
def datainfo(self) -> DataInfo:
"""Definitions of model statements
See :class:`pharmpy.Statements`
"""
return self._datainfo
@property
def dataset(self) -> Optional[pd.DataFrame]:
"""Dataset connected to model"""
return self._dataset
@property
def initial_individual_estimates(self) -> Optional[pd.DataFrame]:
"""Initial estimates for individual parameters"""
return self._initial_individual_estimates
@property
def internals(self) -> Optional[ModelInternals]:
"""Internal data for tool specific part of model"""
return self._internals
@property
def code(self) -> str:
"""Model type specific code"""
d = self.to_dict()
d['__magic__'] = "Pharmpy Model"
d['__version__'] = pharmpy.__version__
return json.dumps(d)
@property
def parent_model(self) -> Optional[str]:
"""Name of parent model"""
return self._parent_model
[docs]
def has_same_dataset_as(self, other: Model) -> bool:
"""Check if this model has the same dataset as another model
Parameters
----------
other : Model
Another model
Returns
-------
bool
True if both models have the same dataset
"""
if self.dataset is None:
if other.dataset is None:
return True
else:
return False
if other.dataset is None:
return False
# NOTE: Rely on duck-typing here (?)
return self.dataset.equals(other.dataset)
@property
def description(self) -> str:
"""A free text description of the model"""
return self._description
[docs]
@staticmethod
def parse_model(path: Union[Path, str]):
"""Create a model object by parsing a model file of any supported type
Parameters
----------
path : Path or str
Path to a model file
Returns
-------
Model
A model object
"""
path = Path(path)
with open(path, 'r', encoding='latin-1') as fp:
code = fp.read()
model_module = detect_model(code)
model = model_module.parse_model(code, path)
return model
[docs]
@staticmethod
def parse_model_from_string(code: str) -> Model:
"""Create a model object by parsing a string with model code of any supported type
Parameters
----------
code : str
Model code
Returns
-------
Model
A model object
"""
model_module = detect_model(code)
model = model_module.parse_model(code, None)
return model
[docs]
def update_source(self) -> Model:
"""Update source code of the model. If any paths need to be changed or added (e.g. for a
NONMEM model with an updated dataset) they will be replaced with DUMMYPATH"""
return self
[docs]
def write_files(self, path: Optional[Union[Path, str]] = None, force: bool = False) -> Model:
"""Write all extra files needed for a specific external format."""
return self
def compare_before_after_params(old, new):
# FIXME: Move this to the right module
before = {}
after = {}
for key, value in old.items():
if new[key] != value:
before[key] = value
after[key] = new[key]
return before, after
def update_datainfo(curdi: DataInfo, dataset: pd.DataFrame) -> DataInfo:
colnames = dataset.columns
columns = []
for colname in colnames:
try:
col = curdi[colname]
except IndexError:
datatype = ColumnInfo.convert_pd_dtype_to_datatype(dataset.dtypes[colname].name)
col = ColumnInfo.create(colname, datatype=datatype)
columns.append(col)
newdi = curdi.replace(columns=columns)
# NOTE: Remove path if dataset has been updated
return curdi if newdi == curdi else newdi.replace(path=None)