from __future__ import annotations
from abc import abstractmethod
from collections.abc import Collection, Hashable, Mapping, Sequence, Sized
from math import sqrt
from typing import Any, Union
import pharmpy.internals.unicode as unicode
from pharmpy.basic import Expr, Matrix, TExpr, TSymbol
from pharmpy.internals.immutable import Immutable
from .numeric import MultivariateNormalDistribution as NumericMultivariateNormalDistribution
from .numeric import NormalDistribution as NumericNormalDistribution
from .numeric import NumericDistribution
[docs]
class Distribution(Sized, Hashable, Immutable):
[docs]
@abstractmethod
def replace(self, **kwargs) -> Distribution:
pass
@property
@abstractmethod
def names(self) -> tuple[str, ...]:
"""Names of random variables of distribution"""
pass
@property
@abstractmethod
def level(self) -> str:
"""Name of VariabilityLevel of the random variables"""
pass
@property
@abstractmethod
def mean(self) -> Union[Expr, Matrix]:
pass
@property
@abstractmethod
def variance(self) -> Union[Expr, Matrix]:
pass
[docs]
@abstractmethod
def get_variance(self, name: str) -> Expr:
pass
[docs]
@abstractmethod
def get_covariance(self, name1: str, name2: str) -> Expr:
pass
[docs]
@abstractmethod
def evalf(self, parameters: dict[Expr, float]) -> NumericDistribution:
pass
@abstractmethod
def __getitem__(self, index) -> Distribution:
pass
@property
@abstractmethod
def free_symbols(self) -> set[Expr]:
pass
@property
def parameter_names(self) -> tuple[str, ...]:
"""List of names of all parameters used in definition"""
params = self.mean.free_symbols.union(self.variance.free_symbols)
return tuple(sorted(map(str, params)))
[docs]
@abstractmethod
def subs(self, d: Mapping[TExpr, TExpr]) -> Distribution:
pass
[docs]
@abstractmethod
def latex_string(self, aligned: bool = False) -> str:
pass
[docs]
@abstractmethod
def to_dict(self) -> dict[str, Any]:
pass
def _repr_latex_(self) -> str:
return self.latex_string()
[docs]
class NormalDistribution(Distribution):
"""Normal distribution for one random variable
Parameters
----------
name : str
Name of the random variable
level : str
Name of the variability level
mean : expression or number
Mean of the random variable
variance : expression or number
Variance of the random variable
Example
-------
>>> from pharmpy.model import NormalDistribution, Parameter
>>> omega = Parameter.create('OMEGA_CL', 0.1)
>>> dist = NormalDistribution.create("IIV_CL", "IIV", 0, omega.symbol)
>>> dist
IIV_CL ~ N(0, OMEGA_CL)
"""
def __init__(self, name: str, level: str, mean: Expr, variance: Expr):
self._name = name
self._level = level
self._mean = mean
self._variance = variance
[docs]
@classmethod
def create(
cls, name: str, level: str, mean: Union[Expr, int, float], variance: Union[Expr, int, float]
):
level = level.upper()
mean = Expr(mean)
variance = Expr(variance)
if variance.is_nonnegative() is False:
raise ValueError("Variance of normal distribution must be non-negative")
return cls(name, level, mean, variance)
[docs]
def replace(self, **kwargs) -> NormalDistribution:
"""Replace properties and create a new NormalDistribution"""
name = kwargs.get('name', self._name)
level = kwargs.get('level', self._level)
mean = kwargs.get('mean', self._mean)
variance = kwargs.get('variance', self._variance)
new = NormalDistribution.create(name, level, mean, variance)
return new
@property
def names(self) -> tuple[str, ...]:
return (self._name,)
@property
def level(self) -> str:
return self._level
@property
def mean(self) -> Expr:
return self._mean
@property
def variance(self) -> Expr:
return self._variance
@property
def free_symbols(self) -> set[Expr]:
"""Free symbols including random variable itself"""
fs = self._mean.free_symbols.union(self._variance.free_symbols)
fs.add(Expr.symbol(self._name))
return fs
[docs]
def subs(self, d: Mapping[TExpr, TExpr]) -> NormalDistribution:
"""Substitute expressions
Parameters
----------
d : dict
Dictionary of from: to pairs for substitution
Examples
--------
>>> import sympy
>>> from pharmpy.model import NormalDistribution, Parameter
>>> omega = Parameter.create("OMEGA_CL", 0.1)
>>> dist = NormalDistribution.create("IIV_CL", "IIV", 0, omega.symbol)
>>> dist = dist.subs({omega.symbol: sympy.Symbol("OMEGA_NEW")})
>>> dist
IIV_CL ~ N(0, OMEGA_NEW)
"""
mean = self._mean.subs(d)
variance = self._variance.subs(d)
name = _subs_name(self._name, d)
return NormalDistribution(name, self._level, mean, variance)
[docs]
def evalf(self, parameters: dict[TSymbol, float]) -> NumericDistribution:
mean = self._mean
variance = self._variance
try:
if mean == 0:
mu = 0
elif mean.is_symbol():
mu = float(parameters[mean])
else:
raise NotImplementedError("Non-supported mean for NormalDistribution")
sigma = 0 if variance == 0 else sqrt(float(parameters[variance]))
return NumericNormalDistribution(mu, sigma)
except KeyError as e:
# NOTE: This handles missing parameter substitutions
raise ValueError(e)
def __getitem__(self, index):
if isinstance(index, int):
if index != 0:
raise IndexError(index)
elif isinstance(index, str):
if index != self._name:
raise KeyError(index)
else:
if isinstance(index, slice):
index = list(
range(index.start, index.stop, index.step if index.step is not None else 1)
)
if index != [0]:
raise IndexError(index)
if isinstance(index, Collection):
if len(index) != 1 or (self._name not in index and 0 not in index):
raise KeyError(index)
else:
raise KeyError(index)
return self
[docs]
def get_variance(self, name: str) -> Expr:
if name != self._name:
raise KeyError(name)
return self._variance
[docs]
def get_covariance(self, name1: str, name2: str) -> Expr:
if name1 == name2 == self._name:
return self._variance
else:
raise KeyError((name1, name2))
def __eq__(self, other: Any):
if not isinstance(other, NormalDistribution):
return NotImplemented
return (
self._name == other._name
and self._level == other._level
and self._mean == other._mean
and self._variance == other._variance
)
def __len__(self):
return 1
def __hash__(self):
return hash((self._name, self._level, self._mean, self._variance))
[docs]
def to_dict(self) -> dict[str, Any]:
return {
'class': self.__class__.__name__,
'name': self._name,
'level': self._level,
'mean': self._mean.serialize(),
'variance': self._variance.serialize(),
}
[docs]
@classmethod
def from_dict(cls, d: Mapping[str, Any]):
return cls(
name=d['name'],
level=d['level'],
mean=Expr.deserialize(d['mean']),
variance=Expr.deserialize(d['variance']),
)
def __repr__(self):
return (
f'{Expr.symbol(self._name).unicode()}'
f' ~ {unicode.mathematical_script_capital_n}'
f'({self._mean.unicode()}, {self._variance.unicode()})'
)
[docs]
def latex_string(self, aligned: bool = False) -> str:
if aligned:
align_str = ' & '
else:
align_str = ''
rv = Expr.symbol(self._name).latex()
mean = self._mean.latex()
sigma = self._variance.latex()
latex = rv + align_str + r'\sim \mathcal{N} \left(' + mean + ',' + sigma + r'\right)'
if not aligned:
latex = '$' + latex + '$'
return latex
[docs]
class JointNormalDistribution(Distribution):
"""Joint distribution of random variables
Parameters
----------
names : list
Names of the random variables
level : str
Variability level
mean : matrix or list
Vector of the means of the random variables
variance : matrix or list of lists
Covariance matrix of the random variables
Example
-------
>>> from pharmpy.model import JointNormalDistribution, Parameter
>>> omega_cl = Parameter("OMEGA_CL", 0.1)
>>> omega_v = Parameter("OMEGA_V", 0.1)
>>> corr_cl_v = Parameter("OMEGA_CL_V", 0.01)
>>> dist = JointNormalDistribution.create(["IIV_CL", "IIV_V"], "IIV", [0, 0],
... [[omega_cl.symbol, corr_cl_v.symbol], [corr_cl_v.symbol, omega_v.symbol]])
>>> dist
⎡IIV_CL⎤ ⎧⎡0⎤ ⎡ OMEGA_CL OMEGA_CL_V⎤⎫
⎢ ⎥ ~ N⎪⎢ ⎥, ⎢ ⎥⎪
⎣IIV_V ⎦ ⎩⎣0⎦ ⎣OMEGA_CL_V OMEGA_V ⎦⎭
"""
def __init__(
self,
names: tuple[str, ...],
level: str,
mean: Matrix,
variance: Matrix,
):
self._names = names
self._level = level
self._mean = mean
self._variance = variance
[docs]
@classmethod
def create(cls, names: Sequence[str], level: str, mean, variance):
names = tuple(names)
level = level.upper()
mean = Matrix(mean)
variance = Matrix(variance)
if variance.is_positive_semidefinite() is False:
raise ValueError(
'Covariance matrix of joint normal distribution is not positive semidefinite'
)
return cls(names, level, mean, variance)
[docs]
def replace(self, **kwargs) -> JointNormalDistribution:
"""Replace properties and create a new JointNormalDistribution"""
names = kwargs.get('names', self._names)
level = kwargs.get('level', self._level)
mean = kwargs.get('mean', self._mean)
variance = kwargs.get('variance', self._variance)
new = JointNormalDistribution.create(names, level, mean, variance)
return new
@property
def names(self) -> tuple[str, ...]:
return self._names
@property
def level(self) -> str:
return self._level
@property
def mean(self) -> Matrix:
return self._mean
@property
def variance(self) -> Matrix:
return self._variance
@property
def free_symbols(self) -> set[Expr]:
"""Free symbols including random variable itself"""
return self._mean.free_symbols.union(
self._variance.free_symbols, (Expr.symbol(name) for name in self._names)
)
[docs]
def subs(self, d: Mapping[TExpr, TExpr]) -> JointNormalDistribution:
"""Substitute expressions
Parameters
----------
d : dict
Dictionary of from: to pairs for substitution
Examples
--------
>>> import sympy
>>> from pharmpy.model import JointNormalDistribution, Parameter
>>> omega1 = Parameter("OMEGA_CL", 0.1)
>>> omega2 = Parameter("OMEGA_V", 0.1)
>>> A = [[omega1.symbol, 0], [0, omega2.symbol]]
>>> dist = JointNormalDistribution.create(["IIV_CL", "IIV_V"], "IIV", [0, 0], A)
>>> dist = dist.subs({omega1.symbol: sympy.Symbol("OMEGA_NEW")})
>>> dist
⎡IIV_CL⎤ ⎧⎡0⎤ ⎡OMEGA_NEW 0 ⎤⎫
⎢ ⎥ ~ N⎪⎢ ⎥, ⎢ ⎥⎪
⎣IIV_V ⎦ ⎩⎣0⎦ ⎣ 0 OMEGA_V⎦⎭
"""
mean = self._mean.subs(d)
variance = self._variance.subs(d)
new_names = tuple(_subs_name(name, d) for name in self._names)
return JointNormalDistribution(new_names, self._level, mean, variance)
[docs]
def evalf(self, parameters: dict[TSymbol, float]) -> NumericDistribution:
try:
mu = self._mean.evalf(parameters)[:, 0]
sigma = self._variance.evalf(parameters)
return NumericMultivariateNormalDistribution(mu, sigma)
except RuntimeError as e:
# NOTE: This handles missing parameter substitutions
raise ValueError(e)
def __getitem__(
self, index: Union[int, str, slice, Collection[Union[int, str]]]
) -> Union[NormalDistribution, JointNormalDistribution]:
if isinstance(index, int):
if -len(self) <= index < len(self):
names = (self._names[index],)
our_index = (index,)
else:
raise IndexError(index)
elif isinstance(index, str):
names = (index,)
try:
our_index = (self._names.index(index),)
except ValueError:
raise KeyError(names[0])
elif isinstance(index, slice):
our_index = range(index.start, index.stop, index.step if index.step is not None else 1)
names = tuple(self._names[i] for i in our_index)
elif isinstance(index, Collection):
if len(index) == 0 or len(index) > len(self._names):
raise KeyError(index)
collection = set(index)
if not collection.issubset(self._names):
raise KeyError(index)
if len(collection) == len(self._names):
return self
index_list: list[int] = []
names_list: list[str] = []
for i, name in enumerate(self._names):
if name in collection or i in collection:
index_list.append(i)
names_list.append(name)
our_index = tuple(index_list)
names = tuple(names_list)
else:
raise KeyError(index)
if len(our_index) == 1:
our_index = our_index[0]
if len(names) == 1:
assert isinstance(our_index, int)
mean = self._mean[our_index, 0]
variance = self._variance[our_index, our_index]
return NormalDistribution(names[0], self._level, mean, variance)
else:
assert not isinstance(our_index, int)
mean = self._mean[our_index, :]
variance = self._variance[our_index, our_index]
return JointNormalDistribution(names, self._level, mean, variance)
[docs]
def get_variance(self, name: str) -> Expr:
i = self._names.index(name)
var = self._variance[i, i]
return var
[docs]
def get_covariance(self, name1: str, name2: str) -> Expr:
i1 = self._names.index(name1)
i2 = self._names.index(name2)
cov = self._variance[i1, i2]
return cov
def __eq__(self, other):
if not isinstance(other, JointNormalDistribution):
return NotImplemented
return (
self._names == other._names
and self._level == other._level
and self._mean == other._mean
and self._variance == other._variance
)
def __len__(self):
return len(self._names)
def __hash__(self):
return hash((self._names, self._level, self._mean, self._variance))
[docs]
def to_dict(self) -> dict[str, Any]:
return {
'class': self.__class__.__name__,
'names': self._names,
'level': self._level,
'mean': self._mean.serialize(),
'variance': self._variance.serialize(),
}
[docs]
@classmethod
def from_dict(cls, d: Mapping[str, Any]):
return cls(
names=d['names'],
level=d['level'],
mean=Matrix.deserialize(d['mean']),
variance=Matrix.deserialize(d['variance']),
)
def __repr__(self):
name_vector = Matrix(self._names)
name_strings = name_vector.unicode().split('\n')
mu_strings = self._mean.unicode().split('\n')
sigma_strings = self._variance.unicode().split('\n')
mu_height = len(mu_strings)
sigma_height = len(sigma_strings)
max_height = max(mu_height, sigma_height)
left_parens = unicode.left_parens(max_height)
right_parens = unicode.right_parens(max_height)
# Pad the smaller of the matrices
if mu_height != sigma_height:
to_pad = mu_strings if mu_height < sigma_height else sigma_strings
num_lines = abs(mu_height - sigma_height)
padding = ' ' * len(to_pad[0])
for i in range(0, num_lines):
if i % 2 == 0:
to_pad.append(padding)
else:
to_pad.insert(0, padding)
# Pad names
if len(name_strings) < max_height:
num_lines = abs(max_height - len(name_strings))
padding = ' ' * len(name_strings[0])
for i in range(0, num_lines):
if i % 2 == 0:
name_strings.append(padding)
else:
name_strings.insert(0, padding)
central_index = max_height // 2
res = []
enumerator = enumerate(
zip(name_strings, left_parens, mu_strings, sigma_strings, right_parens)
)
for i, (name_line, lpar, mu_line, sigma_line, rpar) in enumerator:
if i == central_index:
res.append(
name_line
+ f' ~ {unicode.mathematical_script_capital_n}'
+ lpar
+ mu_line
+ ', '
+ sigma_line
+ rpar
)
else:
res.append(name_line + ' ' + lpar + mu_line + ' ' + sigma_line + rpar)
return '\n'.join(res)
[docs]
def latex_string(self, aligned: bool = False) -> str:
if aligned:
align_str = ' & '
else:
align_str = ''
rv_vec = Matrix(self._names).latex()
mean_vec = self._mean.latex()
sigma = self._variance.latex()
latex = (
rv_vec + align_str + r'\sim \mathcal{N} \left(' + mean_vec + ',' + sigma + r'\right)'
)
if not aligned:
latex = '$' + latex + '$'
return latex
def __getstate__(self):
state = self.__dict__.copy()
return state
def __setstate__(self, state):
self.__dict__.update(state)
def _subs_name(name: str, d: Mapping[TExpr, TExpr]) -> str:
if name in d:
new_name = d[name]
elif (name_symbol := Expr.symbol(name)) in d:
new_name = d[name_symbol]
else:
new_name = name
return new_name if isinstance(new_name, str) else str(new_name)