Source code for pharmpy.modeling.units

from __future__ import annotations

from collections import defaultdict
from typing import Dict, Iterable, Set, Tuple, TypeVar, Union

from pharmpy.basic import Unit
from pharmpy.deps import sympy
from pharmpy.internals.expr.subs import subs
from pharmpy.internals.expr.tree import prune
from pharmpy.internals.graph.directed.reachability import reachable_from
from pharmpy.model import Assignment, CompartmentalSystem, Model

T = TypeVar('T')


def _extract_minus(expr):
    if expr.is_Mul and expr.args[0] == -1:
        return sympy.Mul(*expr.args[1:])
    else:
        return expr


[docs] def get_unit_of(model: Model, variable: Union[str, sympy.Symbol]) -> Unit: """Derive the physical unit of a variable in the model Unit information for the dataset needs to be available. The variable can be defined in the code, a dataset olumn, a parameter or a random variable. Parameters ---------- model : Model Pharmpy model object variable : str or Symbol Find physical unit of this variable Returns ------- Unit A unit expression Examples -------- >>> from pharmpy.modeling import load_example_model, get_unit_of >>> model = load_example_model("pheno") >>> get_unit_of(model, "Y") milligram/liter >>> get_unit_of(model, "V") liter >>> get_unit_of(model, "WGT") kilogram """ if isinstance(variable, str): symbol = sympy.Symbol(variable) else: symbol = variable variable = variable.name di = model.datainfo if variable in di.names: return di[variable].unit # FIXME: Handle other DVs? y = sympy.sympify(list(model.dependent_variables.keys())[0]) input_units = {sympy.Symbol(col.name): col.unit._expr for col in di} pruned_nodes = {sympy.exp} def pruning_predicate(e: sympy.Expr) -> bool: return e.func in pruned_nodes unit_eqs = [] # FIXME: Using private _expr in some places. sympify doesn't work for some reason. a = di[di.dv_column.name].unit._expr unit_eqs.append(y - a) for s in model.statements: if isinstance(s, Assignment): expr = sympy.expand( subs( prune(pruning_predicate, sympy.sympify(s.expression)), input_units, simultaneous=True, ) ) if expr.is_Add: for term in expr.args: unit_eqs.append(sympy.sympify(s.symbol) - _extract_minus(term)) else: unit_eqs.append(sympy.sympify(s.symbol) - _extract_minus(expr)) elif isinstance(s, CompartmentalSystem): amt_unit = di[di.typeix['dose'][0].name].unit._expr time_unit = di[di.idv_column.name].unit._expr for e in s.compartmental_matrix.diagonal(): e = sympy.sympify(e) if e.is_Add: for term in e.args: unit_eqs.append(amt_unit / time_unit - _extract_minus(term)) elif e == 0: pass else: unit_eqs.append(amt_unit / time_unit - _extract_minus(e)) for a in s.amounts: unit_eqs.append(amt_unit - a) # NOTE: This keeps only the equations required to solve for "symbol" filtered_unit_eqs = _filter_equations(unit_eqs, symbol) # NOTE: For some reason telling sympy to solve for "symbol" does not work sol = sympy.solve(filtered_unit_eqs, dict=True) return Unit(sol[0][symbol])
def _filter_equations( equations: Iterable[sympy.Expr], symbol: sympy.Symbol ) -> Iterable[sympy.Expr]: # NOTE: This has the side effect of deduplicating equations fs = {eq: eq.free_symbols for eq in equations} # NOTE: We could first contract clique edges, but I have not found a way to # make it as elegant as the current implementation edges = _cliques_spanning_forest_edges_linear_superset(fs.values()) graph = _adjacency_list(edges) dependent_symbols = reachable_from( {symbol}, graph.__getitem__, ) # NOTE: All symbols are in the same connected component, so we only need to # test one symbol for each equation return ( eq for eq, symbols in fs.items() if symbols and next(iter(symbols)) in dependent_symbols ) def _adjacency_list(edges: Iterable[Tuple[T, T]]) -> Dict[T, Set[T]]: graph = defaultdict(set) for u, v in edges: graph[u].add(v) graph[v].add(u) return graph def _cliques_spanning_forest_edges_linear_superset( cliques: Iterable[Iterable[T]], ) -> Iterable[Tuple[T, T]]: # NOTE: This is not a forest, but it has a linear number of edges in the # input size. Building a spanning tree would require a union-find data # structure and superlinear time, which is unnecessary here since we are # only interested in connected components of the graph. for clique in cliques: yield from _clique_spanning_tree_edges(clique) def _clique_spanning_tree_edges(clique: Iterable[T]) -> Iterable[Tuple[T, T]]: it = iter(clique) try: u = next(it) except StopIteration: return for v in it: yield (u, v)