diff --git a/python/cocoindex/flow.py b/python/cocoindex/flow.py index 4d1da28d6..033d59f53 100644 --- a/python/cocoindex/flow.py +++ b/python/cocoindex/flow.py @@ -694,23 +694,16 @@ class Flow: """ _name: str - _lazy_engine_flow: Callable[[], _engine.Flow] | None + _engine_flow_creator: Callable[[], _engine.Flow] + + _lazy_flow_lock: Lock + _lazy_engine_flow: _engine.Flow | None = None def __init__(self, name: str, engine_flow_creator: Callable[[], _engine.Flow]): validate_flow_name(name) self._name = name - engine_flow = None - lock = Lock() - - def _lazy_engine_flow() -> _engine.Flow: - nonlocal engine_flow, lock - if engine_flow is None: - with lock: - if engine_flow is None: - engine_flow = engine_flow_creator() - return engine_flow - - self._lazy_engine_flow = _lazy_engine_flow + self._engine_flow_creator = engine_flow_creator + self._lazy_flow_lock = Lock() def _render_spec(self, verbose: bool = False) -> Tree: """ @@ -794,15 +787,30 @@ def internal_flow(self) -> _engine.Flow: """ Get the engine flow. """ - if self._lazy_engine_flow is None: - raise RuntimeError(f"Flow {self.full_name} is already removed") - return self._lazy_engine_flow() + if self._lazy_engine_flow is not None: + return self._lazy_engine_flow + return self._internal_flow() async def internal_flow_async(self) -> _engine.Flow: """ Get the engine flow. The async version. """ - return await asyncio.to_thread(self.internal_flow) + if self._lazy_engine_flow is not None: + return self._lazy_engine_flow + return await asyncio.to_thread(self._internal_flow) + + def _internal_flow(self) -> _engine.Flow: + """ + Get the engine flow. The async version. + """ + with self._lazy_flow_lock: + if self._lazy_engine_flow is not None: + return self._lazy_engine_flow + + engine_flow = self._engine_flow_creator() + self._lazy_engine_flow = engine_flow + + return engine_flow def setup(self, report_to_stdout: bool = False) -> None: """