Skip to content

Commit a7cf51b

Browse files
Merge pull request #5910 from cylc/8.2.x-sync
🤖 Merge 8.2.x-sync into master
2 parents df350cd + 8002595 commit a7cf51b

File tree

7 files changed

+277
-179
lines changed

7 files changed

+277
-179
lines changed

.github/workflows/test_fast.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ jobs:
3232
time-zone: 'XXX-09:35'
3333

3434
env:
35-
# Use non-UTC time zone
3635
TZ: ${{ matrix.time-zone }}
3736
PYTEST_ADDOPTS: --cov --cov-append -n 5 --color=yes
3837

changes.d/5893.fix

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fixed bug in computing a time interval-based runahead limit when future triggers are present.

cylc/flow/cycling/__init__.py

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -415,22 +415,6 @@ def get_stop_point(self):
415415
"""Return the last point of this sequence, or None if unbounded."""
416416
pass
417417

418-
def get_first_n_points(self, n, point=None):
419-
"""Return a list of first n points of this sequence."""
420-
if point is None:
421-
p1 = self.get_start_point()
422-
else:
423-
p1 = self.get_first_point(point)
424-
if p1 is None:
425-
return []
426-
result = [p1]
427-
for _ in range(1, n):
428-
p1 = self.get_next_point_on_sequence(p1)
429-
if p1 is None:
430-
break
431-
result.append(p1)
432-
return result
433-
434418
@abstractmethod
435419
def __eq__(self, other) -> bool:
436420
# Return True if other (sequence) is equal to self.

cylc/flow/cycling/iso8601.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -947,12 +947,22 @@ def _interval_parse(interval_string):
947947

948948
def point_parse(point_string: str) -> 'TimePoint':
949949
"""Parse a point_string into a proper TimePoint object."""
950-
return _point_parse(point_string, WorkflowSpecifics.DUMP_FORMAT)
950+
return _point_parse(
951+
point_string,
952+
WorkflowSpecifics.DUMP_FORMAT,
953+
WorkflowSpecifics.ASSUMED_TIME_ZONE
954+
)
951955

952956

953957
@lru_cache(10000)
954-
def _point_parse(point_string, _dump_fmt):
955-
"""Parse a point_string into a proper TimePoint object."""
958+
def _point_parse(point_string: str, _dump_fmt, _tz) -> 'TimePoint':
959+
"""Parse a point_string into a proper TimePoint object.
960+
961+
Args:
962+
point_string: The string to parse.
963+
_dump_fmt: Dump format (only used to avoid invalid cache hits).
964+
_tz: Cycle point time zone (only used to avoid invalid cache hits).
965+
"""
956966
if "%" in WorkflowSpecifics.DUMP_FORMAT:
957967
# May be a custom not-quite ISO 8601 dump format.
958968
with contextlib.suppress(IsodatetimeError):

cylc/flow/task_pool.py

Lines changed: 45 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -307,82 +307,48 @@ def compute_runahead(self, force=False) -> bool:
307307
308308
With force=True we recompute the limit even if the base point has not
309309
changed (needed if max_future_offset changed, or on reload).
310-
"""
311310
311+
"""
312312
limit = self.config.runahead_limit # e.g. P2 or P2D
313313
count_cycles = False
314314
with suppress(TypeError):
315315
# Count cycles (integer cycling, and optional for datetime too).
316316
ilimit = int(limit) # type: ignore
317317
count_cycles = True
318318

319-
base_point: 'PointBase'
320-
points: List['PointBase'] = []
319+
base_point: Optional['PointBase'] = None
321320

