@@ -694,23 +694,16 @@ class Flow:
694694 """
695695
696696 _name : str
697- _lazy_engine_flow : Callable [[], _engine .Flow ] | None
697+ _engine_flow_creator : Callable [[], _engine .Flow ]
698+
699+ _lazy_flow_lock : Lock
700+ _lazy_engine_flow : _engine .Flow | None = None
698701
699702 def __init__ (self , name : str , engine_flow_creator : Callable [[], _engine .Flow ]):
700703 validate_flow_name (name )
701704 self ._name = name
702- engine_flow = None
703- lock = Lock ()
704-
705- def _lazy_engine_flow () -> _engine .Flow :
706- nonlocal engine_flow , lock
707- if engine_flow is None :
708- with lock :
709- if engine_flow is None :
710- engine_flow = engine_flow_creator ()
711- return engine_flow
712-
713- self ._lazy_engine_flow = _lazy_engine_flow
705+ self ._engine_flow_creator = engine_flow_creator
706+ self ._lazy_flow_lock = Lock ()
714707
715708 def _render_spec (self , verbose : bool = False ) -> Tree :
716709 """
@@ -794,15 +787,30 @@ def internal_flow(self) -> _engine.Flow:
794787 """
795788 Get the engine flow.
796789 """
797- if self ._lazy_engine_flow is None :
798- raise RuntimeError ( f"Flow { self .full_name } is already removed" )
799- return self ._lazy_engine_flow ()
790+ if self ._lazy_engine_flow is not None :
791+ return self ._lazy_engine_flow
792+ return self ._internal_flow ()
800793
801794 async def internal_flow_async (self ) -> _engine .Flow :
802795 """
803796 Get the engine flow. The async version.
804797 """
805- return await asyncio .to_thread (self .internal_flow )
798+ if self ._lazy_engine_flow is not None :
799+ return self ._lazy_engine_flow
800+ return await asyncio .to_thread (self ._internal_flow )
801+
802+ def _internal_flow (self ) -> _engine .Flow :
803+ """
804+ Get the engine flow. The async version.
805+ """
806+ with self ._lazy_flow_lock :
807+ if self ._lazy_engine_flow is not None :
808+ return self ._lazy_engine_flow
809+
810+ engine_flow = self ._engine_flow_creator ()
811+ self ._lazy_engine_flow = engine_flow
812+
813+ return engine_flow
806814
807815 def setup (self , report_to_stdout : bool = False ) -> None :
808816 """
0 commit comments