# Source code for pharmpy.tools.amd.run

import re
import warnings
from pathlib import Path
from typing import Callable, Literal, Optional, Sequence, Union

from pharmpy import DEFAULT_SEED
from pharmpy.basic import TSymbol
from pharmpy.deps import numpy as np
from pharmpy.deps import pandas as pd
from pharmpy.internals.fn.type import check_list, with_runtime_arguments_type_check
from pharmpy.model import Model
from pharmpy.modeling import (
    add_parameter_uncertainty_step,
    add_predictions,
    add_residuals,
    create_basic_pk_model,
    create_rng,
    find_clearance_parameters,
    get_central_volume_and_clearance,
    get_pk_parameters,
    has_mixed_mm_fo_elimination,
    plot_abs_cwres_vs_ipred,
    plot_cwres_vs_idv,
    plot_dv_vs_ipred,
    plot_dv_vs_pred,
    plot_eta_distributions,
    plot_vpc,
    set_dataset,
    set_simulation,
)
from pharmpy.modeling.blq import has_blq_transformation, transform_blq
from pharmpy.modeling.common import convert_model, filter_dataset
from pharmpy.modeling.covariate_effect import get_covariates_allowed_in_covariate_effect
from pharmpy.modeling.parameter_variability import get_occasion_levels
from pharmpy.modeling.tmdd import DV_TYPES
from pharmpy.reporting import generate_report
from pharmpy.tools import retrieve_models, write_results
from pharmpy.tools.allometry.tool import validate_allometric_variable
from pharmpy.tools.common import table_final_eta_shrinkage
from pharmpy.tools.mfl.feature.covariate import covariates as extract_covariates
from pharmpy.tools.mfl.parse import ModelFeatures, get_model_features
from pharmpy.tools.mfl.parse import parse as mfl_parse
from pharmpy.tools.mfl.statement.feature.covariate import Covariate
from pharmpy.tools.mfl.statement.feature.peripherals import Peripherals
from pharmpy.tools.mfl.statement.statement import Statement
from pharmpy.tools.reporting import get_rst_path
from pharmpy.tools.run import (
    create_metadata,
    get_run_setup,
    is_strictness_fulfilled,
    split_common_options,
    summarize_errors_from_entries,
)
from pharmpy.workflows import ModelEntry, Results
from pharmpy.workflows.model_database.local_directory import get_modelfit_results
from pharmpy.workflows.results import ModelfitResults

from ..run import run_tool
from .results import AMDResults

# Valid values for the `strategy` argument of run_amd.
ALLOWED_STRATEGY = ["default", "reevaluation", "SIR", "SRI", "RSI"]
# Valid routes of administration for the start model.
ALLOWED_ADMINISTRATION = ["iv", "oral", "ivoral"]
# Model types that AMD can build.
ALLOWED_MODELTYPE = ['basic_pk', 'pkpd', 'drug_metabolite', 'tmdd']
# Valid values for the `retries_strategy` argument of run_amd.
RETRIES_STRATEGIES = ["final", "all_final", "skip"]
# Strictness criteria applied when the caller gives none.
DEFAULT_STRICTNESS = "minimization_successful or (rounding_errors and sigdigs>=0.1)"


def spawn_seed(rng) -> int:
    """Draw a fresh 128-bit integer seed for a subtool from ``rng``.

    Both 64-bit halves are drawn in a single ``integers`` call so the
    generator state advances exactly once per spawned seed.
    """
    # FIXME: If this works well this function can be moved to modeling
    hi, lo = rng.integers(2**64 - 1, size=2, dtype=np.uint64)
    return (int(hi) << 64) | int(lo)