321+
# First get the runahead base point.
322322
if not self.main_pool:
323-
# No tasks yet, just consider sequence points.
324-
if count_cycles:
325-
# Get the first ilimit points in each sequence.
326-
# (After workflow start point - sequence may begin earlier).
327-
points = [
328-
point
329-
for plist in [
330-
seq.get_first_n_points(
331-
ilimit, self.config.start_point)
332-
for seq in self.config.sequences
333-
]
334-
for point in plist
335-
]
336-
# Drop points beyond the limit.
337-
points = sorted(points)[:ilimit + 1]
338-
base_point = min(points)
339-
340-
else:
341-
# Start at first point in each sequence.
342-
# (After workflow start point - sequence may begin earlier).
343-
points = [
344-
point
345-
for point in {
346-
seq.get_first_point(self.config.start_point)
347-
for seq in self.config.sequences
348-
}
349-
if point is not None
350-
]
351-
base_point = min(points)
352-
# Drop points beyond the limit.
353-
points = [
354-
point
355-
for point in points
356-
if point <= base_point + limit
357-
]
358-
323+
# Find the earliest sequence point beyond the workflow start point.
324+
base_point = min(
325+
point
326+
for point in {
327+
seq.get_first_point(self.config.start_point)
328+
for seq in self.config.sequences
329+
}
330+
if point is not None
331+
)
359332
else:
360-
# Find the earliest point with unfinished tasks.
333+
# Find the earliest point with incomplete tasks.
361334
for point, itasks in sorted(self.get_tasks_by_point().items()):
335+
# All n=0 tasks are incomplete by definition, but Cylc 7
336+
# ignores failed ones (it does not ignore submit-failed!).
362337
if (
363-
points # got the limit already so this point too
364-
or any(
365-
not itask.state(
366-
TASK_STATUS_FAILED,
367-
TASK_STATUS_SUCCEEDED,
368-
TASK_STATUS_EXPIRED
369-
)
370-
or (
371-
# For Cylc 7 back-compat, ignore incomplete tasks.
372-
# (Success is required in back-compat mode, so
373-
# failedtasks end up as incomplete; and Cylc 7
374-
# ignores failed tasks in computing the limit).
375-
itask.state.outputs.is_incomplete()
376-
and not cylc.flow.flags.cylc7_back_compat
377-
)
338+
cylc.flow.flags.cylc7_back_compat and
339+
all(
340+
itask.state(TASK_STATUS_FAILED)
378341
for itask in itasks
379342
)
380343
):
381-
points.append(point)
344+
continue
345+
base_point = point
346+
break
382347

383-
if not points:
384-
return False
385-
base_point = min(points)
348+
if base_point is None:
349+
return False
350+
351+
LOG.debug(f"Runahead: base point {base_point}")
386352

387353
if self._prev_runahead_base_point is None:
388354
self._prev_runahead_base_point = base_point
@@ -399,8 +365,10 @@ def compute_runahead(self, force=False) -> bool:
399365
# change or the runahead limit is already at stop point.
400366
return False
401367

402-
# Get all cycle points possible after the base point.
403-
sequence_points: Set['PointBase']
368+
# Now generate all possible cycle points from the base point and stop
369+
# at the runahead limit point. Note both cycle count and time interval
370+
# limits involve all possible cycles, not just active cycles.
371+
sequence_points: Set['PointBase'] = set()
404372
if (
405373
not force
406374
and self._prev_runahead_sequence_points
@@ -410,44 +378,48 @@ def compute_runahead(self, force=False) -> bool:
410378
sequence_points = self._prev_runahead_sequence_points
411379
else:
412380
# Recompute possible points.
413-
sequence_points = set()
414381
for sequence in self.config.sequences:
415-
seq_point = sequence.get_next_point(base_point)
382+
seq_point = sequence.get_first_point(base_point)
416383
count = 1
417384
while seq_point is not None:
418385
if count_cycles:
419386
# P0 allows only the base cycle point to run.
420387
if count > 1 + ilimit:
388+
# this point may be beyond the runahead limit
421389
break
422390
else:
423391
# PT0H allows only the base cycle point to run.
424392
if seq_point > base_point + limit:
393+
# this point can not be beyond the runahead limit
425394
break
426395
count += 1
427396
sequence_points.add(seq_point)
428397
seq_point = sequence.get_next_point(seq_point)
429398
self._prev_runahead_sequence_points = sequence_points
430399
self._prev_runahead_base_point = base_point
431400

432-
points = set(points).union(sequence_points)
433-
434401
if count_cycles:
435-
# Some sequences may have different intervals.
436-
limit_point = sorted(points)[:(ilimit + 1)][-1]
402+
# (len(list) may be less than ilimit due to sequence end)
403+
limit_point = sorted(sequence_points)[:ilimit + 1][-1]
437404
else:
438-
# We already stopped at the runahead limit.
439-
limit_point = sorted(points)[-1]
405+
limit_point = max(sequence_points)
440406

441-
# Adjust for future offset and stop point, if necessary.
407+
# Adjust for future offset and stop point.
442408
pre_adj_limit = limit_point
443409
if self.max_future_offset is not None:
444410
limit_point += self.max_future_offset
445-
LOG.debug(f"{pre_adj_limit} -> {limit_point} (future offset)")
411+
LOG.debug(
412+
"Runahead (future trigger adjust):"
413+
f" {pre_adj_limit} -> {limit_point}"
414+
)
446415
if self.stop_point and limit_point > self.stop_point:
447416
limit_point = self.stop_point
448-
LOG.debug(f"{pre_adj_limit} -> {limit_point} (stop point)")
449-
LOG.debug(f"Runahead limit: {limit_point}")
417+
LOG.debug(
418+
"Runahead (stop point adjust):"
419+
f" {pre_adj_limit} -> {limit_point} (stop point)"
420+
)
450421

422+
LOG.debug(f"Runahead limit: {limit_point}")
451423
self.runahead_limit_point = limit_point
452424
return True
453425

0 commit comments

Comments
 (0)