Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/docs/core/cli-commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ cocoindex server [OPTIONS] APP_TARGET
| `-cl, --cors-local INTEGER` | Allow `http://localhost:<port>` to access the server. |
| `-L, --live-update` | Continuously watch changes from data sources and apply to the target index. |
| `--setup` | Automatically setup backends for the flow if it's not setup yet. |
| `--reset` | Drop existing setup before starting server (equivalent to running 'cocoindex drop' first). |
| `--reset` | Drop existing setup before starting server (equivalent to running 'cocoindex drop' first). `--reset` implies `--setup`. |
| `--reexport` | Reexport to targets even if there's no change. |
| `-f, --force` | Force setup without confirmation prompts. |
| `-q, --quiet` | Avoid printing anything to the standard output, e.g. statistics. |
Expand Down Expand Up @@ -207,7 +207,7 @@ cocoindex update [OPTIONS] APP_FLOW_SPECIFIER
| `-L, --live` | Continuously watch changes from data sources and apply to the target index. |
| `--reexport` | Reexport to targets even if there's no change. |
| `--setup` | Automatically setup backends for the flow if it's not setup yet. |
| `--reset` | Drop existing setup before updating (equivalent to running 'cocoindex drop' first). |
| `--reset` | Drop existing setup before updating (equivalent to running 'cocoindex drop' first). `--reset` implies `--setup`. |
| `-f, --force` | Force setup without confirmation prompts. |
| `-q, --quiet` | Avoid printing anything to the standard output, e.g. statistics. |
| `--help` | Show this message and exit. |
Expand Down
113 changes: 54 additions & 59 deletions python/cocoindex/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import asyncio
import datetime
import importlib.util
import json
import os
import signal
import threading
Expand Down Expand Up @@ -220,7 +221,7 @@ def show(app_flow_specifier: str, color: bool, verbose: bool) -> None:
console.print(table)


def _drop_flows(flows: Iterable[flow.Flow], force: bool = False) -> None:
def _drop_flows(flows: Iterable[flow.Flow], app_ref: str, force: bool = False) -> None:
"""
Helper function to drop flows without user interaction.
Used internally by --reset flag
Expand All @@ -229,17 +230,29 @@ def _drop_flows(flows: Iterable[flow.Flow], force: bool = False) -> None:
flows: Iterable of Flow objects to drop
force: If True, skip confirmation prompts
"""
flows_list = list(flows)
if not flows_list:
flow_full_names = ", ".join(fl.full_name for fl in flows)
click.echo(
f"Preparing to drop specified flows: {flow_full_names} (in '{app_ref}').",
err=True,
)

if not flows:
click.echo("No flows identified for the drop operation.")
return

setup_bundle = flow.make_drop_bundle(flows_list)
setup_bundle = flow.make_drop_bundle(flows)
description, is_up_to_date = setup_bundle.describe()

click.echo(description)
if is_up_to_date:
click.echo("No flows need to be dropped.")
return
if not force and not click.confirm(
f"\nThis will apply changes to drop setup for: {flow_full_names}. Continue? [yes/N]",
default=False,
show_default=False,
):
click.echo("Drop operation aborted by user.")
return

click.echo(description)
setup_bundle.apply(report_to_stdout=True)


Expand Down Expand Up @@ -310,7 +323,7 @@ def setup(app_target: str, force: bool, reset: bool) -> None:

# If --reset is specified, drop existing setup first
if reset:
_drop_flows(flow.flows().values(), force=force)
_drop_flows(flow.flows().values(), app_ref=app_ref, force=force)

_setup_flows(flow.flows().values(), force=force, always_show_setup=True)

Expand Down Expand Up @@ -360,30 +373,7 @@ def drop(app_target: str | None, flow_name: tuple[str, ...], force: bool) -> Non
else:
flows = flow.flows().values()

flow_full_names = ", ".join(fl.full_name for fl in flows)
click.echo(
f"Preparing to drop specified flows: {flow_full_names} (in '{app_ref}').",
err=True,
)

if not flows:
click.echo("No flows identified for the drop operation.")
return

setup_bundle = flow.make_drop_bundle(flows)
description, is_up_to_date = setup_bundle.describe()
click.echo(description)
if is_up_to_date:
click.echo("No flows need to be dropped.")
return
if not force and not click.confirm(
f"\nThis will apply changes to drop setup for: {flow_full_names}. Continue? [yes/N]",
default=False,
show_default=False,
):
click.echo("Drop operation aborted by user.")
return
setup_bundle.apply(report_to_stdout=True)
_drop_flows(flows, app_ref=app_ref, force=force)


