Source code for pharmpy.workflows.results

from __future__ import annotations

import importlib
import json
import lzma
import re
import warnings
from contextlib import closing
from dataclasses import dataclass
from io import StringIO
from lzma import open as lzma_open
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal, Optional, Union, overload

import pharmpy
from pharmpy.deps import altair as alt
from pharmpy.deps import pandas as pd
from pharmpy.internals.immutable import Immutable
from pharmpy.model import Model

if TYPE_CHECKING:
    from pharmpy.workflows import Log


def mfr(res: ModelfitResults) -> ModelfitResults:
    assert isinstance(res, ModelfitResults)
    return res


def _df_to_json(df: pd.DataFrame) -> dict[str, Any]:
    if str(df.columns.dtype) == 'int64':
        # Workaround for https://github.com/pandas-dev/pandas/issues/46392
        df.columns = df.columns.map(str)
    # Set double precision to 15 to remove some round-trip errors, although 17 should be used once possible
    # See: https://github.com/pandas-dev/pandas/issues/38437
    df_json = df.to_json(orient='table', double_precision=15)
    assert df_json is not None
    return json.loads(df_json)


def _index_to_json(index: Union[pd.Index, pd.MultiIndex]) -> dict[str, Any]:
    if isinstance(index, pd.MultiIndex):
        return {'__class__': 'MultiIndex', **_df_to_json(index.to_frame(index=False))}
    return {'__class__': 'Index', **_df_to_json(index.to_frame(index=False))}
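

# Illustrative sketch (not part of the original module): what the helpers above
# produce. _df_to_json returns a 'table'-oriented dict and _index_to_json adds a
# '__class__' tag so the decoder below knows how to rebuild the index. The data
# is made up.
def _example_json_helpers():
    df = pd.DataFrame({'ETA_1': [0.1, -0.2]}, index=pd.Index(['1', '2'], name='ID'))
    tagged_frame = _df_to_json(df)  # dict with 'schema' and 'data' keys
    idx = pd.MultiIndex.from_tuples([('run1', 'ofv'), ('run1', 'aic')], names=['model', 'metric'])
    tagged_index = _index_to_json(idx)  # same layout plus '__class__': 'MultiIndex'
    return tagged_frame, tagged_index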


class ResultsJSONEncoder(json.JSONEncoder):
    def default(self, obj) -> Union[dict[str, Any], None]:
        # NOTE: This function is called when the base JSONEncoder does not know
        # how to encode the given object, so it will not be called on int,
        # float, str, list, tuple, and dict. It could be called on set for
        # instance, or any custom class.
        from pharmpy.workflows import Log

        if isinstance(obj, Results):
            d = obj.to_dict()
            d['__module__'] = obj.__class__.__module__
            d['__class__'] = obj.__class__.__qualname__
            return d
        elif isinstance(obj, pd.DataFrame):
            d = _df_to_json(obj)
            d['__class__'] = 'DataFrame'
            return d
        elif isinstance(obj, pd.Series):
            if obj.size >= 1 and isinstance(obj.iloc[0], pd.DataFrame):
                # NOTE: Hack special case for Series of DataFrame objects
                return {
                    'data': [{'__class__': 'DataFrame', **_df_to_json(df)} for df in obj.values],
                    'index': _index_to_json(obj.index),
                    'name': obj.name,
                    'dtype': str(obj.dtype),
                    '__class__': 'Series[DataFrame]',
                }

            # NOTE: Hack to work around poor support of to_json/read_json of
            # pd.Series with MultiIndex
            df = obj.to_frame()
            d = _df_to_json(df)
            d['__class__'] = 'Series'
            return d
        elif obj.__class__.__module__.startswith('altair.'):
            with warnings.catch_warnings():
                # FIXME: Remove filter once altair stops relying on deprecated APIs
                warnings.filterwarnings(
                    "ignore",
                    message=".*iteritems is deprecated and will be removed in a future version. Use .items instead.",
                    category=FutureWarning,
                )
                # This was fixed in altair v2.1.1 and can be removed once we require that version
                warnings.filterwarnings(
                    "ignore",
                    message=".*the convert_dtype parameter is deprecated",
                    category=FutureWarning,
                )
                d = obj.to_dict()
            d['__module__'] = obj.__class__.__module__
            d['__class__'] = obj.__class__.__qualname__
            return d
        elif isinstance(obj, Model):
            # TODO: Consider using other representation, e.g. path
            return None
        elif isinstance(obj, Log):
            d: dict[Any, Any] = obj.to_dict()
            d['__class__'] = obj.__class__.__qualname__
            return d
        elif isinstance(obj, Path):
            d = {'path': str(obj), '__class__': 'PosixPath'}
            return d
        else:
            # NOTE: This will raise a proper TypeError
            return super().default(obj)
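

# Illustrative sketch (not part of the original module): the encoder is meant to
# be passed to the json machinery via the cls argument; default() above is only
# consulted for objects the standard encoder cannot handle. The Series content
# is made up.
def _example_encode():
    s = pd.Series([1.0, 2.0], index=['CL', 'V'], name='estimates')
    return json.dumps(s, cls=ResultsJSONEncoder)  # tagged with '__class__': 'Series'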


def _df_read_json(obj) -> pd.DataFrame:
    # Convert time strings to naive datetime and then to string
    # Needed because of https://github.com/pandas-dev/pandas/issues/52595
    for row in obj['data']:
        if 'time' in row:
            row['time'] = pd.to_datetime(row['time']).tz_localize(None)
            row['time'] = row['time'].isoformat()

    return pd.read_json(StringIO(json.dumps(obj)), typ='frame', orient='table', precise_float=True)


def _multi_index_read_json(obj) -> pd.MultiIndex:
    return pd.MultiIndex.from_frame(_df_read_json(obj))


class ResultsJSONDecoder(json.JSONDecoder):
    def __init__(self, *args, **kwargs):
        json.JSONDecoder.__init__(self, object_hook=self.object_hook, *args, **kwargs)

    def object_hook(self, obj):
        # NOTE: This hook will be called for every dict produced by the
        # base JSONDecoder. It will not be called on int, float, str, or list.
        module = None
        cls = None

        if '__module__' in obj:
            module = obj['__module__']
            del obj['__module__']

        if '__class__' in obj:
            cls = obj['__class__']
            del obj['__class__']

        # NOTE: Handling the case where cls is not None and module is None is
        # kept for backwards compatibility

        if cls is None and module is not None:
            raise ValueError('Cannot specify module without specifying class')

        if module is None or module.startswith('pandas.'):
            if cls == 'DataFrame':
                return _df_read_json(obj)
            elif cls == 'Series':
                # NOTE: Hack to work around poor support of to_json/read_json of
                # pd.Series with MultiIndex
                df = _df_read_json(obj)
                series = df.iloc[:, 0]  # NOTE: First and only column.
                return series
            elif cls == 'Series[DataFrame]':
                # NOTE: Hack to work around poor support of Series of DataFrame
                # objects. All subobjects have already been converted.
                return pd.Series(
                    obj['data'], index=obj['index'], dtype=obj['dtype'], name=obj['name']
                )
            elif cls == 'Index':
                return _multi_index_read_json(obj).get_level_values(0)
            elif cls == 'MultiIndex':
                return _multi_index_read_json(obj)

        if module is None:
            if cls == 'vega-lite':
                # NOTE: Slow path used when parsing PsN frem output and the old format
                return alt.Chart.from_dict(obj, validate=True)

        if module is not None and module.startswith('altair.'):
            # NOTE: Fast parsing when reading own output
            assert cls is not None
            try:
                class_ = getattr(alt, cls)
            except AttributeError:
                raise ValueError(f'Unknown class {cls} in {module}')
            return class_.from_dict(obj, validate=False)

        if cls is not None and cls.endswith('Results'):
            if module is None:
                # NOTE: Kept for backwards compatibility: we guess the module
                # path based on the class name.
                tool_name = cls[:-7].lower()  # NOTE: Trim "Results" suffix
                tool_module = importlib.import_module(f'pharmpy.tools.{tool_name}')
                results_class = tool_module.results_class
            else:
                tool_module = importlib.import_module(module)
                results_class = getattr(tool_module, cls)

            return results_class.from_dict(obj)

        if cls == 'PosixPath':
            # NOTE: The encoder stores the path under the 'path' key
            return Path(obj['path'])
        if cls == 'Log':
            from pharmpy.workflows import Log

            return Log.from_dict(obj)

        return obj
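

# Illustrative sketch (not part of the original module): the '__class__' tags
# written by ResultsJSONEncoder let this decoder restore the original objects,
# so a DataFrame survives a round trip. The data is made up.
def _example_roundtrip():
    df = pd.DataFrame({'OFV': [123.4, 120.1]})
    text = json.dumps(df, cls=ResultsJSONEncoder)
    restored = json.loads(text, cls=ResultsJSONDecoder)
    assert isinstance(restored, pd.DataFrame)
    return restored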


def _is_likely_to_be_json(source: str):
    # NOTE: Heuristic to determine whether the string is JSON content or a
    # path: the first non-space character of JSON content is '{'.
    match = re.match(r'\s*([^\s])', source)
    return match is not None and match.group(1) == '{'
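

# Illustrative sketch (not part of the original module): only the first
# non-whitespace character decides whether the string is treated as JSON
# content or as a path.
def _example_is_likely_to_be_json():
    assert _is_likely_to_be_json('  {"ofv": 123.4}')
    assert not _is_likely_to_be_json('run1/results.json')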


def read_results(path_or_str: Union[str, Path]):
    if isinstance(path_or_str, str) and _is_likely_to_be_json(path_or_str):
        manager = closing(StringIO(path_or_str))
    else:
        path = Path(path_or_str)
        if path.is_dir():
            path /= 'results.json'

        if path.name.endswith('.xz'):
            manager = lzma.open(path, 'rt', encoding='utf-8')
        else:
            manager = open(path, 'r')

    with manager as readable:
        return json.load(readable, cls=ResultsJSONDecoder)
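

# Illustrative sketch (not part of the original module): read_results accepts a
# raw JSON string, a results.json file or a directory containing one. The
# 'path/to/run1' locations are hypothetical.
def _example_read_results():
    parsed = read_results('{"ofv": 123.4}')  # parsed directly as JSON
    # parsed = read_results('path/to/run1/results.json')  # read from a file
    # parsed = read_results('path/to/run1')  # looks for results.json in the directory
    return parsed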


@dataclass(frozen=True)
class Results(Immutable):
    """Base class for all result classes"""

    __version__: str = pharmpy.__version__  # NOTE: Default version if not overridden

    @classmethod
    def from_dict(cls, d: dict[str, Any]):
        """Create results object from dictionary"""
        removed_keys = {
            '__version__',
            'best_model',  # NOTE: Was removed in d5b3503 and 8578c8b
            'input_model',  # NOTE: Was removed in d5b3503 and 8578c8b
            'summary_individuals',  # NOTE: Was removed in 5873fe9
            'summary_individuals_count',  # NOTE: Was removed in 5873fe9
        }
        return cls(
            __version__=d.get('__version__', 'unknown'),  # NOTE: Override default version
            **{k: v for k, v in d.items() if k not in removed_keys},
        )

    @overload
    def to_json(self, path: None = None, lzma: Literal[False] = False) -> str: ...

    @overload
    def to_json(self, path: Path, lzma: bool = False) -> None: ...

    def to_json(self, path: Optional[Path] = None, lzma: bool = False) -> Union[str, None]:
        """Serialize results object as json

        Parameters
        ----------
        path : Path
            Path to save json file or None to serialize to string
        lzma : bool
            Set to compress file with lzma

        Returns
        -------
        str
            Json as string unless path was used
        """
        s = ResultsJSONEncoder().encode(self)
        if path:
            if not lzma:
                with open(path, 'w') as fh:
                    fh.write(s)
            else:
                xz_path = path.parent / (path.name + '.xz')
                with lzma_open(xz_path, 'w') as fh:
                    fh.write(bytes(s, 'utf-8'))
        else:
            return s

    def get_and_reset_index(self, attr: str, **kwargs) -> pd.DataFrame:
        """Wrapper to reset index of attribute or result from method.

        Used to facilitate importing multiindex dataframes into R
        """
        val = getattr(self, attr)
        if callable(val):
            df = val(**kwargs)
        else:
            df = val
        return df.reset_index()

    def to_dict(self) -> dict[str, Any]:
        """Convert results object to a dictionary"""
        return vars(self).copy()

    def __str__(self):
        start = self.__class__.__name__
        s = f'{start}\n\n'
        d = self.to_dict()
        for key, value in d.items():
            if value.__class__.__module__.startswith('altair.'):
                continue
            s += f'{key}\n'
            if isinstance(value, pd.DataFrame):
                s += value.to_string()
            elif isinstance(value, list):
                # Print list of lists as table
                if len(value) > 0 and isinstance(value[0], list):
                    df = pd.DataFrame(value)
                    df_str = df.to_string(index=False)
                    df_str = df_str.split('\n')[1:]
                    s += '\n'.join(df_str)
            else:
                s += str(value) + '\n'
            s += '\n\n'
        return s

    def to_csv(self, path: Path):
        """Save results as a human readable csv file

        Index will not be printed if it is a basic range.

        Parameters
        ----------
        path : Path
            Path to csv-file
        """
        d = self.to_dict()
        s = ""
        for key, value in d.items():
            if value.__class__.__module__.startswith('altair.'):
                continue
            elif isinstance(value, Model):
                continue
            elif isinstance(value, ModelfitResults):
                continue
            elif isinstance(value, list) and isinstance(value[0], Model):
                continue
            s += f'{key}\n'
            if isinstance(value, pd.DataFrame):
                if isinstance(value.index, pd.RangeIndex):
                    use_index = False
                else:
                    use_index = True
                csv = value.to_csv(index=use_index)
                assert isinstance(csv, str)
                s += csv
            elif isinstance(value, pd.Series):
                csv = value.to_csv()
                assert isinstance(csv, str)
                s += csv
            elif isinstance(value, list):
                # Print list of lists as table
                if len(value) > 0 and isinstance(value[0], list):
                    for row in value:
                        s += f'{",".join(map(str, row))}\n'
            else:
                s += str(value) + '\n'
            s += '\n'
        with open(path, 'w', newline='') as fh:
            print(s, file=fh)
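

# Illustrative sketch (not part of the original module): a minimal Results
# subclass and how it serializes. MyToolResults and the written path are
# hypothetical; real subclasses live in the individual pharmpy tools.
def _example_results_subclass():
    @dataclass(frozen=True)
    class MyToolResults(Results):
        ofv: Optional[float] = None

    res = MyToolResults(ofv=123.4)
    text = res.to_json()  # JSON string including '__class__' and '__module__'
    res.to_json(Path('results.json'), lzma=True)  # writes results.json.xz instead
    return text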


@dataclass(frozen=True)
class ModelfitResults(Results):
    """Base class for results from a modelfit operation

    Attributes
    ----------
    correlation_matrix : pd.DataFrame
        Correlation matrix of the population parameter estimates
    covariance_matrix : pd.DataFrame
        Covariance matrix of the population parameter estimates
    precision_matrix : pd.DataFrame
        Precision matrix of the population parameter estimates
    evaluation_ofv : float
        The objective function value as if the model was evaluated. Currently
        works for classical estimation methods by taking the OFV of the first
        iteration.
    individual_ofv : pd.Series
        OFV for each individual
    individual_estimates : pd.DataFrame
        Estimates for etas
    individual_estimates_covariance : pd.Series
        Estimated covariance between etas
    parameter_estimates : pd.Series
        Population parameter estimates
    parameter_estimates_iterations : pd.DataFrame
        All recorded iterations for parameter estimates
    parameter_estimates_sdcorr : pd.Series
        Population parameter estimates with variability parameters as standard
        deviations and correlations
    residuals : pd.DataFrame
        Table of various residuals
    predictions : pd.DataFrame
        Table of various predictions
    derivatives : pd.DataFrame
        Table of various derivatives
    estimation_runtime : float
        Runtime for one estimation step
    runtime_total : float
        Total runtime of estimation
    standard_errors : pd.Series
        Standard errors of the population parameter estimates
    standard_errors_sdcorr : pd.Series
        Standard errors of the population parameter estimates on standard
        deviation and correlation scale
    relative_standard_errors : pd.Series
        Relative standard errors of the population parameter estimates
    termination_cause : str
        The cause of premature termination. One of 'maxevals_exceeded' and
        'rounding_errors'
    function_evaluations : int
        Number of function evaluations
    evaluation : pd.Series
        A bool for each estimation step. True if this was a model evaluation
        and False otherwise
    covstep_successful : bool or None
        Covariance status
    gradients : pd.Series
        Final parameter gradients
    gradients_iterations : pd.DataFrame
        All recorded parameter gradients
    warnings : list
        List of warnings
    individual_eta_samples : pd.DataFrame
        Individual eta samples
    """

    ofv: Optional[float] = None
    ofv_iterations: Optional[pd.Series] = None
    parameter_estimates: Optional[pd.Series] = None
    parameter_estimates_sdcorr: Optional[pd.Series] = None
    parameter_estimates_iterations: Optional[pd.DataFrame] = None
    covariance_matrix: Optional[pd.DataFrame] = None
    correlation_matrix: Optional[pd.DataFrame] = None
    precision_matrix: Optional[pd.DataFrame] = None
    standard_errors: Optional[pd.Series] = None
    standard_errors_sdcorr: Optional[pd.Series] = None
    relative_standard_errors: Optional[pd.Series] = None
    minimization_successful: Optional[bool] = None
    minimization_successful_iterations: Optional[pd.DataFrame] = None
    estimation_runtime: Optional[float] = None
    estimation_runtime_iterations: Optional[pd.DataFrame] = None
    individual_ofv: Optional[pd.Series] = None
    individual_estimates: Optional[pd.DataFrame] = None
    individual_estimates_covariance: Optional[pd.DataFrame] = None
    residuals: Optional[pd.DataFrame] = None
    predictions: Optional[pd.DataFrame] = None
    derivatives: Optional[pd.DataFrame] = None
    runtime_total: Optional[float] = None
    termination_cause: Optional[str] = None
    termination_cause_iterations: Optional[pd.Series] = None
    function_evaluations: Optional[float] = None
    function_evaluations_iterations: Optional[pd.Series] = None
    significant_digits: Optional[float] = None
    significant_digits_iterations: Optional[pd.Series] = None
    log_likelihood: Optional[float] = None
    log: Optional['Log'] = None
    evaluation: Optional[pd.Series] = None
    covstep_successful: Optional[bool] = None
    gradients: Optional[pd.Series] = None
    gradients_iterations: Optional[pd.DataFrame] = None
    warnings: Optional[list[str]] = None
    individual_eta_samples: Optional[pd.DataFrame] = None

    def __repr__(self):
        return '<Pharmpy modelfit results object>'
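

# Illustrative sketch (not part of the original module): constructing a
# ModelfitResults by hand; every field is optional and all values below are
# made up.
def _example_modelfit_results():
    res = ModelfitResults(
        ofv=586.276,
        parameter_estimates=pd.Series({'POP_CL': 0.0046, 'POP_VC': 0.984}),
        minimization_successful=True,
    )
    return res.ofv, res.parameter_estimates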


@dataclass(frozen=True)
class SimulationResults(Results):
    """Base class for results from a simulation operation

    Attributes
    ----------
    table : pd.DataFrame
        Table file of model
    """

    table: Optional[pd.DataFrame] = None

    def __repr__(self):
        return '<Pharmpy simulation results object>'