99from flowco .dataflow .phase import Phase
1010from flowco .session .session import session
1111from flowco .util .output import error , log , warn , logger
12- from flowco .util .stopper import Stopper
1312from typing import (
1413 Callable ,
1514 Dict ,
@@ -194,6 +193,7 @@ def build_with_worklist(
194193 graph : DataFlowGraph ,
195194 target_phase : Phase ,
196195 node_ids : List [str ] | str | None ,
196+ should_stop : Callable [[], bool ] = lambda : False ,
197197 ) -> Iterator [BuildUpdate ]:
198198
199199 node_ids = graph .listify_node_ids (node_ids )
@@ -205,11 +205,11 @@ def build_with_worklist(
205205
206206 if config .sequential :
207207 yield from self .build_with_worklist_sequential (
208- pass_config , graph , target_phase , node_ids
208+ pass_config , graph , target_phase , node_ids , should_stop
209209 )
210210 else :
211211 yield from self .build_with_worklist_parallel (
212- pass_config , graph , target_phase , node_ids
212+ pass_config , graph , target_phase , node_ids , should_stop
213213 )
214214
215215 def build_with_worklist_sequential (
@@ -218,6 +218,7 @@ def build_with_worklist_sequential(
218218 graph : DataFlowGraph ,
219219 target_phase : Phase ,
220220 node_ids : List [str ] | str | None ,
221+ should_stop : Callable [[], bool ],
221222 ) -> Iterator [BuildUpdate ]:
222223
223224 @dataclass
@@ -236,7 +237,7 @@ def process_workitem(graph: DataFlowGraph, work_item: WorkItem):
236237 node = graph [node_id ]
237238
238239 with logger (f"{ node_id } :{ node .pill } " ):
239- if session . get ( "stopper" , Stopper ). should_stop ():
240+ if should_stop ():
240241 return NodeResult (work_item , node , node , "Stopped" )
241242
242243 node .build_status = phase_to_message .get (
@@ -322,8 +323,7 @@ def submit_items(current_worklist: Dict[WorkItem, List[WorkItem]]):
322323 current_worklist = submit_items (worklist )
323324
324325 with logger ("Running worklist" ):
325- stopper = session .get ("stopper" , Stopper )
326- while len (done ) < len (worklist ) and not stopper .should_stop ():
326+ while len (done ) < len (worklist ) and not should_stop ():
327327 try :
328328 log (f"Building: { len (worklist .keys ()) - len (done )} steps" )
329329 item = in_flight .pop (0 )
@@ -372,6 +372,7 @@ def build_with_worklist_parallel(
372372 graph : DataFlowGraph ,
373373 target_phase : Phase ,
374374 node_ids : List [str ] | str | None ,
375+ should_stop : Callable [[], bool ],
375376 ) -> Iterator [BuildUpdate ]:
376377
377378 @dataclass
@@ -396,7 +397,7 @@ def process_workitem(graph: DataFlowGraph, work_item: WorkItem):
396397
397398 with buffer_output (f"{ node .pill } " ) as buffer :
398399 with logger (f"Starting" ):
399- if session . get ( "stopper" , Stopper ). should_stop ():
400+ if should_stop ():
400401 result_queue .put (
401402 NodeResult (work_item , node , node , "Stopped" )
402403 )
@@ -520,8 +521,7 @@ def submit_items(current_worklist: Dict[WorkItem, List[WorkItem]]):
520521 current_worklist = submit_items (worklist )
521522
522523 with logger ("Running worklist" ):
523- stopper = session .get ("stopper" , Stopper )
524- while len (done ) < len (worklist ) and not stopper .should_stop ():
524+ while len (done ) < len (worklist ) and not should_stop ():
525525 try :
526526 log (f"Remaining: { len (worklist .keys ()) - len (done )} steps" )
527527 result = result_queue .get ()
@@ -553,7 +553,8 @@ def submit_items(current_worklist: Dict[WorkItem, List[WorkItem]]):
553553 except Exception as e :
554554 error (e )
555555 raise e
556- if stopper .should_stop ():
556+
557+ if should_stop ():
557558 executor .shutdown (wait = False , cancel_futures = True )
558559 log (f"Stopping!" )
559560
0 commit comments