Source code for pharmpy.data.read

# Read dataset from file
import re
import warnings
from io import StringIO
from pathlib import Path

import numpy as np
import pandas as pd
from lark import Lark

import pharmpy.data
from pharmpy.data import ColumnType, DatasetError, DatasetWarning


class NMTRANDataIO(StringIO):
    """An IO class that is a prefilter for pandas.read_table.
    Things that must be done before using pandas will be done here.
    Currently it takes care of filtering out ignored rows and handles
    special delimiter cases.
    """

    def __init__(self, filename_or_io, ignore_character='#'):
        """filename_or_io is a string with a path, a path object or any IO object, i.e. StringIO"""
        if not ignore_character:
            ignore_character = '#'
        if hasattr(filename_or_io, 'read'):
            contents = filename_or_io.read()
        else:
            with open(str(filename_or_io), 'r', encoding='latin-1') as datafile:
                contents = datafile.read()  # All variations of newlines are converted into \n
        if ignore_character == '@':
            # FIXME: Does this really handle the final line with no new line?
            comment_regexp = re.compile(r'^[ \t]*[A-Za-z#@].*\n', re.MULTILINE)
        else:
            comment_regexp = re.compile('^[' + ignore_character + '].*\n', re.MULTILINE)
        contents = re.sub(comment_regexp, '', contents)

        if re.search(r' \t', contents):  # Space before TAB is not allowed (see documentation)
            raise DatasetError(
                "The dataset contains a TAB preceded by a space, "
                "which is not allowed by NM-TRAN"
            )
        if re.search(r'^[ \t]*\n$', contents, re.MULTILINE):  # Blank lines
            raise DatasetError(
                "The dataset contains one or more blank lines. This is not "
                "allowed by NM-TRAN without the BLANKOK option"
            )
        super().__init__(contents)
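# Usage sketch (illustrative, not part of the original module): NMTRANDataIO
# prefilters the raw file contents so pandas never sees comment lines. The
# path 'run1.dta' is a hypothetical example; with ignore_character '@', any
# line starting with a letter, '#' or '@' (e.g. a header row) is dropped.
#
#     io = NMTRANDataIO('run1.dta', ignore_character='@')
#     df = pd.read_table(io, sep=r' *, *| *[\t] *| +', header=None, engine='python')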
def convert_fortran_number(number_string):
    """Try to convert number_string from the general fortran exponential format
    into an np.float64. Covers "1d1", "1D1", "a+b", "a-b", "+" and "-".
    Raises ValueError if number_string cannot be interpreted as a number.

    Move somewhere else. Will be used in output parsing as well.
    """
    try:
        y = np.float64(number_string)
        return y
    except (TypeError, ValueError):
        pass

    if number_string == '+' or number_string == '-':
        return 0.0

    m = re.match(r'([+\-]?)([^+\-dD]*)([+-])([^+\-dD]*)', number_string)
    if m:
        mantissa_sign = '-' if m.group(1) == '-' else ''
        mantissa = m.group(2)
        exponent_sign = m.group(3)
        exponent = m.group(4)
        return np.float64(mantissa_sign + mantissa + "E" + exponent_sign + exponent)

    if "D" in number_string or "d" in number_string:
        clean_number = number_string.replace("D", "e").replace("d", "e")
        try:
            y = np.float64(clean_number)
            return y
        except (TypeError, ValueError):
            pass

    raise ValueError(f"Could not convert the fortran number {number_string} to float")
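# Examples (illustrative values, derived from the rules above):
#
#     convert_fortran_number('1.2E-3')  # ordinary float -> 0.0012
#     convert_fortran_number('1d1')     # Fortran D exponent -> 10.0
#     convert_fortran_number('-1+2')    # embedded exponent sign, i.e. -1E+2 -> -100.0
#     convert_fortran_number('+')       # bare sign -> 0.0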
def _convert_data_item(x, null_value):
    if x is None or x == '.' or x == '':
        x = null_value
    if len(x) > 24:
        raise DatasetError("The dataset contains an item that is longer than 24 characters")
    try:
        converted = convert_fortran_number(x)
    except ValueError as e:
        raise DatasetError(str(e)) from e
    if converted in pharmpy.data.conf.na_values:
        return np.nan
    return converted


def _make_ids_unique(df, columns):
    """Check if id numbers are reused and if so renumber. If not simply pass through the dataset."""
    try:
        id_label = df.pharmpy.id_label
    except KeyError:
        return df
    if id_label in columns:
        id_series = df[id_label]
        id_change = id_series.diff(1) != 0
        if len(id_series[id_change]) != len(df.pharmpy.ids):
            warnings.warn(
                "Dataset contains non-unique id numbers. Renumbering starting from 1",
                DatasetWarning,
            )
            df[df.pharmpy.id_label] = id_change.cumsum()
    return df


def _filter_ignore_accept(df, ignore, accept, null_value):
    if ignore and accept:
        raise ValueError("Cannot have both IGNORE and ACCEPT")
    if not ignore and not accept:
        return df
    if ignore:
        statements = ignore
    else:
        statements = accept

    grammar = r'''
        start: column [space] operator [space] value | column [space] value
        column: COLNAME
        operator: OP_EQ | OP_STR_EQ | OP_NE | OP_STR_NE | OP_LT | OP_GT | OP_LT_EQ | OP_GT_EQ
        value: TEXT | QUOTE
        space: WS
        COLNAME: /\w+/
        WS: /\s+/
        OP_EQ    : ".EQN."
        OP_STR_EQ: ".EQ." | "==" | "="
        OP_NE    : ".NEN."
        OP_STR_NE: ".NE." | "/="
        OP_LT    : ".LT." | "<"
        OP_GT    : ".GT." | ">"
        OP_LT_EQ : ".LE." | "<="
        OP_GT_EQ : ".GE." | ">="
        TEXT: /[^"',;()=\s]+/
        QUOTE: /"[^"]*"/ | /'[^']*'/
    '''
    parser = Lark(grammar)
    for s in statements:
        tree = parser.parse(s)
        operator = '=='
        operator_type = str
        for st in tree.iter_subtrees():
            if st.data == 'column':
                column = str(st.children[0])
            elif st.data == 'value':
                value = str(st.children[0])
            elif st.data == 'operator':
                operator_token = st.children[0]
                tp = operator_token.type
                if tp == 'OP_EQ':
                    operator = '=='
                    operator_type = float
                elif tp == 'OP_NE':
                    operator = '!='
                    operator_type = float
                elif tp == 'OP_LT':
                    operator = '<'
                    operator_type = float
                elif tp == 'OP_GT':
                    operator = '>'
                    operator_type = float
                elif tp == 'OP_LT_EQ':
                    operator = '<='
                    operator_type = float
                elif tp == 'OP_GT_EQ':
                    operator = '>='
                    operator_type = float
                elif tp == 'OP_STR_EQ':
                    operator = '=='
                    operator_type = str
                elif tp == 'OP_STR_NE':
                    operator = '!='
                    operator_type = str
        if len(value) >= 3 and (
            (value.startswith("'") and value.endswith("'"))
            or (value.startswith('"') and value.endswith('"'))
        ):
            value = value[1:-1]
        if operator_type == str:
            expression = f'{column} {operator} "{value}"'
            if ignore:
                expression = 'not(' + expression + ')'
            df.query(expression, inplace=True)
        else:
            # Need to temporarily convert the column to numeric. Refer to the NONMEM
            # file format documentation for further information.
            # Using a name with spaces since this cannot collide with other NONMEM names
            magic_colname = 'a a'
            df[magic_colname] = df[column].apply(_convert_data_item, args=(str(null_value),))
            expression = f'`{magic_colname}` {operator} {value}'
            if ignore:
                expression = 'not(' + expression + ')'
            df.query(expression, inplace=True)
            df.drop(labels=magic_colname, axis=1, inplace=True)
    df.reset_index(drop=True, inplace=True)
    return df
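# Sketch of how the filter is used (hypothetical column names and values).
# IGNORE expressions drop matching rows and ACCEPT expressions keep them;
# '.EQ.' compares as string while '.GT.' forces a numeric comparison via
# the temporary column above.
#
#     df = _filter_ignore_accept(df, ignore=['SEX.EQ.1', 'WGT.GT.80'],
#                                accept=None, null_value='0')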
def infer_column_type(colname):
    """If possible infer the column type from the column name, else use UNKNOWN"""
    if colname == 'ID' or colname == 'L1':
        return ColumnType.ID
    elif colname == 'DV':
        return ColumnType.DV
    elif colname == 'MDV':
        return ColumnType.EVENT
    else:
        return ColumnType.UNKNOWN
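# For example (follows directly from the mapping above):
#
#     infer_column_type('L1')    # ColumnType.ID
#     infer_column_type('WGT')   # ColumnType.UNKNOWN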
# def _translate_nonmem_time_value(time):
#     if ':' in time:
#         components = time.split(':')
#         if len(components) > 3:
#             raise DatasetError(f'Bad TIME format: {time}')
#         hours = convert_fortran_number(components[0]) + convert_fortran_number(components[1]) * 60
#         if len(components) == 3:
#             hours += convert_fortran_number(components[2])
#         return hours
#     else:
#         return time


# def _translate_nonmem_time_and_date(df, timecol='TIME', datecol=None):
#     relative_time = df[timecol].str.contains(':').any()
#     if relative_time:
#         df[timecol] = df[timecol].apply(_translate_nonmem_time_value)
#         df[timecol] = df[timecol] - df[timecol].first()
#     return df
#     """
#     if date:
#         m = re.match(r'(?P<first>^[-]?\d+)(\D+(?P<second>\d+))?(\D+(?P<third>\d+))?$', date)
#         if not m:
#             raise ValueError('Bad DATE format: {date}')
#         first = m.group('first')
#         second = m.group('second')
#         third = m.group('third')
#         if second is None and third is None:
#             hours += float(first) * 24    # day
#         elif third is None:
#             float(first) * 24 + float(second)    # day and month
#         elif date_label == 'DAT2':
#             # yy-mm-dd
#     """
def read_nonmem_dataset(
    path_or_io,
    raw=False,
    ignore_character='#',
    colnames=tuple(),
    coltypes=None,
    drop=None,
    null_value='0',
    parse_columns=tuple(),
    ignore=None,
    accept=None,
):
    """Read a NONMEM dataset from a file.
    Column types will be inferred from the column names.

    raw - minimal processing, data will be kept in string format
    ignore_character - character marking comment lines to filter out (default '#')
    colnames - list or tuple of names to give each column, given in order. Names need to be unique
    drop - a list or tuple of booleans of which columns to drop
    null_value - value to use for NULL, i.e. empty records or padding
    parse_columns - only applicable when raw=True. A list of columns to parse
    ignore/accept - list of ignore/accept expressions

    The following postprocessing operations are done to a non-raw dataset
    1. Convert ordinary floating point numbers to float64
    2. Convert numbers of special fortran format to float64
    3. Convert None, '.', empty string to the NULL value
    4. Convert Inf/NaN properly
    5. Pad with null_token columns if $INPUT has more columns than the dataset
    6. Strip away superfluous columns from the dataset
    """
    if drop is None:
        drop = [False] * len(colnames)

    non_dropped = [name for name, dropped in zip(colnames, drop) if not dropped]
    if len(non_dropped) > len(set(non_dropped)):
        raise KeyError('Column names are not unique')

    file_io = NMTRANDataIO(path_or_io, ignore_character)
    df = pd.read_table(
        file_io,
        sep=r' *, *| *[\t] *| +',
        na_filter=False,
        header=None,
        engine='python',
        quoting=3,
        dtype=object,
    )
    df = pharmpy.data.PharmDataFrame(df)

    diff_cols = len(df.columns) - len(colnames)
    if diff_cols > 0:
        df.columns = list(colnames) + [None] * diff_cols
        if not raw:
            # Remove unnamed columns
            df.drop(columns=[None], inplace=True)
    elif diff_cols < 0:
        if raw:
            df.columns = colnames[0 : len(df.columns)]
        else:
            # Create empty columns
            for i in range(abs(diff_cols)):
                df[f'__{i}'] = null_value  # FIXME assure no name collisions here
            df.columns = colnames
    else:
        df.columns = colnames

    if coltypes is None:
        for label in df.columns:
            df.pharmpy.column_type[label] = infer_column_type(label)
    else:
        if len(coltypes) < len(df.columns):
            coltypes += [pharmpy.data.ColumnType.UNKNOWN] * (len(df.columns) - len(coltypes))
        df.pharmpy.column_type[list(df.columns)] = coltypes

    df = _filter_ignore_accept(df, ignore, accept, null_value)

    # if 'TIME' in df.columns:  # FIXME: Must handle synonyms
    #     try:
    #         id_label = df.pharmpy.id_label
    #         df = df.groupby(id_label).apply(_translate_nonmem_time_and_date)
    #     except KeyError:
    #         df.apply(_translate_nonmem_time_and_date)

    # FIXME: Do not remove dropped columns for now.
    # if drop:
    #     indices_to_drop = [i for i, x in enumerate(drop) if not x]
    #     df = df.iloc[:, indices_to_drop].copy()

    if not raw:
        parse_columns = [col for col, dropped in zip(df.columns, drop) if not dropped]
        # FIXME: This is instead of proper handling of these columns
        parse_columns = [
            x for x in parse_columns if x not in ['TIME', 'DATE', 'DAT1', 'DAT2', 'DAT3']
        ]
    for column in parse_columns:
        df[column] = df[column].apply(_convert_data_item, args=(str(null_value),))
    df = _make_ids_unique(df, parse_columns)

    return df
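# Usage sketch (hypothetical file and column names):
#
#     df = read_nonmem_dataset(
#         'pheno.dta',
#         colnames=['ID', 'TIME', 'AMT', 'WGT', 'APGR', 'DV'],
#         ignore_character='@',
#         ignore=['APGR.LT.2'],
#     )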
def read_csv(path_or_io, raw=False, parse_columns=tuple()):
    """Read a csv with header into a PharmDataFrame"""
    if not raw:
        df = pd.read_csv(path_or_io)
    else:
        df = pd.read_csv(path_or_io, dtype=str)
        for col in parse_columns:
            df[col] = pd.to_numeric(df[col])
    df = pharmpy.data.PharmDataFrame(df)
    # Set name of PharmDataFrame in case we have a Path-like input
    if isinstance(path_or_io, Path) or isinstance(path_or_io, str):
        path = Path(path_or_io)
        df.name = path.stem
    return df
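# Usage sketch (hypothetical path): read all columns as strings but parse 'DV'
# numerically; df.name becomes 'data' from the file stem.
#
#     df = read_csv('data.csv', raw=True, parse_columns=['DV'])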