Skip to content

Commit 5b1d3c3

Browse files
authored
Support -L/--live-update for cocoindex server subcommand. (#251)
1 parent 383bd9f commit 5b1d3c3

File tree

3 files changed: 36 additions and 20 deletions.

python/cocoindex/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
"""
22
Cocoindex is a framework for building and running indexing pipelines.
33
"""
4-
from . import flow, functions, query, sources, storages, cli
4+
from . import functions, query, sources, storages, cli
55
from .flow import FlowBuilder, DataScope, DataSlice, Flow, flow_def
66
from .flow import EvaluateAndDumpOptions, GeneratedField, SourceRefreshOptions
7+
from .flow import update_all_flows, FlowLiveUpdaterOptions
78
from .llm import LlmSpec, LlmApiType
89
from .vector import VectorSimilarityMetric
910
from .lib import *

python/cocoindex/cli.py

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -52,19 +52,17 @@ def setup(delete_legacy_flows):
5252
help="Continuously watch changes from data sources and apply to the target index.")
5353
@click.option(
5454
"-q", "--quiet", is_flag=True, show_default=True, default=False,
55-
help="Avoid printing anything to the output, e.g. statistics.")
55+
help="Avoid printing anything to the standard output, e.g. statistics.")
5656
def update(flow_name: str | None, live: bool, quiet: bool):
5757
"""
5858
Update the index to reflect the latest data from data sources.
5959
"""
60-
async def _update_all():
61-
async def _update_flow(fl: flow.Flow):
62-
updater = flow.FlowLiveUpdater(
63-
fl,
64-
flow.FlowLiveUpdaterOptions(live_mode=live, print_stats=not quiet))
65-
await updater.wait()
66-
await asyncio.gather(*(_update_flow(fl) for fl in _flows_by_name(flow_name)))
67-
asyncio.run(_update_all())
60+
options = flow.FlowLiveUpdaterOptions(live_mode=live, print_stats=not quiet)
61+
if flow_name is None:
62+
asyncio.run(flow.update_all_flows(options))
63+
else:
64+
updater = flow.FlowLiveUpdater(_flow_by_name(flow_name), options)
65+
asyncio.run(updater.wait())
6866

6967
@cli.command()
7068
@click.argument("flow_name", type=str, required=False)
@@ -99,13 +97,22 @@ def evaluate(flow_name: str | None, output_dir: str | None, use_cache: bool = Tr
9997
"-c", "--cors-origin", type=str, default=_default_server_settings.cors_origin,
10098
help="The origin of the client (e.g. CocoInsight UI) to allow CORS from. "
10199
"e.g. `http://cocoindex.io` if you want to allow CocoInsight to access the server.")
102-
def server(address: str, cors_origin: str | None):
100+
@click.option(
101+
"-L", "--live-update", is_flag=True, show_default=True, default=False,
102+
help="Continuously watch changes from data sources and apply to the target index.")
103+
@click.option(
104+
"-q", "--quiet", is_flag=True, show_default=True, default=False,
105+
help="Avoid printing anything to the standard output, e.g. statistics.")
106+
def server(address: str, live_update: bool, quiet: bool, cors_origin: str | None):
103107
"""
104108
Start a HTTP server providing REST APIs.
105109
106110
It will allow tools like CocoInsight to access the server.
107111
"""
108112
lib.start_server(lib.ServerSettings(address=address, cors_origin=cors_origin))
113+
if live_update:
114+
options = flow.FlowLiveUpdaterOptions(live_mode=True, print_stats=not quiet)
115+
asyncio.run(flow.update_all_flows(options))
109116
input("Press Enter to stop...")
110117

111118

@@ -124,9 +131,3 @@ def _flow_name(name: str | None) -> str:
124131

125132
def _flow_by_name(name: str | None) -> flow.Flow:
126133
return flow.flow_by_name(_flow_name(name))
127-
128-
def _flows_by_name(name: str | None) -> list[flow.Flow]:
129-
if name is None:
130-
return flow.flows()
131-
else:
132-
return [flow.flow_by_name(name)]

python/cocoindex/flow.py

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
from __future__ import annotations
66

7+
import asyncio
78
import re
89
import inspect
910
import datetime
@@ -382,13 +383,13 @@ def __init__(self, fl: Flow, options: FlowLiveUpdaterOptions):
382383
self._engine_live_updater = _engine.FlowLiveUpdater(
383384
fl._lazy_engine_flow(), _dump_engine_object(options))
384385

385-
async def wait(self):
386+
async def wait(self) -> None:
386387
"""
387388
Wait for the live updater to finish.
388389
"""
389-
return await self._engine_live_updater.wait()
390+
await self._engine_live_updater.wait()
390391

391-
def abort(self):
392+
def abort(self) -> None:
392393
"""
393394
Abort the live updater.
394395
"""
@@ -522,6 +523,19 @@ def ensure_all_flows_built() -> None:
522523
for fl in _flows.values():
523524
fl.internal_flow()
524525

526+
async def update_all_flows(options: FlowLiveUpdaterOptions) -> dict[str, _engine.IndexUpdateInfo]:
527+
"""
528+
Update all flows.
529+
"""
530+
ensure_all_flows_built()
531+
async def _update_flow(fl: Flow) -> _engine.IndexUpdateInfo:
532+
updater = FlowLiveUpdater(fl, options)
533+
await updater.wait()
534+
return updater.update_stats()
535+
fls = flows()
536+
all_stats = await asyncio.gather(*(_update_flow(fl) for fl in fls))
537+
return {fl.name: stats for fl, stats in zip(fls, all_stats)}
538+
525539
_transient_flow_name_builder = _NameBuilder()
526540
class TransientFlow:
527541
"""

0 commit comments

Comments
 (0)