Skip to content

Commit 8f4f45e

Browse files
[Feature] Set wait=False during initialization of Executor (#893)
* Only terminate tasks on queue when cancel_futures is set to true * Add cancel_futures_on_shutdown to FileTaskScheduler * Add cancel_futures_on_shutdown in __init__() * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Update flux.py * Update slurm.py * Update test_standalone_inputcheck.py * fix * fixes * try different tests * rename from cancel_futures_on_shutdown to wait * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * continue renaming --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent 618c77c commit 8f4f45e

File tree

9 files changed

+119
-14
lines changed

9 files changed

+119
-14
lines changed

src/executorlib/executor/flux.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
check_plot_dependency_graph,
1010
check_pmi,
1111
check_refresh_rate,
12+
check_wait_on_shutdown,
1213
validate_number_of_cores,
1314
)
1415
from executorlib.task_scheduler.interactive.blockallocation import (
@@ -67,6 +68,7 @@ class FluxJobExecutor(BaseExecutor):
6768
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
6869
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
6970
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
71+
wait (bool): Whether to wait for the completion of all tasks before shutting down the executor.
7072
7173
Examples:
7274
```
@@ -108,6 +110,7 @@ def __init__(
108110
plot_dependency_graph_filename: Optional[str] = None,
109111
export_workflow_filename: Optional[str] = None,
110112
log_obj_size: bool = False,
113+
wait: bool = True,
111114
):
112115
"""
113116
The executorlib.FluxJobExecutor leverages either the message passing interface (MPI), the SLURM workload manager
@@ -156,6 +159,7 @@ def __init__(
156159
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
157160
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
158161
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
162+
wait (bool): Whether to wait for the completion of all tasks before shutting down the executor.
159163
160164
"""
161165
default_resource_dict: dict = {
@@ -187,6 +191,7 @@ def __init__(
187191
block_allocation=block_allocation,
188192
init_function=init_function,
189193
log_obj_size=log_obj_size,
194+
wait=wait,
190195
),
191196
max_cores=max_cores,
192197
refresh_rate=refresh_rate,
@@ -212,6 +217,7 @@ def __init__(
212217
block_allocation=block_allocation,
213218
init_function=init_function,
214219
log_obj_size=log_obj_size,
220+
wait=wait,
215221
)
216222
)
217223

@@ -261,6 +267,7 @@ class FluxClusterExecutor(BaseExecutor):
261267
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
262268
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
263269
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
270+
wait (bool): Whether to wait for the completion of all tasks before shutting down the executor.
264271
265272
Examples:
266273
```
@@ -300,6 +307,7 @@ def __init__(
300307
plot_dependency_graph_filename: Optional[str] = None,
301308
export_workflow_filename: Optional[str] = None,
302309
log_obj_size: bool = False,
310+
wait: bool = True,
303311
):
304312
"""
305313
The executorlib.FluxClusterExecutor leverages either the message passing interface (MPI), the SLURM workload
@@ -346,6 +354,7 @@ def __init__(
346354
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
347355
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
348356
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
357+
wait (bool): Whether to wait for the completion of all tasks before shutting down the executor.
349358
350359
"""
351360
default_resource_dict: dict = {
@@ -405,6 +414,7 @@ def __init__(
405414
block_allocation=block_allocation,
406415
init_function=init_function,
407416
disable_dependencies=disable_dependencies,
417+
wait=wait,
408418
)
409419
)
410420
else:
@@ -445,6 +455,7 @@ def create_flux_executor(
445455
block_allocation: bool = False,
446456
init_function: Optional[Callable] = None,
447457
log_obj_size: bool = False,
458+
wait: bool = True,
448459
) -> Union[OneProcessTaskScheduler, BlockAllocationTaskScheduler]:
449460
"""
450461
Create a flux executor
@@ -483,6 +494,7 @@ def create_flux_executor(
483494
of the individual function.
484495
init_function (None): optional function to preset arguments for functions which are submitted later
485496
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
497+
wait (bool): Whether to wait for the completion of all tasks before shutting down the executor.
486498
487499
Returns:
488500
InteractiveStepExecutor/ InteractiveExecutor
@@ -504,6 +516,7 @@ def create_flux_executor(
504516
check_command_line_argument_lst(
505517
command_line_argument_lst=resource_dict.get("slurm_cmd_args", [])
506518
)
519+
check_wait_on_shutdown(wait_on_shutdown=wait)
507520
if "openmpi_oversubscribe" in resource_dict:
508521
del resource_dict["openmpi_oversubscribe"]
509522
if "slurm_cmd_args" in resource_dict:

src/executorlib/executor/single.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
check_init_function,
88
check_plot_dependency_graph,
99
check_refresh_rate,
10+
check_wait_on_shutdown,
1011
validate_number_of_cores,
1112
)
1213
from executorlib.standalone.interactive.spawner import MpiExecSpawner
@@ -60,6 +61,7 @@ class SingleNodeExecutor(BaseExecutor):
6061
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
6162
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
6263
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
64+
wait (bool): Whether to wait for the completion of all tasks before shutting down the executor.
6365
6466
Examples:
6567
```
@@ -97,6 +99,7 @@ def __init__(
9799
plot_dependency_graph_filename: Optional[str] = None,
98100
export_workflow_filename: Optional[str] = None,
99101
log_obj_size: bool = False,
102+
wait: bool = True,
100103
):
101104
"""
102105
The executorlib.SingleNodeExecutor leverages either the message passing interface (MPI), the SLURM workload
@@ -142,6 +145,7 @@ def __init__(
142145
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
143146
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
144147
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
148+
wait (bool): Whether to wait for the completion of all tasks before shutting down the executor.
145149
146150
"""
147151
default_resource_dict: dict = {
@@ -169,6 +173,7 @@ def __init__(
169173
block_allocation=block_allocation,
170174
init_function=init_function,
171175
log_obj_size=log_obj_size,
176+
wait=wait,
172177
),
173178
max_cores=max_cores,
174179
refresh_rate=refresh_rate,
@@ -190,6 +195,7 @@ def __init__(
190195
block_allocation=block_allocation,
191196
init_function=init_function,
192197
log_obj_size=log_obj_size,
198+
wait=wait,
193199
)
194200
)
195201

@@ -232,6 +238,7 @@ class TestClusterExecutor(BaseExecutor):
232238
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
233239
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
234240
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
241+
wait (bool): Whether to wait for the completion of all tasks before shutting down the executor.
235242
236243
Examples:
237244
```
@@ -269,6 +276,7 @@ def __init__(
269276
plot_dependency_graph_filename: Optional[str] = None,
270277
export_workflow_filename: Optional[str] = None,
271278
log_obj_size: bool = False,
279+
wait: bool = True,
272280
):
273281
"""
274282
The executorlib.api.TestClusterExecutor is designed to test the file based communication used in the
@@ -307,6 +315,7 @@ def __init__(
307315
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
308316
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
309317
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
318+
wait (bool): Whether to wait for the completion of all tasks before shutting down the executor.
310319
311320
"""
312321
default_resource_dict: dict = {
@@ -346,6 +355,7 @@ def __init__(
346355
init_function=init_function,
347356
disable_dependencies=disable_dependencies,
348357
execute_function=execute_in_subprocess,
358+
wait=wait,
349359
)
350360
)
351361
else:
@@ -379,6 +389,7 @@ def create_single_node_executor(
379389
block_allocation: bool = False,
380390
init_function: Optional[Callable] = None,
381391
log_obj_size: bool = False,
392+
wait: bool = True,
382393
) -> Union[OneProcessTaskScheduler, BlockAllocationTaskScheduler]:
383394
"""
384395
Create a single node executor
@@ -413,6 +424,7 @@ def create_single_node_executor(
413424
of the individual function.
414425
init_function (None): optional function to preset arguments for functions which are submitted later
415426
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
427+
wait (bool): Whether to wait for the completion of all tasks before shutting down the executor.
416428
417429
Returns:
418430
InteractiveStepExecutor/ InteractiveExecutor
@@ -429,6 +441,7 @@ def create_single_node_executor(
429441
check_command_line_argument_lst(
430442
command_line_argument_lst=resource_dict.get("slurm_cmd_args", [])
431443
)
444+
check_wait_on_shutdown(wait_on_shutdown=wait)
432445
if "threads_per_core" in resource_dict:
433446
del resource_dict["threads_per_core"]
434447
if "gpus_per_core" in resource_dict:

src/executorlib/executor/slurm.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
check_log_obj_size,
77
check_plot_dependency_graph,
88
check_refresh_rate,
9+
check_wait_on_shutdown,
910
validate_number_of_cores,
1011
)
1112
from executorlib.task_scheduler.interactive.blockallocation import (
@@ -65,6 +66,7 @@ class SlurmClusterExecutor(BaseExecutor):
6566
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
6667
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
6768
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
69+
wait (bool): Whether to wait for the completion of all tasks before shutting down the executor.
6870
6971
Examples:
7072
```
@@ -104,6 +106,7 @@ def __init__(
104106
plot_dependency_graph_filename: Optional[str] = None,
105107
export_workflow_filename: Optional[str] = None,
106108
log_obj_size: bool = False,
109+
wait: bool = True,
107110
):
108111
"""
109112
The executorlib.SlurmClusterExecutor leverages either the message passing interface (MPI), the SLURM workload
@@ -150,6 +153,7 @@ def __init__(
150153
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
151154
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
152155
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
156+
wait (bool): Whether to wait for the completion of all tasks before shutting down the executor.
153157
154158
"""
155159
default_resource_dict: dict = {
@@ -210,6 +214,7 @@ def __init__(
210214
block_allocation=block_allocation,
211215
init_function=init_function,
212216
disable_dependencies=disable_dependencies,
217+
wait=wait,
213218
)
214219
)
215220
else:
@@ -281,6 +286,7 @@ class SlurmJobExecutor(BaseExecutor):
281286
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
282287
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
283288
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
289+
wait (bool): Whether to wait for the completion of all tasks before shutting down the executor.
284290
285291
Examples:
286292
```
@@ -319,6 +325,7 @@ def __init__(
319325
plot_dependency_graph_filename: Optional[str] = None,
320326
export_workflow_filename: Optional[str] = None,
321327
log_obj_size: bool = False,
328+
wait: bool = True,
322329
):
323330
"""
324331
The executorlib.SlurmJobExecutor leverages either the message passing interface (MPI), the SLURM workload
@@ -368,6 +375,7 @@ def __init__(
368375
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
369376
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
370377
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
378+
wait (bool): Whether to wait for the completion of all tasks before shutting down the executor.
371379
372380
"""
373381
default_resource_dict: dict = {
@@ -396,6 +404,7 @@ def __init__(
396404
block_allocation=block_allocation,
397405
init_function=init_function,
398406
log_obj_size=log_obj_size,
407+
wait=wait,
399408
),
400409
max_cores=max_cores,
401410
refresh_rate=refresh_rate,
@@ -418,6 +427,7 @@ def __init__(
418427
block_allocation=block_allocation,
419428
init_function=init_function,
420429
log_obj_size=log_obj_size,
430+
wait=wait,
421431
)
422432
)
423433

@@ -432,6 +442,7 @@ def create_slurm_executor(
432442
block_allocation: bool = False,
433443
init_function: Optional[Callable] = None,
434444
log_obj_size: bool = False,
445+
wait: bool = True,
435446
) -> Union[OneProcessTaskScheduler, BlockAllocationTaskScheduler]:
436447
"""
437448
Create a SLURM executor
@@ -471,6 +482,7 @@ def create_slurm_executor(
471482
of the individual function.
472483
init_function (None): optional function to preset arguments for functions which are submitted later
473484
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
485+
wait (bool): Whether to wait for the completion of all tasks before shutting down the executor.
474486
475487
Returns:
476488
InteractiveStepExecutor/ InteractiveExecutor
@@ -483,6 +495,7 @@ def create_slurm_executor(
483495
resource_dict["log_obj_size"] = log_obj_size
484496
resource_dict["pmi_mode"] = pmi_mode
485497
check_init_function(block_allocation=block_allocation, init_function=init_function)
498+
check_wait_on_shutdown(wait_on_shutdown=wait)
486499
if block_allocation:
487500
resource_dict["init_function"] = init_function
488501
max_workers = validate_number_of_cores(

src/executorlib/standalone/inputcheck.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,18 @@ def check_oversubscribe(oversubscribe: bool) -> None:
1717
)
1818

1919

20+
def check_wait_on_shutdown(
21+
wait_on_shutdown: bool,
22+
) -> None:
23+
"""
24+
Check if wait_on_shutdown is False and raise a ValueError if it is.
25+
"""
26+
if not wait_on_shutdown:
27+
raise ValueError(
28+
"The wait_on_shutdown parameter is only supported for the executorlib.FluxClusterExecutor and executorlib.SlurmClusterExecutor."
29+
)
30+
31+
2032
def check_command_line_argument_lst(command_line_argument_lst: list[str]) -> None:
2133
"""
2234
Check if command_line_argument_lst is not empty and raise a ValueError if it is.

src/executorlib/task_scheduler/base.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,9 @@ def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
198198
if cancel_futures and self._future_queue is not None:
199199
cancel_items_in_queue(que=self._future_queue)
200200
if self._process is not None and self._future_queue is not None:
201-
self._future_queue.put({"shutdown": True, "wait": wait})
201+
self._future_queue.put(
202+
{"shutdown": True, "wait": wait, "cancel_futures": cancel_futures}
203+
)
202204
if wait and isinstance(self._process, Thread):
203205
self._process.join()
204206
self._future_queue.join()

0 commit comments

Comments
 (0)