Source code for pharmpy.modeling.blq
"""
:meta private:
"""
from __future__ import annotations
from typing import Literal, Optional
from pharmpy.basic import BooleanExpr, Expr
from pharmpy.deps import sympy, sympy_stats
from pharmpy.internals.expr.funcs import PHI
from pharmpy.internals.fn.type import check_list
from pharmpy.model import Assignment, ExecutionSteps, JointNormalDistribution, Model
from .data import remove_loq_data, set_lloq_data
from .expressions import _simplify_expression_from_parameters, create_symbol
SUPPORTED_METHODS = frozenset(['m1', 'm3', 'm4', 'm5', 'm6', 'm7'])
[docs]
def transform_blq(
    model: Model,
    method: Literal['m1', 'm3', 'm4', 'm5', 'm6', 'm7'] = 'm4',
    lloq: Optional[float] = None,
):
    """Transform for BLQ data
    Transform a given model, methods available are m1, m3, m4, m5, m6 and m7 [1]_.
    The blq information can come from the dataset, the lloq option or a combination. Both LLOQ and BLQ
    columns are supported. The table below explains which columns are used for the various cases:
    +-------------+-------------+------------+-------------------+---------------+-------------------+
    | lloq option | LLOQ column | BLQ column | Used as indicator | Used as level | Note              |
    +=============+=============+============+===================+===============+===================+
    | Available   | NA          | NA         | DV < lloq         | lloq          |                   |
    +-------------+-------------+------------+-------------------+---------------+-------------------+
    | NA          | Available   | NA         | DV < LLOQ         | LLOQ          |                   |
    +-------------+-------------+------------+-------------------+---------------+-------------------+
    | NA          | NA          | Available  | BLQ               | nothing       | Only for M1 and M7|
    +-------------+-------------+------------+-------------------+---------------+-------------------+
    | NA          | NA          | NA         | NA                | NA            | No BLQ handling   |
    +-------------+-------------+------------+-------------------+---------------+-------------------+
    | NA          | Available   | Available  | BLQ               | LLOQ          | DV column not used|
    +-------------+-------------+------------+-------------------+---------------+-------------------+
    | Available   | NA          | Available  | BLQ               | lloq          |                   |
    +-------------+-------------+------------+-------------------+---------------+-------------------+
    | Available   | Available   | NA         | DV < lloq         | lloq          | Column overridden |
    +-------------+-------------+------------+-------------------+---------------+-------------------+
    | Available   | Available   | Available  | DV < lloq         | lloq          | Columns overridden|
    +-------------+-------------+------------+-------------------+---------------+-------------------+
    BLQ observations are defined as shown in the table above.
    If both a BLQ and an LLOQ column exist in the dataset and no lloq is specified then all dv values in
    rows with BLQ = 1 are counted as BLQ observations. If instead an lloq value is specified then all rows with
    dv values below the lloq value are counted as BLQ observations.
    If no lloq is specified and no BLQ column exists in the dataset then all rows with dv values below the value
    specified in the DV column are counted as BLQ observations.
    M1 method:
        All BLQ observations are discarded.
        This may affect the size of the dataset.
    M3 method:
        Including the probability that the BLQ observations are below the LLOQ
        as part of the maximum likelihood estimation.
        For more details see :ref:`[1]<ref_article>`.
        This method modifies the Y statement of the model (see examples below).
    M4 method:
        Including the probability that the BLQ observations are below the LLOQ and positive
        as part of the maximum likelihood estimation.
        For more details see :ref:`[1]<ref_article>`.
        This method modifies the Y statement of the model (see examples below).
    M5 method:
        All BLQ observations are replaced by level/2, where level = lloq if lloq is specified.
        Else level = value specified in LLOQ column (see table above).
        This method may change entries in the dataset.
    M6 method:
        Every BLQ observation in a consecutive series of BLQ observations is discarded except for the first one.
        The remaining BLQ observations are replaced by level/2, where level = lloq if lloq is specified.
        Else level = value specified in LLOQ column (see table above).
        This method may change entries in the dataset as well as the size of the dataset.
    M7 method:
        All BLQ observations are replaced by 0.
        This method may change entries in the dataset.
    Current limitations of the m3 and m4 method:
    * Does not support covariance between epsilons
    * Supports additive, proportional, combined, and power error model
    .. _ref_article:
    .. [1] Beal SL. Ways to fit a PK model with some data below the quantification
       limit. J Pharmacokinet Pharmacodyn. 2001 Oct;28(5):481-504. doi: 10.1023/a:1012299115260.
       Erratum in: J Pharmacokinet Pharmacodyn 2002 Jun;29(3):309. PMID: 11768292.
    Parameters
    ----------
    model : Model
        Pharmpy model
    method : str
        Which BLQ method to use
    lloq : float, optional
        LLOQ limit to use, if None Pharmpy will use the BLQ/LLOQ column in the dataset
    Return
    ------
    Model
        Updated Pharmpy model
    Examples
    --------
    >>> from pharmpy.modeling import *
    >>> model = load_example_model("pheno")
    >>> model = transform_blq(model, method='m4', lloq=0.1)
    >>> model.statements.find_assignment("Y")
        ⎧ EPS₁⋅F + F   for DV ≥ LLOQ
        ⎪
        ⎨CUMD - CUMDZ
        ⎪────────────    otherwise
    Y = ⎩ 1 - CUMDZ
    See also
    --------
    remove_loq_data
    set_lloq_data
    """
    lometh = method.lower()
    check_list("method", lometh, SUPPORTED_METHODS)
    blq_col, lloq_col = _get_blq_and_lloq_columns(model)
    indicator, indicator_type, level, level_type = _which_indicator_and_level(
        lloq, lloq_col, blq_col, lometh
    )
    if lometh == 'm1':
        model = _m1_method(model, indicator, indicator_type)
    elif lometh in ('m3', 'm4'):
        _verify_model(model, method)
        model = _m3_m4_method(model, indicator, indicator_type, level, level_type, lometh)
    elif lometh == 'm5':
        model = _m5_method(model, indicator, indicator_type, level, level_type)
    elif lometh == 'm6':
        model = _m6_method(model, indicator, indicator_type, level, level_type)
    elif lometh == 'm7':
        model = _m7_method(model, indicator, indicator_type)
    return model
