Skip to content

Commit f204996

Browse files
committed
Merge branch 'main' into jelly-ex
2 parents 095b7f2 + 8ccb2a0 commit f204996

File tree

14 files changed

+577
-424
lines changed

14 files changed

+577
-424
lines changed

python/cocoindex/cli.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
from . import flow, lib
66
from .setup import sync_setup, drop_setup, flow_names_with_setup, apply_setup_changes
7+
from .runtime import execution_context
78

89
@click.group()
910
def cli():
@@ -113,11 +114,13 @@ def update(flow_name: str | None, live: bool, quiet: bool):
113114
Update the index to reflect the latest data from data sources.
114115
"""
115116
options = flow.FlowLiveUpdaterOptions(live_mode=live, print_stats=not quiet)
116-
if flow_name is None:
117-
asyncio.run(flow.update_all_flows(options))
118-
else:
119-
updater = flow.FlowLiveUpdater(_flow_by_name(flow_name), options)
120-
asyncio.run(updater.wait())
117+
async def _update():
118+
if flow_name is None:
119+
await flow.update_all_flows(options)
120+
else:
121+
updater = await flow.FlowLiveUpdater.create(_flow_by_name(flow_name), options)
122+
await updater.wait()
123+
execution_context.run(_update())
121124

122125
@cli.command()
123126
@click.argument("flow_name", type=str, required=False)
@@ -167,7 +170,7 @@ def server(address: str, live_update: bool, quiet: bool, cors_origin: str | None
167170
lib.start_server(lib.ServerSettings(address=address, cors_origin=cors_origin))
168171
if live_update:
169172
options = flow.FlowLiveUpdaterOptions(live_mode=True, print_stats=not quiet)
170-
asyncio.run(flow.update_all_flows(options))
173+
execution_context.run(flow.update_all_flows(options))
171174
input("Press Enter to stop...")
172175

173176

python/cocoindex/flow.py

Lines changed: 34 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -369,9 +369,22 @@ class FlowLiveUpdater:
369369
"""
370370
_engine_live_updater: _engine.FlowLiveUpdater
371371

372-
def __init__(self, fl: Flow, options: FlowLiveUpdaterOptions | None = None):
373-
self._engine_live_updater = _engine.FlowLiveUpdater(
374-
fl._lazy_engine_flow(), dump_engine_object(options or FlowLiveUpdaterOptions()))
372+
def __init__(self, arg: Flow | _engine.FlowLiveUpdater, options: FlowLiveUpdaterOptions | None = None):
373+
if isinstance(arg, _engine.FlowLiveUpdater):
374+
self._engine_live_updater = arg
375+
else:
376+
self._engine_live_updater = execution_context.run(_engine.FlowLiveUpdater(
377+
arg.internal_flow(), dump_engine_object(options or FlowLiveUpdaterOptions())))
378+
379+
@staticmethod
380+
async def create(fl: Flow, options: FlowLiveUpdaterOptions | None = None) -> FlowLiveUpdater:
381+
"""
382+
Create a live updater for a flow.
383+
"""
384+
engine_live_updater = await _engine.FlowLiveUpdater.create(
385+
await fl.ainternal_flow(),
386+
dump_engine_object(options or FlowLiveUpdaterOptions()))
387+
return FlowLiveUpdater(engine_live_updater)
375388

376389
def __enter__(self) -> FlowLiveUpdater:
377390
return self
@@ -450,7 +463,7 @@ async def update(self) -> _engine.IndexUpdateInfo:
450463
Update the index defined by the flow.
451464
Once the function returns, the indice is fresh up to the moment when the function is called.
452465
"""
453-
updater = FlowLiveUpdater(self, FlowLiveUpdaterOptions(live_mode=False))
466+
updater = await FlowLiveUpdater.create(self, FlowLiveUpdaterOptions(live_mode=False))
454467
await updater.wait()
455468
return updater.update_stats()
456469

@@ -466,6 +479,12 @@ def internal_flow(self) -> _engine.Flow:
466479
"""
467480
return self._lazy_engine_flow()
468481

482+
async def ainternal_flow(self) -> _engine.Flow:
483+
"""
484+
Get the engine flow. The async version.
485+
"""
486+
return await asyncio.to_thread(self.internal_flow)
487+
469488
def _create_lazy_flow(name: str | None, fl_def: Callable[[FlowBuilder, DataScope], None]) -> Flow:
470489
"""
471490
Create a flow without really building it yet.
@@ -523,17 +542,23 @@ def ensure_all_flows_built() -> None:
523542
"""
524543
Ensure all flows are built.
525544
"""
526-
with _flows_lock:
527-
for fl in _flows.values():
528-
fl.internal_flow()
545+
for fl in flows():
546+
fl.internal_flow()
547+
548+
async def aensure_all_flows_built() -> None:
549+
"""
550+
Ensure all flows are built.
551+
"""
552+
for fl in flows():
553+
await fl.ainternal_flow()
529554

530555
async def update_all_flows(options: FlowLiveUpdaterOptions) -> dict[str, _engine.IndexUpdateInfo]:
531556
"""
532557
Update all flows.
533558
"""
534-
ensure_all_flows_built()
559+
await aensure_all_flows_built()
535560
async def _update_flow(fl: Flow) -> _engine.IndexUpdateInfo:
536-
updater = FlowLiveUpdater(fl, options)
561+
updater = await FlowLiveUpdater.create(fl, options)
537562
await updater.wait()
538563
return updater.update_stats()
539564
fls = flows()

python/cocoindex/lib.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,8 @@ async def _inner(*args, **kwargs):
101101
try:
102102
if _should_run_cli():
103103
# Schedule to a separate thread as it invokes nested event loop.
104-
return await asyncio.to_thread(_run_cli)
104+
# return await asyncio.to_thread(_run_cli)
105+
return _run_cli()
105106
return await fn(*args, **kwargs)
106107
finally:
107108
stop()

0 commit comments

Comments
 (0)