Skip to content

Commit cc09811

Browse files
authored
Ensure proper clean up when ctrl+c'ing cluster jobs (#1081)
* also clean up main path and log in case clean up fails silently
* also remove pickle input files when ctrl+c'ing slurm jobs; log when deletion does not work
* fix test
1 parent c8684de commit cc09811

File tree

2 files changed

+16
-10
lines changed

2 files changed

+16
-10
lines changed

cluster_tools/cluster_tools/schedulers/cluster_executor.py

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ def handle_kill(
176176

177177
self.inner_handle_kill(signum, frame)
178178
self.wait_thread.stop()
179+
self.clean_up()
179180

180181
if (
181182
existing_sigint_handler != signal.default_int_handler
@@ -248,15 +249,15 @@ def inner_submit(
248249
) -> Tuple[List["Future[str]"], List[Tuple[int, int]]]:
249250
pass
250251

251-
def _cleanup(self, jobid: str) -> None:
252+
def _maybe_mark_logs_for_cleanup(self, jobid: str) -> None:
252253
"""Given a job ID as returned by _start, perform any necessary
253254
cleanup after the job has finished.
254255
"""
255256
if self.keep_logs:
256257
return
257258

258-
outf = self.format_log_file_path(self.cfut_dir, jobid)
259-
self.files_to_clean_up.append(outf)
259+
log_path = self.format_log_file_path(self.cfut_dir, jobid)
260+
self.files_to_clean_up.append(log_path)
260261

261262
@staticmethod
262263
@abstractmethod
@@ -367,7 +368,7 @@ def _completion(self, jobid: str, failed_early: bool) -> None:
367368
if not should_keep_output:
368369
self.files_to_clean_up.append(outfile_name)
369370

370-
self._cleanup(jobid)
371+
self._maybe_mark_logs_for_cleanup(jobid)
371372

372373
def ensure_not_shutdown(self) -> None:
373374
if self.was_requested_to_shutdown:
@@ -462,10 +463,10 @@ def get_main_meta_path(cfut_dir: str, workerid: str) -> str:
462463
return os.path.join(cfut_dir, f"cfut.main_path.{workerid}.txt")
463464

464465
def store_main_path_to_meta_file(self, workerid: str) -> None:
465-
with open(
466-
self.get_main_meta_path(self.cfut_dir, workerid), "w", encoding="utf-8"
467-
) as file:
466+
main_meta_path = self.get_main_meta_path(self.cfut_dir, workerid)
467+
with open(main_meta_path, "w", encoding="utf-8") as file:
468468
file.write(file_path_to_absolute_module(sys.argv[0]))
469+
self.files_to_clean_up.append(main_meta_path)
469470

470471
def map_to_futures(
471472
self,
@@ -615,11 +616,16 @@ def shutdown(self, wait: bool = True, cancel_futures: bool = True) -> None:
615616
self.wait_thread.stop()
616617
self.wait_thread.join()
617618

619+
self.clean_up()
620+
621+
def clean_up(self) -> None:
618622
for file_to_clean_up in self.files_to_clean_up:
619623
try:
620624
os.unlink(file_to_clean_up)
621-
except OSError: # noqa: PERF203 `try`-`except` within a loop incurs performance overhead
622-
pass
625+
except OSError as exc: # noqa: PERF203 `try`-`except` within a loop incurs performance overhead
626+
logging.warning(
627+
f"Could not delete file during clean up. Path: {file_to_clean_up} Exception: {exc}. Continuing..."
628+
)
623629
self.files_to_clean_up = []
624630

625631
def map( # type: ignore[override]

cluster_tools/tests/test_slurm.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ def test_slurm_cfut_dir() -> None:
7979
assert future.result() == 4
8080

8181
assert os.path.exists(cfut_dir)
82-
assert len(os.listdir(cfut_dir)) == 2
82+
assert len(os.listdir(cfut_dir)) == 1 # only the log file should still exist
8383

8484

8585
def test_slurm_max_submit_user() -> None:

0 commit comments

Comments
 (0)