Skip to content

Commit 5299b4a

Browse files
authored
Merge pull request #8179 from aldbr/main_FIX_watchdog-killing-payload
[9.0] fix(wms): Watchdog does not kill payload properly and runs endlessly
2 parents db98c79 + 940f06c commit 5299b4a

File tree

3 files changed

+56
-12
lines changed

3 files changed

+56
-12
lines changed

src/DIRAC/Core/Utilities/Subprocess.py

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -268,18 +268,33 @@ def killChild(self, recursive=True):
268268
269269
:param boolean recursive: flag to kill all descendants
270270
"""
271-
272-
parent = psutil.Process(self.childPID)
273-
children = parent.children(recursive=recursive)
274-
children.append(parent)
275-
for p in children:
271+
pgid = os.getpgid(self.childPID)
272+
if pgid != os.getpgrp():
276273
try:
277-
p.send_signal(signal.SIGTERM)
278-
except psutil.NoSuchProcess:
274+
# Child is in its own group: kill the group
275+
os.killpg(pgid, signal.SIGTERM)
276+
except OSError:
277+
# Process is already dead
279278
pass
280-
_gone, alive = psutil.wait_procs(children, timeout=10)
281-
for p in alive:
282-
p.kill()
279+
else:
280+
# No separate group: walk the tree
281+
parent = psutil.Process(self.childPID)
282+
procs = parent.children(recursive=recursive)
283+
procs.append(parent)
284+
for p in procs:
285+
try:
286+
p.terminate()
287+
except psutil.NoSuchProcess:
288+
pass
289+
_gone, alive = psutil.wait_procs(procs, timeout=10)
290+
# Escalate any survivors
291+
for p in alive:
292+
try:
293+
p.kill()
294+
except psutil.NoSuchProcess:
295+
pass
296+
297+
self.childKilled = True
283298

284299
def pythonCall(self, function, *stArgs, **stKeyArgs):
285300
"""call python function :function: with :stArgs: and :stKeyArgs:"""
@@ -405,7 +420,9 @@ def __readFromSystemCommandOutput(self, fd, bufferIndex):
405420
self.killChild()
406421
return self.__generateSystemCommandError(1, f"{retDict['Message']} for '{self.cmdSeq}' call")
407422

408-
def systemCall(self, cmdSeq, callbackFunction=None, shell=False, env=None, preexec_fn=None):
423+
def systemCall(
424+
self, cmdSeq, callbackFunction=None, shell=False, env=None, preexec_fn=None, start_new_session=False
425+
):
409426
"""system call (no shell) - execute :cmdSeq:"""
410427

411428
if shell:
@@ -426,6 +443,7 @@ def systemCall(self, cmdSeq, callbackFunction=None, shell=False, env=None, preex
426443
env=env,
427444
universal_newlines=True,
428445
preexec_fn=preexec_fn,
446+
start_new_session=start_new_session,
429447
)
430448
self.childPID = self.child.pid
431449
except OSError as v:

src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1584,7 +1584,9 @@ def run(self):
15841584
start = time.time()
15851585
initialStat = os.times()
15861586
log.verbose("Cmd called", self.cmd)
1587-
output = self.spObject.systemCall(self.cmd, env=self.exeEnv, callbackFunction=self.sendOutput, shell=True)
1587+
output = self.spObject.systemCall(
1588+
self.cmd, env=self.exeEnv, callbackFunction=self.sendOutput, shell=True, start_new_session=True
1589+
)
15881590
log.verbose(f"Output of system call within execution thread: {output}")
15891591
self.executionResults["Thread"] = output
15901592
timing = time.time() - start

src/DIRAC/WorkloadManagementSystem/JobWrapper/test/Test_JobWrapper.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,30 @@ def test_processFailedSubprocess(mocker):
296296
assert not result["Value"]["watchdogStats"]
297297

298298

299+
@pytest.mark.slow
300+
def test_processKilledSubprocess(mocker):
301+
"""Test the process method of the JobWrapper class: the job is stalled and is killed by the Watchdog."""
302+
jw = JobWrapper()
303+
jw.jobArgs = {"CPUTime": 100, "Memory": 1}
304+
305+
mocker.patch.object(jw, "_JobWrapper__report")
306+
mocker.patch.object(jw, "_JobWrapper__setJobParam")
307+
308+
mock_progress_call = mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.Watchdog.Watchdog._checkProgress")
309+
mock_progress_call.return_value = S_ERROR("Job is stalled!")
310+
311+
with tempfile.NamedTemporaryFile(delete=True) as std_out, tempfile.NamedTemporaryFile(delete=True) as std_err:
312+
jw.outputFile = std_out.name
313+
jw.errorFile = std_err.name
314+
result = jw.process("sleep 20", {})
315+
316+
assert result["OK"]
317+
assert result["Value"]["payloadStatus"] == 15 # SIGTERM
318+
assert not result["Value"]["payloadOutput"]
319+
assert not result["Value"]["payloadExecutorError"]
320+
assert result["Value"]["watchdogError"] == "Job is stalled!" # Error message from the watchdog
321+
322+
299323
@pytest.mark.slow
300324
def test_processQuickExecutionNoWatchdog(mocker):
301325
"""Test the process method of the JobWrapper class: the payload is too fast to start the watchdog."""

0 commit comments

Comments
 (0)