Skip to content

Commit 3d5bdc9

Browse files
committed
Fix TypeError when tokens lacks a job number
1 parent 7f9e10b commit 3d5bdc9

File tree

7 files changed

+43
-30
lines changed

7 files changed

+43
-30
lines changed

cylc/flow/id.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -366,6 +366,20 @@ def is_null(self) -> bool:
366366
self[key] for key in self._REGULAR_KEYS
367367
)
368368

369+
@property
370+
def submit_num(self) -> int | None:
371+
"""The job submit number as an integer, or None if not set.
372+
373+
Examples:
374+
>>> Tokens('//c/t/01').submit_num
375+
1
376+
>>> Tokens('//c/t').submit_num is None
377+
True
378+
"""
379+
if self['job'] is None:
380+
return None
381+
return int(self['job'])
382+
369383
@overload
370384
def duplicate(
371385
self,

cylc/flow/network/resolvers.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
Optional,
3333
Tuple,
3434
TYPE_CHECKING,
35-
Union,
3635
cast,
3736
)
3837
from uuid import uuid4
@@ -69,9 +68,9 @@
6968

7069
class TaskMsg(NamedTuple):
7170
"""Tuple for Scheduler.message_queue"""
72-
job_id: Union[Tokens, str]
71+
job_id: Tokens
7372
event_time: str
74-
severity: Union[str, int]
73+
severity: str | int
7574
message: str
7675

7776

