Source code for pharmpy.tools.qa.results

from pathlib import Path

import numpy as np
import pandas as pd
import yaml

import pharmpy.random_variables
import pharmpy.symbols
import pharmpy.tools.psn_helpers as psn_helpers
from pharmpy import Model
from pharmpy.results import Results, read_results


[docs]class QAResults(Results): def __init__( self, dofv=None, fullblock_parameters=None, boxcox_parameters=None, tdist_parameters=None, add_etas_parameters=None, iov_parameters=None, influential_individuals=None, covariate_effects=None, univariate_sum=None, residual_error=None, structural_bias=None, ): self.dofv = dofv self.fullblock_parameters = fullblock_parameters self.boxcox_parameters = boxcox_parameters self.tdist_parameters = tdist_parameters self.add_etas_parameters = add_etas_parameters self.iov_parameters = iov_parameters self.influential_individuals = influential_individuals self.covariate_effects = covariate_effects self.univariate_sum = univariate_sum self.residual_error = residual_error self.structural_bias = structural_bias
[docs]def calculate_results( original_model, base_model, fullblock_model=None, boxcox_model=None, tdist_model=None, add_etas_model=None, iov_model=None, etas_added_to=None, frem_results=None, cdd_results=None, scm_results=None, simeval_results=None, resmod_idv_results=None, ): fullblock_table, fullblock_dofv = calc_fullblock(original_model, fullblock_model) boxcox_table, boxcox_dofv = calc_transformed_etas( original_model, boxcox_model, 'boxcox', 'lambda' ) tdist_table, tdist_dofv = calc_transformed_etas(original_model, tdist_model, 'tdist', 'df') addetas_table, addetas_dofv = calc_add_etas(original_model, add_etas_model, etas_added_to) iov_table, iov_dofv = calc_iov(original_model, iov_model) frem_dofv = calc_frem_dofv(base_model, fullblock_model, frem_results) univariate_sum, scm_table, scm_dofv = calc_scm_dofv(scm_results) infinds, cdd_dofv = influential_individuals(cdd_results) outs, simeval_dofv = outliers(simeval_results, cdd_results) resmodtab, resmod_dofv = resmod(resmod_idv_results) dofv_table = pd.concat( [ fullblock_dofv, boxcox_dofv, tdist_dofv, addetas_dofv, iov_dofv, frem_dofv, scm_dofv, cdd_dofv, simeval_dofv, resmod_dofv, ] ) dofv_table.set_index(['section', 'run', 'dvid'], inplace=True) res = QAResults( dofv=dofv_table, fullblock_parameters=fullblock_table, boxcox_parameters=boxcox_table, tdist_parameters=tdist_table, add_etas_parameters=addetas_table, iov_parameters=iov_table, influential_individuals=infinds, covariate_effects=scm_table, univariate_sum=univariate_sum, residual_error=resmodtab, ) return res
[docs]def outliers(simeval_res, cdd_res): dofv_tab = pd.DataFrame( { 'section': ['outliers'], 'run': ['1'], 'dofv': [np.nan], 'df': [np.nan], } ) if simeval_res is None or cdd_res is None: return None, dofv_tab iofv = simeval_res.iofv_summary outliers = list(iofv.loc[iofv['residual_outlier']].index) cases = cdd_res.case_results.copy() cases['skipped_individuals'] = cases['skipped_individuals'].transform(lambda x: int(x[0])) cases.reset_index(inplace=True) cases.set_index('skipped_individuals', inplace=True) dofv = cases.loc[outliers].delta_ofv top_three = dofv.sort_values(ascending=False)[:3] dofv_tab = pd.DataFrame( { 'section': ['outliers'] * len(top_three), 'run': list(top_three.index), 'dofv': top_three, 'df': [np.nan] * len(top_three), } ) return None, dofv_tab
[docs]def influential_individuals(cdd_res): dofv_tab = pd.DataFrame( { 'section': ['influential_individuals'], 'run': ['1'], 'dofv': [np.nan], 'df': [np.nan], } ) if cdd_res is None: return None, dofv_tab df = cdd_res.case_results df = df.loc[df['delta_ofv'] > 3.84] skipped = [e[0] for e in df['skipped_individuals']] influentials = pd.DataFrame({'delta_ofv': df['delta_ofv']}, index=skipped) influentials.index.name = 'ID' top_three = influentials.sort_values(by=['delta_ofv']).iloc[:3] dofv_tab = pd.DataFrame( { 'section': ['influential_individuals'] * len(top_three), 'run': list(top_three.index), 'dofv': top_three['delta_ofv'], 'df': [np.nan] * len(top_three), } ) return influentials, dofv_tab
[docs]def resmod(res): dofv_tab = pd.DataFrame( { 'section': ['residual_error_model'], 'run': [np.nan], 'dvid': [np.nan], 'dofv': [np.nan], 'df': [np.nan], } ) if res is None: return None, dofv_tab df = res.models.copy() df = df.droplevel(0) df.drop('sum', level='DVID', errors='ignore', inplace=True) df['dOFV'] = -df['dOFV'] df.drop( ['idv_varying_RUV', 'idv_varying_combined', 'idv_varying_theta'], level='model', inplace=True, ) # Select the best idv_varying cut for each DVID remaining_timevar = [] for dvid in df.index.unique(level='DVID'): subdf = df.loc[dvid] idvvar = subdf.index.str.startswith("idv_varying") best_timevar_idx = subdf.loc[idvvar]['dOFV'].idxmax() for name in subdf.loc[idvvar].index: if name != best_timevar_idx: df.drop((dvid, name), inplace=True) remaining_timevar.append(best_timevar_idx) for ind in remaining_timevar: df.rename(index={ind: 'time_varying'}, level='model', inplace=True) df = df.groupby(level='DVID', sort=False).apply( lambda x: x.sort_values(['dOFV'], ascending=False) ) df = df.droplevel(0) # FIXME: Why was an extra DVID level added? n = df['parameters'].apply(lambda x: len(x)) df.insert(1, 'additional_parameters', n) for dvid in df.index.unique(level='DVID'): df.loc[(dvid, 'time_varying'), 'additional_parameters'] = 2 inds = list(df.index[:2]) dvid2 = [dvid for dvid, _ in inds] run2 = [run for _, run in inds] dofv_tab = pd.DataFrame( { 'section': ['residual_error_model'] * 2, 'run': run2, 'dvid': dvid2, 'dofv': list(df['dOFV'].iloc[:2]), 'df': list(df['additional_parameters'].iloc[:2]), } ) return df, dofv_tab
[docs]def calc_iov(original_model, iov_model): dofv_tab = pd.DataFrame( { 'section': ['parameter_variability'], 'run': ['iov'], 'dvid': [np.nan], 'dofv': [np.nan], 'df': [np.nan], } ) if iov_model is None: return None, dofv_tab iov_res = iov_model.modelfit_results if iov_res is None: return None, dofv_tab origres = original_model.modelfit_results iov_params = iov_model.random_variables.iov.variance_parameters iov_sds = [iov_res.parameter_estimates_sdcorr[param] for param in iov_params] iiv_params = iov_model.random_variables.iiv.variance_parameters iiv_params = [param for param in iiv_params if not original_model.parameters[param].fix] new_iiv_sds = [iov_res.parameter_estimates_sdcorr[param] for param in iiv_params] old_iiv_sds = [origres.parameter_estimates_sdcorr[param] for param in iiv_params] etas = [] for rvs, dist in original_model.random_variables.iiv.distributions(): if not set(iiv_params).isdisjoint({s.name for s in dist.free_symbols}): etas.extend(rvs) etas = [eta.name for eta in etas] table = pd.DataFrame( {'new_iiv_sd': new_iiv_sds, 'orig_iiv_sd': old_iiv_sds, 'iov_sd': iov_sds}, index=etas ) dofv = origres.ofv - iov_res.ofv dofv_tab = pd.DataFrame( { 'section': ['parameter_variability'], 'run': ['iov'], 'dvid': [np.nan], 'dofv': [dofv], 'df': [len(iov_params)], } ) return table, dofv_tab
[docs]def calc_add_etas(original_model, add_etas_model, etas_added_to): dofv_tab = pd.DataFrame( { 'section': ['parameter_variability'], 'run': ['add_etas'], 'dvid': [np.nan], 'dofv': [np.nan], 'df': [np.nan], } ) if add_etas_model is None: return None, dofv_tab add_etas_res = add_etas_model.modelfit_results if add_etas_res is None: return None, dofv_tab origres = original_model.modelfit_results original_etas = [rv.name for rv in original_model.random_variables.etas] all_etas = original_etas + etas_added_to added = [True] * len(original_etas) + [False] * len(etas_added_to) params = add_etas_model.random_variables.etas.variance_parameters params = [pharmpy.symbols.symbol(p) for p in params] orig_params = original_model.random_variables.etas.variance_parameters orig_params = [pharmpy.symbols.symbol(p) for p in orig_params] add_etas_sds = [add_etas_res.parameter_estimates_sdcorr[p.name] for p in params] orig_sds = [origres.parameter_estimates_sdcorr[p.name] for p in orig_params] orig_sds += [np.nan] * len(etas_added_to) table = pd.DataFrame( {'added': added, 'new_sd': add_etas_sds, 'orig_sd': orig_sds}, index=all_etas ) dofv = origres.ofv - add_etas_res.ofv dofv_tab = pd.DataFrame( { 'section': ['parameter_variability'], 'run': ['add_etas'], 'dvid': [np.nan], 'dofv': [dofv], 'df': [len(etas_added_to)], } ) return table, dofv_tab
[docs]def calc_scm_dofv(scm_results): dofv_tab = pd.DataFrame( { 'section': ['covariates'], 'run': ['scm'], 'dvid': [np.nan], 'dofv': [np.nan], 'df': [np.nan], } ) if scm_results is None: return None, None, dofv_tab table = scm_results.steps.copy() table.index = table.index.droplevel(3) table.index = table.index.droplevel(0) univariate_sum = table['ofv_drop'].sum() top = table.sort_values(by=['ofv_drop']).iloc[-1:] table['coeff'] = [list(coveff.values())[0] for coveff in table['covariate_effects']] table = table[['ofv_drop', 'coeff']] table.columns = ['dofv', 'coeff'] table.rename(mapper=lambda name: f'ETA({name[2:]})', level=0, inplace=True) dofv_tab = pd.DataFrame( { 'section': ['covariates'], 'run': top['model'].values, 'dvid': [np.nan], 'dofv': top['ofv_drop'].values, 'df': top['delta_df'].values, } ) return univariate_sum, table, dofv_tab
[docs]def calc_frem_dofv(base_model, fullblock_model, frem_results): """Calculate the dOFV for the frem model""" dofv_tab = pd.DataFrame( { 'section': ['covariates'], 'run': ['frem'], 'dvid': [np.nan], 'dofv': [np.nan], 'df': [np.nan], } ) if base_model is None or frem_results is None: return dofv_tab baseres = base_model.modelfit_results if baseres is not None: base_ofv = baseres.ofv else: return dofv_tab if fullblock_model is not None: fullres = fullblock_model.modelfit_results if fullres is not None: full_ofv = fullres.ofv else: return dofv_tab else: full_ofv = 0 model2_ofv = frem_results.ofv['ofv']['model_2'] model4_ofv = frem_results.ofv['ofv']['model_4'] dofv = model2_ofv - model4_ofv - (base_ofv - full_ofv) npar = len(frem_results.covariate_effects.index.get_level_values('parameter').unique()) ncov = len(frem_results.covariate_effects.index.get_level_values('covariate').unique()) dofv_tab = pd.DataFrame( { 'section': ['covariates'], 'run': ['frem'], 'dvid': [np.nan], 'dofv': [dofv], 'df': [npar * ncov], } ) return dofv_tab
[docs]def calc_transformed_etas(original_model, new_model, transform_name, parameter_name): """Retrieve new and old parameters of boxcox""" dofv_tab = pd.DataFrame( { 'section': ['parameter_variability'], 'run': [transform_name], 'dvid': [np.nan], 'dofv': [np.nan], 'df': [np.nan], } ) if new_model is None: return None, dofv_tab origres = original_model.modelfit_results newres = new_model.modelfit_results if newres is None: return None, dofv_tab params = new_model.random_variables.etas.variance_parameters params = [pharmpy.symbols.symbol(p) for p in params] boxcox_sds = [newres.parameter_estimates_sdcorr[p.name] for p in params] orig_sds = [origres.parameter_estimates_sdcorr[p.name] for p in params] thetas = newres.parameter_estimates_sdcorr[0 : len(params)] eta_names = [rv.name for rv in new_model.random_variables.etas] table = pd.DataFrame( {parameter_name: thetas.values, 'new_sd': boxcox_sds, 'old_sd': orig_sds}, index=eta_names ) dofv = origres.ofv - newres.ofv dofv_tab = pd.DataFrame( { 'section': ['parameter_variability'], 'run': [transform_name], 'dofv': [dofv], 'df': [len(eta_names)], } ) return table, dofv_tab
[docs]def calc_fullblock(original_model, fullblock_model): """Retrieve new and old parameters of full block""" dofv_tab = pd.DataFrame( { 'section': ['parameter_variability'], 'run': ['fullblock'], 'dvid': [np.nan], 'dofv': [np.nan], 'df': [np.nan], } ) if fullblock_model is None: return None, dofv_tab origres = original_model.modelfit_results fullres = fullblock_model.modelfit_results if fullres is None: return None, dofv_tab _, dist = fullblock_model.random_variables.iiv.distributions()[0] fullblock_parameters = [str(symb) for symb in dist.free_symbols] new_params = ( fullres.parameter_estimates_sdcorr[fullblock_parameters] .reindex(index=fullres.parameter_estimates_sdcorr.index) .dropna() ) old_params = origres.parameter_estimates_sdcorr table = pd.DataFrame({'new': new_params, 'old': old_params}).reindex(index=new_params.index) degrees = table['old'].isna().sum() dofv = origres.ofv - fullres.ofv dofv_tab = pd.DataFrame( { 'section': ['parameter_variability'], 'run': ['fullblock'], 'dvid': [np.nan], 'dofv': [dofv], 'df': [degrees], } ) return table, dofv_tab
[docs]def read_results_summary(path): summary_path = path / 'results_summary.yaml' if not summary_path.is_file(): return with open(summary_path, 'r') as fh: summary = yaml.safe_load(fh) dfs = [] for idvtab in summary['structural']: idv = idvtab['idv'] dvid = idvtab['dvid'] if dvid == 'NA': dvid = 1 cwres = [float(num) for num in idvtab['second_table']['CWRES']] cpred = [int(num) for num in idvtab['second_table']['%CPRED']] binmin = idvtab['table']['bin_min'] binmax = idvtab['table']['bin_max'] df = pd.DataFrame( { 'idv': idv, 'dvid': dvid, 'binid': list(range(1, len(cpred) + 1)), 'binmin': binmin, 'binmax': binmax, 'cwres': cwres, 'cpred': cpred, } ) dfs.append(df) bias = pd.concat(dfs) bias.set_index(['idv', 'dvid', 'binid'], inplace=True) return bias
[docs]def psn_qa_results(path): """Create qa results from a PsN qa run :param path: Path to PsN qa run directory :return: A :class:`QAResults` object """ path = Path(path) original_model = Model(path / 'linearize_run' / 'scm_dir1' / 'derivatives.mod') base_path = list(path.glob('*_linbase.mod'))[0] base_model = Model(base_path) fullblock_path = path / 'modelfit_run' / 'fullblock.mod' if fullblock_path.is_file(): fullblock_model = Model(fullblock_path) else: fullblock_model = None boxcox_path = path / 'modelfit_run' / 'boxcox.mod' if boxcox_path.is_file(): boxcox_model = Model(boxcox_path) else: boxcox_model = None tdist_path = path / 'modelfit_run' / 'tdist.mod' if tdist_path.is_file(): tdist_model = Model(tdist_path) else: tdist_model = None addetas_path = path / 'add_etas_run' / 'add_etas_linbase.mod' if addetas_path.is_file(): addetas_model = Model(addetas_path) else: addetas_model = None iov_path = path / 'modelfit_run' / 'iov.mod' if iov_path.is_file(): iov_model = Model(iov_path) else: iov_model = None frem_path = path / 'frem_run' / 'results.json' if frem_path.is_file(): frem_res = read_results(frem_path) else: frem_res = None cdd_path = path / 'cdd_run' / 'results.json' if cdd_path.is_file(): cdd_res = read_results(cdd_path) else: cdd_res = None scm_path = path / 'scm_run' / 'results.json' if scm_path.is_file(): scm_res = read_results(scm_path) else: scm_res = None simeval_path = path / 'simeval_run' / 'results.json' if simeval_path.is_file(): simeval_res = read_results(simeval_path) else: simeval_res = None args = psn_helpers.options_from_command(psn_helpers.psn_command(path)) if 'add_etas' not in args: etas_added_to = None else: etas_added_to = args['add_etas'].split(',') idv = args.get('resmod_idv', 'TIME') resmod_idv_path = path / f'resmod_{idv}' / 'results.json' if resmod_idv_path.is_file(): resmod_idv_res = read_results(resmod_idv_path) else: resmod_idv_res = None res = calculate_results( original_model, base_model, fullblock_model=fullblock_model, boxcox_model=boxcox_model, tdist_model=tdist_model, add_etas_model=addetas_model, iov_model=iov_model, etas_added_to=etas_added_to, frem_results=frem_res, cdd_results=cdd_res, scm_results=scm_res, simeval_results=simeval_res, resmod_idv_results=resmod_idv_res, ) bias = read_results_summary(path) res.structural_bias = bias return res