Skip to content

Commit b64fcd6

Browse files
hjoliveroliver-sandersMetRonniewxtim
authored
Simplify and fix runahead computation. (#5893)
## Simplify and fix runahead computation. * In back-compat mode the cycle point time zone is assumed to be local, whereas in normal mode it is assumed to be UTC. There was contamination of the point parse caching where the time zone would carry over from tests of back-compat vs normal mode ### Testing * New runhead integration test. * Reload: test reloading doesn't nudge the runahead limit * We were using the pytest-env plugin to run the tests in a non-UTC time zone: The pytest-env plugin doesn't work with pytest-xdist so this was being ignored. * Add both runahead formats to existing tests for compute runahead. Add compat mode and not compat mode versions of the future triggers bug test. * Wrote a test to check for changes of runahead limit based on changing task statuses in compat mode. --------- Co-authored-by: Oliver Sanders <[email protected]> Co-authored-by: Ronnie Dutta <[email protected]> Co-authored-by: Tim Pillinger <[email protected]>
1 parent dd41648 commit b64fcd6

File tree

9 files changed

+286
-185
lines changed

9 files changed

+286
-185
lines changed

.github/workflows/test_fast.yml

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,17 @@ jobs:
2020
fail-fast: false # Don't let a failed MacOS run stop the Ubuntu runs
2121
matrix:
2222
os: ['ubuntu-latest']
23-
python-version: ['3.7', '3.8', '3.9', '3.10', '3.11']
23+
python-version: ['3.7', '3.8', '3.10', '3.11']
2424
include:
25+
# mac os test
2526
- os: 'macos-11'
26-
python-version: '3.7'
27+
python-version: '3.7' # oldest supported version
28+
# non-utc timezone test
29+
- os: 'ubuntu-latest'
30+
python-version: '3.9' # not the oldest, not the most recent version
31+
time-zone: 'XXX-09:35'
2732
env:
33+
TZ: ${{ matrix.time-zone }}
2834
PYTEST_ADDOPTS: --cov --cov-append -n 5 --color=yes
2935
steps:
3036
- name: Checkout

changes.d/5893.fix

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fixed bug in computing a time interval-based runahead limit when future triggers are present.

cylc/flow/cycling/__init__.py

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -415,22 +415,6 @@ def get_stop_point(self):
415415
"""Return the last point of this sequence, or None if unbounded."""
416416
pass
417417

418-
def get_first_n_points(self, n, point=None):
419-
"""Return a list of first n points of this sequence."""
420-
if point is None:
421-
p1 = self.get_start_point()
422-
else:
423-
p1 = self.get_first_point(point)
424-
if p1 is None:
425-
return []
426-
result = [p1]
427-
for _ in range(1, n):
428-
p1 = self.get_next_point_on_sequence(p1)
429-
if p1 is None:
430-
break
431-
result.append(p1)
432-
return result
433-
434418
@abstractmethod
435419
def __eq__(self, other) -> bool:
436420
# Return True if other (sequence) is equal to self.

cylc/flow/cycling/iso8601.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -943,12 +943,22 @@ def _interval_parse(interval_string):
943943

944944
def point_parse(point_string: str) -> 'TimePoint':
945945
"""Parse a point_string into a proper TimePoint object."""
946-
return _point_parse(point_string, WorkflowSpecifics.DUMP_FORMAT)
946+
return _point_parse(
947+
point_string,
948+
WorkflowSpecifics.DUMP_FORMAT,
949+
WorkflowSpecifics.ASSUMED_TIME_ZONE
950+
)
947951

948952

949953
@lru_cache(10000)
950-
def _point_parse(point_string, _dump_fmt):
951-
"""Parse a point_string into a proper TimePoint object."""
954+
def _point_parse(point_string: str, _dump_fmt, _tz) -> 'TimePoint':
955+
"""Parse a point_string into a proper TimePoint object.
956+
957+
Args:
958+
point_string: The string to parse.
959+
_dump_fmt: Dump format (only used to avoid invalid cache hits).
960+
_tz: Cycle point time zone (only used to avoid invalid cache hits).
961+
"""
952962
if "%" in WorkflowSpecifics.DUMP_FORMAT:
953963
# May be a custom not-quite ISO 8601 dump format.
954964
with contextlib.suppress(IsodatetimeError):

cylc/flow/task_pool.py

Lines changed: 45 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -309,82 +309,48 @@ def compute_runahead(self, force=False) -> bool:
309309
310310
With force=True we recompute the limit even if the base point has not
311311
changed (needed if max_future_offset changed, or on reload).
312-
"""
313312
313+
"""
314314
limit = self.config.runahead_limit # e.g. P2 or P2D
315315
count_cycles = False
316316
with suppress(TypeError):
317317
# Count cycles (integer cycling, and optional for datetime too).
318318
ilimit = int(limit) # type: ignore
319319
count_cycles = True
320320

321-
base_point: 'PointBase'
322-
points: List['PointBase'] = []
321+
base_point: Optional['PointBase'] = None
323322

323+
# First get the runahead base point.
324324
if not self.main_pool:
325-
# No tasks yet, just consider sequence points.
326-
if count_cycles:
327-
# Get the first ilimit points in each sequence.
328-
# (After workflow start point - sequence may begin earlier).
329-
points = [
330-
point
331-
for plist in [
332-
seq.get_first_n_points(
333-
ilimit, self.config.start_point)
334-
for seq in self.config.sequences
335-
]
336-
for point in plist
337-
]
338-
# Drop points beyond the limit.
339-
points = sorted(points)[:ilimit + 1]
340-
base_point = min(points)
341-
342-
else:
343-
# Start at first point in each sequence.
344-
# (After workflow start point - sequence may begin earlier).
345-
points = [
346-
point
347-
for point in {
348-
seq.get_first_point(self.config.start_point)
349-
for seq in self.config.sequences
350-
}
351-
if point is not None
352-
]
353-
base_point = min(points)
354-
# Drop points beyond the limit.
355-
points = [
356-
point
357-
for point in points
358-
if point <= base_point + limit
359-
]
360-
325+
# Find the earliest sequence point beyond the workflow start point.
326+
base_point = min(
327+
point
328+
for point in {
329+
seq.get_first_point(self.config.start_point)
330+
for seq in self.config.sequences
331+
}
332+
if point is not None
333+
)
361334
else:
362-
# Find the earliest point with unfinished tasks.
335+
# Find the earliest point with incomplete tasks.
363336
for point, itasks in sorted(self.get_tasks_by_point().items()):
337+
# All n=0 tasks are incomplete by definition, but Cylc 7
338+
# ignores failed ones (it does not ignore submit-failed!).
364339
if (
365-
points # got the limit already so this point too
366-
or any(
367-
not itask.state(
368-
TASK_STATUS_FAILED,
369-
TASK_STATUS_SUCCEEDED,
370-
TASK_STATUS_EXPIRED
371-
)
372-
or (
373-
# For Cylc 7 back-compat, ignore incomplete tasks.
374-
# (Success is required in back-compat mode, so
375-
# failedtasks end up as incomplete; and Cylc 7
376-
# ignores failed tasks in computing the limit).
377-
itask.state.outputs.is_incomplete()
378-
and not cylc.flow.flags.cylc7_back_compat
379-
)
340+
cylc.flow.flags.cylc7_back_compat and
341+
all(
342+
itask.state(TASK_STATUS_FAILED)
380343
for itask in itasks
381344
)
382345
):
383-
points.append(point)
346+
continue
347+
base_point = point
348+
break
384349

385-
if not points:
386-
return False
387-
base_point = min(points)
350+
if base_point is None:
351+
return False
352+
353+
LOG.debug(f"Runahead: base point {base_point}")
388354

389355
if self._prev_runahead_base_point is None:
390356
self._prev_runahead_base_point = base_point
@@ -401,8 +367,10 @@ def compute_runahead(self, force=False) -> bool:
401367
# change or the runahead limit is already at stop point.
402368
return False
403369

404-
# Get all cycle points possible after the base point.
405-
sequence_points: Set['PointBase']
370+
# Now generate all possible cycle points from the base point and stop
371+
# at the runahead limit point. Note both cycle count and time interval
372+
# limits involve all possible cycles, not just active cycles.
373+
sequence_points: Set['PointBase'] = set()
406374
if (
407375
not force
408376
and self._prev_runahead_sequence_points
@@ -412,44 +380,48 @@ def compute_runahead(self, force=False) -> bool:
412380
sequence_points = self._prev_runahead_sequence_points
413381
else:
414382
# Recompute possible points.
415-
sequence_points = set()
416383
for sequence in self.config.sequences:
417-
seq_point = sequence.get_next_point(base_point)
384+
seq_point = sequence.get_first_point(base_point)
418385
count = 1
419386
while seq_point is not None:
420387
if count_cycles:
421388
# P0 allows only the base cycle point to run.
422389
if count > 1 + ilimit:
390+
# this point may be beyond the runahead limit
423391
break
424392
else:
425393
# PT0H allows only the base cycle point to run.
426394
if seq_point > base_point + limit:
395+
# this point can not be beyond the runahead limit
427396
break
428397
count += 1
429398
sequence_points.add(seq_point)
430399
seq_point = sequence.get_next_point(seq_point)
431400
self._prev_runahead_sequence_points = sequence_points
432401
self._prev_runahead_base_point = base_point
433402

434-
points = set(points).union(sequence_points)
435-
436403
if count_cycles:
437-
# Some sequences may have different intervals.
438-
limit_point = sorted(points)[:(ilimit + 1)][-1]
404+
# (len(list) may be less than ilimit due to sequence end)
405+
limit_point = sorted(sequence_points)[:ilimit + 1][-1]
439406
else:
440-
# We already stopped at the runahead limit.
441-
limit_point = sorted(points)[-1]
407+
limit_point = max(sequence_points)
442408

443-
# Adjust for future offset and stop point, if necessary.
409+
# Adjust for future offset and stop point.
444410
pre_adj_limit = limit_point
445411
if self.max_future_offset is not None:
446412
limit_point += self.max_future_offset
447-
LOG.debug(f"{pre_adj_limit} -> {limit_point} (future offset)")
413+
LOG.debug(
414+
"Runahead (future trigger adjust):"
415+
f" {pre_adj_limit} -> {limit_point}"
416+
)
448417
if self.stop_point and limit_point > self.stop_point:
449418
limit_point = self.stop_point
450-
LOG.debug(f"{pre_adj_limit} -> {limit_point} (stop point)")
451-
LOG.debug(f"Runahead limit: {limit_point}")
419+
LOG.debug(
420+
"Runahead (stop point adjust):"
421+
f" {pre_adj_limit} -> {limit_point} (stop point)"
422+
)
452423

424+
LOG.debug(f"Runahead limit: {limit_point}")
453425
self.runahead_limit_point = limit_point
454426
return True
455427

pytest.ini

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,10 @@ testpaths =
3232
cylc/flow/
3333
tests/unit/
3434
tests/integration/
35-
env =
36-
# a weird timezone to check that tests aren't assuming the local timezone
37-
TZ=XXX-09:35
3835
doctest_optionflags =
3936
NORMALIZE_WHITESPACE
4037
IGNORE_EXCEPTION_DETAIL
4138
ELLIPSIS
4239
asyncio_mode = auto
4340
markers=
44-
linkcheck: Test links
41+
linkcheck: Test links

setup.cfg

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,6 @@ tests =
119119
pytest-asyncio>=0.17,!=0.23.*
120120
pytest-cov>=2.8.0
121121
pytest-xdist>=2
122-
pytest-env>=0.6.2
123122
pytest>=6
124123
testfixtures>=6.11.0
125124
towncrier>=23

0 commit comments

Comments
 (0)