Source code for pharmpy.workflows.model_database.local_directory

import json
import shutil
from contextlib import contextmanager
from os import stat
from pathlib import Path
from typing import Union

from pharmpy.internals.fs.lock import path_lock
from pharmpy.internals.fs.path import path_absolute
from pharmpy.model import DataInfo, Model
from pharmpy.modeling import write_csv, write_model
from pharmpy.workflows.model_entry import ModelEntry
from pharmpy.workflows.results import ModelfitResults, read_results

from ..hashing import ModelHash
from .baseclass import (
    ModelSnapshot,
    ModelTransaction,
    NonTransactionalModelDatabase,
    PendingTransactionError,
    TransactionalModelDatabase,
)

DIRECTORY_PHARMPY_METADATA = '.pharmpy'
DIRECTORY_DATASETS = '.datasets'
DIRECTORY_INDEX = '.hash'
FILE_METADATA = 'metadata.json'
FILE_MODELFIT_RESULTS = 'results.json'
FILE_PENDING = 'PENDING'
FILE_LOCK = '.lock'


def get_modelfit_results(model, path, esttool=None):
    # FIXME: This is a workaround. The proper solution is to only read the results.json from
    # the database. For this to work roundtrip of DataFrames in json is needed.
    # This is currently broken because of rounding issue in pandas
    # Also the modelfit_results attribute will soon be removed from model objects.
    import pharmpy.model.external.nonmem as nonmem_model
    import pharmpy.tools.external.nonmem as nonmem

    if esttool is not None:
        if esttool == 'dummy':
            import pharmpy.tools.external.dummy as tool
        elif esttool == 'nonmem':
            import pharmpy.tools.external.nonmem as tool
        elif esttool == 'nlmixr':
            import pharmpy.tools.external.nlmixr as tool
        res = tool.parse_modelfit_results(model, path)
        return res

    if isinstance(model, nonmem_model.Model):
        res = nonmem.parse_modelfit_results(model, path)
    else:
        import pharmpy.model.external.nlmixr as nlmixr_model
        import pharmpy.tools.external.nlmixr as nlmixr

        assert isinstance(model, nlmixr_model.Model)
        res = nlmixr.parse_modelfit_results(model, path)

    return res


def create_model_entry(model, modelfit_results):
    # FIXME: This function is to avoid duplication of this logic, this can be removed once
    #  parent_model has been moved from Model and log has been moved from modelfit_results
    #  and each database implementation has methods for retrieving these
    # Currently no parent information can be retrieved
    if not isinstance(modelfit_results, ModelfitResults):
        modelfit_results = None
        log = None
    else:
        log = modelfit_results.log

    return ModelEntry(model=model, modelfit_results=modelfit_results, log=log)


