Skip to content

Commit 5fbc467

Browse files
committed
feat(setup-api): update and officially expose setup API
1 parent 559abf6 commit 5fbc467

File tree

4 files changed

+98
-87
lines changed

4 files changed

+98
-87
lines changed

python/cocoindex/__init__.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@
1010
from .flow import FlowBuilder, DataScope, DataSlice, Flow, transform_flow
1111
from .flow import flow_def
1212
from .flow import EvaluateAndDumpOptions, GeneratedField
13-
from .flow import update_all_flows_async, FlowLiveUpdater, FlowLiveUpdaterOptions
13+
from .flow import FlowLiveUpdater, FlowLiveUpdaterOptions
14+
from .flow import update_all_flows_async, setup_all_flows, drop_all_flows
1415
from .lib import init, start_server, stop, main_fn
1516
from .llm import LlmSpec, LlmApiType
1617
from .index import VectorSimilarityMetric, VectorIndexDef, IndexOptions
@@ -40,9 +41,11 @@
4041
"flow_def",
4142
"EvaluateAndDumpOptions",
4243
"GeneratedField",
43-
"update_all_flows_async",
4444
"FlowLiveUpdater",
4545
"FlowLiveUpdaterOptions",
46+
"update_all_flows_async",
47+
"setup_all_flows",
48+
"drop_all_flows",
4649
# Lib
4750
"init",
4851
"start_server",

python/cocoindex/cli.py

Lines changed: 28 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import threading
88
import types
99
from types import FrameType
10-
from typing import Any
10+
from typing import Any, Iterable
1111

1212
import click
1313
import watchfiles
@@ -17,7 +17,7 @@
1717
from rich.table import Table
1818

1919
from . import flow, lib, setting
20-
from .setup import make_setup_bundle, make_drop_bundle, flow_names_with_setup
20+
from .setup import flow_names_with_setup
2121

2222
# Create ServerSettings lazily upon first call, as environment variables may be loaded from files, etc.
2323
COCOINDEX_HOST = "https://cocoindex.io"
@@ -244,15 +244,15 @@ def show(app_flow_specifier: str, color: bool, verbose: bool) -> None:
244244

245245

246246
def _setup_flows(
247+
flow_iter: Iterable[flow.Flow],
247248
*,
248-
flow_full_names: list[str],
249249
force: bool,
250250
quiet: bool = False,
251251
always_show_setup: bool = False,
252252
) -> None:
253-
setup_bundle = make_setup_bundle(flow_full_names=flow_full_names)
253+
setup_bundle = flow.make_setup_bundle(flow_iter)
254254
description, is_up_to_date = setup_bundle.describe()
255-
if always_show_setup or not (quiet and (force or is_up_to_date)):
255+
if always_show_setup or not is_up_to_date:
256256
click.echo(description)
257257
if is_up_to_date:
258258
if not quiet:
@@ -285,9 +285,7 @@ def setup(app_target: str, force: bool) -> None:
285285
"""
286286
app_ref = _get_app_ref_from_specifier(app_target)
287287
_load_user_app(app_ref)
288-
_setup_flows(
289-
flow_full_names=flow.flow_full_names(), force=force, always_show_setup=True
290-
)
288+
_setup_flows(flow.flows().values(), force=force, always_show_setup=True)
291289

292290

293291
@cli.command("drop")
@@ -311,7 +309,6 @@ def drop(app_target: str | None, flow_name: tuple[str, ...], force: bool) -> Non
311309
2. Drop specific named flows: `cocoindex drop <APP_TARGET> [FLOW_NAME...]`
312310
"""
313311
app_ref = None
314-
flow_names: list[str] = []
315312

