Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ dependencies = ["sentence-transformers>=3.3.1", "click>=8.1.8", "rich>=14.0.0"]
license = "Apache-2.0"
urls = { Homepage = "https://cocoindex.io/" }

[project.scripts]
cocoindex = "cocoindex.cli:cli"

[tool.maturin]
bindings = "pyo3"
python-source = "python"
Expand Down
137 changes: 121 additions & 16 deletions python/cocoindex/cli.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,90 @@
import click
import datetime
import sys
import importlib.util
import os
import atexit

from rich.console import Console
from rich.table import Table

from . import flow, lib, setting
from . import flow, lib, setting, query
from .setup import sync_setup, drop_setup, flow_names_with_setup, apply_setup_changes

# Create ServerSettings lazily upon first call, as environment variables may be loaded from files, etc.
COCOINDEX_HOST = 'https://cocoindex.io'

def _load_user_app(app_path: str):
    """
    Load the user's application file as a module and return it.

    The file's directory is placed at the front of ``sys.path`` while the
    module executes, so the application can import modules that live next to
    it. ``sys.path`` is restored afterwards whether or not loading succeeds.

    Args:
        app_path: Path to the Python file to load.

    Returns:
        The executed module object. It is also registered in ``sys.modules``
        under the file's base name (without extension).

    Raises:
        SystemExit: If ``app_path`` is empty.
        click.ClickException: If no import spec/loader can be created for the
            file, or if executing the module raises.
    """
    if not app_path:
        click.echo("Internal Error: Application path not provided.", err=True)
        sys.exit(1)

    app_path = os.path.abspath(app_path)
    app_dir = os.path.dirname(app_path)
    module_name = os.path.splitext(os.path.basename(app_path))[0]

    original_sys_path = list(sys.path)
    if app_dir not in sys.path:
        sys.path.insert(0, app_dir)

    # Remember what (if anything) was registered under this name, so a failed
    # load can be rolled back instead of leaving a half-initialised module.
    missing = object()
    previous_module = sys.modules.get(module_name, missing)

    try:
        spec = importlib.util.spec_from_file_location(module_name, app_path)
        if spec is None or spec.loader is None:
            raise ImportError(f"Could not load spec for file: {app_path}")
        module = importlib.util.module_from_spec(spec)
        # Register before executing so the module is visible in sys.modules
        # while its own top-level code runs.
        sys.modules[spec.name] = module
        spec.loader.exec_module(module)
        return module
    except Exception as e:
        # Undo the sys.modules registration made above.
        if previous_module is missing:
            sys.modules.pop(module_name, None)
        else:
            sys.modules[module_name] = previous_module
        raise click.ClickException(
            f"Failed importing application module '{os.path.basename(app_path)}': {e}"
        ) from e
    finally:
        # Restore in place so every holder of the sys.path list sees the
        # original contents again.
        sys.path[:] = original_sys_path

def _ensure_flows_and_handlers_built():
    """
    Build all flows, then all query handlers.

    If either step raises, the failure and its reason are written to stderr
    and the process exits with status 1.
    """
    try:
        flow.ensure_all_flows_built()
        query.ensure_all_handlers_built()
    except Exception as build_error:
        click.echo("\nError: Failed processing flows/handlers from application.", err=True)
        reason = f"Reason: {build_error}"
        click.echo(reason, err=True)
        raise SystemExit(1)

@click.group()
@click.version_option(package_name="cocoindex", message="%(prog)s version %(version)s")
def cli():
    """
    CLI for Cocoindex. Requires --app for most commands.
    """
    # NOTE: the docstring above is what click shows as the group's help text.
    try:
        # Settings come from environment variables; the library is initialised
        # with them and its shutdown is scheduled for interpreter exit.
        settings = setting.Settings.from_env()
        lib.init(settings)
        atexit.register(lib.stop)
    except Exception as e:
        # Keep the original exception as the cause so it stays debuggable.
        raise click.ClickException(f"Failed to initialize CocoIndex library: {e}") from e

