Skip to content

Commit 3230c4e

Browse files
authored
Merge pull request #6639 from hjoliver/foreground-handler-timeout
Kill foreground event handlers on timeout
1 parent d700c11 commit 3230c4e

File tree

4 files changed

+90
-25
lines changed

4 files changed

+90
-25
lines changed

changes.d/6639.fix.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Ensure that shutdown event handlers are killed if they exceed the process pool timeout.

cylc/flow/subprocpool.py

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
from tempfile import SpooledTemporaryFile
2626
from threading import RLock
2727
from time import time
28-
from subprocess import DEVNULL, run # nosec
28+
from subprocess import DEVNULL, TimeoutExpired, run # nosec
2929
from typing import TYPE_CHECKING, Any, Callable, List, Optional, Set
3030

3131
from cylc.flow import LOG, iter_entry_points
@@ -365,18 +365,38 @@ def put_command(
365365
)
366366

367367
@classmethod
368-
def run_command(cls, ctx):
368+
def run_command(cls, ctx, callback: Optional[Callable] = None):
369369
"""Execute command in ctx and capture its output and exit status.
370370
371+
Kills the subprocess if it exceeds the subprocess pool timeout.
372+
371373
Arguments:
372374
ctx (cylc.flow.subprocctx.SubProcContext):
373375
A context object containing the command to run and its status.
376+
callback:
377+
Optional callback function.
374378
"""
379+
timeout = glbl_cfg().get(['scheduler', 'process pool timeout'])
380+
375381
proc = cls._run_command_init(ctx)
376-
if proc:
377-
ctx.out, ctx.err = (f.decode() for f in proc.communicate())
382+
if not proc:
383+
return
384+
385+
try:
386+
ctx.out, ctx.err = (
387+
f.decode()
388+
for f in proc.communicate(timeout=float(timeout))
389+
)
390+
except TimeoutExpired:
391+
if _killpg(proc, SIGKILL):
392+
ctx.err = f"killed on timeout ({timeout})"
393+
ctx.ret_code = proc.wait()
394+
else:
378395
ctx.ret_code = proc.wait()
379-
cls._run_command_exit(ctx)
396+
397+
if callback is not None:
398+
callback(ctx)
399+
cls._run_command_exit(ctx)
380400

381401
def set_stopping(self):
382402
"""Stop job submission."""

cylc/flow/workflow_events.py

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -302,14 +302,21 @@ def _send_mail(
302302
env=env,
303303
stdin_str=message
304304
)
305-
if self.proc_pool.closed:
306-
# Run command in foreground if process pool is closed
307-
self.proc_pool.run_command(proc_ctx)
308-
self._run_event_handlers_callback(proc_ctx)
305+
self._run_cmd(proc_ctx, callback=self._run_event_mail_callback)
306+
307+
def _run_cmd(self, ctx, callback):
308+
"""Queue or directly run a command and its callback.
309+
310+
Queue the command to the subprocess pool if possible, or else run it
311+
in the foreground (but still subject to the subprocess pool timeout).
312+
313+
"""
314+
if not self.proc_pool.closed:
315+
# Queue it to the subprocess pool.
316+
self.proc_pool.put_command(ctx, callback=callback)
309317
else:
310-
# Run command using process pool otherwise
311-
self.proc_pool.put_command(
312-
proc_ctx, callback=self._run_event_mail_callback)
318+
# Run it in the foreground.
319+
self.proc_pool.run_command(ctx, callback=callback)
313320

314321
def _run_event_custom_handlers(self, schd, template_variables, event):
315322
"""Helper for "run_event_handlers", custom event handlers."""
@@ -349,23 +356,14 @@ def _run_event_custom_handlers(self, schd, template_variables, event):
349356
env=dict(os.environ),
350357
shell=True # nosec (designed to run user defined code)
351358
)
352-
if self.proc_pool.closed:
353-
# Run command in foreground if abort on failure is set or if
354-
# process pool is closed
355-
self.proc_pool.run_command(proc_ctx)
356-
self._run_event_handlers_callback(proc_ctx)
357-
else:
358-
# Run command using process pool otherwise
359-
self.proc_pool.put_command(
360-
proc_ctx, callback=self._run_event_handlers_callback)
359+
self._run_cmd(proc_ctx, self._run_event_handlers_callback)
361360

362361
@staticmethod
363362
def _run_event_handlers_callback(proc_ctx):
364363
"""Callback on completion of a workflow event handler."""
365364
if proc_ctx.ret_code:
366-
msg = '%s EVENT HANDLER FAILED' % proc_ctx.cmd_key[1]
367365
LOG.error(str(proc_ctx))
368-
LOG.error(msg)
366+
LOG.error(f'{proc_ctx.cmd_key[1]} EVENT HANDLER FAILED')
369367
else:
370368
LOG.info(str(proc_ctx))
371369

tests/integration/test_workflow_events.py

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
from async_timeout import timeout as async_timeout
2020
import pytest
21+
from types import MethodType
2122

2223
from cylc.flow.scheduler import SchedulerError
2324

@@ -74,8 +75,8 @@ def _schd(config=None, **opts):
7475
async def test_startup_and_shutdown(test_scheduler, run):
7576
"""Test the startup and shutdown events.
7677
77-
* "statup" should fire every time a scheduler is started.
78-
* "shutdown" should fire every time a scheduler exits in a controlled fassion
78+
* "startup" should fire every time a scheduler is started.
79+
* "shutdown" should fire every time a scheduler does a controlled exit.
7980
(i.e. excluding aborts on unexpected internal errors).
8081
"""
8182
schd = test_scheduler()
@@ -185,3 +186,48 @@ async def test_restart_timeout(test_scheduler, scheduler, run, complete):
185186
async with run(schd2):
186187
await asyncio.sleep(0.1)
187188
assert schd2.get_events() == {'startup', 'restart timeout', 'shutdown'}
189+
190+
191+
async def test_shutdown_handler_timeout_kill(
192+
test_scheduler, run, monkeypatch, mock_glbl_cfg, caplog
193+
):
194+
"""Test shutdown handlers get killed on the process pool timeout.
195+
196+
Has to be done differently as the process pool is closed during shutdown.
197+
See GitHub #6639
198+
199+
"""
200+
def mock_run_event_handlers(self, event, reason=""):
201+
"""To replace scheduler.run_event_handlers(...).
202+
203+
Run workflow event handlers even in simulation mode.
204+
205+
"""
206+
self.workflow_event_handler.handle(self, event, str(reason))
207+
208+
# Configure a long-running shutdown handler.
209+
schd = test_scheduler({'shutdown handlers': 'sleep 10; echo'})
210+
211+
# Set a low process pool timeout value.
212+
mock_glbl_cfg(
213+
'cylc.flow.subprocpool.glbl_cfg',
214+
'''
215+
[scheduler]
216+
process pool timeout = PT1S
217+
'''
218+
)
219+
220+
async with async_timeout(30):
221+
async with run(schd):
222+
# Replace a scheduler method, to call handlers in simulation mode.
223+
monkeypatch.setattr(
224+
schd,
225+
'run_event_handlers',
226+
MethodType(mock_run_event_handlers, schd),
227+
)
228+
await asyncio.sleep(0.1)
229+
230+
assert (
231+
"[('workflow-event-handler-00', 'shutdown') err] killed on timeout (PT1S)"
232+
in caplog.text
233+
)

0 commit comments

Comments
 (0)