Source code for pharmpy.tools.cdd.results

import re
from math import sqrt
from pathlib import Path

import numpy as np
import pandas as pd
from scipy.linalg import cho_factor, solve_triangular

from pharmpy import Model
from pharmpy.results import Results
from pharmpy.tools.psn_helpers import model_paths, options_from_command


[docs]class CDDResults(Results): """CDD Results class""" rst_path = Path(__file__).parent / 'report.rst' def __init__(self, case_results=None, case_column=None, individual_predictions_plot=None): self.case_results = case_results self.case_column = case_column self.individual_predictions_plot = individual_predictions_plot
[docs]def compute_cook_scores(base_estimate, cdd_estimates, covariance_matrix): # covariance_matrix may be jackknife estimate of covariance try: # chol * chol^T = covariance_matrix # inv(chol^T) * inv(chol) = inv(covariance_matrix) # delta_vector * inv(covariance_matrix) * delta_vector^T = # delta_vector * inv(chol^T) * inv(chol) * delta_vector^T which is # the 2-norm of x where x is # solution to triangular system delta_vector^T = chol * x # Below we solve for all delta-vectors in one line chol, islow = cho_factor(covariance_matrix) delta_matrix = cdd_estimates - base_estimate x = solve_triangular(chol, delta_matrix.transpose(), lower=islow, trans=1) return [np.linalg.norm(v) for v in x.transpose()] except Exception: return None
[docs]def compute_delta_ofv(base_model, cdd_models, skipped_individuals): iofv = base_model.modelfit_results.individual_ofv if iofv is None: return [np.nan for m in cdd_models] cdd_ofvs = [m.modelfit_results.ofv if m.modelfit_results else np.nan for m in cdd_models] # need to set dtype for index.difference to work skipped_indices = [ pd.Index(np.array(skipped, dtype=iofv.index.dtype)) for skipped in skipped_individuals ] return [ sum(iofv[iofv.index.difference(skipped)]) - ofv for skipped, ofv in zip(skipped_indices, cdd_ofvs) ]
[docs]def compute_jackknife_covariance_matrix(cdd_estimates): bigN = len(cdd_estimates.index) delta_est = cdd_estimates - cdd_estimates.mean() return delta_est.transpose() @ delta_est * (bigN - 1) / bigN
[docs]def compute_covariance_ratios(cdd_models, covariance_matrix): try: orig_det = np.linalg.det(covariance_matrix) return [ sqrt(np.linalg.det(m.modelfit_results.covariance_matrix) / orig_det) if m.modelfit_results and m.modelfit_results.covariance_matrix is not None else np.nan for m in cdd_models ] except Exception: return None
[docs]def calculate_results(base_model, cdd_models, case_column, skipped_individuals, **kwargs): """Calculate CDD results""" if base_model.modelfit_results is None: raise ValueError('cdd base model has no results') cdd_estimates = pd.DataFrame( data=[ pd.Series(m.modelfit_results.parameter_estimates, name=m.name) for m in cdd_models if m.modelfit_results ] ) cdd_model_names = [m.name for m in cdd_models] res = CDDResults(case_column=case_column) # create Series of NaN values and then replace any computable results cook_temp = pd.Series(np.nan, index=cdd_model_names) try: base_model.modelfit_results.covariance_matrix except Exception: pass else: cook_temp.update( pd.Series( compute_cook_scores( base_model.modelfit_results.parameter_estimates, cdd_estimates, base_model.modelfit_results.covariance_matrix, ), index=cdd_estimates.index, ) ) jack_cook_score = None if len(cdd_model_names) == cdd_estimates.shape[0]: # all models have results jackkknife_covariance_matrix = compute_jackknife_covariance_matrix(cdd_estimates) jack_cook_score = pd.Series( compute_cook_scores( base_model.modelfit_results.parameter_estimates, cdd_estimates, jackkknife_covariance_matrix, ), index=cdd_model_names, ) dofv = compute_delta_ofv(base_model, cdd_models, skipped_individuals) dofv_influential = [elt > 3.86 for elt in dofv] infl_list = [ skipped[0] for skipped, infl in zip(skipped_individuals, dofv_influential) if infl and len(skipped) == 1 ] if infl_list: try: iplot = base_model.modelfit_results.plot_individual_predictions( individuals=infl_list, predictions=['PRED', 'CIPREDI'] ) except KeyError: iplot = None else: iplot = None res.individual_predictions_plot = iplot try: covmatrix = base_model.modelfit_results.covariance_matrix except Exception: covratios = np.nan else: covratios = compute_covariance_ratios(cdd_models, covmatrix) case_results = pd.DataFrame( { 'cook_score': cook_temp, 'jackknife_cook_score': jack_cook_score, 'delta_ofv': dofv, 'dofv_influential': dofv_influential, 'covariance_ratio': covratios, 'skipped_individuals': skipped_individuals, }, index=cdd_model_names, ) case_results.index = pd.RangeIndex(start=1, stop=len(case_results) + 1) res.case_results = case_results return res
[docs]def psn_cdd_options(path): path = Path(path) options = dict(model_path=None, outside_n_sd_check=None, case_column='ID') with open(path / 'meta.yaml') as meta: cmd = None for row in meta: if cmd is not None: if re.match(r'\s', row): # continuation is indented cmd += row # must not strip continue else: # no continuation: parse and remove for k, v in options_from_command(cmd).items(): if 'case_column'.startswith(k): options['case_column'] = v break cmd = None row = row.strip() if row.startswith('model_files:'): row = next(meta).strip() options['model_path'] = re.sub(r'^-\s*', '', row) elif row.startswith('outside_n_sd_check: '): options['outside_n_sd_check'] = int(re.sub(r'\D', '', row)) elif row.startswith('command_line: '): cmd = row return options
[docs]def psn_cdd_skipped_individuals(path): path = Path(path) / 'skipped_individuals1.csv' with open(path) as skipped: # rows may have different number of values, cannot use pd.read_csv a = [row.rstrip().split(',') for row in skipped] # If scientific notation convert to proper integer. Only supports integer IDs a = [[str(int(float(elt))) for elt in row] for row in a] return a
[docs]def psn_cdd_results(path): """Create cdd results from a PsN CDD run :param path: Path to PsN cdd run directory :return: A :class:`CDDResults` object """ path = Path(path) if not path.is_dir(): raise IOError(f'Could not find cdd folder: {str(path)}') options = psn_cdd_options(path) model_path = Path(options['model_path']) base_model = Model(model_path) cdd_models = [Model(p) for p in model_paths(path, 'cdd_*.mod')] skipped_individuals = psn_cdd_skipped_individuals(path) res = calculate_results(base_model, cdd_models, options['case_column'], skipped_individuals) return res