from __future__ import annotations
from collections.abc import Container as CollectionsContainer
from collections.abc import Iterable, Mapping
from collections.abc import Sequence as CollectionsSequence
from itertools import chain, product
from typing import Any, Collection, Container, Optional, Sequence, Union, overload
from pharmpy.basic import Expr, Matrix, TExpr, TSymbol
from pharmpy.deps import numpy as np
from pharmpy.deps import sympy, sympy_stats
from pharmpy.internals.expr.eval import eval_expr
from pharmpy.internals.expr.parse import parse as parse_expr
from pharmpy.internals.expr.subs import subs, xreplace_dict
from pharmpy.internals.immutable import Immutable
from pharmpy.internals.math import cov2corr, is_positive_semidefinite, nearest_positive_semidefinite
from .distributions.numeric import NumericDistribution
from .distributions.symbolic import Distribution, JointNormalDistribution, NormalDistribution
def _create_rng(seed: Optional[Union[int, np.random.Generator]] = None) -> np.random.Generator:
"""Create a new random number generator"""
if isinstance(seed, np.random.Generator):
return seed
else:
return np.random.default_rng(seed)
[docs]
class VariabilityLevel(Immutable):
"""A variability level
Parameters
----------
name : str
A unique identifying name
reference : bool
Is this the reference level? Normally IIV would be the reference level
group : str
Name of data column to group this level. None for no grouping (default)
"""
def __init__(self, name: str, reference: bool = False, group: Optional[str] = None):
self._name = name
self._reference = reference
self._group = group
[docs]
@classmethod
def create(cls, name: str, reference: bool = False, group: Optional[str] = None):
return VariabilityLevel(name, bool(reference), group)
[docs]
def replace(self, **kwargs) -> VariabilityLevel:
name = kwargs.get('name', self._name)
reference = kwargs.get('reference', self._reference)
group = kwargs.get('group', self._group)
return VariabilityLevel(name, bool(reference), group)
def __eq__(self, other: Any):
if not isinstance(other, VariabilityLevel):
return NotImplemented
return (
self._name == other._name
and self._reference == other._reference
and self._group == other._group
)
def __hash__(self):
return hash((self._name, self._reference, self._group))
[docs]
def to_dict(self) -> dict[str, Any]:
return {'name': self._name, 'reference': self._reference, 'group': self._group}
[docs]
@classmethod
def from_dict(cls, d: dict[str, Any]) -> VariabilityLevel:
return cls(**d)
@property
def name(self) -> str:
"""Name of the variability level"""
return self._name
@property
def reference(self) -> bool:
"""Is this the reference level"""
return self._reference
@property
def group(self) -> Optional[str]:
"""Group variable for variability level"""
return self._group
def __repr__(self):
return f"VariabilityLevel({self._name}, reference={self._reference}, group={self._group})"
[docs]
class VariabilityHierarchy(Immutable):
"""Description of a variability hierarchy"""
def __init__(self, levels: tuple[VariabilityLevel, ...] = ()):
self._levels = levels
[docs]
@classmethod
def create(
cls, levels: Optional[Union[Sequence[VariabilityLevel], VariabilityHierarchy]] = None
):
if levels is None:
levels = ()
elif isinstance(levels, VariabilityHierarchy):
return levels
else:
found_ref = False
for level in levels:
if not isinstance(level, VariabilityLevel):
raise ValueError("Can only add VariabilityLevel to VariabilityHierarchy")
if level.reference:
if found_ref:
raise ValueError("A VariabilityHierarchy can only have one reference level")
else:
found_ref = True
if not found_ref:
raise ValueError("A VariabilityHierarchy must have a reference level")
levels = tuple(levels)
return VariabilityHierarchy(levels)
[docs]
def replace(self, **kwargs) -> VariabilityHierarchy:
"""Replace properties and create a new VariabilityHierarchy object"""
levels = kwargs.get('levels', self._levels)
new = VariabilityHierarchy.create(levels)
return new
def __eq__(self, other: Any):
if not isinstance(other, VariabilityHierarchy):
return NotImplemented
if len(self._levels) != len(other._levels):
return False
else:
for l1, l2 in zip(self._levels, other._levels):
if l1 != l2:
return False
return True
def __hash__(self):
return hash(self._levels)
[docs]
def to_dict(self) -> dict[str, Any]:
levels = tuple(level.to_dict() for level in self._levels)
return {'levels': levels}
[docs]
@classmethod
def from_dict(cls, d: Mapping[str, Any]) -> VariabilityHierarchy:
levels = tuple(VariabilityLevel.from_dict(vl) for vl in d['levels'])
return cls(levels=levels)
def _lookup(self, ind: Union[int, str, VariabilityLevel]) -> VariabilityLevel:
# Lookup one index
if isinstance(ind, int):
# Index on numeric level for ints
i = self._find_reference()
return self._levels[ind - i]
elif isinstance(ind, str):
for varlev in self._levels:
if varlev.name == ind:
return varlev
elif isinstance(ind, VariabilityLevel):
for varlev in self._levels:
if varlev.name == ind.name:
return varlev
raise KeyError(f'Could not find level {ind} in VariabilityHierarchy')
@overload
def __getitem__(self, ind: Union[Sequence, VariabilityHierarchy]) -> VariabilityHierarchy: ...
@overload
def __getitem__(self, ind: Union[int, str, VariabilityLevel]) -> VariabilityLevel: ...
def __getitem__(self, ind):
if isinstance(ind, VariabilityHierarchy):
levels = [level.name for level in ind._levels]
elif not isinstance(ind, str) and isinstance(ind, CollectionsSequence):
levels = ind
else:
return self._lookup(ind)
new = [self._lookup(level) for level in levels]
return VariabilityHierarchy.create(new)
def __add__(self, other: VariabilityLevel) -> VariabilityHierarchy:
if isinstance(other, VariabilityLevel):
levels = (other,)
else:
raise ValueError(f"Cannot add {other} to VariabilityHierarchy")
new = VariabilityHierarchy.create(self._levels + levels)
return new
def __radd__(self, other: VariabilityLevel) -> VariabilityHierarchy:
if isinstance(other, VariabilityLevel):
return VariabilityHierarchy.create((other,) + self._levels)
else:
raise ValueError(f"Cannot add {other} to VariabilityLevel")
@property
def names(self) -> list[str]:
"""Names of all variability levels"""
return [varlev.name for varlev in self._levels]
def _find_reference(self) -> int:
# Find numerical level of first level
# No error checking since having a reference level is an invariant
return next( # pragma: no cover
(-i for i, level in enumerate(self._levels) if level.reference)
)
@property
def levels(self) -> dict[str, int]:
"""Dictionary of variability level name to numerical level"""
ind = self._find_reference()
d = {}
for level in self._levels:
d[level.name] = ind
ind += 1
return d
def __len__(self):
return len(self._levels)
def __contains__(self, value: str):
return value in self.names
[docs]
class RandomVariables(CollectionsSequence, Immutable):
"""A collection of distributions of random variables
This class provides a container for random variables that preserves their order
and acts list-like while also allowing for indexing on names.
Each RandomVariables object has two VariabilityHierarchies that describes the
allowed variability levels for the contined random variables. One hierarchy
for residual error variability (epsilons) and one for parameter variability (etas).
By default the eta hierarchy has the two levels IIV and IOV and the epsilon
hierarchy has one single level.
Parameters
----------
rvs : list
A list of RandomVariable to add. Default is to create an empty RandomVariables.
Examples
--------
>>> from pharmpy.model import RandomVariables, NormalDistribution, Parameter
>>> omega = Parameter("OMEGA_CL", 0.1)
>>> dist = NormalDistribution.create("IIV_CL", "iiv", 0, omega.symbol)
>>> rvs = RandomVariables.create([dist])
"""
def __init__(
self,
dists: tuple[Distribution, ...],
eta_levels: VariabilityHierarchy,
epsilon_levels: VariabilityHierarchy,
):
self._dists = dists
self._eta_levels = eta_levels
self._epsilon_levels = epsilon_levels
[docs]
@classmethod
def create(
cls,
dists: Optional[Union[Sequence[Distribution], Distribution]] = None,
eta_levels: Optional[VariabilityHierarchy] = None,
epsilon_levels: Optional[VariabilityHierarchy] = None,
):
if dists is None:
dists = ()
elif isinstance(dists, Distribution):
dists = (dists,)
else:
dists = tuple(dists)
names = set()
for dist in dists:
if not isinstance(dist, Distribution):
raise TypeError(f'Can not add variable of type {type(dist)} to RandomVariables')
for name in dist.names:
if name in names:
raise ValueError(
f'Names of random variables must be unique. Random Variable "{name}" '
'was added more than once to RandomVariables'
)
names.add(name)
if eta_levels is None:
iiv_level = VariabilityLevel('IIV', reference=True, group='ID')
iov_level = VariabilityLevel('IOV', reference=False, group='OCC')
eta_levels = VariabilityHierarchy((iiv_level, iov_level))
else:
if not isinstance(eta_levels, VariabilityHierarchy):
raise TypeError(
f'Type of eta_levels must be a VariabilityHierarchy not a {type(eta_levels)}'
)
if epsilon_levels is None:
ruv_level = VariabilityLevel('RUV', reference=True)
epsilon_levels = VariabilityHierarchy((ruv_level,))
else:
if not isinstance(epsilon_levels, VariabilityHierarchy):
raise TypeError(
f'Type of epsilon_levels must be a VariabilityHierarchy not a {type(epsilon_levels)}'
)
return cls(dists, eta_levels, epsilon_levels)
[docs]
def replace(self, **kwargs) -> RandomVariables:
dists = kwargs.get('dists', self._dists)
eta_levels = kwargs.get('eta_levels', self._eta_levels)
epsilon_levels = kwargs.get('epsilon_levels', self._epsilon_levels)
return RandomVariables.create(dists, eta_levels, epsilon_levels)
@property
def eta_levels(self) -> VariabilityHierarchy:
"""VariabilityHierarchy for all etas"""
return self._eta_levels
@property
def epsilon_levels(self) -> VariabilityHierarchy:
"""VariabilityHierarchy for all epsilons"""
return self._epsilon_levels
def __add__(
self, other: Union[Distribution, RandomVariables, Sequence[Distribution]]
) -> RandomVariables:
if isinstance(other, Distribution):
if other.level not in self._eta_levels and other.level not in self._epsilon_levels:
raise ValueError(
"Level of added distribution is not available in any variability hierarchy"
)
return RandomVariables(self._dists + (other,), self._eta_levels, self._epsilon_levels)
elif isinstance(other, RandomVariables):
if (
self._eta_levels != other._eta_levels
or self._epsilon_levels != other._epsilon_levels
):
raise ValueError("RandomVariables must have same variability hierarchies")
return RandomVariables(
self._dists + other._dists, self._eta_levels, self._epsilon_levels
)
else:
try:
dists = tuple(other)
except TypeError:
raise TypeError(f'Type {type(other)} cannot be added to RandomVariables')
else:
return RandomVariables(self._dists + dists, self._eta_levels, self._epsilon_levels)
def __radd__(self, other: Union[Distribution, Sequence[Distribution]]) -> RandomVariables:
if isinstance(other, Distribution):
if other.level not in self._eta_levels and other.level not in self._epsilon_levels:
raise ValueError(
f"Level {other.level} of added distribution is not available in any variability hierarchy"
)
return RandomVariables((other,) + self._dists, self._eta_levels, self._epsilon_levels)
else:
try:
dists = tuple(other)
except TypeError:
raise TypeError(f'Type {type(other)} cannot be added to RandomVariables')
else:
return RandomVariables(dists + self._dists, self._eta_levels, self._epsilon_levels)
def __len__(self):
return len(self._dists)
@property
def nrvs(self) -> int:
n = 0
for dist in self._dists:
n += len(dist)
return n
def __eq__(self, other: Any):
if self is other:
return True
if not isinstance(other, RandomVariables):
return NotImplemented
if len(self) == len(other):
for s, o in zip(self._dists, other._dists):
if s != o:
return False
return (
self._eta_levels == other._eta_levels
and self._epsilon_levels == other._epsilon_levels
)
return False
def __hash__(self):
return hash((self._dists, self._eta_levels, self._epsilon_levels))
[docs]
def to_dict(self) -> dict[str, Any]:
dists = tuple(d.to_dict() for d in self._dists)
return {
'dists': dists,
'eta_levels': self._eta_levels.to_dict(),
'epsilon_levels': self._epsilon_levels.to_dict(),
}
[docs]
@classmethod
def from_dict(cls, d: Mapping[str, Any]) -> RandomVariables:
eta_levels = VariabilityHierarchy.from_dict(d['eta_levels'])
epsilon_levels = VariabilityHierarchy.from_dict(d['epsilon_levels'])
dists = []
for dist_dict in d['dists']:
if dist_dict['class'] == 'NormalDistribution':
dist = NormalDistribution.from_dict(dist_dict)
else:
dist = JointNormalDistribution.from_dict(dist_dict)
dists.append(dist)
return cls(dists=tuple(dists), eta_levels=eta_levels, epsilon_levels=epsilon_levels)
def _lookup_rv(self, ind: TSymbol):
if isinstance(ind, Expr) and ind.is_symbol():
ind = ind.name
if isinstance(ind, str):
for i, dist in enumerate(self._dists):
if ind in dist.names:
return i, dist
raise KeyError(f'Could not find {ind} in RandomVariables')
@overload
def __getitem__(self, ind: Union[int, str, Expr]) -> Distribution: ...
@overload
def __getitem__(self, ind: Union[slice, Container[Union[str, Expr]]]) -> RandomVariables: ...
def __getitem__(self, ind):
if isinstance(ind, int):
return self._dists[ind]
elif isinstance(ind, slice):
return RandomVariables(
self._dists[ind.start : ind.stop : ind.step], self._eta_levels, self._epsilon_levels
)
elif not isinstance(ind, str) and isinstance(ind, CollectionsContainer):
remove = [
name for name in self.names if not ((name in ind) or (Expr.symbol(name) in ind))
]
split = self.unjoin(remove)
keep = tuple(
dist
for dist in split._dists
if dist.names[0] in ind or Expr.symbol(dist.names[0]) in ind
)
return RandomVariables(keep, self._eta_levels, self._epsilon_levels)
else:
_, rv = self._lookup_rv(ind)
return rv
def __contains__(self, ind: TSymbol):
try:
self._lookup_rv(ind)
return True
except KeyError:
return False
@property
def names(self) -> list[str]:
"""List of the names of all random variables"""
return list(chain.from_iterable(dist.names for dist in self._dists))
@property
def symbols(self) -> list[Expr]:
"""List with symbols for all random variables"""
return [Expr.symbol(name) for name in self.names]
@property
def epsilons(self) -> RandomVariables:
"""Get only the epsilons"""
return RandomVariables(
tuple(dist for dist in self._dists if dist.level in self._epsilon_levels.names),
self._eta_levels,
self._epsilon_levels,
)
@property
def etas(self) -> RandomVariables:
"""Get only the etas"""
return RandomVariables(
tuple(dist for dist in self._dists if dist.level in self._eta_levels.names),
self._eta_levels,
self._epsilon_levels,
)
@property
def iiv(self) -> RandomVariables:
"""Get only the iiv etas, i.e. etas with variability level 0"""
return RandomVariables(
tuple(dist for dist in self._dists if dist.level == self._eta_levels[0].name),
self._eta_levels,
self._epsilon_levels,
)
@property
def iov(self) -> RandomVariables:
"""Get only the iov etas, i.e. etas with variability level 1"""
return RandomVariables(
tuple(dist for dist in self._dists if dist.level == self._eta_levels[1].name),
self._eta_levels,
self._epsilon_levels,
)
@property
def free_symbols(self) -> set[Expr]:
"""Set of free symbols for all random variables"""
return set().union(*(dist.free_symbols for dist in self._dists))
@property
def parameter_names(self) -> tuple[str, ...]:
"""List of parameter names for all random variables"""
params = set().union(*(dist.parameter_names for dist in self._dists))
return tuple(sorted(map(str, params)))
@property
def variance_parameters(self) -> list[str]:
"""List of all parameters representing variance for all random variables"""
parameters = []
for dist in self._dists:
if isinstance(dist, NormalDistribution):
p = dist.variance
if p not in parameters:
parameters.append(p)
else:
assert isinstance(dist, JointNormalDistribution)
for p in dist.variance.diagonal():
if p not in parameters:
parameters.append(p)
return [p.name for p in parameters]
[docs]
def get_covariance(self, rv1: TSymbol, rv2: TSymbol) -> Expr:
"""Get covariance between two random variables"""
rv1 = Expr(rv1)
rv2 = Expr(rv2)
_, dist1 = self._lookup_rv(rv1)
_, dist2 = self._lookup_rv(rv2)
if dist1 is not dist2:
return Expr.integer(0)
else:
name1 = rv1.name if not isinstance(rv1, str) else rv1
name2 = rv2.name if not isinstance(rv2, str) else rv2
return dist1.get_covariance(name1, name2)
[docs]
def subs(self, d: Mapping[TExpr, TExpr]) -> RandomVariables:
"""Substitute expressions
Parameters
----------
d : dict
Dictionary of from: to pairs for substitution
Examples
--------
>>> from pharmpy.basic import Expr
>>> from pharmpy.model import NormalDistribution, RandomVariables, Parameter
>>> omega = Parameter("OMEGA_CL", 0.1)
>>> dist = NormalDistribution.create("IIV_CL", "IIV", 0, omega.symbol)
>>> rvs = RandomVariables.create([dist])
>>> rvs.subs({omega.symbol: Expr.symbol("OMEGA_NEW")})
IIV_CL ~ N(0, OMEGA_NEW)
"""
new_dists = tuple(dist.subs(d) for dist in self._dists)
return self.replace(dists=new_dists)
[docs]
def unjoin(self, inds: Union[str, Expr, Iterable[Union[str, Expr]]]) -> RandomVariables:
"""Remove all covariances the random variables have with other random variables
Parameters
----------
inds
One or multiple names or symbols to unjoin
Examples
--------
>>> from pharmpy.model import RandomVariables, JointNormalDistribution, Parameter
>>> omega_cl = Parameter("OMEGA_CL", 0.1)
>>> omega_v = Parameter("OMEGA_V", 0.1)
>>> corr_cl_v = Parameter("OMEGA_CL_V", 0.01)
>>> dist1 = JointNormalDistribution.create(["IIV_CL", "IIV_V"], 'IIV', [0, 0],
... [[omega_cl.symbol, corr_cl_v.symbol], [corr_cl_v.symbol, omega_v.symbol]])
>>> rvs = RandomVariables.create([dist1])
>>> rvs.unjoin('IIV_CL')
IIV_CL ~ N(0, OMEGA_CL)
IIV_V ~ N(0, OMEGA_V)
See Also
--------
join
"""
if isinstance(inds, Expr) and not inds.is_symbol():
raise ValueError("Expression must be symbol")
if isinstance(inds, str) or isinstance(inds, Expr):
inds = [inds]
inds = [ind.name if not isinstance(ind, str) else ind for ind in inds]
newdists = []
for dist in self._dists:
first = True
keep = None
if isinstance(dist, JointNormalDistribution) and any(
item in dist.names for item in inds
):
for i, name in enumerate(dist.names):
if name in inds: # unjoin this
mean = dist.mean[i]
variance = dist.variance[i, i]
new = NormalDistribution(name, dist.level, mean, variance)
newdists.append(new)
elif first: # first of the ones to keep
first = False
remove = [i for i, n in enumerate(dist.names) if n in inds]
if len(dist) - len(remove) == 1:
mean = dist.mean[i]
variance = dist.variance[i, i]
keep = NormalDistribution(name, dist.level, mean, variance)
else:
names = list(dist.names)
mean = sympy.Matrix(dist.mean)
variance = sympy.Matrix(dist.variance)
for i in reversed(remove):
del names[i]
mean.row_del(i)
variance.row_del(i)
variance.col_del(i)
keep = JointNormalDistribution(
tuple(names),
dist.level,
Matrix(mean),
Matrix(variance),
)
if keep is not None:
newdists.append(keep)
else:
newdists.append(dist)
new_rvs = RandomVariables(tuple(newdists), self._eta_levels, self._epsilon_levels)
return new_rvs
[docs]
def join(
self,
inds: Collection[Union[str, Expr]],
fill: Union[int, float, Expr] = 0,
name_template: Optional[str] = None,
param_names: Optional[list[str]] = None,
) -> tuple[RandomVariables, dict[str, tuple[str, str]]]:
"""Join random variables together into one joint distribution
Set new covariances (and previous 0 covs) to 'fill'.
All joined random variables will form a new joint normal distribution and
if they were part of previous joint normal distributions they will be taken out
from these and the remaining variables will be stay.
Parameters
----------
inds
Indices of variables to join
fill : value
Value to use for new covariances. Default is 0
name_template : str
A string template to use for new covariance symbols.
Using this option will override fill.
param_names : list
List of parameter names to be used together with
name_template.
Returns
-------
A tuple of a the new RandomVariables and a dictionary from newly created covariance parameter names to
tuple of parameter names. Empty dictionary if no parameter symbols were created
Examples
--------
>>> from pharmpy.model import RandomVariables, NormalDistribution, Parameter
>>> omega_cl = Parameter("OMEGA_CL", 0.1)
>>> omega_v = Parameter("OMEGA_V", 0.1)
>>> dist1 = NormalDistribution.create("IIV_CL", "IIV", 0, omega_cl.symbol)
>>> dist2 = NormalDistribution.create("IIV_V", "IIV", 0, omega_v.symbol)
>>> rvs = RandomVariables.create([dist1, dist2])
>>> rvs, _ = rvs.join(['IIV_CL', 'IIV_V'])
>>> rvs
⎡IIV_CL⎤ ⎧⎡0⎤ ⎡OMEGA_CL 0 ⎤⎫
⎢ ⎥ ~ N⎪⎢ ⎥, ⎢ ⎥⎪
⎣IIV_V ⎦ ⎩⎣0⎦ ⎣ 0 OMEGA_V⎦⎭
See Also
--------
unjoin
"""
if any(item not in self.names for item in inds):
raise KeyError("Cannot join non-existing random variable")
joined_rvs = self[inds]
assert isinstance(joined_rvs, RandomVariables)
means, M, names = joined_rvs._calc_covariance_matrix()
cov_to_params = {}
if fill != 0:
for row, col in product(range(M.rows), range(M.cols)):
if M[row, col] == 0:
M[row, col] = fill
elif name_template:
for row, col in product(range(M.rows), range(M.cols)):
if M[row, col] == 0 and row > col:
param_1, param_2 = M[row, row], M[col, col]
assert isinstance(param_names, list)
cov_name = name_template.format(param_names[col], param_names[row])
cov_to_params[cov_name] = (str(param_1), str(param_2))
M[row, col], M[col, row] = sympy.Symbol(cov_name), sympy.Symbol(cov_name)
joined_dist = JointNormalDistribution(
tuple(names),
joined_rvs[0].level,
Matrix(means),
Matrix(M),
)
unjoined_rvs = self.unjoin(inds)
newdists = []
first = True
for dist in unjoined_rvs._dists:
if any(item in dist.names for item in inds):
if first:
first = False
newdists.append(joined_dist)
else:
newdists.append(dist)
new_rvs = RandomVariables(tuple(newdists), self._eta_levels, self._epsilon_levels)
return new_rvs, cov_to_params
[docs]
def nearest_valid_parameters(self, parameter_values: Mapping[str, float]) -> dict[str, float]:
"""Force parameter values into being valid
As small changes as possible
returns a dict with the valid parameter values
"""
nearest = dict(parameter_values) # copy
for dist in self._dists:
if isinstance(dist, JointNormalDistribution):
symb_sigma = dist.variance
sigma = symb_sigma.subs(dict(parameter_values))
A = sigma.to_numpy()
B = nearest_positive_semidefinite(A)
if B is not A:
for row in range(len(A)):
for col in range(row + 1):
elt = symb_sigma[row, col]
nearest[elt.name] = B[row, col]
return nearest
[docs]
def validate_parameters(self, parameter_values: Mapping[str, float]) -> bool:
"""Validate a dict or Series of parameter values
Currently checks that all covariance matrices are positive semidefinite
"""
for dist in self._dists:
if isinstance(dist, JointNormalDistribution):
sigma = dist.variance
sigma = sigma.subs(parameter_values)
a = sigma.to_numpy()
if not is_positive_semidefinite(a):
return False
return True
[docs]
def sample(
self,
expr,
parameters: Optional[Mapping[str, float]] = None,
samples: int = 1,
rng: Optional[np.random.Generator] = None,
) -> np.ndarray:
"""Sample from the distribution of expr
parameters in the distributions will first be replaced"""
sympified_expr = parse_expr(expr)
xreplace_parameters = {} if parameters is None else xreplace_dict(parameters)
return _sample_from_distributions(
self,
sympified_expr,
xreplace_parameters,
samples,
_create_rng(rng),
)
def _calc_covariance_matrix(self) -> tuple[list[Expr], sympy.Matrix, list[str]]:
means = []
names = []
n = 0
for dist in self._dists:
names.extend(dist.names)
n += len(dist)
M = sympy.zeros(n)
if not names:
return means, M, names
row = 0
col = 0
for dist in self._dists:
if isinstance(dist, NormalDistribution):
means.append(dist.mean)
M[row, col] = dist.variance
row += 1
col += 1
else:
assert isinstance(dist, JointNormalDistribution)
means.extend(dist.mean)
var = dist.variance
for i in range(var.rows):
for j in range(var.cols):
M[row + i, col + j] = var[i, j]
row += var.rows
col += var.cols
return means, M, names
@property
def covariance_matrix(self) -> Matrix:
"""Covariance matrix of all random variables"""
_, M, _ = self._calc_covariance_matrix()
return Matrix(M)
def __repr__(self):
return '\n'.join(map(repr, self._dists))
def _repr_latex_(self) -> str:
lines = (dist.latex_string(aligned=True) for dist in self._dists)
return '\\begin{align*}\n' + r' \\ '.join(lines) + '\\end{align*}'
[docs]
def parameters_sdcorr(self, values: Mapping[str, float]) -> dict[str, float]:
"""Convert parameter values to sd/corr form
All parameter values will be converted to sd/corr assuming
they are given in var/cov form.
Parameters
----------
values : dict
Dict of parameter names to values
"""
newdict = dict(values)
for dist in self._dists:
if isinstance(dist, JointNormalDistribution):
sigma_sym = dist.variance
sigma = sigma_sym.subs(values).to_numpy()
corr = cov2corr(sigma)
for i in range(sigma_sym.rows):
for j in range(sigma_sym.cols):
elt = sigma_sym[i, j]
name = elt.name
if i != j:
newdict[name] = corr[i, j]
else:
newdict[name] = np.sqrt(sigma[i, j])
else:
assert isinstance(dist, NormalDistribution)
variance = dist.variance
if variance.is_symbol():
name = variance.name
else:
raise NotImplementedError(
"parameters_sdcorr only supports pure symbols as variances"
)
if name in newdict:
newdict[name] = float(
np.sqrt(np.array(variance.subs(values)).astype(np.float64))
)
return newdict
[docs]
def get_rvs_with_same_dist(self, rv: Union[str, sympy.Symbol]) -> RandomVariables:
"""Gets random variables with the same distribution as input random variable
The resulting RandomVariables objects includes the input random variable.
Parameters
----------
rv : str
Name of random variable
Returns
-------
RandomVariables
RandomVariables object with all distributions as input random variable (including input)
"""
_, dist_input = self._lookup_rv(rv)
rvs = [dist for dist in self if dist.variance == dist_input.variance]
return RandomVariables.create(rvs)
[docs]
def replace_with_sympy_rvs(self, expr: Expr) -> sympy.Expr:
"""Replaces Pharmpy RVs in a Sympy expression with Sympy RVs
Takes a Sympy expression and replaces all RVs with Sympy RVs, resulting expression
can be used in different Sympy functions (e.g. sympy.stats.std())
Parameters
----------
expr : sympy.Expr
Expression which will get RVs replaced
Returns
-------
sympy.Expr
Expression with replaced RVs
"""
rvs_in_expr = {self[rv] for rv in expr.free_symbols.intersection(self.symbols)}
subs_dict = {}
for i, rv in enumerate(rvs_in_expr):
if isinstance(rv, JointNormalDistribution):
# sympy.stats.MultivariateNormal uses variance, sympy.stats.Normal takes std
dist = sympy_stats.MultivariateNormal(f'__rv{i}', rv.mean, rv.variance)
for j in range(0, len(rv.names)):
subs_dict[rv.names[j]] = dist[j] # pyright: ignore reportIndexIssue
else:
subs_dict[rv.names[0]] = sympy_stats.Normal(
f'__rv{i}', rv.mean, sympy.sqrt(rv.variance)
)
sympy_expr = sympy.sympify(expr).subs(subs_dict)
return sympy_expr
def _sample_from_distributions(distributions, expr, parameters, nsamples, rng):
random_variable_symbols = expr.free_symbols.difference(parameters.keys())
filtered_distributions = filter_distributions(distributions, random_variable_symbols)
sampling_rvs = subs_distributions(filtered_distributions, parameters)
sampled_expr = subs(expr, parameters, simultaneous=True)
return sample_expr_from_rvs(sampling_rvs, sampled_expr, nsamples, rng)
def filter_distributions(
distributions: Iterable[Distribution], symbols: set[sympy.Symbol]
) -> Iterable[Distribution]:
covered_symbols = set()
for dist in distributions:
rvs_covered_by_dist = tuple(rv for rv in dist.names if sympy.Symbol(rv) in symbols)
if rvs_covered_by_dist:
yield dist[rvs_covered_by_dist]
covered_symbols.update(sympy.Symbol(rv) for rv in rvs_covered_by_dist)
if covered_symbols != symbols:
raise ValueError('Could not cover all requested symbols with given distributions')
def subs_distributions(
distributions: Iterable[Distribution], parameters: dict[sympy.Expr, float]
) -> Iterable[tuple[tuple[sympy.Expr, ...], NumericDistribution]]:
for dist in distributions:
rvs_symbols = tuple(map(sympy.Symbol, dist.names))
numeric_distribution = dist.evalf(parameters) # pyright: ignore reportArgumentType
yield (rvs_symbols, numeric_distribution)
def sample_expr_from_rvs(
sampling_rvs: Iterable[tuple[tuple[sympy.Expr, ...], NumericDistribution]],
expr: sympy.Expr,
nsamples: int,
rng,
):
samples = sample_rvs(sampling_rvs, nsamples, rng)
return eval_expr(expr, nsamples, samples)
def sample_rvs(
sampling_rvs: Iterable[tuple[tuple[sympy.Expr, ...], NumericDistribution]],
nsamples: int,
rng,
) -> dict[sympy.Expr, np.ndarray]:
data = {}
for symbols, distribution in sampling_rvs:
cursample = distribution.sample(rng, nsamples)
if len(symbols) > 1:
# NOTE: This makes column iteration faster
cursample = np.array(cursample, order='F')
for j, s in enumerate(symbols):
data[s] = cursample[:, j]
else:
assert len(symbols) > 0
data[symbols[0]] = cursample
return data