Skip to content

Commit 9688190

Browse files
committed
fix(py-async): minor robust enhancement for awaiting live updater (#355)
1 parent ff9d3af commit 9688190

File tree

3 files changed

+17
-9
lines changed

3 files changed

+17
-9
lines changed

examples/gdrive_text_embedding/main.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from dotenv import load_dotenv
22

3+
import asyncio
34
import cocoindex
45
import datetime
56
import os
@@ -73,5 +74,4 @@ async def _run():
7374

7475
if __name__ == "__main__":
7576
load_dotenv(override=True)
76-
import asyncio
7777
asyncio.run(_run())

python/cocoindex/flow.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from . import op
2020
from .convert import dump_engine_object
2121
from .typing import encode_enriched_type
22-
from .runtime import op_execution_context
22+
from .runtime import execution_context
2323

2424
class _NameBuilder:
2525
_existing_names: set[str]
@@ -378,7 +378,7 @@ def __enter__(self) -> FlowLiveUpdater:
378378

379379
def __exit__(self, exc_type, exc_value, traceback):
380380
self.abort()
381-
asyncio.run(self.wait())
381+
execution_context.run(self.wait())
382382

383383
async def __aenter__(self) -> FlowLiveUpdater:
384384
return self
@@ -476,7 +476,7 @@ def _create_engine_flow() -> _engine.Flow:
476476
root_scope = DataScope(
477477
flow_builder_state, flow_builder_state.engine_flow_builder.root_scope())
478478
fl_def(FlowBuilder(flow_builder_state), root_scope)
479-
return flow_builder_state.engine_flow_builder.build_flow(op_execution_context.event_loop)
479+
return flow_builder_state.engine_flow_builder.build_flow(execution_context.event_loop)
480480

481481
return Flow(_create_engine_flow)
482482

@@ -572,7 +572,7 @@ def __init__(
572572
flow_builder_state.engine_flow_builder.set_direct_output(
573573
_data_slice_state(output).engine_data_slice)
574574
self._engine_flow = flow_builder_state.engine_flow_builder.build_transient_flow(
575-
op_execution_context.event_loop)
575+
execution_context.event_loop)
576576

577577
def __str__(self):
578578
return str(self._engine_flow)

python/cocoindex/runtime.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
1+
"""
2+
This module provides a standalone execution runtime for executing coroutines in a thread-safe
3+
manner.
4+
"""
5+
16
import threading
27
import asyncio
3-
4-
class _OpExecutionContext:
8+
from typing import Coroutine
9+
class _ExecutionContext:
510
_lock: threading.Lock
611
_event_loop: asyncio.AbstractEventLoop | None = None
712

@@ -14,8 +19,11 @@ def event_loop(self) -> asyncio.AbstractEventLoop:
1419
with self._lock:
1520
if self._event_loop is None:
1621
self._event_loop = asyncio.new_event_loop()
17-
asyncio.set_event_loop(self._event_loop)
1822
threading.Thread(target=self._event_loop.run_forever, daemon=True).start()
1923
return self._event_loop
2024

21-
op_execution_context = _OpExecutionContext()
25+
def run(self, coro: Coroutine):
26+
"""Run a coroutine in the event loop, blocking until it finishes. Return its result."""
27+
return asyncio.run_coroutine_threadsafe(coro, self.event_loop).result()
28+
29+
execution_context = _ExecutionContext()

0 commit comments

Comments
 (0)