Source code for pharmpy.modeling.blq

"""
:meta private:
"""

from __future__ import annotations

from typing import Literal, Optional

from pharmpy.basic import BooleanExpr, Expr
from pharmpy.deps import sympy, sympy_stats
from pharmpy.internals.expr.funcs import PHI
from pharmpy.internals.fn.type import check_list
from pharmpy.model import Assignment, ExecutionSteps, JointNormalDistribution, Model

from .data import remove_loq_data, set_lloq_data
from .expressions import _simplify_expression_from_parameters, create_symbol

SUPPORTED_METHODS = frozenset(['m1', 'm3', 'm4', 'm5', 'm6', 'm7'])


[docs] def transform_blq( model: Model, method: Literal['m1', 'm3', 'm4', 'm5', 'm6', 'm7'] = 'm4', lloq: Optional[float] = None, ): """Transform for BLQ data Transform a given model, methods available are m1, m3, m4, m5, m6 and m7 [1]_. The blq information can come from the dataset, the lloq option or a combination. Both LLOQ and BLQ columns are supported. The table below explains which columns are used for the various cases: +-------------+-------------+------------+-------------------+---------------+-------------------+ | lloq option | LLOQ column | BLQ column | Used as indicator | Used as level | Note | +=============+=============+============+===================+===============+===================+ | Available | NA | NA | DV < lloq | lloq | | +-------------+-------------+------------+-------------------+---------------+-------------------+ | NA | Available | NA | DV < LLOQ | LLOQ | | +-------------+-------------+------------+-------------------+---------------+-------------------+ | NA | NA | Available | BLQ | nothing | Only for M1 and M7| +-------------+-------------+------------+-------------------+---------------+-------------------+ | NA | NA | NA | NA | NA | No BLQ handling | +-------------+-------------+------------+-------------------+---------------+-------------------+ | NA | Available | Available | BLQ | LLOQ | DV column not used| +-------------+-------------+------------+-------------------+---------------+-------------------+ | Available | NA | Available | BLQ | lloq | | +-------------+-------------+------------+-------------------+---------------+-------------------+ | Available | Available | NA | DV < lloq | lloq | Column overridden | +-------------+-------------+------------+-------------------+---------------+-------------------+ | Available | Available | Available | DV < lloq | lloq | Columns overridden| +-------------+-------------+------------+-------------------+---------------+-------------------+ BLQ observations are defined as shown in the table above. If both a BLQ and an LLOQ column exist in the dataset and no lloq is specified then all dv values in rows with BLQ = 1 are counted as BLQ observations. If instead an lloq value is specified then all rows with dv values below the lloq value are counted as BLQ observations. If no lloq is specified and no BLQ column exists in the dataset then all rows with dv values below the value specified in the DV column are counted as BLQ observations. M1 method: All BLQ observations are discarded. This may affect the size of the dataset. M3 method: Including the probability that the BLQ observations are below the LLOQ as part of the maximum likelihood estimation. For more details see :ref:`[1]<ref_article>`. This method modifies the Y statement of the model (see examples below). M4 method: Including the probability that the BLQ observations are below the LLOQ and positive as part of the maximum likelihood estimation. For more details see :ref:`[1]<ref_article>`. This method modifies the Y statement of the model (see examples below). M5 method: All BLQ observations are replaced by level/2, where level = lloq if lloq is specified. Else level = value specified in LLOQ column (see table above). This method may change entries in the dataset. M6 method: Every BLQ observation in a consecutive series of BLQ observations is discarded except for the first one. The remaining BLQ observations are replaced by level/2, where level = lloq if lloq is specified. Else level = value specified in LLOQ column (see table above). This method may change entries in the dataset as well as the size of the dataset. M7 method: All BLQ observations are replaced by 0. This method may change entries in the dataset. Current limitations of the m3 and m4 method: * Does not support covariance between epsilons * Supports additive, proportional, combined, and power error model .. _ref_article: .. [1] Beal SL. Ways to fit a PK model with some data below the quantification limit. J Pharmacokinet Pharmacodyn. 2001 Oct;28(5):481-504. doi: 10.1023/a:1012299115260. Erratum in: J Pharmacokinet Pharmacodyn 2002 Jun;29(3):309. PMID: 11768292. Parameters ---------- model : Model Pharmpy model method : str Which BLQ method to use lloq : float, optional LLOQ limit to use, if None Pharmpy will use the BLQ/LLOQ column in the dataset Return ------ Model Pharmpy model object Examples -------- >>> from pharmpy.modeling import * >>> model = load_example_model("pheno") >>> model = transform_blq(model, method='m4', lloq=0.1) >>> model.statements.find_assignment("Y") ⎧ EPS₁⋅F + F for DV ≥ LLOQ ⎨CUMD - CUMDZ ⎪──────────── otherwise Y = ⎩ 1 - CUMDZ See also -------- remove_loq_data set_lloq_data """ lometh = method.lower() check_list("method", lometh, SUPPORTED_METHODS) blq_col, lloq_col = _get_blq_and_lloq_columns(model) indicator, indicator_type, level, level_type = _which_indicator_and_level( lloq, lloq_col, blq_col, lometh ) if lometh == 'm1': model = _m1_method(model, indicator, indicator_type) elif lometh in ('m3', 'm4'): _verify_model(model, method) model = _m3_m4_method(model, indicator, indicator_type, level, level_type, lometh) elif lometh == 'm5': model = _m5_method(model, indicator, indicator_type, level, level_type) elif lometh == 'm6': model = _m6_method(model, indicator, indicator_type, level, level_type) elif lometh == 'm7': model = _m7_method(model, indicator, indicator_type) return model
def _m1_method(model, indicator, tp): if tp in ('lloq', 'LLOQ'): return remove_loq_data(model, lloq=indicator) else: # tp == 'blq' return remove_loq_data(model, blq=indicator) def _m5_method(model, indicator, tp, level, level_type): if level_type == 'LLOQ': half = f'{level}/2' else: half = level / 2 if tp in ('lloq', 'LLOQ'): return set_lloq_data(model, half, lloq=indicator) else: # tp == 'blq' return set_lloq_data(model, half, blq=indicator) def _m6_method(model, indicator, tp, level, level_type): if level_type == 'LLOQ': half = f'{level}/2' else: half = level / 2 if tp in ('lloq', 'LLOQ'): model = remove_loq_data(model, lloq=indicator, keep=1) return set_lloq_data(model, half, lloq=indicator) else: # tp == 'blq' model = remove_loq_data(model, blq=indicator, keep=1) return set_lloq_data(model, half, blq=indicator) def _m7_method(model, indicator, tp): if tp in ('lloq', 'LLOQ'): return set_lloq_data(model, 0, lloq=indicator) else: # tp == 'blq': return set_lloq_data(model, 0, blq=indicator) def _m3_m4_method(model, indicator, indicator_type, level, level_type, method): sset = model.statements est_steps = model.execution_steps est_steps_new = ExecutionSteps(tuple(est_step.replace(laplace=True) for est_step in est_steps)) model = model.replace(execution_steps=est_steps_new) # FIXME: Handle other DVs? y_symb = list(model.dependent_variables.keys())[0] y = sset.find_assignment(y_symb) ipred = y.expression.subs({rv: 0 for rv in model.random_variables.epsilons.names}) lloq_symbol = create_symbol(model, 'LLOQ') if indicator_type == 'lloq': indicator_symb = lloq_symbol else: indicator_symb = Expr.symbol(indicator) if level_type == 'lloq': level_symb = lloq_symbol else: level_symb = Expr.symbol(level) sd = _get_sd(model, y) symb_dv = Expr.symbol(model.datainfo.dv_column.name) symb_fflag = create_symbol(model, 'F_FLAG') symb_cumd = create_symbol(model, 'CUMD') if indicator_type in ('lloq', 'LLOQ'): is_above_lloq = BooleanExpr.ge(symb_dv, indicator_symb) else: is_above_lloq = BooleanExpr.eq(indicator_symb, 0) assignments = [sd] if indicator_type == 'lloq' or level_type == 'lloq': if indicator_type == 'lloq': symbol = indicator_symb value = indicator else: symbol = level_symb value = level lloq = Assignment(symbol, Expr.float(value)) assignments.append(lloq) assignments += Assignment.create(symb_fflag, Expr.piecewise((0, is_above_lloq), (1, True))) cumd = Assignment.create(symb_cumd, PHI((level_symb - ipred) / sd.symbol)) if method == 'm3': assignments += Assignment.create( y.symbol, Expr.piecewise((y.expression, is_above_lloq), (cumd.expression, True)) ) else: assignments += cumd symb_cumdz = create_symbol(model, 'CUMDZ') assignments += Assignment.create(symb_cumdz, PHI(-ipred / sd.symbol)) y_below_lloq = (symb_cumd - symb_cumdz) / (1 - symb_cumdz) assignments += Assignment.create( y.symbol, Expr.piecewise((y.expression, is_above_lloq), (y_below_lloq, True)) ) y_idx = sset.find_assignment_index(y.symbol) sset_new = sset[:y_idx] + assignments + sset[y_idx + 1 :] model = model.replace(statements=sset_new) return model.update_source() def has_blq_transformation(model: Model): # FIXME: Make more general y_symb = list(model.dependent_variables.keys())[0] y = model.statements.error.find_assignment(y_symb) if not y: raise ValueError(f'Could not find assignment for \'{y_symb}\'') y_expr = y.expression if not y_expr.is_piecewise(): return False for statement, cond in y_expr.args: # pyright: ignore [reportGeneralTypeIssues] blq_symb, _ = get_blq_symb_and_type(model) if blq_symb in cond.free_symbols: break else: return False expected_m3 = ['SD', 'F_FLAG'] expected_m4 = ['SD', 'F_FLAG', 'CUMD', 'CUMDZ'] return _has_all_expected_symbs(model.statements.error, expected_m3) or _has_all_expected_symbs( model.statements.error, expected_m4 ) def _get_blq_and_lloq_columns(model: Model): try: blq_datainfo = model.datainfo.typeix['blq'] except IndexError: blq = None else: blq = blq_datainfo[0].name try: lloq_datainfo = model.datainfo.typeix['lloq'] except IndexError: lloq = None else: lloq = lloq_datainfo[0].name return blq, lloq def _which_indicator_and_level(lloq, lloq_col, blq_col, method): # Returns indicator, indicator type, level and level_type # indicator type can be 'lloq' for a value, 'LLOQ' for a column or 'blq' # level_type can be 'lloq' or 'LLOQ' if lloq is not None: if lloq_col is None and blq_col is not None: return blq_col, 'blq', lloq, 'lloq' else: return lloq, 'lloq', lloq, 'lloq' elif blq_col is not None: if lloq is None and lloq_col is None and method not in ('m1', 'm7'): raise ValueError( "Can only find a BLQ column. Only supported for the M1 and M7 methods." ) return blq_col, 'blq', lloq_col, 'LLOQ' elif lloq_col is not None: return lloq_col, 'LLOQ', lloq_col, 'LLOQ' else: raise ValueError("No BLQ or LLOQ information available") def _get_blq_name_and_type(model: Model): try: blq_datainfo = model.datainfo.typeix['lloq'] return blq_datainfo[0].name, 'lloq' except IndexError: blq_datainfo = model.datainfo.typeix['blq'] return blq_datainfo[0].name, 'blq' def get_blq_symb_and_type(model: Model): try: name, tp = _get_blq_name_and_type(model) return Expr.symbol(name), tp except IndexError: return Expr.symbol('LLOQ'), 'lloq' def _has_all_expected_symbs(sset, expected_symbs): symb_names = [s.symbol.name for s in sset] return all(symb in symb_names for symb in expected_symbs) def _verify_model(model, method): rvs = model.random_variables.epsilons if any(isinstance(rv, JointNormalDistribution) for rv in rvs): raise ValueError( f'Invalid input model: covariance between epsilons not supported in `method` {method}' ) def _get_sd(model, y): y_expr = model.statements.find_assignment(y.symbol).expression sd_expr = get_sd_expr(y_expr, model.random_variables, model.parameters) symb_sd = create_symbol(model, 'SD') return Assignment.create(symb_sd, sd_expr) def get_sd_expr(y_expr, rvs, params): rv_terms = [arg for arg in y_expr.args if arg.free_symbols.intersection(rvs.free_symbols)] sd_expr = [] for i, term in enumerate(rv_terms, 1): rvs_in_term = rvs.free_symbols.intersection(term.free_symbols) if len(rvs_in_term) > 1: raise ValueError( 'Invalid input model: error model not supported, terms in error model cannot contain ' 'more than one random variable' ) expr = rvs.replace_with_sympy_rvs(term) sd_expr.append(sympy_stats.std(expr)) return _simplify_expression_from_parameters( sympy.sqrt(sympy.Add(*[expr**2 for expr in sd_expr])), params )