Skip to content

Commit 21d18ba

Browse files
wxtimoliver-sandersMetRonnie
authored
Get poll to return task failure if job/log has been removed. (#6577)
Get poll to return task failure if job/log has been removed. --------- Co-authored-by: Oliver Sanders <[email protected]> Co-authored-by: Ronnie Dutta <[email protected]>
1 parent 8c7b7be commit 21d18ba

File tree

7 files changed

+160
-62
lines changed

7 files changed

+160
-62
lines changed

changes.d/6577.fix.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fixed a bug where if you prematurely deleted the job log directory, it would leave tasks permanently in the submitted or running states.

cylc/flow/job_runner_mgr.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@
4646
from cylc.flow.parsec.OrderedDict import OrderedDict
4747

4848

49+
JOB_FILES_REMOVED_MESSAGE = 'ERR_JOB_FILES_REMOVED'
50+
51+
4952
class JobPollContext():
5053
"""Context object for a job poll."""
5154
CONTEXT_ATTRIBUTES = (
@@ -439,6 +442,16 @@ def _filter_submit_output(cls, st_file_path, job_runner, out, err):
439442
def _jobs_poll_status_files(self, job_log_root, job_log_dir):
440443
"""Helper 1 for self.jobs_poll(job_log_root, job_log_dirs)."""
441444
ctx = JobPollContext(job_log_dir)
445+
# If the log directory has been deleted prematurely, return a task
446+
# failure and an explanation:
447+
if not os.path.exists(os.path.join(job_log_root, ctx.job_log_dir)):
448+
# The job may still be in the job runner and may yet succeed,
449+
# but we assume it failed & exited because it's the best we
450+
# can do as it is no longer possible to poll it.
451+
ctx.run_status = 1
452+
ctx.job_runner_exit_polled = 1
453+
ctx.run_signal = JOB_FILES_REMOVED_MESSAGE
454+
return ctx
442455
try:
443456
with open(
444457
os.path.join(job_log_root, ctx.job_log_dir, JOB_LOG_STATUS)

cylc/flow/task_job_mgr.py

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
Optional,
4545
Tuple,
4646
Union,
47+
cast,
4748
)
4849

4950
from cylc.flow import LOG
@@ -60,7 +61,7 @@
6061
is_remote_platform,
6162
)
6263
from cylc.flow.job_file import JobFileWriter
63-
from cylc.flow.job_runner_mgr import JobPollContext
64+
from cylc.flow.job_runner_mgr import JOB_FILES_REMOVED_MESSAGE, JobPollContext
6465
from cylc.flow.pathutil import get_remote_workflow_run_job_dir
6566
from cylc.flow.platforms import (
6667
get_host_from_platform,
@@ -864,7 +865,13 @@ def _poll_task_job_callback_255(self, workflow, itask, cmd_ctx, line):
864865
)
865866
self.poll_task_jobs(workflow, [itask])
866867

867-
def _poll_task_job_callback(self, workflow, itask, cmd_ctx, line):
868+
def _poll_task_job_callback(
869+
self,
870+
workflow: str,
871+
itask: 'TaskProxy',
872+
cmd_ctx: SubProcContext,
873+
line: str,
874+
):
868875
"""Helper for _poll_task_jobs_callback, on one task job."""
869876
ctx = SubProcContext(self.JOBS_POLL, None)
870877
ctx.out = line
@@ -891,16 +898,21 @@ def _poll_task_job_callback(self, workflow, itask, cmd_ctx, line):
891898
log_lvl = DEBUG if (
892899
itask.platform.get('communication method') == 'poll'
893900
) else INFO
901+
902+
if jp_ctx.run_signal == JOB_FILES_REMOVED_MESSAGE:
903+
LOG.error(
904+
f"platform: {itask.platform['name']} - job log directory "
905+
f"{job_tokens.relative_id} no longer exists"
906+
)
907+
894908
if jp_ctx.run_status == 1 and jp_ctx.run_signal in ["ERR", "EXIT"]:
895909
# Failed normally
896910
self.task_events_mgr.process_message(
897911
itask, log_lvl, TASK_OUTPUT_FAILED, jp_ctx.time_run_exit, flag)
898912
elif jp_ctx.run_status == 1 and jp_ctx.job_runner_exit_polled == 1:
899913
# Failed by a signal, and no longer in job runner
900914
self.task_events_mgr.process_message(
901-
itask, log_lvl, TASK_OUTPUT_FAILED, jp_ctx.time_run_exit, flag)
902-
self.task_events_mgr.process_message(
903-
itask, log_lvl, FAIL_MESSAGE_PREFIX + jp_ctx.run_signal,
915+
itask, log_lvl, f"{FAIL_MESSAGE_PREFIX}{jp_ctx.run_signal}",
904916
jp_ctx.time_run_exit,
905917
flag)
906918
elif jp_ctx.run_status == 1: # noqa: SIM114
@@ -1288,7 +1300,8 @@ def _prep_submit_task_job(
12881300
workflow, itask, '(platform not defined)', exc)
12891301
return False
12901302
else:
1291-
itask.platform = platform # type: ignore[assignment]
1303+
# (platform is not None here as subshell eval has finished)
1304+
itask.platform = cast('dict', platform)
12921305
# Retry delays, needed for the try_num
12931306
self._set_retry_timers(itask, rtconfig)
12941307

tests/flakyfunctional/cylc-poll/16-execution-time-limit.t

Lines changed: 10 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,12 @@
1414
#
1515
# You should have received a copy of the GNU General Public License
1616
# along with this program. If not, see <http://www.gnu.org/licenses/>.
17-
#-------------------------------------------------------------------------------
17+
1818
# Test execution time limit polling.
1919
export REQUIRE_PLATFORM='loc:* comms:poll runner:background'
2020
. "$(dirname "$0")/test_header"
21-
#-------------------------------------------------------------------------------
22-
set_test_number 4
21+
22+
set_test_number 5
2323
create_test_global_config '' "
2424
[platforms]
2525
[[$CYLC_TEST_PLATFORM]]
@@ -28,51 +28,16 @@ create_test_global_config '' "
2828
execution time limit polling intervals = PT5S
2929
"
3030
install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}"
31-
#-------------------------------------------------------------------------------
31+
3232
run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}"
3333
workflow_run_ok "${TEST_NAME_BASE}-run" \
3434
cylc play --reference-test -v --no-detach "${WORKFLOW_NAME}" --timestamp
35-
#-------------------------------------------------------------------------------
36-
# shellcheck disable=SC2317
37-
cmp_times () {
38-
# Test if the times $1 and $2 are within $3 seconds of each other.
39-
python3 -u - "$@" <<'__PYTHON__'
40-
import sys
41-
from metomi.isodatetime.parsers import TimePointParser
42-
parser = TimePointParser()
43-
time_1 = parser.parse(sys.argv[1])
44-
time_2 = parser.parse(sys.argv[2])
45-
if abs((time_1 - time_2).get_seconds()) > int(sys.argv[3]):
46-
sys.exit("abs(predicted - actual) > tolerance: %s" % sys.argv[1:])
47-
__PYTHON__
48-
}
49-
time_offset () {
50-
# Add an ISO8601 duration to an ISO8601 date-time.
51-
python3 -u - "$@" <<'__PYTHON__'
52-
import sys
53-
from metomi.isodatetime.parsers import TimePointParser, DurationParser
54-
print(
55-
TimePointParser().parse(sys.argv[1]) + DurationParser().parse(sys.argv[2]))
56-
__PYTHON__
57-
}
58-
#-------------------------------------------------------------------------------
35+
5936
LOG="${WORKFLOW_RUN_DIR}/log/scheduler/log"
60-
# Test logging of the "next job poll" message when task starts.
61-
TEST_NAME="${TEST_NAME_BASE}-log-entry"
62-
LINE="$(grep '\[1/foo.* execution timeout=None, polling intervals=' "${LOG}")"
63-
run_ok "${TEST_NAME}" grep -q 'health: execution timeout=None, polling intervals=' <<< "${LINE}"
64-
# Determine poll times.
65-
PREDICTED_POLL_TIME=$(time_offset \
66-
"$(cut -d ' ' -f 1 <<< "${LINE}")" \
67-
"PT10S") # PT5S time limit + PT5S polling interval
68-
ACTUAL_POLL_TIME=$(sed -n \
69-
's|\(.*\) DEBUG - \[1/foo.* (polled)failed .*|\1|p' "${LOG}")
7037