316313
if not app_target:
317314
raise click.UsageError(
@@ -321,44 +318,39 @@ def drop(app_target: str | None, flow_name: tuple[str, ...], force: bool) -> Non
321318

322319
app_ref = _get_app_ref_from_specifier(app_target)
323320
_load_user_app(app_ref)
321+
322+
flows: Iterable[flow.Flow]
324323
if flow_name:
325-
flow_names = list(flow_name)
326-
click.echo(
327-
f"Preparing to drop specified flows: {', '.join(flow_names)} (in '{app_ref}').",
328-
err=True,
329-
)
324+
flows = []
325+
for name in flow_name:
326+
try:
327+
flows.append(flow.flow_by_name(name))
328+
except KeyError:
329+
click.echo(
330+
f"Warning: Failed to get flow `{name}`. Ignored.",
331+
err=True,
332+
)
330333
else:
331-
flow_names = flow.flow_names()
332-
if not flow_names:
333-
click.echo(f"No flows found defined in '{app_ref}' to drop.")
334-
return
335-
click.echo(
336-
f"Preparing to drop all flows defined in '{app_ref}': {', '.join(flow_names)}.",
337-
err=True,
338-
)
334+
flows = flow.flows().values()
339335

340-
flow_full_names = []
341-
for name in flow_names:
342-
try:
343-
flow_full_names.append(flow.flow_by_name(name).full_name)
344-
except KeyError:
345-
click.echo(
346-
f"Warning: Failed to get flow `{name}`. Ignored.",
347-
err=True,
348-
)
336+
flow_full_names = ", ".join(fl.full_name for fl in flows)
337+
click.echo(
338+
f"Preparing to drop specified flows: {flow_full_names} (in '{app_ref}').",
339+
err=True,
340+
)
349341

350-
if not flow_full_names:
342+
if not flows:
351343
click.echo("No flows identified for the drop operation.")
352344
return
353345

354-
setup_bundle = make_drop_bundle(flow_full_names=flow_full_names)
346+
setup_bundle = flow.make_drop_bundle(flows)
355347
description, is_up_to_date = setup_bundle.describe()
356348
click.echo(description)
357349
if is_up_to_date:
358350
click.echo("No flows need to be dropped.")
359351
return
360352
if not force and not click.confirm(
361-
f"\nThis will apply changes to drop setup for: {', '.join(flow_full_names)}. Continue? [yes/N]",
353+
f"\nThis will apply changes to drop setup for: {flow_full_names}. Continue? [yes/N]",
362354
default=False,
363355
show_default=False,
364356
):
@@ -420,19 +412,15 @@ def update(
420412
if flow_name is None:
421413
if setup:
422414
_setup_flows(
423-
flow_full_names=flow.flow_full_names(),
415+
flow.flows().values(),
424416
force=force,
425417
quiet=quiet,
426418
)
427419
return flow.update_all_flows(options)
428420
else:
429421
fl = flow.flow_by_name(flow_name)
430422
if setup:
431-
_setup_flows(
432-
flow_full_names=[fl.full_name],
433-
force=force,
434-
quiet=quiet,
435-
)
423+
_setup_flows((fl,), force=force, quiet=quiet)
436424
with flow.FlowLiveUpdater(fl, options) as updater:
437425
updater.wait()
438426
return updater.update_stats()

python/cocoindex/flow.py

Lines changed: 44 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import inspect
1010
import datetime
1111
import functools
12-
1312
from typing import (
1413
Any,
1514
Callable,
@@ -20,6 +19,7 @@
2019
get_origin,
2120
NamedTuple,
2221
cast,
22+
Iterable,
2323
)
2424
from threading import Lock
2525
from enum import Enum
@@ -34,7 +34,7 @@
3434
from .convert import dump_engine_object, encode_engine_value, make_engine_value_decoder
3535
from .typing import encode_enriched_type
3636
from .runtime import execution_context
37-
from .setup import make_setup_bundle, make_drop_bundle
37+
from .setup import SetupChangeBundle
3838

3939

4040
class _NameBuilder:
@@ -668,7 +668,7 @@ async def setup_async(self, report_to_stdout: bool = False) -> None:
668668
"""
669669
Setup the flow. The async version.
670670
"""
671-
await make_setup_bundle(flow_full_names=[self.full_name]).apply_async(
671+
await make_setup_bundle([self]).describe_and_apply_async(
672672
report_to_stdout=report_to_stdout
673673
)
674674

@@ -682,7 +682,7 @@ async def drop_async(self, report_to_stdout: bool = False) -> None:
682682
"""
683683
Drop the flow. The async version.
684684
"""
685-
await make_drop_bundle(flow_full_names=[self.full_name]).apply_async(
685+
await make_drop_bundle([self]).describe_and_apply_async(
686686
report_to_stdout=report_to_stdout
687687
)
688688

@@ -751,14 +751,6 @@ def flow_names() -> list[str]:
751751
return list(_flows.keys())
752752

753753

754-
def flow_full_names() -> list[str]:
755-
"""
756-
Get the full names of all flows.
757-
"""
758-
with _flows_lock:
759-
return [fl.full_name for fl in _flows.values()]
760-
761-
762754
def flows() -> dict[str, Flow]:
763755
"""
764756
Get all flows.
@@ -1004,3 +996,43 @@ def _transform_flow_wrapper(fn: Callable[..., DataSlice[T]]) -> TransformFlow[T]
1004996
return _transform_flow
1005997

1006998
return _transform_flow_wrapper
999+
1000+
1001+
def make_setup_bundle(flow_iter: Iterable[Flow]) -> SetupChangeBundle:
1002+
"""
1003+
Make a bundle to setup flows with the given names.
1004+
"""
1005+
full_names = []
1006+
for fl in flow_iter:
1007+
fl.internal_flow()
1008+
full_names.append(fl.full_name)
1009+
return SetupChangeBundle(_engine.make_setup_bundle(full_names))
1010+
1011+
1012+
def make_drop_bundle(flow_iter: Iterable[Flow]) -> SetupChangeBundle:
1013+
"""
1014+
Make a bundle to drop flows with the given names.
1015+
"""
1016+
full_names = []
1017+
for fl in flow_iter:
1018+
fl.internal_flow()
1019+
full_names.append(fl.full_name)
1020+
return SetupChangeBundle(_engine.make_drop_bundle(full_names))
1021+
1022+
1023+
def setup_all_flows(report_to_stdout: bool = False) -> None:
1024+
"""
1025+
Setup all flows registered in the current process.
1026+
"""
1027+
with _flows_lock:
1028+
flow_list = list(_flows.values())
1029+
make_setup_bundle(flow_list).describe_and_apply(report_to_stdout=report_to_stdout)
1030+
1031+
1032+
def drop_all_flows(report_to_stdout: bool = False) -> None:
1033+
"""
1034+
Drop all flows registered in the current process.
1035+
"""
1036+
with _flows_lock:
1037+
flow_list = list(_flows.values())
1038+
make_drop_bundle(flow_list).describe_and_apply(report_to_stdout=report_to_stdout)

python/cocoindex/setup.py

Lines changed: 21 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
This module provides APIs to manage the setup of flows.
33
"""
44

5-
from . import flow
65
from . import setting
76
from . import _engine # type: ignore
87
from .runtime import execution_context
@@ -49,39 +48,28 @@ async def describe_async(self) -> tuple[str, bool]:
4948
"""
5049
return await self._engine_bundle.describe_async() # type: ignore
5150

51+
def describe_and_apply(self, report_to_stdout: bool = False) -> None:
52+
"""
53+
Describe the setup changes and apply them if `report_to_stdout` is True.
54+
Silently apply setup changes otherwise.
55+
"""
56+
execution_context.run(
57+
self.describe_and_apply_async(report_to_stdout=report_to_stdout)
58+
)
5259

53-
def make_setup_bundle(*, flow_full_names: list[str]) -> SetupChangeBundle:
54-
"""
55-
Make a bundle to setup flows with the given names.
56-
"""
57-
flow.ensure_all_flows_built()
58-
return SetupChangeBundle(_engine.make_setup_bundle(flow_full_names))
59-
60-
61-
def make_drop_bundle(*, flow_full_names: list[str]) -> SetupChangeBundle:
62-
"""
63-
Make a bundle to drop flows with the given names.
64-
"""
65-
flow.ensure_all_flows_built()
66-
return SetupChangeBundle(_engine.make_drop_bundle(flow_full_names))
67-
68-
69-
def setup_all_flows(report_to_stdout: bool = False) -> None:
70-
"""
71-
Setup all flows registered in the current process.
72-
"""
73-
make_setup_bundle(flow_full_names=flow.flow_full_names()).apply(
74-
report_to_stdout=report_to_stdout
75-
)
76-
77-
78-
def drop_all_flows(report_to_stdout: bool = False) -> None:
79-
"""
80-
Drop all flows registered in the current process.
81-
"""
82-
make_drop_bundle(flow_full_names=flow.flow_full_names()).apply(
83-
report_to_stdout=report_to_stdout
84-
)
60+
async def describe_and_apply_async(self, *, report_to_stdout: bool = False) -> None:
61+
"""
62+
Describe the setup changes and apply them if `report_to_stdout` is True.
63+
Silently apply setup changes otherwise. Async version of `describe_and_apply`.
64+
"""
65+
if report_to_stdout:
66+
desc, is_up_to_date = await self.describe_async()
67+
print("Setup status:\n")
68+
print(desc)
69+
if is_up_to_date:
70+
print("No setup changes to apply.")
71+
return
72+
await self.apply_async(report_to_stdout=report_to_stdout)
8573

8674

8775
def flow_names_with_setup() -> list[str]:

0 commit comments

Comments
 (0)