Source code for pharmpy.modeling.block_rvs

"""
:meta private:
"""
import warnings
from typing import List, Optional, Union

from pharmpy.deps import numpy as np
from pharmpy.deps import pandas as pd
from pharmpy.deps import sympy
from pharmpy.internals.math import corr2cov, nearest_postive_semidefinite
from pharmpy.model import Model, Parameter, Parameters
from pharmpy.modeling.help_functions import _get_etas


[docs]def create_joint_distribution( model: Model, rvs: Optional[List[str]] = None, individual_estimates: Optional[pd.DataFrame] = None, ): """ Combines some or all etas into a joint distribution. The etas must be IIVs and cannot be fixed. Initial estimates for covariance between the etas is dependent on whether the model has results from a previous run. In that case, the correlation will be calculated from individual estimates, otherwise correlation will be set to 10%. Parameters ---------- model : Model Pharmpy model rvs : list Sequence of etas or names of etas to combine. If None, all etas that are IIVs and non-fixed will be used (full block). None is default. individual_estimates : pd.DataFrame Optional individual estimates to use for calculation of initial estimates Return ------ Model Pharmpy model object Examples -------- >>> from pharmpy.modeling import load_example_model, create_joint_distribution >>> model = load_example_model("pheno") >>> model.random_variables.etas ETA₁ ~ N(0, IVCL) ETA₂ ~ N(0, IVV) >>> model = create_joint_distribution(model, ['ETA_1', 'ETA_2']) >>> model.random_variables.etas ⎡ETA₁⎤ ⎧⎡0⎤ ⎡ IVCL IIV_CL_IIV_V⎤⎫ ⎢ ⎥ ~ N⎪⎢ ⎥, ⎢ ⎥⎪ ⎣ETA₂⎦ ⎩⎣0⎦ ⎣IIV_CL_IIV_V IVV ⎦⎭ See also -------- split_joint_distribution : split etas into separate distributions """ all_rvs = model.random_variables if rvs is None: rvs = [] iiv_rvs = model.random_variables.iiv for rv in iiv_rvs: for name in rv.parameter_names: if model.parameters[name].fix: break else: rvs.extend(rv.names) else: for name in rvs: if name in all_rvs and all_rvs[name].level == 'IOV': raise ValueError( f'{name} describes IOV: Joining IOV random variables is currently not supported' ) if len(rvs) == 1: raise ValueError('At least two random variables are needed') sset = model.statements paramnames = [] for rv in rvs: parameter_names = '_'.join( [s.symbol.name for s in sset if sympy.Symbol(rv) in s.rhs_symbols] ) paramnames.append(parameter_names) all_rvs, cov_to_params = all_rvs.join( rvs, name_template='IIV_{}_IIV_{}', param_names=paramnames ) pset_new = model.parameters for cov_name, param_names in cov_to_params.items(): parent1, parent2 = model.parameters[param_names[0]], model.parameters[param_names[1]] covariance_init = _choose_param_init(model, individual_estimates, all_rvs, parent1, parent2) param_new = Parameter(cov_name, covariance_init) pset_new += param_new model = model.replace(parameters=Parameters.create(pset_new), random_variables=all_rvs) return model.update_source()
[docs]def split_joint_distribution(model: Model, rvs: Optional[Union[List[str], str]] = None): """ Splits etas following a joint distribution into separate distributions. Parameters ---------- model : Model Pharmpy model rvs : str, list Name/names of etas to separate. If None, all etas that are IIVs and non-fixed will become single. None is default. Return ------ Model Pharmpy model object Examples -------- >>> from pharmpy.modeling import * >>> model = load_example_model("pheno") >>> model = create_joint_distribution(model, ['ETA_1', 'ETA_2']) >>> model.random_variables.etas ⎡ETA₁⎤ ⎧⎡0⎤ ⎡ IVCL IIV_CL_IIV_V⎤⎫ ⎢ ⎥ ~ N⎪⎢ ⎥, ⎢ ⎥⎪ ⎣ETA₂⎦ ⎩⎣0⎦ ⎣IIV_CL_IIV_V IVV ⎦⎭ >>> model = split_joint_distribution(model, ['ETA_1', 'ETA_2']) >>> model.random_variables.etas ETA₁ ~ N(0, IVCL) ETA₂ ~ N(0, IVV) See also -------- create_joint_distribution : combine etas into a join distribution """ all_rvs = model.random_variables names = _get_etas(model, rvs) new_rvs = all_rvs.unjoin(names) parameters_before = all_rvs.parameter_names parameters_after = new_rvs.parameter_names removed_parameters = set(parameters_before) - set(parameters_after) new_params = Parameters( tuple([p for p in model.parameters if p.name not in removed_parameters]) ) model = model.replace(random_variables=new_rvs, parameters=new_params).update_source() return model
def _choose_param_init(model, individual_estimates, rvs, parent1, parent2): etas = [] for name in rvs.names: if rvs[name].get_variance(name).name in (parent1.name, parent2.name): etas.append(name) sd = np.array([np.sqrt(parent1.init), np.sqrt(parent2.init)]) init_default = round(0.1 * sd[0] * sd[1], 7) last_estimation_step = [est for est in model.estimation_steps if not est.evaluation][-1] if last_estimation_step.method == 'FO': return init_default elif individual_estimates is not None: try: ie = individual_estimates if not all(eta in ie.columns for eta in etas): return init_default except KeyError: return init_default # NOTE Use pd.corr() and not pd.cov(). SD is chosen from the final estimates, if cov is used # it will be calculated from the EBEs. eta_corr = ie[etas].corr() if eta_corr.isnull().values.any(): warnings.warn( f'Correlation of individual estimates between {parent1.name} and ' f'{parent2.name} is NaN, returning default initial estimate' ) return init_default cov = corr2cov(eta_corr.to_numpy(), sd) cov[cov == 0] = 0.0001 cov = nearest_postive_semidefinite(cov) init_cov = cov[1][0] return round(init_cov, 7) else: return init_default