Source code for pharmpy.methods.frem.results

import itertools
from pathlib import Path

import altair as alt
import numpy as np
import pandas as pd
import symengine

import pharmpy.symbols as symbols
from pharmpy import Model
from pharmpy.data import ColumnType
from pharmpy.math import conditional_joint_normal, is_posdef
from pharmpy.parameter_sampling import sample_from_covariance_matrix, sample_individual_estimates
from pharmpy.results import Results


[docs]class FREMResults(Results): """FREM Results class What follows is a description on how the FREM results are stored. See :py:mod:`pharmpy.methods.frem` for a description of how the results are calculated. .. attribute:: covariate_baselines DataFrame with the baseline covariate values used in the analysis for each indiviual. Index is ID. One column for each covariate. .. code-block:: WGT APGR ID 1.0 1.4 7.0 2.0 1.5 9.0 3.0 1.5 6.0 4.0 0.9 6.0 5.0 1.4 7.0 6.0 1.2 5.0 7.0 1.0 5.0 8.0 1.2 7.0 9.0 1.4 8.0 ... ... ... .. attribute:: covariate_effects DataFrame with covariate effects. Index is parameter, covariate and condition where condition can be either 5th or 95th. Columns are p5, mean and p95 for the 5th percentile of the effect, the estimated effect and the 95th percentile of the effect respectively. Effect sizes are in fractions. Example: .. code-block:: p5 mean p95 parameter covariate condition ETA(1) WGT 5th 0.901869 0.972920 1.051093 95th 0.903849 1.064605 1.233111 APGR 5th 1.084238 1.248766 1.413923 95th 0.848297 0.901919 0.962312 ETA(2) WGT 5th 0.942450 1.004400 1.068390 95th 0.874409 0.995763 1.127777 APGR 5th 0.832008 0.924457 1.021307 95th 0.990035 1.039444 1.091288 .. attribute:: covariate_effects_plot Plot of the covariate effects generated by :py:meth:`plot_covariate_effects` .. attribute:: covariate_statistics DataFrame with summary statistics of the covariate baselines. Index is covariates. Columns are p5 with 95th percentile, mean, p95 with 95th percentile, stdev, ref with the used reference value, categorical and other with the non-reference value for categorical covariates. Example: .. code-block p5 mean p95 stdev ref categorical other covariate WGT 0.7 1.525424 3.2 0.704565 1.525424 False NaN APGR 1.0 6.423729 9.0 2.237636 6.423729 False NaN .. attribute:: individual_effects DataFrame with individual covariate effects. Index is ID and parameter. Columns are observed, p5 and p95 for the observed individual effect, 5th and 95th percentiles of the estimated individual effects respectively. Example: .. code-block:: observed p5 p95 ID parameter 1.0 ETA(1) 0.973309 0.946282 0.982391 ETA(2) 1.009411 0.995276 1.024539 2.0 ETA(1) 0.911492 0.832168 0.941760 ETA(2) 1.036200 0.990081 1.091438 3.0 ETA(1) 1.013772 1.007555 1.028463 ... ... ... ... 57.0 ETA(2) 0.987500 0.942709 1.031111 58.0 ETA(1) 0.939409 0.883782 0.956792 ETA(2) 1.023321 0.993543 1.057408 59.0 ETA(1) 0.992578 0.952785 1.027261 ETA(2) 0.999220 0.968931 1.033819 .. attribute:: individual_effects_plot Plot of the individual effects generated by :py:meth:`plot_individual_effects` .. attribute:: unexplained_variability DataFrame with remaining unexplained variability. Index is parameter and covariate. Covariate is none for no covariates and all for all covariates. Example: .. code-block:: sd_observed sd_5th sd_95th parameter covariate ETA(1) none 0.195327 0.221382 0.298465 WGT 0.194527 0.218385 0.292261 APGR 0.182497 0.202192 0.279766 all 0.178956 0.197282 0.268090 ETA(2) none 0.158316 0.152510 0.210044 WGT 0.158313 0.148041 0.207276 APGR 0.155757 0.149037 0.203477 all 0.155551 0.144767 0.201658 .. attribute:: unexplained_variability_plot Plot of the unexplained variability generated by :py:meth:`plot_unexplained_variability` .. attribute:: parameter_inits_and_estimates Initial parameter estimates and estimates after fitting for all intermediate models and the final model. .. attribute:: base_parameter_change The relative change in parameter estimates from base model to the FREM model. .. attribute:: covariate_estimates Model estimates of covariate statistics .. attribute:: parameter_variability Conditioned parameter variability .. attribute:: coefficients Parameter covariate coefficients. Calculated one at a time or all together. """ rst_path = Path(__file__).parent / 'report.rst' def __init__( self, coefficients=None, parameter_variability=None, covariate_effects=None, individual_effects=None, unexplained_variability=None, covariate_statistics=None, covariate_effects_plot=None, individual_effects_plot=None, unexplained_variability_plot=None, covariate_baselines=None, parameter_inits_and_estimates=None, base_parameter_change=None, estimated_covariates=None, ofv=None, ): # FIXME: Lots of boilerplate code ahead. Could be simplified with python 3.7 dataclass # or namedtuple self.coefficients = coefficients self.parameter_variability = parameter_variability self.covariate_effects = covariate_effects self.individual_effects = individual_effects self.unexplained_variability = unexplained_variability self.covariate_statistics = covariate_statistics self.covariate_effects_plot = covariate_effects_plot self.individual_effects_plot = individual_effects_plot self.unexplained_variability_plot = unexplained_variability_plot self.covariate_baselines = covariate_baselines self.parameter_inits_and_estimates = parameter_inits_and_estimates self.base_parameter_change = base_parameter_change self.ofv = ofv self.estimated_covariates = estimated_covariates
[docs] def add_plots(self): self.covariate_effects_plot = self.plot_covariate_effects() self.individual_effects_plot = self.plot_individual_effects() self.unexplained_variability_plot = self.plot_unexplained_variability()
[docs] def plot_covariate_effects(self): """Plot covariate effects""" ce = (self.covariate_effects - 1) * 100 cov_stats = pd.melt( self.covariate_statistics.reset_index(), var_name='condition', id_vars=['covariate'], value_vars=['p5', 'p95', 'other'], ) cov_stats = cov_stats.replace({'p5': '5th', 'p95': '95th'}).set_index( ['covariate', 'condition'] ) ce = ce.join(cov_stats, how='inner') # The left join reorders the index, pandas bug #34133 ce = ce.reorder_levels(['parameter', 'covariate', 'condition']) param_names = list(ce.index.get_level_values('parameter').unique()) plots = [] for parameter in param_names: df = ce.xs(parameter, level=0) df = df.reset_index() error_bars = ( alt.Chart(df) .mark_errorbar(ticks=True) .encode( x=alt.X('p5:Q', title='Effect size in percent', scale=alt.Scale(zero=False)), x2=alt.X2('p95:Q'), y=alt.Y('condition:N', title=None), ) ) rule = ( alt.Chart(df) .mark_rule(strokeDash=[10, 4], color='gray') .encode(x=alt.X('xzero:Q')) .transform_calculate(xzero="0") ) points = ( alt.Chart(df) .mark_point(filled=True, color='black') .encode( x=alt.X('mean:Q'), y=alt.Y('condition:N'), ) ) text = ( alt.Chart(df) .mark_text(dy=-15, color="red") .encode(x=alt.X("mean:Q"), y=alt.Y("condition:N"), text=alt.Text("value:Q")) ) plot = ( alt.layer(error_bars, rule, points, text, data=df, width=700, height=100) .facet(columns=1.0, row=alt.Facet('covariate:N', title=None), title=f'{parameter}') .resolve_scale(y='independent') ) plots.append(plot) v = alt.vconcat(*plots).resolve_scale(x='shared') return v
[docs] def plot_individual_effects(self): """Plot individual effects""" covs = self.covariate_baselines ie = self.individual_effects.join(covs) param_names = list(ie.index.get_level_values('parameter').unique()) ie = (ie - 1) * 100 ie = ie.sort_values(by=['observed']) plots = [] for parameter in param_names: df = ie.xs(parameter, level=1) id_order = list(df.index) id_order = [str(int(x)) for x in id_order] if len(df) > 20: id_order[10] = '...' df = df.reset_index() df['ID'] = df['ID'].astype(int).astype(str) error_bars = ( alt.Chart(df) .mark_errorbar(ticks=True) .encode( x=alt.X('p5:Q', title='Effect size in percent', scale=alt.Scale(zero=False)), x2=alt.X2('p95:Q'), y=alt.Y('ID:N', title='ID', sort=id_order), tooltip=['ID', 'p5', 'observed', 'p95'] + list(covs.columns), ) ) rule = ( alt.Chart(df) .mark_rule(strokeDash=[10, 2], color='gray') .encode(x=alt.X('xzero:Q')) .transform_calculate(xzero="0") ) points = ( alt.Chart(df) .mark_point(size=40, filled=True, color='black') .encode( x=alt.X('observed:Q'), y=alt.Y('ID:N', sort=id_order), ) ) plot = alt.layer( points, error_bars, rule, data=df, width=700, title=f'Individuals for parameter {parameter}', ) if len(df) > 20: plot = ( plot.transform_window( sort=[alt.SortField('observed', order='ascending')], rank='row_number(observed)', ) .transform_window( sort=[alt.SortField('observed', order='descending')], nrank='row_number(observed)', ) .transform_filter('datum.rank <= 10 | datum.nrank <= 11') .transform_calculate( ID="datum.nrank == 11 ? '...' : datum.ID", p5="datum.nrank == 11 ? '...' : datum.p5", p95="datum.nrank == 11 ? '...' : datum.p95", observed="datum.nrank == 11 ? '...' : datum.observed", ) ) plots.append(plot) v = alt.vconcat(*plots).resolve_scale(x='shared') return v
[docs] def plot_unexplained_variability(self): """Plot unexplained variability""" uv = self.unexplained_variability param_names = list(uv.index.get_level_values('parameter').unique()) cov_order = list(uv.index.get_level_values('covariate').unique()) plots = [] for parameter in param_names: df = uv.xs(parameter, level=0) df = df.reset_index() error_bars = ( alt.Chart(df) .mark_errorbar(ticks=True) .encode( x=alt.X( 'sd_5th:Q', title='SD of unexplained variability', scale=alt.Scale(zero=False), ), x2=alt.X2('sd_95th:Q'), y=alt.Y('covariate:N', title='covariate', sort=cov_order), tooltip=['sd_5th', 'sd_observed', 'sd_95th', 'covariate'], ) ) rule = ( alt.Chart(df) .mark_rule(strokeDash=[10, 2], color='gray') .encode(x=alt.X('xzero:Q')) .transform_calculate(xzero="0") ) points = ( alt.Chart(df) .mark_point(size=40, filled=True, color='black') .encode( x=alt.X('sd_observed:Q'), y=alt.Y('covariate:N', sort=cov_order), ) ) plot = alt.layer( points, error_bars, rule, data=df, width=700, title=f'Unexplained variability on {parameter}', ) plots.append(plot) v = alt.vconcat(*plots).resolve_scale(x='shared') return v
[docs]def calculate_results( frem_model, continuous, categorical, method=None, intermediate_models=None, seed=None, **kwargs ): """Calculate FREM results :param method: Either 'cov_sampling' or 'bipp' """ if intermediate_models is None: intermediate_models = [] if method is None or method == 'cov_sampling': try: res = calculate_results_using_cov_sampling( frem_model, continuous, categorical, seed=seed, **kwargs ) except AttributeError: # Fallback to bipp res = calculate_results_using_bipp(frem_model, continuous, categorical, seed=seed) elif method == 'bipp': res = calculate_results_using_bipp(frem_model, continuous, categorical, seed=seed) else: raise ValueError(f'Unknown frem postprocessing method {method}') mod_names = [] mod_ofvs = [] if intermediate_models: for intmod in intermediate_models: intmod_res = intmod.modelfit_results if intmod_res is not None: mod_ofvs.append(intmod_res.ofv) mod_names.append(intmod.name) mod_ofvs.append(frem_model.modelfit_results.ofv) mod_names.append(frem_model.name) ofv = pd.DataFrame({'ofv': mod_ofvs}, index=mod_names) ofv.index.name = 'model_name' res.ofv = ofv add_parameter_inits_and_estimates(res, frem_model, intermediate_models) if intermediate_models: add_base_vs_frem_model(res, frem_model, intermediate_models[0]) estimated_covbase = _calculate_covariate_baselines(frem_model, continuous + categorical) mean = estimated_covbase.mean() stdev = estimated_covbase.std() estcovs = pd.DataFrame({'mean': mean, 'stdev': stdev}) res.estimated_covariates = estcovs return res
[docs]def add_base_vs_frem_model(res, frem_model, model_1): base_ests = model_1.modelfit_results.parameter_estimates final_ests = frem_model.modelfit_results.parameter_estimates ser = pd.Series(dtype=np.float64) for param in base_ests.keys(): ser[param] = (final_ests[param] - base_ests[param]) / abs(base_ests[param]) * 100 ser.name = 'relative_change' res.base_parameter_change = ser
[docs]def add_parameter_inits_and_estimates(res, frem_model, intermediate_models): model_names = [] df = pd.DataFrame() for model in intermediate_models: try: df = df.append(model.parameters.nonfixed_inits, ignore_index=True) except AttributeError: df = df.append(np.NaN) try: df = df.append(model.modelfit_results.parameter_estimates, ignore_index=True) except AttributeError: df = df.append(np.NaN) model_names.append(model.name) try: df = df.append(frem_model.parameters.nonfixed_inits, ignore_index=True) df = df.reindex(columns=frem_model.parameters.nonfixed_inits.keys()) except AttributeError: df = df.append(np.NaN) try: df = df.append(frem_model.modelfit_results.parameter_estimates, ignore_index=True) except AttributeError: df = df.append(np.NaN) model_names.append(frem_model.name) index = pd.MultiIndex.from_product([model_names, ['init', 'estimate']], names=['model', 'type']) df.index = index res.parameter_inits_and_estimates = df
[docs]def calculate_results_using_cov_sampling( frem_model, continuous, categorical, cov_model=None, force_posdef_samples=500, force_posdef_covmatrix=False, samples=1000, rescale=True, seed=None, ): """Calculate the FREM results using covariance matrix for uncertainty :param cov_model: Take the parameter uncertainty covariance matrix from this model instead of the frem model. :param force_posdef_samples: The number of sampling tries before stopping to use rejection sampling and instead starting to shift values so that the frem matrix becomes positive definite. Set to 0 to always force positive definiteness. :param force_posdef_covmatrix: Set to force the covariance matrix of the frem movdel or the cov model to be positive definite. Default is to raise in this case. :param samples: The number of parameter vector samples to use. """ if cov_model is not None: uncertainty_results = cov_model.modelfit_results else: uncertainty_results = frem_model.modelfit_results _, dist = frem_model.random_variables.iiv.distributions()[-1] sigma_symb = dist.sigma parameters = [ s for s in frem_model.modelfit_results.parameter_estimates.index if symbols.symbol(s) in sigma_symb.free_symbols ] parvecs = sample_from_covariance_matrix( frem_model, modelfit_results=uncertainty_results, force_posdef_samples=force_posdef_samples, force_posdef_covmatrix=force_posdef_covmatrix, parameters=parameters, n=samples, seed=seed, ) res = calculate_results_from_samples( frem_model, continuous, categorical, parvecs, rescale=rescale ) return res
[docs]def calculate_results_from_samples(frem_model, continuous, categorical, parvecs, rescale=True): """Calculate the FREM results given samples of parameter estimates""" n = len(parvecs) rvs, dist = frem_model.random_variables.iiv.distributions()[-1] sigma_symb = dist.sigma parameters = [ s for s in frem_model.modelfit_results.parameter_estimates.index if symbols.symbol(s) in sigma_symb.free_symbols ] parvecs = parvecs.append(frem_model.modelfit_results.parameter_estimates.loc[parameters]) df = frem_model.dataset covariates = continuous + categorical df.pharmpy.column_type[covariates] = ColumnType.COVARIATE covariate_baselines = df.pharmpy.covariate_baselines covariate_baselines = covariate_baselines[covariates] cov_means = covariate_baselines.mean() cov_modes = covariate_baselines.mode().iloc[0] # Select first mode if more than one cov_others = pd.Series(index=cov_modes[categorical].index, dtype=np.float64) cov_stdevs = covariate_baselines.std() for _, row in covariate_baselines.iterrows(): for cov in categorical: if row[cov] != cov_modes[cov]: cov_others[cov] = row[cov] if not cov_others.isna().values.any(): break cov_refs = pd.concat((cov_means[continuous], cov_modes[categorical])) cov_5th = covariate_baselines.quantile(0.05, interpolation='lower') cov_95th = covariate_baselines.quantile(0.95, interpolation='higher') is_categorical = cov_refs.index.isin(categorical) res = FREMResults() res.covariate_statistics = pd.DataFrame( { 'p5': cov_5th, 'mean': cov_means, 'p95': cov_95th, 'stdev': cov_stdevs, 'ref': cov_refs, 'categorical': is_categorical, 'other': cov_others, }, index=covariates, ) res.covariate_statistics.index.name = 'covariate' ncovs = len(covariates) npars = sigma_symb.rows - ncovs param_names = get_params(frem_model, rvs, npars) nids = len(covariate_baselines) param_indices = list(range(npars)) if rescale: scaling = np.diag(np.concatenate((np.ones(npars), cov_stdevs.values))) mu_bars_given_5th = np.empty((n, ncovs, npars)) mu_bars_given_95th = np.empty((n, ncovs, npars)) mu_id_bars = np.empty((n, nids, npars)) original_id_bar = np.empty((nids, npars)) variability = np.empty((n, ncovs + 2, npars)) # none, cov1, cov2, ..., all original_variability = np.empty((ncovs + 2, npars)) coefficients_index = pd.MultiIndex.from_product( [['all', 'each'], param_names], names=['condition', 'parameter'] ) coefficients = pd.DataFrame(index=coefficients_index, columns=covariates, dtype=np.float64) parameter_variability = [] # Switch to symengine for speed # Could also assume order of parameters, but not much gain sigma_symb = symengine.sympify(sigma_symb) parvecs.columns = [symengine.Symbol(colname) for colname in parvecs.columns] estimated_covbase = _calculate_covariate_baselines(frem_model, covariates) covbase = estimated_covbase.to_numpy() for sample_no, params in parvecs.iterrows(): sigma = sigma_symb.subs(dict(params)) sigma = np.array(sigma).astype(np.float64) if rescale: sigma = scaling @ sigma @ scaling if sample_no != 'estimates': variability[sample_no, 0, :] = np.diag(sigma)[:npars] else: original_variability[0, :] = np.diag(sigma)[:npars] # Coefficients conditioned on all parameters # Sigma_12 * Sigma_22^-1 coeffs_all = sigma[0:npars, npars:] @ np.linalg.inv(sigma[npars:, npars:]) coefficients.loc['all'] = coeffs_all for i, cov in enumerate(covariates): indices = param_indices + [i + npars] cov_sigma = sigma[indices][:, indices] cov_mu = np.array([0] * npars + [cov_refs[cov]]) if cov in categorical: first_reference = cov_others[cov] else: first_reference = cov_5th[cov] mu_bar_given_5th_cov, sigma_bar = conditional_joint_normal( cov_mu, cov_sigma, np.array([first_reference]) ) if sample_no != 'estimates': mu_bar_given_95th_cov, _ = conditional_joint_normal( cov_mu, cov_sigma, np.array([cov_95th[cov]]) ) mu_bars_given_5th[sample_no, i, :] = mu_bar_given_5th_cov mu_bars_given_95th[sample_no, i, :] = mu_bar_given_95th_cov variability[sample_no, i + 1, :] = np.diag(sigma_bar) else: original_variability[i + 1, :] = np.diag(sigma_bar) parameter_variability.append(sigma_bar) # Calculate coefficients # Cov(Par, covariate) / Var(covariate) for parind, parname in enumerate(param_names): coefficients[cov]['each', parname] = cov_sigma[parind][-1] / cov_sigma[-1][-1] for i in range(len(estimated_covbase)): row = covbase[i, :] id_mu = np.array([0] * npars + list(cov_refs)) mu_id_bar, sigma_id_bar = conditional_joint_normal(id_mu, sigma, row) if sample_no != 'estimates': mu_id_bars[sample_no, i, :] = mu_id_bar variability[sample_no, -1, :] = np.diag(sigma_id_bar) else: original_id_bar[i, :] = mu_id_bar original_variability[ncovs + 1, :] = np.diag(sigma_id_bar) parameter_variability_all = sigma_id_bar res.coefficients = coefficients # Create covariate effects table mu_bars_given_5th = np.exp(mu_bars_given_5th) mu_bars_given_95th = np.exp(mu_bars_given_95th) means_5th = np.mean(mu_bars_given_5th, axis=0) means_95th = np.mean(mu_bars_given_95th, axis=0) q5_5th = np.quantile(mu_bars_given_5th, 0.05, axis=0) q5_95th = np.quantile(mu_bars_given_95th, 0.05, axis=0) q95_5th = np.quantile(mu_bars_given_5th, 0.95, axis=0) q95_95th = np.quantile(mu_bars_given_95th, 0.95, axis=0) df = pd.DataFrame(columns=['parameter', 'covariate', 'condition', 'p5', 'mean', 'p95']) for param, cov in itertools.product(range(npars), range(ncovs)): if covariates[cov] in categorical: df = df.append( { 'parameter': param_names[param], 'covariate': covariates[cov], 'condition': 'other', 'p5': q5_5th[cov, param], 'mean': means_5th[cov, param], 'p95': q95_5th[cov, param], }, ignore_index=True, ) else: df = df.append( { 'parameter': param_names[param], 'covariate': covariates[cov], 'condition': '5th', 'p5': q5_5th[cov, param], 'mean': means_5th[cov, param], 'p95': q95_5th[cov, param], }, ignore_index=True, ) df = df.append( { 'parameter': param_names[param], 'covariate': covariates[cov], 'condition': '95th', 'p5': q5_95th[cov, param], 'mean': means_95th[cov, param], 'p95': q95_95th[cov, param], }, ignore_index=True, ) df.set_index(['parameter', 'covariate', 'condition'], inplace=True) res.covariate_effects = df # Create id table mu_id_bars = np.exp(mu_id_bars) original_id_bar = np.exp(original_id_bar) with np.testing.suppress_warnings() as sup: # Would warn in case of missing covariates sup.filter(RuntimeWarning, "All-NaN slice encountered") id_5th = np.nanquantile(mu_id_bars, 0.05, axis=0) id_95th = np.nanquantile(mu_id_bars, 0.95, axis=0) df = pd.DataFrame(columns=['parameter', 'observed', 'p5', 'p95']) for curid, param in itertools.product(range(nids), range(npars)): df = df.append( pd.Series( { 'parameter': param_names[param], 'observed': original_id_bar[curid, param], 'p5': id_5th[curid, param], 'p95': id_95th[curid, param], }, name=covariate_baselines.index[curid], ) ) df.index.name = 'ID' df = df.set_index('parameter', append=True) res.individual_effects = df # Create unexplained variability table sd_5th = np.sqrt(np.nanquantile(variability, 0.05, axis=0)) sd_95th = np.sqrt(np.nanquantile(variability, 0.95, axis=0)) original_sd = np.sqrt(original_variability) df = pd.DataFrame(columns=['parameter', 'covariate', 'sd_observed', 'sd_5th', 'sd_95th']) for par, cond in itertools.product(range(npars), range(ncovs + 2)): if cond == 0: condition = 'none' elif cond == ncovs + 1: condition = 'all' else: condition = covariates[cond - 1] df = df.append( { 'parameter': param_names[par], 'covariate': condition, 'sd_observed': original_sd[cond, par], 'sd_5th': sd_5th[cond, par], 'sd_95th': sd_95th[cond, par], }, ignore_index=True, ) df = df.set_index(['parameter', 'covariate']) res.unexplained_variability = df res.covariate_baselines = covariate_baselines # Create frem_parameter_variability index = pd.MultiIndex.from_product( [['all'] + covariates, param_names], names=['condition', 'parameter'] ) df = pd.DataFrame(index=index) for i in range(len(param_names)): for j in range(len(param_names)): df.loc[('all', param_names[i]), param_names[j]] = parameter_variability_all[i][j] for k, name in enumerate(covariates): for i in range(len(param_names)): for j in range(len(param_names)): df.loc[(name, param_names[i]), param_names[j]] = parameter_variability[k][i][j] res.parameter_variability = df return res
[docs]def get_params(frem_model, rvs, npars): param_names = [rv.name for rv in rvs][:npars] sset = frem_model.statements.before_ode() symbs = [] for p in param_names: s = sset.find_assignment(p, is_symbol=False) if str(s.expression) == p: s = sset.find_assignment(s.symbol.name, is_symbol=False) symbs.append(s.symbol.name) duplicates = set([e for e in symbs if symbs.count(e) > 1]) for i, s in enumerate(symbs): if s in duplicates: symbs[i] = rename_duplicate(symbs, s) return symbs
[docs]def rename_duplicate(params, stem): i = 1 while True: candidate = f'{stem}({i})' if candidate not in params: return candidate i += 1
def _calculate_covariate_baselines(model, covariates): exprs = [ ass.expression.args[0][0] for ass in model.statements if symbols.symbol('FREMTYPE') in ass.free_symbols and ass.symbol.name == 'IPRED' ] exprs = [ expr.subs(dict(model.modelfit_results.parameter_estimates)).subs(model.parameters.inits) for expr in exprs ] new = [] for expr in exprs: for symb in expr.free_symbols: stat = model.statements.find_assignment(symb.name) if stat is not None: expr = expr.subs(symb, stat.expression) new.append(expr) exprs = new def fn(row): return [np.float64(expr.subs(dict(row))) for expr in exprs] df = model.modelfit_results.individual_estimates.apply(fn, axis=1, result_type='expand') df.columns = covariates return df
[docs]def calculate_results_using_bipp( frem_model, continuous, categorical, rescale=True, samples=2000, seed=None ): """Estimate a covariance matrix for the frem model using the BIPP method Bootstrap on the individual parameter posteriors Only the individual estimates, individual unvertainties and the parameter estimates are needed. """ if seed is None or isinstance(seed, int): seed = np.random.default_rng(seed) rvs, dist = frem_model.random_variables.iiv.distributions()[-1] etas = [rv.name for rv in rvs] pool = sample_individual_estimates(frem_model, parameters=etas, seed=seed) ninds = len(pool.index.unique()) ishr = frem_model.modelfit_results.individual_shrinkage lower_indices = np.tril_indices(len(etas)) pop_params = np.array(dist.sigma).astype(str)[lower_indices] parameter_samples = np.empty((samples, len(pop_params))) remaining_samples = samples k = 0 while k < remaining_samples: bootstrap = pool.sample(n=ninds, replace=True, random_state=seed.bit_generator) ishk = ishr.loc[bootstrap.index] cf = (1 / (1 - ishk.mean())) ** (1 / 2) corrected_bootstrap = bootstrap * cf bootstrap_cov = corrected_bootstrap.cov() if not is_posdef(bootstrap_cov.to_numpy()): continue parameter_samples[k, :] = bootstrap_cov.values[lower_indices] k += 1 frame = pd.DataFrame(parameter_samples, columns=pop_params) res = calculate_results_from_samples( frem_model, continuous, categorical, frame, rescale=rescale ) return res
[docs]def psn_reorder_base_model_inits(model, path): """Reorder omega inits from base model in PsN If base model was reordered PsN writes the omega inits dict to m1/model_1.inits """ order_path = path / 'm1' / 'model_1.inits' if order_path.is_file(): with open(order_path, 'r') as fh: lines = fh.readlines() lines = lines[1:-1] replacements = dict() for line in lines: stripped = line.strip().replace(' ', '').replace("'", '').rstrip(',').replace('>', '') a = stripped.split('=') coords = a[0][6:-1] t = tuple(coords.split(',')) t = (int(t[0]), int(t[1])) try: replacements[t] = float(a[1]) except ValueError: pass def sortfunc(x): return float(x[0]) + float(x[1]) / 1000 order = sorted(replacements, key=sortfunc) values = [replacements[i] for i in order] i = 0 for p in model.parameters: if i == len(values): break if p.name in model.random_variables.all_parameters(): p.init = values[i] i += 1
[docs]def psn_frem_results(path, force_posdef_covmatrix=False, force_posdef_samples=500, method=None): """Create frem results from a PsN FREM run :param path: Path to PsN frem run directory :return: A :class:`FREMResults` object """ path = Path(path) model_4_path = path / 'final_models' / 'model_4.mod' if not model_4_path.is_file(): raise IOError(f'Could not find FREM model 4: {str(model_4_path)}') model_4 = Model(model_4_path) if model_4.modelfit_results is None: raise ValueError('Model 4 has no results') cov_model = None if method == 'cov_sampling': try: model_4.modelfit_results.covariance_matrix except Exception: model_4b_path = path / 'final_models' / 'model_4b.mod' try: model_4b = Model(model_4b_path) except FileNotFoundError: pass else: cov_model = model_4b with open(path / 'covariates_summary.csv') as covsum: covsum.readline() raw_cov_list = covsum.readline() all_covariates = raw_cov_list[1:].rstrip().split(',') # FIXME: Not introducing yaml parser in pharmpy just yet. Options should be collected # differently. Perhaps using json logtransformed_covariates = [] with open(path / 'meta.yaml') as meta: for row in meta: row = row.strip() if row.startswith('rescale: 1'): rescale = True elif row.startswith('rescale: 0'): rescale = False if row.startswith("log: ''"): logtransformed_covariates = [] elif row.startswith('log: '): logtransformed_covariates = row[5:].split(',') # add log transformed columns for the -log option. Should be done when creating dataset df = model_4.dataset if logtransformed_covariates: for lncov in logtransformed_covariates: df[f'LN{lncov}'] = np.log(df[lncov]) model_4.dataset = df nunique = model_4.dataset.pharmpy.baselines[all_covariates].nunique() continuous = list(nunique.index[nunique != 2]) categorical = list(nunique.index[nunique == 2]) intmod_names = ['model_1.mod', 'model_2.mod', 'model_3.mod', 'model_3b.mod'] intmods = [] for m in intmod_names: intmod_path = path / 'm1' / m if intmod_path.is_file(): intmod = Model(intmod_path) intmods.append(intmod) model1b = Model(path / 'm1' / 'model_1b.mod') model1 = intmods[0] model1b.modelfit_results = model1.modelfit_results model1b.modelfit_results.parameter_estimates = model1b.parameters.nonfixed_inits psn_reorder_base_model_inits(model1b, path) res = calculate_results( model_4, continuous, categorical, method=method, force_posdef_covmatrix=force_posdef_covmatrix, force_posdef_samples=force_posdef_samples, cov_model=cov_model, rescale=rescale, intermediate_models=intmods, seed=np.random.default_rng(9843), ) return res