Skip to content

Commit 207984e

Browse files
authored
Fix data store job accumulation bug (#6656)
1 parent e3263b3 commit 207984e

File tree

3 files changed

+39
-16
lines changed

3 files changed

+39
-16
lines changed

changes.d/6656.fix.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix bug where old cycle points could accumulate in the UI.

cylc/flow/data_store_mgr.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1734,7 +1734,7 @@ def update_data_structure(self):
17341734
self.update_workflow()
17351735

17361736
# Don't process updated deltas of pruned nodes
1737-
self.prune_pruned_updated_nodes()
1737+
self.dedupe_pruned_updated_task_proxies()
17381738

17391739
# Gather deltas
17401740
self.batch_deltas()
@@ -1958,8 +1958,8 @@ def _family_ascent_point_prune(
19581958
if fp_id in parent_ids:
19591959
parent_ids.remove(fp_id)
19601960

1961-
def prune_pruned_updated_nodes(self):
1962-
"""Remove updated nodes that will also be pruned this batch.
1961+
def dedupe_pruned_updated_task_proxies(self):
1962+
"""Remove updated task proxies that will also be pruned in this batch.
19631963
19641964
This will avoid processing and sending deltas that will immediately
19651965
be pruned. Kept separate from other pruning to allow for update
@@ -1969,19 +1969,19 @@ def prune_pruned_updated_nodes(self):
19691969
tp_data = self.data[self.workflow_id][TASK_PROXIES]
19701970
tp_added = self.added[TASK_PROXIES]
19711971
tp_updated = self.updated[TASK_PROXIES]
1972-
j_updated = self.updated[JOBS]
19731972
for tp_id in self.pruned_task_proxies:
19741973
if tp_id in tp_updated:
19751974
if tp_id in tp_data:
1976-
node = tp_data[tp_id]
1975+
node: PbTaskProxy = tp_data[tp_id]
19771976
elif tp_id in tp_added:
19781977
node = tp_added[tp_id]
19791978
else:
19801979
continue
1981-
update_node = tp_updated.pop(tp_id)
1982-
for j_id in list(node.jobs) + list(update_node.jobs):
1983-
if j_id in j_updated:
1984-
del j_updated[j_id]
1980+
update_node: PbTaskProxy = tp_updated.pop(tp_id)
1981+
# Remove this task's added/updated jobs in this batch too:
1982+
for j_id in set(node.jobs).union(update_node.jobs):
1983+
for record in (self.added, self.updated):
1984+
record[JOBS].pop(j_id, None)
19851985
self.n_window_edges.difference_update(update_node.edges)
19861986
self.deltas[EDGES].pruned.extend(update_node.edges)
19871987
self.pruned_task_proxies.clear()

tests/integration/test_data_store_mgr.py

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,19 @@
1414
# You should have received a copy of the GNU General Public License
1515
# along with this program. If not, see <http://www.gnu.org/licenses/>.
1616

17-
from typing import Iterable, List, TYPE_CHECKING, cast
17+
from logging import INFO
18+
from typing import (
19+
Iterable,
20+
List,
21+
cast,
22+
)
1823

1924
import pytest
2025

21-
from cylc.flow.data_messages_pb2 import PbPrerequisite, PbTaskProxy
26+
from cylc.flow.data_messages_pb2 import (
27+
PbPrerequisite,
28+
PbTaskProxy,
29+
)
2230
from cylc.flow.data_store_mgr import (
2331
EDGES,
2432
FAMILY_PROXIES,
@@ -31,22 +39,19 @@
3139
from cylc.flow.scheduler import Scheduler
3240
from cylc.flow.task_events_mgr import TaskEventsManager
3341
from cylc.flow.task_outputs import (
34-
TASK_OUTPUT_SUBMITTED,
3542
TASK_OUTPUT_STARTED,
43+
TASK_OUTPUT_SUBMITTED,
3644
TASK_OUTPUT_SUCCEEDED,
3745
)
3846
from cylc.flow.task_state import (
3947
TASK_STATUS_FAILED,
48+
TASK_STATUS_PREPARING,
4049
TASK_STATUS_SUCCEEDED,
4150
TASK_STATUS_WAITING,
4251
)
4352
from cylc.flow.wallclock import get_current_time_string
4453

4554

46-
if TYPE_CHECKING:
47-
from cylc.flow.scheduler import Scheduler
48-
49-
5055
# NOTE: Some of these tests mutate the data store, so running them in
5156
# isolation may see failures when they actually pass if you run the
5257
# whole file
@@ -503,3 +508,20 @@ def _patch_remove(*args, **kwargs):
503508
TASK_OUTPUT_SUCCEEDED,
504509
}
505510
assert get_delta_outputs() is None
511+
512+
513+
async def test_remove_added_jobs_of_pruned_task(one: Scheduler, start):
514+
"""When a task is pruned, any of its jobs added in the same batch
515+
must be removed from the batch.
516+
517+
See https://github.com/cylc/cylc-flow/pull/6656
518+
"""
519+
async with start(one):
520+
itask = one.pool.get_tasks()[0]
521+
itask.state_reset(TASK_STATUS_PREPARING)
522+
one.task_events_mgr.process_message(itask, INFO, TASK_OUTPUT_SUCCEEDED)
523+
assert not one.data_store_mgr.data[one.id][JOBS]
524+
assert len(one.data_store_mgr.added[JOBS]) == 1
525+
one.data_store_mgr.update_data_structure()
526+
assert not one.data_store_mgr.data[one.id][JOBS]
527+
assert not one.data_store_mgr.added[JOBS]

0 commit comments

Comments
 (0)