Skip to content

Commit 7ea8c26

Browse files
authored
Change gdrive example to continously watch change in main. (#252)
1 parent 5b1d3c3 commit 7ea8c26

File tree

4 files changed

+35
-20
lines changed

4 files changed

+35
-20
lines changed

examples/gdrive_text_embedding/main.py

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -51,21 +51,23 @@ def gdrive_text_embedding_flow(flow_builder: cocoindex.FlowBuilder, data_scope:
5151

5252
@cocoindex.main_fn()
5353
def _run():
54-
# Run queries in a loop to demonstrate the query capabilities.
55-
while True:
56-
try:
57-
query = input("Enter search query (or Enter to quit): ")
58-
if query == '':
54+
# Use a `FlowLiveUpdater` to keep the flow data updated.
55+
with cocoindex.FlowLiveUpdater(gdrive_text_embedding_flow):
56+
# Run queries in a loop to demonstrate the query capabilities.
57+
while True:
58+
try:
59+
query = input("Enter search query (or Enter to quit): ")
60+
if query == '':
61+
break
62+
results, _ = query_handler.search(query, 10)
63+
print("\nSearch results:")
64+
for result in results:
65+
print(f"[{result.score:.3f}] {result.data['filename']}")
66+
print(f" {result.data['text']}")
67+
print("---")
68+
print()
69+
except KeyboardInterrupt:
5970
break
60-
results, _ = query_handler.search(query, 10)
61-
print("\nSearch results:")
62-
for result in results:
63-
print(f"[{result.score:.3f}] {result.data['filename']}")
64-
print(f" {result.data['text']}")
65-
print("---")
66-
print()
67-
except KeyboardInterrupt:
68-
break
6971

7072
if __name__ == "__main__":
7173
load_dotenv(override=True)

python/cocoindex/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from . import functions, query, sources, storages, cli
55
from .flow import FlowBuilder, DataScope, DataSlice, Flow, flow_def
66
from .flow import EvaluateAndDumpOptions, GeneratedField, SourceRefreshOptions
7-
from .flow import update_all_flows, FlowLiveUpdaterOptions
7+
from .flow import update_all_flows, FlowLiveUpdater, FlowLiveUpdaterOptions
88
from .llm import LlmSpec, LlmApiType
99
from .vector import VectorSimilarityMetric
1010
from .lib import *

python/cocoindex/flow.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -370,7 +370,7 @@ class FlowLiveUpdaterOptions:
370370
"""
371371
Options for live updating a flow.
372372
"""
373-
live_mode: bool = False
373+
live_mode: bool = True
374374
print_stats: bool = False
375375

376376
class FlowLiveUpdater:
@@ -379,9 +379,16 @@ class FlowLiveUpdater:
379379
"""
380380
_engine_live_updater: _engine.FlowLiveUpdater
381381

382-
def __init__(self, fl: Flow, options: FlowLiveUpdaterOptions):
382+
def __init__(self, fl: Flow, options: FlowLiveUpdaterOptions | None = None):
383383
self._engine_live_updater = _engine.FlowLiveUpdater(
384-
fl._lazy_engine_flow(), _dump_engine_object(options))
384+
fl._lazy_engine_flow(), _dump_engine_object(options or FlowLiveUpdaterOptions()))
385+
386+
def __enter__(self) -> FlowLiveUpdater:
387+
return self
388+
389+
def __exit__(self, exc_type, exc_value, traceback):
390+
self.abort()
391+
asyncio.run(self.wait())
385392

386393
async def wait(self) -> None:
387394
"""

src/execution/live_updater.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,8 +177,14 @@ impl FlowLiveUpdater {
177177

178178
pub async fn wait(&mut self) -> Result<()> {
179179
while let Some(result) = self.tasks.join_next().await {
180-
if let Err(e) = (|| anyhow::Ok(result??))() {
181-
error!("{:?}", e.context("Error in applying changes from a source"));
180+
match result {
181+
Err(e) if !e.is_cancelled() => {
182+
error!("{:?}", e);
183+
}
184+
Ok(Err(e)) => {
185+
error!("{:?}", e.context("Error in applying changes from a source"));
186+
}
187+
_ => {}
182188
}
183189
}
184190
Ok(())

0 commit comments

Comments
 (0)