Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions petab/v1/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,15 +133,18 @@ def get_notnull_columns(df: pd.DataFrame, candidates: Iterable):
]


def get_observable_replacement_id(groupvars, groupvar) -> str:
def get_observable_replacement_id(
groupvars: list[str], groupvar: Sequence
) -> str:
"""Get the replacement ID for an observable.

Arguments:
groupvars:
The columns of a PEtab measurement table that should be unique
between observables in a flattened PEtab problem.
groupvar:
A specific grouping of `groupvars`.
A specific grouping of `groupvars`. Same length and order as
`groupvars`.

Returns:
The observable replacement ID.
Expand Down
260 changes: 256 additions & 4 deletions petab/v2/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,20 @@
"Parameter",
"ParameterScale",
"ParameterTable",
"flatten_timepoint_specific_output_overrides",
"unflatten_simulation_df",
]

# Candidate measurement-table columns for grouping measurements when
# flattening timepoint-specific overrides. Users in this module pass the
# list through `get_notnull_columns` together with the measurement table
# before grouping, rather than grouping by the full list directly.
_POSSIBLE_GROUPVARS_FLATTENED_PROBLEM = [
    C.MODEL_ID,
    C.EXPERIMENT_ID,
    C.OBSERVABLE_ID,
    C.OBSERVABLE_PARAMETERS,
    C.NOISE_PARAMETERS,
]

logger = logging.getLogger(__name__)


def _is_finite_or_neg_inf(v: float, info: ValidationInfo) -> float:
if not np.isfinite(v) and v != -np.inf:
Expand Down Expand Up @@ -1143,7 +1155,11 @@ def __str__(self):
f"{observables}, {measurements}, {parameters}"
)

def __getitem__(self, key):
def __getitem__(
self, key
) -> (
Condition | Experiment | Observable | Measurement | Parameter | Mapping
):
"""Get PEtab entity by ID.

