-
Notifications
You must be signed in to change notification settings - Fork 7
Misc fixes & annotations #450
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
a616088
8da8ffa
2df6ab3
2207165
6c4911b
4481491
8fc20e3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -69,8 +69,20 @@ | |
| "Parameter", | ||
| "ParameterScale", | ||
| "ParameterTable", | ||
| "flatten_timepoint_specific_output_overrides", | ||
| "unflatten_simulation_df", | ||
| ] | ||
|
|
||
| _POSSIBLE_GROUPVARS_FLATTENED_PROBLEM = [ | ||
| C.MODEL_ID, | ||
| C.EXPERIMENT_ID, | ||
| C.OBSERVABLE_ID, | ||
| C.OBSERVABLE_PARAMETERS, | ||
| C.NOISE_PARAMETERS, | ||
| ] | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| def _is_finite_or_neg_inf(v: float, info: ValidationInfo) -> float: | ||
| if not np.isfinite(v) and v != -np.inf: | ||
|
|
@@ -1143,7 +1155,11 @@ def __str__(self): | |
| f"{observables}, {measurements}, {parameters}" | ||
| ) | ||
|
|
||
| def __getitem__(self, key): | ||
| def __getitem__( | ||
| self, key | ||
| ) -> ( | ||
| Condition | Experiment | Observable | Measurement | Parameter | Mapping | ||
| ): | ||
| """Get PEtab entity by ID. | ||
|
|
||
| This allows accessing PEtab entities such as conditions, experiments, | ||
|
|
@@ -2320,7 +2336,9 @@ def get_output_parameters( | |
| # filter out symbols that are defined in the model or mapped to | ||
| # such symbols | ||
| for candidate in sorted(candidates): | ||
| if self.model.symbol_allowed_in_observable_formula(candidate): | ||
| if self.model and self.model.symbol_allowed_in_observable_formula( | ||
| candidate | ||
| ): | ||
| continue | ||
|
|
||
| # does it map to a model entity? | ||
|
|
@@ -2329,8 +2347,11 @@ def get_output_parameters( | |
| mapping.petab_id == candidate | ||
| and mapping.model_id is not None | ||
| ): | ||
| if self.model.symbol_allowed_in_observable_formula( | ||
| mapping.model_id | ||
| if ( | ||
| self.model | ||
| and self.model.symbol_allowed_in_observable_formula( | ||
| mapping.model_id | ||
| ) | ||
| ): | ||
| break | ||
| else: | ||
|
|
@@ -2339,6 +2360,71 @@ def get_output_parameters( | |
|
|
||
| return output_parameters | ||
|
|
||
| def has_timepoint_specific_overrides( | ||
| self, | ||
| ignore_scalar_numeric_noise_parameters: bool = False, | ||
| ignore_scalar_numeric_observable_parameters: bool = False, | ||
dweindl marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| ) -> bool: | ||
| """Check if the measurements have timepoint-specific observable or | ||
| noise parameter overrides. | ||
|
|
||
| :param ignore_scalar_numeric_noise_parameters: | ||
| ignore scalar numeric assignments to noiseParameter placeholders | ||
|
|
||
| :param ignore_scalar_numeric_observable_parameters: | ||
| ignore scalar numeric assignments to observableParameter | ||
| placeholders | ||
|
|
||
| :return: True if the problem has timepoint-specific overrides, False | ||
| otherwise. | ||
| """ | ||
| if not self.measurements: | ||
| return False | ||
|
|
||
| from ..v1.core import get_notnull_columns | ||
| from ..v1.lint import is_scalar_float | ||
dweindl marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| measurement_df = self.measurement_df | ||
|
||
|
|
||
| # mask numeric values | ||
| for col, allow_scalar_numeric in [ | ||
| ( | ||
| C.OBSERVABLE_PARAMETERS, | ||
| ignore_scalar_numeric_observable_parameters, | ||
| ), | ||
| (C.NOISE_PARAMETERS, ignore_scalar_numeric_noise_parameters), | ||
| ]: | ||
| if col not in measurement_df: | ||
| continue | ||
|
|
||
| measurement_df[col] = measurement_df[col].apply(str) | ||
|
|
||
| if allow_scalar_numeric: | ||
| measurement_df.loc[ | ||
| measurement_df[col].apply(is_scalar_float), col | ||
| ] = "" | ||
|
|
||
| grouping_cols = get_notnull_columns( | ||
| measurement_df, | ||
| _POSSIBLE_GROUPVARS_FLATTENED_PROBLEM, | ||
| ) | ||
| grouped_df = measurement_df.groupby(grouping_cols, dropna=False) | ||
|
|
||
| grouping_cols = get_notnull_columns( | ||
| measurement_df, | ||
| [ | ||
| C.MODEL_ID, | ||
| C.OBSERVABLE_ID, | ||
| C.EXPERIMENT_ID, | ||
| ], | ||
| ) | ||
| grouped_df2 = measurement_df.groupby(grouping_cols) | ||
|
|
||
| # data frame has timepoint specific overrides if grouping by noise | ||
| # parameters and observable parameters in addition to observable and | ||
| # experiment id yields more groups | ||
| return len(grouped_df) != len(grouped_df2) | ||
|
|
||
|
|
||
| class ModelFile(BaseModel): | ||
| """A file in the PEtab problem configuration.""" | ||
|
|
@@ -2457,3 +2543,169 @@ def format_version_tuple(self) -> tuple[int, int, int, str]: | |
| """The format version as a tuple of major/minor/patch `int`s and a | ||
| suffix.""" | ||
| return parse_version(self.format_version) | ||
|
|
||
|
|
||
| def _get_flattened_id_mappings( | ||
| petab_problem: Problem, | ||
| ) -> dict[str, str]: | ||
| """Get mapping from flattened to unflattened observable IDs. | ||
|
|
||
| :param petab_problem: | ||
| The unflattened PEtab problem. | ||
|
|
||
| :returns: | ||
| A mapping from flattened ID to original observable ID. | ||
| """ | ||
| from ..v1.core import ( | ||
| get_notnull_columns, | ||
| get_observable_replacement_id, | ||
| ) | ||
|
|
||
| groupvars = get_notnull_columns( | ||
| petab_problem.measurement_df, _POSSIBLE_GROUPVARS_FLATTENED_PROBLEM | ||
| ) | ||
| mappings: dict[str, str] = {} | ||
|
|
||
| old_observable_ids = {obs.id for obs in petab_problem.observables} | ||
| for groupvar, _ in petab_problem.measurement_df.groupby( | ||
| groupvars, dropna=False | ||
| ): | ||
| observable_id = groupvar[groupvars.index(C.OBSERVABLE_ID)] | ||
| observable_replacement_id = get_observable_replacement_id( | ||
| groupvars, groupvar | ||
| ) | ||
|
|
||
| logger.debug(f"Creating synthetic observable {observable_id}") | ||
| if ( | ||
| observable_id != observable_replacement_id | ||
| and observable_replacement_id in old_observable_ids | ||
| ): | ||
| raise RuntimeError( | ||
| "could not create synthetic observables " | ||
| f"since {observable_replacement_id} was " | ||
| "already present in observable table" | ||
| ) | ||
|
|
||
| mappings[observable_replacement_id] = observable_id | ||
|
|
||
| return mappings | ||
|
|
||
|
|
||
| def flatten_timepoint_specific_output_overrides( | ||
| petab_problem: Problem, | ||
| ) -> None: | ||
| """Flatten timepoint-specific output parameter overrides. | ||
|
|
||
| If the PEtab problem definition has timepoint-specific | ||
| `observableParameters` or `noiseParameters` for the same observable, | ||
| replace those by replicating the respective observable. | ||
|
|
||
| This is a helper function for some tools which may not support such | ||
| timepoint-specific mappings. The observable table and measurement table | ||
| are modified in place. | ||
|
|
||
| :param petab_problem: | ||
| PEtab problem to work on. Modified in place. | ||
| """ | ||
| from ..v1.core import ( | ||
| get_notnull_columns, | ||
| get_observable_replacement_id, | ||
| ) | ||
|
|
||
| # Update observables | ||
| def create_new_observable(old_id, new_id) -> Observable: | ||
| if old_id not in petab_problem.observable_df.index: | ||
| raise ValueError( | ||
| f"Observable {old_id} not found in observable table." | ||
| ) | ||
|
|
||
| # copy original observable and update ID | ||
| observable: Observable = copy.deepcopy(petab_problem[old_id]) | ||
| observable.id = new_id | ||
|
|
||
| # update placeholders | ||
| old_obs_placeholders = observable.observable_placeholders or [] | ||
| old_noise_placeholders = observable.noise_placeholders or [] | ||
dweindl marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| suffix = new_id.removeprefix(old_id) | ||
| observable.observable_placeholders = [ | ||
| f"{sym.name}{suffix}" for sym in observable.observable_placeholders | ||
| ] | ||
| observable.noise_placeholders = [ | ||
| f"{sym.name}{suffix}" for sym in observable.noise_placeholders | ||
| ] | ||
|
|
||
| # placeholders in formulas | ||
| subs = dict( | ||
| zip( | ||
| old_obs_placeholders, | ||
| observable.observable_placeholders, | ||
| strict=True, | ||
| ) | ||
| ) | ||
| observable.formula = observable.formula.subs(subs) | ||
| subs |= dict( | ||
| zip( | ||
| old_noise_placeholders, | ||
| observable.noise_placeholders, | ||
| strict=True, | ||
| ) | ||
| ) | ||
| observable.noise_formula = observable.noise_formula.subs(subs) | ||
|
|
||
| return observable | ||
|
|
||
| mappings = _get_flattened_id_mappings(petab_problem) | ||
|
|
||
| petab_problem.observable_tables = [ | ||
| ObservableTable( | ||
| [ | ||
| create_new_observable(old_id, new_id) | ||
| for new_id, old_id in mappings.items() | ||
| ] | ||
| ) | ||
| ] | ||
|
|
||
| # Update measurements | ||
| groupvars = get_notnull_columns( | ||
| petab_problem.measurement_df, _POSSIBLE_GROUPVARS_FLATTENED_PROBLEM | ||
| ) | ||
| for measurement_table in petab_problem.measurement_tables: | ||
| for measurement in measurement_table.measurements: | ||
| # TODO: inefficient, but ok for a start | ||
| group_vals = ( | ||
| MeasurementTable([measurement]) | ||
| .to_df() | ||
| .iloc[0][groupvars] | ||
| .tolist() | ||
| ) | ||
| new_obs_id = get_observable_replacement_id(groupvars, group_vals) | ||
| measurement.observable_id = new_obs_id | ||
|
|
||
|
|
||
| def unflatten_simulation_df( | ||
| simulation_df: pd.DataFrame, | ||
| petab_problem: petab.problem.Problem, | ||
| ) -> pd.DataFrame: | ||
| """Unflatten simulations from a flattened PEtab problem. | ||
|
|
||
| A flattened PEtab problem is the output of applying | ||
| :func:`flatten_timepoint_specific_output_overrides` to a PEtab problem. | ||
|
|
||
| :param simulation_df: | ||
| The simulation dataframe. A dataframe in the same format as a PEtab | ||
| measurements table, but with the ``measurement`` column switched | ||
| with a ``simulation`` column. | ||
| :param petab_problem: | ||
| The unflattened PEtab problem. | ||
|
|
||
| :returns: | ||
| The simulation dataframe for the unflattened PEtab problem. | ||
| """ | ||
| mappings = _get_flattened_id_mappings(petab_problem) | ||
| original_observable_ids = simulation_df[C.OBSERVABLE_ID].replace(mappings) | ||
| unflattened_simulation_df = simulation_df.assign( | ||
| **{ | ||
| C.OBSERVABLE_ID: original_observable_ids, | ||
| } | ||
| ) | ||
| return unflattened_simulation_df | ||
Uh oh!
There was an error while loading. Please reload this page.