[docs] class LocalDirectoryDatabase(NonTransactionalModelDatabase): """ModelDatabase implementation for single local directory All files will be stored in the same directory. It is assumed that all files connected to a model are named modelname + extension. This means that care must be taken to keep filenames unique. Clashing filenames will be overwritten. It is recommended to use the LocalModelDirectoryDatabase instead. Parameters ---------- path : str or Path Path to the database directory. Will be created if it does not exist. file_extension : str File extension to use for model files. """ def __init__(self, path='.', file_extension='.mod'): path = Path(path) path.mkdir(parents=True, exist_ok=True) self.path = path_absolute(path) self.file_extension = file_extension self.ignored_names = frozenset(('stdout', 'stderr', 'nonmem.json', 'nlmixr.json'))
[docs] def store_model(self, model): pass
[docs] def store_local_file(self, model, path, new_filename=None): path_object = Path(path) if path_object.name not in self.ignored_names and path_object.is_file(): dest_path = self.path if new_filename: dest_path = self.path / new_filename shutil.copy2(path, dest_path)
[docs] def retrieve_local_files(self, name, destination_path): # Retrieve all files stored for one model files = self.path.glob(f'{name}.*') for f in files: shutil.copy2(f, destination_path)
[docs] def retrieve_file(self, name, filename): # Return path to file path = self.path / filename if path.is_file() and stat(path).st_size > 0: return path else: raise FileNotFoundError(f"Cannot retrieve {filename} for {name}")
[docs] def retrieve_model(self, name): filename = name + self.file_extension path = self.path / filename from pharmpy.model import Model try: model = Model.parse_model(path) except FileNotFoundError: raise KeyError('Model cannot be found in database') return model
[docs] def retrieve_modelfit_results(self, name): model = self.retrieve_model(name) return get_modelfit_results(model, self.path)
[docs] def retrieve_model_entry(self, name): model = self.retrieve_model(name) modelfit_results = self.retrieve_modelfit_results(name) return create_model_entry(model, modelfit_results)
[docs] def store_metadata(self, model, metadata): pass
[docs] def store_modelfit_results(self, model): pass
[docs] def store_model_entry(self, model_entry): pass
def __repr__(self): return f"LocalDirectoryDatabase({self.path})"
[docs] class LocalModelDirectoryDatabase(TransactionalModelDatabase): """ModelDatabase implementation for a local directory structure Files will be stored in separate subdirectories named after each model. There are no restrictions on names of the files so models can have the same name of some connected file without creating a name clash. Parameters ---------- path : str or Path Path to the base database directory. Will be created if it does not exist. file_extension : str File extension to use for model files. """ def __init__(self, path: Union[str, Path] = '.', file_extension='.mod'): path = Path(path) path.mkdir(parents=True, exist_ok=True) self.path = path_absolute(path) self.file_extension = file_extension def _read_lock(self): # NOTE: Obtain shared (blocking) lock on the entire database path = self.path / FILE_LOCK path.touch(exist_ok=True) return path_lock(str(path), shared=True) def _write_lock(self): # NOTE: Obtain exclusive (blocking) lock on the entire database path = self.path / FILE_LOCK path.touch(exist_ok=True) return path_lock(str(path), shared=False)
[docs] @contextmanager def snapshot(self, obj: Union[Model, ModelEntry, ModelHash]): key = ModelHash(obj) model_path = self.path / str(key) destination = model_path / DIRECTORY_PHARMPY_METADATA destination.mkdir(parents=True, exist_ok=True) with self._read_lock(): # NOTE: Check that no pending transaction exists path = destination / FILE_PENDING if path.exists(): # TODO: Finish pending transaction from journal if possible raise PendingTransactionError() yield LocalModelDirectoryDatabaseSnapshot(self, obj)
[docs] @contextmanager def transaction(self, obj: Union[Model, ModelEntry, ModelHash]): key = ModelHash(obj) model_path = self.path / str(key) destination = model_path / DIRECTORY_PHARMPY_METADATA destination.mkdir(parents=True, exist_ok=True) with self._write_lock(): # NOTE: Mark state as pending path = destination / FILE_PENDING try: path.touch(exist_ok=False) except FileExistsError: # TODO: Finish pending transaction from journal if possible raise PendingTransactionError() yield LocalModelDirectoryDatabaseTransaction(self, obj) # NOTE: Commit transaction (only if no exception was raised) path.unlink()
def __repr__(self): return f"LocalModelDirectoryDatabase({self.path})"
class LocalModelDirectoryDatabaseTransaction(ModelTransaction): def store_model(self): if self.model_entry is None: raise ValueError("Cannot store model: No model attached to transaction") model = self.model_entry.model datasets_path = self.database.path / DIRECTORY_DATASETS model_path = self.database.path / str(self.key) model_file_path = model_path / ("model" + model.filename_extension) if model_file_path.is_file(): return model # NOTE: Get the hash of the dataset and list filenames with contents # matching this hash only h = self.key.dataset_hash h_dir = datasets_path / DIRECTORY_INDEX / str(h) if h_dir.is_dir(): hpath = next(h_dir.iterdir()) # NOTE: This variable holds a string similar to "run1.csv" matching_model_filename = hpath.name data_path = datasets_path / matching_model_filename dipath = data_path.with_suffix('.datainfo') # TODO: Maybe catch FileNotFoundError and similar here (pass) curdi = DataInfo.read_json(dipath) # NOTE: Paths are not compared here if curdi == model.datainfo: datainfo = model.datainfo.replace(path=curdi.path) model = model.replace(datainfo=datainfo) else: h_dir.mkdir(parents=True, exist_ok=True) highest = 0 for file in datasets_path.iterdir(): name = file.name if name.startswith('data') and name.endswith('.csv'): number = int(name[4:-4]) # Remove data and .csv if number > highest: highest = number dataset_basename = f'data{highest + 1}' dataset_filename = f'{dataset_basename}.csv' # NOTE: Create the index file at .datasets/.hash/<hash>/<dataset_filename> index_path = h_dir / dataset_filename index_path.touch() data_path = path_absolute(datasets_path / dataset_filename) datainfo = model.datainfo.replace(path=data_path) model = model.replace(datainfo=datainfo) model = write_csv(model, path=data_path, force=True) # NOTE: Write datainfo last so that we are "sure" dataset is there # if datainfo is there model.datainfo.to_json(datasets_path / (dataset_basename + '.datainfo')) # NOTE: Write the model model_path.mkdir(exist_ok=True) write_model(model, model_path / ("model" + model.filename_extension), force=True) return model def store_local_file(self, path, new_filename=None): if Path(path).is_file(): destination = self.database.path / str(self.key) destination.mkdir(parents=True, exist_ok=True) if new_filename: destination = destination / new_filename shutil.copy2(path, destination) def store_metadata(self, metadata): destination = self.database.path / str(self.key) / DIRECTORY_PHARMPY_METADATA destination.mkdir(parents=True, exist_ok=True) with open(destination / FILE_METADATA, 'w') as f: json.dump(metadata, f, indent=2) def store_modelfit_results(self): destination = self.database.path / str(self.key) / DIRECTORY_PHARMPY_METADATA destination.mkdir(parents=True, exist_ok=True) modelfit_results = self.model_entry.modelfit_results if modelfit_results is not None: modelfit_results.to_json(destination / FILE_MODELFIT_RESULTS) def store_model_entry(self): if self.model_entry is None: raise ValueError('Transaction does not have `model_entry` attribute') # FIXME: Store parent self.store_model() self.store_modelfit_results() class LocalModelDirectoryDatabaseSnapshot(ModelSnapshot): def retrieve_local_files(self, destination_path): path = self.database.path / str(self.key) files = path.glob('*') for f in files: if f.is_file(): shutil.copy2(f, destination_path) else: shutil.copytree(f, Path(destination_path) / f.stem) def retrieve_file(self, filename): # Return path to file path = self.database.path / str(self.key) / filename if path.is_file() and stat(path).st_size > 0: return path else: raise FileNotFoundError(f"Cannot retrieve {filename} for {self.name}") def retrieve_model(self): path = self._find_full_model_path() # NOTE: This will guess the model type model = Model.parse_model(path) return model def _find_full_model_path(self): extensions = ('.mod', '.ctl') root = self.database.path / str(self.key) errors = [] for extension in extensions: filename = "model" + extension path = root / filename if path.is_file(): return path else: errors.append(path) else: raise KeyError( f'Could not find model in {self.database}.' f' Looked up {", ".join(map(lambda p: f"`{p}`", errors))}.' ) def retrieve_modelfit_results(self): model = self.retrieve_model() path = self._find_full_model_path() res = get_modelfit_results(model, path) if res is not None: return res # FIXME: The following does not work because deserialization of modelfit # results is not generic enough. We only use it to make the resume_tool # test pass. path = ( self.database.path / str(self.key) / DIRECTORY_PHARMPY_METADATA / FILE_MODELFIT_RESULTS ) if path.is_file(): return read_results(path) else: return None def retrieve_model_entry(self): model = self.retrieve_model() modelfit_results = self.retrieve_modelfit_results() return create_model_entry(model, modelfit_results)