Skip to content

Commit 788a89d

Browse files
AlexTate authored and mr-c committed
Actively running jobs respond to the kill switch by checking the switch's status in the monitor function. The monitor function, up to this point, has been for gathering memory usage statistics via a timer thread. A second timer thread now monitors the kill switch.
1 parent d7b7712 commit 788a89d

File tree

1 file changed

+54
-5
lines changed

1 file changed

+54
-5
lines changed

cwltool/job.py

Lines changed: 54 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -507,6 +507,7 @@ def process_monitor(self, sproc: "subprocess.Popen[str]", kill_switch: threading
507507
memory_usage: MutableSequence[Optional[int]] = [None]
508508

509509
mem_tm: "Optional[Timer]" = None
510+
ks_tm: "Optional[Timer]" = None
510511

511512
def get_tree_mem_usage(memory_usage: MutableSequence[Optional[int]]) -> None:
512513
nonlocal mem_tm
@@ -528,10 +529,27 @@ def get_tree_mem_usage(memory_usage: MutableSequence[Optional[int]]) -> None:
528529
if mem_tm is not None:
529530
mem_tm.cancel()
530531

532+
def monitor_kill_switch() -> None:
533+
nonlocal ks_tm
534+
if kill_switch.is_set():
535+
_logger.error("[job %s] terminating by kill switch", self.name)
536+
if sproc.stdin: sproc.stdin.close()
537+
sproc.terminate()
538+
else:
539+
ks_tm = Timer(interval=1, function=monitor_kill_switch)
540+
ks_tm.daemon = True
541+
ks_tm.start()
542+
543+
ks_tm = Timer(interval=1, function=monitor_kill_switch)
544+
ks_tm.daemon = True
545+
ks_tm.start()
546+
531547
mem_tm = Timer(interval=1, function=get_tree_mem_usage, args=(memory_usage,))
532548
mem_tm.daemon = True
533549
mem_tm.start()
550+
534551
sproc.wait()
552+
ks_tm.cancel()
535553
mem_tm.cancel()
536554
if memory_usage[0] is not None:
537555
_logger.info(
@@ -845,20 +863,48 @@ def docker_monitor(
845863
process: "subprocess.Popen[str]",
846864
kill_switch: threading.Event,
847865
) -> None:
848-
"""Record memory usage of the running Docker container."""
866+
"""Record memory usage of the running Docker container. Terminate if kill_switch is activated."""
867+
868+
ks_tm: "Optional[Timer]" = None
869+
cid: Optional[str] = None
870+
871+
def monitor_kill_switch() -> None:
872+
nonlocal ks_tm
873+
if kill_switch.is_set():
874+
_logger.error("[job %s] terminating by kill switch", self.name)
875+
if process.stdin:
876+
process.stdin.close()
877+
if cid is not None:
878+
kill_proc = subprocess.Popen( # nosec
879+
[docker_exe, "kill", cid], shell=False # nosec
880+
)
881+
try:
882+
kill_proc.wait(timeout=10)
883+
except subprocess.TimeoutExpired:
884+
kill_proc.kill()
885+
process.terminate() # Always terminate, even if we tried with the cidfile
886+
else:
887+
ks_tm = Timer(interval=1, function=monitor_kill_switch)
888+
ks_tm.daemon = True
889+
ks_tm.start()
890+
891+
ks_tm = Timer(interval=1, function=monitor_kill_switch)
892+
ks_tm.daemon = True
893+
ks_tm.start()
894+
849895
# Todo: consider switching to `docker create` / `docker start`
850896
# instead of `docker run` as `docker create` outputs the container ID
851897
# to stdout, but the container is frozen, thus allowing us to start the
852898
# monitoring process without dealing with the cidfile or too-fast
853899
# container execution
854-
cid: Optional[str] = None
855900
while cid is None:
856901
time.sleep(1)
857902
# This is needed to avoid a race condition where the job
858903
# was so fast that it already finished when it arrives here
859904
if process.returncode is None:
860905
process.poll()
861906
if process.returncode is not None:
907+
ks_tm.cancel()
862908
if cleanup_cidfile:
863909
try:
864910
os.remove(cidfile)
@@ -890,6 +936,9 @@ def docker_monitor(
890936
except OSError as exc:
891937
_logger.warning("Ignored error with %s stats: %s", docker_exe, exc)
892938
return
939+
finally:
940+
ks_tm.cancel()
941+
893942
max_mem_percent: float = 0.0
894943
mem_percent: float = 0.0
895944
with open(stats_file_name) as stats:
@@ -924,7 +973,7 @@ def _job_popen(
924973
job_script_contents: Optional[str] = None,
925974
timelimit: Optional[int] = None,
926975
name: Optional[str] = None,
927-
monitor_function: Optional[Callable[["subprocess.Popen[str]"], None]] = None,
976+
monitor_function: Optional[Callable[["subprocess.Popen[str]", "threading.Event"], None]] = None,
928977
default_stdout: Optional[Union[IO[bytes], TextIO]] = None,
929978
default_stderr: Optional[Union[IO[bytes], TextIO]] = None,
930979
) -> int:
@@ -979,7 +1028,7 @@ def terminate(): # type: () -> None
9791028
tm.daemon = True
9801029
tm.start()
9811030
if monitor_function:
982-
monitor_function(sproc)
1031+
monitor_function(sproc, kill_switch)
9831032
rcode = sproc.wait()
9841033

9851034
if tm is not None:
@@ -1055,7 +1104,7 @@ def terminate(): # type: () -> None
10551104
tm.daemon = True
10561105
tm.start()
10571106
if monitor_function:
1058-
monitor_function(sproc)
1107+
monitor_function(sproc, kill_switch)
10591108

10601109
rcode = sproc.wait()
10611110

0 commit comments

Comments
 (0)