Skip to content

Commit 97ff0e7

Browse files
Merge pull request #166 from scikit-learn-contrib/feat/parallelisation
feat/parallelisation
2 parents 3cde1b4 + d43a6e2 commit 97ff0e7

File tree

3 files changed

+681
-140
lines changed

3 files changed

+681
-140
lines changed

docs/index.rst

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,14 @@
2222
.. toctree::
2323
:maxdepth: 2
2424
:hidden:
25-
:caption: API
25+
:caption: ANALYSIS
2626

27-
api
27+
analysis
28+
examples/tutorials/plot_tuto_mcar
2829

2930
.. toctree::
3031
:maxdepth: 2
3132
:hidden:
32-
:caption: ANALYSIS
33+
:caption: API
3334

34-
analysis
35-
examples/tutorials/plot_tuto_mcar
35+
api

qolmat/benchmark/comparator.py

Lines changed: 188 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
"""Script for comparator."""
22

33
import logging
4-
from typing import Any, Dict, List, Optional
4+
from typing import Any, Dict, List, Optional, Tuple
55

66
import numpy as np
77
import pandas as pd
8+
from joblib import Parallel, cpu_count, delayed
89

910
from qolmat.benchmark import hyperparameters, metrics
1011
from qolmat.benchmark.missing_patterns import _HoleGenerator
@@ -93,99 +94,225 @@ def get_errors(
9394
df_errors = pd.concat(dict_errors.values(), keys=dict_errors.keys())
9495
return df_errors
9596

96-
def evaluate_errors_sample(
97-
self,
98-
imputer: Any,
99-
df: pd.DataFrame,
100-
dict_config_opti_imputer: Dict[str, Any] = {},
101-
metric_optim: str = "mse",
102-
) -> pd.Series:
103-
"""Evaluate the errors in the cross-validation.
97+
def process_split(
98+
self, split_data: Tuple[int, pd.DataFrame, pd.DataFrame]
99+
) -> pd.DataFrame:
100+
"""Process a split.
104101
105102
Parameters
106103
----------
107-
imputer : Any
108-
imputation model
109-
df : pd.DataFrame
110-
dataframe to impute
111-
dict_config_opti_imputer : Dict
112-
search space for tested_model's hyperparameters
113-
metric_optim : str
114-
Loss function used when imputers undergo hyperparameter
115-
optimization
104+
split_data : Tuple
105+
contains (split_idx, df_mask, df_origin)
116106
117107
Returns
118108
-------
119-
pd.Series
120-
Series with the errors for each metric and each variable
109+
pd.DataFrame
110+
errors results
121111
122112
"""
123-
list_errors = []
124-
df_origin = df[self.selected_columns].copy()
125-
for df_mask in self.generator_holes.split(df_origin):
126-
df_corrupted = df_origin.copy()
127-
df_corrupted[df_mask] = np.nan
113+
_, df_mask, df_origin = split_data
114+
df_with_holes = df_origin.copy()
115+
df_with_holes[df_mask] = np.nan
116+
117+
subset = self.generator_holes.subset
118+
if subset is None:
119+
raise ValueError(
120+
"HoleGenerator `subset` should be overwritten in split "
121+
"but it is none!"
122+
)
123+
124+
split_results = {}
125+
for imputer_name, imputer in self.dict_imputers.items():
126+
dict_config_opti_imputer = self.dict_config_opti.get(
127+
imputer_name, {}
128+
)
129+
128130
imputer_opti = hyperparameters.optimize(
129131
imputer,
130-
df,
132+
df_origin,
131133
self.generator_holes,
132-
metric_optim,
134+
self.metric_optim,
133135
dict_config_opti_imputer,
134136
max_evals=self.max_evals,
135137
verbose=self.verbose,
136138
)
137-
df_imputed = imputer_opti.fit_transform(df_corrupted)
138-
subset = self.generator_holes.subset
139-
if subset is None:
140-
raise ValueError(
141-
"HoleGenerator `subset` should be overwritten in split "
142-
"but it is none!"
143-
)
144-
df_errors = self.get_errors(
139+
140+
df_imputed = imputer_opti.fit_transform(df_with_holes)
141+
errors = self.get_errors(
142+
df_origin[subset], df_imputed[subset], df_mask[subset]
143+
)
144+
split_results[imputer_name] = errors
145+
146+
return pd.concat(split_results, axis=1)
147+
148+
def process_imputer(
149+
self, imputer_data: Tuple[str, Any, List[pd.DataFrame], pd.DataFrame]
150+
) -> Tuple[str, pd.DataFrame]:
151+
"""Process an imputer.
152+
153+
Parameters
154+
----------
155+
imputer_data : Tuple[str, Any, List[pd.DataFrame], pd.DataFrame]
156+
contains (imputer_name, imputer, all_masks, df_origin)
157+
158+
Returns
159+
-------
160+
Tuple[str, pd.DataFrame]
161+
imputer name, errors results
162+
163+
"""
164+
imputer_name, imputer, all_masks, df_origin = imputer_data
165+
166+
subset = self.generator_holes.subset
167+
if subset is None:
168+
raise ValueError(
169+
"HoleGenerator `subset` should be overwritten in split "
170+
"but it is none!"
171+
)
172+
173+
dict_config_opti_imputer = self.dict_config_opti.get(imputer_name, {})
174+
imputer_opti = hyperparameters.optimize(
175+
imputer,
176+
df_origin,
177+
self.generator_holes,
178+
self.metric_optim,
179+
dict_config_opti_imputer,
180+
max_evals=self.max_evals,
181+
verbose=self.verbose,
182+
)
183+
184+
imputer_results = []
185+
for i, df_mask in enumerate(all_masks):
186+
df_with_holes = df_origin.copy()
187+
df_with_holes[df_mask] = np.nan
188+
df_imputed = imputer_opti.fit_transform(df_with_holes)
189+
errors = self.get_errors(
145190
df_origin[subset], df_imputed[subset], df_mask[subset]
146191
)
147-
list_errors.append(df_errors)
148-
df_errors = pd.DataFrame(list_errors)
149-
errors_mean = df_errors.mean(axis=0)
192+
imputer_results.append(errors)
150193

151-
return errors_mean
194+
return imputer_name, pd.concat(imputer_results).groupby(
195+
level=[0, 1]
196+
).mean()
152197

153198
def compare(
154199
self,
155-
df: pd.DataFrame,
156-
):
157-
"""Compare different imputation methods on dataframe df.
200+
df_origin: pd.DataFrame,
201+
use_parallel: bool = True,
202+
n_jobs: int = -1,
203+
parallel_over: str = "auto",
204+
) -> pd.DataFrame:
205+
"""Compare different imputers in parallel with hyperparams opti.
158206
159207
Parameters
160208
----------
161-
df : pd.DataFrame
162-
input dataframe (for comparison)
209+
df_origin : pd.DataFrame
210+
df with missing values
211+
n_splits : int, optional
212+
number of 'splits', i.e. fake dataframe with
213+
artificial holes, by default 10
214+
use_parallel : bool, optional
215+
if parallelisation, by default True
216+
n_jobs : int, optional
217+
number of jobs to use for the parallelisation, by default -1
218+
parallel_over : str, optional
219+
'splits' or 'imputers', by default "auto"
163220
164221
Returns
165222
-------
166223
pd.DataFrame
167-
Dataframe with the metrics results, imputers are in columns
168-
and indices represent metrics and variables.
224+
DataFrame (2-level index) with results.
225+
Columns are imputers.
226+
0-level index are the metrics.
227+
1-level index are the column names.
169228
170229
"""
171-
dict_errors = {}
230+
logging.info(
231+
f"Starting comparison for {len(self.dict_imputers)} imputers."
232+
)
233+
234+
all_splits = list(self.generator_holes.split(df_origin))
172235

