Skip to content

Commit 07cca5f

Browse files
authored
family add and prune fix (#6589)
* family add and prune fix * Apply suggestions from Ronnie Co-authored-by: Ronnie Dutta <[email protected]>
1 parent 8a8f2db commit 07cca5f

File tree

3 files changed

+129
-8
lines changed

3 files changed

+129
-8
lines changed

changes.d/6589.fix.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix potential accumulation of old families in UI.

cylc/flow/data_store_mgr.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1919,10 +1919,14 @@ def _family_ascent_point_prune(
19191919
family. The work back up to origin checking these families are active.
19201920
19211921
"""
1922-
fp_data = self.data[self.workflow_id][FAMILY_PROXIES]
19231922
fp_updated = self.updated[FAMILY_PROXIES]
1924-
if fp_id in fp_data:
1925-
fam_node = fp_data[fp_id]
1923+
fam_node = self.data[self.workflow_id][FAMILY_PROXIES].get(
1924+
fp_id,
1925+
self.added[FAMILY_PROXIES].get(fp_id, None)
1926+
)
1927+
# Should never be None,
1928+
# leaving in as protection against potential race conditions.
1929+
if fam_node is not None:
19261930
# Gather child families, then check/update recursively
19271931
for child_id in fam_node.child_families:
19281932
if child_id in checked_ids:
@@ -1937,11 +1941,14 @@ def _family_ascent_point_prune(
19371941
child_tasks.update(fp_updated[fp_id].child_tasks)
19381942
if fp_updated[fp_id].child_families:
19391943
child_families.update(fp_updated[fp_id].child_families)
1940-
# if any child tasks or families are in window, don't prune.
1944+
# If any child tasks or families are in window,
1945+
# then don't prune this family.
19411946
if (
19421947
child_tasks.difference(node_ids)
19431948
or child_families.difference(prune_ids)
19441949
):
1950+
# If any child tasks or families will be pruned,
1951+
# then update family states.
19451952
if (
19461953
child_tasks.intersection(node_ids)
19471954
or child_families.intersection(prune_ids)

tests/integration/test_data_store_mgr.py

Lines changed: 117 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -318,10 +318,123 @@ async def test_update_data_structure(harness):
318318
assert TASK_STATUS_FAILED in set(collect_states(data, FAMILY_PROXIES))
319319
# state totals changed
320320
assert TASK_STATUS_FAILED in data[WORKFLOW].state_totals
321-
# Shows pruning worked
322-
# TODO: fixme
323-
# https://github.com/cylc/cylc-flow/issues/4175#issuecomment-1025666413
324-
# assert len({t.is_held for t in data[TASK_PROXIES].values()}) == 1
321+
322+
323+
async def test_prune_data_store(flow, scheduler, start):
324+
"""Test prune_data_store. This method will expand and reduce the data-store
325+
to invoke pruning.
326+
327+
Also test rapid addition and removal of families (as happens with suicide
328+
triggers):
329+
https://github.com/cylc/cylc-ui/issues/1999
330+
331+
"""
332+
id_ = flow({
333+
'scheduling': {
334+
'graph': {
335+
'R1': 'foo => bar'
336+
}
337+
},
338+
'runtime': {
339+
'FOOBAR': {},
340+
'FOO': {
341+
'inherit': 'FOOBAR'
342+
},
343+
'foo': {
344+
'inherit': 'FOO'
345+
},
346+
'BAR': {
347+
'inherit': 'FOOBAR'
348+
},
349+
'bar': {
350+
'inherit': 'BAR'
351+
}
352+
}
353+
})
354+
schd = scheduler(id_)
355+
async with start(schd):
356+
# initialise the data store
357+
await schd.update_data_structure()
358+
w_id = schd.data_store_mgr.workflow_id
359+
data = schd.data_store_mgr.data[w_id]
360+
schd.pool.hold_tasks(['*'])
361+
await schd.update_data_structure()
362+
assert len({t.is_held for t in data[TASK_PROXIES].values()}) == 2
363+
364+
# Window size reduction to invoke pruning
365+
schd.data_store_mgr.set_graph_window_extent(0)
366+
schd.data_store_mgr.update_data_structure()
367+
assert len({t.is_held for t in data[TASK_PROXIES].values()}) == 1
368+
369+
# Test rapid addition and removal
370+
# bar/BAR task/family proxies not in .added
371+
assert len({
372+
t.name
373+
for t in schd.data_store_mgr.added[TASK_PROXIES].values()
374+
if t.name == 'bar'
375+
}) == 0
376+
assert len({
377+
f.name
378+
for f in schd.data_store_mgr.added[FAMILY_PROXIES].values()
379+
if f.name == 'BAR'
380+
}) == 0
381+
# Add bar/BAR on set output of foo
382+
for itask in schd.pool.get_tasks():
383+
schd.pool.spawn_on_output(itask, TASK_OUTPUT_SUCCEEDED)
384+
# bar/BAR now found.
385+
assert len({
386+
t.name
387+
for t in schd.data_store_mgr.added[TASK_PROXIES].values()
388+
if t.name == 'bar'
389+
}) == 1
390+
assert len({
391+
f.name
392+
for f in schd.data_store_mgr.added[FAMILY_PROXIES].values()
393+
if f.name == 'BAR'
394+
}) == 1
395+
# Before updating the data-store, remove bar/BAR.
396+
schd.pool.remove(schd.pool._get_task_by_id('1/bar'), 'Test removal')
397+
schd.data_store_mgr.update_data_structure()
398+
# bar/BAR not found in data or added stores.
399+
assert len({
400+
t.name
401+
for t in data[TASK_PROXIES].values()
402+
if t.name == 'bar'
403+
}) == 0
404+
assert len({
405+
t.name
406+
for t in schd.data_store_mgr.added[TASK_PROXIES].values()
407+
if t.name == 'bar'
408+
}) == 0
409+
assert len({
410+
f.name
411+
for f in data[FAMILY_PROXIES].values()
412+
if f.name == 'BAR'
413+
}) == 0
414+
assert len({
415+
f.name
416+
for f in schd.data_store_mgr.added[FAMILY_PROXIES].values()
417+
if f.name == 'BAR'
418+
}) == 0
419+
420+
421+
async def test_family_ascent_point_prune(harness):
422+
"""Test _family_ascent_point_prune. This method tries to remove
423+
non-existent family."""
424+
schd, data = harness
425+
fp_id = 'NotAFamilyProxy'
426+
parent_ids = {fp_id}
427+
checked_ids = set()
428+
node_ids = set()
429+
schd.data_store_mgr._family_ascent_point_prune(
430+
next(iter(parent_ids)),
431+
node_ids,
432+
parent_ids,
433+
checked_ids,
434+
schd.data_store_mgr.family_pruned_ids
435+
)
436+
assert len(checked_ids) == 1
437+
assert len(parent_ids) == 0
325438

326439

327440
def test_delta_task_prerequisite(harness):

0 commit comments

Comments
 (0)