@cli.command()
@click.option(
'--app', 'app_path', required=False,
type=click.Path(exists=True, file_okay=True, dir_okay=False, readable=True, resolve_path=True),
help="Path to the Python file defining flows."
)
@click.option(
"-a", "--all", "show_all", is_flag=True, show_default=True, default=False,
help="Also show all flows with persisted setup, even if not defined in the current process.")
def ls(show_all: bool):
def ls(app_path: str | None, show_all: bool):
"""
List all flows.
"""
current_flow_names = flow.flow_names()
current_flow_names = set()

if app_path:
_load_user_app(app_path)
current_flow_names = set(flow.flow_names())
elif not show_all:
raise click.UsageError("The --app <path/to/app.py> option is required unless using --all.")

persisted_flow_names = flow_names_with_setup()
remaining_persisted_flow_names = set(persisted_flow_names)

Expand Down Expand Up @@ -52,21 +115,28 @@ def ls(show_all: bool):
click.echo(' [?]: Flows with persisted setup, but not in the current process.')

@cli.command()
@click.option(
'--app', 'app_path', required=True,
type=click.Path(exists=True, file_okay=True, dir_okay=False, readable=True, resolve_path=True),
help="Path to the Python file defining the flow."
)
@click.argument("flow_name", type=str, required=False)
@click.option("--color/--no-color", default=True, help="Enable or disable colored output.")
@click.option("--verbose", is_flag=True, help="Show verbose output with full details.")
def show(flow_name: str | None, color: bool, verbose: bool):
def show(app_path: str, flow_name: str | None, color: bool, verbose: bool):
"""
Show the flow spec and schema in a readable format with colored output.
Show the flow spec and schema in a readable format.
"""
_load_user_app(app_path)

flow = _flow_by_name(flow_name)
console = Console(no_color=not color)
console.print(flow._render_spec(verbose=verbose))

console.print()
table = Table(
title=f"Schema for Flow: {flow.name}",
show_header=True,
title_style="cyan",
header_style="bold magenta"
)
table.add_column("Field", style="cyan")
Expand All @@ -79,11 +149,17 @@ def show(flow_name: str | None, color: bool, verbose: bool):
console.print(table)

@cli.command()
def setup():
@click.option(
'--app', 'app_path', required=True,
type=click.Path(exists=True, file_okay=True, dir_okay=False, readable=True, resolve_path=True),
help="Path to the Python file defining flows to set up."
)
def setup(app_path: str):
"""
Check and apply backend setup changes for flows, including the internal and target storage
(to export).
"""
_load_user_app(app_path)
setup_status = sync_setup()
click.echo(setup_status)
if setup_status.is_up_to_date():
Expand All @@ -95,16 +171,25 @@ def setup():
apply_setup_changes(setup_status)

@cli.command()
@click.option(
'--app', 'app_path', required=False,
type=click.Path(exists=True, file_okay=True, dir_okay=False, readable=True, resolve_path=True),
help="Path to the app file (needed if not using --all or specific names)."
)
@click.argument("flow_name", type=str, nargs=-1)
@click.option(
"-a", "--all", "drop_all", is_flag=True, show_default=True, default=False,
help="Drop the backend setup for all flows with persisted setup, "
"even if not defined in the current process.")
def drop(flow_name: tuple[str, ...], drop_all: bool):
def drop(app_path: str | None, flow_name: tuple[str, ...], drop_all: bool):
"""
Drop the backend setup for specified flows.
If no flow is specified, all flows defined in the current process will be dropped.
"""
if not app_path:
raise click.UsageError("The --app <path> option is required when dropping flows defined in the app (and not using --all or specific flow names).")
_load_user_app(app_path)

if drop_all:
flow_names = flow_names_with_setup()
elif len(flow_name) == 0:
Expand All @@ -122,17 +207,23 @@ def drop(flow_name: tuple[str, ...], drop_all: bool):
apply_setup_changes(setup_status)

