Source code for pharmpy.workflows.task

from typing import Callable, Generic, TypeVar

from pharmpy.internals.immutable import Immutable

T = TypeVar('T')


[docs] class Task(Generic[T], Immutable): """One task in a workflow Parameters ---------- name : str Name of task function : func Task function task_input : any Input arguments to func """ def __init__(self, name: str, function: Callable[..., T], *task_input): self._name = name self._function = function self._task_input: tuple = task_input
[docs] @classmethod def create(cls, name: str, function: Callable[..., T], *task_input): return cls(name, function, *task_input)
[docs] def replace(self, **kwargs): name = kwargs.get("name", self._name) function = kwargs.get("function", self._function) task_input = kwargs.get("task_input", self._task_input) return Task.create( name, function, *task_input, )
@property def name(self): """Task name""" return self._name @property def function(self): """Task function""" return self._function @property def task_input(self): """Tuple of static input to function""" return self._task_input def __repr__(self): return self._name