|
| 1 | +import abc |
| 2 | +import numpy as np |
| 3 | +import pathlib |
| 4 | +import pandas as pd |
| 5 | +import petab |
| 6 | +import shutil |
| 7 | +import sympy as sp |
| 8 | +import tempfile |
| 9 | +from typing import Dict, Optional, Union |
| 10 | + |
| 11 | + |
| 12 | +class Simulator(abc.ABC): |
| 13 | + """ |
| 14 | + Base class that specific simulators should inherit. |
| 15 | + Specific simulators should minimally implement the |
| 16 | + `simulate_without_noise` method. |
| 17 | + Example (AMICI): https://bit.ly/33SUSG4 |
| 18 | +
|
| 19 | + Attributes: |
| 20 | + noise_formulas: |
| 21 | + The formulae that will be used to calculate the scale of noise |
| 22 | + distributions. |
| 23 | + petab_problem: |
| 24 | + A PEtab problem, which will be simulated. |
| 25 | + rng: |
| 26 | + A NumPy random generator, used to sample from noise distributions. |
| 27 | + temporary_working_dir: |
| 28 | + Whether `working_dir` is a temporary directory, which can be |
| 29 | + deleted without significant consequence. |
| 30 | + working_dir: |
| 31 | + All simulator-specific output files will be saved here. This |
| 32 | + directory and its contents may be modified and deleted, and |
| 33 | + should be considered ephemeral. |
| 34 | + """ |
| 35 | + def __init__(self, |
| 36 | + petab_problem: petab.Problem, |
| 37 | + working_dir: Optional[Union[pathlib.Path, str]] = None): |
| 38 | + """ |
| 39 | + Initialize the simulator with sufficient information to perform a |
| 40 | + simulation. If no working directory is specified, a temporary one is |
| 41 | + created. |
| 42 | +
|
| 43 | + Arguments: |
| 44 | + petab_problem: |
| 45 | + A PEtab problem. |
| 46 | + working_dir: |
| 47 | + All simulator-specific output files will be saved here. This |
| 48 | + directory and its contents may be modified and deleted, and |
| 49 | + should be considered ephemeral. |
| 50 | + """ |
| 51 | + self.petab_problem = petab_problem |
| 52 | + |
| 53 | + self.temporary_working_dir = False |
| 54 | + if working_dir is None: |
| 55 | + working_dir = tempfile.mkdtemp() |
| 56 | + self.temporary_working_dir = True |
| 57 | + if not isinstance(working_dir, pathlib.Path): |
| 58 | + working_dir = pathlib.Path(working_dir) |
| 59 | + self.working_dir = working_dir |
| 60 | + self.working_dir.mkdir(parents=True, exist_ok=True) |
| 61 | + |
| 62 | + self.noise_formulas = petab.calculate.get_symbolic_noise_formulas( |
| 63 | + self.petab_problem.observable_df) |
| 64 | + self.rng = np.random.default_rng() |
| 65 | + |
| 66 | + def remove_working_dir(self, force: bool = False, **kwargs) -> None: |
| 67 | + """ |
| 68 | + Remove the simulator working directory and all files within (see the |
| 69 | + `__init__` method arguments). |
| 70 | +
|
| 71 | + Arguments: |
| 72 | + force: |
| 73 | + If True, the working directory is removed regardless of |
| 74 | + whether it is a temporary directory. |
| 75 | + """ |
| 76 | + if force or self.temporary_working_dir: |
| 77 | + shutil.rmtree(self.working_dir, **kwargs) |
| 78 | + if self.working_dir.is_dir(): |
| 79 | + print('Failed to remove the working directory: ' |
| 80 | + + str(self.working_dir)) |
| 81 | + else: |
| 82 | + print('By default, specified working directories are not removed. ' |
| 83 | + 'Please call this method with `force=True`, or manually ' |
| 84 | + f'delete the working directory: {self.working_dir}') |
| 85 | + |
| 86 | + @abc.abstractmethod |
| 87 | + def simulate_without_noise(self) -> pd.DataFrame: |
| 88 | + """ |
| 89 | + Simulate a PEtab problem. This is an abstract method that should be |
| 90 | + implemented in a simulation package. Links to examples of this are in |
| 91 | + the class docstring. |
| 92 | +
|
| 93 | + Returns: |
| 94 | + Simulated data, as a PEtab measurements table, which should be |
| 95 | + equivalent to replacing all values in the `petab.C.MEASUREMENT` |
| 96 | + column of the measurements table (of the PEtab problem supplied to |
| 97 | + the `__init__` method), with simulated values. |
| 98 | + """ |
| 99 | + |
| 100 | + def simulate( |
| 101 | + self, |
| 102 | + noise: bool = False, |
| 103 | + noise_scaling_factor: float = 1, |
| 104 | + **kwargs |
| 105 | + ) -> pd.DataFrame: |
| 106 | + """Simulate a PEtab problem, optionally with noise. |
| 107 | +
|
| 108 | + Arguments: |
| 109 | + noise: If True, noise is added to simulated data. |
| 110 | + noise_scaling_factor: |
| 111 | + A multiplier of the scale of the noise distribution. |
| 112 | +
|
| 113 | + Returns: |
| 114 | + Simulated data, as a PEtab measurements table. |
| 115 | + """ |
| 116 | + simulation_df = self.simulate_without_noise(**kwargs) |
| 117 | + if noise: |
| 118 | + simulation_df = self.add_noise(simulation_df, noise_scaling_factor) |
| 119 | + return simulation_df |
| 120 | + |
| 121 | + def add_noise( |
| 122 | + self, |
| 123 | + simulation_df: pd.DataFrame, |
| 124 | + noise_scaling_factor: float = 1, |
| 125 | + ) -> pd.DataFrame: |
| 126 | + """Add noise to simulated data. |
| 127 | +
|
| 128 | + Arguments: |
| 129 | + simulation_df: |
| 130 | + A PEtab measurements table that contains simulated data. |
| 131 | + noise_scaling_factor: |
| 132 | + A multiplier of the scale of the noise distribution. |
| 133 | +
|
| 134 | + Returns: |
| 135 | + Simulated data with noise, as a PEtab measurements table. |
| 136 | + """ |
| 137 | + simulation_df_with_noise = simulation_df.copy() |
| 138 | + simulation_df_with_noise[petab.C.MEASUREMENT] = [ |
| 139 | + sample_noise( |
| 140 | + self.petab_problem, |
| 141 | + row, |
| 142 | + row[petab.C.MEASUREMENT], |
| 143 | + self.noise_formulas, |
| 144 | + self.rng, |
| 145 | + noise_scaling_factor, |
| 146 | + ) |
| 147 | + for _, row in simulation_df_with_noise.iterrows() |
| 148 | + ] |
| 149 | + return simulation_df_with_noise |
| 150 | + |
| 151 | + |
| 152 | +def sample_noise( |
| 153 | + petab_problem: petab.Problem, |
| 154 | + measurement_row: pd.Series, |
| 155 | + simulated_value: float, |
| 156 | + noise_formulas: Optional[Dict[str, sp.Expr]] = None, |
| 157 | + rng: Optional[np.random.Generator] = None, |
| 158 | + noise_scaling_factor: float = 1, |
| 159 | +) -> float: |
| 160 | + """Generate a sample from a PEtab noise distribution. |
| 161 | +
|
| 162 | + Arguments: |
| 163 | + petab_problem: |
| 164 | + The PEtab problem used to generate the simulated value. |
| 165 | + Instance of `petab.Problem`. |
| 166 | + measurement_row: |
| 167 | + The row in the PEtab problem measurement table that corresponds |
| 168 | + to the simulated value. |
| 169 | + simulated_value: |
| 170 | + A simulated value without noise. |
| 171 | + noise_formulas: |
| 172 | + Processed noise formulas from the PEtab observables table, in the |
| 173 | + form output by the `petab.calculate.get_symbolic_noise_formulas` |
| 174 | + method. |
| 175 | + rng: |
| 176 | + A NumPy random generator. |
| 177 | + noise_scaling_factor: |
| 178 | + A multiplier of the scale of the noise distribution. |
| 179 | +
|
| 180 | + Returns: |
| 181 | + The sample from the PEtab noise distribution. |
| 182 | + """ |
| 183 | + if noise_formulas is None: |
| 184 | + noise_formulas = petab.calculate.get_symbolic_noise_formulas( |
| 185 | + petab_problem.observable_df) |
| 186 | + if rng is None: |
| 187 | + rng = np.random.default_rng() |
| 188 | + |
| 189 | + noise_value = petab.calculate.evaluate_noise_formula( |
| 190 | + measurement_row, |
| 191 | + noise_formulas, |
| 192 | + petab_problem.parameter_df, |
| 193 | + simulated_value |
| 194 | + ) |
| 195 | + |
| 196 | + # default noise distribution is petab.C.NORMAL |
| 197 | + noise_distribution = ( |
| 198 | + petab_problem |
| 199 | + .observable_df |
| 200 | + .loc[measurement_row[petab.C.OBSERVABLE_ID]] |
| 201 | + .get(petab.C.NOISE_DISTRIBUTION, petab.C.NORMAL) |
| 202 | + ) |
| 203 | + |
| 204 | + # below is e.g.: `np.random.normal(loc=simulation, scale=noise_value)` |
| 205 | + return getattr(rng, noise_distribution)( |
| 206 | + loc=simulated_value, |
| 207 | + scale=noise_value * noise_scaling_factor |
| 208 | + ) |
0 commit comments