@cli.command()
@click.option(
'--app', 'app_path', required=True,
type=click.Path(exists=True, file_okay=True, dir_okay=False, readable=True, resolve_path=True),
help="Path to the Python file defining flows."
)
@click.argument("flow_name", type=str, required=False)
@click.option(
"-L", "--live", is_flag=True, show_default=True, default=False,
help="Continuously watch changes from data sources and apply to the target index.")
@click.option(
"-q", "--quiet", is_flag=True, show_default=True, default=False,
help="Avoid printing anything to the standard output, e.g. statistics.")
def update(flow_name: str | None, live: bool, quiet: bool):
def update(app_path: str, flow_name: str | None, live: bool, quiet: bool):
"""
Update the index to reflect the latest data from data sources.
"""
_load_user_app(app_path)
options = flow.FlowLiveUpdaterOptions(live_mode=live, print_stats=not quiet)
if flow_name is None:
return flow.update_all_flows(options)
Expand All @@ -142,6 +233,11 @@ def update(flow_name: str | None, live: bool, quiet: bool):
return updater.update_stats()

@cli.command()
@click.option(
'--app', 'app_path', required=True,
type=click.Path(exists=True, file_okay=True, dir_okay=False, readable=True, resolve_path=True),
help="Path to the Python file defining the flow."
)
@click.argument("flow_name", type=str, required=False)
@click.option(
"-o", "--output-dir", type=str, required=False,
Expand All @@ -151,23 +247,26 @@ def update(flow_name: str | None, live: bool, quiet: bool):
help="Use already-cached intermediate data if available. "
"Note that we only reuse existing cached data without updating the cache "
"even if it's turned on.")
def evaluate(flow_name: str | None, output_dir: str | None, cache: bool = True):
def evaluate(app_path: str, flow_name: str | None, output_dir: str | None, cache: bool = True):
"""
Evaluate the flow and dump flow outputs to files.

Instead of updating the index, it dumps what should be indexed to files.
Mainly used for evaluation purpose.
"""
_load_user_app(app_path)
fl = _flow_by_name(flow_name)
if output_dir is None:
output_dir = f"eval_{fl.name}_{datetime.datetime.now().strftime('%y%m%d_%H%M%S')}"
options = flow.EvaluateAndDumpOptions(output_dir=output_dir, use_cache=cache)
fl.evaluate_and_dump(options)

# Create ServerSettings lazily upon first call, as environment variables may be loaded from files, etc.
COCOINDEX_HOST = 'https://cocoindex.io'