173-
for name, imputer in self.dict_imputers.items():
174-
dict_config_opti_imputer = self.dict_config_opti.get(name, {})
236+
if parallel_over == "auto":
237+
parallel_over = (
238+
"splits"
239+
if len(all_splits) > len(self.dict_imputers)
240+
else "imputers"
241+
)
175242

176-
try:
177-
logging.info(f"Testing model: {name}...")
178-
dict_errors[name] = self.evaluate_errors_sample(
179-
imputer, df, dict_config_opti_imputer, self.metric_optim
243+
if use_parallel:
244+
logging.info(f"Parallelisation over: {parallel_over}...")
245+
if parallel_over == "splits":
246+
split_data = [
247+
(i, df_mask, df_origin)
248+
for i, df_mask in enumerate(all_splits)
249+
]
250+
n_jobs = self.get_optimal_n_jobs(split_data, n_jobs)
251+
results = Parallel(n_jobs=n_jobs)(
252+
delayed(self.process_split)(data) for data in split_data
180253
)
181-
logging.info("done.")
182-
except Exception as excp:
183-
logging.info(
184-
f"Error while testing {name} of type "
185-
f"{type(imputer).__name__}!"
254+
final_results = pd.concat(results).groupby(level=[0, 1]).mean()
255+
elif parallel_over == "imputers":
256+
imputer_data = [
257+
(name, imputer, all_splits, df_origin)
258+
for name, imputer in self.dict_imputers.items()
259+
]
260+
n_jobs = self.get_optimal_n_jobs(imputer_data, n_jobs)
261+
results = Parallel(n_jobs=n_jobs)(
262+
delayed(self.process_imputer)(data)
263+
for data in imputer_data
264+
)
265+
final_results = pd.concat(dict(results), axis=1)
266+
else:
267+
raise ValueError(
268+
"`parallel_over` should be `auto`, `splits` or `imputers`."
186269
)
187-
raise excp
188270

189-
df_errors = pd.DataFrame(dict_errors)
271+
else:
272+
logging.info("Sequential treatment...")
273+
if parallel_over == "splits":
274+
split_data = [
275+
(i, df_mask, df_origin)
276+
for i, df_mask in enumerate(all_splits)
277+
]
278+
results = [self.process_split(data) for data in split_data]
279+
final_results = pd.concat(results).groupby(level=[0, 1]).mean()
280+
elif parallel_over == "imputers":
281+
imputer_data = [
282+
(name, imputer, all_splits, df_origin)
283+
for name, imputer in self.dict_imputers.items()
284+
]
285+
results = [self.process_imputer(data) for data in imputer_data]
286+
final_results = pd.concat(dict(results), axis=1)
287+
else:
288+
raise ValueError(
289+
"`parallel_over` should be `auto`, `splits` or `imputers`."
290+
)
190291

191-
return df_errors
292+
logging.info("Comparison successfully terminated.")
293+
return final_results
294+
295+
@staticmethod
296+
def get_optimal_n_jobs(split_data: List, n_jobs: int = -1) -> int:
297+
"""Determine the optimal number of parallel jobs to use.
298+
299+
If `n_jobs` is specified by the user, that value is used.
300+
Otherwise, the function returns the minimum between the number of
301+
CPU cores and the number of tasks (i.e., the length of `split_data`),
302+
ensuring that no more jobs than tasks are launched.
303+
304+
Parameters
305+
----------
306+
split_data : List
307+
A collection of data to be processed in parallel.
308+
The length of this collection determines the number of tasks.
309+
n_jobs : int
310+
The number of jobs (parallel workers) to use, by default -1
311+
312+
Returns
313+
-------
314+
int
315+
The optimal number of jobs to run in parallel
316+
317+
"""
318+
return min(cpu_count(), len(split_data)) if n_jobs == -1 else n_jobs

0 commit comments

Comments
 (0)