def run_amd(
    input: Union[Model, Path, str],
    results: Optional[ModelfitResults] = None,
    modeltype: str = 'basic_pk',
    administration: str = 'oral',
    strategy: str = "default",
    cl_init: Optional[float] = None,
    vc_init: Optional[float] = None,
    mat_init: Optional[float] = None,
    b_init: Optional[float] = None,
    emax_init: Optional[float] = None,
    ec50_init: Optional[float] = None,
    met_init: Optional[float] = None,
    search_space: Optional[str] = None,
    lloq_method: Optional[str] = None,
    lloq_limit: Optional[float] = None,
    allometric_variable: Optional[TSymbol] = None,
    occasion: Optional[str] = None,
    path: Optional[Union[str, Path]] = None,
    resume: bool = False,
    strictness: str = DEFAULT_STRICTNESS,
    dv_types: Optional[dict[Literal[DV_TYPES], int]] = None,
    mechanistic_covariates: Optional[list[Union[str, tuple[str]]]] = None,
    retries_strategy: Literal["final", "all_final", "skip"] = "all_final",
    # seed is a common option but needs to be here since amd is not yet a proper tool
    seed: int = DEFAULT_SEED,
    parameter_uncertainty_method: Optional[Literal['SANDWICH', 'SMAT', 'RMAT', 'EFIM']] = None,
    ignore_datainfo_fallback: bool = False,
    _E: Optional[dict[str, Union[float, str]]] = None,
):
    """Run Automatic Model Development (AMD) tool

    Parameters
    ----------
    input : Model, Path or DataFrame
        Starting model or dataset
    results : ModelfitResults
        Results of input if input is a model
    modeltype : str
        Type of model to build. Valid strings are 'basic_pk', 'pkpd',
        'drug_metabolite' and 'tmdd'
    administration : str
        Route of administration. Either 'iv', 'oral' or 'ivoral'
    strategy : str
        Run algorithm for AMD procedure. Valid options are 'default',
        'reevaluation', 'SIR', 'SRI', and 'RSI'.
    cl_init : float
        Initial estimate for the population clearance
    vc_init : float
        Initial estimate for the central compartment population volume
    mat_init : float
        Initial estimate for the mean absorption time (not for iv models)
    b_init : float
        Initial estimate for the baseline (PKPD model)
    emax_init : float
        Initial estimate for E_max (PKPD model)
    ec50_init : float
        Initial estimate for EC_50 (PKPD model)
    met_init : float
        Initial estimate for mean equilibration time (PKPD model)
    search_space : str
        MFL for search space for structural and covariate model
    lloq_method : str
        Method for how to remove LOQ data. See `transform_blq` for list of
        available methods
    lloq_limit : float
        Lower limit of quantification. If None LLOQ column from dataset will be used
    allometric_variable : str or Symbol
        Variable to use for allometry. This option is deprecated.
        Please use ALLOMETRY in the mfl instead.
    occasion : str
        Name of occasion column
    path : str or Path
        Path to run AMD in
    resume : bool
        Whether to allow resuming previous run
    strictness : str
        Strictness criteria
    dv_types : dict or None
        Dictionary of DV types for TMDD models with multiple DVs.
    mechanistic_covariates : list
        List of covariates or tuple of covariate and parameter combination
        to run in a separate prioritized covsearch run. For instance
        ["WT", ("CRCL", "CL")]. The effects are extracted from the search
        space for covsearch.
    retries_strategy : str
        Whether or not to run retries tool. Valid options are 'skip',
        'all_final' or 'final'. Default is 'all_final'.
    seed : int
        Random seed to be used.
    parameter_uncertainty_method : {'SANDWICH', 'SMAT', 'RMAT', 'EFIM'} or None
        Parameter uncertainty method.
    ignore_datainfo_fallback : bool
        Ignore using datainfo to get information not given by the user.
        Default is False
    _E : dict
        EXPERIMENTAL FEATURE. Dictionary of different E-values used in mBIC.

    Returns
    -------
    AMDResults
        Results for the run

    Examples
    --------
    >>> from pharmpy.modeling import *
    >>> from pharmpy.tools import run_amd, load_example_modelfit_results
    >>> model = load_example_model("pheno")
    >>> results = load_example_modelfit_results("pheno")
    >>> res = run_amd(model, results=results)      # doctest: +SKIP

    See also
    --------
    run_iiv
    run_tool
    """
    args = locals()
    validate_input(**args)
    rng = create_rng(seed)
    ctx = _setup_run(args)
    ctx.log_info("Starting tool amd")

    from pharmpy.model.external import nonmem  # FIXME: We should not depend on NONMEM

    if search_space is not None:
        try:
            ss_mfl = mfl_parse(search_space, True)
        except:  # noqa E722
            raise ValueError(f'Invalid `search_space`, could not be parsed: "{search_space}"')
    else:
        ss_mfl = ModelFeatures()

    if ss_mfl.allometry is not None:
        # Take it out and put back later
        if allometric_variable is not None:
            raise ValueError(
                "Having both allometric_variable and ALLOMETRY in the mfl is not allowed"
            )
        mfl_allometry = ss_mfl.allometry
        ss_mfl = ss_mfl.replace(allometry=None)
    else:
        mfl_allometry = None

    if modeltype == 'pkpd':
        dv = 2
        iiv_strategy = 'pd_fullblock'
    else:
        dv = None
        iiv_strategy = 'fullblock'

    if isinstance(input, str):
        input = Path(input)

    if isinstance(input, Path):
        model = create_basic_pk_model(
            administration,
            dataset_path=input,
            cl_init=cl_init,
            vc_init=vc_init,
            mat_init=mat_init,
        )
        model = convert_model(model, 'nonmem')  # FIXME: Workaround for results retrieval system
    elif isinstance(input, pd.DataFrame):
        model = create_basic_pk_model(
            administration,
            cl_init=cl_init,
            vc_init=vc_init,
            mat_init=mat_init,
        )
        model = set_dataset(model, input, datatype='nonmem')
        model = convert_model(model, 'nonmem')  # FIXME: Workaround for results retrieval system
    elif isinstance(input, nonmem.model.Model):
        model = input
        model = model.replace(name='start')
        ctx.store_input_model_entry(model)
    else:
        # Redundant with validation
        raise TypeError(
            f'Invalid input: got `{input}` of type {type(input)},'
            f' only NONMEM model or standalone dataset are supported currently.'
        )

    # Resolve the name of the DVID column, if any
    if 'dvid' in model.datainfo.types:
        dvid_name = model.datainfo.typeix['dvid'][0].name
    elif 'DVID' in model.datainfo.names:
        dvid_name = 'DVID'
    else:
        dvid_name = None

    model = add_predictions(model, ['PRED', 'CIPREDI'])
    model = add_residuals(model, ['CWRES'])

    # FIXME : Handle validation differently?
    # AMD start model (dataset) is required before validation
    args['input'] = model
    later_input_validation(**args)

    to_be_skipped = check_skip(
        ctx, model, occasion, allometric_variable, ignore_datainfo_fallback, search_space
    )

    if parameter_uncertainty_method is not None:
        model = add_parameter_uncertainty_step(model, parameter_uncertainty_method)

    if lloq_method is not None:
        model = transform_blq(
            model,
            method=lloq_method,
            lloq=lloq_limit,
        )

    # Order of the subtool runs for each strategy
    if strategy == "default":
        order = ['structural', 'iivsearch', 'residual', 'iovsearch', 'allometry', 'covariates']
    elif strategy == "reevaluation":
        order = [
            'structural',
            'iivsearch',
            'residual',
            'iovsearch',
            'allometry',
            'covariates',
            'iivsearch',
            'residual',
        ]
    elif strategy == 'SIR':
        order = ['structural', 'iivsearch', 'residual']
    elif strategy == 'SRI':
        order = ['structural', 'residual', 'iivsearch']
    elif strategy == 'RSI':
        order = ['residual', 'structural', 'iivsearch']

    if modeltype == 'pkpd' and 'allometry' in order:
        ctx.log_warning('Skipping allometry since modeltype is "pkpd"')
        order.remove('allometry')

    if to_be_skipped:
        order = [tool for tool in order if tool not in to_be_skipped]

    # Default search spaces when the user supplied none
    if modeltype in ['pkpd', 'drug_metabolite']:
        if modeltype == 'pkpd':
            structsearch_features = ss_mfl.filter("pd")
        else:
            structsearch_features = ss_mfl.filter("metabolite")
        if len(structsearch_features.mfl_statement_list()) == 0:
            if modeltype == 'pkpd':
                structsearch_features = mfl_parse(
                    "DIRECTEFFECT([LINEAR, EMAX, SIGMOID]);"
                    "EFFECTCOMP([LINEAR, EMAX, SIGMOID]);"
                    "INDIRECTEFFECT([LINEAR, EMAX, SIGMOID], *)",
                    True,
                )
            else:
                if administration in ['oral', 'ivoral']:
                    structsearch_features = mfl_parse(
                        "METABOLITE([PSC, BASIC]);PERIPHERALS([0,1], MET)", True
                    )
                else:
                    structsearch_features = mfl_parse(
                        "METABOLITE([BASIC]);PERIPHERALS([0,1], MET)", True
                    )

    modelsearch_features = ss_mfl.filter("pk")
    if len(modelsearch_features.mfl_statement_list()) == 0:
        if modeltype in ('basic_pk', 'drug_metabolite') and administration == 'oral':
            modelsearch_features = mfl_parse(
                "ABSORPTION([FO,ZO,SEQ-ZO-FO]);"
                "ELIMINATION(FO);"
                "LAGTIME([OFF,ON]);"
                "TRANSITS([0,1,3,10],*);"
                "PERIPHERALS(0..1)",
                True,
            )
        elif modeltype in ('basic_pk', 'drug_metabolite') and administration == 'ivoral':
            modelsearch_features = mfl_parse(
                "ABSORPTION([FO,ZO,SEQ-ZO-FO]);"
                "ELIMINATION(FO);"
                "LAGTIME([OFF,ON]);"
                "TRANSITS([0,1,3,10],*);"
                "PERIPHERALS(0..2)",
                True,
            )
        elif modeltype == 'tmdd' and administration == 'oral':
            modelsearch_features = mfl_parse(
                "ABSORPTION([FO,ZO,SEQ-ZO-FO]);"
                "ELIMINATION([MM, MIX-FO-MM]);"
                "LAGTIME([OFF,ON]);"
                "TRANSITS([0,1,3,10],*);"
                "PERIPHERALS(0..1)",
                True,
            )
        elif modeltype == 'tmdd' and administration == 'ivoral':
            modelsearch_features = mfl_parse(
                "ABSORPTION([FO,ZO,SEQ-ZO-FO]);"
                "ELIMINATION([MM, MIX-FO-MM]);"
                "LAGTIME([OFF,ON]);"
                "TRANSITS([0,1,3,10],*);"
                "PERIPHERALS(0..2)",
                True,
            )
        else:
            modelsearch_features = mfl_parse("ELIMINATION(FO);" "PERIPHERALS(0..2)", True)

    if mfl_allometry is not None:
        modelsearch_features = modelsearch_features.replace(allometry=mfl_allometry)

    covsearch_features = ModelFeatures.create(covariate=ss_mfl.covariate)
    if not covsearch_features.covariate:
        if modeltype != 'pkpd':
            cov_ss = mfl_parse(
                "COVARIATE?(@IIV, @CONTINUOUS, EXP);" "COVARIATE?(@IIV,@CATEGORICAL, CAT)", True
            )
        else:
            cov_ss = mfl_parse(
                "COVARIATE?(@PD_IIV, @CONTINUOUS, EXP);"
                "COVARIATE?(@PD_IIV,@CATEGORICAL, CAT)",
                True,
            )
        covsearch_features = covsearch_features.replace(covariate=cov_ss.covariate)
        if modeltype == 'basic_pk' and administration == 'ivoral':
            # FIXME : Allow addition between search space with reference values in COVARITATE
            # statement
            covsearch_features = mfl_parse(str(cov_ss) + ";COVARIATE?(RUV,ADMID,CAT)", True)

    if modeltype == "tmdd":
        orig_dataset = model.dataset
        if dv_types is not None:
            model = filter_dataset(model, f'{dvid_name} < 2')
            model = model.replace(dataset=model.dataset.reset_index())

    # Build the queue of subtool closures to run, in order
    run_subfuncs = {}
    for section in order:
        if section == 'structural':
            if modeltype != 'pkpd':
                run_subfuncs['structural_covariates'] = _subfunc_structural_covariates(
                    amd_start_model=model,
                    search_space=covsearch_features,
                    strictness=strictness,
                    ctx=ctx,
                )
            if modeltype == 'pkpd':
                func = _subfunc_structsearch(
                    type=modeltype,
                    search_space=structsearch_features,
                    b_init=b_init,
                    emax_init=emax_init,
                    ec50_init=ec50_init,
                    met_init=met_init,
                    strictness=strictness,
                    ctx=ctx,
                )
                run_subfuncs['structsearch'] = func
            elif modeltype == 'tmdd':
                func = _subfunc_structsearch_tmdd(
                    search_space=modelsearch_features,
                    type=modeltype,
                    strictness=strictness,
                    dv_types=dv_types,
                    orig_dataset=orig_dataset,
                    ctx=ctx,
                )
                run_subfuncs['structsearch'] = func
            else:
                func = _subfunc_modelsearch(
                    search_space=modelsearch_features, strictness=strictness, E=_E, ctx=ctx
                )
                run_subfuncs['modelsearch'] = func
                # Perfomed 'after' modelsearch
                if modeltype == 'drug_metabolite':
                    func = _subfunc_structsearch(
                        type=modeltype,
                        search_space=structsearch_features,
                        ctx=ctx,
                    )
                    run_subfuncs['structsearch'] = func
        elif section == 'iivsearch':
            if 'iivsearch' in run_subfuncs.keys():
                run_name = 'rerun_iivsearch'
                func = _subfunc_iiv(
                    iiv_strategy='no_add',
                    strictness=strictness,
                    E=_E,
                    ctx=ctx,
                    dir_name="rerun_iivsearch",
                )
            else:
                run_name = 'iivsearch'
                func = _subfunc_iiv(
                    iiv_strategy=iiv_strategy,
                    strictness=strictness,
                    E=_E,
                    ctx=ctx,
                    dir_name="iivsearch",
                )
            run_subfuncs[run_name] = func
        elif section == 'iovsearch':
            func = _subfunc_iov(
                amd_start_model=model, occasion=occasion, strictness=strictness, E=_E, ctx=ctx
            )
            run_subfuncs['iovsearch'] = func
        elif section == 'residual':
            if any(k.startswith('ruvsearch') for k in run_subfuncs.keys()):
                run_name = 'rerun_ruvsearch'
            else:
                run_name = 'ruvsearch'
            if modeltype == 'drug_metabolite':
                # FIXME : Assume the dv number?
                # Perform two searches
                # One for the drug
                func = _subfunc_ruvsearch(
                    dv=1,
                    strictness=strictness,
                    ctx=ctx,
                    dir_name=f'{run_name}_drug',
                )
                run_subfuncs[f'{run_name}_drug'] = func
                # And one for the metabolite
                func = _subfunc_ruvsearch(
                    dv=2,
                    strictness=strictness,
                    ctx=ctx,
                    dir_name=f'{run_name}_metabolite',
                )
                run_subfuncs[f'{run_name}_metabolite'] = func
            elif modeltype == 'tmdd' and dv_types is not None:
                for key, value in dv_types.items():
                    func = _subfunc_ruvsearch(
                        dv=value,
                        strictness=strictness,
                        ctx=ctx,
                        dir_name=f'{run_name}_tmdd_{key}',
                    )
                    run_subfuncs[f'ruvsearch_{key}'] = func
            else:
                func = _subfunc_ruvsearch(
                    dv=dv, strictness=strictness, ctx=ctx, dir_name=run_name
                )
                run_subfuncs[f'{run_name}'] = func
        elif section == 'allometry':
            if mfl_allometry is None:
                func = _subfunc_allometry(
                    amd_start_model=model, allometric_variable=allometric_variable, ctx=ctx
                )
                run_subfuncs['allometry'] = func
        elif section == 'covariates':
            func = _subfunc_mechanistic_exploratory_covariates(
                amd_start_model=model,
                search_space=covsearch_features,
                mechanistic_covariates=mechanistic_covariates,
                strictness=strictness,
                ctx=ctx,
            )
            run_subfuncs['covsearch'] = func
        else:
            raise ValueError(f"Unrecognized section {section} in order.")
        if retries_strategy == 'all_final':
            func = _subfunc_retires(
                tool=section, strictness=strictness, seed=spawn_seed(rng), ctx=ctx
            )
            run_subfuncs[f'{section}_retries'] = func

    if retries_strategy == 'final':
        func = _subfunc_retires(tool="", strictness=strictness, seed=spawn_seed(rng), ctx=ctx)
        run_subfuncs['retries'] = func

    # Filter data to only contain dvid=1
    if modeltype == "drug_metabolite":
        orig_dataset = model.dataset  # FIXME : remove
        model = filter_dataset(model, f'{dvid_name} != 2')
        model = model.replace(dataset=model.dataset.reset_index())

    if results is None:
        subctx = ctx.create_subcontext('modelfit')
        results = run_tool('modelfit', model, path=subctx.path, resume=resume)
    if not is_strictness_fulfilled(model, results, DEFAULT_STRICTNESS):
        ctx.log_warning('Base model failed strictness')

    model = model.replace(name='base')
    ctx.store_model_entry(ModelEntry.create(model=model, modelfit_results=results))
    model_entry = ModelEntry.create(model=model, modelfit_results=results)

    # Run the subtools in sequence, carrying the selected model forward
    next_model_entry = model_entry
    sum_subtools = []
    sum_models = dict()
    sum_subtools.append(_create_sum_subtool('start', model_entry))
    for tool_name, func in run_subfuncs.items():
        next_model, next_res = next_model_entry.model, next_model_entry.modelfit_results
        if modeltype == 'drug_metabolite' and tool_name == "structsearch":
            next_model = next_model.replace(dataset=orig_dataset)
        subresults = func(next_model, next_res)
        if subresults is None:
            continue
        else:
            final_model = subresults.final_model.replace(name=f"final_{tool_name}")
            final_model_entry = ModelEntry.create(
                model=final_model, modelfit_results=subresults.final_results, parent=next_model
            )
            ctx.store_model_entry(final_model_entry)
            # If allometry was applied before covsearch the scaled parameters must be
            # re-expanded into the covariate search space
            if (mfl_allometry is not None and tool_name == 'modelsearch') or (
                tool_name == "allometry" and 'allometry' in order[: order.index('covariates')]
            ):
                cov_before = ModelFeatures.create_from_mfl_string(
                    get_model_features(next_model)
                )
                cov_after = ModelFeatures.create_from_mfl_string(
                    get_model_features(final_model)
                )
                cov_differences = cov_after - cov_before
                if cov_differences:
                    covsearch_features = covsearch_features.expand(final_model)
                    covsearch_features += cov_differences
                    func = _subfunc_mechanistic_exploratory_covariates(
                        amd_start_model=model,
                        search_space=covsearch_features,
                        strictness=strictness,
                        mechanistic_covariates=mechanistic_covariates,
                        ctx=ctx,
                    )
                    run_subfuncs['covsearch'] = func
            next_model = final_model
            next_model_entry = final_model_entry
            sum_subtools.append(_create_sum_subtool(tool_name, next_model_entry))
            sum_models[tool_name] = subresults.summary_models

    # FIXME: add start model
    summary_models = _create_model_summary(sum_models)
    summary_tool = _create_tool_summary(sum_subtools)

    if summary_models is None:
        ctx.log_warning(
            'AMDResults.summary_models is None because none of the tools yielded a summary.'
        )

    final_model = next_model_entry.model
    final_results = next_model_entry.modelfit_results
    summary_errors = summarize_errors_from_entries([next_model_entry])
    ctx.store_final_model_entry(final_model)

    # run simulation for VPC plot
    # NOTE: The seed is set to be in range for NONMEM
    sim_model = set_simulation(final_model, n=300, seed=int(rng.integers(2**31 - 1)))
    sim_res = _run_simulation(sim_model, ctx)
    simulation_data = sim_res.table

    if final_results.predictions is not None:
        dv_vs_ipred_plot = plot_dv_vs_ipred(model, final_results.predictions, dvid_name)
        dv_vs_pred_plot = plot_dv_vs_pred(model, final_results.predictions, dvid_name)
    else:
        dv_vs_pred_plot = None
        dv_vs_ipred_plot = None
    if final_results.residuals is not None:
        cwres_vs_idv_plot = plot_cwres_vs_idv(model, final_results.residuals, dvid_name)
    else:
        cwres_vs_idv_plot = None
    if final_results.predictions is not None and final_results.residuals is not None:
        abs_cwres_vs_ipred_plot = plot_abs_cwres_vs_ipred(
            model,
            predictions=final_results.predictions,
            residuals=final_results.residuals,
            stratify_on=dvid_name,
        )
    else:
        abs_cwres_vs_ipred_plot = None
    if final_results.individual_estimates is not None:
        eta_distribution_plot = plot_eta_distributions(
            final_model, final_results.individual_estimates
        )
    else:
        eta_distribution_plot = None

    if not simulation_data.empty:
        final_vpc_plot = plot_vpc(final_model, simulation_data, stratify_on=dvid_name)
    else:
        ctx.log_warning("No vpc could be generated. Did the simulation fail?")
        final_vpc_plot = None

    res = AMDResults(
        final_model=final_model.name,
        summary_tool=summary_tool,
        summary_models=summary_models,
        summary_errors=summary_errors,
        final_model_parameter_estimates=_table_final_parameter_estimates(
            final_results.parameter_estimates_sdcorr, final_results.standard_errors_sdcorr
        ),
        final_model_dv_vs_ipred_plot=dv_vs_ipred_plot,
        final_model_dv_vs_pred_plot=dv_vs_pred_plot,
        final_model_cwres_vs_idv_plot=cwres_vs_idv_plot,
        final_model_abs_cwres_vs_ipred_plot=abs_cwres_vs_ipred_plot,
        final_model_eta_distribution_plot=eta_distribution_plot,
        final_model_eta_shrinkage=table_final_eta_shrinkage(final_model, final_results),
        final_model_vpc_plot=final_vpc_plot,
    )
    # Since we are outside of the regular tools machinery the following is needed
    results_path = ctx.path / 'results.json'
    write_results(results=res, path=results_path)
    write_results(results=res, path=ctx.path / 'results.csv', csv=True)
    rst_path = get_rst_path(res)
    target_path = ctx.path / 'results.html'
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        generate_report(rst_path, results_path, target_path)
    ctx.log_info("Finishing tool amd")
    return res