@cli.command()
@click.option(
"--app", "app_path", required=True,
type=click.Path(exists=True, file_okay=True, dir_okay=False, readable=True, resolve_path=True),
help="Path to the Python file defining flows and handlers."
)
@click.option(
"-a", "--address", type=str,
help="The address to bind the server to, in the format of IP:PORT. "
Expand All @@ -190,13 +289,16 @@ def evaluate(flow_name: str | None, output_dir: str | None, cache: bool = True):
@click.option(
"-q", "--quiet", is_flag=True, show_default=True, default=False,
help="Avoid printing anything to the standard output, e.g. statistics.")
def server(address: str | None, live_update: bool, quiet: bool, cors_origin: str | None,
cors_cocoindex: bool, cors_local: int | None):
def server(app_path: str, address: str | None, live_update: bool, quiet: bool,
cors_origin: str | None, cors_cocoindex: bool, cors_local: int | None):
"""
Start a HTTP server providing REST APIs.

It will allow tools like CocoInsight to access the server.
"""
_load_user_app(app_path)
_ensure_flows_and_handlers_built()

server_settings = setting.ServerSettings.from_env()
cors_origins: set[str] = set(server_settings.cors_origins or [])
if cors_origin is not None:
Expand Down Expand Up @@ -235,3 +337,6 @@ def _flow_name(name: str | None) -> str:

def _flow_by_name(name: str | None) -> flow.Flow:
    """Return the flow registered under the name that ``_flow_name(name)`` resolves to."""
    resolved_name = _flow_name(name)
    return flow.flow_by_name(resolved_name)

# Invoke the click command group when this module is executed as a script.
if __name__ == "__main__":
    cli()
2 changes: 1 addition & 1 deletion python/cocoindex/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ def _render_spec(self, verbose: bool = False) -> Tree:
tree = Tree(f"Flow: {self.name}", style="cyan")

def build_tree(label: str, lines: list):
node = Tree(label, style="bold magenta" if lines else "cyan")
node = Tree(label=label if lines else label + " None", style="cyan")
for line in lines:
child_node = node.add(Text(line.content, style="yellow"))
child_node.children = build_tree("", line.children).children
Expand Down
73 changes: 23 additions & 50 deletions python/cocoindex/lib.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
"""
Library level functions and states.
"""
import sys
import functools
import inspect
import warnings
from typing import Callable, Any

from typing import Callable

from . import _engine
from . import flow, query, cli, setting
from . import _engine, setting
from .convert import dump_engine_object


Expand All @@ -19,59 +15,36 @@ def init(settings: setting.Settings):

def start_server(settings: setting.ServerSettings):
"""Start the cocoindex server."""
flow.ensure_all_flows_built()
query.ensure_all_handlers_built()
_engine.start_server(settings.__dict__)

def stop() -> None:
    """
    Stop the cocoindex library.

    Delegates to ``_engine.stop()``.
    """
    _engine.stop()

def main_fn(
settings: setting.Settings | None = None,
cocoindex_cmd: str = 'cocoindex',
settings: Any | None = None,
cocoindex_cmd: str | None = None,
) -> Callable[[Callable], Callable]:
"""
A decorator to wrap the main function.
If the python binary is called with the given command, it yields control to the cocoindex CLI.

If the settings are not provided, they are loaded from the environment variables.
DEPRECATED: Using @cocoindex.main_fn() is no longer supported and has no effect.
This decorator will be removed in a future version, which will cause an AttributeError.
Please remove it from your code and use the standalone 'cocoindex' CLI.
"""

def _pre_init() -> None:
effective_settings = settings or setting.Settings.from_env()
init(effective_settings)

def _should_run_cli() -> bool:
return len(sys.argv) > 1 and sys.argv[1] == cocoindex_cmd

def _run_cli():
return cli.cli.main(sys.argv[2:], prog_name=f"{sys.argv[0]} {sys.argv[1]}")
warnings.warn(
"\n\n"
"!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n"
"CRITICAL DEPRECATION NOTICE from CocoIndex:\n"
"The @cocoindex.main_fn() decorator found in your script is DEPRECATED and IGNORED.\n"
"It provides NO functionality and will be REMOVED entirely in a future version.\n"
"If not removed, your script will FAIL with an AttributeError in the future.\n\n"
"ACTION REQUIRED: Please REMOVE @cocoindex.main_fn() from your Python script.\n\n"
"To use CocoIndex commands, invoke the standalone 'cocoindex' CLI:\n"
" cocoindex <command> [options] --app <your_script.py>\n"
"!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n\n",
DeprecationWarning,
stacklevel=2
)

def _main_wrapper(fn: Callable) -> Callable:
if inspect.iscoroutinefunction(fn):
@functools.wraps(fn)
async def _inner(*args, **kwargs):
_pre_init()
try:
if _should_run_cli():
# Schedule to a separate thread as it invokes nested event loop.
# return await asyncio.to_thread(_run_cli)
return _run_cli()
return await fn(*args, **kwargs)
finally:
stop()
return _inner
else:
@functools.wraps(fn)
def _inner(*args, **kwargs):
_pre_init()
try:
if _should_run_cli():
return _run_cli()
return fn(*args, **kwargs)
finally:
stop()
return _inner

return fn
return _main_wrapper