Commit a7186c3

Cancel cluster jobs on shutdown (#838)

* cancel all running slurm and pbs jobs in case the executor is killed
* avoid using the logging module during shutdown to prevent additional errors
* try to close the multiprocessing logging handler on SystemExit
* revert logger closing
* do not react to SIGTERM and do not call sys.exit, to allow a clean shutdown by the calling process
* update changelog
* adapt test
* use logging during shutdown since it shouldn't cause additional errors
* apply PR feedback
* format
* fix exception during shutdown for non-array jobs
* add args and kwargs to ignored-argument-names for pylint
* Merge branch 'master' of github.com:scalableminds/webknossos-libs into cancel-cluster-jobs
* signal jobs with SIGINT instead of SIGTERM to allow canceling recursively scheduled jobs
* Only send SIGINT to running jobs, as scancel stalls otherwise. Use scancel without a signal parameter to cancel pending jobs.
* Cancel pending jobs even if canceling running jobs did not yield exit code 0
* Do not interfere with existing SIGINT handlers. Call the existing handler after our signal handling, in case one exists.
* Avoid deadlock in executor shutdown
* First cancel the pending slurm jobs, then the running ones, to avoid race conditions
* fix handle_kill call in tests
* Merge branch 'master' of github.com:scalableminds/webknossos-libs into cancel-cluster-jobs
* improve troubleshooting instructions in dockered-slurm README
* Add test for slurm job cancellation and prepare slurm version update
* fix typing
* use new slurm docker image with updated slurm version
* fix linting
* correctly restore SLURM_MAX_RUNNING_SIZE env variable in tests

1 parent 27f7426 · commit a7186c3

File tree

10 files changed: +163 −35 lines

cluster_tools/.pylintrc

Lines changed: 1 addition & 1 deletion

@@ -354,7 +354,7 @@ max-args=5
 
 # Argument names that match this expression will be ignored. Default to name
 # with leading underscore
-ignored-argument-names=_.*
+ignored-argument-names=_.*|args|kwargs
 
 # Maximum number of locals for function / method body
 max-locals=15

cluster_tools/Changelog.md

Lines changed: 1 addition & 0 deletions

@@ -14,6 +14,7 @@ For upgrade instructions, please check the respective *Breaking Changes* section
 ### Added
 
 ### Changed
+- When using the slurm or pbs distribution strategy, scheduled jobs are automatically canceled when aborting a run, i.e. if the SIGINT signal is received. [#838](https://github.com/scalableminds/webknossos-libs/pull/838)
 
 ### Fixed
 
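
To make this entry concrete, here is a minimal usage sketch from a user's perspective, based on cluster_tools' usual `get_executor` API; the `square` function is illustrative, not part of the library:

```python
import cluster_tools


def square(n):
    return n * n


if __name__ == "__main__":
    # Pressing Ctrl+C (SIGINT) while the map is in flight now cancels the
    # scheduled slurm jobs instead of leaving them queued or running on the cluster.
    with cluster_tools.get_executor("slurm", debug=True) as executor:
        print(list(executor.map(square, range(10))))
```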

cluster_tools/cluster_tools/schedulers/cluster_executor.py

Lines changed: 28 additions & 12 deletions

@@ -22,6 +22,8 @@
     with_preliminary_postfix,
 )
 
+NOT_YET_SUBMITTED_STATE = "NOT_YET_SUBMITTED"
+
 
 def join_messages(strings: List[str]) -> str:
     return " ".join(x.strip() for x in strings if x.strip())
@@ -86,14 +88,19 @@ def __init__(
         self.jobs_lock = threading.Lock()
         self.jobs_empty_cond = threading.Condition(self.jobs_lock)
         self.keep_logs = keep_logs
+        self.is_shutting_down = False
 
         self.wait_thread = FileWaitThread(self._completion, self)
         self.wait_thread.start()
 
         os.makedirs(self.cfut_dir, exist_ok=True)
 
-        signal.signal(signal.SIGINT, self.handle_kill)
-        signal.signal(signal.SIGTERM, self.handle_kill)
+        # Clean up if a SIGINT signal is received. However, do not interfere with the
+        # existing signal handler of the process or the
+        # shutdown of the main process which sends SIGTERM signals to terminate all
+        # child processes.
+        existing_sigint_handler = signal.getsignal(signal.SIGINT)
+        signal.signal(signal.SIGINT, partial(self.handle_kill, existing_sigint_handler))
 
         self.meta_data = {}
         assert not (
@@ -109,15 +116,24 @@
     def executor_key(cls):
         pass
 
-    def handle_kill(self, _signum, _frame):
+    def handle_kill(self, existing_sigint_handler, signum, frame):
+        if self.is_shutting_down:
+            return
+
+        self.is_shutting_down = True
+
+        self.inner_handle_kill(signum, frame)
         self.wait_thread.stop()
-        job_ids = ",".join(str(id) for id in self.jobs.keys())
-        logging.debug(
-            "A termination signal was registered. The following jobs are still running on the cluster:\n{}".format(
-                job_ids
-            )
-        )
-        sys.exit(130)
+
+        if (
+            existing_sigint_handler  # pylint: disable=comparison-with-callable
+            != signal.default_int_handler
+        ):
+            existing_sigint_handler(signum, frame)
+
+    @abstractmethod
+    def inner_handle_kill(self, _signum, _frame):
+        pass
 
     @abstractmethod
     def check_job_state(
@@ -426,7 +442,7 @@ def map_to_futures(self, fun, allArgs, output_pickle_path_getter=None):
         # Register the job in the jobs array, although the jobid is not known yet.
         # Otherwise it might happen that self.jobs becomes empty, but some of the jobs were
         # not even submitted yet.
-        self.jobs[workerid_with_index] = "pending"
+        self.jobs[workerid_with_index] = NOT_YET_SUBMITTED_STATE
 
         job_count = len(allArgs)
         job_name = get_function_name(fun)
@@ -495,7 +511,7 @@ def shutdown(self, wait=True):
         self.was_requested_to_shutdown = True
         if wait:
             with self.jobs_lock:
-                if self.jobs:
+                if self.jobs and self.wait_thread.is_alive():
                     self.jobs_empty_cond.wait()
 
         self.wait_thread.stop()
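
The diff above chains to any pre-existing SIGINT handler instead of replacing it outright, and guards against re-entrant signals with `is_shutting_down`. Below is a minimal, self-contained sketch of that pattern; the class name and print statement are illustrative, not part of cluster_tools:

```python
import signal
from functools import partial


class ShutdownAwareExecutor:
    """Minimal sketch of the SIGINT handler chaining used in ClusterExecutor."""

    def __init__(self):
        self.is_shutting_down = False
        # Remember whichever SIGINT handler was installed before this executor ...
        existing_sigint_handler = signal.getsignal(signal.SIGINT)
        # ... and bind it into our own handler instead of discarding it.
        signal.signal(
            signal.SIGINT, partial(self.handle_kill, existing_sigint_handler)
        )

    def handle_kill(self, existing_sigint_handler, signum, frame):
        if self.is_shutting_down:  # guard against re-entrant SIGINTs
            return
        self.is_shutting_down = True

        print("canceling cluster jobs ...")  # inner_handle_kill() in the real code

        # Give the previous handler its turn, unless it is the default
        # KeyboardInterrupt-raising handler. The callable() check also skips
        # the SIG_DFL/SIG_IGN sentinels, which cannot be invoked directly.
        if callable(existing_sigint_handler) and (
            existing_sigint_handler != signal.default_int_handler
        ):
            existing_sigint_handler(signum, frame)
```

Note that SIGTERM is deliberately no longer intercepted, so a parent process can still terminate the worker pool the usual way.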

cluster_tools/cluster_tools/schedulers/kube.py

Lines changed: 9 additions & 0 deletions

@@ -81,6 +81,15 @@ def get_job_id_string(cls) -> Optional[str]:
             return job_id
         return cls.get_jobid_with_index(job_id, job_index)
 
+    def inner_handle_kill(self, *args, **kwargs):
+        job_ids = ",".join(str(job_id) for job_id in self.jobs.keys())
+
+        print(
+            "Couldn't automatically cancel all Kubernetes jobs. The following jobs are still running on the cluster:\n{}".format(
+                job_ids
+            )
+        )
+
     def ensure_kubernetes_namespace(self):
         kubernetes_client = KubernetesClient()
         try:

cluster_tools/cluster_tools/schedulers/pbs.py

Lines changed: 26 additions & 1 deletion

@@ -4,7 +4,7 @@
 import os
 import re
 from concurrent import futures
-from typing import Dict, List, Optional, Tuple
+from typing import Dict, List, Optional, Tuple, Union
 
 from typing_extensions import Literal
 
@@ -53,6 +53,31 @@ def format_log_file_name(job_id_with_index, suffix=".stdout"):
     def get_job_id_string(cls):
         return cls.get_current_job_id()
 
+    def inner_handle_kill(self, *args, **kwargs):
+        scheduled_job_ids: List[Union[int, str]] = list(self.jobs.keys())
+
+        if len(scheduled_job_ids):
+            # Array jobs (whose id looks like `<job_id>_<array_index>`) don't need to be canceled individually,
+            # but can be canceled together using the job_id.
+            split_job_ids = map(lambda x: str(x).split("_"), scheduled_job_ids)
+            # However array job ids need to include [] in the end.
+            unique_job_ids = set(
+                job_id_parts[0] if len(job_id_parts) == 1 else f"{job_id_parts[0]}[]"
+                for job_id_parts in split_job_ids
+            )
+            # Send SIGINT signal instead of SIGTERM using qdel. This way, the jobs can
+            # react to the signal, safely shutdown and signal (cancel) jobs they possibly scheduled, recursively.
+            _stdout, stderr, exit_code = call(
+                f"qsig -s SIGINT {' '.join(unique_job_ids)}"
+            )
+
+            if exit_code == 0:
+                logging.debug(f"Canceled PBS jobs {', '.join(unique_job_ids)}.")
+            else:
+                logging.warning(
+                    f"Couldn't automatically cancel all PBS jobs. Reason: {stderr}"
+                )
+
     def submit_text(self, job):
         """Submits a PBS job represented as a job file string. Returns
         the job ID.
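
The id-collapsing step in `inner_handle_kill` is easiest to see in isolation. Here is a standalone sketch with made-up job ids; the helper name is hypothetical, not a cluster_tools function:

```python
from typing import Iterable, Set, Union


def collapse_pbs_job_ids(job_ids: Iterable[Union[int, str]]) -> Set[str]:
    """Fold array-job members like '123_4' into a single qsig target '123[]'.

    Plain job ids (e.g. '456') pass through unchanged; using a set ensures
    each array job is signaled exactly once.
    """
    split_job_ids = (str(job_id).split("_") for job_id in job_ids)
    return {
        parts[0] if len(parts) == 1 else f"{parts[0]}[]"
        for parts in split_job_ids
    }


# Two members of array job 123 plus a plain job 456 become two qsig targets:
assert collapse_pbs_job_ids(["123_0", "123_1", 456]) == {"123[]", "456"}
```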

cluster_tools/cluster_tools/schedulers/slurm.py

Lines changed: 39 additions & 7 deletions

@@ -7,13 +7,14 @@
 import sys
 import threading
 from functools import lru_cache
-from typing import List, Optional, Tuple, Type
+from typing import List, Optional, Tuple, Type, Union
 
 from typing_extensions import Literal
 
 from cluster_tools.util import call, chcall, random_string
 
 from .cluster_executor import (
+    NOT_YET_SUBMITTED_STATE,
     ClusterExecutor,
     RemoteException,
     RemoteOutOfMemoryException,
@@ -163,18 +164,23 @@ def get_max_submit_jobs():
         return max_submit_jobs
 
     @staticmethod
-    def get_number_of_submitted_jobs():
+    def get_number_of_submitted_jobs(state: Optional[str] = None):
         number_of_submitted_jobs = 0
+        state_string = f"-t {state}" if state else ""
         # --array so that each job array element is displayed on a separate line and -h to hide the header
-        stdout, stderr, exit_code = call("squeue --array -u $USER -h | wc -l")
+        stdout, stderr, exit_code = call(
+            f"squeue --array -u $USER -h {state_string} | wc -l"
+        )
+
+        job_state_string = f"with state {state} " if state else ""
         if exit_code == 0:
             number_of_submitted_jobs = int(stdout.decode("utf8"))
             logging.debug(
-                f"Number of currently submitted jobs is {number_of_submitted_jobs}."
+                f"Number of currently submitted jobs {job_state_string}is {number_of_submitted_jobs}."
             )
         else:
             logging.warning(
-                f"Number of currently submitted jobs couldn't be determined. Reason: {stderr}"
+                f"Number of currently submitted jobs {job_state_string}couldn't be determined. Reason: {stderr}"
             )
         return number_of_submitted_jobs
 
@@ -197,10 +203,36 @@ def submit_text(cls, job, cfut_dir):
 
         return int(job_id)
 
-    def handle_kill(self, *args, **kwargs):
+    def inner_handle_kill(self, *args, **kwargs):
         for submit_thread in self.submit_threads:
             submit_thread.stop()
-        super().handle_kill(*args, **kwargs)
+
+        # Jobs with a NOT_YET_SUBMITTED_STATE have not been submitted to the cluster yet
+        scheduled_job_ids: List[Union[int, str]] = [
+            job_id
+            for job_id, job_state in self.jobs.items()
+            if job_state != NOT_YET_SUBMITTED_STATE
+        ]
+
+        if len(scheduled_job_ids):
+            # Array jobs (whose id looks like `<job_id>_<array_index>`) don't need to be signaled individually,
+            # but can be canceled together using the job_id.
+            unique_job_ids = set(map(lambda x: str(x).split("_")[0], scheduled_job_ids))
+            job_id_string = " ".join(unique_job_ids)
+            # Send SIGINT signal to running jobs instead of terminating the jobs right away. This way, the jobs can
+            # react to the signal, safely shutdown and signal (cancel) jobs they possibly scheduled, recursively.
+            _, stderr, _ = call(
+                f"scancel --state=PENDING {job_id_string}; scancel -s SIGINT --state=RUNNING {job_id_string}; scancel --state=SUSPENDED {job_id_string}"
+            )
+
+            maybe_error_or_warning = (
+                f"\nErrors and warnings (if all jobs were pending 'Invalid job id' errors are expected):\n{stderr.decode('utf8')}"
+                if stderr
+                else ""
+            )
+            print(
+                f"Canceled slurm jobs {', '.join(unique_job_ids)}.{maybe_error_or_warning}"
+            )
 
     def cleanup_submit_threads(self):
         self.submit_threads = [
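
The ordering of the three `scancel` calls matters: pending jobs are canceled first so none of them can transition to RUNNING while the running ones are being signaled, and running jobs receive SIGINT (rather than scancel's default termination) so they can in turn cancel any jobs they scheduled recursively. A rough standalone sketch of that sequence, using `subprocess` directly instead of the library's `call` helper; the function name is illustrative:

```python
import subprocess
from typing import Iterable


def cancel_slurm_jobs(job_ids: Iterable[str]) -> str:
    """Cancel pending jobs first, then SIGINT running ones, then drop suspended ones.

    Returns scancel's stderr; 'Invalid job id' messages are expected whenever a
    job only existed in one of the three states.
    """
    job_id_string = " ".join(job_ids)
    result = subprocess.run(
        # 1) pending jobs go first so none of them can start running after the
        #    RUNNING pass below has already happened
        f"scancel --state=PENDING {job_id_string}; "
        # 2) running jobs get SIGINT so they can shut down cleanly and cancel
        #    recursively scheduled jobs themselves
        f"scancel -s SIGINT --state=RUNNING {job_id_string}; "
        # 3) suspended jobs cannot react to signals and are simply canceled
        f"scancel --state=SUSPENDED {job_id_string}",
        shell=True,
        capture_output=True,
        text=True,
    )
    return result.stderr
```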

cluster_tools/dockered-slurm/README.md

Lines changed: 4 additions & 0 deletions

@@ -32,6 +32,8 @@ Run `docker-compose` to instantiate the cluster:
 $ docker-compose up -d
 ```
 
+> Note: If you encounter permission errors (`Failed to check keyfile "/etc/munge/munge.key": Permission denied`), follow the steps from the "Deleting the Cluster" section and run the previous command again.
+
 ## Register the Cluster with SlurmDBD
 
 To register the cluster to the slurmdbd daemon, run the `register_cluster.sh`
@@ -48,6 +50,8 @@ $ ./register_cluster.sh
 > You can check the status of the cluster by viewing the logs: `docker-compose
 > logs -f`
 
+> Note: If you encounter an error that the daemon is not running (`Error response from daemon: Container <...> is not running`), the start of the containers was not successful. Check the logs using `docker-compose logs -f` and revisit the last step.
+
 ## Accessing the Cluster
 
 Use `docker exec` to run a bash shell on the controller container:

cluster_tools/dockered-slurm/docker-compose.yml

Lines changed: 4 additions & 4 deletions

@@ -15,7 +15,7 @@ services:
       - ..:/cluster_tools
 
   slurmdbd:
-    image: scalableminds/slurm-docker-cluster:master__2021363052
+    image: scalableminds/slurm-docker-cluster:master__3840662994
     command: ["slurmdbd"]
     container_name: slurmdbd
     hostname: slurmdbd
@@ -29,7 +29,7 @@ services:
       - mysql
 
   slurmctld:
-    image: scalableminds/slurm-docker-cluster:master__2021363052
+    image: scalableminds/slurm-docker-cluster:master__3840662994
     command: ["slurmctld"]
     container_name: slurmctld
     environment:
@@ -48,7 +48,7 @@ services:
       - "slurmdbd"
 
   c1:
-    image: scalableminds/slurm-docker-cluster:master__2021363052
+    image: scalableminds/slurm-docker-cluster:master__3840662994
     command: ["slurmd"]
     hostname: c1
     container_name: c1
@@ -64,7 +64,7 @@ services:
      - "slurmctld"
 
   c2:
-    image: scalableminds/slurm-docker-cluster:master__2021363052
+    image: scalableminds/slurm-docker-cluster:master__3840662994
     command: ["slurmd"]
     hostname: c2
     container_name: c2

cluster_tools/dockered-slurm/slurm.conf

Lines changed: 0 additions & 3 deletions

@@ -23,7 +23,6 @@ SlurmctldPidFile=/var/run/slurmd/slurmctld.pid
 SlurmdPidFile=/var/run/slurmd/slurmd.pid
 ProctrackType=proctrack/linuxproc
 #PluginDir=
-CacheGroups=0
 #FirstJobId=
 ReturnToService=0
 #MaxJobCount=
@@ -58,7 +57,6 @@ SchedulerType=sched/backfill
 #SchedulerRootFilter=
 SelectType=select/cons_res
 SelectTypeParameters=CR_CPU_Memory
-FastSchedule=1
 #PriorityType=priority/multifactor
 #PriorityDecayHalfLife=14-0
 #PriorityUsageResetPeriod=14-0
@@ -83,7 +81,6 @@ JobAcctGatherFrequency=30
 AccountingStorageType=accounting_storage/slurmdbd
 AccountingStorageHost=slurmdbd
 AccountingStoragePort=6819
-AccountingStorageLoc=slurm_acct_db
 #AccountingStoragePass=
 #AccountingStorageUser=
 #
