Source code for

from __future__ import annotations

import importlib
import inspect
import re
import warnings
from import Mapping, Sequence
from datetime import datetime
from pathlib import Path
from typing import Any, Optional, Union, get_type_hints

import pharmpy
import pharmpy.workflows.results
from pharmpy.deps import numpy as np
from pharmpy.deps import pandas as pd
from pharmpy.internals.fs.path import normalize_user_given_path
from pharmpy.model import Model
from pharmpy.modeling import (
from pharmpy.modeling.lrt import degrees_of_freedom as lrt_df
from pharmpy.modeling.lrt import test as lrt_test
from import create_results as psn_create_results
from pharmpy.workflows import Results, Workflow, execute_workflow, split_common_options
from pharmpy.workflows.context import Context, LocalDirectoryContext
from pharmpy.workflows.model_database import ModelDatabase
from pharmpy.workflows.model_entry import ModelEntry
from pharmpy.workflows.results import ModelfitResults, mfr

from .external import parse_modelfit_results

[docs] def fit( model_or_models: Union[Model, list[Model]], tool: Optional[str] = None, path: Optional[Union[Path, str]] = None, context: Optional[Context] = None, ) -> Union[ModelfitResults, list[ModelfitResults]]: """Fit models. Parameters ---------- model_or_models : Model | list[Model] List of models or one single model tool : str Estimation tool to use. None to use default path : Path | str Path to fit directory context : Context Run in this context Return ------ ModelfitResults | list[ModelfitResults] ModelfitResults for the model or models Examples -------- >>> from pharmpy.modeling import load_example_model >>> from import fit >>> model = load_example_model("pheno") # doctest: +SKIP >>> results = fit(model) # doctest: +SKIP See also -------- run_tool """ single, models = ( (True, [model_or_models]) if isinstance(model_or_models, Model) else (False, model_or_models) ) modelfit_results = run_tool('modelfit', models, tool=tool, path=path, context=context) return modelfit_results if single else list(modelfit_results)
[docs] def create_results(path: Union[str, Path], **kwargs) -> Results: """Create/recalculate results object given path to run directory Parameters ---------- path : str, Path Path to run directory kwargs Arguments to pass to tool specific create results function Returns ------- Results Results object for tool Examples -------- >>> from import create_results >>> res = create_results("frem_dir1") # doctest: +SKIP See also -------- read_results """ path = normalize_user_given_path(path) res = psn_create_results(path, **kwargs) return res
[docs] def read_results(path: Union[str, Path]) -> Results: """Read results object from file Parameters ---------- path : str, Path Path to results file Return ------ Results Results object for tool Examples -------- >>> from import read_results >>> res = read_results("results.json") # doctest: +SKIP See also -------- create_results """ path = normalize_user_given_path(path) res = pharmpy.workflows.results.read_results(path) return res
[docs] def run_tool(name: str, *args, **kwargs) -> Union[Model, list[Model], tuple[Model], Results]: """Run tool workflow Parameters ---------- name : str Name of tool to run args Arguments to pass to tool kwargs Arguments to pass to tool Return ------ Results Results object for tool Examples -------- >>> from pharmpy.modeling import * >>> model = load_example_model("pheno") >>> from import run_tool # doctest: +SKIP >>> res = run_tool("ruvsearch", model) # doctest: +SKIP """ # NOTE: The implementation of run_tool is split into those two functions to # allow for individual testing and mocking. tool = import_tool(name) return run_tool_with_name(name, tool, args, kwargs)
def import_tool(name: str): return importlib.import_module(f'{name}') def run_tool_with_name( name: str, tool, args: Sequence, kwargs: Mapping[str, Any] ) -> Union[Model, list[Model], tuple[Model], Results]: dispatching_options, common_options, tool_options = split_common_options(kwargs) create_workflow = tool.create_workflow dispatcher, ctx = _get_run_setup(dispatching_options, common_options, name) tool_params = inspect.signature(create_workflow).parameters tool_param_types = get_type_hints(create_workflow) tool_metadata = _create_metadata( database=ctx, dispatcher=dispatcher, tool_name=name, tool_params=tool_params, tool_param_types=tool_param_types, args=args, tool_options=tool_options, common_options=common_options, ) ctx.store_metadata(tool_metadata) if validate_input := getattr(tool, 'validate_input', None): validate_input(*args, **tool_options) wf: Workflow = create_workflow(*args, **tool_options) assert == name res = execute_workflow(wf, dispatcher=dispatcher, context=ctx) assert name == 'modelfit' or isinstance(res, Results) or name == 'simulation' tool_metadata = _update_metadata(tool_metadata, res) ctx.store_metadata(tool_metadata) return res def _create_metadata( database: Context, dispatcher, tool_name: str, tool_params, tool_param_types, args: Sequence, tool_options: Mapping[str, Any], common_options: Mapping[str, Any], ): tool_metadata = _create_metadata_tool( database, tool_name, tool_params, tool_param_types, args, tool_options ) setup_metadata = _create_metadata_common(database, dispatcher, tool_name, common_options) tool_metadata['common_options'] = setup_metadata return tool_metadata def _update_metadata(tool_metadata, res): # FIXME: Make metadata immutable tool_metadata['stats']['end_time'] = _now() return tool_metadata
[docs] def resume_tool(path: str): """Resume tool workflow from tool database path Parameters ---------- path : str The path to the tool database Return ------ Results Results object for tool Examples -------- >>> from pharmpy.modeling import * # doctest: +SKIP >>> res = resume_tool("resmod_dir1") # doctest: +SKIP """ dispatcher, tool_database = _get_run_setup_from_metadata(path) tool_metadata = tool_database.retrieve_metadata() tool_name = tool_metadata['tool_name'] tool = importlib.import_module(f'{tool_name}') create_workflow = tool.create_workflow tool_params = inspect.signature(create_workflow).parameters tool_param_types = get_type_hints(create_workflow) tool_options = _parse_tool_options_from_json_metadata( tool_metadata, tool_params, tool_param_types, tool_database ) args, kwargs = _parse_args_kwargs_from_tool_options(tool_params, tool_options) if validate_input := getattr(tool, 'validate_input', None): validate_input(*args, **kwargs) wf: Workflow = create_workflow(*args, **kwargs) assert == tool_name res = execute_workflow(wf, dispatcher=dispatcher, database=tool_database) assert tool_name == 'modelfit' or isinstance(res, Results) tool_metadata = _update_metadata(tool_metadata, res) tool_database.store_metadata(tool_metadata) return res
def _parse_tool_options_from_json_metadata( tool_metadata, tool_params, tool_param_types, tool_database, ): tool_options = tool_metadata['tool_options'] # NOTE: Load models to memory for model_key in _input_model_param_keys(tool_params, tool_param_types): model_metadata = tool_options.get(model_key) if model_metadata is None: raise ValueError( f'Cannot resume run because model argument "{model_key}" cannot be restored.' ) assert model_metadata['__class__'] == 'Model' model_name = model_metadata['arg_name'] db_name = model_metadata['db_name'] db: ModelDatabase = tool_database.model_database try: model = db.retrieve_model(db_name) model = model.replace(name=model_name) except KeyError: raise ValueError( f'Cannot resume run because model argument "{model_key}" ({model_name}) cannot be restored.' ) tool_options = tool_options.copy() tool_options[model_key] = model # NOTE: Load results to memory for results_key in _results_param_keys(tool_params, tool_param_types): results_json = tool_options.get(results_key) if results_json is not None: tool_options = tool_options.copy() tool_options[results_key] = pharmpy.workflows.results.read_results(results_json) return tool_options def _parse_args_kwargs_from_tool_options(tool_params, tool_options): args = [] kwargs = {} for p in tool_params.values(): # Positional args if p.default == p.empty: args.append(tool_options[]) # Named args else: if in tool_options.keys(): kwargs[] = tool_options[] return args, kwargs def _create_metadata_tool( database: Context, tool_name: str, tool_params, tool_param_types, args: Sequence, kwargs: Mapping[str, Any], ): # FIXME: Add config file dump, estimation tool etc. tool_metadata = { 'pharmpy_version': pharmpy.__version__, 'tool_name': tool_name, 'stats': {'start_time': _now()}, 'tool_options': {}, } for i, p in enumerate(tool_params.values()): try: name, value =, args[i] except IndexError: try: name, value =, kwargs[] except KeyError: if p.default == p.empty: # Positional args raise ValueError(f'{tool_name}: \'{}\' was not set') else: # Named args name, value =, p.default tool_metadata['tool_options'][name] = value if tool_name != 'modelfit': db = database.model_database for key, arg_name, db_name in _store_input_models( db, tool_params, tool_param_types, args, kwargs ): tool_metadata['tool_options'][key] = { '__class__': 'Model', 'arg_name': arg_name, 'db_name': db_name, } return tool_metadata def _create_metadata_common( database: Context, dispatcher, toolname: Optional[str], common_options: Mapping[str, Any] ): setup_metadata = {} setup_metadata['dispatcher'] = dispatcher.__name__ # FIXME: Naming of workflows/tools should be consistent (db and input name of tool) setup_metadata['context'] = { 'class': type(database).__name__, 'toolname': toolname, 'path': str(database.path), } for key, value in common_options.items(): if key not in setup_metadata.keys(): if isinstance(value, Path): value = str(value) setup_metadata[str(key)] = value return setup_metadata def _store_input_models( db: ModelDatabase, params, types, args: Sequence, kwargs: Mapping[str, Any] ): for param_key, model in _input_models(params, types, args, kwargs): input_model_name = f'input_{param_key}' _store_input_model(db, model, input_model_name) yield param_key,, input_model_name def _filter_params(kind, params, types): for i, param_key in enumerate(params): param = params[param_key] param_type = types.get(param_key) if param_type in (kind, Optional[kind]): # NOTE: We do not handle *{param_key}, or **{param_key} assert param.kind != param.VAR_POSITIONAL assert param.kind != param.VAR_KEYWORD yield i, param_key def _input_model_param_keys(params, types): for _, param_key in _filter_params(Model, params, types): yield param_key def _results_param_keys(params, types): for _, param_key in _filter_params(ModelfitResults, params, types): yield param_key def _input_models(params, types, args: Sequence, kwargs: Mapping[str, Any]): for i, param_key in _filter_params(Model, params, types): model = args[i] if i < len(args) else kwargs.get(param_key) if model is None: continue yield param_key, model def _store_input_model(db: ModelDatabase, model: Model, name: str): model_copy = model.replace(name=name) with db.transaction(model_copy) as txn: txn.store_model() txn.store_modelfit_results() def _now(): return def _get_run_setup(dispatching_options, common_options, toolname) -> tuple[Any, Context]: try: dispatcher = dispatching_options['dispatcher'] except KeyError: from pharmpy.workflows import default_dispatcher dispatcher = default_dispatcher ctx = dispatching_options.get('context', None) if ctx is None: from pharmpy.workflows import default_context common_path = dispatching_options.get('path', None) if common_path is not None: path = Path(dispatching_options['path']) ctx = default_context(, path.parent, common_options=common_options) else: n = 1 while True: name = f"{toolname}{n}" if not default_context.exists(name): ctx = default_context(name, common_options=common_options) break n += 1 return dispatcher, ctx
[docs] def retrieve_models( source: Union[str, Path, Context], names: Optional[list[str]] = None, ) -> list[Model]: """Retrieve models after a tool run Any models created and run by the tool can be retrieved. Parameters ---------- source : str, Path, Context Source where to find models. Can be a path (as str or Path), or a Context names : list List of names of the models to retrieve or None for all Return ------ list List of retrieved model objects Examples -------- >>> from import retrieve_models >>> tooldir_path = 'path/to/tool/directory' >>> models = retrieve_models(tooldir_path, names=['run1']) # doctest: +SKIP See also -------- retrieve_final_model """ if isinstance(source, Path) or isinstance(source, str): path = Path(source) context = LocalDirectoryContext(path) elif isinstance(source, Context): context = source else: raise NotImplementedError(f'Not implemented for type \'{type(source)}\'') names_all = context.list_all_names() if names is None: names = names_all diff = set(names).difference(names_all) if diff: raise ValueError(f'Models {diff} not in database') models = [context.retrieve_model_entry(name).model for name in names] return models
[docs] def retrieve_final_model(res: Results) -> Model: """Retrieve final model from a result object Parameters ---------- res : Results A results object Return ------ Model Reference to final model Examples -------- >>> from import read_results, retrieve_final_model >>> res = read_results("results.json") # doctest: +SKIP >>> model = retrieve_final_model(res) # doctest: +SKIP See also -------- retrieve_models """ try: final_model = getattr(res, 'final_model') except AttributeError: raise ValueError('Attribute \'final_model\' is missing from results object') if final_model is None: raise ValueError('Attribute \'final_model\' is None') return retrieve_models(res, names=[])[0]
[docs] def write_results(results: Results, path: Union[str, Path], lzma: bool = False, csv: bool = False): """Write results object to json (or csv) file Note that the csv-file cannot be read into a results object again. Parameters ---------- results : Results Pharmpy results object path : Path Path to results file lzma : bool True for lzma compression. Not applicable to csv file csv : bool Save as csv file """ path = normalize_user_given_path(path) if csv: results.to_csv(path) else: results.to_json(path, lzma=lzma)
[docs] def summarize_errors(context: Context) -> pd.DataFrame: """Summarize errors and warnings from all runs in a context. Summarize the errors and warnings found after running the model/models. Parameters ---------- context : Context Context in which models were run Return ------ pd.DataFrame A DataFrame of errors with model name, category (error or warning), and an int as index, an empty DataFrame if there were no errors or warnings found. """ names = context.list_all_names() mes = [context.retrieve_model_entry(name) for name in names] return summarize_errors_from_entries(mes)
def summarize_errors_from_entries(mes: list[ModelEntry]): idcs, rows = [], [] for me in mes: name = res = me.modelfit_results if res is not None and len(res.log) > 0: for i, entry in enumerate(res.log): idcs.append((name, entry.category, i)) rows.append([entry.time, entry.message]) index_names = ['model', 'category', 'error_no'] col_names = ['time', 'message'] index = pd.MultiIndex.from_tuples(idcs, names=index_names) if rows: df = pd.DataFrame(rows, columns=col_names, index=index) else: df = pd.DataFrame(columns=col_names, index=index) return df.sort_index()
[docs] def rank_models( base_model: Model, base_model_res: ModelfitResults, models: list[Model], models_res: list[ModelfitResults], parent_dict: Optional[Union[dict[str, str], dict[Model, Model]]] = None, strictness: Optional[str] = "minimization_successful", rank_type: str = 'ofv', cutoff: Optional[float] = None, bic_type: str = 'mixed', **kwargs, ) -> pd.DataFrame: """Ranks a list of models Ranks a list of models with a given ranking function Parameters ---------- base_model : Model Base model to compare to base_model_res : ModelfitResults Results of base model models : list List of models models_res : list List of modelfit results parent_dict : dict Dict where key is child and value is parent. Only relevant for LRT, if None base will be set as parent strictness : str or None Strictness criteria that are allowed for ranking. Default is "minimization_successful". rank_type : str Name of ranking type. Available options are 'ofv', 'aic', 'bic', 'lrt' (OFV with LRT) cutoff : float or None Value to use as cutoff. If using LRT, cutoff denotes p-value. Default is None bic_type : str Type of BIC to calculate. Default is the mixed effects. kwargs Arguments to pass to calculate BIC (such as `mult_test_p` and `mult_test_p`) Return ------ pd.DataFrame DataFrame of the ranked models Examples -------- >>> from pharmpy.modeling import load_example_model >>> from import rank_models >>> model_1 = load_example_model("pheno") >>> model_2 = load_example_model("pheno_linear") >>> rank_models(model_1, [model_2], ... rank_type='lrt') # doctest: +SKIP """ if len(models) != len(models_res): raise ValueError('Different length of `models` and `models_res`') if rank_type == 'lrt' and not parent_dict: parent_dict = { for model in models} if parent_dict and not isinstance(list(parent_dict.keys())[0], str): parent_dict = { for child, parent in parent_dict.items()} models_all = [base_model] + models res_all = [base_model_res] + models_res rank_values, delta_values = {}, {} models_to_rank = [] ref_value = _get_rankval(base_model, base_model_res, strictness, rank_type, bic_type, **kwargs) model_dict = { (model, res) for model, res in zip(models_all, res_all)} # Filter on strictness for model, res in zip(models_all, res_all): # Exclude OFV etc. if model was not successful rank_value = _get_rankval(model, res, strictness, rank_type, bic_type, **kwargs) if np.isnan(rank_value): continue if == pass elif rank_type == 'lrt': parent_model, parent_res = model_dict[parent_dict[]] if cutoff is None: co = 0.05 if lrt_df(parent_model, model) >= 0 else 0.01 elif isinstance(cutoff, tuple): co = cutoff[0] if lrt_df(parent_model, model) >= 0 else cutoff[1] else: assert isinstance(cutoff, (float, int)) co = cutoff parent_ofv = np.nan if (mfr := parent_res) is None else mfr.ofv model_ofv = np.nan if (mfr := res) is None else mfr.ofv if not lrt_test(parent_model, model, parent_ofv, model_ofv, co): continue elif cutoff is not None: if ref_value - rank_value <= cutoff: continue # Add ranking value and model rank_values[] = rank_value delta_values[] = ref_value - rank_value models_to_rank.append(model) # Sort def _get_delta(model): if np.isnan(ref_value): return -rank_values[] return delta_values[] models_sorted = sorted(models_to_rank, key=_get_delta, reverse=True) # Create rank for models, if two have the same value they will have the same rank ranking = {} rank, count, prev = 0, 0, None for model in models_sorted: count += 1 value = _get_delta(model) if value != prev: rank += count prev = value count = 0 ranking[] = rank rows = {} for model in models_all: delta, rank_value, rank = np.nan, np.nan, np.nan if in ranking.keys(): rank = ranking[] if in rank_values.keys(): rank_value = rank_values[] if in delta_values.keys(): delta = delta_values[] rows[] = (delta, rank_value, rank) if rank_type == 'lrt': rank_type_name = 'ofv' else: rank_type_name = rank_type index = pd.Index(rows.keys(), name='model') df = pd.DataFrame( rows.values(), index=index, columns=[f'd{rank_type_name}', f'{rank_type_name}', 'rank'] ) if np.isnan(ref_value): return df.sort_values(by=[f'{rank_type_name}']) else: return df.sort_values(by=[f'd{rank_type_name}'], ascending=False)
class ArrayEvaluator: def __init__(self, x): self.x = x def __lt__(self, value): return all(e < value for e in self.x) def __eq__(self, value): return all(e == value for e in self.x) def __le__(self, value): return all(e <= value for e in self.x) def __ge__(self, value): return all(e >= value for e in self.x) def __gt__(self, value): return all(e > value for e in self.x)
[docs] def is_strictness_fulfilled( res: ModelfitResults, model: Model, statement: str, ) -> bool: """Takes a ModelfitResults object and a statement as input and returns True/False if the evaluation of the statement is True/False. Parameters ---------- results : ModelfitResults ModelfitResults object model : Model Model for parameter specific strictness. statement : str A statement containing the strictness criteria Return ------ bool A bool indicating whether the strictness criteria are fulfilled or not. Examples -------- >>> from import * >>> from pharmpy.modeling import * >>> res = load_example_modelfit_results('pheno') >>> model = load_example_model('pheno') >>> is_strictness_fulfilled(res, model, "minimization_successful or rounding_errors") True """ if res is None or np.isnan(res.ofv): return False if statement is not None: statement = statement.lower() allowed_args = [ 'minimization_successful', 'rounding_errors', 'sigdigs', 'maxevals_exceeded', 'rse', 'rse_theta', 'rse_omega', 'rse_sigma', 'condition_number', 'final_zero_gradient', 'final_zero_gradient_theta', 'final_zero_gradient_omega', 'final_zero_gradient_sigma', 'estimate_near_boundary', 'estimate_near_boundary_theta', 'estimate_near_boundary_omega', 'estimate_near_boundary_sigma', ] unwanted_args = ['and', 'or', 'not'] find_all_words = re.findall(r'[^\d\W]+', statement) args_in_statement = [w for w in find_all_words if w not in unwanted_args] find_all_non_allowed_operators = re.findall(r"[^\w\s\.\<\>\=\!\(\)]", statement) if len(find_all_non_allowed_operators) > 0: raise ValueError( f"Unallowed operators found: {', '.join(find_all_non_allowed_operators)}" ) # Check that only allowed arguments are in the statement if not all(map(lambda x: x in allowed_args, args_in_statement)): raise ValueError( f'Some expressions were not correct. Valid arguments are: {allowed_args}' ) else: minimization_successful = res.minimization_successful # noqa rounding_errors = res.termination_cause == "rounding_errors" # noqa maxevals_exceeded = res.termination_cause == "maxevals_exceeded" # noqa sigdigs = ArrayEvaluator([res.significant_digits]) # noqa final_zero_gradient = 'final_zero_gradient' in res.warnings # noqa estimate_near_boundary = 'estimate_near_boundary' in res.warnings # noqa if 'condition_number' in args_in_statement: if res.covariance_matrix is not None: condition_number = ArrayEvaluator( # noqa [np.linalg.cond(res.covariance_matrix)] ) else: raise ValueError("Could not calculate condition_number.") if "rse" in args_in_statement: if res.relative_standard_errors is not None: rse = ArrayEvaluator(res.relative_standard_errors) # noqa else: raise ValueError("Could not calculate relative standard error.") if ( 'rse_theta' in args_in_statement or 'rse_omega' in args_in_statement or 'rse_sigma' in args_in_statement ): rse = res.relative_standard_errors rse_theta = ArrayEvaluator(rse[rse.index.isin(get_thetas(model).names)]) # noqa rse_omega = ArrayEvaluator(rse[rse.index.isin(get_omegas(model).names)]) # noqa rse_sigma = ArrayEvaluator(rse[rse.index.isin(get_sigmas(model).names)]) # noqa if ( 'final_zero_gradient_theta' in args_in_statement or 'final_zero_gradient_omega' in args_in_statement or 'final_zero_gradient_sigma' in args_in_statement ): grd = res.gradients final_zero_gradient_theta = ( # noqa grd[grd.index.isin(get_thetas(model).names)] == 0 ).any() or grd[grd.index.isin(get_thetas(model).names)].isnull().any() final_zero_gradient_omega = ( # noqa grd[grd.index.isin(get_omegas(model).names)] == 0 ).any() or grd[grd.index.isin(get_thetas(model).names)].isnull().any() final_zero_gradient_sigma = ( # noqa grd[grd.index.isin(get_sigmas(model).names)] == 0 ).any() or grd[grd.index.isin(get_thetas(model).names)].isnull().any() if ( 'estimate_near_boundary' in args_in_statement or 'estimate_near_boundary_theta' in args_in_statement or 'estimate_near_boundary_omega' in args_in_statement or 'estimate_near_boundary_sigma' in args_in_statement ): ests = res.parameter_estimates estimate_near_boundary = check_parameters_near_bounds(model, ests).any() # noqa estimate_near_boundary_theta = check_parameters_near_bounds( # noqa model, ests[ests.index.isin(get_thetas(model).names)] ).any() estimate_near_boundary_omega = check_parameters_near_bounds( # noqa model, ests[ests.index.isin(get_omegas(model).names)] ).any() estimate_near_boundary_sigma = check_parameters_near_bounds( # noqa model, ests[ests.index.isin(get_sigmas(model).names)] ).any() return eval(statement) else: return True
def _get_rankval(model, res, strictness, rank_type, bic_type, **kwargs): if not is_strictness_fulfilled(res, model, strictness): return np.nan if rank_type in ['ofv', 'lrt']: return res.ofv elif rank_type == 'aic': return calculate_aic(model, res.ofv) elif rank_type == 'bic': return calculate_bic(model, res.ofv, bic_type, **kwargs) else: raise ValueError('Unknown rank_type: must be ofv, lrt, aic, or bic')
[docs] def summarize_modelfit_results( context: Context, include_all_execution_steps: bool = False, ) -> pd.DataFrame: """Summarize results of model runs Summarize different results after fitting a model, includes runtime, ofv, and parameter estimates (with errors). If include_all_execution_steps is False, only the last estimation step will be included (note that in that case, the minimization_successful value will be referring to the last estimation step, if last step is evaluation it will go backwards until it finds an estimation step that wasn't an evaluation). Parameters ---------- context : Context Context in which models were run include_all_execution_steps : bool Whether to include all estimation steps, default is False Return ------ pd.DataFrame A DataFrame of modelfit results with model name and estmation step as index. """ names = context.list_all_names() mes = [context.retrieve_model_entry(name) for name in names] df = summarize_modelfit_results_from_entries(mes, include_all_execution_steps) return df
def summarize_modelfit_results_from_entries( mes: list[ModelEntry], include_all_execution_steps: bool = False, ) -> pd.DataFrame: if mes is None: raise ValueError('Option `results` is None') if all(me is None for me in mes): raise ValueError('All input results are empty') summaries = [] for me in mes: if me is not None and me.modelfit_results is not None: summary = _get_model_result_summary(me, include_all_execution_steps) summary.insert(0, 'description', me.model.description) summaries.append(summary) with warnings.catch_warnings(): # Needed because of warning in pandas 2.1.1 warnings.filterwarnings( "ignore", message="The behavior of DataFrame concatenation with empty or all-NA entries is deprecated", category=FutureWarning, ) df = pd.concat(summaries) return df def _get_model_result_summary(me, include_all_execution_steps=False): res = me.modelfit_results if not include_all_execution_steps: summary_dict = _summarize_step(res, -1) index = pd.Index([], name='model') summary_df = pd.DataFrame(summary_dict, index=index) else: summary_dicts = [] tuples = [] for i in range(len(res.evaluation)): summary_dict = _summarize_step(res, i) is_evaluation = res.evaluation.iloc[i] if is_evaluation: run_type = 'evaluation' else: run_type = 'estimation' summary_dict = {'run_type': run_type, **summary_dict} summary_dicts.append(summary_dict) tuples.append((, i + 1)) index = pd.MultiIndex.from_tuples(tuples, names=['model', 'step']) summary_df = pd.DataFrame(summary_dicts, index=index) no_of_errors = len(res.log.errors) no_of_warnings = len(res.log.warnings) minimization_idx = summary_df.columns.get_loc('minimization_successful') summary_df.insert(loc=minimization_idx + 1, column='errors_found', value=no_of_errors) summary_df.insert(loc=minimization_idx + 2, column='warnings_found', value=no_of_warnings) return summary_df def _summarize_step(res, i): summary_dict = {} if i >= 0: minsucc = res.minimization_successful_iterations.iloc[i] else: minsucc = res.minimization_successful if minsucc is not None: summary_dict['minimization_successful'] = minsucc else: summary_dict['minimization_successful'] = False if i == -1 and res.ofv_iterations is not None: i = max(res.ofv_iterations.index.get_level_values(0)) - 1 summary_dict['ofv'] = _get_ofv(res, i) summary_dict['runtime_total'] = res.runtime_total summary_dict['estimation_runtime'] = _get_estimation_runtime(res, i) pe = _get_parameter_estimates(res, i) ses = res.standard_errors rses = res.relative_standard_errors for param in pe.index: summary_dict[f'{param}_estimate'] = pe[param] if ses is not None: summary_dict[f'{param}_SE'] = ses[param] if rses is not None: summary_dict[f'{param}_RSE'] = rses[param] return summary_dict def _get_ofv(res, i): if res.ofv_iterations is None: return res.ofv return res.ofv_iterations[i + 1,].iloc[-1] def _get_parameter_estimates(res, i): if res.parameter_estimates_iterations is None: return res.parameter_estimates return res.parameter_estimates_iterations.loc[i + 1,].iloc[-1] def _get_estimation_runtime(res, i): if res.estimation_runtime_iterations is None: return res.runtime_total return res.estimation_runtime_iterations.iloc[i]
[docs] def read_modelfit_results(path: Union[str, Path]) -> ModelfitResults: """Read results from external tool for a model Parameters ---------- path : Path or str Path to model file Return ------ ModelfitResults Results object """ path = normalize_user_given_path(path) model = read_model(path) res = parse_modelfit_results(model, path) return res
def _get_run_setup_from_metadata(path): import pharmpy.workflows as workflows context = workflows.default_context(name=path, ref=None) tool_metadata = context.retrieve_metadata() common_options = tool_metadata['common_options'] # TODO: Be more general dispatcher = getattr(workflows, common_options['dispatcher'].split('.')[-1]) return dispatcher, context
[docs] def load_example_modelfit_results(name: str): """Load the modelfit results of an example model Load the modelfit results of an example model built into Pharmpy Parameters ---------- name : str Name of the model. Currently available models are "pheno" and "pheno_linear" Returns ------- ModelfitResults Loaded modelfit results object Example ------- >>> from import load_example_modelfit_results >>> results = load_example_modelfit_results("pheno") >>> results.parameter_estimates PTVCL 0.004696 PTVV 0.984258 THETA_3 0.158920 IVCL 0.029351 IVV 0.027906 SIGMA_1_1 0.013241 Name: estimates, dtype: float64 """ available = ('pheno', 'pheno_linear') if name not in available: raise ValueError(f'Unknown example model {name}. Available examples: {available}') path = Path(__file__).resolve().parent.parent / 'internals' / 'example_models' / (name + '.mod') res = read_modelfit_results(path) return res