71-
# Test execution timeout polling.
72-
# Main loop is roughly 1 second, but integer rounding may give an apparent 2
73-
# seconds delay, so set threshold as 2 seconds.
74-
run_ok "${TEST_NAME_BASE}-poll-time" \
75-
cmp_times "${PREDICTED_POLL_TIME}" "${ACTUAL_POLL_TIME}" '10'
76-
#-------------------------------------------------------------------------------
38+
log_scan "${TEST_NAME_BASE}-log" "${LOG}" 1 0 \
39+
"\[1/foo/01:submitted\] => running" \
40+
"\[1/foo/01:running\] poll now, (next in PT5S" \
41+
"\[1/foo/01:running\] (polled)failed/XCPU"
42+
7743
purge
78-
exit

tests/flakyfunctional/cylc-poll/16-execution-time-limit/flow.cylc

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,6 @@
1313
[runtime]
1414
[[foo]]
1515
platform = {{ environ['CYLC_TEST_PLATFORM'] }}
16-
init-script = cylc__job__disable_fail_signals ERR EXIT
17-
script = """
18-
cylc__job__wait_cylc_message_started
19-
# give it a while for the started message to get picked up by
20-
# the scheduler
21-
sleep 10
22-
exit 1
23-
"""
24-
[[[job]]]
25-
execution time limit = PT5S
16+
script = sleep 20
17+
execution time limit = PT10S
2618
[[bar]]

