Skip to content

Commit cf0d5df

Browse files
committed
rename from cancel_futures_on_shutdown to wait
1 parent 5beab96 commit cf0d5df

File tree

8 files changed

+52
-72
lines changed

8 files changed

+52
-72
lines changed

src/executorlib/executor/flux.py

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from executorlib.executor.base import BaseExecutor
44
from executorlib.standalone.inputcheck import (
5-
check_cancel_futures_on_shutdown,
5+
check_wait_on_shutdown,
66
check_command_line_argument_lst,
77
check_init_function,
88
check_log_obj_size,
@@ -310,7 +310,7 @@ def __init__(
310310
plot_dependency_graph_filename: Optional[str] = None,
311311
export_workflow_filename: Optional[str] = None,
312312
log_obj_size: bool = False,
313-
cancel_futures_on_shutdown: bool = False,
313+
wait: bool = True,
314314
):
315315
"""
316316
The executorlib.FluxClusterExecutor leverages either the message passing interface (MPI), the SLURM workload
@@ -357,8 +357,7 @@ def __init__(
357357
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
358358
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
359359
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
360-
cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes
361-
on shutdown.
360+
wait (bool): Whether to wait for the completion of all tasks before shutting down the executor.
362361
363362
"""
364363
default_resource_dict: dict = {
@@ -418,7 +417,7 @@ def __init__(
418417
block_allocation=block_allocation,
419418
init_function=init_function,
420419
disable_dependencies=disable_dependencies,
421-
cancel_futures_on_shutdown=cancel_futures_on_shutdown,
420+
wait=wait,
422421
)
423422
)
424423
else:
@@ -459,7 +458,7 @@ def create_flux_executor(
459458
block_allocation: bool = False,
460459
init_function: Optional[Callable] = None,
461460
log_obj_size: bool = False,
462-
cancel_futures_on_shutdown: bool = False,
461+
wait: bool = True,
463462
) -> Union[OneProcessTaskScheduler, BlockAllocationTaskScheduler]:
464463
"""
465464
Create a flux executor
@@ -498,8 +497,7 @@ def create_flux_executor(
498497
of the individual function.
499498
init_function (None): optional function to preset arguments for functions which are submitted later
500499
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
501-
cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes on
502-
shutdown.
500+
wait (bool): Whether to wait for the completion of all tasks before shutting down the executor.
503501
504502
Returns:
505503
InteractiveStepExecutor/ InteractiveExecutor
@@ -521,9 +519,7 @@ def create_flux_executor(
521519
check_command_line_argument_lst(
522520
command_line_argument_lst=resource_dict.get("slurm_cmd_args", [])
523521
)
524-
check_cancel_futures_on_shutdown(
525-
cancel_futures_on_shutdown=cancel_futures_on_shutdown
526-
)
522+
check_wait_on_shutdown(wait_on_shutdown=wait)
527523
if "openmpi_oversubscribe" in resource_dict:
528524
del resource_dict["openmpi_oversubscribe"]
529525
if "slurm_cmd_args" in resource_dict:

src/executorlib/executor/single.py

Lines changed: 13 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from executorlib.executor.base import BaseExecutor
44
from executorlib.standalone.inputcheck import (
5-
check_cancel_futures_on_shutdown,
5+
check_wait_on_shutdown,
66
check_command_line_argument_lst,
77
check_gpus_per_worker,
88
check_init_function,
@@ -61,8 +61,7 @@ class SingleNodeExecutor(BaseExecutor):
6161
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
6262
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
6363
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
64-
cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes on
65-
shutdown.
64+
wait (bool): Whether to wait for the completion of all tasks before shutting down the executor.
6665
6766
Examples:
6867
```
@@ -100,7 +99,7 @@ def __init__(
10099
plot_dependency_graph_filename: Optional[str] = None,
101100
export_workflow_filename: Optional[str] = None,
102101
log_obj_size: bool = False,
103-
cancel_futures_on_shutdown: bool = False,
102+
wait: bool = True,
104103
):
105104
"""
106105
The executorlib.SingleNodeExecutor leverages either the message passing interface (MPI), the SLURM workload
@@ -146,8 +145,7 @@ def __init__(
146145
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
147146
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
148147
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
149-
cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes
150-
on shutdown.
148+
wait (bool): Whether to wait for the completion of all tasks before shutting down the executor.
151149
152150
"""
153151
default_resource_dict: dict = {
@@ -175,7 +173,7 @@ def __init__(
175173
block_allocation=block_allocation,
176174
init_function=init_function,
177175
log_obj_size=log_obj_size,
178-
cancel_futures_on_shutdown=cancel_futures_on_shutdown,
176+
wait=wait,
179177
),
180178
max_cores=max_cores,
181179
refresh_rate=refresh_rate,
@@ -197,7 +195,7 @@ def __init__(
197195
block_allocation=block_allocation,
198196
init_function=init_function,
199197
log_obj_size=log_obj_size,
200-
cancel_futures_on_shutdown=cancel_futures_on_shutdown,
198+
wait=wait,
201199
)
202200
)
203201

@@ -240,8 +238,7 @@ class TestClusterExecutor(BaseExecutor):
240238
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
241239
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
242240
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
243-
cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes on
244-
shutdown.
241+
wait (bool): Whether to wait for the completion of all tasks before shutting down the executor.
245242
246243
Examples:
247244
```
@@ -279,7 +276,7 @@ def __init__(
279276
plot_dependency_graph_filename: Optional[str] = None,
280277
export_workflow_filename: Optional[str] = None,
281278
log_obj_size: bool = False,
282-
cancel_futures_on_shutdown: bool = False,
279+
wait: bool = True,
283280
):
284281
"""
285282
The executorlib.api.TestClusterExecutor is designed to test the file based communication used in the
@@ -318,8 +315,7 @@ def __init__(
318315
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
319316
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
320317
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
321-
cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes
322-
on shutdown.
318+
wait (bool): Whether to wait for the completion of all tasks before shutting down the executor.
323319
324320
"""
325321
default_resource_dict: dict = {
@@ -359,7 +355,7 @@ def __init__(
359355
init_function=init_function,
360356
disable_dependencies=disable_dependencies,
361357
execute_function=execute_in_subprocess,
362-
cancel_futures_on_shutdown=cancel_futures_on_shutdown,
358+
wait=wait,
363359
)
364360
)
365361
else:
@@ -393,7 +389,7 @@ def create_single_node_executor(
393389
block_allocation: bool = False,
394390
init_function: Optional[Callable] = None,
395391
log_obj_size: bool = False,
396-
cancel_futures_on_shutdown: bool = False,
392+
wait: bool = True,
397393
) -> Union[OneProcessTaskScheduler, BlockAllocationTaskScheduler]:
398394
"""
399395
Create a single node executor
@@ -428,8 +424,7 @@ def create_single_node_executor(
428424
of the individual function.
429425
init_function (None): optional function to preset arguments for functions which are submitted later
430426
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
431-
cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes on
432-
shutdown.
427+
wait (bool): Whether to wait for the completion of all tasks before shutting down the executor.
433428
434429
Returns:
435430
InteractiveStepExecutor/ InteractiveExecutor
@@ -446,9 +441,7 @@ def create_single_node_executor(
446441
check_command_line_argument_lst(
447442
command_line_argument_lst=resource_dict.get("slurm_cmd_args", [])
448443
)
449-
check_cancel_futures_on_shutdown(
450-
cancel_futures_on_shutdown=cancel_futures_on_shutdown
451-
)
444+
check_wait_on_shutdown(wait_on_shutdown=wait)
452445
if "threads_per_core" in resource_dict:
453446
del resource_dict["threads_per_core"]
454447
if "gpus_per_core" in resource_dict:

src/executorlib/executor/slurm.py

Lines changed: 13 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from executorlib.executor.base import BaseExecutor
44
from executorlib.standalone.inputcheck import (
5-
check_cancel_futures_on_shutdown,
5+
check_wait_on_shutdown,
66
check_init_function,
77
check_log_obj_size,
88
check_plot_dependency_graph,
@@ -66,8 +66,7 @@ class SlurmClusterExecutor(BaseExecutor):
6666
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
6767
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
6868
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
69-
cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes on
70-
shutdown.
69+
wait (bool): Whether to wait for the completion of all tasks before shutting down the executor.
7170
7271
Examples:
7372
```
@@ -107,7 +106,7 @@ def __init__(
107106
plot_dependency_graph_filename: Optional[str] = None,
108107
export_workflow_filename: Optional[str] = None,
109108
log_obj_size: bool = False,
110-
cancel_futures_on_shutdown: bool = False,
109+
wait: bool = True,
111110
):
112111
"""
113112
The executorlib.SlurmClusterExecutor leverages either the message passing interface (MPI), the SLURM workload
@@ -154,8 +153,7 @@ def __init__(
154153
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
155154
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
156155
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
157-
cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes
158-
on shutdown.
156+
wait (bool): Whether to wait for the completion of all tasks before shutting down the executor.
159157
160158
"""
161159
default_resource_dict: dict = {
@@ -216,7 +214,7 @@ def __init__(
216214
block_allocation=block_allocation,
217215
init_function=init_function,
218216
disable_dependencies=disable_dependencies,
219-
cancel_futures_on_shutdown=cancel_futures_on_shutdown,
217+
wait=wait,
220218
)
221219
)
222220
else:
@@ -288,8 +286,7 @@ class SlurmJobExecutor(BaseExecutor):
288286
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
289287
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
290288
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
291-
cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes on
292-
shutdown.
289+
wait (bool): Whether to wait for the completion of all tasks before shutting down the executor.
293290
294291
Examples:
295292
```
@@ -328,7 +325,7 @@ def __init__(
328325
plot_dependency_graph_filename: Optional[str] = None,
329326
export_workflow_filename: Optional[str] = None,
330327
log_obj_size: bool = False,
331-
cancel_futures_on_shutdown: bool = False,
328+
wait: bool = True,
332329
):
333330
"""
334331
The executorlib.SlurmJobExecutor leverages either the message passing interface (MPI), the SLURM workload
@@ -378,8 +375,7 @@ def __init__(
378375
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
379376
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
380377
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
381-
cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes
382-
on shutdown.
378+
wait (bool): Whether to wait for the completion of all tasks before shutting down the executor.
383379
384380
"""
385381
default_resource_dict: dict = {
@@ -408,7 +404,7 @@ def __init__(
408404
block_allocation=block_allocation,
409405
init_function=init_function,
410406
log_obj_size=log_obj_size,
411-
cancel_futures_on_shutdown=cancel_futures_on_shutdown,
407+
wait=wait,
412408
),
413409
max_cores=max_cores,
414410
refresh_rate=refresh_rate,
@@ -431,7 +427,7 @@ def __init__(
431427
block_allocation=block_allocation,
432428
init_function=init_function,
433429
log_obj_size=log_obj_size,
434-
cancel_futures_on_shutdown=cancel_futures_on_shutdown,
430+
wait=wait,
435431
)
436432
)
437433

@@ -446,7 +442,7 @@ def create_slurm_executor(
446442
block_allocation: bool = False,
447443
init_function: Optional[Callable] = None,
448444
log_obj_size: bool = False,
449-
cancel_futures_on_shutdown: bool = False,
445+
wait: bool = True,
450446
) -> Union[OneProcessTaskScheduler, BlockAllocationTaskScheduler]:
451447
"""
452448
Create a SLURM executor
@@ -486,8 +482,7 @@ def create_slurm_executor(
486482
of the individual function.
487483
init_function (None): optional function to preset arguments for functions which are submitted later
488484
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
489-
cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes on
490-
shutdown.
485+
wait (bool): Whether to wait for the completion of all tasks before shutting down the executor.
491486
492487
Returns:
493488
InteractiveStepExecutor/ InteractiveExecutor
@@ -500,9 +495,7 @@ def create_slurm_executor(
500495
resource_dict["log_obj_size"] = log_obj_size
501496
resource_dict["pmi_mode"] = pmi_mode
502497
check_init_function(block_allocation=block_allocation, init_function=init_function)
503-
check_cancel_futures_on_shutdown(
504-
cancel_futures_on_shutdown=cancel_futures_on_shutdown
505-
)
498+
check_wait_on_shutdown(wait_on_shutdown=wait)
506499
if block_allocation:
507500
resource_dict["init_function"] = init_function
508501
max_workers = validate_number_of_cores(

src/executorlib/standalone/inputcheck.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,15 @@ def check_oversubscribe(oversubscribe: bool) -> None:
1717
)
1818

1919

20-
def check_cancel_futures_on_shutdown(
21-
cancel_futures_on_shutdown: bool,
20+
def check_wait_on_shutdown(
21+
wait_on_shutdown: bool,
2222
) -> None:
2323
"""
24-
Check if cancel_futures_on_shutdown is True and raise a ValueError if it is.
24+
Check if wait_on_shutdown is False and raise a ValueError if it is.
2525
"""
26-
if cancel_futures_on_shutdown:
26+
if not wait_on_shutdown:
2727
raise ValueError(
28-
"The cancel_futures_on_shutdown parameter is only supported for the executorlib.FluxClusterExecutor and executorlib.SlurmClusterExecutor."
28+
"The wait_on_shutdown parameter is only supported for the executorlib.FluxClusterExecutor and executorlib.SlurmClusterExecutor."
2929
)
3030

3131

src/executorlib/task_scheduler/file/shared.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ def execute_tasks_h5(
5757
backend: Optional[str] = None,
5858
disable_dependencies: bool = False,
5959
pmi_mode: Optional[str] = None,
60-
cancel_futures_on_shutdown: bool = False,
60+
wait: bool = True,
6161
) -> None:
6262
"""
6363
Execute tasks stored in a queue using HDF5 files.
@@ -73,8 +73,7 @@ def execute_tasks_h5(
7373
backend (str, optional): name of the backend used to spawn tasks.
7474
disable_dependencies (boolean): Disable resolving future objects during the submission.
7575
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
76-
cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes on
77-
shutdown.
76+
wait (bool): Whether to wait for the completion of all tasks before shutting down the executor.
7877
7978
Returns:
8079
None
@@ -89,7 +88,7 @@ def execute_tasks_h5(
8988
with contextlib.suppress(queue.Empty):
9089
task_dict = future_queue.get_nowait()
9190
if task_dict is not None and "shutdown" in task_dict and task_dict["shutdown"]:
92-
if task_dict["wait"] and not cancel_futures_on_shutdown:
91+
if task_dict["wait"] and wait:
9392
while len(memory_dict) > 0:
9493
memory_dict = {
9594
key: _check_task_output(
@@ -100,7 +99,7 @@ def execute_tasks_h5(
10099
for key, value in memory_dict.items()
101100
if not value.done()
102101
}
103-
if not task_dict["cancel_futures"] and not cancel_futures_on_shutdown:
102+
if not task_dict["cancel_futures"] and wait:
104103
if (
105104
terminate_function is not None
106105
and terminate_function == terminate_subprocess

0 commit comments

Comments
 (0)