Source code for pharmpy.model.distributions.symbolic

from __future__ import annotations

from abc import abstractmethod
from collections.abc import Collection, Hashable, Mapping, Sequence, Sized
from math import sqrt
from typing import Any, Union

import pharmpy.internals.unicode as unicode
from pharmpy.basic import Expr, Matrix, TExpr, TSymbol
from pharmpy.internals.immutable import Immutable, frozenmapping

from .numeric import FiniteDistribution as NumericFiniteDistribution
from .numeric import MultivariateNormalDistribution as NumericMultivariateNormalDistribution
from .numeric import NormalDistribution as NumericNormalDistribution
from .numeric import NumericDistribution


[docs] class Distribution(Sized, Hashable, Immutable):
[docs] @abstractmethod def replace(self, **kwargs) -> Distribution: pass
@property @abstractmethod def names(self) -> tuple[str, ...]: """Names of random variables of distribution""" pass @property @abstractmethod def level(self) -> str: """Name of VariabilityLevel of the random variables""" pass @property @abstractmethod def mean(self) -> Union[Expr, Matrix]: pass @property @abstractmethod def variance(self) -> Union[Expr, Matrix]: pass
[docs] @abstractmethod def get_variance(self, name: str) -> Expr: pass
[docs] @abstractmethod def get_covariance(self, name1: str, name2: str) -> Expr: pass
[docs] @abstractmethod def evalf(self, parameters: dict[Expr, float]) -> NumericDistribution: pass
def __len__(self) -> int: # NOTE: This needs to be overridden for joint distributions return 1 @abstractmethod def __getitem__(self, item) -> Distribution: pass @property @abstractmethod def free_symbols(self) -> set[Expr]: pass @property def parameter_names(self) -> tuple[str, ...]: """List of names of all parameters used in definition""" params = self.mean.free_symbols.union(self.variance.free_symbols) return tuple(sorted(map(str, params)))
[docs] @abstractmethod def subs(self, d: Mapping[TExpr, TExpr]) -> Distribution: pass
[docs] @abstractmethod def latex_string(self, aligned: bool = False) -> str: pass
[docs] @abstractmethod def to_dict(self) -> dict[str, Any]: pass
def _repr_latex_(self) -> str: return self.latex_string()
[docs] class NormalDistribution(Distribution): """Normal distribution for one random variable Parameters ---------- name : str Name of the random variable level : str Name of the variability level mean : expression or number Mean of the random variable variance : expression or number Variance of the random variable Example ------- >>> from pharmpy.model import NormalDistribution, Parameter >>> omega = Parameter.create('OMEGA_CL', 0.1) >>> dist = NormalDistribution.create("IIV_CL", "IIV", 0, omega.symbol) >>> dist IIV_CL ~ N(0, OMEGA_CL) """ def __init__(self, name: str, level: str, mean: Expr, variance: Expr): self._name = name self._level = level self._mean = mean self._variance = variance
[docs] @classmethod def create( cls, name: str, level: str, mean: Union[Expr, int, float], variance: Union[Expr, int, float] ): level = level.upper() mean = Expr(mean) variance = Expr(variance) if variance.is_nonnegative() is False: raise ValueError("Variance of normal distribution must be non-negative") return cls(name, level, mean, variance)
[docs] def replace(self, **kwargs) -> NormalDistribution: """Replace properties and create a new NormalDistribution""" name = kwargs.get('name', self._name) level = kwargs.get('level', self._level) mean = kwargs.get('mean', self._mean) variance = kwargs.get('variance', self._variance) new = NormalDistribution.create(name, level, mean, variance) return new
@property def names(self) -> tuple[str, ...]: return (self._name,) @property def level(self) -> str: return self._level @property def mean(self) -> Expr: return self._mean @property def variance(self) -> Expr: return self._variance @property def free_symbols(self) -> set[Expr]: """Free symbols including random variable itself""" fs = self._mean.free_symbols.union(self._variance.free_symbols) fs.add(Expr.symbol(self._name)) return fs
[docs] def subs(self, d: Mapping[TExpr, TExpr]) -> NormalDistribution: """Substitute expressions Parameters ---------- d : dict Dictionary of from: to pairs for substitution Examples -------- >>> import sympy >>> from pharmpy.model import NormalDistribution, Parameter >>> omega = Parameter.create("OMEGA_CL", 0.1) >>> dist = NormalDistribution.create("IIV_CL", "IIV", 0, omega.symbol) >>> dist = dist.subs({omega.symbol: sympy.Symbol("OMEGA_NEW")}) >>> dist IIV_CL ~ N(0, OMEGA_NEW) """ mean = self._mean.subs(d) variance = self._variance.subs(d) name = _subs_name(self._name, d) return NormalDistribution(name, self._level, mean, variance)
[docs] def evalf(self, parameters: dict[TSymbol, float]) -> NumericDistribution: mean = self._mean variance = self._variance try: if mean == 0: mu = 0 elif mean.is_symbol(): mu = float(parameters[mean]) else: raise NotImplementedError("Non-supported mean for NormalDistribution") sigma = 0 if variance == 0 else sqrt(float(parameters[variance])) return NumericNormalDistribution(mu, sigma) except KeyError as e: # NOTE: This handles missing parameter substitutions raise ValueError(e)
[docs] def get_variance(self, name: str) -> Expr: if name != self._name: raise KeyError(name) return self._variance
[docs] def get_covariance(self, name1: str, name2: str) -> Expr: if name1 == name2 == self._name: return self._variance else: raise KeyError((name1, name2))
def __getitem__(self, index) -> Distribution: return _getitem_single(self, index) def __eq__(self, other: Any): if not isinstance(other, NormalDistribution): return NotImplemented return ( self._name == other._name and self._level == other._level and self._mean == other._mean and self._variance == other._variance ) def __hash__(self): return hash((self._name, self._level, self._mean, self._variance))
[docs] def to_dict(self) -> dict[str, Any]: return { 'class': self.__class__.__name__, 'name': self._name, 'level': self._level, 'mean': self._mean.serialize(), 'variance': self._variance.serialize(), }
[docs] @classmethod def from_dict(cls, d: Mapping[str, Any]): return cls( name=d['name'], level=d['level'], mean=Expr.deserialize(d['mean']), variance=Expr.deserialize(d['variance']), )
def __repr__(self): return ( f'{Expr.symbol(self._name).unicode()}' f' ~ {unicode.mathematical_script_capital_n}' f'({self._mean.unicode()}, {self._variance.unicode()})' )
[docs] def latex_string(self, aligned: bool = False) -> str: if aligned: align_str = ' & ' else: align_str = '' rv = Expr.symbol(self._name).latex() mean = self._mean.latex() sigma = self._variance.latex() latex = rv + align_str + r'\sim \mathcal{N} \left(' + mean + ',' + sigma + r'\right)' if not aligned: latex = '$' + latex + '$' return latex
[docs] class JointNormalDistribution(Distribution): """Joint distribution of random variables Parameters ---------- names : list Names of the random variables level : str Variability level mean : matrix or list Vector of the means of the random variables variance : matrix or list of lists Covariance matrix of the random variables Example ------- >>> from pharmpy.model import JointNormalDistribution, Parameter >>> omega_cl = Parameter("OMEGA_CL", 0.1) >>> omega_v = Parameter("OMEGA_V", 0.1) >>> corr_cl_v = Parameter("OMEGA_CL_V", 0.01) >>> dist = JointNormalDistribution.create(["IIV_CL", "IIV_V"], "IIV", [0, 0], ... [[omega_cl.symbol, corr_cl_v.symbol], [corr_cl_v.symbol, omega_v.symbol]]) >>> dist ⎡IIV_CL⎤ ⎧⎡0⎤ ⎡ OMEGA_CL OMEGA_CL_V⎤⎫ ⎢ ⎥ ~ N⎪⎢ ⎥, ⎢ ⎥⎪ ⎣IIV_V ⎦ ⎩⎣0⎦ ⎣OMEGA_CL_V OMEGA_V ⎦⎭ """ def __init__( self, names: tuple[str, ...], level: str, mean: Matrix, variance: Matrix, ): self._names = names self._level = level self._mean = mean self._variance = variance
[docs] @classmethod def create(cls, names: Sequence[str], level: str, mean, variance): names = tuple(names) level = level.upper() mean = Matrix(mean) variance = Matrix(variance) if variance.is_positive_semidefinite() is False: raise ValueError( 'Covariance matrix of joint normal distribution is not positive semidefinite' ) return cls(names, level, mean, variance)
[docs] def replace(self, **kwargs) -> JointNormalDistribution: """Replace properties and create a new JointNormalDistribution""" names = kwargs.get('names', self._names) level = kwargs.get('level', self._level) mean = kwargs.get('mean', self._mean) variance = kwargs.get('variance', self._variance) new = JointNormalDistribution.create(names, level, mean, variance) return new
@property def names(self) -> tuple[str, ...]: return self._names @property def level(self) -> str: return self._level @property def mean(self) -> Matrix: return self._mean @property def variance(self) -> Matrix: return self._variance @property def free_symbols(self) -> set[Expr]: """Free symbols including random variable itself""" return self._mean.free_symbols.union( self._variance.free_symbols, (Expr.symbol(name) for name in self._names) )
[docs] def subs(self, d: Mapping[TExpr, TExpr]) -> JointNormalDistribution: """Substitute expressions Parameters ---------- d : dict Dictionary of from: to pairs for substitution Examples -------- >>> import sympy >>> from pharmpy.model import JointNormalDistribution, Parameter >>> omega1 = Parameter("OMEGA_CL", 0.1) >>> omega2 = Parameter("OMEGA_V", 0.1) >>> A = [[omega1.symbol, 0], [0, omega2.symbol]] >>> dist = JointNormalDistribution.create(["IIV_CL", "IIV_V"], "IIV", [0, 0], A) >>> dist = dist.subs({omega1.symbol: sympy.Symbol("OMEGA_NEW")}) >>> dist ⎡IIV_CL⎤ ⎧⎡0⎤ ⎡OMEGA_NEW 0 ⎤⎫ ⎢ ⎥ ~ N⎪⎢ ⎥, ⎢ ⎥⎪ ⎣IIV_V ⎦ ⎩⎣0⎦ ⎣ 0 OMEGA_V⎦⎭ """ mean = self._mean.subs(d) variance = self._variance.subs(d) new_names = tuple(_subs_name(name, d) for name in self._names) return JointNormalDistribution(new_names, self._level, mean, variance)
[docs] def evalf(self, parameters: dict[TSymbol, float]) -> NumericDistribution: try: mu = self._mean.evalf(parameters)[:, 0] sigma = self._variance.evalf(parameters) return NumericMultivariateNormalDistribution(mu, sigma) except RuntimeError as e: # NOTE: This handles missing parameter substitutions raise ValueError(e)
def __getitem__( self, index: Union[int, str, slice, Collection[Union[int, str]]] ) -> Union[NormalDistribution, JointNormalDistribution]: if isinstance(index, int): if -len(self) <= index < len(self): names = (self._names[index],) our_index = (index,) else: raise IndexError(index) elif isinstance(index, str): names = (index,) try: our_index = (self._names.index(index),) except ValueError: raise KeyError(names[0]) elif isinstance(index, slice): our_index = range(index.start, index.stop, index.step if index.step is not None else 1) names = tuple(self._names[i] for i in our_index) elif isinstance(index, Collection): if len(index) == 0 or len(index) > len(self._names): raise KeyError(index) collection = set(index) if not collection.issubset(self._names): raise KeyError(index) if len(collection) == len(self._names): return self index_list: list[int] = [] names_list: list[str] = [] for i, name in enumerate(self._names): if name in collection or i in collection: index_list.append(i) names_list.append(name) our_index = tuple(index_list) names = tuple(names_list) else: raise KeyError(index) if len(our_index) == 1: our_index = our_index[0] if len(names) == 1: assert isinstance(our_index, int) mean = self._mean[our_index, 0] variance = self._variance[our_index, our_index] return NormalDistribution(names[0], self._level, mean, variance) else: assert not isinstance(our_index, int) mean = self._mean[our_index, :] variance = self._variance[our_index, our_index] return JointNormalDistribution(names, self._level, mean, variance)
[docs] def get_variance(self, name: str) -> Expr: i = self._names.index(name) var = self._variance[i, i] return var
[docs] def get_covariance(self, name1: str, name2: str) -> Expr: i1 = self._names.index(name1) i2 = self._names.index(name2) cov = self._variance[i1, i2] return cov
def __eq__(self, other): if not isinstance(other, JointNormalDistribution): return NotImplemented return ( self._names == other._names and self._level == other._level and self._mean == other._mean and self._variance == other._variance ) def __len__(self) -> int: return len(self._names) def __hash__(self): return hash((self._names, self._level, self._mean, self._variance))
[docs] def to_dict(self) -> dict[str, Any]: return { 'class': self.__class__.__name__, 'names': self._names, 'level': self._level, 'mean': self._mean.serialize(), 'variance': self._variance.serialize(), }
[docs] @classmethod def from_dict(cls, d: Mapping[str, Any]): return cls( names=d['names'], level=d['level'], mean=Matrix.deserialize(d['mean']), variance=Matrix.deserialize(d['variance']), )
def __repr__(self): name_vector = Matrix(self._names) name_strings = name_vector.unicode().split('\n') mu_strings = self._mean.unicode().split('\n') sigma_strings = self._variance.unicode().split('\n') mu_height = len(mu_strings) sigma_height = len(sigma_strings) max_height = max(mu_height, sigma_height) left_parens = unicode.left_parens(max_height) right_parens = unicode.right_parens(max_height) # Pad the smaller of the matrices if mu_height != sigma_height: to_pad = mu_strings if mu_height < sigma_height else sigma_strings num_lines = abs(mu_height - sigma_height) padding = ' ' * len(to_pad[0]) for i in range(0, num_lines): if i % 2 == 0: to_pad.append(padding) else: to_pad.insert(0, padding) # Pad names if len(name_strings) < max_height: num_lines = abs(max_height - len(name_strings)) padding = ' ' * len(name_strings[0]) for i in range(0, num_lines): if i % 2 == 0: name_strings.append(padding) else: name_strings.insert(0, padding) central_index = max_height // 2 res = [] enumerator = enumerate( zip(name_strings, left_parens, mu_strings, sigma_strings, right_parens) ) for i, (name_line, lpar, mu_line, sigma_line, rpar) in enumerator: if i == central_index: res.append( name_line + f' ~ {unicode.mathematical_script_capital_n}' + lpar + mu_line + ', ' + sigma_line + rpar ) else: res.append(name_line + ' ' + lpar + mu_line + ' ' + sigma_line + rpar) return '\n'.join(res)
[docs] def latex_string(self, aligned: bool = False) -> str: if aligned: align_str = ' & ' else: align_str = '' rv_vec = Matrix(self._names).latex() mean_vec = self._mean.latex() sigma = self._variance.latex() latex = ( rv_vec + align_str + r'\sim \mathcal{N} \left(' + mean_vec + ',' + sigma + r'\right)' ) if not aligned: latex = '$' + latex + '$' return latex
def __getstate__(self): state = self.__dict__.copy() return state def __setstate__(self, state): self.__dict__.update(state)
[docs] class FiniteDistribution(Distribution): """A finite discrete distribution Parameters ---------- name : str Name of the random variable level : str Name of the variability level probabilities : frozenmapping[int, Expr] Probabilities for each value Example ------- >>> from pharmpy.model import FiniteDistribution, Parameter >>> mixprob = Parameter.create('MIXPROB', 0.1) >>> dist = FiniteDistribution.create("MIX", "IIV", {1: mixprob.symbol, 2: 1 - mixprob.symbol}) >>> dist MIX ~ Finite({P(1) = MIXPROB, P(2) = 1 - MIXPROB}) """ def __init__(self, name: str, level: str, probabilities: frozenmapping[int, Expr]): self._name = name self._level = level self._probabilities = probabilities
[docs] @classmethod def create(cls, name: str, level: str, probabilities: Mapping[int, Union[Expr, str]]): level = level.upper() probs = {n: Expr(expr) for n, expr in probabilities.items()} return cls(name, level, frozenmapping(probs))
[docs] def replace(self, **kwargs) -> FiniteDistribution: """Replace properties and create a new FiniteDistribution""" name = kwargs.get('name', self._name) level = kwargs.get('level', self._level) probabilities = kwargs.get('mean', self._probabilities) new = FiniteDistribution.create(name, level, probabilities) return new
@property def names(self) -> tuple[str, ...]: return (self._name,) @property def level(self) -> str: return self._level @property def probabilities(self) -> frozenmapping[int, Expr]: return self._probabilities @property def mean(self) -> Expr: s = Expr.integer(0) for n, expr in self._probabilities.items(): s += Expr.integer(n) * expr return s @property def variance(self) -> Expr: mean = self.mean s = Expr.integer(0) for n, expr in self._probabilities.items(): x = Expr.integer(n) s += (x - mean) ** Expr.integer(2) * expr return s @property def free_symbols(self) -> set[Expr]: """Free symbols including random variable itself""" symbs = {Expr.symbol(self._name)} for _, expr in self._probabilities.items(): symbs |= expr.free_symbols return symbs
[docs] def subs(self, d: Mapping[TExpr, TExpr]) -> FiniteDistribution: """Substitute expressions Parameters ---------- d : dict Dictionary of from: to pairs for substitution Examples -------- >>> import sympy >>> from pharmpy.basic import Expr >>> from pharmpy.model import FiniteDistribution, Parameter >>> mixprob = Parameter.create('MIXPROB', 0.1) >>> dist = FiniteDistribution.create("MIX", "IIV", {1: mixprob.symbol, 2: 1 - mixprob.symbol}) >>> dist MIX ~ Finite({P(1) = MIXPROB, P(2) = 1 - MIXPROB}) >>> dist = dist.subs({mixprob.symbol: Expr.symbol("NEWSYMBOL")}) >>> dist MIX ~ Finite({P(1) = NEWSYMBOL, P(2) = 1 - NEWSYMBOL}) """ newprobs = {} for n, expr in self._probabilities.items(): newprobs[n] = expr.subs(d) name = _subs_name(self._name, d) return FiniteDistribution(name, self._level, frozenmapping(newprobs))
[docs] def evalf(self, parameters: dict[TSymbol, float]) -> NumericDistribution: numprobs = {} for n, expr in self._probabilities.items(): numprobs[n] = float(expr.subs(parameters)) return NumericFiniteDistribution(numprobs)
[docs] def get_variance(self, name: str) -> Expr: if name != self._name: raise KeyError(name) return self.variance
[docs] def get_covariance(self, name1: str, name2: str) -> Expr: if name1 == name2 == self._name: return self.variance else: raise KeyError((name1, name2))
def __eq__(self, other: Any): if not isinstance(other, FiniteDistribution): return NotImplemented return ( self._name == other._name and self._level == other._level and self._probabilities == other._probabilities ) def __getitem__(self, index) -> Distribution: return _getitem_single(self, index) def __hash__(self): return hash((self._name, self._level, self._probabilities))
[docs] def to_dict(self) -> dict[str, Any]: probs = {n: expr.serialize() for n, expr in self._probabilities.items()} return { 'class': self.__class__.__name__, 'name': self._name, 'level': self._level, 'probabilities': probs, }
[docs] @classmethod def from_dict(cls, d: Mapping[str, Any]): probs = {n: Expr.deserialize(expr) for n, expr in d['probabilities'].items()} return cls( name=d['name'], level=d['level'], probabilities=frozenmapping(probs), )
def __repr__(self): probs = [] for n, expr in self._probabilities.items(): probs.append(f'P({n}) = {expr.unicode()}') return f'{Expr.symbol(self._name).unicode()}' f' ~ Finite({{' f'{", ".join(probs)}' f'}})'
[docs] def latex_string(self, aligned: bool = False) -> str: if aligned: align_str = ' & ' else: align_str = '' rv = Expr.symbol(self._name).latex() probs = [] for n, expr in self._probabilities.items(): probs.append(f'P({n}) = {expr.latex()}') latex = ( rv + align_str + f'\\sim {Expr.symbol("Finite").latex()} \\left({", ".join(probs)}\\right)' ) if not aligned: latex = '$' + latex + '$' return latex
def _subs_name(name: str, d: Mapping[TExpr, TExpr]) -> str: if name in d: new_name = d[name] elif (name_symbol := Expr.symbol(name)) in d: new_name = d[name_symbol] else: new_name = name return new_name if isinstance(new_name, str) else str(new_name) def _getitem_single( dist: Union[NormalDistribution, FiniteDistribution], index ) -> Union[NormalDistribution, FiniteDistribution]: if isinstance(index, int): if index != 0: raise IndexError(index) elif isinstance(index, str): if index != dist._name: raise KeyError(index) else: if isinstance(index, slice): index = list( range(index.start, index.stop, index.step if index.step is not None else 1) ) if index != [0]: raise IndexError(index) if isinstance(index, Collection): if len(index) != 1 or (dist._name not in index and 0 not in index): raise KeyError(index) else: raise KeyError(index) return dist