Skip to content

Commit 1233456

Browse files
committed
use joblib in comparator over imputers or splits + rearrange index.rst
1 parent bc9b4c8 commit 1233456

File tree

3 files changed

+659
-142
lines changed

3 files changed

+659
-142
lines changed

docs/index.rst

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,14 @@
2222
.. toctree::
2323
:maxdepth: 2
2424
:hidden:
25-
:caption: API
25+
:caption: ANALYSIS
2626

27-
api
27+
analysis
28+
examples/tutorials/plot_tuto_mcar
2829

2930
.. toctree::
3031
:maxdepth: 2
3132
:hidden:
32-
:caption: ANALYSIS
33+
:caption: API
3334

34-
analysis
35-
examples/tutorials/plot_tuto_mcar
35+
api

qolmat/benchmark/comparator.py

Lines changed: 172 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
"""Script for comparator."""
22

33
import logging
4-
from typing import Any, Dict, List, Optional
4+
from typing import Any, Dict, List, Optional, Tuple
55

66
import numpy as np
77
import pandas as pd
8+
from joblib import Parallel, cpu_count, delayed
89

910
from qolmat.benchmark import hyperparameters, metrics
1011
from qolmat.benchmark.missing_patterns import _HoleGenerator
@@ -93,99 +94,207 @@ def get_errors(
9394
df_errors = pd.concat(dict_errors.values(), keys=dict_errors.keys())
9495
return df_errors
9596

def process_split(
    self, split_data: Tuple[int, pd.DataFrame, pd.DataFrame]
) -> pd.DataFrame:
    """Impute one hole-split with every configured imputer.

    Parameters
    ----------
    split_data : Tuple[int, pd.DataFrame, pd.DataFrame]
        Tuple ``(split_idx, df_mask, df_origin)`` where ``df_mask`` flags
        the artificial holes and ``df_origin`` is the reference dataframe.

    Returns
    -------
    pd.DataFrame
        Errors of every imputer on this split, one top column level
        per imputer.

    """
    _, df_mask, df_origin = split_data

    # Corrupt a copy of the reference data with the artificial holes.
    df_corrupted = df_origin.copy()
    df_corrupted[df_mask] = np.nan

    errors_by_imputer = {}
    for name, imputer in self.dict_imputers.items():
        search_space = self.dict_config_opti.get(name, {})

        # Tune hyperparameters on the original data so each imputer
        # competes with its best configuration.
        tuned_imputer = hyperparameters.optimize(
            imputer,
            df_origin,
            self.generator_holes,
            self.metric_optim,
            search_space,
            max_evals=self.max_evals,
            verbose=self.verbose,
        )

        df_imputed = tuned_imputer.fit_transform(df_corrupted)
        errors_by_imputer[name] = self.get_errors(
            df_origin, df_imputed, df_mask
        )

    return pd.concat(errors_by_imputer, axis=1)
138+
139+
def process_imputer(
    self, imputer_data: Tuple[str, Any, List[pd.DataFrame], pd.DataFrame]
) -> Tuple[str, pd.DataFrame]:
    """Evaluate one imputer on every hole-split.

    The imputer's hyperparameters are optimized once on the original
    dataframe; the tuned imputer is then applied to each corrupted split
    and the per-split errors are averaged.

    Parameters
    ----------
    imputer_data : Tuple[str, Any, List[pd.DataFrame], pd.DataFrame]
        contains (imputer_name, imputer, all_masks, df_origin)

    Returns
    -------
    Tuple[str, pd.DataFrame]
        imputer name, mean errors over all splits indexed by
        (metric, variable)

    """
    imputer_name, imputer, all_masks, df_origin = imputer_data

    dict_config_opti_imputer = self.dict_config_opti.get(imputer_name, {})
    # Optimize once: the same tuned imputer is reused on every split.
    imputer_opti = hyperparameters.optimize(
        imputer,
        df_origin,
        self.generator_holes,
        self.metric_optim,
        dict_config_opti_imputer,
        max_evals=self.max_evals,
        verbose=self.verbose,
    )

    imputer_results = []
    # fix: the split index from enumerate() was unused — iterate directly.
    for df_mask in all_masks:
        df_with_holes = df_origin.copy()
        df_with_holes[df_mask] = np.nan
        df_imputed = imputer_opti.fit_transform(df_with_holes)
        imputer_results.append(
            self.get_errors(df_origin, df_imputed, df_mask)
        )

    # Average the errors across splits, preserving the
    # (metric, variable) two-level index.
    errors_mean = pd.concat(imputer_results).groupby(level=[0, 1]).mean()
    return imputer_name, errors_mean
152179

153180
def compare(
    self,
    df_origin: pd.DataFrame,
    use_parallel: bool = True,
    n_jobs: int = -1,
    parallel_over: str = "auto",
) -> pd.DataFrame:
    """Compare different imputers, with hyperparameter optimization.

    Depending on ``parallel_over``, the work is distributed either over
    the hole-splits (each worker imputes one split with every imputer)
    or over the imputers (each worker tunes one imputer and runs it on
    every split). Both strategies yield the same aggregated result.

    Parameters
    ----------
    df_origin : pd.DataFrame
        df with missing values
    use_parallel : bool, optional
        if True, run with joblib parallelisation, by default True
    n_jobs : int, optional
        number of jobs to use for the parallelisation, by default -1
    parallel_over : str, optional
        'auto', 'splits' or 'imputers', by default "auto"

    Returns
    -------
    pd.DataFrame
        DataFrame (2-level index) with results.
        Columns are imputers.
        0-level index are the metrics.
        1-level index are the column names.

    Raises
    ------
    ValueError
        If ``parallel_over`` is not 'auto', 'splits' or 'imputers'.

    """
    logging.info(
        f"Starting comparison for {len(self.dict_imputers)} imputers."
    )

    all_splits = list(self.generator_holes.split(df_origin))

    # 'auto' parallelises over whichever axis offers more tasks.
    if parallel_over == "auto":
        parallel_over = (
            "splits"
            if len(all_splits) > len(self.dict_imputers)
            else "imputers"
        )
    # Validate once, up front, before doing any work (previously this
    # check was duplicated in the parallel and sequential branches).
    if parallel_over not in ("splits", "imputers"):
        raise ValueError(
            "`parallel_over` should be `auto`, `splits` or `imputers`."
        )

    # Build the task list and pick the worker once; the parallel and
    # sequential paths then share the same task data.
    if parallel_over == "splits":
        tasks = [
            (i, df_mask, df_origin)
            for i, df_mask in enumerate(all_splits)
        ]
        process = self.process_split
    else:
        tasks = [
            (name, imputer, all_splits, df_origin)
            for name, imputer in self.dict_imputers.items()
        ]
        process = self.process_imputer

    if use_parallel:
        logging.info(f"Parallelisation over: {parallel_over}...")
        n_jobs = self.get_optimal_n_jobs(tasks, n_jobs)
        results = Parallel(n_jobs=n_jobs)(
            delayed(process)(data) for data in tasks
        )
    else:
        logging.info("Sequential treatment...")
        results = [process(data) for data in tasks]

    if parallel_over == "splits":
        # Each result holds every imputer on one split: average over
        # splits, keeping the (metric, variable) index.
        final_results = pd.concat(results).groupby(level=[0, 1]).mean()
    else:
        # Each result is (imputer_name, mean errors): one column
        # per imputer.
        final_results = pd.concat(dict(results), axis=1)

    logging.info("Comparison successfully terminated.")
    return final_results
277+
@staticmethod
278+
def get_optimal_n_jobs(split_data: List, n_jobs: int = -1) -> int:
279+
"""Determine the optimal number of parallel jobs to use.
280+
281+
If `n_jobs` is specified by the user, that value is used.
282+
Otherwise, the function returns the minimum between the number of
283+
CPU cores and the number of tasks (i.e., the length of `split_data`),
284+
ensuring that no more jobs than tasks are launched.
285+
286+
Parameters
287+
----------
288+
split_data : List
289+
A collection of data to be processed in parallel.
290+
The length of this collection determines the number of tasks.
291+
n_jobs : int
292+
The number of jobs (parallel workers) to use, by default -1
293+
294+
Returns
295+
-------
296+
int
297+
The optimal number of jobs to run in parallel
298+
299+
"""
300+
return min(cpu_count(), len(split_data)) if n_jobs == -1 else n_jobs

0 commit comments

Comments
 (0)