Source code for pharmpy.workflows.task
from __future__ import annotations
from typing import Callable, Self
from pharmpy.internals.immutable import Immutable
[docs]
class Task(Immutable):
"""One task in a workflow
Parameters
----------
name : str
Name of task
function : func
Task function
task_input : any
Input arguments to func
"""
def __init__(self, name: str, function: Callable, *task_input):
self._name = name
self._function = function
self._task_input: tuple = task_input
[docs]
@classmethod
def create(cls, name: str, function: Callable, *task_input) -> Self:
return cls(name, function, *task_input)
[docs]
def replace(self, **kwargs) -> Task:
name = kwargs.get("name", self._name)
function = kwargs.get("function", self._function)
task_input = kwargs.get("task_input", self._task_input)
return Task.create(
name,
function,
*task_input,
)
@property
def name(self) -> str:
"""Task name"""
return self._name
@property
def function(self) -> Callable:
"""Task function"""
return self._function
@property
def task_input(self) -> tuple:
"""Tuple of static input to function"""
return self._task_input
def __repr__(self) -> str:
return self._name