Source code for pharmpy.workflows.call

from typing import TypeVar

from .workflow import Workflow, WorkflowBuilder, insert_context

T = TypeVar('T')


[docs] def call_workflow(wf: Workflow[T], unique_name, ctx) -> T: """Dynamically call a workflow from another workflow. Currently only supports dask distributed Parameters ---------- wf : Workflow A workflow object unique_name : str A name of the results node that is unique between parent and dynamically created workflows ctx : Context Context to pass to new workflow Returns ------- Any Whatever the dynamic workflow returns """ from dask.distributed import get_client, rejoin, secede from .optimize import optimize_task_graph_for_dask_distributed wb = WorkflowBuilder(wf) insert_context(wb, ctx) wf = Workflow(wb) client = get_client() dsk = wf.as_dask_dict() dsk[unique_name] = dsk.pop('results') dsk_optimized = optimize_task_graph_for_dask_distributed(client, dsk) futures = client.get(dsk_optimized, unique_name, sync=False) secede() res: T = client.gather(futures) # pyright: ignore [reportGeneralTypeIssues] rejoin() return res