Skip to content

Commit a3b27b3

Browse files
broadcast: reduce the broadcast expire limit (#6964)
* broadcast: reduce the broadcast expire limit * Partially addresses #6308 * Broadcasts for a cycle are expired when there are no more active tasks in the cycle. * At Cylc 7, there was (almost) always a succeeded task in the pool one cycle behind each active task. * So the broadcast expire limit was the oldest cycle with active tasks *minus* the duration of the longest cycling sequence. * This commit restores the Cylc 7 broadcast expire limit which ensures that the broadcasts for the previous instance of any given task are preserved. * This better satisfies the use case of winding back a workflow to the previous cycle (i.e. to retrigger a task, reflow from a task or warm restart). --------- Co-authored-by: Tim Pillinger <[email protected]>
1 parent 7a58152 commit a3b27b3

File tree

9 files changed

+252
-27
lines changed

9 files changed

+252
-27
lines changed

changes.d/6964.feat.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Automatic broadcast expiry is now delayed to make it easier to re-trigger
2+
tasks from the previous cycle.

cylc/flow/config.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2375,6 +2375,23 @@ def load_graph(self):
23752375
# dependencies are checked in generate_triggers:
23762376
self.check_terminal_outputs(parser.terminals)
23772377

2378+
# set of all cycling intervals containined within the workflow
2379+
cycling_intervals = {
2380+
sequence.get_interval()
2381+
for sequence in self.sequences
2382+
} | {
2383+
# add a null interval to handle async workflows
2384+
get_interval_cls().get_null()
2385+
}
2386+
2387+
# determine the longest cycling interval in the workflow
2388+
self.interval_of_longest_sequence: 'IntervalBase' = max({
2389+
interval for interval in cycling_intervals
2390+
# NOTE: None type sorts above a null interval for strange
2391+
# historical reasons so must be filtered out
2392+
if interval is not None
2393+
})
2394+
23782395
self.set_required_outputs(task_output_opt)
23792396

23802397
# Detect use of xtrigger names with '@' prefix (creates a task).

cylc/flow/cycling/iso8601.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -406,7 +406,11 @@ def __init__(
406406
exclusion_start_point,
407407
exclusion_end_point)
408408

409-
self.step = ISO8601Interval(str(self.recurrence.duration))
409+
self.step = (
410+
ISO8601Interval(str(self.recurrence.duration))
411+
if self.recurrence.duration
412+
else ISO8601Interval.get_null()
413+
)
410414
self.value = str(self.recurrence)
411415
# Concatenate the strings in exclusion list
412416
if self.exclusions:

cylc/flow/scheduler.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
)
5959
from uuid import uuid4
6060

61+
from metomi.isodatetime.exceptions import TimePointDumperBoundsError
6162
import psutil
6263