# FIXME: this function is a workaround until AMD is a real tool. def _setup_run(kwargs): dispatching_options, common_options, tool_options = split_common_options(kwargs) dispatcher, ctx = get_run_setup(dispatching_options, common_options, 'amd') tool_metadata = create_metadata( database=ctx, tool_name='amd', tool_func=run_amd, args=tuple(), tool_options=tool_options, common_options=common_options, dispatching_options=dispatching_options, ) # Workaround to remove common options from metadata to mimic real tools. # These are included since create_metadata uses the function signature. tool_options_new = tool_metadata['tool_options'].copy() for key, value in tool_metadata['tool_options'].items(): if key in dispatching_options.keys() or key in common_options.keys(): tool_options_new.pop(key) tool_metadata['tool_options'] = tool_options_new ctx.store_metadata(tool_metadata) return ctx def _table_final_parameter_estimates(parameter_estimates, ses): rse = ses / parameter_estimates rse.name = "RSE" df = pd.concat([parameter_estimates, rse], axis=1) return df def _create_sum_subtool(tool_name, selected_model_entry): model, res = selected_model_entry.model, selected_model_entry.modelfit_results return { 'tool': tool_name, 'selected_model': model.name, 'description': model.description, 'n_params': len(model.parameters.nonfixed), 'ofv': res.ofv, } def _create_model_summary(summaries): dfs = [] for tool_name, df in summaries.items(): df = df.reset_index() df['tool'] = [tool_name] * len(df) df.set_index(['tool', 'step', 'model'], inplace=True) dfs.append(df) model_summary = pd.concat(dfs, axis=0) return model_summary def _create_tool_summary(rows): summary_prev = None rows_updated = [] for summary in rows: summary_updated = summary if not summary_prev: summary_updated['d_params'] = 0 summary_updated['dofv'] = 0 else: summary_updated['d_params'] = summary['n_params'] - summary_prev['n_params'] summary_updated['dofv'] = summary_prev['ofv'] - summary['ofv'] 
rows_updated.append(summary_updated) summary_prev = summary columns = ['tool', 'selected_model', 'description', 'ofv', 'dofv', 'n_params', 'd_params'] df = pd.DataFrame.from_records(rows_updated, columns=columns).set_index(['tool']) return df SubFunc = Callable[[Model], Optional[Results]] def noop_subfunc(_: Model): return None def _run_simulation(model, ctx): subctx = ctx.create_subcontext('simulation') res = run_tool('simulation', model=model, path=subctx.path) return res def _subfunc_retires(tool, strictness, seed, ctx): subctx = ctx.create_subcontext(f'{tool}_retries') def _run_retries(model, modelfit_results): res = run_tool( 'retries', model=model, results=modelfit_results, strictness=strictness, scale='UCP', prefix_name=tool, seed=seed, path=subctx.path, ) assert isinstance(res, Results) return res return _run_retries def _subfunc_modelsearch(search_space: tuple[Statement, ...], strictness, E, ctx) -> SubFunc: subctx = ctx.create_subcontext('modelsearch') def _run_modelsearch(model, modelfit_results): if E and 'modelsearch' in E.keys(): rank_type = 'mbic' e = E['modelsearch'] else: rank_type = 'bic' e = None res = run_tool( 'modelsearch', model=model, results=modelfit_results, search_space=search_space, algorithm='reduced_stepwise', strictness=strictness, rank_type=rank_type, E=e, path=subctx.path, ) assert isinstance(res, Results) return res return _run_modelsearch def _subfunc_structsearch(ctx, **kwargs) -> SubFunc: subctx = ctx.create_subcontext("structsearch") def _run_structsearch(model, modelfit_results): res = run_tool( 'structsearch', model=model, results=modelfit_results, **kwargs, path=subctx.path, ) assert isinstance(res, Results) return res return _run_structsearch def _subfunc_structsearch_tmdd( search_space, type, strictness, dv_types, orig_dataset, ctx ) -> SubFunc: subctx1 = ctx.create_subcontext("modelsearch") subctx2 = ctx.create_subcontext("structsearch") def _run_structsearch_tmdd(model, modelfit_results): res = run_tool( 'modelsearch', 
model=model, results=modelfit_results, search_space=search_space, algorithm='reduced_stepwise', strictness=strictness, rank_type='bic', path=subctx1.path, ) final_model = res.final_model all_models = [ subctx1.retrieve_model_entry(model_name).model for model_name in subctx1.list_all_names() if model_name not in ['input', 'final'] ] if not has_mixed_mm_fo_elimination(final_model): # Only select models that have mixed MM FO elimination # If no model with mixed MM FO then final model from modelsearch will be used models_mixed_mm_fo_el = [ model.name for model in all_models if has_mixed_mm_fo_elimination(model) ] if len(models_mixed_mm_fo_el) > 0: rank_all = res.summary_tool.dropna(subset='bic')[['rank']] rank_filtered = rank_all.query('model in @models_mixed_mm_fo_el') if len(rank_filtered) > 0: rank_filtered = rank_filtered.sort_values(by=['rank']) highest_ranked = rank_filtered.index[0] final_model = retrieve_models(subctx1.path, names=[highest_ranked])[0] res_path = ( subctx1.path / "models" / final_model.name / ("model" + subctx1.model_database.file_extension) ) final_res = get_modelfit_results(final_model, res_path) extra_model = None extra_model_results = None n_peripherals = len(final_model.statements.ode_system.find_peripheral_compartments()) modelfeatures = ModelFeatures.create_from_mfl_string(get_model_features(final_model)) # Model features - 1 peripheral compartment modelfeatures_minus = modelfeatures.replace( peripherals=(Peripherals((n_peripherals - 1,)),) ) # Loop through all models and find one with same features models = [ model.name for model in all_models if ModelFeatures.create_from_mfl_string(get_model_features(model)) == modelfeatures_minus ] if len(models) > 0: # Find highest ranked model rank_all = res.summary_tool.dropna(subset='bic')[['rank']] rank_filtered = rank_all.query('model in @models') if len(rank_filtered) > 0: rank_filtered = rank_filtered.sort_values(by=['rank']) highest_ranked = rank_filtered.index[0] extra_model = 
retrieve_models(subctx1.path, names=[highest_ranked])[0] if dv_types is not None: extra_model = extra_model.replace(dataset=orig_dataset) res_path = ( subctx1.path / "models" / extra_model.name / ("model" + subctx1.model_database.file_extension) ) extra_model_results = get_modelfit_results(extra_model, res_path) # Replace original dataset if multiple DVs if dv_types is not None: final_model = final_model.replace(dataset=orig_dataset) res = run_tool( 'structsearch', model=final_model, results=final_res, type=type, extra_model=extra_model, extra_model_results=extra_model_results, strictness=strictness, dv_types=dv_types, path=subctx2.path, ) assert isinstance(res, Results) return res return _run_structsearch_tmdd def _subfunc_iiv(iiv_strategy, strictness, E, ctx, dir_name) -> SubFunc: subctx = ctx.create_subcontext(dir_name) def _run_iiv(model, modelfit_results): if E and 'iivsearch' in E.keys(): rank_type = 'mbic' e_p, e_q = E['iivsearch'] else: rank_type = 'bic' e_p, e_q = None, None keep = [ str(symbol) for symbol in get_central_volume_and_clearance(model) if symbol in find_clearance_parameters(model) ] res = run_tool( 'iivsearch', model=model, results=modelfit_results, algorithm='top_down_exhaustive', iiv_strategy=iiv_strategy, strictness=strictness, rank_type=rank_type, E_p=e_p, E_q=e_q, keep=keep, path=subctx.path, ) assert isinstance(res, Results) return res return _run_iiv def _subfunc_ruvsearch(dv, strictness, ctx, dir_name) -> SubFunc: subctx = ctx.create_subcontext(dir_name) def _run_ruvsearch(model, modelfit_results): if has_blq_transformation(model): skip, max_iter = ['IIV_on_RUV', 'time_varying'], 1 else: skip, max_iter = [], 3 res = run_tool( 'ruvsearch', model=model, results=modelfit_results, skip=skip, max_iter=max_iter, dv=dv, strictness=strictness, path=subctx.path, ) assert isinstance(res, Results) return res return _run_ruvsearch def _subfunc_structural_covariates( amd_start_model: Model, search_space: ModelFeatures, strictness, ctx, ) -> 
SubFunc: subctx = ctx.create_subcontext("covsearch_structural") def _run_structural_covariates(model, modelfit_results): allowed_parameters = allowed_parameters = set(get_pk_parameters(model)).union( str(statement.symbol) for statement in model.statements.before_odes ) # Extract all forced mfl = search_space mfl_covariates = mfl.expand(model).covariate structural_searchspace = [] skipped_parameters = set() for cov_statement in mfl_covariates: if not cov_statement.optional.option: filtered_parameters = tuple( [p for p in cov_statement.parameter if p in allowed_parameters] ) # Not optional -> Add to all search spaces (was added in structural run) structural_searchspace.append( Covariate( filtered_parameters, cov_statement.covariate, cov_statement.fp, cov_statement.op, cov_statement.optional, ) ) skipped_parameters.union(set(cov_statement.parameter) - set(filtered_parameters)) # Ignore warning? if skipped_parameters: ctx.log_warning( f'{skipped_parameters} missing in start model and structural covariate effect cannot be added' ' Might be added during a later COVsearch step if possible.' ) if not structural_searchspace and not skipped_parameters: # Uneccessary to warn (?) return None elif not structural_searchspace: ctx.log_warning( 'No applicable structural covariates found in search space. 
Skipping structural_COVsearch' ) return None struct_searchspace = mfl.create_from_mfl_statement_list(structural_searchspace) res = run_tool( 'covsearch', model=model, results=modelfit_results, search_space=struct_searchspace, strictness=strictness, path=subctx.path, ) assert isinstance(res, Results) return res return _run_structural_covariates def _subfunc_mechanistic_exploratory_covariates( amd_start_model: Model, search_space: ModelFeatures, mechanistic_covariates, strictness, ctx, ) -> SubFunc: covariates = set(extract_covariates(amd_start_model, search_space.mfl_statement_list())) if covariates: allowed_covariates = get_covariates_allowed_in_covariate_effect(amd_start_model) for covariate in sorted(covariates): if covariate not in allowed_covariates: raise ValueError( f'Invalid `search_space` because of invalid covariate found in' f' search_space: got `{covariate}`,' f' must be in {sorted(allowed_covariates)}.' ) else: ctx.log_warning( 'COVsearch will most likely be skipped because no covariates could be found.' ' Check search_space definition' ' and .datainfo usage of "covariate" type and "continuous" flag.' ) # FIXME: Will always create these subcontext1 = ctx.create_subcontext("covsearch_mechanistic") subcontext2 = ctx.create_subcontext("covsearch_exploratory") def _run_mechanistic_exploratory_covariates(model, modelfit_results): index_offset = 0 # For naming runs effects = search_space.convert_to_funcs(model=model) if not effects: ctx.log_warning( 'Skipping COVsearch because no effect candidates could be generated.' ' Check search_space definition' ' and .datainfo usage of "covariate" type and "continuous" flag.' ) return None if mechanistic_covariates: mechanistic_searchspace, filtered_searchspace = _mechanistic_cov_extraction( search_space, model, mechanistic_covariates ) # FIXME : Move to validation if not mechanistic_searchspace: ctx.log_warning( 'No covariate effect for given mechanistic covariates found.' ' Skipping mechanistic COVsearch.' 
) else: res = run_tool( 'covsearch', model=model, results=modelfit_results, search_space=mechanistic_searchspace, strictness=strictness, path=subcontext1.path, ) covsearch_model_number = [ re.search(r"(\d*)$").group(1) for model_name in subcontext1.list_all_names() if model_name.startswith('covsearch') ] if covsearch_model_number: index_offset = max(covsearch_model_number) # Get largest number of run if res.final_model.name != model.name: model = res.final_model res_path = ( subcontext1.path / "models" / model.name / ("model" + subcontext1.model_database.file_extension) ) modelfit_results = get_modelfit_results(model, res_path) added_covs = ModelFeatures.create_from_mfl_string( get_model_features(model) ).covariate filtered_searchspace.extend( added_covs ) # Avoid removing added cov in exploratory else: filtered_searchspace = search_space res = run_tool( 'covsearch', model=model, results=modelfit_results, search_space=filtered_searchspace, strictness=strictness, path=subcontext2.path, naming_index_offset=index_offset, ) assert isinstance(res, Results) return res return _run_mechanistic_exploratory_covariates def _mechanistic_cov_extraction(search_space, model, mechanistic_covariates): mechanistic_covariates = [c if isinstance(c, str) else set(c) for c in mechanistic_covariates] # Extract them and all forced mfl = search_space mfl_covariates = mfl.expand(model).covariate mechanistic_searchspace = [] for cov_statement in mfl_covariates: if not cov_statement.optional.option: # Not optional -> Add search space (was added in structural run) mechanistic_searchspace.append(cov_statement) else: current_cov = [] current_param = [] for cov in cov_statement.covariate: if cov in mechanistic_covariates: current_cov.append(cov) current_param.append(cov_statement.parameter) else: for param in cov_statement.parameter: if {cov, param} in mechanistic_covariates: current_cov.append(cov) current_param.append([param]) for cc, cp in zip(current_cov, current_param): mechanistic_cov = 
Covariate( tuple(cp), (cc,), cov_statement.fp, cov_statement.op, cov_statement.optional, ) mechanistic_searchspace.append(mechanistic_cov) if mechanistic_searchspace: mechanistic_searchspace = ModelFeatures.create_from_mfl_statement_list( mechanistic_searchspace ) filtered_searchspace = mfl - mechanistic_searchspace else: filtered_searchspace = mfl return mechanistic_searchspace, filtered_searchspace def _subfunc_allometry(amd_start_model: Model, allometric_variable, ctx) -> SubFunc: if allometric_variable is None: # Somewhat redundant with validation function allometric_variable = amd_start_model.datainfo.descriptorix["body weight"][0].name subctx = ctx.create_subcontext("allometry") def _run_allometry(model, modelfit_results): res = run_tool( 'allometry', model=model, results=modelfit_results, allometric_variable=allometric_variable, path=subctx.path, ) assert isinstance(res, Results) return res return _run_allometry def _subfunc_iov(amd_start_model, occasion, strictness, E, ctx) -> SubFunc: subctx = ctx.create_subcontext("iovsearch") def _run_iov(model, modelfit_results): if E and 'iovsearch' in E.keys(): rank_type = 'mbic' e = E['iovsearch'] else: rank_type = 'bic' e = None res = run_tool( 'iovsearch', model=model, results=modelfit_results, column=occasion, strictness=strictness, rank_type=rank_type, E=e, path=subctx.path, ) assert isinstance(res, Results) return res return _run_iov def check_skip( context, model: Model, occasion: str, allometric_variable: str, ignore_datainfo_fallback: bool = False, search_space: Optional[str] = None, ): to_be_skipped = [] # IOVSEARCH if occasion is None: context.log_warning('IOVsearch will be skipped because occasion is None.') to_be_skipped.append("iovsearch") else: categories = get_occasion_levels(model.dataset, occasion) if len(categories) < 2: context.log_warning( f'Skipping IOVsearch because there are less than two ' f'occasion categories in column "{occasion}": {categories}.' 
) to_be_skipped.append("iovsearch") # ALLOMETRY if allometric_variable is None: if not ignore_datainfo_fallback: try: model.datainfo.descriptorix["body weight"] except IndexError: context.log_warning( 'Allometry will be skipped because allometric_variable is None and could' ' not be inferred through .datainfo via "body weight" descriptor.' ) to_be_skipped.append("allometry") else: context.log_warning( 'Allometry will be skipped because allometric_variable is None and' ' ignore_datainfo_fallback is True' ) to_be_skipped.append("allometry") if search_space is not None: ss_mfl = mfl_parse(search_space, True) covsearch_features = ModelFeatures.create(covariate=ss_mfl.covariate) covsearch_features = covsearch_features.expand(model) covariates = [] if cov_attr := covsearch_features.covariate: covariates.extend([x for cov in cov_attr for x in cov.covariate]) if not covariates: if ignore_datainfo_fallback: context.log_warning( 'COVsearch will be skipped because no covariates were given' ' and ignore_datainfo_fallback is True.' ) to_be_skipped.append("covariates") elif not any(column.type == 'covariate' for column in model.datainfo): context.log_warning( 'COVsearch will be skipped because no covariates were given' ' or could be extracted.' ' Check search_space definition' ' and .datainfo usage of "covariate" type and "continuous" flag.' ) to_be_skipped.append("covariates") else: if ignore_datainfo_fallback: context.log_warning( 'COVsearch will be skipped because no covariates were given' ' and ignore_datainfo_fallback is True.' ) to_be_skipped.append("covariates") elif not any(column.type == 'covariate' for column in model.datainfo): context.log_warning( 'COVsearch will be skipped because no covariates were given' ' or could be extracted.' ' Check search_space definition' ' and .datainfo usage of "covariate" type and "continuous" flag.' 
) to_be_skipped.append("covariates") return to_be_skipped @with_runtime_arguments_type_check def validate_input( input: Union[Model, Path, str, pd.DataFrame], results: Optional[ModelfitResults] = None, modeltype: str = 'basic_pk', administration: str = 'oral', strategy: str = "default", cl_init: Optional[float] = None, vc_init: Optional[float] = None, mat_init: Optional[float] = None, b_init: Optional[float] = None, emax_init: Optional[float] = None, ec50_init: Optional[float] = None, met_init: Optional[float] = None, search_space: Optional[str] = None, lloq_method: Optional[str] = None, lloq_limit: Optional[float] = None, allometric_variable: Optional[TSymbol] = None, occasion: Optional[str] = None, path: Optional[Union[str, Path]] = None, resume: bool = False, strictness: Optional[str] = "minimization_successful or (rounding_errors and sigdigs>=0.1)", dv_types: Optional[dict[Literal[DV_TYPES], int]] = None, mechanistic_covariates: Optional[list[Union[str, tuple]]] = None, retries_strategy: Literal["final", "all_final", "skip"] = "all_final", seed: Union[np.random.Generator, int] = DEFAULT_SEED, parameter_uncertainty_method: Optional[Literal['SANDWICH', 'SMAT', 'RMAT', 'EFIM']] = None, ignore_datainfo_fallback: bool = False, _E: Optional[dict[str, Union[float, str, Sequence[Union[float, str]]]]] = None, ): check_list("modeltype", modeltype, ALLOWED_MODELTYPE) check_list("administration", administration, ALLOWED_ADMINISTRATION) check_list("strategy", strategy, ALLOWED_STRATEGY) if modeltype == 'pkpd': if cl_init is not None or vc_init is not None or mat_init is not None: raise ValueError("Cannot provide pk parameter inits for pkpd model") if b_init is None: raise ValueError("Initial estimate for baseline is needed") if emax_init is None: raise ValueError("Initial estimate for E_max is needed") if ec50_init is None: raise ValueError("Initial estimate for EC_50 is needed") if met_init is None: raise ValueError("Initial estimate for MET is needed") else: if cl_init is 
None: raise ValueError("Initial estimate for CL is needed") if vc_init is None: raise ValueError("Initial estimate for VC is needed") if administration in ('oral', 'ivoral') and mat_init is None: raise ValueError("Initial estimate for MAT is needed") if search_space is not None: try: ss_mfl = mfl_parse(search_space, True) except: # noqa E722 raise ValueError(f'Invalid `search_space`, could not be parsed: "{search_space}"') if len(ss_mfl.mfl_statement_list()) == 0: raise ValueError(f'`search_space` evaluated to be empty : "{search_space}') if ( administration == "oral" and ss_mfl.absorption is not None and "INST" in (a.name for a in ss_mfl.absorption.modes) ): raise ValueError( 'The given search space have instantaneous absorption (´INST´)' ' which is not allowed with ´oral´ administration.' ) check_list("retries_strategy", retries_strategy, RETRIES_STRATEGIES) if _E: if any(value in (0.0, '0%') for value in _E.values()): raise ValueError('E-values in `_E` cannot be 0') def later_input_validation( input: Model, results: Optional[ModelfitResults] = None, modeltype: str = 'basic_pk', administration: str = 'oral', strategy: str = "default", cl_init: Optional[float] = None, vc_init: Optional[float] = None, mat_init: Optional[float] = None, b_init: Optional[float] = None, emax_init: Optional[float] = None, ec50_init: Optional[float] = None, met_init: Optional[float] = None, search_space: Optional[str] = None, lloq_method: Optional[str] = None, lloq_limit: Optional[float] = None, allometric_variable: Optional[TSymbol] = None, occasion: Optional[str] = None, path: Optional[Union[str, Path]] = None, resume: bool = False, strictness: Optional[str] = "minimization_successful or (rounding_errors and sigdigs>=0.1)", dv_types: Optional[dict[Literal[DV_TYPES], int]] = None, mechanistic_covariates: Optional[list[Union[str, tuple]]] = None, retries_strategy: Literal["final", "all_final", "skip"] = "all_final", seed: Optional[Union[np.random.Generator, int]] = None, 
parameter_uncertainty_method: Optional[Literal['SANDWICH', 'SMAT', 'RMAT', 'EFIM']] = None, ignore_datainfo_fallback: bool = False, _E: Optional[dict[str, Union[float, str, Sequence[Union[float, str]]]]] = None, ): # FIXME: This function should be removed and refactored into validate_inputs # and optionally give warnings/errors during the run # it currently depends on a model being created if a dataset was input to AMD. model = input # IOVSEARCH if occasion is not None and occasion not in model.dataset: raise ValueError( f'Invalid `occasion`: got `{occasion}`,' f' must be one of {sorted(model.datainfo.names)}.' ) # ALLOMETRY if allometric_variable is not None: validate_allometric_variable(model, allometric_variable) # COVSEARCH if mechanistic_covariates: allowed_covariates = get_covariates_allowed_in_covariate_effect(model) allowed_parameters = allowed_parameters = set(get_pk_parameters(model)).union( str(statement.symbol) for statement in model.statements.before_odes ) for c in mechanistic_covariates: if isinstance(c, str): if c not in allowed_covariates: raise ValueError( f'Invalid mechanistic covariate: {c}.' f' Must be in {sorted(allowed_covariates)}' ) else: if len(c) != 2: raise ValueError( f'Invalid argument in `mechanistic_covariate`: {c}.' f' Tuples need to be given with one parameter and one covariate.' ) cov_found = False par_found = False for a in c: if a in allowed_covariates: if cov_found: raise ValueError( f'`mechanistic_covariates` contain invalid argument: got `{c}`,' f' which contain two covariates.' f' Tuples need to be given with one parameter and one covariate.' ) cov_found = True elif a in allowed_parameters: if par_found: raise ValueError( f'`mechanistic_covariates` contain invalid argument: got `{c}`,' f' which contain two parameters.' f' Tuples need to be given with one parameter and one covariate.' ) par_found = True else: raise ValueError( f'`mechanistic_covariates` contain invalid argument: got `{c}`,' f' which contain {a}.' 
f' Tuples need to be given with one parameter and one covariate.' ) if search_space is not None: ss_mfl = mfl_parse(search_space, True) covsearch_features = ModelFeatures.create(covariate=ss_mfl.covariate) covsearch_features = covsearch_features.expand(model) covariates = [] if cov_attr := covsearch_features.covariate: # Check COVARIATE() covariates.extend([x for cov in cov_attr for x in cov.covariate]) if covariates: allowed_covariates = get_covariates_allowed_in_covariate_effect(model) for covariate in sorted(covariates): if covariate not in allowed_covariates: raise ValueError( f'Invalid `search_space` because of invalid covariate found in' f' search_space: got `{covariate}`,' f' must be in {sorted(allowed_covariates)}.' )