From 57fa8cbb3e01bc3c79e61acc027a82d105a0074f Mon Sep 17 00:00:00 2001 From: Jiangzhou He Date: Mon, 30 Jun 2025 23:52:44 -0700 Subject: [PATCH 1/4] feat(setup): provide API / CLI options to apply setup in-process --- python/cocoindex/cli.py | 168 +++++++++++++++++++++++--------------- python/cocoindex/flow.py | 41 +++++++++- python/cocoindex/setup.py | 24 +++--- src/py/mod.rs | 4 +- src/setup/driver.rs | 15 ++-- src/setup/states.rs | 14 +++- 6 files changed, 174 insertions(+), 92 deletions(-) diff --git a/python/cocoindex/cli.py b/python/cocoindex/cli.py index d4e23c0fc..c7a4a936f 100644 --- a/python/cocoindex/cli.py +++ b/python/cocoindex/cli.py @@ -243,6 +243,30 @@ def show(app_flow_specifier: str, color: bool, verbose: bool) -> None: console.print(table) +def _setup_flows( + *, + flow_full_names: list[str], + force: bool, + quiet: bool = False, + always_show_setup: bool = False, +) -> None: + setup_bundle = make_setup_bundle(flow_full_names=flow_full_names) + description, is_up_to_date = setup_bundle.describe() + if always_show_setup or not (quiet and (force or is_up_to_date)): + click.echo(description) + if is_up_to_date: + if not quiet: + click.echo("Setup is already up to date.") + return + if not force and not click.confirm( + "Changes need to be pushed. Continue? [yes/N]", + default=False, + show_default=False, + ): + return + setup_bundle.apply(report_to_stdout=not quiet) + + @cli.command() @click.argument("app_target", type=str) @click.option( @@ -261,36 +285,14 @@ def setup(app_target: str, force: bool) -> None: """ app_ref = _get_app_ref_from_specifier(app_target) _load_user_app(app_ref) - - setup_bundle = make_setup_bundle(flow.flow_names()) - description, is_up_to_date = setup_bundle.describe() - click.echo(description) - if is_up_to_date: - click.echo("No changes need to be pushed.") - return - if not force and not click.confirm( - "Changes need to be pushed. Continue? [yes/N]", - default=False, - show_default=False, - ): - return - setup_bundle.apply(write_to_stdout=True) + _setup_flows( + flow_full_names=flow.flow_full_names(), force=force, always_show_setup=True + ) @cli.command("drop") @click.argument("app_target", type=str, required=False) @click.argument("flow_name", type=str, nargs=-1) -@click.option( - "-a", - "--all", - "drop_all", - is_flag=True, - show_default=True, - default=False, - help="Drop the backend setup for all flows with persisted setup, " - "even if not defined in the current process." - "If used, APP_TARGET and any listed flow names are ignored.", -) @click.option( "-f", "--force", @@ -299,70 +301,70 @@ def setup(app_target: str, force: bool) -> None: default=False, help="Force drop without confirmation prompts.", ) -def drop( - app_target: str | None, flow_name: tuple[str, ...], drop_all: bool, force: bool -) -> None: +def drop(app_target: str | None, flow_name: tuple[str, ...], force: bool) -> None: """ Drop the backend setup for flows. \b Modes of operation: - 1. Drop ALL persisted setups: `cocoindex drop --all` - 2. Drop all flows defined in an app: `cocoindex drop ` - 3. Drop specific named flows: `cocoindex drop [FLOW_NAME...]` + 1. Drop all flows defined in an app: `cocoindex drop ` + 2. Drop specific named flows: `cocoindex drop [FLOW_NAME...]` """ app_ref = None - flow_names = [] + flow_names: list[str] = [] - if drop_all: - if app_target or flow_name: - click.echo( - "Warning: When --all is used, APP_TARGET and any individual flow names are ignored.", - err=True, - ) - flow_names = flow_names_with_setup() - elif app_target: - app_ref = _get_app_ref_from_specifier(app_target) - _load_user_app(app_ref) - if flow_name: - flow_names = list(flow_name) - click.echo( - f"Preparing to drop specified flows: {', '.join(flow_names)} (in '{app_ref}').", - err=True, - ) - else: - flow_names = flow.flow_names() - if not flow_names: - click.echo(f"No flows found defined in '{app_ref}' to drop.") - return - click.echo( - f"Preparing to drop all flows defined in '{app_ref}': {', '.join(flow_names)}.", - err=True, - ) - else: + if not app_target: raise click.UsageError( "Missing arguments. You must either provide an APP_TARGET (to target app-specific flows) " "or use the --all flag." ) - if not flow_names: + app_ref = _get_app_ref_from_specifier(app_target) + _load_user_app(app_ref) + if flow_name: + flow_names = list(flow_name) + click.echo( + f"Preparing to drop specified flows: {', '.join(flow_names)} (in '{app_ref}').", + err=True, + ) + else: + flow_names = flow.flow_names() + if not flow_names: + click.echo(f"No flows found defined in '{app_ref}' to drop.") + return + click.echo( + f"Preparing to drop all flows defined in '{app_ref}': {', '.join(flow_names)}.", + err=True, + ) + + flow_full_names = [] + for name in flow_names: + try: + flow_full_names.append(flow.flow_by_name(name).full_name) + except KeyError: + click.echo( + f"Warning: Failed to get flow `{name}`. Ignored.", + err=True, + ) + + if not flow_full_names: click.echo("No flows identified for the drop operation.") return - setup_bundle = make_drop_bundle(flow_names) + setup_bundle = make_drop_bundle(flow_full_names=flow_full_names) description, is_up_to_date = setup_bundle.describe() click.echo(description) if is_up_to_date: click.echo("No flows need to be dropped.") return if not force and not click.confirm( - f"\nThis will apply changes to drop setup for: {', '.join(flow_names)}. Continue? [yes/N]", + f"\nThis will apply changes to drop setup for: {', '.join(flow_full_names)}. Continue? [yes/N]", default=False, show_default=False, ): click.echo("Drop operation aborted by user.") return - setup_bundle.apply(write_to_stdout=True) + setup_bundle.apply(report_to_stdout=True) @cli.command() @@ -375,6 +377,21 @@ def drop( default=False, help="Continuously watch changes from data sources and apply to the target index.", ) +@click.option( + "--setup", + is_flag=True, + show_default=True, + default=False, + help="Automatically setup backends for the flow if it's not setup yet.", +) +@click.option( + "-f", + "--force", + is_flag=True, + show_default=True, + default=False, + help="Force setup without confirmation prompts.", +) @click.option( "-q", "--quiet", @@ -383,21 +400,40 @@ def drop( default=False, help="Avoid printing anything to the standard output, e.g. statistics.", ) -def update(app_flow_specifier: str, live: bool, quiet: bool) -> Any: +def update( + app_flow_specifier: str, + live: bool, + setup: bool, # pylint: disable=redefined-outer-name + force: bool, + quiet: bool, +) -> Any: """ Update the index to reflect the latest data from data sources. APP_FLOW_SPECIFIER: path/to/app.py, module, path/to/app.py:FlowName, or module:FlowName. If :FlowName is omitted, updates all flows. """ - app_ref, flow_ref = _parse_app_flow_specifier(app_flow_specifier) + app_ref, flow_name = _parse_app_flow_specifier(app_flow_specifier) _load_user_app(app_ref) options = flow.FlowLiveUpdaterOptions(live_mode=live, print_stats=not quiet) - if flow_ref is None: + if flow_name is None: + if setup: + _setup_flows( + flow_full_names=flow.flow_full_names(), + force=force, + quiet=quiet, + ) return flow.update_all_flows(options) else: - with flow.FlowLiveUpdater(_flow_by_name(flow_ref), options) as updater: + fl = flow.flow_by_name(flow_name) + if setup: + _setup_flows( + flow_full_names=[fl.full_name], + force=force, + quiet=quiet, + ) + with flow.FlowLiveUpdater(fl, options) as updater: updater.wait() return updater.update_stats() diff --git a/python/cocoindex/flow.py b/python/cocoindex/flow.py index 5c651ec92..0028b7060 100644 --- a/python/cocoindex/flow.py +++ b/python/cocoindex/flow.py @@ -34,6 +34,7 @@ from .convert import dump_engine_object, encode_engine_value, make_engine_value_decoder from .typing import encode_enriched_type from .runtime import execution_context +from .setup import make_setup_bundle, make_drop_bundle class _NameBuilder: @@ -657,6 +658,34 @@ async def internal_flow_async(self) -> _engine.Flow: """ return await asyncio.to_thread(self.internal_flow) + def setup(self, report_to_stdout: bool = False) -> None: + """ + Setup the flow. + """ + execution_context.run(self.setup_async(report_to_stdout=report_to_stdout)) + + async def setup_async(self, report_to_stdout: bool = False) -> None: + """ + Setup the flow. The async version. + """ + await make_setup_bundle(flow_full_names=[self.full_name]).apply_async( + report_to_stdout=report_to_stdout + ) + + def drop(self, report_to_stdout: bool = False) -> None: + """ + Drop the flow. + """ + execution_context.run(self.drop_async(report_to_stdout=report_to_stdout)) + + async def drop_async(self, report_to_stdout: bool = False) -> None: + """ + Drop the flow. The async version. + """ + await make_drop_bundle(flow_full_names=[self.full_name]).apply_async( + report_to_stdout=report_to_stdout + ) + def _create_lazy_flow( name: str | None, fl_def: Callable[[FlowBuilder, DataScope], None] @@ -666,7 +695,7 @@ def _create_lazy_flow( The flow will be built the first time when it's really needed. """ flow_name = _flow_name_builder.build_name(name, prefix="_flow_") - flow_full_name = get_full_flow_name(flow_name) + flow_full_name = get_flow_full_name(flow_name) def _create_engine_flow() -> _engine.Flow: flow_builder_state = _FlowBuilderState(flow_full_name) @@ -685,7 +714,7 @@ def _create_engine_flow() -> _engine.Flow: _flows: dict[str, Flow] = {} -def get_full_flow_name(name: str) -> str: +def get_flow_full_name(name: str) -> str: """ Get the full name of a flow. """ @@ -722,6 +751,14 @@ def flow_names() -> list[str]: return list(_flows.keys()) +def flow_full_names() -> list[str]: + """ + Get the full names of all flows. + """ + with _flows_lock: + return [fl.full_name for fl in _flows.values()] + + def flows() -> dict[str, Flow]: """ Get all flows. diff --git a/python/cocoindex/setup.py b/python/cocoindex/setup.py index 111984dbb..0d896e119 100644 --- a/python/cocoindex/setup.py +++ b/python/cocoindex/setup.py @@ -25,17 +25,17 @@ def __str__(self) -> str: def __repr__(self) -> str: return self.__str__() - def apply(self, write_to_stdout: bool = False) -> None: + def apply(self, report_to_stdout: bool = False) -> None: """ Apply the setup changes. """ - execution_context.run(self.apply_async(write_to_stdout=write_to_stdout)) + execution_context.run(self.apply_async(report_to_stdout=report_to_stdout)) - async def apply_async(self, write_to_stdout: bool = False) -> None: + async def apply_async(self, report_to_stdout: bool = False) -> None: """ Apply the setup changes. Async version of `apply`. """ - await self._engine_bundle.apply_async(write_to_stdout=write_to_stdout) + await self._engine_bundle.apply_async(report_to_stdout=report_to_stdout) def describe(self) -> tuple[str, bool]: """ @@ -50,34 +50,34 @@ async def describe_async(self) -> tuple[str, bool]: return await self._engine_bundle.describe_async() # type: ignore -def make_setup_bundle(flow_names: list[str]) -> SetupChangeBundle: +def make_setup_bundle(*, flow_full_names: list[str]) -> SetupChangeBundle: """ Make a bundle to setup flows with the given names. """ flow.ensure_all_flows_built() - return SetupChangeBundle(_engine.make_setup_bundle(flow_names)) + return SetupChangeBundle(_engine.make_setup_bundle(flow_full_names)) -def make_drop_bundle(flow_names: list[str]) -> SetupChangeBundle: +def make_drop_bundle(*, flow_full_names: list[str]) -> SetupChangeBundle: """ Make a bundle to drop flows with the given names. """ flow.ensure_all_flows_built() - return SetupChangeBundle(_engine.make_drop_bundle(flow_names)) + return SetupChangeBundle(_engine.make_drop_bundle(flow_full_names)) -def setup_all_flows(write_to_stdout: bool = False) -> None: +def setup_all_flows(report_to_stdout: bool = False) -> None: """ Setup all flows registered in the current process. """ - make_setup_bundle(flow.flow_names()).apply(write_to_stdout=write_to_stdout) + make_setup_bundle(flow.flow_names()).apply(report_to_stdout=report_to_stdout) -def drop_all_flows(write_to_stdout: bool = False) -> None: +def drop_all_flows(report_to_stdout: bool = False) -> None: """ Drop all flows registered in the current process. """ - make_drop_bundle(flow.flow_names()).apply(write_to_stdout=write_to_stdout) + make_drop_bundle(flow.flow_names()).apply(report_to_stdout=report_to_stdout) def flow_names_with_setup() -> list[str]: diff --git a/src/py/mod.rs b/src/py/mod.rs index 43c0a5a77..bf6568f24 100644 --- a/src/py/mod.rs +++ b/src/py/mod.rs @@ -406,7 +406,7 @@ impl SetupChangeBundle { pub fn apply_async<'py>( &self, py: Python<'py>, - write_to_stdout: bool, + report_to_stdout: bool, ) -> PyResult> { let lib_context = get_lib_context().into_py_result()?; let bundle = self.0.clone(); @@ -417,7 +417,7 @@ impl SetupChangeBundle { bundle .apply( &lib_context, - if write_to_stdout { + if report_to_stdout { stdout.insert(std::io::stdout()) } else { sink.insert(std::io::sink()) diff --git a/src/setup/driver.rs b/src/setup/driver.rs index a0072440f..8e5aa048c 100644 --- a/src/setup/driver.rs +++ b/src/setup/driver.rs @@ -179,12 +179,12 @@ where } } -fn to_object_status(existing: Option, desired: Option) -> Result { - Ok(match (&existing, &desired) { +fn to_object_status(existing: Option, desired: Option) -> Option { + Some(match (&existing, &desired) { (Some(_), None) => ObjectStatus::Deleted, (None, Some(_)) => ObjectStatus::New, (Some(_), Some(_)) => ObjectStatus::Existing, - (None, None) => bail!("Unexpected object status"), + (None, None) => return None, }) } @@ -351,7 +351,7 @@ pub async fn check_flow_setup_status( }); } Ok(FlowSetupStatus { - status: to_object_status(existing_state, desired_state)?, + status: to_object_status(existing_state, desired_state), seen_flow_metadata_version: existing_state.and_then(|s| s.seen_flow_metadata_version), metadata_change, tracking_table: tracking_table_change.map(|c| c.into_setup_info()), @@ -407,7 +407,10 @@ async fn apply_changes_for_flow( existing_setup_state: &mut Option>, pool: &PgPool, ) -> Result<()> { - let verb = match flow_status.status { + let Some(status) = flow_status.status else { + return Ok(()); + }; + let verb = match status { ObjectStatus::New => "Creating", ObjectStatus::Deleted => "Deleting", ObjectStatus::Existing => "Updating resources for ", @@ -514,7 +517,7 @@ async fn apply_changes_for_flow( .await?; } - let is_deletion = flow_status.status == ObjectStatus::Deleted; + let is_deletion = status == ObjectStatus::Deleted; db_metadata::commit_changes_for_flow( flow_name, new_version_id, diff --git a/src/setup/states.rs b/src/setup/states.rs index 93da48f3a..167e972f8 100644 --- a/src/setup/states.rs +++ b/src/setup/states.rs @@ -328,13 +328,13 @@ pub enum ObjectStatus { } pub trait ObjectSetupStatus { - fn status(&self) -> ObjectStatus; + fn status(&self) -> Option; fn is_up_to_date(&self) -> bool; } #[derive(Debug)] pub struct FlowSetupStatus { - pub status: ObjectStatus, + pub status: Option, pub seen_flow_metadata_version: Option, pub metadata_change: Option>, @@ -348,7 +348,7 @@ pub struct FlowSetupStatus { } impl ObjectSetupStatus for FlowSetupStatus { - fn status(&self) -> ObjectStatus { + fn status(&self) -> Option { self.status } @@ -388,10 +388,13 @@ impl GlobalSetupStatus { pub struct ObjectSetupStatusCode<'a, Status: ObjectSetupStatus>(&'a Status); impl std::fmt::Display for ObjectSetupStatusCode<'_, Status> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let Some(status) = self.0.status() else { + return Ok(()); + }; write!( f, "[ {:^9} ]", - match self.0.status() { + match status { ObjectStatus::New => "TO CREATE", ObjectStatus::Existing => if self.0.is_up_to_date() { @@ -417,6 +420,9 @@ pub struct FormattedFlowSetupStatus<'a>(pub &'a str, pub &'a FlowSetupStatus); impl std::fmt::Display for FormattedFlowSetupStatus<'_> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let flow_ssc = self.1; + if flow_ssc.status.is_none() { + return Ok(()); + } write!( f, From d0888ef3f1f8130aa34f6b5918e344b2d3858c05 Mon Sep 17 00:00:00 2001 From: Jiangzhou He Date: Mon, 30 Jun 2025 23:56:33 -0700 Subject: [PATCH 2/4] fix: mypy errors --- python/cocoindex/functions.py | 2 +- python/cocoindex/setup.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/python/cocoindex/functions.py b/python/cocoindex/functions.py index 2b9ae1802..25d4d1dc1 100644 --- a/python/cocoindex/functions.py +++ b/python/cocoindex/functions.py @@ -11,7 +11,7 @@ # Check if sentence_transformers is available try: - import sentence_transformers # type: ignore + import sentence_transformers _SENTENCE_TRANSFORMERS_AVAILABLE = True except ImportError: diff --git a/python/cocoindex/setup.py b/python/cocoindex/setup.py index 0d896e119..08a6b4d7f 100644 --- a/python/cocoindex/setup.py +++ b/python/cocoindex/setup.py @@ -55,7 +55,7 @@ def make_setup_bundle(*, flow_full_names: list[str]) -> SetupChangeBundle: Make a bundle to setup flows with the given names. """ flow.ensure_all_flows_built() - return SetupChangeBundle(_engine.make_setup_bundle(flow_full_names)) + return SetupChangeBundle(_engine.make_setup_bundle(flow_full_names=flow_full_names)) def make_drop_bundle(*, flow_full_names: list[str]) -> SetupChangeBundle: @@ -63,7 +63,7 @@ def make_drop_bundle(*, flow_full_names: list[str]) -> SetupChangeBundle: Make a bundle to drop flows with the given names. """ flow.ensure_all_flows_built() - return SetupChangeBundle(_engine.make_drop_bundle(flow_full_names)) + return SetupChangeBundle(_engine.make_drop_bundle(flow_full_names=flow_full_names)) def setup_all_flows(report_to_stdout: bool = False) -> None: From 59df6c679048fbb33db98c7bf61e0050803c30b1 Mon Sep 17 00:00:00 2001 From: Jiangzhou He Date: Mon, 30 Jun 2025 23:57:52 -0700 Subject: [PATCH 3/4] fix: more mypy errors --- python/cocoindex/setup.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/python/cocoindex/setup.py b/python/cocoindex/setup.py index 08a6b4d7f..798827de2 100644 --- a/python/cocoindex/setup.py +++ b/python/cocoindex/setup.py @@ -55,7 +55,7 @@ def make_setup_bundle(*, flow_full_names: list[str]) -> SetupChangeBundle: Make a bundle to setup flows with the given names. """ flow.ensure_all_flows_built() - return SetupChangeBundle(_engine.make_setup_bundle(flow_full_names=flow_full_names)) + return SetupChangeBundle(_engine.make_setup_bundle(flow_full_names)) def make_drop_bundle(*, flow_full_names: list[str]) -> SetupChangeBundle: @@ -63,21 +63,25 @@ def make_drop_bundle(*, flow_full_names: list[str]) -> SetupChangeBundle: Make a bundle to drop flows with the given names. """ flow.ensure_all_flows_built() - return SetupChangeBundle(_engine.make_drop_bundle(flow_full_names=flow_full_names)) + return SetupChangeBundle(_engine.make_drop_bundle(flow_full_names)) def setup_all_flows(report_to_stdout: bool = False) -> None: """ Setup all flows registered in the current process. """ - make_setup_bundle(flow.flow_names()).apply(report_to_stdout=report_to_stdout) + make_setup_bundle(flow_full_names=flow.flow_full_names()).apply( + report_to_stdout=report_to_stdout + ) def drop_all_flows(report_to_stdout: bool = False) -> None: """ Drop all flows registered in the current process. """ - make_drop_bundle(flow.flow_names()).apply(report_to_stdout=report_to_stdout) + make_drop_bundle(flow_full_names=flow.flow_full_names()).apply( + report_to_stdout=report_to_stdout + ) def flow_names_with_setup() -> list[str]: From 7cb3c258f3ee6bfb27501142a30196e9dbc1c2e0 Mon Sep 17 00:00:00 2001 From: Jiangzhou He Date: Tue, 1 Jul 2025 00:12:05 -0700 Subject: [PATCH 4/4] fix: restore previous "type: ignore" --- pyproject.toml | 1 + python/cocoindex/functions.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index e13aec773..17798cd08 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,3 +38,4 @@ python_version = "3.11" strict = true files = "python/cocoindex" exclude = "(\\.venv|site-packages)" +disable_error_code = ["unused-ignore"] diff --git a/python/cocoindex/functions.py b/python/cocoindex/functions.py index 25d4d1dc1..2b9ae1802 100644 --- a/python/cocoindex/functions.py +++ b/python/cocoindex/functions.py @@ -11,7 +11,7 @@ # Check if sentence_transformers is available try: - import sentence_transformers + import sentence_transformers # type: ignore _SENTENCE_TRANSFORMERS_AVAILABLE = True except ImportError: