Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ dependencies = [
"click>=8.1.8",
"rich>=14.0.0",
"python-dotenv>=1.1.0",
"watchfiles>=1.1.0",
]
license = "Apache-2.0"
urls = { Homepage = "https://cocoindex.io/" }
Expand Down
99 changes: 95 additions & 4 deletions python/cocoindex/cli.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
import asyncio
import atexit
import datetime
import importlib.util
import os
import signal
import sys
import threading
import types
from types import FrameType
from typing import Any

import click
import watchfiles
from dotenv import find_dotenv, load_dotenv
from rich.console import Console
from rich.panel import Panel
from rich.table import Table

from . import flow, lib, setting
from .runtime import execution_context
from .setup import apply_setup_changes, drop_setup, flow_names_with_setup, sync_setup

# Create ServerSettings lazily upon first call, as environment variables may be loaded from files, etc.
Expand Down Expand Up @@ -116,6 +122,12 @@ def _load_user_app(app_target: str) -> types.ModuleType:
)


def _initialize_cocoindex_in_process() -> None:
settings = setting.Settings.from_env()
lib.init(settings)
atexit.register(lib.stop)


@click.group()
@click.version_option(package_name="cocoindex", message="%(prog)s version %(version)s")
@click.option(
Expand All @@ -139,9 +151,7 @@ def cli(env_file: str | None = None) -> None:
click.echo(f"Loaded environment variables from: {loaded_env_path}", err=True)

try:
settings = setting.Settings.from_env()
lib.init(settings)
atexit.register(lib.stop)
_initialize_cocoindex_in_process()
except Exception as e:
raise click.ClickException(f"Failed to initialize CocoIndex library: {e}")

Expand Down Expand Up @@ -485,6 +495,14 @@ def evaluate(
default=False,
help="Avoid printing anything to the standard output, e.g. statistics.",
)
@click.option(
"-r",
"--reload",
is_flag=True,
show_default=True,
default=False,
help="Enable auto-reload on code changes.",
)
def server(
app_target: str,
address: str | None,
Expand All @@ -493,6 +511,7 @@ def server(
cors_origin: str | None,
cors_cocoindex: bool,
cors_local: int | None,
reload: bool,
) -> None:
"""
Start a HTTP server providing REST APIs.
Expand All @@ -502,6 +521,65 @@ def server(
APP_TARGET: path/to/app.py or installed_module.
"""
app_ref = _get_app_ref_from_specifier(app_target)

if reload:
watch_paths = {os.getcwd()}
if os.path.isfile(app_ref):
watch_paths.add(os.path.dirname(os.path.abspath(app_ref)))
else:
try:
spec = importlib.util.find_spec(app_ref)
if spec and spec.origin:
watch_paths.add(os.path.dirname(os.path.abspath(spec.origin)))
except ImportError:
pass

watchfiles.run_process(
*watch_paths,
target=_reloadable_server_target,
args=(
app_ref,
address,
cors_origin,
cors_cocoindex,
cors_local,
live_update,
quiet,
),
watch_filter=watchfiles.PythonFilter(),
callback=lambda changes: click.secho(
f"\nDetected changes in {len(changes)} file(s), reloading server...\n",
fg="cyan",
),
)
else:
_run_server(
app_ref,
address=address,
cors_origin=cors_origin,
cors_cocoindex=cors_cocoindex,
cors_local=cors_local,
live_update=live_update,
quiet=quiet,
)


def _reloadable_server_target(*args: Any, **kwargs: Any) -> None:
"""Reloadable target for the watchfiles process."""
_initialize_cocoindex_in_process()
_run_server(*args, **kwargs)


def _run_server(
app_ref: str,
address: str | None = None,
cors_origin: str | None = None,
cors_cocoindex: bool = False,
cors_local: int | None = None,
live_update: bool = False,
quiet: bool = False,
) -> None:
"""Helper function to run the server with specified settings."""
_load_user_app(app_ref)

server_settings = setting.ServerSettings.from_env()
Expand All @@ -525,7 +603,20 @@ def server(
if live_update:
options = flow.FlowLiveUpdaterOptions(live_mode=True, print_stats=not quiet)
flow.update_all_flows(options)
input("Press Enter to stop...")

click.secho("Press Ctrl+C to stop the server.", fg="yellow")

shutdown_event = threading.Event()

def handle_signal(signum: int, frame: FrameType | None) -> None:
shutdown_event.set()

async def _wait_for_shutdown_signal() -> None:
await asyncio.to_thread(shutdown_event.wait)

signal.signal(signal.SIGINT, handle_signal)
signal.signal(signal.SIGTERM, handle_signal)
execution_context.run(_wait_for_shutdown_signal())


def _flow_name(name: str | None) -> str:
Expand Down