Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 25 additions & 17 deletions python/cocoindex/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -694,23 +694,16 @@ class Flow:
"""

_name: str
_lazy_engine_flow: Callable[[], _engine.Flow] | None
_engine_flow_creator: Callable[[], _engine.Flow]

_lazy_flow_lock: Lock
_lazy_engine_flow: _engine.Flow | None = None

def __init__(self, name: str, engine_flow_creator: Callable[[], _engine.Flow]):
validate_flow_name(name)
self._name = name
engine_flow = None
lock = Lock()

def _lazy_engine_flow() -> _engine.Flow:
nonlocal engine_flow, lock
if engine_flow is None:
with lock:
if engine_flow is None:
engine_flow = engine_flow_creator()
return engine_flow

self._lazy_engine_flow = _lazy_engine_flow
self._engine_flow_creator = engine_flow_creator
self._lazy_flow_lock = Lock()

def _render_spec(self, verbose: bool = False) -> Tree:
"""
Expand Down Expand Up @@ -794,15 +787,30 @@ def internal_flow(self) -> _engine.Flow:
"""
Get the engine flow.
"""
if self._lazy_engine_flow is None:
raise RuntimeError(f"Flow {self.full_name} is already removed")
return self._lazy_engine_flow()
if self._lazy_engine_flow is not None:
return self._lazy_engine_flow
return self._internal_flow()

async def internal_flow_async(self) -> _engine.Flow:
"""
Get the engine flow. The async version.
"""
return await asyncio.to_thread(self.internal_flow)
if self._lazy_engine_flow is not None:
return self._lazy_engine_flow
return await asyncio.to_thread(self._internal_flow)

def _internal_flow(self) -> _engine.Flow:
"""
Get the engine flow. The async version.
"""
with self._lazy_flow_lock:
if self._lazy_engine_flow is not None:
return self._lazy_engine_flow

engine_flow = self._engine_flow_creator()
self._lazy_engine_flow = engine_flow

return engine_flow

def setup(self, report_to_stdout: bool = False) -> None:
"""
Expand Down
Loading