from __future__ import annotations
from typing import Any, Optional, Union, overload
from pharmpy.basic import BooleanExpr, Expr, Quantity, Unit
from pharmpy.basic.expr import solve
from pharmpy.model import (
AddColumn,
Assignment,
CompartmentalSystem,
CompartmentalSystemBuilder,
Drop,
Infusion,
Model,
Statements,
get_and_check_dataset,
get_and_check_odes,
)
from .compartments import get_bioavailability, get_lag_times
from .odes import get_initial_conditions, get_zero_order_inputs
@overload
def get_unit_of(model: Model, variable: None) -> dict[str, Unit]: ...
@overload
def get_unit_of(model: Model, variable: Union[str, Expr]) -> Unit: ...
[docs]
def get_unit_of(model: Model, variable: Union[str, Expr, None] = None) -> Unit | dict[str, Unit]:
"""Derive the physical unit of a variable in the model
Unit information for the dataset needs to be available.
The variable can be defined in the code, a dataset column, a parameter
or a random variable. Optionally units could be derived for all variables
in the model.
Parameters
----------
model : Model
Pharmpy model
variable : str | Expr | None
Find physical unit of this variable. For None get a dict with units for
all variables defined by the model.
Returns
-------
Unit
A unit expression
Examples
--------
>>> from pharmpy.modeling import load_example_model, get_unit_of
>>> model = load_example_model("pheno")
>>> get_unit_of(model, "Y")
mg/L
>>> get_unit_of(model, "VC")
L
>>> get_unit_of(model, "WGT")
kg
"""
if isinstance(variable, str):
variable = Expr.symbol(variable)
all_symbols = get_all_symbols(model)
if variable not in all_symbols:
raise ValueError(f"Variable {variable} is not defined in the model")
di = model.datainfo
# FIXME: No multiple DV-support for now
# Map from Symbol -> known Unit
known = {
col.symbol: col.variable.properties.get("unit", None)
for col in di
if not col.variable.properties.get("unit", None) is None
}
# Set of tuples symbol, expression where units cannot yet be deduced
unknown = set()
y = list(model.dependent_variables.keys())[0]
known[y] = known[di.dv_column.symbol]
id_symbol = model.datainfo.id_column.symbol
known[id_symbol] = Unit("")
if model.statements.ode_system is not None:
amount_unit = di.typeix['dose'][0].variable.properties.get("unit", None)
if amount_unit is not None:
for amt in model.statements.ode_system.amounts:
known[amt] = amount_unit
idv_unit = di.idv_column.variable.properties.get("unit", None)
if idv_unit is not None:
known[model.statements.ode_system.t] = idv_unit
lag_times = get_lag_times(model)
for lag_time in lag_times.values():
handle_assignment(di.idv_column.symbol, lag_time, known, unknown, model)
bios = get_bioavailability(model)
for bio in bios.values():
handle_assignment(id_symbol, bio, known, unknown, model)
if amount_unit is not None:
amount_symbol = di.typeix['dose'][0].symbol
ics = get_initial_conditions(model)
for ic in ics.values():
if ic != 0:
handle_assignment(amount_symbol, ic, known, unknown, model)
else:
amount_unit = None
idv_unit = None
add_covariances(model, unknown)
for s in reversed(model.statements):
if variable is not None and variable in known:
return known[variable]
if isinstance(s, Assignment):
handle_assignment(s.symbol, s.expression, known, unknown, model)
elif isinstance(s, CompartmentalSystem):
eqs = s.eqs
zinps = get_zero_order_inputs(model)
for eq, zinp in zip(eqs, zinps):
func = eq.lhs.args[0]
assert isinstance(func, Expr)
funcname = func.name
# FIXME: Could collide
derivative_symbol = Expr.symbol(f"d{funcname}_dt")
if amount_unit is not None and idv_unit is not None:
known[derivative_symbol] = amount_unit / idv_unit
handle_assignment(derivative_symbol, eq.rhs, known, unknown, model)
if zinp != 0:
handle_assignment(derivative_symbol, zinp, known, unknown, model)
unknown = recheck_unknowns(unknown, known, model)
if variable is not None and variable in known:
return known[variable]
if variable is not None:
raise RuntimeError(f"Couldn't deduct unit for {variable}")
else:
all_units = {}
for symbol in all_symbols:
all_units[str(symbol)] = known.get(symbol, None)
return all_units
def add_covariances(model, unknown):
# Unit of variance is the square of the unit of the eta
# Unit of a covariance is the product of the units of the variances
cov = model.random_variables.covariance_matrix
for rv, diag in zip(model.random_variables.symbols, cov.diagonal()):
unknown.add((diag, rv**2))
if len(cov) == 1:
return
for row in range(1, cov.rows):
for col in range(0, row):
e = cov[row, col]
if e != 0:
unknown.add((e, cov[row, row] * cov[col, col]))
def get_all_symbols(model):
symbols = (
set(model.parameters.symbols)
| set(model.random_variables.symbols)
| model.statements.lhs_symbols
| set(model.datainfo.symbols)
)
if model.statements.ode_system is not None:
symbols.add(model.statements.ode_system.t)
return symbols
def product(a, start: Any = 1):
prod = start
for e in a:
prod *= e
return prod
def simplify_for_units(expr: Expr) -> Expr:
# Remove known unitless cases: exp, log
# FIXME: Remove constants other than -1, 1 and 0
if expr.is_add():
return sum((simplify_for_units(term) for term in expr.expr_args), start=Expr(0))
elif expr.is_mul():
return product((simplify_for_units(factor) for factor in expr.expr_args), start=Expr(1))
elif expr.is_exp() or (expr.is_function() and expr.name == "log"):
return Expr(1)
elif expr.is_pow():
return simplify_for_units(expr.expr_args[0]) ** simplify_for_units(expr.expr_args[1])
elif expr.is_function() and expr.name in {"forward", "first"}:
return simplify_for_units(expr.expr_args[0])
elif expr.is_function() and expr.name in {"newind", "count_if"}:
return Expr(1)
else:
return expr
def deduct_equal_units(symbol: Expr, expr: Expr) -> list[BooleanExpr]:
# FIXME: we could also recurse down to additions inside exp and log or parentheses
expr = simplify_for_units(expr)
eqs = []
expr = expr.expand()
if expr.is_add():
for term in expr.expr_args:
eqs.append(BooleanExpr.eq(symbol, simplify_for_units(term)))
elif expr.is_piecewise():
for piece, _ in expr.piecewise_args:
if piece != symbol and not piece.is_number():
eqs.append(BooleanExpr.eq(symbol, simplify_for_units(piece)))
else:
eqs.append(BooleanExpr.eq(symbol, simplify_for_units(expr)))
return eqs
def derive_unit(expr, known):
if expr.is_number():
unit = Unit(1)
elif expr.is_symbol():
unit = known[expr]
elif expr.is_mul():
unit = product([derive_unit(factor, known) for factor in expr.args], start=Unit(1))
elif expr.is_pow():
base, exp = expr.args
if not exp.is_integer():
raise NotImplementedError("Non integer exponent not implemented for unit deduction")
unit = derive_unit(base, known) ** int(exp)
else:
raise NotImplementedError("Expression not implemented for unit deduction")
return unit
def used_symbols(expr, model):
# This is a workardound for free_symbols which doesn't give A_...(t)
assignment = Assignment(Expr("DUMMY"), expr)
symbols = assignment.rhs_symbols
# Handle case where t is only found in amounts. Not needed for deduction
odes = model.statements.ode_system
if odes is not None:
d = {amt: Expr(f"__DUMMY___{i}") for i, amt in enumerate(odes.amounts)}
if model.statements.ode_system.t not in expr.subs(d).free_symbols:
symbols -= {model.statements.ode_system.t}
return symbols
def handle_exp(expression, known, unknown, model):
if isinstance(expression, Expr) and expression.is_exp():
idsymb = model.datainfo.id_column.symbol
handle_assignment(idsymb, expression.args[1], known, unknown, model)
elif len(expression.args) > 0:
for arg in expression.args:
handle_exp(arg, known, unknown, model)
def handle_assignment(symbol, expression, known, unknown, model):
handle_exp(expression, known, unknown, model)
eqs = deduct_equal_units(symbol, expression)
sol = solve(eqs, exclude=known.keys())
for lhs, rhs in sol.items():
# FIXME: Could also learn the unit of the whole assignment
# FIXME: Unit of covariance is product of unit of both rvs
# FIXME: Can get information from conditions in piecewises
if rhs == 1:
unit = Unit(1)
elif used_symbols(rhs, model).issubset(known.keys()):
unit = derive_unit(rhs, known)
else:
unknown.add((lhs, rhs))
continue
known[lhs] = unit
def recheck_unknowns(unknown, known, model):
still_unknown = set()
for symbol, expression in unknown:
# We need to attempt solving the equation again since
# the unknown might not be on the lhs
eq = BooleanExpr.eq(symbol, expression)
sol = solve([eq], exclude=known.keys())
sol_symbol, sol_expression = sol.popitem()
if used_symbols(sol_expression, model).issubset(known.keys()):
unit = derive_unit(sol_expression, known)
known[sol_symbol] = unit
else:
still_unknown.add((symbol, expression))
return still_unknown
[docs]
def convert_unit(
model: Model,
variable: str,
unit: Union[str, Unit],
original_unit: Optional[Union[str, Unit]] = None,
in_dataset: bool = False,
) -> Model:
"""Convert between units for a data variable
The conversion could either be handled in the model code or optionally in the dataset (if applicable).
Parameters
----------
model : Model
Pharmpy model
variable : str
Which variable in the dataset or the model code to convert
unit : str
The new unit
original_unit : str
If no original unit is available in the datainfo this will be used
in_dataset : bool
Set to True if the conversion should be done in the dataset instead of in model code
Returns
-------
Model
Updated Pharmpy model
Examples
--------
>>> from pharmpy.modeling import load_example_model, convert_unit
>>> model = load_example_model("pheno")
>>> model = convert_unit(model, "WGT", "g")
"""
unit = Unit(unit)
column = model.datainfo[variable]
if column.type == 'idv':
raise ValueError(f"Cannot scale the independent variable ({variable})")
if original_unit is None:
original_unit = column.variable.properties.get("unit", None)
if original_unit is None:
raise ValueError("Cannot find the original unit of {variable}")
original_unit = Unit(original_unit)
if original_unit == unit:
return model
molar_mass = column.variable.properties.get("molar_mass", None)
if not original_unit.is_compatible_with(unit, molar_mass=molar_mass):
raise ValueError(f"Unable to convert from {original_unit} to {unit}: different dimensions.")
if molar_mass:
conversion_factor = (
Quantity(1.0, original_unit)
.convert_to(unit, molar_mass=Quantity(molar_mass, Unit("g/mol")))
.value
)
else:
conversion_factor = Quantity(1.0, original_unit).convert_to(unit).value
conversion_factor = (
int(conversion_factor) if int(conversion_factor) == conversion_factor else conversion_factor
)
if not in_dataset:
if column.type in {'dose', 'dv'}:
odes = get_and_check_odes(model)
_raise_if_rate(odes)
dosing_cmts = odes.dosing_compartments
cb = CompartmentalSystemBuilder(odes)
cb.set_bioavailability(
dosing_cmts[0], dosing_cmts[0].bioavailability * conversion_factor
)
amounts = set(odes.amounts)
after_odes = []
for s in model.statements.after_odes:
if not s.rhs_symbols.isdisjoint(amounts):
new_s = Assignment.create(s.symbol, s.expression / conversion_factor)
after_odes.append(new_s)
else:
after_odes.append(s)
new_statements = model.statements.before_odes + CompartmentalSystem(cb) + after_odes
else:
original_symbol = Expr.symbol(variable)
scaled_symbol = Expr.symbol(f"SCALED_{variable}")
expr = conversion_factor * original_symbol
assignment = Assignment.create(scaled_symbol, expr)
new_statements = assignment + Statements.create(
[s.subs({original_symbol: scaled_symbol}) for s in model.statements]
)
model = model.replace(statements=new_statements)
else:
df = get_and_check_dataset(model)
df, di = _scale_dataset_column(df, model.datainfo, variable, conversion_factor, unit)
other_type = _get_other_type(column.type)
if other_type is not None:
other_name = di.find_single_column_name(other_type)
original_other_unit = model.datainfo[other_name].variable.properties.get("unit", None)
if original_other_unit is not None:
if other_type == 'dose':
new_v = (
unit.replace_unit_of_dimension(original_other_unit) / original_other_unit
)
new_other_unit = unit / new_v
else:
new_other_unit = original_other_unit.replace_unit_of_dimension(unit)
else:
new_other_unit = None
df, di = _scale_dataset_column(df, di, other_name, conversion_factor, new_other_unit)
model = model.replace(dataset=df, datainfo=di)
return model.update_source()
def _raise_if_rate(odes):
for comp in odes.dosing_compartments:
for dose in comp.doses:
if isinstance(dose, Infusion) and dose.rate is not None:
raise ValueError("Cannot convert unit of infusions with rate in model code")
def _get_other_type(tp):
if tp == 'dv':
other_type = 'dose'
elif tp == 'dose':
other_type = 'dv'
else:
other_type = None
return other_type
def _scale_dataset_column(df, di, variable, conversion_factor, unit):
scaled_column = conversion_factor * df[variable]
df = df.assign(**{variable: scaled_column})
if unit is not None:
new_var = di[variable].variable.set_property("unit", unit)
new_col = di[variable].replace(variable_mapping=new_var)
di = di.set_column(new_col)
prov_new = (Drop.create(variable), AddColumn.create(variable))
di = di.replace(provenance=di.provenance + prov_new)
return df, di