6364
from cylc.flow import (
@@ -109,8 +110,8 @@
109110
from cylc.flow.network import API
110111
from cylc.flow.network.authentication import key_housekeeping
111112
from cylc.flow.network.server import WorkflowRuntimeServer
112-
from cylc.flow.parsec.OrderedDict import DictTree
113113
from cylc.flow.parsec.exceptions import ParsecError
114+
from cylc.flow.parsec.OrderedDict import DictTree
114115
from cylc.flow.parsec.validate import DurationFloat
115116
from cylc.flow.pathutil import (
116117
get_workflow_name_from_id,
@@ -1680,7 +1681,18 @@ async def _main_loop(self) -> None:
16801681
# A simulated task state change occurred.
16811682
self.reset_inactivity_timer()
16821683

1683-
self.broadcast_mgr.expire_broadcast(self.pool.get_min_point())
1684+
# auto expire broadcasts
1685+
with suppress(TimePointDumperBoundsError):
1686+
# NOTE: TimePointDumperBoundsError will be raised for negative
1687+
# cycle points, we skip broadcast expiry in this circumstance
1688+
# (pre-initial condition)
1689+
if min_point := self.pool.get_min_point():
1690+
# NOTE: the broadcast expire limit is the oldest active cycle
1691+
# MINUS the longest cycling interval
1692+
self.broadcast_mgr.expire_broadcast(
1693+
min_point - self.config.interval_of_longest_sequence
1694+
)
1695+
16841696
self.late_tasks_check()
16851697

16861698
self.process_queued_task_messages()

pytest.ini

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ testpaths =
3232
cylc/flow/
3333
tests/unit/
3434
tests/integration/
35+
tests/conftest.py
3536
doctest_optionflags =
3637
NORMALIZE_WHITESPACE
3738
ELLIPSIS

tests/conftest.py

Lines changed: 74 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import re
1919
from pathlib import Path
2020
from shutil import rmtree
21-
from typing import List, Optional, Tuple
21+
from typing import Callable, List, Optional, Tuple
2222

2323
import pytest
2424

@@ -175,9 +175,11 @@ def capcall(monkeypatch):
175175
function_string:
176176
The function to replace as it would be specified to
177177
monkeypatch.setattr.
178-
substitute_function:
179-
An optional function to replace it with, otherwise the captured
180-
function will return None.
178+
mock:
179+
* If True, the function will be replaced by a "return None".
180+
* If False, the original function will be run.
181+
* If a Callable is provided, this will be run in place of the
182+
original function.
181183
182184
Returns:
183185
[(args: Tuple, kwargs: Dict), ...]
@@ -190,15 +192,80 @@ def test_something(capcall):
190192
191193
"""
192194

193-
def _capcall(function_string, substitute_function=None):
195+
def _capcall(function_string: str, mock: bool | Callable = True):
194196
calls = []
195197

198+
if mock is True:
199+
fcn = lambda *args, **kwargs: None
200+
elif mock is False:
201+
fcn = import_object_from_string(function_string)
202+
else:
203+
fcn = mock
204+
196205
def _call(*args, **kwargs):
197206
calls.append((args, kwargs))
198-
if substitute_function:
199-
return substitute_function(*args, **kwargs)
207+
return fcn(*args, **kwargs)
200208

201209
monkeypatch.setattr(function_string, _call)
202210
return calls
203211

204212
return _capcall
213+
214+
215+
def import_object_from_string(string):
216+
"""Import a Python object from a string path.
217+
218+
The path may reference a module, function, class, method, whatever.
219+
220+
Examples:
221+
# import a module
222+
>>> import_object_from_string('os')
223+
<module 'os' ...>
224+
225+
# import a function
226+
>>> import_object_from_string('os.path.walk')
227+
<function walk at ...>
228+
229+
# import a constant from a namespace package
230+
>>> import_object_from_string('cylc.flow.LOG')
231+
<Logger cylc (WARNING)>
232+
233+
# import a class
234+
>>> import_object_from_string('pathlib.Path')
235+
<class 'pathlib.Path'>
236+
237+
# import a method
238+
>>> import_object_from_string('pathlib.Path.exists')
239+
<function Path.exists ...>
240+
241+
"""
242+
head = string
243+
tail = []
244+
while True:
245+
try:
246+
# try and import the thing
247+
module = __import__(head)
248+
except ModuleNotFoundError:
249+
# if it's not something we can import, lop the last item off the
250+
# end of the string and repeat
251+
if '.' in head:
252+
head, _tail = head.rsplit('.', 1)
253+
tail.append(_tail)
254+
else:
255+
# we definitely can't import this
256+
raise
257+
else:
258+
# we managed to import something
259+
if '(namespace)' in str(module):
260+
# with namespace packages you have to pull the module out of
261+
# the package yourself
262+
for part in head.split('.')[1:]:
263+
module = getattr(module, part)
264+
break
265+
266+
# extract the requested object from the module (if requested)
267+
obj = module
268+
for part in reversed(tail):
269+
obj = getattr(obj, part)
270+
271+
return obj

tests/flakyfunctional/xtriggers/01-workflow_state.t

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -68,14 +68,14 @@ __END__
6868
# set' ('+') and later '... INFO - Broadcast cancelled:' ('-') line, where we
6969
# use as a test case an arbitrary task where such setting & cancellation occurs:
7070
contains_ok "${WORKFLOW_LOG}" << __LOG_BROADCASTS__
71-
${LOG_INDENT}+ [2015/f1] [environment]upstream_workflow=${WORKFLOW_NAME_UPSTREAM}
72-
${LOG_INDENT}+ [2015/f1] [environment]upstream_task=foo
73-
${LOG_INDENT}+ [2015/f1] [environment]upstream_point=2015
74-
${LOG_INDENT}+ [2015/f1] [environment]upstream_trigger=data_ready
75-
${LOG_INDENT}- [2015/f1] [environment]upstream_workflow=${WORKFLOW_NAME_UPSTREAM}
76-
${LOG_INDENT}- [2015/f1] [environment]upstream_task=foo
77-
${LOG_INDENT}- [2015/f1] [environment]upstream_point=2015
78-
${LOG_INDENT}- [2015/f1] [environment]upstream_trigger=data_ready
71+
${LOG_INDENT}+ [2014/f1] [environment]upstream_workflow=${WORKFLOW_NAME_UPSTREAM}
72+
${LOG_INDENT}+ [2014/f1] [environment]upstream_task=foo
73+
${LOG_INDENT}+ [2014/f1] [environment]upstream_point=2014
74+
${LOG_INDENT}+ [2014/f1] [environment]upstream_trigger=data_ready
75+
${LOG_INDENT}- [2014/f1] [environment]upstream_workflow=${WORKFLOW_NAME_UPSTREAM}
76+
${LOG_INDENT}- [2014/f1] [environment]upstream_task=foo
77+
${LOG_INDENT}- [2014/f1] [environment]upstream_point=2014
78+
${LOG_INDENT}- [2014/f1] [environment]upstream_trigger=data_ready
7979
__LOG_BROADCASTS__
8080
# ... and 2) in the DB.
8181
TEST_NAME="${TEST_NAME_BASE}-check-broadcast-in-db"
@@ -88,14 +88,14 @@ sqlite3 "${DB_FILE}" \
8888
'SELECT change, point, namespace, key, value FROM broadcast_events
8989
ORDER BY time, change, point, namespace, key' >"${NAME}"
9090
contains_ok "${NAME}" << __DB_BROADCASTS__
91-
+|2015|f1|[environment]upstream_workflow|${WORKFLOW_NAME_UPSTREAM}
92-
+|2015|f1|[environment]upstream_task|foo
93-
+|2015|f1|[environment]upstream_point|2015
94-
+|2015|f1|[environment]upstream_trigger|data_ready
95-
-|2015|f1|[environment]upstream_workflow|${WORKFLOW_NAME_UPSTREAM}
96-
-|2015|f1|[environment]upstream_task|foo
97-
-|2015|f1|[environment]upstream_point|2015
98-
-|2015|f1|[environment]upstream_trigger|data_ready
91+
+|2014|f1|[environment]upstream_workflow|${WORKFLOW_NAME_UPSTREAM}
92+
+|2014|f1|[environment]upstream_task|foo
93+
+|2014|f1|[environment]upstream_point|2014
94+
+|2014|f1|[environment]upstream_trigger|data_ready
95+
-|2014|f1|[environment]upstream_workflow|${WORKFLOW_NAME_UPSTREAM}
96+
-|2014|f1|[environment]upstream_task|foo
97+
-|2014|f1|[environment]upstream_point|2014
98+
-|2014|f1|[environment]upstream_trigger|data_ready
9999
__DB_BROADCASTS__
100100

101101
purge

tests/functional/broadcast/00-simple.t

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ cmp_ok "${NAME}" <<'__SELECT__'
4747
+|*|m7|[environment]BCAST|M7
4848
+|*|m8|[environment]BCAST|M8
4949
+|*|m9|[environment]BCAST|M9
50-
-|20100808T00|foo|[environment]BCAST|FOO
5150
__SELECT__
5251

5352
NAME='select-broadcast-states.out'
@@ -62,6 +61,7 @@ cmp_ok "${NAME}" <<'__SELECT__'
6261
*|m8|[environment]BCAST|M8
6362
*|m9|[environment]BCAST|M9
6463
*|root|[environment]BCAST|ROOT
64+
20100808T00|foo|[environment]BCAST|FOO
6565
20100809T00|baz|[environment]BCAST|BAZ
6666
20100809T00|m2|[environment]BCAST|M2
6767
__SELECT__

tests/integration/test_broadcast_mgr.py

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,11 @@
1717
"""Tests for Broadcast Manager."""
1818

1919

20+
import pytest
21+
from cylc.flow.cycling.integer import IntegerInterval, IntegerPoint
22+
from cylc.flow.cycling.iso8601 import ISO8601Interval, ISO8601Point
23+
24+
2025
async def test_reject_valid_broadcast_is_remote_clash_with_config(
2126
one_conf, flow, start, scheduler, log_filter
2227
):
@@ -90,3 +95,120 @@ async def test_reject_valid_broadcast_is_remote_clash_with_broadcast(
9095
{'platform': 'foo'},
9196
]
9297
}
98+
99+
100+
@pytest.mark.parametrize('cycling_mode', ('integer', 'gregorian', '360_day'))
101+
async def test_broadcast_expire_limit(
102+
cycling_mode,
103+
flow,
104+
scheduler,
105+
run,
106+
complete,
107+
capcall,
108+
):
109+
"""Test automatic broadcast expiry.
110+
111+
To prevent broadcasts from piling up and causing a memory leak, we expire
112+
(aka clear) them.
113+
114+
The broadcast expiry limit is the oldest active cycle MINUS the longest
115+
cycling sequence.
116+
117+
See https://github.com/cylc/cylc-flow/pull/6964
118+
"""
119+
# capture broadcast expiry calls
120+
_expires = capcall('cylc.flow.broadcast_mgr.BroadcastMgr.expire_broadcast')
121+
122+
def expires():
123+
"""Return a list of the cycle limit expired since the last call."""
124+
ret = [x[0][1] for x in _expires]
125+
_expires.clear()
126+
return ret
127+
128+
def cycle(number):
129+
"""Return a cycle point object in the relevant format."""
130+
if cycling_mode == 'integer':
131+
return IntegerPoint(str(number))
132+
else:
133+
return ISO8601Point(f'000{number}')
134+
135+
def interval(number):
136+
"""Return an integer object in the relevant format."""
137+
if cycling_mode == 'integer':
138+
return IntegerInterval(sequence(number))
139+
else:
140+
return ISO8601Interval(sequence(number))
141+
142+
def sequence(number):
143+
"""Return a sequence string in the relevant format."""
144+
if cycling_mode == 'integer':
145+
return f'P{number}'
146+
else:
147+
return f'P{number}Y'
148+
149+
# a workflow with a sequential task
150+
id_ = flow({
151+
'scheduler': {
152+
'cycle point format': 'CCYY'
153+
} if cycling_mode != 'integer' else {},
154+
155+
'scheduling': {
156+
'cycling mode': cycling_mode,
157+
'initial cycle point': cycle(1),
158+
'graph': {
159+
# the sequence with the sequential task
160+
sequence(1): f'a[-{sequence(1)}] => a',
161+
# a longer sequence to make the offset more interesting
162+
sequence(3): 'a',
163+
}
164+
}
165+
})
166+
schd = scheduler(id_, paused_start=False)
167+
168+
async with run(schd):
169+
# the longest cycling sequence has a step of "3"
170+
assert schd.config.interval_of_longest_sequence == interval(3)
171+
172+
# no broadcast expires should happen on startup
173+
assert expires() == []
174+
175+
# when a cycle closes, auto broadcast expiry should happen
176+
# NOTE: datetimes cannot be negative, so this expiry will be skipped
177+
# for datetimetime cycling workflows
178+
await complete(schd, f'{cycle(1)}/a')
179+
assert expires() in ([], [cycle(-1)])
180+
181+
await complete(schd, f'{cycle(2)}/a')
182+
assert expires() == [cycle(0)]
183+
184+
await complete(schd, f'{cycle(3)}/a')
185+
assert expires() == [cycle(1)]
186+
187+
188+
async def test_broadcast_expiry_async(
189+
one_conf, flow, scheduler, run, complete, capcall
190+
):
191+
"""Test auto broadcast expiry with async workflows.
192+
193+
Auto broadcast expiry should not happen in async workflows as there is only
194+
one cycle so it doesn't make sense.
195+
196+
See https://github.com/cylc/cylc-flow/pull/6964
197+
"""
198+
# capture broadcast expiry calls
199+
expires = capcall('cylc.flow.broadcast_mgr.BroadcastMgr.expire_broadcast')
200+
201+
id_ = flow(one_conf)
202+
schd = scheduler(id_, paused_start=False)
203+
204+
async with run(schd):
205+
# this is an async workflow so the longest cycling interval is a
206+
# null interval
207+
assert (
208+
schd.config.interval_of_longest_sequence
209+
== IntegerInterval.get_null()
210+
)
211+
await complete(schd)
212+
213+
# no auto-expiry should take place
214+
assert expires == []

0 commit comments

Comments
 (0)