Skip to content

Commit 3cdd0d6

Browse files
committed
chore(py-async): more cleanups to make Python SDK async-compliant (#359)
1 parent 0b45eef commit 3cdd0d6

File tree

4 files changed

+58
-32
lines changed

4 files changed

+58
-32
lines changed

python/cocoindex/cli.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
from . import flow, lib
66
from .setup import sync_setup, drop_setup, flow_names_with_setup, apply_setup_changes
7+
from .runtime import execution_context
78

89
@click.group()
910
def cli():
@@ -113,11 +114,13 @@ def update(flow_name: str | None, live: bool, quiet: bool):
113114
Update the index to reflect the latest data from data sources.
114115
"""
115116
options = flow.FlowLiveUpdaterOptions(live_mode=live, print_stats=not quiet)
116-
if flow_name is None:
117-
asyncio.run(flow.update_all_flows(options))
118-
else:
119-
updater = flow.FlowLiveUpdater(_flow_by_name(flow_name), options)
120-
asyncio.run(updater.wait())
117+
async def _update():
118+
if flow_name is None:
119+
await flow.update_all_flows(options)
120+
else:
121+
updater = await flow.FlowLiveUpdater.create(_flow_by_name(flow_name), options)
122+
await updater.wait()
123+
execution_context.run(_update())
121124

122125
@cli.command()
123126
@click.argument("flow_name", type=str, required=False)
@@ -167,7 +170,7 @@ def server(address: str, live_update: bool, quiet: bool, cors_origin: str | None
167170
lib.start_server(lib.ServerSettings(address=address, cors_origin=cors_origin))
168171
if live_update:
169172
options = flow.FlowLiveUpdaterOptions(live_mode=True, print_stats=not quiet)
170-
asyncio.run(flow.update_all_flows(options))
173+
execution_context.run(flow.update_all_flows(options))
171174
input("Press Enter to stop...")
172175

173176

python/cocoindex/flow.py

Lines changed: 34 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -369,9 +369,22 @@ class FlowLiveUpdater:
369369
"""
370370
_engine_live_updater: _engine.FlowLiveUpdater
371371

372-
def __init__(self, fl: Flow, options: FlowLiveUpdaterOptions | None = None):
373-
self._engine_live_updater = _engine.FlowLiveUpdater(
374-
fl._lazy_engine_flow(), dump_engine_object(options or FlowLiveUpdaterOptions()))
372+
def __init__(self, arg: Flow | _engine.FlowLiveUpdater, options: FlowLiveUpdaterOptions | None = None):
373+
if isinstance(arg, _engine.FlowLiveUpdater):
374+
self._engine_live_updater = arg
375+
else:
376+
self._engine_live_updater = execution_context.run(_engine.FlowLiveUpdater(
377+
arg.internal_flow(), dump_engine_object(options or FlowLiveUpdaterOptions())))
378+
379+
@staticmethod
380+
async def create(fl: Flow, options: FlowLiveUpdaterOptions | None = None) -> FlowLiveUpdater:
381+
"""
382+
Create a live updater for a flow.
383+
"""
384+
engine_live_updater = await _engine.FlowLiveUpdater.create(
385+
await fl.ainternal_flow(),
386+
dump_engine_object(options or FlowLiveUpdaterOptions()))
387+
return FlowLiveUpdater(engine_live_updater)
375388

376389
def __enter__(self) -> FlowLiveUpdater:
377390
return self
@@ -450,7 +463,7 @@ async def update(self) -> _engine.IndexUpdateInfo:
450463
Update the index defined by the flow.
451464
Once the function returns, the indice is fresh up to the moment when the function is called.
452465
"""
453-
updater = FlowLiveUpdater(self, FlowLiveUpdaterOptions(live_mode=False))
466+
updater = await FlowLiveUpdater.create(self, FlowLiveUpdaterOptions(live_mode=False))
454467
await updater.wait()
455468
return updater.update_stats()
456469

@@ -466,6 +479,12 @@ def internal_flow(self) -> _engine.Flow:
466479
"""
467480
return self._lazy_engine_flow()
468481

482+
async def ainternal_flow(self) -> _engine.Flow:
483+
"""
484+
Get the engine flow. The async version.
485+
"""
486+
return await asyncio.to_thread(self.internal_flow)
487+
469488
def _create_lazy_flow(name: str | None, fl_def: Callable[[FlowBuilder, DataScope], None]) -> Flow:
470489
"""
471490
Create a flow without really building it yet.
@@ -523,17 +542,23 @@ def ensure_all_flows_built() -> None:
523542
"""
524543
Ensure all flows are built.
525544
"""
526-
with _flows_lock:
527-
for fl in _flows.values():
528-
fl.internal_flow()
545+
for fl in flows():
546+
fl.internal_flow()
547+
548+
async def aensure_all_flows_built() -> None:
549+
"""
550+
Ensure all flows are built.
551+
"""
552+
for fl in flows():
553+
await fl.ainternal_flow()
529554

530555
async def update_all_flows(options: FlowLiveUpdaterOptions) -> dict[str, _engine.IndexUpdateInfo]:
531556
"""
532557
Update all flows.
533558
"""
534-
ensure_all_flows_built()
559+
await aensure_all_flows_built()
535560
async def _update_flow(fl: Flow) -> _engine.IndexUpdateInfo:
536-
updater = FlowLiveUpdater(fl, options)
561+
updater = await FlowLiveUpdater.create(fl, options)
537562
await updater.wait()
538563
return updater.update_stats()
539564
fls = flows()

python/cocoindex/lib.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,8 @@ async def _inner(*args, **kwargs):
101101
try:
102102
if _should_run_cli():
103103
# Schedule to a separate thread as it invokes nested event loop.
104-
return await asyncio.to_thread(_run_cli)
104+
# return await asyncio.to_thread(_run_cli)
105+
return _run_cli()
105106
return await fn(*args, **kwargs)
106107
finally:
107108
stop()

src/py/mod.rs

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -97,24 +97,21 @@ pub struct FlowLiveUpdater(pub Arc<tokio::sync::RwLock<execution::FlowLiveUpdate
9797

9898
#[pymethods]
9999
impl FlowLiveUpdater {
100-
#[new]
101-
pub fn new(
102-
py: Python<'_>,
100+
#[staticmethod]
101+
pub fn create<'py>(
102+
py: Python<'py>,
103103
flow: &Flow,
104104
options: Pythonized<execution::FlowLiveUpdaterOptions>,
105-
) -> PyResult<Self> {
106-
py.allow_threads(|| {
107-
let live_updater = get_runtime()
108-
.block_on(async {
109-
let live_updater = execution::FlowLiveUpdater::start(
110-
flow.0.clone(),
111-
&get_lib_context()?.pool,
112-
options.into_inner(),
113-
)
114-
.await?;
115-
anyhow::Ok(live_updater)
116-
})
117-
.into_py_result()?;
105+
) -> PyResult<Bound<'py, PyAny>> {
106+
let flow = flow.0.clone();
107+
future_into_py(py, async move {
108+
let live_updater = execution::FlowLiveUpdater::start(
109+
flow,
110+
&get_lib_context().into_py_result()?.pool,
111+
options.into_inner(),
112+
)
113+
.await
114+
.into_py_result()?;
118115
Ok(Self(Arc::new(tokio::sync::RwLock::new(live_updater))))
119116
})
120117
}

0 commit comments

Comments
 (0)