
Commit 4ff66ed

Neeratyoy and mfeurer authored
Parallel evaluation of tasks (#1020)
* Black fix + removal of untested unit test
* Black fix
* More unit tests
* Docstrings + unit test robustness
* Skipping unit test for lower sklearn versions
* Skipping unit test for lower sklearn versions
* Refining unit tests
* fix merge conflict

Co-authored-by: Matthias Feurer <[email protected]>
1 parent ff7a251 commit 4ff66ed

File tree

4 files changed: +281 -52 lines changed


openml/config.py

Lines changed: 33 additions & 9 deletions
@@ -177,7 +177,7 @@ def stop_using_configuration_for_example(cls):
         cls._start_last_called = False
 
 
-def _setup():
+def _setup(config=None):
     """Setup openml package. Called on first import.
 
     Reads the config file and sets up apikey, server, cache appropriately.
@@ -220,13 +220,27 @@ def _setup():
                 "not working properly." % config_dir
             )
 
-    config = _parse_config(config_file)
-    apikey = config.get("FAKE_SECTION", "apikey")
-    server = config.get("FAKE_SECTION", "server")
+    if config is None:
+        config = _parse_config(config_file)
 
-    cache_dir = config.get("FAKE_SECTION", "cachedir")
-    cache_directory = os.path.expanduser(cache_dir)
+        def _get(config, key):
+            return config.get("FAKE_SECTION", key)
 
+        avoid_duplicate_runs = config.getboolean("FAKE_SECTION", "avoid_duplicate_runs")
+    else:
+
+        def _get(config, key):
+            return config.get(key)
+
+        avoid_duplicate_runs = config.get("avoid_duplicate_runs")
+
+    apikey = _get(config, "apikey")
+    server = _get(config, "server")
+    short_cache_dir = _get(config, "cachedir")
+    connection_n_retries = int(_get(config, "connection_n_retries"))
+    max_retries = int(_get(config, "max_retries"))
+
+    cache_directory = os.path.expanduser(short_cache_dir)
     # create the cache subdirectory
     if not os.path.exists(cache_directory):
         try:
@@ -237,9 +251,6 @@ def _setup():
                 "OpenML-Python not working properly." % cache_directory
             )
 
-    avoid_duplicate_runs = config.getboolean("FAKE_SECTION", "avoid_duplicate_runs")
-    connection_n_retries = int(config.get("FAKE_SECTION", "connection_n_retries"))
-    max_retries = int(config.get("FAKE_SECTION", "max_retries"))
     if connection_n_retries > max_retries:
         raise ValueError(
             "A higher number of retries than {} is not allowed to keep the "
@@ -268,6 +279,17 @@ def _parse_config(config_file: str):
     return config
 
 
+def get_config_as_dict():
+    config = dict()
+    config["apikey"] = apikey
+    config["server"] = server
+    config["cachedir"] = cache_directory
+    config["avoid_duplicate_runs"] = avoid_duplicate_runs
+    config["connection_n_retries"] = connection_n_retries
+    config["max_retries"] = max_retries
+    return config
+
+
 def get_cache_directory():
     """Get the current cache directory.
 
@@ -307,11 +329,13 @@ def set_cache_directory(cachedir):
 )
 stop_using_configuration_for_example = ConfigurationForExamples.stop_using_configuration_for_example
 
+
 __all__ = [
     "get_cache_directory",
     "set_cache_directory",
     "start_using_configuration_for_example",
     "stop_using_configuration_for_example",
+    "get_config_as_dict",
 ]
 
 _setup()
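
The `_setup(config=None)` / `get_config_as_dict()` pair added above exists so that a worker process can be handed the parent's configuration as a plain dict instead of re-reading the config file. A minimal sketch of that round trip, assuming only that `openml` is installed (the `worker` function and variable names below are illustrative, not part of this commit):

import openml.config as config

def worker(configuration: dict) -> str:
    # A spawned/forked child may not inherit the parent's module-level state,
    # so the snapshot taken in the parent is re-applied here.
    config._setup(configuration)
    return config.server  # the worker now sees the parent's server setting

if __name__ == "__main__":
    snapshot = config.get_config_as_dict()  # capture the current settings as a dict
    print(worker(snapshot))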

openml/runs/functions.py

Lines changed: 108 additions & 43 deletions
@@ -12,6 +12,7 @@
 import xmltodict
 import numpy as np
 import pandas as pd
+from joblib.parallel import Parallel, delayed
 
 import openml
 import openml.utils
@@ -54,6 +55,7 @@ def run_model_on_task(
     upload_flow: bool = False,
     return_flow: bool = False,
     dataset_format: str = "dataframe",
+    n_jobs: Optional[int] = None,
 ) -> Union[OpenMLRun, Tuple[OpenMLRun, OpenMLFlow]]:
     """Run the model on the dataset defined by the task.
 
@@ -84,6 +86,10 @@ def run_model_on_task(
     dataset_format : str (default='dataframe')
         If 'array', the dataset is passed to the model as a numpy array.
         If 'dataframe', the dataset is passed to the model as a pandas dataframe.
+    n_jobs : int (default=None)
+        The number of processes/threads to distribute the evaluation asynchronously.
+        If `None` or `1`, then the evaluation is treated as synchronous and processed sequentially.
+        If `-1`, then the job uses as many cores as are available.
 
     Returns
     -------
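
For reference, a usage sketch of the new `n_jobs` argument from the caller's side; the task id and classifier are illustrative choices, not taken from this commit:

import openml
from sklearn.tree import DecisionTreeClassifier

task = openml.tasks.get_task(119)  # example supervised task id (assumption)
clf = DecisionTreeClassifier()
# n_jobs=-1 evaluates the task's repeats/folds/samples in parallel on all cores;
# n_jobs=None (the default) keeps the previous sequential behaviour.
run = openml.runs.run_model_on_task(clf, task, avoid_duplicate_runs=False, n_jobs=-1)
print(run)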
@@ -131,6 +137,7 @@ def get_task_and_type_conversion(task: Union[int, str, OpenMLTask]) -> OpenMLTask:
         add_local_measures=add_local_measures,
         upload_flow=upload_flow,
         dataset_format=dataset_format,
+        n_jobs=n_jobs,
     )
     if return_flow:
         return run, flow
@@ -146,6 +153,7 @@ def run_flow_on_task(
     add_local_measures: bool = True,
     upload_flow: bool = False,
     dataset_format: str = "dataframe",
+    n_jobs: Optional[int] = None,
 ) -> OpenMLRun:
 
     """Run the model provided by the flow on the dataset defined by task.
@@ -181,6 +189,10 @@ def run_flow_on_task(
     dataset_format : str (default='dataframe')
         If 'array', the dataset is passed to the model as a numpy array.
         If 'dataframe', the dataset is passed to the model as a pandas dataframe.
+    n_jobs : int (default=None)
+        The number of processes/threads to distribute the evaluation asynchronously.
+        If `None` or `1`, then the evaluation is treated as synchronous and processed sequentially.
+        If `-1`, then the job uses as many cores as are available.
 
     Returns
     -------
@@ -265,6 +277,7 @@ def run_flow_on_task(
         extension=flow.extension,
         add_local_measures=add_local_measures,
         dataset_format=dataset_format,
+        n_jobs=n_jobs,
     )
 
     data_content, trace, fold_evaluations, sample_evaluations = res
@@ -425,6 +438,7 @@ def _run_task_get_arffcontent(
     extension: "Extension",
     add_local_measures: bool,
     dataset_format: str,
+    n_jobs: int = None,
 ) -> Tuple[
     List[List],
     Optional[OpenMLRunTrace],
@@ -447,55 +461,37 @@ def _run_task_get_arffcontent(
     # methods, less maintenance, less confusion)
     num_reps, num_folds, num_samples = task.get_split_dimensions()
 
+    jobs = []
     for n_fit, (rep_no, fold_no, sample_no) in enumerate(
         itertools.product(range(num_reps), range(num_folds), range(num_samples),), start=1
     ):
-
-        train_indices, test_indices = task.get_train_test_split_indices(
-            repeat=rep_no, fold=fold_no, sample=sample_no
-        )
-        if isinstance(task, OpenMLSupervisedTask):
-            x, y = task.get_X_and_y(dataset_format=dataset_format)
-            if dataset_format == "dataframe":
-                train_x = x.iloc[train_indices]
-                train_y = y.iloc[train_indices]
-                test_x = x.iloc[test_indices]
-                test_y = y.iloc[test_indices]
-            else:
-                train_x = x[train_indices]
-                train_y = y[train_indices]
-                test_x = x[test_indices]
-                test_y = y[test_indices]
-        elif isinstance(task, OpenMLClusteringTask):
-            x = task.get_X(dataset_format=dataset_format)
-            if dataset_format == "dataframe":
-                train_x = x.iloc[train_indices]
-            else:
-                train_x = x[train_indices]
-            train_y = None
-            test_x = None
-            test_y = None
-        else:
-            raise NotImplementedError(task.task_type)
-
-        config.logger.info(
-            "Going to execute flow '%s' on task %d for repeat %d fold %d sample %d.",
-            flow.name,
-            task.task_id,
-            rep_no,
-            fold_no,
-            sample_no,
-        )
-
-        pred_y, proba_y, user_defined_measures_fold, trace = extension._run_model_on_fold(
+        jobs.append((n_fit, rep_no, fold_no, sample_no))
+
+    # The forked child process may not copy the configuration state of OpenML from the parent.
+    # Current configuration setup needs to be copied and passed to the child processes.
+    _config = config.get_config_as_dict()
+    # Execute runs in parallel
+    # assuming the same number of tasks as workers (n_jobs), the total compute time for this
+    # statement will be similar to the slowest run
+    job_rvals = Parallel(verbose=0, n_jobs=n_jobs)(
+        delayed(_run_task_get_arffcontent_parallel_helper)(
+            extension=extension,
+            flow=flow,
+            fold_no=fold_no,
             model=model,
-            task=task,
-            X_train=train_x,
-            y_train=train_y,
             rep_no=rep_no,
-            fold_no=fold_no,
-            X_test=test_x,
+            sample_no=sample_no,
+            task=task,
+            dataset_format=dataset_format,
+            configuration=_config,
         )
+        for n_fit, rep_no, fold_no, sample_no in jobs
+    )  # job_rvals contains the output of all the runs, in one-to-one correspondence with `jobs`
+
+    for n_fit, rep_no, fold_no, sample_no in jobs:
+        pred_y, proba_y, test_indices, test_y, trace, user_defined_measures_fold = job_rvals[
+            n_fit - 1
+        ]
         if trace is not None:
             traces.append(trace)
 
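
The dispatch above follows a standard joblib pattern: enumerate every (repeat, fold, sample) combination up front, submit the whole list through `Parallel`/`delayed`, and read the results back in submission order. A self-contained sketch of that pattern with a stand-in evaluation function (not OpenML's actual fold runner):

import itertools
from joblib import Parallel, delayed

def evaluate_fold(rep_no: int, fold_no: int, sample_no: int) -> float:
    # placeholder for the real per-fold model evaluation
    return float(rep_no + fold_no + sample_no)

jobs = [
    (n_fit, rep, fold, sample)
    for n_fit, (rep, fold, sample) in enumerate(
        itertools.product(range(2), range(10), range(1)), start=1
    )
]
# n_jobs=-1 uses all cores; n_jobs=None or 1 keeps the evaluation sequential.
results = Parallel(n_jobs=-1)(
    delayed(evaluate_fold)(rep, fold, sample) for _, rep, fold, sample in jobs
)
# joblib preserves input order, so results[n_fit - 1] corresponds to jobs[n_fit - 1].
for n_fit, rep, fold, sample in jobs:
    print(n_fit, rep, fold, sample, results[n_fit - 1])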

@@ -615,6 +611,75 @@ def _calculate_local_measure(sklearn_fn, openml_name):
             )
 
 
+def _run_task_get_arffcontent_parallel_helper(
+    extension: "Extension",
+    flow: OpenMLFlow,
+    fold_no: int,
+    model: Any,
+    rep_no: int,
+    sample_no: int,
+    task: OpenMLTask,
+    dataset_format: str,
+    configuration: Dict = None,
+) -> Tuple[
+    np.ndarray,
+    Optional[pd.DataFrame],
+    np.ndarray,
+    Optional[pd.DataFrame],
+    Optional[OpenMLRunTrace],
+    "OrderedDict[str, float]",
+]:
+    # Sets up the OpenML instantiated in the child process to match that of the parent's
+    # if configuration=None, loads the default
+    config._setup(configuration)
+
+    train_indices, test_indices = task.get_train_test_split_indices(
+        repeat=rep_no, fold=fold_no, sample=sample_no
+    )
+
+    if isinstance(task, OpenMLSupervisedTask):
+        x, y = task.get_X_and_y(dataset_format=dataset_format)
+        if dataset_format == "dataframe":
+            train_x = x.iloc[train_indices]
+            train_y = y.iloc[train_indices]
+            test_x = x.iloc[test_indices]
+            test_y = y.iloc[test_indices]
+        else:
+            train_x = x[train_indices]
+            train_y = y[train_indices]
+            test_x = x[test_indices]
+            test_y = y[test_indices]
+    elif isinstance(task, OpenMLClusteringTask):
+        x = task.get_X(dataset_format=dataset_format)
+        if dataset_format == "dataframe":
+            train_x = x.iloc[train_indices]
+        else:
+            train_x = x[train_indices]
+        train_y = None
+        test_x = None
+        test_y = None
+    else:
+        raise NotImplementedError(task.task_type)
+    config.logger.info(
+        "Going to execute flow '%s' on task %d for repeat %d fold %d sample %d.",
+        flow.name,
+        task.task_id,
+        rep_no,
+        fold_no,
+        sample_no,
+    )
+    pred_y, proba_y, user_defined_measures_fold, trace, = extension._run_model_on_fold(
+        model=model,
+        task=task,
+        X_train=train_x,
+        y_train=train_y,
+        rep_no=rep_no,
+        fold_no=fold_no,
+        X_test=test_x,
+    )
+    return pred_y, proba_y, test_indices, test_y, trace, user_defined_measures_fold
+
+
 def get_runs(run_ids):
     """Gets all runs in run_ids list.
 

tests/test_openml/test_config.py

Lines changed: 29 additions & 0 deletions
@@ -25,6 +25,35 @@ def test_non_writable_home(self, log_handler_mock, warnings_mock, expanduser_mock):
         self.assertEqual(log_handler_mock.call_count, 1)
         self.assertFalse(log_handler_mock.call_args_list[0][1]["create_file_handler"])
 
+    def test_get_config_as_dict(self):
+        """ Checks if the current configuration is returned accurately as a dict. """
+        config = openml.config.get_config_as_dict()
+        _config = dict()
+        _config["apikey"] = "610344db6388d9ba34f6db45a3cf71de"
+        _config["server"] = "https://test.openml.org/api/v1/xml"
+        _config["cachedir"] = self.workdir
+        _config["avoid_duplicate_runs"] = False
+        _config["connection_n_retries"] = 10
+        _config["max_retries"] = 20
+        self.assertIsInstance(config, dict)
+        self.assertEqual(len(config), 6)
+        self.assertDictEqual(config, _config)
+
+    def test_setup_with_config(self):
+        """ Checks if the OpenML configuration can be updated using _setup(). """
+        _config = dict()
+        _config["apikey"] = "610344db6388d9ba34f6db45a3cf71de"
+        _config["server"] = "https://www.openml.org/api/v1/xml"
+        _config["cachedir"] = self.workdir
+        _config["avoid_duplicate_runs"] = True
+        _config["connection_n_retries"] = 100
+        _config["max_retries"] = 1000
+        orig_config = openml.config.get_config_as_dict()
+        openml.config._setup(_config)
+        updated_config = openml.config.get_config_as_dict()
+        openml.config._setup(orig_config)  # important to not affect other unit tests
+        self.assertDictEqual(_config, updated_config)
+
 
 class TestConfigurationForExamples(openml.testing.TestBase):
     def test_switch_to_example_configuration(self):
