Source code for pharmpy.modeling.results

from __future__ import annotations

import math
from itertools import chain
from typing import Iterable, Literal, Optional, Union

from pharmpy.basic import BooleanExpr, Expr
from pharmpy.deps import numpy as np
from pharmpy.deps import pandas as pd
from pharmpy.deps import sympy
from pharmpy.internals.expr.parse import parse as parse_expr
from pharmpy.internals.expr.subs import subs, xreplace_dict
from pharmpy.internals.math import round_to_n_sigdig
from pharmpy.model import (
    CompartmentalSystem,
    CompartmentalSystemBuilder,
    Model,
    get_and_check_dataset,
    output,
)
from pharmpy.model.distributions.numeric import ConstantDistribution
from pharmpy.model.random_variables import (
    eval_expr,
    filter_distributions,
    sample_rvs,
    subs_distributions,
)

from .data import get_ids, get_observations
from .expressions import get_individual_parameters
from .odes import get_and_check_odes, get_initial_conditions
from .parameter_sampling import create_rng, sample_parameters_from_covariance_matrix
from .parameters import get_omegas
from .random_variables import replace_non_random_rvs


[docs] def calculate_eta_shrinkage( model: Model, parameter_estimates: pd.Series, individual_estimates: pd.DataFrame, sd: bool = False, ): """Calculate eta shrinkage for each eta Parameters ---------- model : Model Pharmpy model parameter_estimates : pd.Series Parameter estimates individual_estimates : pd.DataFrame Table of individual (eta) estimates sd : bool Calculate shrinkage on the standard deviation scale (default is to calculate on the variance scale) Return ------ Series Shrinkage for each eta Examples -------- >>> from pharmpy.modeling import * >>> from pharmpy.tools import load_example_modelfit_results >>> model = load_example_model("pheno") >>> results = load_example_modelfit_results("pheno") >>> pe = results.parameter_estimates >>> ie = results.individual_estimates >>> calculate_eta_shrinkage(model, pe, ie) ETA_CL 0.720481 ETA_VC 0.240295 dtype: float64 >>> calculate_eta_shrinkage(model, pe, ie, sd=True) ETA_CL 0.471305 ETA_VC 0.128389 dtype: float64 See also -------- calculate_individual_shrinkage """ # Want parameter estimates combined with fixed parameter values param_inits = model.parameters.to_dataframe()['value'] pe = parameter_estimates.combine_first(param_inits) param_names = [str(param) for param in model.random_variables.etas.covariance_matrix.diagonal()] diag_ests = pe[param_names] diag_ests.index = individual_estimates.columns if not sd: shrinkage = 1 - (individual_estimates.var() / diag_ests) else: shrinkage = 1 - (individual_estimates.std() / (diag_ests**0.5)) return shrinkage
[docs] def calculate_individual_shrinkage( model: Model, parameter_estimates: pd.Series, individual_estimates_covariance: pd.DataFrame ): """Calculate the individual eta-shrinkage Definition: ieta_shr = (var(eta) / omega) Parameters ---------- model : Model Pharmpy model parameter_estimates : pd.Series Parameter estimates of model individual_estimates_covariance : pd.DataFrame Uncertainty covariance matrices of individual estimates Return ------ DataFrame Shrinkage for each eta and individual Examples -------- >>> from pharmpy.modeling import * >>> from pharmpy.tools import load_example_modelfit_results >>> model = load_example_model("pheno") >>> results = load_example_modelfit_results("pheno") >>> pe = results.parameter_estimates >>> covs = results.individual_estimates_covariance >>> calculate_individual_shrinkage(model, pe, covs) ETA_CL ETA_VC ID 1 0.847789 0.256473 2 0.796643 0.210669 3 0.755025 0.226957 4 0.764541 0.216405 5 0.816192 0.203974 6 0.778108 0.210992 7 0.659420 0.236875 8 0.668551 0.240097 9 0.260056 0.200374 10 0.725190 0.226563 11 0.972110 0.421852 12 0.249640 0.254119 13 0.730294 0.364932 14 0.165785 0.194464 15 0.813399 0.313554 16 0.797328 0.213211 17 0.769059 0.278079 18 0.098506 0.176778 19 0.749022 0.235386 20 0.742181 0.222932 21 0.317956 0.264473 22 0.943950 0.232732 23 0.707183 0.259077 24 0.553787 0.247717 25 0.826349 0.114302 26 0.854777 0.341384 27 0.820829 0.263235 28 0.999942 0.319986 29 0.967084 0.432760 30 0.404773 0.325215 31 0.999980 0.318421 32 0.925283 0.167667 33 0.913706 0.242106 34 0.875554 0.249197 35 0.849135 0.294294 36 0.172206 0.246422 37 0.747380 0.278340 38 0.187440 0.231249 39 0.237805 0.254485 40 0.999925 0.189793 41 0.941906 0.170998 42 0.923801 0.244046 43 0.999928 0.320403 44 0.237637 0.260453 45 0.869540 0.194503 46 0.999949 0.319750 47 0.983782 0.393234 48 0.698267 0.169337 49 0.776674 0.214962 50 0.688847 0.192608 51 0.822213 0.202534 52 0.511489 0.273601 53 0.964757 0.223448 54 0.762156 0.181648 55 0.965657 0.435741 56 0.995278 0.354798 57 0.813382 0.263372 58 0.727295 0.232867 59 0.738777 0.224742 See also -------- calculate_eta_shrinkage """ cov = individual_estimates_covariance pe = parameter_estimates # Want parameter estimates combined with fixed parameter values param_inits = model.parameters.to_dataframe()['value'] pe = pe.combine_first(param_inits) # Get all iiv and iov variance parameters diag = model.random_variables.etas.covariance_matrix.diagonal() param_names = [s.name for s in diag] diag_ests = pe[param_names] def fn(row, ests): names = row[0].index ser = pd.Series(np.diag(row[0].values) / ests, index=names) return ser ish = pd.DataFrame(cov).apply(fn, axis=1, ests=diag_ests.values) return ish
[docs] def calculate_individual_parameter_statistics( model: Model, expr_or_exprs: Union[ Iterable[BooleanExpr], Iterable[Expr], Iterable[str], BooleanExpr, Expr, str ], parameter_estimates: pd.Series, covariance_matrix: Optional[pd.DataFrame] = None, seed: Optional[Union[np.random.Generator, int]] = None, ): """Calculate statistics for individual parameters Calculate the mean (expected value of the distribution), variance (variance of the distribution) and standard error for individual parameters described by arbitrary expressions. Any dataset column or variable used in the model can be used in the expression. The exception being that variables that depends on the solution of the ODE system cannot be used. If covariates are used in the expression the statistics of the parameter is calculated at the median value of each covariate as well as at the 5:th and 95:th percentiles. If no parameter uncertainty is available for the model the standard error will not be calculated. Parameters ---------- model : Model A previously estimated model parameter_estimates : pd.Series Parameter estimates covariance_matrix : pd.DataFrame Parameter uncertainty covariance matrix expr_or_exprs : str expression or iterable of str or expressions Expressions or equations for parameters of interest. If equations are used the names of the left hand sides will be used as the names of the parameters. seed : Generator or int Random number generator or int seed Returns ------- pd.DataFrame A DataFrame of statistics indexed on parameter and covariate value. Examples -------- >>> from pharmpy.modeling import load_example_model, create_rng >>> from pharmpy.modeling import calculate_individual_parameter_statistics >>> from pharmpy.tools import load_example_modelfit_results >>> model = load_example_model("pheno") >>> results = load_example_modelfit_results("pheno") >>> rng = create_rng(23) >>> pe = results.parameter_estimates >>> cov = results.covariance_matrix >>> calculate_individual_parameter_statistics(model, "K=CL/V", pe, cov, seed=rng) mean variance stderr parameter covariates K p5 0.004234 0.000001 0.001100 median 0.004907 0.000001 0.001201 p95 0.004907 0.000001 0.001201 """ rng = create_rng(seed) if isinstance(expr_or_exprs, str): expr_or_exprs = [_split_equation(expr_or_exprs)] else: try: expr_or_exprs = [_split_equation(e) for e in expr_or_exprs] except TypeError: expr_or_exprs = [_split_equation(expr_or_exprs)] full_exprs = list( map( lambda e: (e[0], sympy.sympify(model.statements.before_odes.full_expression(e[1]))), expr_or_exprs, ) ) input_parameter_estimates = parameter_estimates parameter_estimates = xreplace_dict(parameter_estimates) all_free_symbols = set().union(*map(lambda e: e[1].free_symbols, full_exprs)) all_covariate_free_symbols = all_free_symbols.intersection( map(sympy.Symbol, model.datainfo.names) ) all_parameter_free_symbols = set(parameter_estimates.keys()) all_random_free_symbols = all_free_symbols.difference( all_parameter_free_symbols, all_covariate_free_symbols ) distributions = list( filter_distributions( model.random_variables, all_random_free_symbols, ) ) sampling_rvs = list( subs_distributions( distributions, {Expr(key): float(val) for key, val in parameter_estimates.items()}, ) ) batches = [] if not all_covariate_free_symbols: cases = {'median': {}} else: dataset = get_and_check_dataset(model) column_filter = ['ID'] + list(symbol.name for symbol in all_covariate_free_symbols) q5 = dataset[column_filter].groupby('ID').median().quantile(0.05) q95 = dataset[column_filter].groupby('ID').median().quantile(0.95) median = dataset[column_filter].groupby('ID').median().median() cases = { 'p5': xreplace_dict(q5), 'median': xreplace_dict(median), 'p95': xreplace_dict(q95), } filtered_sampling_rvs = list( filter( lambda r: any(map(all_random_free_symbols.__contains__, r[0])), sampling_rvs, ) ) nsamples = 1000000 nbatches = 100 batchsize = 10 samples = sample_rvs(filtered_sampling_rvs, nsamples, rng) if covariance_matrix is not None: parameters_samples = sample_parameters_from_covariance_matrix( model, input_parameter_estimates, covariance_matrix, n=nbatches, force_posdef_covmatrix=True, seed=rng, ) for _, row in parameters_samples.iterrows(): parameters = xreplace_dict(row) local_sampling_rvs = list( subs_distributions( distributions, {Expr(key): float(val) for key, val in parameters.items()} ) ) + [ ((key,), ConstantDistribution(value)) for key, value in parameters.items() if key in all_parameter_free_symbols ] batch = sample_rvs(local_sampling_rvs, batchsize, rng) batches.append(batch) table = pd.DataFrame(columns=['parameter', 'covariates', 'mean', 'variance', 'stderr']) i = 0 for name, full_expr in full_exprs: df = pd.DataFrame(index=list(cases.keys()), columns=['mean', 'variance', 'stderr']) parameter_estimates_expr = subs(full_expr, parameter_estimates, simultaneous=True) for case, cov_values in cases.items(): expr = subs(parameter_estimates_expr, cov_values, simultaneous=True) values = eval_expr(expr, nsamples, samples) mean = np.mean(values) variance = np.var(values) # NOTE: This is NaN for empty inputs, dtype is required for those. cov_expr = subs(full_expr, cov_values, simultaneous=True) stderr = pd.Series( chain.from_iterable(eval_expr(cov_expr, batchsize, batch) for batch in batches), dtype='float64', ).std() df.loc[case] = [mean, variance, stderr] df.index.name = 'covariates' df.reset_index(inplace=True) if not name: name = f'unknown{i}' i += 1 df['parameter'] = name table = pd.concat([table, df]) table.set_index(['parameter', 'covariates'], inplace=True) table = table.astype('float64') return table
[docs] def calculate_pk_parameters_statistics( model: Model, parameter_estimates: pd.Series, covariance_matrix: Optional[pd.DataFrame] = None, seed: Optional[Union[np.random.Generator, int]] = None, ): """Calculate statistics for common pharmacokinetic parameters Calculate the mean (expected value of the distribution), variance (variance of the distribution) and standard error for some individual pre-defined pharmacokinetic parameters. Parameters ---------- model : Model A previously estimated model parameter_estimates : pd.Series Parameter estimates covariance_matrix : pd.DataFrame Parameter uncertainty covariance matrix seed : Generator or int Random number generator or seed Returns ------- pd.DataFrame A DataFrame of statistics indexed on parameter and covariate value. Examples -------- >>> from pharmpy.modeling import load_example_model, create_rng >>> from pharmpy.modeling import calculate_pk_parameters_statistics >>> from pharmpy.tools import load_example_modelfit_results >>> model = load_example_model("pheno") >>> results = load_example_modelfit_results("pheno") >>> rng = create_rng(23) >>> pe = results.parameter_estimates >>> cov = results.covariance_matrix >>> calculate_pk_parameters_statistics(model, pe, cov, seed=rng) mean variance stderr parameter covariates t_half_elim p5 173.337164 1769.493756 44.852625 median 149.567842 1317.474199 37.819338 p95 149.567842 1317.474199 37.819338 k_e p5 0.004234 0.000001 0.001100 median 0.004907 0.000001 0.001201 p95 0.004907 0.000001 0.001201 See Also -------- calculate_individual_parameter_statistics : Calculation of statistics for arbitrary parameters """ odes = get_and_check_odes(model) central = odes.central_compartment statements = model.statements depot = odes.find_depot(statements) peripherals = odes.find_peripheral_compartments() elimination_rate = odes.get_flow(central, output) expressions = [] # Eq(name, expr) # FO abs + 1comp + FO elimination if len(odes) == 2 and depot and odes.t not in elimination_rate.free_symbols: ode_list, ics = odes.eqs, get_initial_conditions(model, dosing=True) ode_list = sympy.sympify(ode_list) ics = sympy.sympify(ics) sols = sympy.dsolve(ode_list, ics=ics) expr = sols[1].rhs d = sympy.diff(expr, odes.t) tmax_closed_form = sympy.solve(d, odes.t)[0] expressions.append(sympy.Eq(sympy.Symbol('t_max'), tmax_closed_form)) e2 = sympy.simplify(expr / depot.doses[0].amount / sympy.denom(elimination_rate)) cmax_dose_closed_form = sympy.simplify( subs(e2, {odes.t: tmax_closed_form}, simultaneous=True) ) expressions.append(sympy.Eq(sympy.Symbol('C_max_dose'), cmax_dose_closed_form)) # Any abs + 1comp + FO elimination if not peripherals and odes.t not in elimination_rate.free_symbols: elimination_system = odes for name in elimination_system.compartment_names: if name != central.name: # NOTE: Keep central cb = CompartmentalSystemBuilder(elimination_system) cb.remove_compartment(elimination_system.find_compartment(name)) elimination_system = CompartmentalSystem(cb) eq = elimination_system.eqs[0] eq = sympy.sympify(eq) ic = sympy.Function(elimination_system.amounts[0].name)(0) A0 = sympy.Symbol('A0') sols = sympy.dsolve(eq, ics={ic: A0}) eq_half = sympy.Eq(sympy.Rational(1, 2) * A0, sols.rhs) thalf_elim = sympy.solve(eq_half, odes.t)[0] expressions.append(sympy.Eq(sympy.Symbol('t_half_elim'), thalf_elim)) # Bolus dose + 2comp + FO elimination if len(peripherals) == 1 and len(odes) == 2 and odes.t not in elimination_rate.free_symbols: ode_list, ics = odes.eqs, get_initial_conditions(model, dosing=True) ode_list = sympy.sympify(ode_list) ics = sympy.sympify(ics) sols = sympy.dsolve(ode_list, ics=ics) A = sympy.Wild('A') B = sympy.Wild('B') alpha = sympy.Wild('alpha') beta = sympy.Wild('beta') m = sols[0].rhs.match(A * sympy.exp(-alpha) + B * sympy.exp(-beta)) beta = m[beta] / odes.t alpha = m[alpha] / odes.t A = m[A] / central.doses[0].amount B = m[B] / central.doses[0].amount if (beta - alpha).extract_multiplicatively(-1) is not None: # alpha > beta (sympy couldn't simplify this directly) alpha, beta = beta, alpha A, B = B, A expressions.append(sympy.Eq(sympy.Symbol('A'), A)) expressions.append(sympy.Eq(sympy.Symbol('B'), B)) expressions.append(sympy.Eq(sympy.Symbol('alpha'), alpha)) expressions.append(sympy.Eq(sympy.Symbol('beta'), beta)) # Any abs + any comp + FO elimination if odes.t not in elimination_rate.free_symbols: expressions.append(sympy.Eq(sympy.Symbol('k_e'), elimination_rate)) df = calculate_individual_parameter_statistics( model, expressions, parameter_estimates, covariance_matrix, seed=seed ) return df
def _split_equation(s): if isinstance(s, str): a = s.split('=') if len(a) == 1: name = None expr = parse_expr(s) else: name = a[0].strip() expr = parse_expr(a[1]) elif isinstance(s, sympy.Eq): name = s.lhs.name expr = s.rhs else: # sympy expr name = None expr = s if name is None and isinstance(expr, sympy.Symbol): name = expr.name return name, expr
[docs] def calculate_aic(model: Model, likelihood: float): """Calculate AIC AIC = -2LL + 2*n_estimated_parameters Parameters ---------- model : Model Pharmpy model object likelihood : float -2LL Returns ------- float AIC of model fit """ parameters = model.parameters.nonfixed return likelihood + 2 * len(parameters)
def _random_etas(model): var = model.random_variables.etas.variance_parameters zerofix = [model.parameters[e].fix and model.parameters[e].init == 0 for e in var] keep = [] for eta, zf in zip(model.random_variables.etas.names, zerofix): if not zf: keep.append(eta) return model.random_variables.etas[keep]
[docs] def calculate_bic( model: Model, likelihood: float, type: Literal['mixed', 'fixed', 'random', 'iiv'] = 'mixed', ) -> float: """Calculate BIC Different variations of the BIC can be calculated: * | mixed (default) | BIC = -2LL + n_random_parameters * log(n_individuals) + | n_fixed_parameters * log(n_observations) * | fixed | BIC = -2LL + n_estimated_parameters * log(n_observations) * | random | BIC = -2LL + n_estimated_parameters * log(n_individuals) * | iiv | BIC = -2LL + n_estimated_iiv_omega_parameters * log(n_individuals) Parameters ---------- model : Model Pharmpy model object likelihood : float -2LL to use type : {'mixed', 'fixed', 'random', 'iiv'} Type of BIC to calculate. Default is the mixed effects. Returns ------- float BIC of model fit Examples -------- >>> from pharmpy.modeling import * >>> from pharmpy.tools import load_example_modelfit_results >>> model = load_example_model("pheno") >>> results = load_example_modelfit_results("pheno") >>> ofv = results.ofv >>> calculate_bic(model, ofv) 611.7071686216575 >>> calculate_bic(model, ofv, type='fixed') 616.5366069867251 >>> calculate_bic(model, ofv, type='random') 610.741280948644 >>> calculate_bic(model, ofv, type='iiv') 594.4311311730211 """ parameters = model.parameters.nonfixed if type == 'fixed': penalty = len(parameters) * math.log(len(get_observations(model))) elif type == 'random': penalty = len(parameters) * math.log(len(get_ids(model))) elif type == 'iiv': nomegas_iiv = len( [name for name in model.random_variables.iiv.parameter_names if name in parameters] ) penalty = nomegas_iiv * math.log(len(get_ids(model))) elif type == 'mixed': theta_f, theta_r = _categorize_parameters(model) nsubs = len(get_ids(model)) nobs = len(get_observations(model)) penalty = len(theta_r) * math.log(nsubs) + len(theta_f) * math.log(nobs) else: supported_types = ('mixed', 'fixed', 'random', 'iiv') raise ValueError(f'Unknown `type` \'{type}\', must be one of: {supported_types}') return likelihood + penalty
def _categorize_parameters(model): # Categorize parameters into random and fixed model = replace_non_random_rvs(model) # To get rid of 0 FIX rvs indpars = get_individual_parameters(model) all_pop_params = set(model.parameters.nonfixed.symbols) omegas = set(get_omegas(model).symbols) etas = set(model.random_variables.etas.symbols) epsilons = set(model.random_variables.epsilons.symbols) randpars = omegas.intersection(all_pop_params) fixedpars = set() for indpar in indpars: expr = model.statements.before_odes.full_expression(indpar) symbols = expr.free_symbols param_symbols = symbols.intersection(all_pop_params) if symbols.isdisjoint(etas): fixedpars |= param_symbols - randpars else: fixedpars -= param_symbols randpars |= param_symbols for y in model.dependent_variables.keys(): expr = model.statements.after_odes.full_expression(y) symbols = expr.free_symbols param_symbols = symbols.intersection(all_pop_params) cureps = symbols.intersection(epsilons) cursigmas = model.random_variables[cureps].free_symbols - cureps cursigmas = cursigmas.intersection(all_pop_params) cursymbols = param_symbols | cursigmas if symbols.isdisjoint(etas): fixedpars |= cursymbols - randpars else: fixedpars -= cursymbols randpars |= cursymbols return fixedpars, randpars
[docs] def check_high_correlations(model: Model, cor: pd.DataFrame, limit: float = 0.9): """Check for highly correlated parameter estimates Parameters ---------- model : Model Pharmpy model object cor : pd.DataFrame Estimated correlation matrix limit : float Lower limit for a high correlation Returns ------- pd.Series Correlation values indexed on pairs of parameters for (absolute) correlations above limit Example ------- >>> from pharmpy.modeling import * >>> from pharmpy.tools import load_example_modelfit_results >>> model = load_example_model("pheno") >>> results = load_example_modelfit_results("pheno") >>> cor = results.correlation_matrix >>> check_high_correlations(model, cor, limit=0.3) POP_CL IIV_CL -0.387063 POP_VC COVAPGR -0.357003 IIV_VC 0.356831 dtype: float64 """ high_and_below_diagonal = cor.abs().ge(limit) & np.triu(np.ones(cor.shape), k=1).astype(bool) return cor.where(high_and_below_diagonal).stack()
[docs] def check_parameters_near_bounds( model: Model, values: pd.Series, zero_limit: float = 0.001, significant_digits: int = 2 ): """Check if any estimated parameter value is close to its bounds Parameters ---------- model : Model Pharmpy model object values : pd.Series Series of values with index a subset of parameter names. zero_limit : float maximum distance to 0 bounds significant_digits : int maximum distance to non-zero bounds in number of significant digits Returns ------- pd.Series Logical Series with same index as values Example ------- >>> from pharmpy.modeling import * >>> from pharmpy.tools import load_example_modelfit_results >>> model = load_example_model("pheno") >>> results = load_example_modelfit_results("pheno") >>> check_parameters_near_bounds(model, results.parameter_estimates) POP_CL False POP_VC False COVAPGR False IIV_CL False IIV_VC False SIGMA False dtype: bool """ ser = pd.Series( [ _is_close_to_bound(model.parameters[p], values.loc[p], zero_limit, significant_digits) for p in values.index ], index=values.index, dtype=bool, ) return ser
def _is_close_to_bound(param, value=None, zero_limit=0.01, significant_digits=2): if value is None: value = param.init return ( param.lower > -float("inf") and _is_near_target(value, param.lower, zero_limit, significant_digits) ) or ( param.upper < float("inf") and _is_near_target(value, param.upper, zero_limit, significant_digits) ) def _is_near_target(x, target, zero_limit, significant_digits): if target == 0: return abs(x) < abs(zero_limit) else: return round_to_n_sigdig(x, n=significant_digits) == round_to_n_sigdig( target, n=significant_digits )