Source code for pharmpy.methods.scm.results

import re
from io import StringIO
from pathlib import Path

import numpy as np
import pandas as pd

from pharmpy import Model
from pharmpy.methods.psn_helpers import (
    arguments_from_command,
    options_from_command,
    tool_from_command,
)
from pharmpy.results import Results


[docs]class SCMResults(Results): """SCM Results class""" rst_path = Path(__file__).parent / 'report.rst' def __init__(self, steps=None): self.steps = steps
[docs] def ofv_summary(self, final_included=True, iterations=True): return ofv_summary_dataframe( self.steps, final_included=final_included, iterations=iterations )
[docs] def candidate_summary(self): return candidate_summary_dataframe(self.steps)
[docs]def candidate_summary_dataframe(steps): if steps is None: return None elif steps['is_backward'].all(): selected = steps[steps['selected']].copy() df = pd.DataFrame( [{'BackstepRemoved': row.Index[0]} for row in selected.itertuples()], index=selected.index, ) return df.droplevel('step') else: scmplus = True if 'stashed' in steps.columns else False backstep_removed = { f'{row.Index[1]}{row.Index[2]}-{row.Index[3]}': row.Index[0] for row in steps.itertuples() if row.is_backward and row.selected } forward_steps = steps.query('step > 0 & ~is_backward') df = pd.DataFrame( [ { 'N_test': True, 'N_ok': (not np.isnan(row.ofv_drop) and row.ofv_drop >= 0), 'N_localmin': (not np.isnan(row.ofv_drop) and row.ofv_drop < 0), 'N_failed': np.isnan(row.ofv_drop), 'StepIncluded': row.Index[0] if row.selected else None, 'StepStashed': row.Index[0] if (scmplus and row.stashed) else None, 'StepReadded': row.Index[0] if (scmplus and row.readded) else None, 'BackstepRemoved': backstep_removed.pop(row.model, None), } for row in forward_steps.itertuples() ], index=forward_steps.index, ) return df.groupby(level=['parameter', 'covariate', 'extended_state']).sum(min_count=1)
[docs]def ofv_summary_dataframe(steps, final_included=True, iterations=True): if steps is None or not (final_included or iterations): return None else: if final_included and iterations and not steps['is_backward'].iloc[-1]: # Will not be able to show final_included with additional info final_included = False # Use .copy() to ensure we do not work on original df df = steps[steps['selected']].copy() if iterations else pd.DataFrame() if iterations: df['is_backward'] = [ 'Backward' if backward else 'Forward' for backward in df['is_backward'] ] if final_included: if steps['is_backward'].iloc[-1]: # all rows from last step where selected is False last_stepnum = steps.index[-1][steps.index.names.index('step')] final = steps[~steps['selected']].loc[last_stepnum, :, :, :].copy() else: # all selected rows without ofv info final = pd.DataFrame(columns=steps.columns, index=steps[steps['selected']].index) final['is_backward'] = 'Final included' df = df.append(final) df.rename(columns={'is_backward': 'direction'}, inplace=True) columns = ['direction', 'reduced_ofv', 'extended_ofv', 'ofv_drop'] if 'pvalue' in steps.columns: columns.extend(['delta_df', 'pvalue', 'goal_pvalue']) return df[columns]
[docs]def psn_scm_parse_logfile(logfile, options, parcov_dictionary): """Read SCM results""" logfile = Path(logfile) df = pd.concat([step for step in log_steps(logfile, options, parcov_dictionary)]) if 'stashed' in df.columns: df.fillna(value={'stashed': False, 'readded': False}, inplace=True) return df
[docs]def file_blocks(path): block = [] separator = re.compile(r'^[-\s]*$') with open(path) as file: for row in file: if separator.match(row): if block: yield block block = [] else: block.append(row) if block: yield block
[docs]def parse_runtable_block(block, parcov_dictionary=None, included_relations=None): header = { 'ofv': ['model', 'test', 'base', 'new', 'dofv', 'cmp', 'goal', 'signif'], 'pvalue': ['model', 'test', 'base', 'new', 'dofv', 'cmp', 'goal', 'ddf', 'signif', 'pval'], } if not len(block) > 1: raise NotImplementedError('function parse_runtable_block called with empty table') gof = None is_backward = re.search(r'IN.?SIGNIFICANT', block[0]) if re.match(r'^MODEL\s+TEST\s+BASE\s+OFV', block[0]): gof = 'pvalue' elif re.match(r'^MODEL\s+TEST\s+NAME\s+BASE\s+', block[0]): gof = 'ofv' else: raise NotImplementedError('Unsupported runtable header string') rawtable = pd.read_fwf( StringIO(str.join('', block)), skiprows=1, header=None, names=header[gof], true_values=['YES!'], na_values=['FAILED'], ) if len(rawtable.base.unique()) > 1: rawtable = split_merged_base_and_new_ofv(rawtable) if gof == 'pvalue': if np.all([np.isnan(v) or v is None for v in rawtable['pval'].values]): # No model signficant, signif column is empty and missed by read_fwf rawtable['pval'] = rawtable['signif'] rawtable['signif'] = False else: if np.all([np.isnan(v) or v is None for v in rawtable['signif'].values]): rawtable['signif'] = False table = model_name_series_to_dataframe( rawtable.model, parcov_dictionary, is_backward, included_relations ) if is_backward: table['reduced_ofv'] = rawtable.new table['extended_ofv'] = rawtable.base table['ofv_drop'] = rawtable.dofv.multiply(-1.0) else: table['reduced_ofv'] = rawtable.base table['extended_ofv'] = rawtable.new table['ofv_drop'] = rawtable.dofv if gof == 'pvalue': table['delta_df'] = rawtable.ddf.multiply(-1) if is_backward else rawtable.ddf table['pvalue'] = rawtable.pval else: table['goal_ofv_drop'] = rawtable.goal.multiply(-1.0) if is_backward else rawtable.goal significant = np.array([False if np.isnan(s) else s for s in rawtable.signif]) table['is_backward'] = True if is_backward else False table['extended_significant'] = ~significant if is_backward else significant return table
[docs]def model_name_series_to_dataframe(modelname, parcov_dictionary, is_backward, included_relations): subdf = modelname.str.extract(r'(?P<parcov>.+)-(?P<state>\d+)$', expand=True) state = np.nan if is_backward: if included_relations: state = extended_states(modelname, included_relations) else: state = [int(s) for s in subdf.state.values] result = pd.DataFrame( {'model': modelname, 'parameter': None, 'covariate': None, 'extended_state': state} ) if parcov_dictionary: temp = pd.DataFrame( [parcov_dictionary[m] for m in subdf.parcov], columns=('parameter', 'covariate') ) result.parameter = temp.parameter result.covariate = temp.covariate return result
[docs]def parse_mixed_block(block): m1 = None readded = list() stashed = list() included_relations = dict() pattern = { 'm1': re.compile(r'Model\s+directory\s+(?P<m1folder>\S+)'), 'stashed': re.compile(r'Taking a step.* reducing scope with .*:\s*(?P<relations>\S+)'), 'readded': re.compile(r'Re-testing .* relations after .* :\s*(?P<relations>\S+)'), 'included': re.compile(r'Included relations so far:\s*(?P<relations>\S+)'), } for row in block: if pattern['stashed'].match(row): if stashed: raise NotImplementedError('Two scope reduction lines in the same logfile block') stashed = [ tuple(relation.split('-')) for relation in pattern['stashed'].match(row).group('relations').split(',') ] elif pattern['readded'].match(row): if readded: raise NotImplementedError('Two re-testing lines in the same logfile block') readded = [ tuple(relation.split('-')) for relation in pattern['readded'].match(row).group('relations').split(',') ] elif pattern['m1'].match(row): if m1: raise NotImplementedError('Two model directory lines in the same logfile block') m1 = pattern['m1'].match(row).group('m1folder') elif pattern['included'].match(row): for relation in pattern['included'].match(row).group('relations').split(','): par, cov, state = relation.split('-') if par not in included_relations.keys(): included_relations[par] = dict() included_relations[par][cov] = state return m1, readded, stashed, included_relations
[docs]def parse_chosen_relation_block(block): chosen = dict() criterion = dict() pattern = { 'chosen': re.compile( r'Parameter-covariate relation chosen in this ' + r'(forward|backward) step: ' + r'(?P<parameter>\S+)-(?P<covariate>\S+)-(?P<state>\S+)' ), 'backward': re.compile(r'Parameter-covariate relation chosen in this backward st'), 'gof_ofv': re.compile(r'CRITERION\s+OFV\b'), 'gof_pvalue': re.compile(r'CRITERION\s+' + r'(?P<gof>PVAL)\s+(>|<)\s+(?P<pvalue>\S+)\s*$'), 'included': re.compile(r'Relations included after this step'), 'parameter': re.compile(r'(?P<parameter>\S+)'), 'covstate': re.compile(r'\s*(?P<covariate>\D[^-\s]*)-(?P<state>\d+)'), } chosen_match = pattern['chosen'].match(block[0]) is_backward = False if pattern['backward'].match(block[0]): is_backward = True if chosen_match: chosen = chosen_match.groupdict() if pattern['gof_ofv'].match(block[1]): criterion = {'gof': 'OFV', 'pvalue': None} else: criterion = pattern['gof_pvalue'].match(block[1]).groupdict() criterion['is_backward'] = is_backward included_relations = dict() if len(block) > 4 and pattern['included'].match(block[4]): for row in block[5:]: row = row.strip() par = pattern['parameter'].match(row).group('parameter') if re.search(r'-\d+$', par): raise NotImplementedError('Missing whitespace between param and included covs') included_relations[par] = {p: c for p, c in pattern['covstate'].findall(row)} return chosen, criterion, included_relations
[docs]def names_without_state(model_names): return [parcov for parcov in model_names.str.extract(r'(.+)-\d+$').values.flatten()]
[docs]def extended_states(model_names, included_relations): model_parcov = names_without_state(model_names) statedict = {parcov: int(-99) for parcov in model_parcov} for par, d in included_relations.items(): for cov, state in d.items(): statedict[f'{par}{cov}'] = int(state) return [statedict[parcov] for parcov in model_parcov]
[docs]def step_data_frame(step, included_relations): df = step['runtable'] is_backward = df['is_backward'].iloc[0] if is_backward and included_relations: if np.all(np.isnan(df['extended_state'].values.flatten())): # This must be a backward step without preceeding steps of any kind # and where included_relations was not found from conf file df['extended_state'] = extended_states(df['model'], included_relations) df['step'] = step['number'] if 'pvalue' in df.columns: if step['criterion']: df.insert(9, 'goal_pvalue', step['criterion']['pvalue']) elif step['previous_criterion']: if step['previous_criterion']['is_backward'] == is_backward: # same direction as previous step with criterion df.insert(9, 'goal_pvalue', step['previous_criterion']['pvalue']) chosenmodel = 'no model' if step['chosen']: chosenmodel = ( step['chosen']['parameter'] + step['chosen']['covariate'] + r'-' + step['chosen']['state'] ) df['selected'] = [name == chosenmodel for name in df['model'].values] df['directory'] = str(step['m1']) model = df['model'] # move to end df.drop(columns=['model'], inplace=True) df['model'] = model if step['stashed'] or step['readded']: model_parcov = names_without_state(df['model']) if step['stashed']: stashed = {f'{par}{cov}' for par, cov in step['stashed']} df['stashed'] = [parcov in stashed for parcov in model_parcov] else: df['stashed'] = False if step['readded']: readded = {f'{par}{cov}' for par, cov in step['readded']} df['readded'] = [parcov in readded for parcov in model_parcov] else: df['readded'] = False return df.set_index(['step', 'parameter', 'covariate', 'extended_state'])
[docs]def prior_included_step(included_relations, gof_is_pvalue): states = list() extra = None if gof_is_pvalue: extra = {'delta_df': 0, 'pvalue': np.nan, 'goal_pvalue': np.nan} else: extra = {'goal_ofv_drop': np.nan} for par, d in included_relations.items(): for cov, state in d.items(): states.append( { 'step': int(0), 'parameter': par, 'covariate': cov, 'extended_state': int(state), 'reduced_ofv': np.nan, 'extended_ofv': np.nan, 'ofv_drop': np.nan, **extra, 'is_backward': False, 'extended_significant': True, 'selected': True, 'directory': '', 'model': '', } ) df = pd.DataFrame(states) return df.set_index(['step', 'parameter', 'covariate', 'extended_state'])
[docs]def empty_step(previous_number, previous_criterion=None): return { 'runtable': None, 'm1': None, 'chosen': None, 'stashed': None, 'readded': None, 'criterion': None, 'number': previous_number + 1, 'previous_criterion': previous_criterion, }
[docs]def have(something): return something is not None
[docs]def log_steps(path, options, parcov_dictionary=None): included_relations = options['included_relations'] basepath = Path(options['directory']) pattern = { 'runtable': re.compile(r'^MODEL\s+TEST\s+'), 'm1': re.compile(r'Model\s+directory\s+(?P<m1folder>\S+)'), 'chosen': re.compile(r'Parameter-covariate relation chosen in this'), } step = empty_step(0) for block in file_blocks(path): if pattern['runtable'].match(block[0]): # can be empty table with header, only set table if have content if len(block) > 1: step['runtable'] = parse_runtable_block( block, parcov_dictionary, included_relations ) is_forward = not step['runtable']['is_backward'].iloc[-1] if step['number'] == 1 and included_relations is not None and is_forward: yield prior_included_step( included_relations, gof_is_pvalue=('pvalue' in step['runtable'].columns) ) elif pattern['chosen'].match(block[0]): step['chosen'], step['criterion'], included = parse_chosen_relation_block(block) if included: included_relations = included elif pattern['m1'].match(block[0]): if have(step['runtable']): yield step_data_frame(step, included_relations) step = empty_step(step['number'], step['criterion']) step['m1'] = Path(pattern['m1'].match(block[0]).group('m1folder')).relative_to(basepath) else: # complex block, either gof ofv or scmplus. May contain m1 m1, readded, stashed, included = parse_mixed_block(block) if stashed: # stashing belongs to previously read runtable: do not yield step['stashed'] = stashed if included: included_relations = included if readded: # readding is done first in new step: yield if have a runtable if have(step['runtable']): yield step_data_frame(step, included_relations) step = empty_step(step['number'], step['criterion']) step['readded'] = readded if m1: # Writing m1 is done first in step, unless readding was done first # in which case runtable will be empty if have(step['runtable']): yield step_data_frame(step, included_relations) step = empty_step(step['number'], step['criterion']) step['m1'] = Path(m1).relative_to(basepath) if have(step['runtable']): yield step_data_frame(step, included_relations)
[docs]def split_merged_base_and_new_ofv(rawtable): testnames = rawtable.test.unique() if len(testnames) > 1: raise ValueError('Non-unique TEST column value in scm-step, model name too wide?') elif not re.match(r'(PVAL|OFV)', testnames[0]): raise ValueError(f'Unrecognized TEST column value {testnames[0]}') if len(rawtable.base.unique()) > 1: # We have missing space between columns base and new. # These are formatted with the same widths, so split # at equal lengths, i.e. at half of max length. Handle # case where a run has ofv 'FAILED' by skipping whitespace # in second part after string split. if not all(pd.isna(rawtable.iloc[:, -1])): # expect all values in last column to be NaN if ofv columns are merged raise Exception # Save names and then drop last column. Names are unique so this drops 1 col column_names = list(rawtable.columns) rawtable.drop(columns=column_names[-1], inplace=True) baseofvlength = int(max({len(x) for x in rawtable.base.values}) / 2) pattern = f'(?P<base>.{{{baseofvlength}}})' + r'\s*(?P<newfixed>\S*)' subdf = rawtable.base.str.extract(pattern, expand=True) subdf.replace('FAILED', np.nan, inplace=True) if len(subdf.base.unique()) > 1: # all base values should be identical now raise Exception # replace base column and insert newfixed rawtable.base = subdf.base rawtable.insert(3, 'newfixed', subdf.newfixed) # rename columns to get original labels at correct positions old = list(rawtable.columns) rawtable.rename(columns={o: correct for o, correct in zip(old, column_names)}, inplace=True) return rawtable
[docs]def psn_config_file_argument_from_command(command, path): # arguments: everything on command-line which does not start with - # and where 'name' (i.e. last portion when treated as a path object) is found # as file in input folder path # this may include a model file, so we need to parse file(s) to see # if we actually find included_relations path = Path(path) return [ Path(arg).name for arg in arguments_from_command(command) if (path / Path(arg).name).is_file() ]
[docs]def relations_from_config_file(path, files): # Allow multiple candidate files in case ambiguous command-line (unusual) path = Path(path) start_config_section = re.compile(r'\s*\[(?P<section>[a-z_]+)\]\s*$') empty_line = re.compile(r'^[-\s]*$') comment_line = re.compile(r'\s*;') included_lines = None test_lines = None included_relations = None test_relations = None scanning_included = False scanning_test = False for file in files: with open(path / file) as fn: for row in fn: m = start_config_section.match(row) if m: if m.group('section') == 'included_relations': scanning_included = True scanning_test = False included_lines = list() elif m.group('section') == 'test_relations': scanning_included = False scanning_test = True test_lines = list() else: scanning_included = False scanning_test = False elif scanning_included: if not empty_line.match(row) and not comment_line.match(row): included_lines.append(row.strip()) elif scanning_test: if not empty_line.match(row) and not comment_line.match(row): test_lines.append(row.strip()) if test_lines is not None and len(test_lines) > 0: break # do not check any other file, if more than one if included_lines is not None and len(included_lines) > 0: included_relations = dict() p = re.compile(r'\s*([^-]+)-(\d+)\s*') for row in included_lines: par, covstates = row.split(r'=') par = str.strip(par) included_relations[par] = { p.match(val).group(1): p.match(val).group(2) for val in covstates.split(r',') } if test_lines is not None and len(test_lines) > 0: test_relations = dict() for row in test_lines: par, covs = row.split(r'=') par = str.strip(par) test_relations[par] = [str.strip(val) for val in covs.split(r',')] return included_relations, test_relations
[docs]def psn_scm_options(path): path = Path(path) options = { 'directory': str(path), 'logfile': 'scmlog.txt', 'included_relations': None, 'test_relations': None, } scmplus = False config_files = None with open(path / 'meta.yaml') as meta: cmd = None for row in meta: if cmd is not None: if re.match(r'\s', row): # continuation is indented cmd += row # must not strip continue else: # no continuation: parse and remove if tool_from_command(cmd) == 'scmplus': scmplus = True for k, v in options_from_command(cmd).items(): if 'config_file'.startswith(k): config_files = [v] break if config_files is None: # not option -config_file, must have been given as argument config_files = psn_config_file_argument_from_command(cmd, path) cmd = None row = row.strip() if row.startswith('logfile: '): options['logfile'] = Path(re.sub(r'\s*logfile:\s*', '', row)).name elif row.startswith('directory: '): options['directory'] = str(Path(re.sub(r'\s*directory:\s*', '', row))) elif row.startswith('command_line: '): cmd = row if scmplus and Path(options['directory']).parts[-1] == 'rundir': options['directory'] = str(Path(options['directory']).parents[0]) if config_files is not None: options['included_relations'], options['test_relations'] = relations_from_config_file( path, config_files ) return options
[docs]def parcov_dict_from_test_relations(test_relations): parcov = dict() for par, covs in test_relations.items(): for cov in covs: parcov[f'{par}{cov}'] = (par, cov) return parcov
[docs]def add_covariate_effects(res, path): steps = res.steps if 'delta_df' not in steps: return def fn(row): if row.is_backward: return np.nan degrees = row.delta_df model_name = row.model.replace( '-', '' ) # FIXME: The dash should not have been in the table! model_path = path / row.directory / f'{model_name}.mod' if not model_path.is_file(): return np.nan model = Model(model_path) varpars = model.random_variables.free_symbols all_thetas = [param for param in model.parameters if param.symbol not in varpars] new_thetas = all_thetas[-degrees:] covariate_effects = { param.name: model.modelfit_results.parameter_estimates[param.name] for param in new_thetas } return covariate_effects steps['covariate_effects'] = steps.apply(fn, axis=1)
[docs]def psn_scm_results(path): """Create scm results from a PsN SCM run :param path: Path to PsN scm run directory :return: A :class:`SCMResults` object """ path = Path(path) if not path.is_dir(): raise IOError(f'Could not find scm directory: {str(path)}') options = psn_scm_options(path) logfile = path / options['logfile'] if not logfile.is_file(): raise IOError(f'Could not find scm logfile: {str(logfile)}') if options['test_relations'] is not None: parcov_dictionary = parcov_dict_from_test_relations(options['test_relations']) else: raise IOError(r'Could not find test_relations in scm config file') res = SCMResults(steps=psn_scm_parse_logfile(logfile, options, parcov_dictionary)) add_covariate_effects(res, path) return res