@@ -839,7 +838,12 @@ def put_messages(
839838
"""
840839
for severity, message in messages:
841840
self.schd.message_queue.put(
842-
TaskMsg(task_job, event_time, severity, message)
841+
TaskMsg(
842+
Tokens(task_job, relative=True),
843+
event_time,
844+
severity,
845+
message,
846+
)
843847
)
844848
return (True, f'Messages queued: {len(messages)}')
845849

cylc/flow/scheduler.py

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -964,7 +964,7 @@ def process_queued_task_messages(self) -> None:
964964
"""Process incoming task messages for each task proxy.
965965
966966
"""
967-
messages: 'Dict[str, List[Tuple[Optional[int], TaskMsg]]]' = {}
967+
messages: dict[str, list[TaskMsg]] = {}
968968

969969
# Retrieve queued messages
970970
while self.message_queue.qsize():
@@ -973,29 +973,23 @@ def process_queued_task_messages(self) -> None:
973973
except Empty:
974974
break
975975
self.message_queue.task_done()
976-
tokens = Tokens(task_msg.job_id, relative=True)
977976
# task ID (job stripped)
978-
task_id = tokens.duplicate(job=None).relative_id
979-
messages.setdefault(task_id, [])
980-
# job may be None (e.g. simulation mode)
981-
job = int(tokens['job']) if tokens['job'] else None
982-
messages[task_id].append(
983-
(job, task_msg)
984-
)
977+
task_id = task_msg.job_id.duplicate(job=None).relative_id
978+
messages.setdefault(task_id, []).append(task_msg)
985979

986980
unprocessed_messages: List[TaskMsg] = []
987981
# Poll tasks for which messages caused a backward state change.
988982
to_poll_tasks: List[TaskProxy] = []
989983
for task_id, message_items in messages.items():
990984
itask = self.pool._get_task_by_id(task_id)
991985
if itask is None:
992-
unprocessed_messages.extend(tm for _, tm in message_items)
986+
unprocessed_messages.extend(message_items)
993987
continue
994988
should_poll = False
995-
for submit_num, tm in message_items:
989+
for tm in message_items:
996990
if self.task_events_mgr.process_message(
997991
itask, tm.severity, tm.message, tm.event_time,
998-
self.task_events_mgr.FLAG_RECEIVED, submit_num
992+
self.task_events_mgr.FLAG_RECEIVED, tm.job_id.submit_num
999993
):
1000994
should_poll = True
1001995
if should_poll:
@@ -1008,9 +1002,7 @@ def process_queued_task_messages(self) -> None:
10081002
# proxy can be removed, but the orphaned job still sends messages.
10091003
warn = ""
10101004
for tm in unprocessed_messages:
1011-
job_tokens = self.tokens.duplicate(
1012-
Tokens(tm.job_id, relative=True)
1013-
)
1005+
job_tokens = self.tokens.duplicate(tm.job_id)
10141006
tdef = self.config.get_taskdef(job_tokens['task'])
10151007
if not self.task_events_mgr.process_job_message(
10161008
job_tokens, tdef, tm.message, tm.event_time

cylc/flow/task_events_mgr.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -647,7 +647,7 @@ def process_events(self, schd: 'Scheduler') -> None:
647647
SubProcContext(
648648
(
649649
(id_key.handler, id_key.event),
650-
int(id_key.tokens['job']),
650+
id_key.tokens.submit_num,
651651
),
652652
timer.ctx.cmd,
653653
env=os.environ,
@@ -994,7 +994,7 @@ def process_job_message(
994994
job_tokens.workflow,
995995
tdef,
996996
get_point(job_tokens['cycle']),
997-
submit_num=int(job_tokens['job']),
997+
submit_num=job_tokens.submit_num,
998998
data_mode=True,
999999
transient=True,
10001000
)
@@ -1286,7 +1286,7 @@ def _job_logs_retrieval_callback(self, proc_ctx, schd) -> None:
12861286
log_ctx = SubProcContext(
12871287
(
12881288
(id_key.handler, id_key.event),
1289-
int(id_key.tokens['job'])
1289+
id_key.tokens.submit_num
12901290
),
12911291
None,
12921292
)

cylc/flow/task_proxy.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ def __init__(
222222
flow_nums: Optional['FlowNums'] = None,
223223
status: str = TASK_STATUS_WAITING,
224224
is_held: bool = False,
225-
submit_num: int = 0,
225+
submit_num: int | None = 0,
226226
is_late: bool = False,
227227
is_manual_submit: bool = False,
228228
flow_wait: bool = False,

tests/integration/test_optional_outputs.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
)
3333
from cylc.flow.cycling.integer import IntegerPoint
3434
from cylc.flow.cycling.iso8601 import ISO8601Point
35-
from cylc.flow.id import TaskTokens
35+
from cylc.flow.id import TaskTokens, Tokens
3636
from cylc.flow.network.resolvers import TaskMsg
3737
from cylc.flow.task_events_mgr import (
3838
TaskEventsManager,
@@ -208,7 +208,7 @@ async def test_expire_orthogonality(flow, scheduler, start):
208208
# tell the scheduler that the task *submit-failed*
209209
schd.message_queue.put(
210210
TaskMsg(
211-
'1/a/01',
211+
Tokens('//1/a/01'),
212212
'2000-01-01T00:00:00+00',
213213
'INFO',
214214
TaskEventsManager.EVENT_SUBMIT_FAILED
@@ -222,7 +222,7 @@ async def test_expire_orthogonality(flow, scheduler, start):
222222
# tell the scheduler that the task *failed*
223223
schd.message_queue.put(
224224
TaskMsg(
225-
'1/a/01',
225+
Tokens('//1/a/01'),
226226
'2000-01-01T00:00:00+00',
227227
'INFO',
228228
TaskEventsManager.EVENT_FAILED,
@@ -236,7 +236,7 @@ async def test_expire_orthogonality(flow, scheduler, start):
236236
# tell the scheduler that the task *expired*
237237
schd.message_queue.put(
238238
TaskMsg(
239-
'1/a/01',
239+
Tokens('//1/a/01'),
240240
'2000-01-01T00:00:00+00',
241241
'INFO',
242242
TaskEventsManager.EVENT_EXPIRED,

tests/integration/test_task_events_mgr.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -275,20 +275,23 @@ async def test__process_message_failed_with_retry(
275275
assert 'failed/OOK' in failed_record[1]
276276

277277

278-
async def test__unhandled_message(one: Scheduler, start, log_filter):
278+
@pytest.mark.parametrize('id_', ['1/no_such_task/01', '1/no_job'])
279+
async def test__unhandled_message(id_, one: Scheduler, start, log_filter):
279280
"""It should log unhandled messages."""
280281

281282
async with start(one):
282283
one.message_queue.put(
283-
TaskMsg("1/no_such_task/01", "time", 'INFO', "the quick brown")
284+
TaskMsg(
285+
Tokens(id_, relative=True), "time", 'INFO', "the quick brown"
286+
)
284287
)
285288
one.process_queued_task_messages()
286289

287290
_, warning_msg = log_filter(level=logging.WARNING)[-1]
288291
assert (
289292
'Undeliverable task messages received and ignored:' in warning_msg
290293
)
291-
assert '1/no_such_task/01: INFO - "the quick brown"' in warning_msg
294+
assert f'{id_}: INFO - "the quick brown"' in warning_msg
292295

293296

294297
@pytest.mark.parametrize('template', TEMPLATES)

0 commit comments

Comments
 (0)