This allows accessing PEtab entities such as conditions, experiments,
Expand Down Expand Up @@ -2320,7 +2336,9 @@ def get_output_parameters(
# filter out symbols that are defined in the model or mapped to
# such symbols
for candidate in sorted(candidates):
if self.model.symbol_allowed_in_observable_formula(candidate):
if self.model and self.model.symbol_allowed_in_observable_formula(
candidate
):
continue

# does it map to a model entity?
Expand All @@ -2329,8 +2347,11 @@ def get_output_parameters(
mapping.petab_id == candidate
and mapping.model_id is not None
):
if self.model.symbol_allowed_in_observable_formula(
mapping.model_id
if (
self.model
and self.model.symbol_allowed_in_observable_formula(
mapping.model_id
)
):
break
else:
Expand All @@ -2339,6 +2360,71 @@ def get_output_parameters(

return output_parameters

def has_timepoint_specific_overrides(
    self,
    ignore_scalar_numeric_noise_parameters: bool = False,
    ignore_scalar_numeric_observable_parameters: bool = False,
) -> bool:
    """Check if the measurements have timepoint-specific observable or
    noise parameter overrides.

    The measurement table is grouped twice: once by the identifying
    columns plus the observable/noise parameter columns, and once by the
    identifying columns only. If the first grouping yields a different
    number of groups, at least one observable is measured with more than
    one set of override values.

    :param ignore_scalar_numeric_noise_parameters:
        ignore scalar numeric assignments to noiseParameter placeholders

    :param ignore_scalar_numeric_observable_parameters:
        ignore scalar numeric assignments to observableParameter
        placeholders

    :return: True if the problem has timepoint-specific overrides, False
        otherwise.
    """
    if not self.measurements:
        return False

    from ..v1.core import get_notnull_columns
    from ..v1.lint import is_scalar_float

    # NOTE(review): per the review discussion on this change, editing
    # `Problem.measurement_df` does not modify the problem itself, so the
    # masking below is presumably local to this call -- confirm. A possible
    # rename to `get_measurement_df` is tracked at #409.
    measurement_df = self.measurement_df

    # mask numeric values
    for col, allow_scalar_numeric in [
        (
            C.OBSERVABLE_PARAMETERS,
            ignore_scalar_numeric_observable_parameters,
        ),
        (C.NOISE_PARAMETERS, ignore_scalar_numeric_noise_parameters),
    ]:
        if col not in measurement_df:
            continue

        # compare overrides by their string representation
        measurement_df[col] = measurement_df[col].apply(str)

        if allow_scalar_numeric:
            # blank out scalar numeric overrides so that they do not
            # produce additional groups
            measurement_df.loc[
                measurement_df[col].apply(is_scalar_float), col
            ] = ""

    grouping_cols = get_notnull_columns(
        measurement_df,
        _POSSIBLE_GROUPVARS_FLATTENED_PROBLEM,
    )
    grouped_df = measurement_df.groupby(grouping_cols, dropna=False)

    grouping_cols = get_notnull_columns(
        measurement_df,
        [
            C.MODEL_ID,
            C.OBSERVABLE_ID,
            C.EXPERIMENT_ID,
        ],
    )
    # Use `dropna=False` here as well: otherwise rows with a missing value
    # in one of the identifying columns are counted in the grouping above
    # but dropped from this one, and the group counts differ even though
    # there are no timepoint-specific overrides.
    grouped_df2 = measurement_df.groupby(grouping_cols, dropna=False)

    # data frame has timepoint specific overrides if grouping by noise
    # parameters and observable parameters in addition to observable and
    # experiment id yields more groups
    return len(grouped_df) != len(grouped_df2)


class ModelFile(BaseModel):
"""A file in the PEtab problem configuration."""
Expand Down Expand Up @@ -2457,3 +2543,169 @@ def format_version_tuple(self) -> tuple[int, int, int, str]:
"""The format version as a tuple of major/minor/patch `int`s and a
suffix."""
return parse_version(self.format_version)


def _get_flattened_id_mappings(
    petab_problem: Problem,
) -> dict[str, str]:
    """Get the mapping from flattened to unflattened observable IDs.

    The measurement table is grouped by those candidate grouping columns
    that `get_notnull_columns` returns for it; every group contributes one
    replacement (flattened) observable ID.

    :param petab_problem:
        The unflattened PEtab problem.

    :returns:
        A dictionary whose keys are the flattened (replacement) observable
        IDs and whose values are the corresponding original observable IDs.

    :raises RuntimeError:
        If a replacement ID differs from its original ID but is already
        used as an observable ID in the problem.
    """
    from ..v1.core import (
        get_notnull_columns,
        get_observable_replacement_id,
    )

    groupvars = get_notnull_columns(
        petab_problem.measurement_df, _POSSIBLE_GROUPVARS_FLATTENED_PROBLEM
    )
    flattened_to_original: dict[str, str] = {}
    existing_ids = {
        observable.id for observable in petab_problem.observables
    }

    measurement_groups = petab_problem.measurement_df.groupby(
        groupvars, dropna=False
    )
    for group_key, _rows in measurement_groups:
        observable_id = group_key[groupvars.index(C.OBSERVABLE_ID)]
        observable_replacement_id = get_observable_replacement_id(
            groupvars, group_key
        )

        logger.debug(f"Creating synthetic observable {observable_id}")

        # A replacement ID equal to the original ID is not a clash; any
        # other already-used ID is.
        is_renamed = observable_id != observable_replacement_id
        if is_renamed and observable_replacement_id in existing_ids:
            raise RuntimeError(
                "could not create synthetic observables "
                f"since {observable_replacement_id} was "
                "already present in observable table"
            )

        flattened_to_original[observable_replacement_id] = observable_id

    return flattened_to_original


def flatten_timepoint_specific_output_overrides(
    petab_problem: Problem,
) -> None:
    """Flatten timepoint-specific output parameter overrides.

    If the PEtab problem definition has timepoint-specific
    `observableParameters` or `noiseParameters` for the same observable,
    replace those by replicating the respective observable.

    This is a helper function for some tools which may not support such
    timepoint-specific mappings. The problem is modified in place: its
    observable tables are replaced by a single table that holds one
    observable per flattened ID derived from the measurements, and the
    observable ID of every measurement is set to its flattened ID.

    :param petab_problem:
        PEtab problem to work on. Modified in place.

    :raises ValueError:
        If a measurement refers to an observable that is not in the
        observable table.
    """
    from ..v1.core import (
        get_notnull_columns,
        get_observable_replacement_id,
    )

    # Update observables
    def create_new_observable(old_id, new_id) -> Observable:
        """Copy observable `old_id` under the ID `new_id`.

        Placeholders are renamed by appending the suffix that distinguishes
        `new_id` from `old_id`, and both formulas are updated accordingly.
        """
        if old_id not in petab_problem.observable_df.index:
            raise ValueError(
                f"Observable {old_id} not found in observable table."
            )

        # copy original observable and update ID
        observable: Observable = copy.deepcopy(petab_problem[old_id])
        observable.id = new_id

        # update placeholders
        old_obs_placeholders = observable.observable_placeholders or []
        old_noise_placeholders = observable.noise_placeholders or []
        suffix = new_id.removeprefix(old_id)
        # Iterate over the guarded lists (not the raw attributes), so that
        # the `or []` fallback above also applies here.
        observable.observable_placeholders = [
            f"{sym.name}{suffix}" for sym in old_obs_placeholders
        ]
        observable.noise_placeholders = [
            f"{sym.name}{suffix}" for sym in old_noise_placeholders
        ]

        # placeholders in formulas
        # NOTE(review): the new placeholders are assigned as strings above;
        # presumably they are converted to symbols on assignment before
        # being used as substitution targets here -- confirm.
        subs = dict(
            zip(
                old_obs_placeholders,
                observable.observable_placeholders,
                strict=False,
            )
        )
        observable.formula = observable.formula.subs(subs)
        # the noise formula is substituted with both the observable and
        # the noise placeholder replacements
        subs |= dict(
            zip(
                old_noise_placeholders,
                observable.noise_placeholders,
                strict=False,
            )
        )
        observable.noise_formula = observable.noise_formula.subs(subs)

        return observable

    # flattened ID -> original ID, derived from the measurement groups
    mappings = _get_flattened_id_mappings(petab_problem)

    petab_problem.observable_tables = [
        ObservableTable(
            [
                create_new_observable(old_id, new_id)
                for new_id, old_id in mappings.items()
            ]
        )
    ]

    # Update measurements
    groupvars = get_notnull_columns(
        petab_problem.measurement_df, _POSSIBLE_GROUPVARS_FLATTENED_PROBLEM
    )
    for measurement_table in petab_problem.measurement_tables:
        for measurement in measurement_table.measurements:
            # TODO: inefficient, but ok for a start
            group_vals = (
                MeasurementTable([measurement])
                .to_df()
                .iloc[0][groupvars]
                .tolist()
            )
            new_obs_id = get_observable_replacement_id(groupvars, group_vals)
            measurement.observable_id = new_obs_id


def unflatten_simulation_df(
    simulation_df: pd.DataFrame,
    petab_problem: Problem,
) -> pd.DataFrame:
    """Unflatten simulations from a flattened PEtab problem.

    A flattened PEtab problem is the output of applying
    :func:`flatten_timepoint_specific_output_overrides` to a PEtab problem.

    :param simulation_df:
        The simulation dataframe. A dataframe in the same format as a PEtab
        measurements table, but with the ``measurement`` column switched
        with a ``simulation`` column. It is not modified.
    :param petab_problem:
        The unflattened PEtab problem.

    :returns:
        A new simulation dataframe in which the flattened observable IDs
        are replaced by the observable IDs of the unflattened PEtab
        problem.
    """
    # flattened ID -> original ID
    mappings = _get_flattened_id_mappings(petab_problem)
    original_observable_ids = simulation_df[C.OBSERVABLE_ID].replace(mappings)
    # `assign` returns a new dataframe, leaving the input untouched
    return simulation_df.assign(
        **{
            C.OBSERVABLE_ID: original_observable_ids,
        }
    )
Loading