def _m1_method(model, indicator, tp):
    if tp in ('lloq', 'LLOQ'):
        return remove_loq_data(model, lloq=indicator)
    else:  # tp == 'blq'
        return remove_loq_data(model, blq=indicator)
def _m5_method(model, indicator, tp, level, level_type):
    if level_type == 'LLOQ':
        half = f'{level}/2'
    else:
        half = level / 2
    if tp in ('lloq', 'LLOQ'):
        return set_lloq_data(model, half, lloq=indicator)
    else:  # tp == 'blq'
        return set_lloq_data(model, half, blq=indicator)
def _m6_method(model, indicator, tp, level, level_type):
    if level_type == 'LLOQ':
        half = f'{level}/2'
    else:
        half = level / 2
    if tp in ('lloq', 'LLOQ'):
        model = remove_loq_data(model, lloq=indicator, keep=1)
        return set_lloq_data(model, half, lloq=indicator)
    else:  # tp == 'blq'
        model = remove_loq_data(model, blq=indicator, keep=1)
        return set_lloq_data(model, half, blq=indicator)
def _m7_method(model, indicator, tp):
    if tp in ('lloq', 'LLOQ'):
        return set_lloq_data(model, 0, lloq=indicator)
    else:  # tp == 'blq':
        return set_lloq_data(model, 0, blq=indicator)
def _m3_m4_method(model, indicator, indicator_type, level, level_type, method):
    sset = model.statements
    est_steps = model.execution_steps
    est_steps_new = ExecutionSteps(tuple(est_step.replace(laplace=True) for est_step in est_steps))
    model = model.replace(execution_steps=est_steps_new)
    # FIXME: Handle other DVs?
    y_symb = list(model.dependent_variables.keys())[0]
    y = sset.find_assignment(y_symb)
    ipred = y.expression.subs({rv: 0 for rv in model.random_variables.epsilons.names})
    lloq_symbol = create_symbol(model, 'LLOQ')
    if indicator_type == 'lloq':
        indicator_symb = lloq_symbol
    else:
        indicator_symb = Expr.symbol(indicator)
    if level_type == 'lloq':
        level_symb = lloq_symbol
    else:
        level_symb = Expr.symbol(level)
    sd = _get_sd(model, y)
    symb_dv = Expr.symbol(model.datainfo.dv_column.name)
    symb_fflag = create_symbol(model, 'F_FLAG')
    symb_cumd = create_symbol(model, 'CUMD')
    if indicator_type in ('lloq', 'LLOQ'):
        is_above_lloq = BooleanExpr.ge(symb_dv, indicator_symb)
    else:
        is_above_lloq = BooleanExpr.eq(indicator_symb, 0)
    assignments = [sd]
    if indicator_type == 'lloq' or level_type == 'lloq':
        if indicator_type == 'lloq':
            symbol = indicator_symb
            value = indicator
        else:
            symbol = level_symb
            value = level
        lloq = Assignment(symbol, Expr.float(value))
        assignments.append(lloq)
    assignments += Assignment.create(symb_fflag, Expr.piecewise((0, is_above_lloq), (1, True)))
    cumd = Assignment.create(symb_cumd, PHI((level_symb - ipred) / sd.symbol))
    if method == 'm3':
        assignments += Assignment.create(
            y.symbol, Expr.piecewise((y.expression, is_above_lloq), (cumd.expression, True))
        )
    else:
        assignments += cumd
        symb_cumdz = create_symbol(model, 'CUMDZ')
        assignments += Assignment.create(symb_cumdz, PHI(-ipred / sd.symbol))
        y_below_lloq = (symb_cumd - symb_cumdz) / (1 - symb_cumdz)
        assignments += Assignment.create(
            y.symbol, Expr.piecewise((y.expression, is_above_lloq), (y_below_lloq, True))
        )
    y_idx = sset.find_assignment_index(y.symbol)
    sset_new = sset[:y_idx] + assignments + sset[y_idx + 1 :]
    model = model.replace(statements=sset_new, value_type=symb_fflag)
    return model.update_source()
