@@ -1578,50 +1578,42 @@ def can_be_spawned(self, name: str, point: 'PointBase') -> bool:
1578
1578
1579
1579
def _get_task_history (
1580
1580
self , name : str , point : 'PointBase' , flow_nums : Set [int ]
1581
- ) -> Tuple [bool , int , str , bool ]:
1582
- """Get history of previous submits for this task .
1581
+ ) -> Tuple [int , Optional [ str ] , bool ]:
1582
+ """Get submit_num, status, flow_wait for point/name in flow_nums .
1583
1583
1584
1584
Args:
1585
1585
name: task name
1586
1586
point: task cycle point
1587
1587
flow_nums: task flow numbers
1588
1588
1589
1589
Returns:
1590
- never_spawned: if task never spawned before
1591
- submit_num: submit number of previous submit
1592
- prev_status: task status of previous sumbit
1593
- prev_flow_wait: if previous submit was a flow-wait task
1590
+ (submit_num, status, flow_wait)
1591
+ If no matching history, status will be None
1594
1592
1595
1593
"""
1594
+ submit_num : int = 0
1595
+ status : Optional [str ] = None
1596
+ flow_wait = False
1597
+
1596
1598
info = self .workflow_db_mgr .pri_dao .select_prev_instances (
1597
1599
name , str (point )
1598
1600
)
1599
- try :
1600
- submit_num : int = max (s [0 ] for s in info )
1601
- except ValueError :
1602
- # never spawned in any flow
1603
- submit_num = 0
1604
- never_spawned = True
1605
- else :
1606
- never_spawned = False
1607
- # (submit_num could still be zero, if removed before submit)
1608
-
1609
- prev_status : str = TASK_STATUS_WAITING
1610
- prev_flow_wait = False
1601
+ with suppress (ValueError ):
1602
+ submit_num = max (s [0 ] for s in info )
1611
1603
1612
- for _snum , f_wait , old_fnums , status in info :
1604
+ for _snum , f_wait , old_fnums , old_status in info :
1613
1605
if set .intersection (flow_nums , old_fnums ):
1614
1606
# matching flows
1615
- prev_status = status
1616
- prev_flow_wait = f_wait
1617
- if prev_status in TASK_STATUSES_FINAL :
1607
+ status = old_status
1608
+ flow_wait = f_wait
1609
+ if status in TASK_STATUSES_FINAL :
1618
1610
# task finished
1619
1611
break
1620
1612
# Else continue: there may be multiple entries with flow
1621
1613
# overlap due to merges (they'll have have same snum and
1622
1614
# f_wait); keep going to find the finished one, if any.
1623
1615
1624
- return never_spawned , submit_num , prev_status , prev_flow_wait
1616
+ return submit_num , status , flow_wait
1625
1617
1626
1618
def _load_historical_outputs (self , itask : 'TaskProxy' ) -> None :
1627
1619
"""Load a task's historical outputs from the DB."""
@@ -1631,8 +1623,11 @@ def _load_historical_outputs(self, itask: 'TaskProxy') -> None:
1631
1623
# task never ran before
1632
1624
self .db_add_new_flow_rows (itask )
1633
1625
else :
1626
+ flow_seen = False
1634
1627
for outputs_str , fnums in info .items ():
1635
1628
if itask .flow_nums .intersection (fnums ):
1629
+ # DB row has overlap with itask's flows
1630
+ flow_seen = True
1636
1631
# BACK COMPAT: In Cylc >8.0.0,<8.3.0, only the task
1637
1632
# messages were stored in the DB as a list.
1638
1633
# from: 8.0.0
@@ -1649,6 +1644,9 @@ def _load_historical_outputs(self, itask: 'TaskProxy') -> None:
1649
1644
# [message] - always the full task message
1650
1645
for msg in outputs :
1651
1646
itask .state .outputs .set_message_complete (msg )
1647
+ if not flow_seen :
1648
+ # itask never ran before in its assigned flows
1649
+ self .db_add_new_flow_rows (itask )
1652
1650
1653
1651
def spawn_task (
1654
1652
self ,
@@ -1658,44 +1656,52 @@ def spawn_task(
1658
1656
force : bool = False ,
1659
1657
flow_wait : bool = False ,
1660
1658
) -> Optional [TaskProxy ]:
1661
- """Return task proxy if not completed in this flow, or if forced .
1659
+ """Return a new task proxy for the given flow if possible .
1662
1660
1663
- If finished previously with flow wait, just try to spawn children.
1661
+ We need to hit the DB for:
1662
+ - submit number
1663
+ - task status
1664
+ - flow-wait
1665
+ - completed outputs (e.g. via "cylc set")
1664
1666
1665
- Note finished tasks may be incomplete, but we don't automatically
1666
- re-run incomplete tasks in the same flow.
1667
+ If history records a final task status (for this flow):
1668
+ - if not flow wait, don't spawn (return None)
1669
+ - if flow wait, don't spawn (return None) but do spawn children
1670
+ - if outputs are incomplete, don't auto-rerun it (return None)
1667
1671
1668
- For every task spawned, we need a DB lookup for submit number,
1669
- and flow-wait.
1672
+ Otherwise, spawn the task and load any completed outputs.
1670
1673
1671
1674
"""
1672
- if not self .can_be_spawned (name , point ):
1673
- return None
1674
-
1675
- never_spawned , submit_num , prev_status , prev_flow_wait = (
1675
+ submit_num , prev_status , prev_flow_wait = (
1676
1676
self ._get_task_history (name , point , flow_nums )
1677
1677
)
1678
1678
1679
- if (
1680
- not never_spawned and
1681
- not prev_flow_wait and
1682
- submit_num == 0
1683
- ):
1684
- # Previous instance removed before completing any outputs.
1685
- LOG .debug (f"Not spawning { point } /{ name } - task removed" )
1686
- return None
1687
-
1679
+ # Create the task proxy with any completed outputs loaded.
1688
1680
itask = self ._get_task_proxy_db_outputs (
1689
1681
point ,
1690
1682
self .config .get_taskdef (name ),
1691
1683
flow_nums ,
1692
- status = prev_status ,
1684
+ status = prev_status or TASK_STATUS_WAITING ,
1693
1685
submit_num = submit_num ,
1694
1686
flow_wait = flow_wait ,
1695
1687
)
1696
1688
if itask is None :
1697
1689
return None
1698
1690
1691
+ if (
1692
+ prev_status is not None
1693
+ and not itask .state .outputs .get_completed_outputs ()
1694
+ ):
1695
+ # If itask has any history in this flow but no completed outputs
1696
+ # we can infer it was deliberately removed, so don't respawn it.
1697
+ # TODO (follow-up work):
1698
+ # - this logic fails if task removed after some outputs completed
1699
+ # - this is does not conform to future "cylc remove" flow-erasure
1700
+ # behaviour which would result in respawning of the removed task
1701
+ # See github.com/cylc/cylc-flow/pull/6186/#discussion_r1669727292
1702
+ LOG .debug (f"Not respawning { point } /{ name } - task was removed" )
1703
+ return None
1704
+
1699
1705
if prev_status in TASK_STATUSES_FINAL :
1700
1706
# Task finished previously.
1701
1707
msg = f"[{ point } /{ name } :{ prev_status } ] already finished"
@@ -1878,7 +1884,6 @@ def set_prereqs_and_outputs(
1878
1884
- future tasks must be specified individually
1879
1885
- family names are not expanded to members
1880
1886
1881
-
1882
1887
Uses a transient task proxy to spawn children. (Even if parent was
1883
1888
previously spawned in this flow its children might not have been).
1884
1889
@@ -1963,6 +1968,7 @@ def _set_outputs_itask(
1963
1968
self .data_store_mgr .delta_task_outputs (itask )
1964
1969
self .workflow_db_mgr .put_update_task_state (itask )
1965
1970
self .workflow_db_mgr .put_update_task_outputs (itask )
1971
+ self .workflow_db_mgr .process_queued_ops ()
1966
1972
1967
1973
def _set_prereqs_itask (
1968
1974
self ,
@@ -2168,10 +2174,9 @@ def force_trigger_tasks(
2168
2174
if not self .can_be_spawned (name , point ):
2169
2175
continue
2170
2176
2171
- _ , submit_num , _prev_status , prev_fwait = (
2177
+ submit_num , _ , prev_fwait = (
2172
2178
self ._get_task_history (name , point , flow_nums )
2173
2179
)
2174
-
2175
2180
itask = TaskProxy (
2176
2181
self .tokens ,
2177
2182
self .config .get_taskdef (name ),
0 commit comments