@@ -792,8 +792,9 @@ def increment_graph_window(
792
792
source_tokens ,
793
793
point ,
794
794
flow_nums ,
795
- False ,
796
- itask
795
+ is_parent = False ,
796
+ itask = itask ,
797
+ replace_existing = True ,
797
798
)
798
799
799
800
# Pre-populate from previous walks
@@ -1153,24 +1154,27 @@ def generate_ghost_task(
1153
1154
is_parent : bool = False ,
1154
1155
itask : Optional ['TaskProxy' ] = None ,
1155
1156
n_depth : int = 0 ,
1157
+ replace_existing : bool = False ,
1156
1158
) -> None :
1157
1159
"""Create task-point element populated with static data.
1158
1160
1159
1161
Args:
1160
1162
source_tokens
1161
1163
point
1162
1164
flow_nums
1163
- is_parent:
1164
- Used to determine whether to load DB state.
1165
- itask:
1166
- Update task-node from corresponding task proxy object.
1165
+ is_parent: Used to determine whether to load DB state.
1166
+ itask: Update task-node from corresponding task proxy object.
1167
1167
n_depth: n-window graph edge distance.
1168
+ replace_existing: Replace any existing data for task as it may
1169
+ be out of date (e.g. flow nums).
1168
1170
"""
1169
1171
tp_id = tokens .id
1170
1172
if (
1171
1173
tp_id in self .data [self .workflow_id ][TASK_PROXIES ]
1172
1174
or tp_id in self .added [TASK_PROXIES ]
1173
1175
):
1176
+ if replace_existing and itask is not None :
1177
+ self .delta_from_task_proxy (itask )
1174
1178
return
1175
1179
1176
1180
name = tokens ['task' ]
@@ -2525,6 +2529,26 @@ def delta_task_xtrigger(self, sig, satisfied):
2525
2529
xtrigger .time = update_time
2526
2530
self .updates_pending = True
2527
2531
2532
+ def delta_from_task_proxy (self , itask : TaskProxy ) -> None :
2533
+ """Create delta from existing pool task proxy.
2534
+
2535
+ Args:
2536
+ itask (cylc.flow.task_proxy.TaskProxy):
2537
+ Update task-node from corresponding task proxy
2538
+ objects from the workflow task pool.
2539
+
2540
+ """
2541
+ tproxy : Optional [PbTaskProxy ]
2542
+ tp_id , tproxy = self .store_node_fetcher (itask .tokens )
2543
+ if not tproxy :
2544
+ return
2545
+ update_time = time ()
2546
+ tp_delta = self .updated [TASK_PROXIES ].setdefault (
2547
+ tp_id , PbTaskProxy (id = tp_id ))
2548
+ tp_delta .stamp = f'{ tp_id } @{ update_time } '
2549
+ self ._process_internal_task_proxy (itask , tp_delta )
2550
+ self .updates_pending = True
2551
+
2528
2552
# -----------
2529
2553
# Job Deltas
2530
2554
# -----------
0 commit comments