@cli.command()
Expand Down Expand Up @@ -415,7 +405,7 @@ def drop(app_target: str | None, flow_name: tuple[str, ...], force: bool) -> Non
is_flag=True,
show_default=True,
default=False,
help="Drop existing setup before updating (equivalent to running 'cocoindex drop' first).",
help="Drop existing setup before updating (equivalent to running 'cocoindex drop' first). `--reset` implies `--setup`.",
)
@click.option(
"-f",
Expand Down Expand Up @@ -450,15 +440,13 @@ def update(
"""
app_ref, flow_name = _parse_app_flow_specifier(app_flow_specifier)
_load_user_app(app_ref)
flow_list = (
[flow.flow_by_name(flow_name)] if flow_name else list(flow.flows().values())
)

# If --reset is specified, drop existing setup first
if reset:
if flow_name:
# Reset specific flow only
_drop_flows([flow.flow_by_name(flow_name)], force=force)
else:
# Reset all flows
_drop_flows(flow.flows().values(), force=force)
_drop_flows(flow_list, app_ref=app_ref, force=force)

if live:
click.secho(
Expand All @@ -471,19 +459,14 @@ def update(
reexport_targets=reexport,
print_stats=not quiet,
)
if reset or setup:
_setup_flows(flow_list, force=force, quiet=quiet)

if flow_name is None:
if setup:
_setup_flows(
flow.flows().values(),
force=force,
quiet=quiet,
)
execution_context.run(_update_all_flows_with_hint_async(options))
else:
fl = flow.flow_by_name(flow_name)
if setup:
_setup_flows((fl,), force=force, quiet=quiet)
with flow.FlowLiveUpdater(fl, options) as updater:
assert len(flow_list) == 1
with flow.FlowLiveUpdater(flow_list[0], options) as updater:
updater.wait()
if options.live_mode:
_show_no_live_update_hint()
Expand Down Expand Up @@ -586,7 +569,7 @@ def evaluate(
is_flag=True,
show_default=True,
default=False,
help="Drop existing setup before starting server (equivalent to running 'cocoindex drop' first).",
help="Drop existing setup before starting server (equivalent to running 'cocoindex drop' first). `--reset` implies `--setup`.",
)
@click.option(
"--reexport",
Expand Down Expand Up @@ -648,12 +631,14 @@ def server(
cors_cocoindex,
cors_local,
live_update,
setup,
reset,
reexport,
force,
quiet,
)
kwargs = {
"run_reset": reset,
"run_setup": setup,
"force": force,
}

if reload:
watch_paths = {os.getcwd()}
Expand All @@ -671,6 +656,7 @@ def server(
*watch_paths,
target=_reloadable_server_target,
args=args,
kwargs=kwargs,
watch_filter=watchfiles.PythonFilter(),
callback=lambda changes: click.secho(
f"\nDetected changes in {len(changes)} file(s), reloading server...\n",
Expand All @@ -682,12 +668,19 @@ def server(
"NOTE: Flow code changes will NOT be reflected until you restart to load the new code. Use --reload to enable auto-reload.\n",
fg="yellow",
)
_run_server(*args)
_run_server(*args, **kwargs)


def _reloadable_server_target(*args: Any, **kwargs: Any) -> None:
"""Reloadable target for the watchfiles process."""
_initialize_cocoindex_in_process()

kwargs["run_setup"] = kwargs["run_setup"] or kwargs["run_reset"]
changed_files = json.loads(os.environ.get("WATCHFILES_CHANGES", "[]"))
if changed_files:
kwargs["run_reset"] = False
kwargs["force"] = True

_run_server(*args, **kwargs)


Expand All @@ -698,11 +691,13 @@ def _run_server(
cors_cocoindex: bool = False,
cors_local: int | None = None,
live_update: bool = False,
run_setup: bool = False,
run_reset: bool = False,
reexport: bool = False,
force: bool = False,
quiet: bool = False,
/,
*,
force: bool = False,
run_reset: bool = False,
run_setup: bool = False,
) -> None:
"""Helper function to run the server with specified settings."""
_load_user_app(app_ref)
Expand All @@ -728,7 +723,7 @@ def _run_server(

# If --reset is specified, drop existing setup first
if run_reset:
_drop_flows(flow.flows().values(), force=force)
_drop_flows(flow.flows().values(), app_ref=app_ref, force=force)

server_settings = setting.ServerSettings.from_env()
cors_origins: set[str] = set(server_settings.cors_origins or [])
Expand All @@ -743,7 +738,7 @@ def _run_server(
if address is not None:
server_settings.address = address

if run_setup:
if run_reset or run_setup:
_setup_flows(
flow.flows().values(),
force=force,
Expand Down