Skip to content

Commit 33b8714

Browse files
committed
feat(status-updates): add next_status_updates() for FlowLiveUpdater
1 parent 894ef37 commit 33b8714

File tree

5 files changed

+350
-185
lines changed

5 files changed

+350
-185
lines changed

python/cocoindex/flow.py

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -534,6 +534,18 @@ class FlowLiveUpdaterOptions:
534534
print_stats: bool = False
535535

536536

537+
class FlowUpdaterStatusUpdates(NamedTuple):
538+
"""
539+
Status updates for a flow updater.
540+
"""
541+
542+
# Sources that are still active, i.e. not stopped processing.
543+
active_sources: list[str]
544+
545+
# Sources with updates since last time.
546+
updated_sources: list[str]
547+
548+
537549
class FlowLiveUpdater:
538550
"""
539551
A live updater for a flow.
@@ -587,7 +599,23 @@ async def wait_async(self) -> None:
587599
"""
588600
Wait for the live updater to finish. Async version.
589601
"""
590-
await self._get_engine_live_updater().wait()
602+
await self._get_engine_live_updater().wait_async()
603+
604+
def next_status_updates(self) -> FlowUpdaterStatusUpdates:
605+
"""
606+
Get the next status updates.
607+
"""
608+
return execution_context.run(self.next_status_updates_async())
609+
610+
async def next_status_updates_async(self) -> FlowUpdaterStatusUpdates:
611+
"""
612+
Get the next status updates. Async version.
613+
"""
614+
updates = await self._get_engine_live_updater().next_status_updates_async()
615+
return FlowUpdaterStatusUpdates(
616+
active_sources=updates.active_sources,
617+
updated_sources=updates.updated_sources,
618+
)
591619

592620
def abort(self) -> None:
593621
"""
@@ -879,10 +907,7 @@ def update_all_flows(
879907
"""
880908
Update all flows.
881909
"""
882-
return cast(
883-
dict[str, _engine.IndexUpdateInfo],
884-
execution_context.run(update_all_flows_async(options)),
885-
)
910+
return execution_context.run(update_all_flows_async(options))
886911

887912

888913
async def update_all_flows_async(
@@ -1037,7 +1062,7 @@ def eval(self, *args: Any, **kwargs: Any) -> T:
10371062
"""
10381063
Evaluate the transform flow.
10391064
"""
1040-
return cast(T, execution_context.run(self.eval_async(*args, **kwargs)))
1065+
return execution_context.run(self.eval_async(*args, **kwargs))
10411066

10421067
async def eval_async(self, *args: Any, **kwargs: Any) -> T:
10431068
"""

python/cocoindex/runtime.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@
55

66
import threading
77
import asyncio
8-
from typing import Any, Coroutine
8+
from typing import Any, Coroutine, TypeVar
9+
10+
11+
T = TypeVar("T")
912

1013

1114
class _ExecutionContext:
@@ -26,7 +29,7 @@ def event_loop(self) -> asyncio.AbstractEventLoop:
2629
).start()
2730
return self._event_loop
2831

29-
def run(self, coro: Coroutine[Any, Any, Any]) -> Any:
32+
def run(self, coro: Coroutine[Any, Any, T]) -> T:
3033
"""Run a coroutine in the event loop, blocking until it finishes. Return its result."""
3134
return asyncio.run_coroutine_threadsafe(coro, self.event_loop).result()
3235

0 commit comments

Comments
 (0)