import re
from io import StringIO
from pathlib import Path
import numpy as np
import pandas as pd
from pharmpy import Model
from pharmpy.results import Results
from pharmpy.tools.psn_helpers import (
arguments_from_command,
options_from_command,
tool_from_command,
)
[docs]class SCMResults(Results):
"""SCM Results class"""
rst_path = Path(__file__).parent / 'report.rst'
def __init__(self, steps=None):
self.steps = steps
[docs] def ofv_summary(self, final_included=True, iterations=True):
return ofv_summary_dataframe(
self.steps, final_included=final_included, iterations=iterations
)
[docs] def candidate_summary(self):
return candidate_summary_dataframe(self.steps)
[docs]def candidate_summary_dataframe(steps):
if steps is None:
return None
elif steps['is_backward'].all():
selected = steps[steps['selected']].copy()
df = pd.DataFrame(
[{'BackstepRemoved': row.Index[0]} for row in selected.itertuples()],
index=selected.index,
)
return df.droplevel('step')
else:
scmplus = True if 'stashed' in steps.columns else False
backstep_removed = {
f'{row.Index[1]}{row.Index[2]}-{row.Index[3]}': row.Index[0]
for row in steps.itertuples()
if row.is_backward and row.selected
}
forward_steps = steps.query('step > 0 & ~is_backward')
df = pd.DataFrame(
[
{
'N_test': True,
'N_ok': (not np.isnan(row.ofv_drop) and row.ofv_drop >= 0),
'N_localmin': (not np.isnan(row.ofv_drop) and row.ofv_drop < 0),
'N_failed': np.isnan(row.ofv_drop),
'StepIncluded': row.Index[0] if row.selected else None,
'StepStashed': row.Index[0] if (scmplus and row.stashed) else None,
'StepReadded': row.Index[0] if (scmplus and row.readded) else None,
'BackstepRemoved': backstep_removed.pop(row.model, None),
}
for row in forward_steps.itertuples()
],
index=forward_steps.index,
)
return df.groupby(level=['parameter', 'covariate', 'extended_state']).sum(min_count=1)
[docs]def ofv_summary_dataframe(steps, final_included=True, iterations=True):
if steps is None or not (final_included or iterations):
return None
else:
if final_included and iterations and not steps['is_backward'].iloc[-1]:
# Will not be able to show final_included with additional info
final_included = False
# Use .copy() to ensure we do not work on original df
df = steps[steps['selected']].copy() if iterations else pd.DataFrame()
if iterations:
df['is_backward'] = [
'Backward' if backward else 'Forward' for backward in df['is_backward']
]
if final_included:
if steps['is_backward'].iloc[-1]:
# all rows from last step where selected is False
last_stepnum = steps.index[-1][steps.index.names.index('step')]
final = steps[~steps['selected']].loc[last_stepnum, :, :, :].copy()
else:
# all selected rows without ofv info
final = pd.DataFrame(columns=steps.columns, index=steps[steps['selected']].index)
final['is_backward'] = 'Final included'
df = df.append(final)
df.rename(columns={'is_backward': 'direction'}, inplace=True)
columns = ['direction', 'reduced_ofv', 'extended_ofv', 'ofv_drop']
if 'pvalue' in steps.columns:
columns.extend(['delta_df', 'pvalue', 'goal_pvalue'])
return df[columns]
[docs]def psn_scm_parse_logfile(logfile, options, parcov_dictionary):
"""Read SCM results"""
logfile = Path(logfile)
df = pd.concat([step for step in log_steps(logfile, options, parcov_dictionary)])
if 'stashed' in df.columns:
df.fillna(value={'stashed': False, 'readded': False}, inplace=True)
return df
[docs]def file_blocks(path):
block = []
separator = re.compile(r'^[-\s]*$')
with open(path) as file:
for row in file:
if separator.match(row):
if block:
yield block
block = []
else:
block.append(row)
if block:
yield block
[docs]def parse_runtable_block(block, parcov_dictionary=None, included_relations=None):
header = {
'ofv': ['model', 'test', 'base', 'new', 'dofv', 'cmp', 'goal', 'signif'],
'pvalue': ['model', 'test', 'base', 'new', 'dofv', 'cmp', 'goal', 'ddf', 'signif', 'pval'],
}
if not len(block) > 1:
raise NotImplementedError('function parse_runtable_block called with empty table')
gof = None
is_backward = re.search(r'IN.?SIGNIFICANT', block[0])
if re.match(r'^MODEL\s+TEST\s+BASE\s+OFV', block[0]):
gof = 'pvalue'
elif re.match(r'^MODEL\s+TEST\s+NAME\s+BASE\s+', block[0]):
gof = 'ofv'
else:
raise NotImplementedError('Unsupported runtable header string')
# First column might be wider than 16 characters. Must be padded in that case
lens = []
for line in block:
a = line.split(maxsplit=1)
lens.append(len(a[0]))
maxlen = max(lens)
if maxlen > 16:
for i, length in enumerate(lens):
collen = 16 if length < 16 else length
block[i] = block[i][:length] + (maxlen - collen) * ' ' + block[i][length:]
rawtable = pd.read_fwf(
StringIO(str.join('', block)),
skiprows=1,
header=None,
names=header[gof],
true_values=['YES!'],
na_values=['FAILED'],
)
if len(rawtable.base.unique()) > 1:
rawtable = split_merged_base_and_new_ofv(rawtable)
if gof == 'pvalue':
if np.all([np.isnan(v) or v is None for v in rawtable['pval'].values]):
# No model signficant, signif column is empty and missed by read_fwf
rawtable['pval'] = rawtable['signif']
rawtable['signif'] = False
else:
if np.all([np.isnan(v) or v is None for v in rawtable['signif'].values]):
rawtable['signif'] = False
table = model_name_series_to_dataframe(
rawtable.model, parcov_dictionary, is_backward, included_relations
)
if is_backward:
table['reduced_ofv'] = rawtable.new
table['extended_ofv'] = rawtable.base
table['ofv_drop'] = rawtable.dofv.multiply(-1.0)
else:
table['reduced_ofv'] = rawtable.base
table['extended_ofv'] = rawtable.new
table['ofv_drop'] = rawtable.dofv
if gof == 'pvalue':
table['delta_df'] = rawtable.ddf.multiply(-1) if is_backward else rawtable.ddf
table['pvalue'] = rawtable.pval
else:
table['goal_ofv_drop'] = rawtable.goal.multiply(-1.0) if is_backward else rawtable.goal
significant = np.array([False if np.isnan(s) else s for s in rawtable.signif])
table['is_backward'] = True if is_backward else False
table['extended_significant'] = ~significant if is_backward else significant
return table
[docs]def model_name_series_to_dataframe(modelname, parcov_dictionary, is_backward, included_relations):
subdf = modelname.str.extract(r'(?P<parcov>.+)-(?P<state>\d+)$', expand=True)
state = np.nan
if is_backward:
if included_relations:
state = extended_states(modelname, included_relations)
else:
state = [int(s) for s in subdf.state.values]
result = pd.DataFrame(
{'model': modelname, 'parameter': None, 'covariate': None, 'extended_state': state}
)
if parcov_dictionary:
temp = pd.DataFrame(
[parcov_dictionary[m] for m in subdf.parcov], columns=('parameter', 'covariate')
)
result.parameter = temp.parameter
result.covariate = temp.covariate
return result
[docs]def parse_mixed_block(block):
m1 = None
readded = list()
stashed = list()
included_relations = dict()
pattern = {
'm1': re.compile(r'Model\s+directory\s+(?P<m1folder>\S+)'),
'stashed': re.compile(r'Taking a step.* reducing scope with .*:\s*(?P<relations>\S+)'),
'readded': re.compile(r'Re-testing .* relations after .* :\s*(?P<relations>\S+)'),
'included': re.compile(r'Included relations so far:\s*(?P<relations>\S+)'),
}
for row in block:
if pattern['stashed'].match(row):
if stashed:
raise NotImplementedError('Two scope reduction lines in the same logfile block')
stashed = [
tuple(relation.split('-'))
for relation in pattern['stashed'].match(row).group('relations').split(',')
]
elif pattern['readded'].match(row):
if readded:
raise NotImplementedError('Two re-testing lines in the same logfile block')
readded = [
tuple(relation.split('-'))
for relation in pattern['readded'].match(row).group('relations').split(',')
]
elif pattern['m1'].match(row):
if m1:
raise NotImplementedError('Two model directory lines in the same logfile block')
m1 = pattern['m1'].match(row).group('m1folder')
elif pattern['included'].match(row):
for relation in pattern['included'].match(row).group('relations').split(','):
par, cov, state = relation.split('-')
if par not in included_relations.keys():
included_relations[par] = dict()
included_relations[par][cov] = state
return m1, readded, stashed, included_relations
[docs]def parse_chosen_relation_block(block):
chosen = dict()
criterion = dict()
pattern = {
'chosen': re.compile(
r'Parameter-covariate relation chosen in this '
+ r'(forward|backward) step: '
+ r'(?P<parameter>\S+)-(?P<covariate>\S+)-(?P<state>\S+)'
),
'backward': re.compile(r'Parameter-covariate relation chosen in this backward st'),
'gof_ofv': re.compile(r'CRITERION\s+OFV\b'),
'gof_pvalue': re.compile(r'CRITERION\s+' + r'(?P<gof>PVAL)\s+(>|<)\s+(?P<pvalue>\S+)\s*$'),
'included': re.compile(r'Relations included after this step'),
'parameter': re.compile(r'(?P<parameter>\S+)'),
'covstate': re.compile(r'\s*(?P<covariate>\D[^-\s]*)-(?P<state>\d+)'),
}
chosen_match = pattern['chosen'].match(block[0])
is_backward = False
if pattern['backward'].match(block[0]):
is_backward = True
if chosen_match:
chosen = chosen_match.groupdict()
if pattern['gof_ofv'].match(block[1]):
criterion = {'gof': 'OFV', 'pvalue': None}
else:
criterion = pattern['gof_pvalue'].match(block[1]).groupdict()
criterion['is_backward'] = is_backward
included_relations = dict()
if len(block) > 4 and pattern['included'].match(block[4]):
for row in block[5:]:
row = row.strip()
par = pattern['parameter'].match(row).group('parameter')
if re.search(r'-\d+$', par):
raise NotImplementedError('Missing whitespace between param and included covs')
included_relations[par] = {p: c for p, c in pattern['covstate'].findall(row)}
return chosen, criterion, included_relations
[docs]def names_without_state(model_names):
return [parcov for parcov in model_names.str.extract(r'(.+)-\d+$').values.flatten()]
[docs]def extended_states(model_names, included_relations):
model_parcov = names_without_state(model_names)
statedict = {parcov: int(-99) for parcov in model_parcov}
for par, d in included_relations.items():
for cov, state in d.items():
statedict[f'{par}{cov}'] = int(state)
return [statedict[parcov] for parcov in model_parcov]
[docs]def step_data_frame(step, included_relations):
df = step['runtable']
is_backward = df['is_backward'].iloc[0]
if is_backward and included_relations:
if np.all(np.isnan(df['extended_state'].values.flatten())):
# This must be a backward step without preceeding steps of any kind
# and where included_relations was not found from conf file
df['extended_state'] = extended_states(df['model'], included_relations)
df['step'] = step['number']
if 'pvalue' in df.columns:
if step['criterion']:
df.insert(9, 'goal_pvalue', step['criterion']['pvalue'])
elif step['previous_criterion']:
if step['previous_criterion']['is_backward'] == is_backward:
# same direction as previous step with criterion
df.insert(9, 'goal_pvalue', step['previous_criterion']['pvalue'])
chosenmodel = 'no model'
if step['chosen']:
chosenmodel = (
step['chosen']['parameter']
+ step['chosen']['covariate']
+ r'-'
+ step['chosen']['state']
)
df['selected'] = [name == chosenmodel for name in df['model'].values]
df['directory'] = str(step['m1'])
model = df['model'] # move to end
df.drop(columns=['model'], inplace=True)
df['model'] = model
if step['stashed'] or step['readded']:
model_parcov = names_without_state(df['model'])
if step['stashed']:
stashed = {f'{par}{cov}' for par, cov in step['stashed']}
df['stashed'] = [parcov in stashed for parcov in model_parcov]
else:
df['stashed'] = False
if step['readded']:
readded = {f'{par}{cov}' for par, cov in step['readded']}
df['readded'] = [parcov in readded for parcov in model_parcov]
else:
df['readded'] = False
return df.set_index(['step', 'parameter', 'covariate', 'extended_state'])
[docs]def prior_included_step(included_relations, gof_is_pvalue):
states = list()
extra = None
if gof_is_pvalue:
extra = {'delta_df': 0, 'pvalue': np.nan, 'goal_pvalue': np.nan}
else:
extra = {'goal_ofv_drop': np.nan}
for par, d in included_relations.items():
for cov, state in d.items():
states.append(
{
'step': int(0),
'parameter': par,
'covariate': cov,
'extended_state': int(state),
'reduced_ofv': np.nan,
'extended_ofv': np.nan,
'ofv_drop': np.nan,
**extra,
'is_backward': False,
'extended_significant': True,
'selected': True,
'directory': '',
'model': '',
}
)
df = pd.DataFrame(states)
return df.set_index(['step', 'parameter', 'covariate', 'extended_state'])
[docs]def empty_step(previous_number, previous_criterion=None):
return {
'runtable': None,
'm1': None,
'chosen': None,
'stashed': None,
'readded': None,
'criterion': None,
'number': previous_number + 1,
'previous_criterion': previous_criterion,
}
[docs]def have(something):
return something is not None
[docs]def log_steps(path, options, parcov_dictionary=None):
included_relations = options['included_relations']
basepath = Path(options['directory'])
pattern = {
'runtable': re.compile(r'^MODEL\s+TEST\s+'),
'm1': re.compile(r'Model\s+directory\s+(?P<m1folder>\S+)'),
'chosen': re.compile(r'Parameter-covariate relation chosen in this'),
}
step = empty_step(0)
for block in file_blocks(path):
if pattern['runtable'].match(block[0]):
# can be empty table with header, only set table if have content
if len(block) > 1:
step['runtable'] = parse_runtable_block(
block, parcov_dictionary, included_relations
)
is_forward = not step['runtable']['is_backward'].iloc[-1]
if step['number'] == 1 and included_relations is not None and is_forward:
yield prior_included_step(
included_relations, gof_is_pvalue=('pvalue' in step['runtable'].columns)
)
elif pattern['chosen'].match(block[0]):
step['chosen'], step['criterion'], included = parse_chosen_relation_block(block)
if included:
included_relations = included
elif pattern['m1'].match(block[0]):
if have(step['runtable']):
yield step_data_frame(step, included_relations)
step = empty_step(step['number'], step['criterion'])
step['m1'] = Path(pattern['m1'].match(block[0]).group('m1folder')).relative_to(basepath)
else:
# complex block, either gof ofv or scmplus. May contain m1
m1, readded, stashed, included = parse_mixed_block(block)
if stashed:
# stashing belongs to previously read runtable: do not yield
step['stashed'] = stashed
if included:
included_relations = included
if readded:
# readding is done first in new step: yield if have a runtable
if have(step['runtable']):
yield step_data_frame(step, included_relations)
step = empty_step(step['number'], step['criterion'])
step['readded'] = readded
if m1:
# Writing m1 is done first in step, unless readding was done first
# in which case runtable will be empty
if have(step['runtable']):
yield step_data_frame(step, included_relations)
step = empty_step(step['number'], step['criterion'])
step['m1'] = Path(m1).relative_to(basepath)
if have(step['runtable']):
yield step_data_frame(step, included_relations)
[docs]def split_merged_base_and_new_ofv(rawtable):
testnames = rawtable.test.unique()
if len(testnames) > 1:
raise ValueError('Non-unique TEST column value in scm-step, model name too wide?')
elif not re.match(r'(PVAL|OFV)', testnames[0]):
raise ValueError(f'Unrecognized TEST column value {testnames[0]}')
if len(rawtable.base.unique()) > 1:
# We have missing space between columns base and new.
# These are formatted with the same widths, so split
# at equal lengths, i.e. at half of max length. Handle
# case where a run has ofv 'FAILED' by skipping whitespace
# in second part after string split.
if not all(pd.isna(rawtable.iloc[:, -1])):
# expect all values in last column to be NaN if ofv columns are merged
raise Exception
# Save names and then drop last column. Names are unique so this drops 1 col
column_names = list(rawtable.columns)
rawtable.drop(columns=column_names[-1], inplace=True)
baseofvlength = int(max({len(x) for x in rawtable.base.values}) / 2)
pattern = f'(?P<base>.{{{baseofvlength}}})' + r'\s*(?P<newfixed>\S*)'
subdf = rawtable.base.str.extract(pattern, expand=True)
subdf.replace('FAILED', np.nan, inplace=True)
if len(subdf.base.unique()) > 1:
# all base values should be identical now
raise Exception
# replace base column and insert newfixed
rawtable.base = subdf.base
rawtable.insert(3, 'newfixed', subdf.newfixed)
# rename columns to get original labels at correct positions
old = list(rawtable.columns)
rawtable.rename(columns={o: correct for o, correct in zip(old, column_names)}, inplace=True)
return rawtable
[docs]def psn_config_file_argument_from_command(command, path):
# arguments: everything on command-line which does not start with -
# and where 'name' (i.e. last portion when treated as a path object) is found
# as file in input folder path
# this may include a model file, so we need to parse file(s) to see
# if we actually find included_relations
path = Path(path)
return [
Path(arg).name
for arg in arguments_from_command(command)
if (path / Path(arg).name).is_file()
]
[docs]def relations_from_config_file(path, files):
# Allow multiple candidate files in case ambiguous command-line (unusual)
path = Path(path)
start_config_section = re.compile(r'\s*\[(?P<section>[a-z_]+)\]\s*$')
empty_line = re.compile(r'^[-\s]*$')
comment_line = re.compile(r'\s*;')
included_lines = None
test_lines = None
included_relations = None
test_relations = None
scanning_included = False
scanning_test = False
for file in files:
with open(path / file) as fn:
for row in fn:
m = start_config_section.match(row)
if m:
if m.group('section') == 'included_relations':
scanning_included = True
scanning_test = False
included_lines = list()
elif m.group('section') == 'test_relations':
scanning_included = False
scanning_test = True
test_lines = list()
else:
scanning_included = False
scanning_test = False
elif scanning_included:
if not empty_line.match(row) and not comment_line.match(row):
included_lines.append(row.strip())
elif scanning_test:
if not empty_line.match(row) and not comment_line.match(row):
test_lines.append(row.strip())
if test_lines is not None and len(test_lines) > 0:
break # do not check any other file, if more than one
if included_lines is not None and len(included_lines) > 0:
included_relations = dict()
p = re.compile(r'\s*([^-]+)-(\d+)\s*')
for row in included_lines:
par, covstates = row.split(r'=')
par = str.strip(par)
included_relations[par] = {
p.match(val).group(1): p.match(val).group(2) for val in covstates.split(r',')
}
if test_lines is not None and len(test_lines) > 0:
test_relations = dict()
for row in test_lines:
par, covs = row.split(r'=')
par = str.strip(par)
test_relations[par] = [str.strip(val) for val in covs.split(r',')]
return included_relations, test_relations
[docs]def psn_scm_options(path):
path = Path(path)
options = {
'directory': str(path),
'logfile': 'scmlog.txt',
'included_relations': None,
'test_relations': None,
}
scmplus = False
config_files = None
with open(path / 'meta.yaml') as meta:
cmd = None
for row in meta:
if cmd is not None:
if re.match(r'\s', row): # continuation is indented
cmd += row # must not strip
continue
else: # no continuation: parse and remove
if tool_from_command(cmd) == 'scmplus':
scmplus = True
for k, v in options_from_command(cmd).items():
if 'config_file'.startswith(k):
config_files = [v]
break
if config_files is None:
# not option -config_file, must have been given as argument
config_files = psn_config_file_argument_from_command(cmd, path)
cmd = None
row = row.strip()
if row.startswith('logfile: '):
options['logfile'] = Path(re.sub(r'\s*logfile:\s*', '', row)).name
elif row.startswith('directory: '):
options['directory'] = str(Path(re.sub(r'\s*directory:\s*', '', row)))
elif row.startswith('command_line: '):
cmd = row
if scmplus and Path(options['directory']).parts[-1] == 'rundir':
options['directory'] = str(Path(options['directory']).parents[0])
if config_files is not None:
options['included_relations'], options['test_relations'] = relations_from_config_file(
path, config_files
)
return options
[docs]def parcov_dict_from_test_relations(test_relations):
parcov = dict()
for par, covs in test_relations.items():
for cov in covs:
parcov[f'{par}{cov}'] = (par, cov)
return parcov
[docs]def add_covariate_effects(res, path):
steps = res.steps
if 'delta_df' not in steps:
return
def fn(row):
if row.is_backward:
return np.nan
degrees = row.delta_df
model_name = row.model.replace(
'-', ''
) # FIXME: The dash should not have been in the table!
model_path = path / row.directory / f'{model_name}.mod'
if not model_path.is_file():
return np.nan
model = Model(model_path)
varpars = model.random_variables.free_symbols
all_thetas = [param for param in model.parameters if param.symbol not in varpars]
new_thetas = all_thetas[-degrees:]
covariate_effects = {
param.name: model.modelfit_results.parameter_estimates[param.name]
for param in new_thetas
}
return covariate_effects
steps['covariate_effects'] = steps.apply(fn, axis=1)
[docs]def psn_scm_results(path):
"""Create scm results from a PsN SCM run
:param path: Path to PsN scm run directory
:return: A :class:`SCMResults` object
"""
path = Path(path)
if not path.is_dir():
raise IOError(f'Could not find scm directory: {str(path)}')
options = psn_scm_options(path)
logfile = path / options['logfile']
if not logfile.is_file():
raise IOError(f'Could not find scm logfile: {str(logfile)}')
if options['test_relations'] is not None:
parcov_dictionary = parcov_dict_from_test_relations(options['test_relations'])
else:
raise IOError(r'Could not find test_relations in scm config file')
res = SCMResults(steps=psn_scm_parse_logfile(logfile, options, parcov_dictionary))
add_covariate_effects(res, path)
return res