Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,4 @@ python_version = "3.11"
strict = true
files = "python/cocoindex"
exclude = "(\\.venv|site-packages)"
disable_error_code = ["unused-ignore"]
168 changes: 102 additions & 66 deletions python/cocoindex/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,30 @@ def show(app_flow_specifier: str, color: bool, verbose: bool) -> None:
console.print(table)


def _setup_flows(
*,
flow_full_names: list[str],
force: bool,
quiet: bool = False,
always_show_setup: bool = False,
) -> None:
setup_bundle = make_setup_bundle(flow_full_names=flow_full_names)
description, is_up_to_date = setup_bundle.describe()
if always_show_setup or not (quiet and (force or is_up_to_date)):
click.echo(description)
if is_up_to_date:
if not quiet:
click.echo("Setup is already up to date.")
return
if not force and not click.confirm(
"Changes need to be pushed. Continue? [yes/N]",
default=False,
show_default=False,
):
return
setup_bundle.apply(report_to_stdout=not quiet)


@cli.command()
@click.argument("app_target", type=str)
@click.option(
Expand All @@ -261,36 +285,14 @@ def setup(app_target: str, force: bool) -> None:
"""
app_ref = _get_app_ref_from_specifier(app_target)
_load_user_app(app_ref)

setup_bundle = make_setup_bundle(flow.flow_names())
description, is_up_to_date = setup_bundle.describe()
click.echo(description)
if is_up_to_date:
click.echo("No changes need to be pushed.")
return
if not force and not click.confirm(
"Changes need to be pushed. Continue? [yes/N]",
default=False,
show_default=False,
):
return
setup_bundle.apply(write_to_stdout=True)
_setup_flows(
flow_full_names=flow.flow_full_names(), force=force, always_show_setup=True
)


@cli.command("drop")
@click.argument("app_target", type=str, required=False)
@click.argument("flow_name", type=str, nargs=-1)
@click.option(
"-a",
"--all",
"drop_all",
is_flag=True,
show_default=True,
default=False,
help="Drop the backend setup for all flows with persisted setup, "
"even if not defined in the current process."
"If used, APP_TARGET and any listed flow names are ignored.",
)
@click.option(
"-f",
"--force",
Expand All @@ -299,70 +301,70 @@ def setup(app_target: str, force: bool) -> None:
default=False,
help="Force drop without confirmation prompts.",
)
def drop(
app_target: str | None, flow_name: tuple[str, ...], drop_all: bool, force: bool
) -> None:
def drop(app_target: str | None, flow_name: tuple[str, ...], force: bool) -> None:
"""
Drop the backend setup for flows.

\b
Modes of operation:
1. Drop ALL persisted setups: `cocoindex drop --all`
2. Drop all flows defined in an app: `cocoindex drop <APP_TARGET>`
3. Drop specific named flows: `cocoindex drop <APP_TARGET> [FLOW_NAME...]`
1. Drop all flows defined in an app: `cocoindex drop <APP_TARGET>`
2. Drop specific named flows: `cocoindex drop <APP_TARGET> [FLOW_NAME...]`
"""
app_ref = None
flow_names = []
flow_names: list[str] = []

if drop_all:
if app_target or flow_name:
click.echo(
"Warning: When --all is used, APP_TARGET and any individual flow names are ignored.",
err=True,
)
flow_names = flow_names_with_setup()
elif app_target:
app_ref = _get_app_ref_from_specifier(app_target)
_load_user_app(app_ref)
if flow_name:
flow_names = list(flow_name)
click.echo(
f"Preparing to drop specified flows: {', '.join(flow_names)} (in '{app_ref}').",
err=True,
)
else:
flow_names = flow.flow_names()
if not flow_names:
click.echo(f"No flows found defined in '{app_ref}' to drop.")
return
click.echo(
f"Preparing to drop all flows defined in '{app_ref}': {', '.join(flow_names)}.",
err=True,
)
else:
if not app_target:
raise click.UsageError(
"Missing arguments. You must either provide an APP_TARGET (to target app-specific flows) "
"or use the --all flag."
)

if not flow_names:
app_ref = _get_app_ref_from_specifier(app_target)
_load_user_app(app_ref)
if flow_name:
flow_names = list(flow_name)
click.echo(
f"Preparing to drop specified flows: {', '.join(flow_names)} (in '{app_ref}').",
err=True,
)
else:
flow_names = flow.flow_names()
if not flow_names:
click.echo(f"No flows found defined in '{app_ref}' to drop.")
return
click.echo(
f"Preparing to drop all flows defined in '{app_ref}': {', '.join(flow_names)}.",
err=True,
)

flow_full_names = []
for name in flow_names:
try:
flow_full_names.append(flow.flow_by_name(name).full_name)
except KeyError:
click.echo(
f"Warning: Failed to get flow `{name}`. Ignored.",
err=True,
)

if not flow_full_names:
click.echo("No flows identified for the drop operation.")
return

setup_bundle = make_drop_bundle(flow_names)
setup_bundle = make_drop_bundle(flow_full_names=flow_full_names)
description, is_up_to_date = setup_bundle.describe()
click.echo(description)
if is_up_to_date:
click.echo("No flows need to be dropped.")
return
if not force and not click.confirm(
f"\nThis will apply changes to drop setup for: {', '.join(flow_names)}. Continue? [yes/N]",
f"\nThis will apply changes to drop setup for: {', '.join(flow_full_names)}. Continue? [yes/N]",
default=False,
show_default=False,
):
click.echo("Drop operation aborted by user.")
return
setup_bundle.apply(write_to_stdout=True)
setup_bundle.apply(report_to_stdout=True)


@cli.command()
Expand All @@ -375,6 +377,21 @@ def drop(
default=False,
help="Continuously watch changes from data sources and apply to the target index.",
)
@click.option(
"--setup",
is_flag=True,
show_default=True,
default=False,
help="Automatically setup backends for the flow if it's not setup yet.",
)
@click.option(
"-f",
"--force",
is_flag=True,
show_default=True,
default=False,
help="Force setup without confirmation prompts.",
)
@click.option(
"-q",
"--quiet",
Expand All @@ -383,21 +400,40 @@ def drop(
default=False,
help="Avoid printing anything to the standard output, e.g. statistics.",
)
def update(app_flow_specifier: str, live: bool, quiet: bool) -> Any:
def update(
app_flow_specifier: str,
live: bool,
setup: bool, # pylint: disable=redefined-outer-name
force: bool,
quiet: bool,
) -> Any:
"""
Update the index to reflect the latest data from data sources.

APP_FLOW_SPECIFIER: path/to/app.py, module, path/to/app.py:FlowName, or module:FlowName.
If :FlowName is omitted, updates all flows.
"""
app_ref, flow_ref = _parse_app_flow_specifier(app_flow_specifier)
app_ref, flow_name = _parse_app_flow_specifier(app_flow_specifier)
_load_user_app(app_ref)

options = flow.FlowLiveUpdaterOptions(live_mode=live, print_stats=not quiet)
if flow_ref is None:
if flow_name is None:
if setup:
_setup_flows(
flow_full_names=flow.flow_full_names(),
force=force,
quiet=quiet,
)
return flow.update_all_flows(options)
else:
with flow.FlowLiveUpdater(_flow_by_name(flow_ref), options) as updater:
fl = flow.flow_by_name(flow_name)
if setup:
_setup_flows(
flow_full_names=[fl.full_name],
force=force,
quiet=quiet,
)
with flow.FlowLiveUpdater(fl, options) as updater:
updater.wait()
return updater.update_stats()

Expand Down
41 changes: 39 additions & 2 deletions python/cocoindex/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from .convert import dump_engine_object, encode_engine_value, make_engine_value_decoder
from .typing import encode_enriched_type
from .runtime import execution_context
from .setup import make_setup_bundle, make_drop_bundle


class _NameBuilder:
Expand Down Expand Up @@ -657,6 +658,34 @@ async def internal_flow_async(self) -> _engine.Flow:
"""
return await asyncio.to_thread(self.internal_flow)

def setup(self, report_to_stdout: bool = False) -> None:
"""
Setup the flow.
"""
execution_context.run(self.setup_async(report_to_stdout=report_to_stdout))

async def setup_async(self, report_to_stdout: bool = False) -> None:
"""
Setup the flow. The async version.
"""
await make_setup_bundle(flow_full_names=[self.full_name]).apply_async(
report_to_stdout=report_to_stdout
)

def drop(self, report_to_stdout: bool = False) -> None:
"""
Drop the flow.
"""
execution_context.run(self.drop_async(report_to_stdout=report_to_stdout))

async def drop_async(self, report_to_stdout: bool = False) -> None:
"""
Drop the flow. The async version.
"""
await make_drop_bundle(flow_full_names=[self.full_name]).apply_async(
report_to_stdout=report_to_stdout
)


def _create_lazy_flow(
name: str | None, fl_def: Callable[[FlowBuilder, DataScope], None]
Expand All @@ -666,7 +695,7 @@ def _create_lazy_flow(
The flow will be built the first time when it's really needed.
"""
flow_name = _flow_name_builder.build_name(name, prefix="_flow_")
flow_full_name = get_full_flow_name(flow_name)
flow_full_name = get_flow_full_name(flow_name)

def _create_engine_flow() -> _engine.Flow:
flow_builder_state = _FlowBuilderState(flow_full_name)
Expand All @@ -685,7 +714,7 @@ def _create_engine_flow() -> _engine.Flow:
_flows: dict[str, Flow] = {}


def get_full_flow_name(name: str) -> str:
def get_flow_full_name(name: str) -> str:
"""
Get the full name of a flow.
"""
Expand Down Expand Up @@ -722,6 +751,14 @@ def flow_names() -> list[str]:
return list(_flows.keys())


def flow_full_names() -> list[str]:
"""
Get the full names of all flows.
"""
with _flows_lock:
return [fl.full_name for fl in _flows.values()]


def flows() -> dict[str, Flow]:
"""
Get all flows.
Expand Down
28 changes: 16 additions & 12 deletions python/cocoindex/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,17 @@ def __str__(self) -> str:
def __repr__(self) -> str:
return self.__str__()

def apply(self, write_to_stdout: bool = False) -> None:
def apply(self, report_to_stdout: bool = False) -> None:
"""
Apply the setup changes.
"""
execution_context.run(self.apply_async(write_to_stdout=write_to_stdout))
execution_context.run(self.apply_async(report_to_stdout=report_to_stdout))

async def apply_async(self, write_to_stdout: bool = False) -> None:
async def apply_async(self, report_to_stdout: bool = False) -> None:
"""
Apply the setup changes. Async version of `apply`.
"""
await self._engine_bundle.apply_async(write_to_stdout=write_to_stdout)
await self._engine_bundle.apply_async(report_to_stdout=report_to_stdout)

def describe(self) -> tuple[str, bool]:
"""
Expand All @@ -50,34 +50,38 @@ async def describe_async(self) -> tuple[str, bool]:
return await self._engine_bundle.describe_async() # type: ignore


def make_setup_bundle(flow_names: list[str]) -> SetupChangeBundle:
def make_setup_bundle(*, flow_full_names: list[str]) -> SetupChangeBundle:
"""
Make a bundle to setup flows with the given names.
"""
flow.ensure_all_flows_built()
return SetupChangeBundle(_engine.make_setup_bundle(flow_names))
return SetupChangeBundle(_engine.make_setup_bundle(flow_full_names))


def make_drop_bundle(flow_names: list[str]) -> SetupChangeBundle:
def make_drop_bundle(*, flow_full_names: list[str]) -> SetupChangeBundle:
"""
Make a bundle to drop flows with the given names.
"""
flow.ensure_all_flows_built()
return SetupChangeBundle(_engine.make_drop_bundle(flow_names))
return SetupChangeBundle(_engine.make_drop_bundle(flow_full_names))


def setup_all_flows(write_to_stdout: bool = False) -> None:
def setup_all_flows(report_to_stdout: bool = False) -> None:
"""
Setup all flows registered in the current process.
"""
make_setup_bundle(flow.flow_names()).apply(write_to_stdout=write_to_stdout)
make_setup_bundle(flow_full_names=flow.flow_full_names()).apply(
report_to_stdout=report_to_stdout
)


def drop_all_flows(write_to_stdout: bool = False) -> None:
def drop_all_flows(report_to_stdout: bool = False) -> None:
"""
Drop all flows registered in the current process.
"""
make_drop_bundle(flow.flow_names()).apply(write_to_stdout=write_to_stdout)
make_drop_bundle(flow_full_names=flow.flow_full_names()).apply(
report_to_stdout=report_to_stdout
)


def flow_names_with_setup() -> list[str]:
Expand Down
Loading