def has_blq_transformation(model: Model):
    # FIXME: Make more general
    y_symb = list(model.dependent_variables.keys())[0]
    y = model.statements.error.find_assignment(y_symb)
    if not y:
        raise ValueError(f'Could not find assignment for \'{y_symb}\'')
    y_expr = y.expression
    if not y_expr.is_piecewise():
        return False
    for statement, cond in y_expr.piecewise_args:
        blq_symb, _ = get_blq_symb_and_type(model)
        if blq_symb in cond.free_symbols:
            break
    else:
        return False
    expected_m3 = ['SD', 'F_FLAG']
    expected_m4 = ['SD', 'F_FLAG', 'CUMD', 'CUMDZ']
    return _has_all_expected_symbs(model.statements.error, expected_m3) or _has_all_expected_symbs(
        model.statements.error, expected_m4
    )
def _get_blq_and_lloq_columns(model: Model):
    try:
        blq_datainfo = model.datainfo.typeix['blq']
    except IndexError:
        blq = None
    else:
        blq = blq_datainfo[0].name
    try:
        lloq_datainfo = model.datainfo.typeix['lloq']
    except IndexError:
        lloq = None
    else:
        lloq = lloq_datainfo[0].name
    return blq, lloq
def _which_indicator_and_level(lloq, lloq_col, blq_col, method):
    # Returns indicator, indicator type, level and level_type
    # indicator type can be 'lloq' for a value, 'LLOQ' for a column or 'blq'
    # level_type can be 'lloq' or 'LLOQ'
    if lloq is not None:
        if lloq_col is None and blq_col is not None:
            return blq_col, 'blq', lloq, 'lloq'
        else:
            return lloq, 'lloq', lloq, 'lloq'
    elif blq_col is not None:
        if lloq is None and lloq_col is None and method not in ('m1', 'm7'):
            raise ValueError(
                "Can only find a BLQ column. Only supported for the M1 and M7 methods."
            )
        return blq_col, 'blq', lloq_col, 'LLOQ'
    elif lloq_col is not None:
        return lloq_col, 'LLOQ', lloq_col, 'LLOQ'
    else:
        raise ValueError("No BLQ or LLOQ information available")
def _get_blq_name_and_type(model: Model):
    try:
        blq_datainfo = model.datainfo.typeix['lloq']
        return blq_datainfo[0].name, 'lloq'
    except IndexError:
        blq_datainfo = model.datainfo.typeix['blq']
        return blq_datainfo[0].name, 'blq'
def get_blq_symb_and_type(model: Model):
    try:
        name, tp = _get_blq_name_and_type(model)
        return Expr.symbol(name), tp
    except IndexError:
        return Expr.symbol('LLOQ'), 'lloq'
def _has_all_expected_symbs(sset, expected_symbs):
    symb_names = [s.symbol.name for s in sset]
    return all(symb in symb_names for symb in expected_symbs)
def _verify_model(model, method):
    rvs = model.random_variables.epsilons
    if any(isinstance(rv, JointNormalDistribution) for rv in rvs):
        raise ValueError(
            f'Invalid input model: covariance between epsilons not supported in `method` {method}'
        )
def _get_sd(model, y):
    y_expr = model.statements.find_assignment(y.symbol).expression
    sd_expr = get_sd_expr(y_expr, model.random_variables, model.parameters)
    symb_sd = create_symbol(model, 'SD')
    return Assignment.create(symb_sd, sd_expr)
def get_sd_expr(y_expr, rvs, params):
    rv_terms = [arg for arg in y_expr.args if arg.free_symbols.intersection(rvs.free_symbols)]
    sd_expr = []
    for i, term in enumerate(rv_terms, 1):
        rvs_in_term = rvs.free_symbols.intersection(term.free_symbols)
        if len(rvs_in_term) > 1:
            raise ValueError(
                'Invalid input model: error model not supported, terms in error model cannot contain '
                'more than one random variable'
            )
        expr = rvs.replace_with_sympy_rvs(term)
        sd_expr.append(sympy_stats.std(expr))
    return _simplify_expression_from_parameters(
        sympy.sqrt(sympy.Add(*[expr**2 for expr in sd_expr])), params
    )