Skip to content

Commit d20dec1

Browse files
committed
fix(annotation): make execution_context.run() have speicfic annotation
1 parent 894ef37 commit d20dec1

File tree

3 files changed

+17
-8
lines changed

3 files changed

+17
-8
lines changed

examples/amazon_s3_embedding/main.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,16 @@ def _main() -> None:
101101
pool = ConnectionPool(os.getenv("COCOINDEX_DATABASE_URL"))
102102

103103
amazon_s3_text_embedding_flow.setup()
104-
with cocoindex.FlowLiveUpdater(amazon_s3_text_embedding_flow):
104+
with cocoindex.FlowLiveUpdater(amazon_s3_text_embedding_flow) as updater:
105+
updater.abort()
106+
updater.wait()
107+
108+
while True:
109+
updates = updater.next_status_updates()
110+
print(f"Updates: {updates}")
111+
if not updates.active_sources:
112+
break
113+
105114
# Run queries in a loop to demonstrate the query capabilities.
106115
while True:
107116
query = input("Enter search query (or Enter to quit): ")

python/cocoindex/flow.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -879,10 +879,7 @@ def update_all_flows(
879879
"""
880880
Update all flows.
881881
"""
882-
return cast(
883-
dict[str, _engine.IndexUpdateInfo],
884-
execution_context.run(update_all_flows_async(options)),
885-
)
882+
return execution_context.run(update_all_flows_async(options))
886883

887884

888885
async def update_all_flows_async(
@@ -1037,7 +1034,7 @@ def eval(self, *args: Any, **kwargs: Any) -> T:
10371034
"""
10381035
Evaluate the transform flow.
10391036
"""
1040-
return cast(T, execution_context.run(self.eval_async(*args, **kwargs)))
1037+
return execution_context.run(self.eval_async(*args, **kwargs))
10411038

10421039
async def eval_async(self, *args: Any, **kwargs: Any) -> T:
10431040
"""

python/cocoindex/runtime.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@
55

66
import threading
77
import asyncio
8-
from typing import Any, Coroutine
8+
from typing import Any, Coroutine, TypeVar
9+
10+
11+
T = TypeVar("T")
912

1013

1114
class _ExecutionContext:
@@ -26,7 +29,7 @@ def event_loop(self) -> asyncio.AbstractEventLoop:
2629
).start()
2730
return self._event_loop
2831

29-
def run(self, coro: Coroutine[Any, Any, Any]) -> Any:
32+
def run(self, coro: Coroutine[Any, Any, T]) -> T:
3033
"""Run a coroutine in the event loop, blocking until it finishes. Return its result."""
3134
return asyncio.run_coroutine_threadsafe(coro, self.event_loop).result()
3235

0 commit comments

Comments
 (0)