Commit d96f9ce

Update examples on parallel processing (#1059)
1 parent 05e7263 commit d96f9ce

3 files changed: +369 −168 lines

examples/60_search/example_parallel_manual_spawning.py

Lines changed: 0 additions & 168 deletions
This file was deleted.

Lines changed: 215 additions & 0 deletions

@@ -0,0 +1,215 @@
# -*- encoding: utf-8 -*-
"""
======================================================
Parallel Usage: Spawning workers from the command line
======================================================

*Auto-sklearn* uses
`dask.distributed <https://distributed.dask.org/en/latest/index.html>`_
for parallel optimization.

This example shows how to start the dask scheduler and spawn
workers for *Auto-sklearn* manually from the command line. Use this example
as a starting point to parallelize *Auto-sklearn* across multiple
machines. If you want to start everything manually from within Python
please see `this example <example_parallel_manual_spawning_python.html>`_.
To run *Auto-sklearn* in parallel on a single machine check out the example
`Parallel Usage on a single machine <example_parallel_n_jobs.html>`_.

You can learn more about the dask command line interface from
https://docs.dask.org/en/latest/setup/cli.html.

When manually passing a dask client to Auto-sklearn, all logic
must be guarded by ``if __name__ == "__main__":`` statements! We use
multiple such statements to properly render this example as a notebook
and also allow execution via the command line.
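
A minimal sketch of the required structure (argument values elided for
brevity):

.. code:: python

    if __name__ == "__main__":
        client = dask.distributed.Client(...)
        automl = AutoSklearnClassifier(dask_client=client, ...)
        automl.fit(X_train, y_train)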

Background
==========

To run Auto-sklearn distributed on multiple machines we need to set
up three components:

1. **Auto-sklearn and a dask client**. This will manage all workload, find new
   configurations to evaluate and submit jobs via a dask client. As this
   runs Bayesian optimization it should be executed on its own CPU.
2. **The dask workers**. They will do the actual work of running machine
   learning algorithms and require their own CPU each.
3. **The scheduler**. It manages the communication between the dask client
   and the different dask workers. As the client and all workers connect
   to the scheduler it must be started first. This is a light-weight job
   and does not require its own CPU.

We will now start these three components in reverse order: scheduler,
workers and client. Also, in a real setup, the scheduler and the workers should
be started from the command line and not from within a Python file via
the ``subprocess`` module as done here (for the sake of having a self-contained
example).
"""

###########################################################################
# Import statements
# =================

import multiprocessing
import subprocess
import time

import dask.distributed
import sklearn.datasets
import sklearn.metrics
import sklearn.model_selection

from autosklearn.classification import AutoSklearnClassifier
from autosklearn.constants import BINARY_CLASSIFICATION

tmp_folder = '/tmp/autosklearn_parallel_3_example_tmp'
output_folder = '/tmp/autosklearn_parallel_3_example_out'

worker_processes = []


###########################################################################
# 0. Setup client-scheduler communication
# =======================================
#
# In this example the dask scheduler is started without an explicit
# address and port. Instead, the scheduler takes a free port and stores
# the relevant information in a file whose name and location we provide.
# This filename is also given to the workers so they can find all
# relevant information to connect to the scheduler.
scheduler_file_name = 'scheduler-file.json'
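
# The scheduler file is only a means of discovery: it must be readable by
# every worker, e.g. on a shared filesystem. Alternatively, one can bind
# the scheduler to a fixed address and point the workers at it directly.
# A sketch (the address is a placeholder):
#
# .. code:: bash
#
#     dask-scheduler --host 192.168.0.1 --port 8786
#     dask-worker tcp://192.168.0.1:8786 --nthreads 1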


############################################################################
# 1. Start scheduler
# ==================
#
# Starting the scheduler is done with the following bash command:
#
# .. code:: bash
#
#     dask-scheduler --scheduler-file scheduler-file.json --idle-timeout 10
#
# We will now execute this bash command from within Python to have a
# self-contained example:

def cli_start_scheduler(scheduler_file_name):
    call_string = (
        "dask-scheduler --scheduler-file %s --idle-timeout 10"
    ) % scheduler_file_name
    # ``subprocess.run`` blocks until the scheduler exits, which happens
    # once it has been idle for ``--idle-timeout`` seconds.
    subprocess.run(call_string, stdout=subprocess.PIPE,
                   stderr=subprocess.STDOUT, shell=True, check=True)


if __name__ == "__main__":
    process_python_worker = multiprocessing.Process(
        target=cli_start_scheduler,
        args=(scheduler_file_name, ),
    )
    process_python_worker.start()
    worker_processes.append(process_python_worker)

    # Wait a second for the scheduler to become available
    time.sleep(1)
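
    # A more robust alternative to a fixed sleep (a sketch using only the
    # standard library): the scheduler writes the scheduler file once it
    # is up, so we can poll for the file, assuming no stale file is left
    # over from a previous run.
    import os
    while not os.path.exists(scheduler_file_name):
        time.sleep(0.1)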


############################################################################
# 2. Start two workers
# ====================
#
# Starting the workers is done with the following bash command:
#
# .. code:: bash
#
#     DASK_DISTRIBUTED__WORKER__DAEMON=False \
#         dask-worker --nthreads 1 --lifetime 35 --memory-limit 0 \
#         --scheduler-file scheduler-file.json
#
# We will now execute this bash command from within Python to have a
# self-contained example. Please note that
# ``DASK_DISTRIBUTED__WORKER__DAEMON=False`` is required in this
# case as dask-worker creates a new process, which by default is not
# compatible with Auto-sklearn creating new processes in the workers itself.
# We disable dask's memory management by passing ``--memory-limit 0`` as
# Auto-sklearn does the memory management itself. The ``--lifetime`` of 35
# seconds slightly exceeds the 30 second optimization budget used below,
# so the workers terminate themselves shortly after the run is over.

def cli_start_worker(scheduler_file_name):
    call_string = (
        "DASK_DISTRIBUTED__WORKER__DAEMON=False "
        "dask-worker --nthreads 1 --lifetime 35 --memory-limit 0 "
        "--scheduler-file %s"
    ) % scheduler_file_name
    # ``subprocess.run`` blocks until the worker exits, which happens once
    # its ``--lifetime`` has passed.
    subprocess.run(call_string, stdout=subprocess.PIPE,
                   stderr=subprocess.STDOUT, shell=True)


if __name__ == '__main__':
    for _ in range(2):
        process_cli_worker = multiprocessing.Process(
            target=cli_start_worker,
            args=(scheduler_file_name, ),
        )
        process_cli_worker.start()
        worker_processes.append(process_cli_worker)

    # Wait a second for workers to become available
    time.sleep(1)

############################################################################
# 3. Creating a client in Python
# ==============================
#
# Finally we create a dask client which connects to the scheduler via
# the information in the file created by the scheduler.

client = dask.distributed.Client(scheduler_file=scheduler_file_name)
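
# As a quick sanity check we can print the number of workers connected to
# the scheduler; ``scheduler_info()`` is part of the dask.distributed
# client API.
if __name__ == "__main__":
    print("Connected workers:", len(client.scheduler_info()["workers"]))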

############################################################################
# Start Auto-sklearn
# ~~~~~~~~~~~~~~~~~~
if __name__ == "__main__":
    X, y = sklearn.datasets.load_breast_cancer(return_X_y=True)
    X_train, X_test, y_train, y_test = \
        sklearn.model_selection.train_test_split(X, y, random_state=1)
    automl = AutoSklearnClassifier(
        time_left_for_this_task=30,
        per_run_time_limit=10,
        memory_limit=1024,
        tmp_folder=tmp_folder,
        output_folder=output_folder,
        seed=777,
        # n_jobs is ignored internally as we pass a dask client.
        n_jobs=1,
        # Pass the dask client which connects to the previously started
        # scheduler and its workers.
        dask_client=client,
    )
    automl.fit(X_train, y_train)

    # Breast cancer is a binary classification task.
    automl.fit_ensemble(
        y_train,
        task=BINARY_CLASSIFICATION,
        dataset_name='breast_cancer',
        ensemble_size=20,
        ensemble_nbest=50,
    )

    predictions = automl.predict(X_test)
    print(automl.sprint_statistics())
    print("Accuracy score", sklearn.metrics.accuracy_score(y_test, predictions))


############################################################################
# Wait until all workers are closed
# =================================
#
# This is only necessary if the workers are started from within this python
# script. In a real application one would start them directly from the command
# line.
if __name__ == '__main__':
    # ``worker_processes`` contains the scheduler process as well as both
    # worker processes, so this joins all three.
    for process in worker_processes:
        process.join()