tests/integration/test_task_job_mgr.py

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,18 @@
1515
# along with this program. If not, see <http://www.gnu.org/licenses/>.
1616

1717
from contextlib import suppress
18+
import json
1819
import logging
1920
from typing import Any as Fixture
21+
from unittest.mock import Mock
2022

2123
from cylc.flow import CYLC_LOG
24+
from cylc.flow.job_runner_mgr import JOB_FILES_REMOVED_MESSAGE
2225
from cylc.flow.scheduler import Scheduler
23-
from cylc.flow.task_state import TASK_STATUS_RUNNING
26+
from cylc.flow.task_state import (
27+
TASK_STATUS_FAILED,
28+
TASK_STATUS_RUNNING,
29+
)
2430

2531

2632
async def test_run_job_cmd_no_hosts_error(
@@ -229,3 +235,33 @@ async def test_broadcast_platform_change(
229235
assert schd.pool.get_tasks()[0].platform['name'] == 'foo'
230236
# ... and that remote init failed because all hosts bad:
231237
assert log_filter(regex=r"platform: foo .*\(no hosts were reachable\)")
238+
239+
240+
async def test_poll_job_deleted_log_folder(
241+
one_conf, flow, scheduler, start, log_filter
242+
):
243+
"""Capture a task error caused by polling finding the job log dir deleted.
244+
245+
https://github.com/cylc/cylc-flow/issues/6425
246+
"""
247+
response = {
248+
'run_signal': JOB_FILES_REMOVED_MESSAGE,
249+
'run_status': 1,
250+
'job_runner_exit_polled': 1,
251+
}
252+
schd: Scheduler = scheduler(flow(one_conf))
253+
async with start(schd):
254+
itask = schd.pool.get_tasks()[0]
255+
itask.submit_num = 1
256+
job_id = itask.tokens.duplicate(job='01').relative_id
257+
schd.task_job_mgr._poll_task_job_callback(
258+
schd.workflow,
259+
itask,
260+
cmd_ctx=Mock(),
261+
line=f'2025-02-13T12:08:30Z|{job_id}|{json.dumps(response)}',
262+
)
263+
assert itask.state(TASK_STATUS_FAILED)
264+
265+
assert log_filter(
266+
logging.ERROR, f"job log directory {job_id} no longer exists"
267+
)

tests/unit/test_job_runner_mgr.py

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
2+
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
3+
#
4+
# This program is free software: you can redistribute it and/or modify
5+
# it under the terms of the GNU General Public License as published by
6+
# the Free Software Foundation, either version 3 of the License, or
7+
# (at your option) any later version.
8+
#
9+
# This program is distributed in the hope that it will be useful,
10+
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
# GNU General Public License for more details.
13+
#
14+
# You should have received a copy of the GNU General Public License
15+
# along with this program. If not, see <http://www.gnu.org/licenses/>.
16+
17+
from cylc.flow.job_runner_mgr import (
18+
JobRunnerManager, JOB_FILES_REMOVED_MESSAGE)
19+
20+
jrm = JobRunnerManager()
21+
22+
23+
SAMPLE_STATUS = """
24+
ignore me, I have no = sign
25+
CYLC_JOB_RUNNER_NAME=pbs
26+
CYLC_JOB_ID=2361713
27+
CYLC_JOB_RUNNER_SUBMIT_TIME=2025-01-28T14:46:04Z
28+
CYLC_JOB_PID=2361713
29+
CYLC_JOB_INIT_TIME=2025-01-28T14:46:05Z
30+
CYLC_MESSAGE=2025-01-28T14:46:05Z|INFO|sleep 31
31+
CYLC_JOB_RUNNER_EXIT_POLLED=2025-01-28T14:46:08Z
32+
CYLC_JOB_EXIT=SUCCEEDED
33+
CYLC_JOB_EXIT_TIME=2025-01-28T14:46:38Z
34+
"""
35+
36+
37+
def test__job_poll_status_files(tmp_path):
38+
"""Good Path: A valid job.status files exists"""
39+
(tmp_path / 'sub').mkdir()
40+
(tmp_path / 'sub' / 'job.status').write_text(SAMPLE_STATUS)
41+
ctx = jrm._jobs_poll_status_files(str(tmp_path), 'sub')
42+
assert ctx.job_runner_name == 'pbs'
43+
assert ctx.job_id == '2361713'
44+
assert ctx.job_runner_exit_polled == 1
45+
assert ctx.pid == '2361713'
46+
assert ctx.time_submit_exit == '2025-01-28T14:46:04Z'
47+
assert ctx.time_run == '2025-01-28T14:46:05Z'
48+
assert ctx.time_run_exit == '2025-01-28T14:46:38Z'
49+
assert ctx.run_status == 0
50+
assert ctx.messages == ['2025-01-28T14:46:05Z|INFO|sleep 31']
51+
52+
53+
def test__job_poll_status_files_task_failed(tmp_path):
54+
"""Good Path: A valid job.status files exists"""
55+
(tmp_path / 'sub').mkdir()
56+
(tmp_path / 'sub' / 'job.status').write_text("CYLC_JOB_EXIT=FOO")
57+
ctx = jrm._jobs_poll_status_files(str(tmp_path), 'sub')
58+
assert ctx.run_status == 1
59+
assert ctx.run_signal == 'FOO'
60+
61+
62+
def test__job_poll_status_files_deleted_logdir():
63+
"""The log dir has been deleted whilst the task is still active.
64+
Return the context with the message that the task has failed.
65+
"""
66+
ctx = jrm._jobs_poll_status_files('foo', 'bar')
67+
assert ctx.run_signal == JOB_FILES_REMOVED_MESSAGE
68+
assert ctx.run_status == 1
69+
assert ctx.job_runner_exit_polled == 1
70+
71+
72+
def test__job_poll_status_files_ioerror(tmp_path, capsys):
73+
"""There is no readable file.
74+
"""
75+
(tmp_path / 'sub').mkdir()
76+
jrm._jobs_poll_status_files(str(tmp_path), 'sub')
77+
cap = capsys.readouterr()
78+
assert '[Errno 2] No such file or directory' in cap.err

0 commit comments

Comments
 (0)