diff --git a/python/cocoindex/cli.py b/python/cocoindex/cli.py index bf01e4d99..d4e23c0fc 100644 --- a/python/cocoindex/cli.py +++ b/python/cocoindex/cli.py @@ -17,7 +17,7 @@ from rich.table import Table from . import flow, lib, setting -from .setup import apply_setup_changes, drop_setup, flow_names_with_setup, sync_setup +from .setup import make_setup_bundle, make_drop_bundle, flow_names_with_setup # Create ServerSettings lazily upon first call, as environment variables may be loaded from files, etc. COCOINDEX_HOST = "https://cocoindex.io" @@ -146,7 +146,7 @@ def cli(env_file: str | None = None) -> None: if load_dotenv(dotenv_path=dotenv_path): loaded_env_path = os.path.abspath(dotenv_path) - click.echo(f"Loaded environment variables from: {loaded_env_path}", err=True) + click.echo(f"Loaded environment variables from: {loaded_env_path}\n", err=True) try: _initialize_cocoindex_in_process() @@ -262,9 +262,10 @@ def setup(app_target: str, force: bool) -> None: app_ref = _get_app_ref_from_specifier(app_target) _load_user_app(app_ref) - setup_status = sync_setup() - click.echo(setup_status) - if setup_status.is_up_to_date(): + setup_bundle = make_setup_bundle(flow.flow_names()) + description, is_up_to_date = setup_bundle.describe() + click.echo(description) + if is_up_to_date: click.echo("No changes need to be pushed.") return if not force and not click.confirm( @@ -273,7 +274,7 @@ def setup(app_target: str, force: bool) -> None: show_default=False, ): return - apply_setup_changes(setup_status) + setup_bundle.apply(write_to_stdout=True) @cli.command("drop") @@ -348,9 +349,10 @@ def drop( click.echo("No flows identified for the drop operation.") return - setup_status = drop_setup(flow_names) - click.echo(setup_status) - if setup_status.is_up_to_date(): + setup_bundle = make_drop_bundle(flow_names) + description, is_up_to_date = setup_bundle.describe() + click.echo(description) + if is_up_to_date: click.echo("No flows need to be dropped.") return if not force and not click.confirm( @@ -360,7 +362,7 @@ def drop( ): click.echo("Drop operation aborted by user.") return - apply_setup_changes(setup_status) + setup_bundle.apply(write_to_stdout=True) @cli.command() diff --git a/python/cocoindex/setup.py b/python/cocoindex/setup.py index 59a360b58..111984dbb 100644 --- a/python/cocoindex/setup.py +++ b/python/cocoindex/setup.py @@ -1,26 +1,100 @@ +""" +This module provides APIs to manage the setup of flows. +""" + from . import flow from . import setting from . import _engine # type: ignore +from .runtime import execution_context + + +class SetupChangeBundle: + """ + This class represents a bundle of setup changes. + """ + + _engine_bundle: _engine.SetupChangeBundle + + def __init__(self, _engine_bundle: _engine.SetupChangeBundle): + self._engine_bundle = _engine_bundle + + def __str__(self) -> str: + desc, _ = execution_context.run(self._engine_bundle.describe_async()) + return desc # type: ignore + + def __repr__(self) -> str: + return self.__str__() + + def apply(self, write_to_stdout: bool = False) -> None: + """ + Apply the setup changes. + """ + execution_context.run(self.apply_async(write_to_stdout=write_to_stdout)) + + async def apply_async(self, write_to_stdout: bool = False) -> None: + """ + Apply the setup changes. Async version of `apply`. + """ + await self._engine_bundle.apply_async(write_to_stdout=write_to_stdout) + + def describe(self) -> tuple[str, bool]: + """ + Describe the setup changes. + """ + return execution_context.run(self.describe_async()) # type: ignore + async def describe_async(self) -> tuple[str, bool]: + """ + Describe the setup changes. Async version of `describe`. + """ + return await self._engine_bundle.describe_async() # type: ignore -def sync_setup() -> _engine.SetupStatus: + +def make_setup_bundle(flow_names: list[str]) -> SetupChangeBundle: + """ + Make a bundle to setup flows with the given names. + """ flow.ensure_all_flows_built() - return _engine.sync_setup() + return SetupChangeBundle(_engine.make_setup_bundle(flow_names)) -def drop_setup(flow_names: list[str]) -> _engine.SetupStatus: +def make_drop_bundle(flow_names: list[str]) -> SetupChangeBundle: + """ + Make a bundle to drop flows with the given names. + """ flow.ensure_all_flows_built() - return _engine.drop_setup([flow.get_full_flow_name(name) for name in flow_names]) + return SetupChangeBundle(_engine.make_drop_bundle(flow_names)) + + +def setup_all_flows(write_to_stdout: bool = False) -> None: + """ + Setup all flows registered in the current process. + """ + make_setup_bundle(flow.flow_names()).apply(write_to_stdout=write_to_stdout) + + +def drop_all_flows(write_to_stdout: bool = False) -> None: + """ + Drop all flows registered in the current process. + """ + make_drop_bundle(flow.flow_names()).apply(write_to_stdout=write_to_stdout) def flow_names_with_setup() -> list[str]: + """ + Get the names of all flows that have been setup. + """ + return execution_context.run(flow_names_with_setup_async()) # type: ignore + + +async def flow_names_with_setup_async() -> list[str]: + """ + Get the names of all flows that have been setup. Async version of `flow_names_with_setup`. + """ result = [] - for name in _engine.flow_names_with_setup(): + all_flow_names = await _engine.flow_names_with_setup_async() + for name in all_flow_names: app_namespace, name = setting.split_app_namespace(name, ".") if app_namespace == setting.get_app_namespace(): result.append(name) return result - - -def apply_setup_changes(setup_status: _engine.SetupStatus) -> None: - _engine.apply_setup_changes(setup_status) diff --git a/src/builder/flow_builder.rs b/src/builder/flow_builder.rs index 4fe70fb50..333e0a354 100644 --- a/src/builder/flow_builder.rs +++ b/src/builder/flow_builder.rs @@ -16,7 +16,6 @@ use crate::{ lib_context::LibContext, ops::interface::FlowInstanceContext, py::IntoPyResult, - setup, }; use crate::{lib_context::FlowContext, py}; @@ -237,7 +236,6 @@ impl std::fmt::Display for DataCollector { pub struct FlowBuilder { lib_context: Arc, flow_inst_context: Arc, - existing_flow_ss: Option>, root_op_scope: Arc, flow_instance_name: String, @@ -259,14 +257,6 @@ impl FlowBuilder { #[new] pub fn new(name: &str) -> PyResult { let lib_context = get_lib_context().into_py_result()?; - let existing_flow_ss = lib_context.persistence_ctx.as_ref().and_then(|ctx| { - ctx.all_setup_states - .read() - .unwrap() - .flows - .get(name) - .cloned() - }); let root_op_scope = OpScope::new( spec::ROOT_SCOPE_NAME.to_string(), None, @@ -276,7 +266,6 @@ impl FlowBuilder { let result = Self { lib_context, flow_inst_context, - existing_flow_ss, root_op_scope, flow_instance_name: name.to_string(), @@ -578,9 +567,18 @@ impl FlowBuilder { get_runtime().block_on(async move { let analyzed_flow = super::AnalyzedFlow::from_flow_instance(spec, flow_instance_ctx).await?; - let execution_ctx = - FlowContext::new(Arc::new(analyzed_flow), self.existing_flow_ss.as_ref()) - .await?; + let persistence_ctx = self.lib_context.require_persistence_ctx()?; + let execution_ctx = { + let flow_setup_ctx = persistence_ctx.setup_ctx.read().await; + FlowContext::new( + Arc::new(analyzed_flow), + flow_setup_ctx + .all_setup_states + .flows + .get(&self.flow_instance_name), + ) + .await? + }; anyhow::Ok(execution_ctx) }) }) diff --git a/src/execution/live_updater.rs b/src/execution/live_updater.rs index f9f3003b7..3ee390f76 100644 --- a/src/execution/live_updater.rs +++ b/src/execution/live_updater.rs @@ -62,7 +62,7 @@ impl SharedAckFn { async fn update_source( flow: Arc, plan: Arc, - execution_ctx: Arc>, + execution_ctx: Arc>, source_update_stats: Arc, source_idx: usize, pool: PgPool, diff --git a/src/lib_context.rs b/src/lib_context.rs index e26c9b253..2f22bd81c 100644 --- a/src/lib_context.rs +++ b/src/lib_context.rs @@ -10,29 +10,42 @@ use sqlx::PgPool; use sqlx::postgres::PgConnectOptions; use tokio::runtime::Runtime; -pub struct ExecutionContext { +pub struct FlowExecutionContext { pub setup_execution_context: Arc, - pub flow_setup_status: setup::FlowSetupStatus, + pub setup_status: setup::FlowSetupStatus, source_indexing_contexts: Vec>>, } -impl ExecutionContext { +async fn build_setup_context( + analyzed_flow: &AnalyzedFlow, + existing_flow_ss: Option<&setup::FlowSetupState>, +) -> Result<( + Arc, + setup::FlowSetupStatus, +)> { + let setup_execution_context = Arc::new(exec_ctx::build_flow_setup_execution_context( + &analyzed_flow.flow_instance, + &analyzed_flow.data_schema, + &analyzed_flow.setup_state, + existing_flow_ss, + )?); + + let setup_status = setup::check_flow_setup_status( + Some(&setup_execution_context.setup_state), + existing_flow_ss, + ) + .await?; + + Ok((setup_execution_context, setup_status)) +} + +impl FlowExecutionContext { async fn new( analyzed_flow: &AnalyzedFlow, existing_flow_ss: Option<&setup::FlowSetupState>, ) -> Result { - let setup_execution_context = Arc::new(exec_ctx::build_flow_setup_execution_context( - &analyzed_flow.flow_instance, - &analyzed_flow.data_schema, - &analyzed_flow.setup_state, - existing_flow_ss, - )?); - - let flow_setup_status = setup::check_flow_setup_status( - Some(&setup_execution_context.setup_state), - existing_flow_ss, - ) - .await?; + let (setup_execution_context, setup_status) = + build_setup_context(analyzed_flow, existing_flow_ss).await?; let mut source_indexing_contexts = Vec::new(); source_indexing_contexts.resize_with(analyzed_flow.flow_instance.import_ops.len(), || { @@ -41,10 +54,24 @@ impl ExecutionContext { Ok(Self { setup_execution_context, - flow_setup_status, + setup_status, source_indexing_contexts, }) } + + pub async fn update_setup_state( + &mut self, + analyzed_flow: &AnalyzedFlow, + existing_flow_ss: Option<&setup::FlowSetupState>, + ) -> Result<()> { + let (setup_execution_context, setup_status) = + build_setup_context(analyzed_flow, existing_flow_ss).await?; + + self.setup_execution_context = setup_execution_context; + self.setup_status = setup_status; + Ok(()) + } + pub async fn get_source_indexing_context( &self, flow: &Arc, @@ -66,18 +93,23 @@ impl ExecutionContext { .await?) } } + pub struct FlowContext { pub flow: Arc, - execution_ctx: Arc>, + execution_ctx: Arc>, } impl FlowContext { + pub fn flow_name(&self) -> &str { + &self.flow.flow_instance.name + } + pub async fn new( flow: Arc, existing_flow_ss: Option<&setup::FlowSetupState>, ) -> Result { let execution_ctx = Arc::new(tokio::sync::RwLock::new( - ExecutionContext::new(&flow, existing_flow_ss).await?, + FlowExecutionContext::new(&flow, existing_flow_ss).await?, )); Ok(Self { flow, @@ -87,11 +119,12 @@ impl FlowContext { pub async fn use_execution_ctx( &self, - ) -> Result> { + ) -> Result> { let execution_ctx = self.execution_ctx.read().await; - if !execution_ctx.flow_setup_status.is_up_to_date() { + if !execution_ctx.setup_status.is_up_to_date() { api_bail!( - "Flow setup is not up-to-date. Please run `cocoindex setup` to update the setup." + "Setup for flow `{}` is not up-to-date. Please run `cocoindex setup` to update the setup.", + self.flow_name() ); } Ok(execution_ctx) @@ -99,20 +132,19 @@ impl FlowContext { pub async fn use_owned_execution_ctx( &self, - ) -> Result> { + ) -> Result> { let execution_ctx = self.execution_ctx.clone().read_owned().await; - if !execution_ctx.flow_setup_status.is_up_to_date() { + if !execution_ctx.setup_status.is_up_to_date() { api_bail!( - "Flow setup is not up-to-date. Please run `cocoindex setup` to update the setup." + "Setup for flow `{}` is not up-to-date. Please run `cocoindex setup` to update the setup.", + self.flow_name() ); } Ok(execution_ctx) } - pub async fn get_execution_ctx_for_setup( - &self, - ) -> tokio::sync::RwLockReadGuard { - self.execution_ctx.read().await + pub fn get_execution_ctx_for_setup(&self) -> &tokio::sync::RwLock { + &self.execution_ctx } } @@ -150,9 +182,13 @@ impl DbPools { } } +pub struct LibSetupContext { + pub all_setup_states: setup::AllSetupStates, + pub global_setup_status: setup::GlobalSetupStatus, +} pub struct PersistenceContext { pub builtin_db_pool: PgPool, - pub all_setup_states: RwLock>, + pub setup_ctx: tokio::sync::RwLock, } pub struct LibContext { @@ -176,20 +212,14 @@ impl LibContext { Ok(flow_ctx) } - pub fn require_builtin_db_pool(&self) -> Result<&PgPool> { + pub fn require_persistence_ctx(&self) -> Result<&PersistenceContext> { self.persistence_ctx .as_ref() - .map(|ctx| &ctx.builtin_db_pool) .ok_or_else(|| anyhow!("Database is required for this operation. Please set COCOINDEX_DATABASE_URL environment variable and call cocoindex.init() with database settings.")) } - pub fn require_all_setup_states( - &self, - ) -> Result<&RwLock>> { - self.persistence_ctx - .as_ref() - .map(|ctx| &ctx.all_setup_states) - .ok_or_else(|| anyhow!("Database is required for this operation. Please set COCOINDEX_DATABASE_URL environment variable and call cocoindex.init() with database settings.")) + pub fn require_builtin_db_pool(&self) -> Result<&PgPool> { + Ok(&self.require_persistence_ctx()?.builtin_db_pool) } } @@ -218,7 +248,10 @@ pub fn create_lib_context(settings: settings::Settings) -> Result { })?; Some(PersistenceContext { builtin_db_pool: pool, - all_setup_states: RwLock::new(all_setup_states), + setup_ctx: tokio::sync::RwLock::new(LibSetupContext { + global_setup_status: setup::GlobalSetupStatus::from_setup_states(&all_setup_states), + all_setup_states, + }), }) } else { // No database configured @@ -285,7 +318,6 @@ mod tests { let lib_context = create_lib_context(settings).unwrap(); assert!(lib_context.persistence_ctx.is_none()); assert!(lib_context.require_builtin_db_pool().is_err()); - assert!(lib_context.require_all_setup_states().is_err()); } #[test] diff --git a/src/py/mod.rs b/src/py/mod.rs index 45b190d75..43c0a5a77 100644 --- a/src/py/mod.rs +++ b/src/py/mod.rs @@ -8,7 +8,7 @@ use crate::ops::py_factory::PyOpArgSchema; use crate::ops::{interface::ExecutorFactory, py_factory::PyFunctionFactory, register_factory}; use crate::server::{self, ServerSettings}; use crate::settings::Settings; -use crate::setup; +use crate::setup::{self}; use pyo3::IntoPyObjectExt; use pyo3::{exceptions::PyException, prelude::*}; use pyo3_async_runtimes::tokio::future_into_py; @@ -337,6 +337,22 @@ impl Flow { process_fields(&schema.schema.fields, "", &mut result); result } + + pub fn make_setup_action(&self) -> SetupChangeBundle { + let bundle = setup::SetupChangeBundle { + action: setup::FlowSetupChangeAction::Setup, + flow_names: vec![self.name().to_string()], + }; + SetupChangeBundle(Arc::new(bundle)) + } + + pub fn make_drop_action(&self) -> SetupChangeBundle { + let bundle = setup::SetupChangeBundle { + action: setup::FlowSetupChangeAction::Drop, + flow_names: vec![self.name().to_string()], + }; + SetupChangeBundle(Arc::new(bundle)) + } } #[pyclass] @@ -375,88 +391,75 @@ impl TransientFlow { } #[pyclass] -pub struct SetupStatus(setup::AllSetupStatus); +pub struct SetupChangeBundle(Arc); #[pymethods] -impl SetupStatus { - pub fn __str__(&self) -> String { - format!("{}", &self.0) +impl SetupChangeBundle { + pub fn describe_async<'py>(&self, py: Python<'py>) -> PyResult> { + let lib_context = get_lib_context().into_py_result()?; + let bundle = self.0.clone(); + future_into_py(py, async move { + bundle.describe(&lib_context).await.into_py_result() + }) } - pub fn __repr__(&self) -> String { - self.__str__() - } + pub fn apply_async<'py>( + &self, + py: Python<'py>, + write_to_stdout: bool, + ) -> PyResult> { + let lib_context = get_lib_context().into_py_result()?; + let bundle = self.0.clone(); - pub fn is_up_to_date(&self) -> bool { - self.0.is_up_to_date() + future_into_py(py, async move { + let mut stdout = None; + let mut sink = None; + bundle + .apply( + &lib_context, + if write_to_stdout { + stdout.insert(std::io::stdout()) + } else { + sink.insert(std::io::sink()) + }, + ) + .await + .into_py_result() + }) } } #[pyfunction] -fn sync_setup(py: Python<'_>) -> PyResult { - let lib_context = get_lib_context().into_py_result()?; - let flows = lib_context.flows.lock().unwrap(); - let all_setup_states = lib_context - .require_all_setup_states() - .into_py_result()? - .read() - .unwrap(); - py.allow_threads(|| { - get_runtime() - .block_on(async { - let setup_status = setup::sync_setup(&flows, &all_setup_states).await?; - anyhow::Ok(SetupStatus(setup_status)) - }) - .into_py_result() - }) -} - -#[pyfunction] -fn drop_setup(py: Python<'_>, flow_names: Vec) -> PyResult { - let lib_context = get_lib_context().into_py_result()?; - let all_setup_states = lib_context - .require_all_setup_states() - .into_py_result()? - .read() - .unwrap(); - py.allow_threads(|| { - get_runtime() - .block_on(async { - let setup_status = setup::drop_setup(flow_names, &all_setup_states).await?; - anyhow::Ok(SetupStatus(setup_status)) - }) - .into_py_result() +fn flow_names_with_setup_async(py: Python<'_>) -> PyResult> { + future_into_py(py, async move { + let lib_context = get_lib_context().into_py_result()?; + let setup_ctx = lib_context + .require_persistence_ctx() + .into_py_result()? + .setup_ctx + .read() + .await; + let flow_names: Vec = setup_ctx.all_setup_states.flows.keys().cloned().collect(); + PyResult::Ok(flow_names) }) } #[pyfunction] -fn flow_names_with_setup() -> PyResult> { - let lib_context = get_lib_context().into_py_result()?; - let all_setup_states = lib_context - .require_all_setup_states() - .into_py_result()? - .read() - .unwrap(); - let flow_names = all_setup_states.flows.keys().cloned().collect(); - Ok(flow_names) +fn make_setup_bundle(flow_names: Vec) -> PyResult { + let bundle = setup::SetupChangeBundle { + action: setup::FlowSetupChangeAction::Setup, + flow_names, + }; + Ok(SetupChangeBundle(Arc::new(bundle))) } #[pyfunction] -fn apply_setup_changes(py: Python<'_>, setup_status: &SetupStatus) -> PyResult<()> { - py.allow_threads(|| { - get_runtime() - .block_on(async { - let lib_context = get_lib_context()?; - setup::apply_changes( - &mut std::io::stdout(), - &setup_status.0, - lib_context.require_builtin_db_pool()?, - ) - .await - }) - .into_py_result()?; - Ok(()) - }) +fn make_drop_bundle(flow_names: Vec) -> PyResult { + let bundle = setup::SetupChangeBundle { + action: setup::FlowSetupChangeAction::Drop, + flow_names, + }; + Ok(SetupChangeBundle(Arc::new(bundle))) } #[pyfunction] @@ -487,10 +490,9 @@ fn cocoindex_engine(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_function(wrap_pyfunction!(start_server, m)?)?; m.add_function(wrap_pyfunction!(stop, m)?)?; m.add_function(wrap_pyfunction!(register_function_factory, m)?)?; - m.add_function(wrap_pyfunction!(sync_setup, m)?)?; - m.add_function(wrap_pyfunction!(drop_setup, m)?)?; - m.add_function(wrap_pyfunction!(apply_setup_changes, m)?)?; - m.add_function(wrap_pyfunction!(flow_names_with_setup, m)?)?; + m.add_function(wrap_pyfunction!(flow_names_with_setup_async, m)?)?; + m.add_function(wrap_pyfunction!(make_setup_bundle, m)?)?; + m.add_function(wrap_pyfunction!(make_drop_bundle, m)?)?; m.add_function(wrap_pyfunction!(add_auth_entry, m)?)?; m.add_class::()?; @@ -501,7 +503,7 @@ fn cocoindex_engine(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; - m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/src/setup/db_metadata.rs b/src/setup/db_metadata.rs index 17c1692ef..b7e1fb185 100644 --- a/src/setup/db_metadata.rs +++ b/src/setup/db_metadata.rs @@ -139,7 +139,7 @@ async fn upsert_staging_changes( async fn upsert_state( flow_name: &str, type_id: &ResourceTypeKey, - state: serde_json::Value, + state: &serde_json::Value, action: WriteAction, db_executor: impl sqlx::Executor<'_, Database = sqlx::Postgres>, ) -> Result<()> { @@ -220,7 +220,7 @@ pub async fn stage_changes_for_flow( upsert_state( flow_name, &VERSION_RESOURCE_TYPE_ID, - serde_json::Value::Number(new_metadata_version.into()), + &serde_json::Value::Number(new_metadata_version.into()), if latest_metadata_version.is_some() { WriteAction::Update } else { @@ -276,7 +276,7 @@ pub async fn stage_changes_for_flow( pub async fn commit_changes_for_flow( flow_name: &str, curr_metadata_version: u64, - state_updates: HashMap, + state_updates: &HashMap, delete_version: bool, pool: &PgPool, ) -> Result<()> { @@ -289,12 +289,12 @@ pub async fn commit_changes_for_flow( StatusCode::CONFLICT, ))?; } - for (type_id, update_info) in state_updates { - match update_info.desired_state { + for (type_id, update_info) in state_updates.iter() { + match &update_info.desired_state { Some(desired_state) => { upsert_state( flow_name, - &type_id, + type_id, desired_state, WriteAction::Update, &mut *txn, @@ -302,7 +302,7 @@ pub async fn commit_changes_for_flow( .await?; } None => { - delete_state(flow_name, &type_id, &mut *txn).await?; + delete_state(flow_name, type_id, &mut *txn).await?; } } } diff --git a/src/setup/driver.rs b/src/setup/driver.rs index a68a2e172..a0072440f 100644 --- a/src/setup/driver.rs +++ b/src/setup/driver.rs @@ -1,5 +1,5 @@ use crate::{ - lib_context::get_auth_registry, + lib_context::{FlowExecutionContext, LibSetupContext, get_auth_registry}, ops::{get_optional_executor_factory, interface::ExportTargetFactory}, prelude::*, }; @@ -10,14 +10,15 @@ use std::{ str::FromStr, }; -use super::{AllSetupState, AllSetupStatus}; +use super::{AllSetupStates, GlobalSetupStatus}; use super::{ CombinedState, DesiredMode, ExistingMode, FlowSetupState, FlowSetupStatus, ObjectSetupStatus, ObjectStatus, ResourceIdentifier, ResourceSetupInfo, ResourceSetupStatus, SetupChangeType, StateChange, TargetSetupState, db_metadata, }; use crate::execution::db_tracking_setup; -use crate::{lib_context::FlowContext, ops::interface::ExecutorFactory}; +use crate::ops::interface::ExecutorFactory; +use std::fmt::Write; enum MetadataRecordType { FlowVersion, @@ -85,13 +86,13 @@ fn get_export_target_factory( } } -pub async fn get_existing_setup_state(pool: &PgPool) -> Result> { +pub async fn get_existing_setup_state(pool: &PgPool) -> Result> { let setup_metadata_records = db_metadata::read_setup_metadata(pool).await?; let setup_metadata_records = if let Some(records) = setup_metadata_records { records } else { - return Ok(AllSetupState::default()); + return Ok(AllSetupStates::default()); }; // Group setup metadata records by flow name @@ -151,7 +152,7 @@ pub async fn get_existing_setup_state(pool: &PgPool) -> Result>()?; - Ok(AllSetupState { + Ok(AllSetupStates { has_metadata_table: true, flows, }) @@ -359,57 +360,6 @@ pub async fn check_flow_setup_status( }) } -pub async fn sync_setup( - flows: &BTreeMap>, - all_setup_state: &AllSetupState, -) -> Result { - let mut flow_setup_status = BTreeMap::new(); - for (flow_name, flow_context) in flows { - let existing_state = all_setup_state.flows.get(flow_name); - let execution_ctx = flow_context.get_execution_ctx_for_setup().await; - flow_setup_status.insert( - flow_name.clone(), - check_flow_setup_status( - Some(&execution_ctx.setup_execution_context.setup_state), - existing_state, - ) - .await?, - ); - } - Ok(AllSetupStatus { - metadata_table: db_metadata::MetadataTableSetup { - metadata_table_missing: !all_setup_state.has_metadata_table, - } - .into_setup_info(), - flows: flow_setup_status, - }) -} - -pub async fn drop_setup( - flow_names: impl IntoIterator, - all_setup_state: &AllSetupState, -) -> Result { - if !all_setup_state.has_metadata_table { - api_bail!("CocoIndex metadata table is missing."); - } - let mut flow_setup_statuss = BTreeMap::new(); - for flow_name in flow_names.into_iter() { - if let Some(existing_state) = all_setup_state.flows.get(&flow_name) { - flow_setup_statuss.insert( - flow_name, - check_flow_setup_status(None, Some(existing_state)).await?, - ); - } - } - Ok(AllSetupStatus { - metadata_table: db_metadata::MetadataTableSetup { - metadata_table_missing: false, - } - .into_setup_info(), - flows: flow_setup_statuss, - }) -} - struct ResourceSetupChangeItem<'a, K: 'a, C: ResourceSetupStatus> { key: &'a K, setup_status: &'a C, @@ -423,7 +373,7 @@ async fn maybe_update_resource_setup< ChangeApplierResultFut: Future>, >( resource_kind: &str, - write: &mut impl std::io::Write, + write: &mut (dyn std::io::Write + Send), resources: impl Iterator>, apply_change: impl FnOnce(Vec>) -> ChangeApplierResultFut, ) -> Result<()> { @@ -450,10 +400,182 @@ async fn maybe_update_resource_setup< Ok(()) } -pub async fn apply_changes( - write: &mut impl std::io::Write, - setup_status: &AllSetupStatus, +async fn apply_changes_for_flow( + write: &mut (dyn std::io::Write + Send), + flow_name: &str, + flow_status: &FlowSetupStatus, + existing_setup_state: &mut Option>, pool: &PgPool, +) -> Result<()> { + let verb = match flow_status.status { + ObjectStatus::New => "Creating", + ObjectStatus::Deleted => "Deleting", + ObjectStatus::Existing => "Updating resources for ", + _ => bail!("invalid flow status"), + }; + write!(write, "\n{verb} flow {flow_name}:\n")?; + + let mut update_info = + HashMap::::new(); + + if let Some(metadata_change) = &flow_status.metadata_change { + update_info.insert( + db_metadata::ResourceTypeKey::new( + MetadataRecordType::FlowMetadata.to_string(), + serde_json::Value::Null, + ), + db_metadata::StateUpdateInfo::new(metadata_change.desired_state(), None)?, + ); + } + if let Some(tracking_table) = &flow_status.tracking_table { + if tracking_table + .setup_status + .as_ref() + .map(|c| c.change_type() != SetupChangeType::NoChange) + .unwrap_or_default() + { + update_info.insert( + db_metadata::ResourceTypeKey::new( + MetadataRecordType::TrackingTable.to_string(), + serde_json::Value::Null, + ), + db_metadata::StateUpdateInfo::new(tracking_table.state.as_ref(), None)?, + ); + } + } + + for target_resource in &flow_status.target_resources { + update_info.insert( + db_metadata::ResourceTypeKey::new( + MetadataRecordType::Target(target_resource.key.target_kind.clone()).to_string(), + target_resource.key.key.clone(), + ), + db_metadata::StateUpdateInfo::new( + target_resource.state.as_ref(), + target_resource.legacy_key.as_ref().map(|k| { + db_metadata::ResourceTypeKey::new( + MetadataRecordType::Target(k.target_kind.clone()).to_string(), + k.key.clone(), + ) + }), + )?, + ); + } + + let new_version_id = db_metadata::stage_changes_for_flow( + flow_name, + flow_status.seen_flow_metadata_version, + &update_info, + pool, + ) + .await?; + + if let Some(tracking_table) = &flow_status.tracking_table { + maybe_update_resource_setup( + "tracking table", + write, + std::iter::once(tracking_table), + |setup_status| setup_status[0].setup_status.apply_change(), + ) + .await?; + } + + let mut setup_status_by_target_kind = IndexMap::<&str, Vec<_>>::new(); + for target_resource in &flow_status.target_resources { + setup_status_by_target_kind + .entry(target_resource.key.target_kind.as_str()) + .or_default() + .push(target_resource); + } + for (target_kind, resources) in setup_status_by_target_kind.into_iter() { + maybe_update_resource_setup( + target_kind, + write, + resources.into_iter(), + |setup_status| async move { + let factory = get_export_target_factory(target_kind).ok_or_else(|| { + anyhow::anyhow!("No factory found for target kind: {}", target_kind) + })?; + factory + .apply_setup_changes( + setup_status + .into_iter() + .map(|s| interface::ResourceSetupChangeItem { + key: &s.key.key, + setup_status: s.setup_status.as_ref(), + }) + .collect(), + get_auth_registry(), + ) + .await?; + Ok(()) + }, + ) + .await?; + } + + let is_deletion = flow_status.status == ObjectStatus::Deleted; + db_metadata::commit_changes_for_flow( + flow_name, + new_version_id, + &update_info, + is_deletion, + pool, + ) + .await?; + if is_deletion { + *existing_setup_state = None; + } else { + let (existing_metadata, existing_tracking_table, existing_targets) = + match std::mem::take(existing_setup_state) { + Some(s) => (Some(s.metadata), Some(s.tracking_table), s.targets), + None => Default::default(), + }; + let metadata = CombinedState::from_change( + existing_metadata, + flow_status + .metadata_change + .as_ref() + .map(|v| v.desired_state()), + ); + let tracking_table = CombinedState::from_change( + existing_tracking_table, + flow_status.tracking_table.as_ref().map(|c| { + c.setup_status + .as_ref() + .and_then(|c| c.desired_state.as_ref()) + }), + ); + let mut targets = existing_targets; + for target_resource in &flow_status.target_resources { + match &target_resource.state { + Some(state) => { + targets.insert( + target_resource.key.clone(), + CombinedState::from_desired(state.clone()), + ); + } + None => { + targets.shift_remove(&target_resource.key); + } + } + } + *existing_setup_state = Some(setup::FlowSetupState { + metadata, + tracking_table, + seen_flow_metadata_version: Some(new_version_id), + targets, + }); + } + + writeln!(write, "Done for flow {}", flow_name)?; + Ok(()) +} + +async fn apply_global_changes( + write: &mut (dyn std::io::Write + Send), + setup_status: &GlobalSetupStatus, + all_setup_states: &mut AllSetupStates, ) -> Result<()> { maybe_update_resource_setup( "metadata table", @@ -463,127 +585,157 @@ pub async fn apply_changes( ) .await?; - for (flow_name, flow_status) in &setup_status.flows { - if flow_status.is_up_to_date() { - continue; - } - let verb = match flow_status.status { - ObjectStatus::New => "Creating", - ObjectStatus::Deleted => "Deleting", - ObjectStatus::Existing => "Updating resources for ", - _ => bail!("invalid flow status"), - }; - write!(write, "\n{verb} flow {flow_name}:\n")?; + if setup_status + .metadata_table + .setup_status + .as_ref() + .map_or(false, |c| c.change_type() == SetupChangeType::Create) + { + all_setup_states.has_metadata_table = true; + } - let mut update_info = - HashMap::::new(); + Ok(()) +} - if let Some(metadata_change) = &flow_status.metadata_change { - update_info.insert( - db_metadata::ResourceTypeKey::new( - MetadataRecordType::FlowMetadata.to_string(), - serde_json::Value::Null, - ), - db_metadata::StateUpdateInfo::new(metadata_change.desired_state(), None)?, - ); - } - if let Some(tracking_table) = &flow_status.tracking_table { - if tracking_table - .setup_status - .as_ref() - .map(|c| c.change_type() != SetupChangeType::NoChange) - .unwrap_or_default() - { - update_info.insert( - db_metadata::ResourceTypeKey::new( - MetadataRecordType::TrackingTable.to_string(), - serde_json::Value::Null, - ), - db_metadata::StateUpdateInfo::new(tracking_table.state.as_ref(), None)?, - ); +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum FlowSetupChangeAction { + Setup, + Drop, +} +pub struct SetupChangeBundle { + pub action: FlowSetupChangeAction, + pub flow_names: Vec, +} + +impl SetupChangeBundle { + async fn get_flow_setup_status<'a>( + setup_ctx: &LibSetupContext, + flow_name: &str, + flow_exec_ctx: &'a FlowExecutionContext, + action: &FlowSetupChangeAction, + buffer: &'a mut Option, + ) -> Result<&'a FlowSetupStatus> { + let result = match action { + FlowSetupChangeAction::Setup => &flow_exec_ctx.setup_status, + FlowSetupChangeAction::Drop => { + let existing_state = setup_ctx.all_setup_states.flows.get(flow_name); + buffer.insert(check_flow_setup_status(None, existing_state).await?) } - } - for target_resource in &flow_status.target_resources { - update_info.insert( - db_metadata::ResourceTypeKey::new( - MetadataRecordType::Target(target_resource.key.target_kind.clone()).to_string(), - target_resource.key.key.clone(), - ), - db_metadata::StateUpdateInfo::new( - target_resource.state.as_ref(), - target_resource.legacy_key.as_ref().map(|k| { - db_metadata::ResourceTypeKey::new( - MetadataRecordType::Target(k.target_kind.clone()).to_string(), - k.key.clone(), - ) - }), - )?, - ); - } + }; + Ok(result) + } - let new_version_id = db_metadata::stage_changes_for_flow( - flow_name, - flow_status.seen_flow_metadata_version, - &update_info, - pool, - ) - .await?; + pub async fn describe(&self, lib_context: &LibContext) -> Result<(String, bool)> { + let mut text = String::new(); + let mut is_up_to_date = true; - if let Some(tracking_table) = &flow_status.tracking_table { - maybe_update_resource_setup( - "tracking table", - write, - std::iter::once(tracking_table), - |setup_status| setup_status[0].setup_status.apply_change(), + let setup_ctx = lib_context + .require_persistence_ctx()? + .setup_ctx + .read() + .await; + let setup_ctx = &*setup_ctx; + + if self.action == FlowSetupChangeAction::Setup { + is_up_to_date = is_up_to_date && setup_ctx.global_setup_status.is_up_to_date(); + write!(&mut text, "{}", setup_ctx.global_setup_status)?; + } + + for flow_name in &self.flow_names { + let flow_ctx = { + let flows = lib_context.flows.lock().unwrap(); + flows + .get(flow_name) + .ok_or_else(|| anyhow::anyhow!("Flow instance not found: {flow_name}"))? + .clone() + }; + let flow_exec_ctx = flow_ctx.get_execution_ctx_for_setup().read().await; + + let mut setup_status_buffer = None; + let setup_status = Self::get_flow_setup_status( + setup_ctx, + flow_name, + &flow_exec_ctx, + &self.action, + &mut setup_status_buffer, ) .await?; - } - let mut setup_status_by_target_kind = IndexMap::<&str, Vec<_>>::new(); - for target_resource in &flow_status.target_resources { - setup_status_by_target_kind - .entry(target_resource.key.target_kind.as_str()) - .or_default() - .push(target_resource); + is_up_to_date = is_up_to_date && setup_status.is_up_to_date(); + write!( + &mut text, + "{}", + setup::FormattedFlowSetupStatus(flow_name, setup_status) + )?; } - for (target_kind, resources) in setup_status_by_target_kind.into_iter() { - maybe_update_resource_setup( - target_kind, + Ok((text, is_up_to_date)) + } + + pub async fn apply( + &self, + lib_context: &LibContext, + write: &mut (dyn std::io::Write + Send), + ) -> Result<()> { + let persistence_ctx = lib_context.require_persistence_ctx()?; + let mut setup_ctx = persistence_ctx.setup_ctx.write().await; + let setup_ctx = &mut *setup_ctx; + + if self.action == FlowSetupChangeAction::Setup + && !setup_ctx.global_setup_status.is_up_to_date() + { + apply_global_changes( write, - resources.into_iter(), - |setup_status| async move { - let factory = get_export_target_factory(target_kind).ok_or_else(|| { - anyhow::anyhow!("No factory found for target kind: {}", target_kind) - })?; - factory - .apply_setup_changes( - setup_status - .into_iter() - .map(|s| interface::ResourceSetupChangeItem { - key: &s.key.key, - setup_status: s.setup_status.as_ref(), - }) - .collect(), - get_auth_registry(), - ) - .await?; - Ok(()) - }, + &setup_ctx.global_setup_status, + &mut setup_ctx.all_setup_states, ) .await?; + setup_ctx.global_setup_status = + GlobalSetupStatus::from_setup_states(&setup_ctx.all_setup_states); } - let is_deletion = flow_status.status == ObjectStatus::Deleted; - db_metadata::commit_changes_for_flow( - flow_name, - new_version_id, - update_info, - is_deletion, - pool, - ) - .await?; + for flow_name in &self.flow_names { + let flow_ctx = { + let flows = lib_context.flows.lock().unwrap(); + flows + .get(flow_name) + .ok_or_else(|| anyhow::anyhow!("Flow instance not found: {flow_name}"))? + .clone() + }; + let mut flow_exec_ctx = flow_ctx.get_execution_ctx_for_setup().write().await; + + let mut setup_status_buffer = None; + let setup_status = Self::get_flow_setup_status( + setup_ctx, + flow_name, + &flow_exec_ctx, + &self.action, + &mut setup_status_buffer, + ) + .await?; + if setup_status.is_up_to_date() { + continue; + } - writeln!(write, "Done for flow {}", flow_name)?; + let mut flow_states = setup_ctx.all_setup_states.flows.remove(flow_name); + apply_changes_for_flow( + write, + flow_name, + setup_status, + &mut flow_states, + &persistence_ctx.builtin_db_pool, + ) + .await?; + + flow_exec_ctx + .update_setup_state(&flow_ctx.flow, flow_states.as_ref()) + .await?; + if let Some(flow_states) = flow_states { + setup_ctx + .all_setup_states + .flows + .insert(flow_name.to_string(), flow_states); + } + } + Ok(()) } - Ok(()) } diff --git a/src/setup/states.rs b/src/setup/states.rs index d1dda2166..93da48f3a 100644 --- a/src/setup/states.rs +++ b/src/setup/states.rs @@ -43,10 +43,35 @@ impl StateMode for DesiredMode { pub struct CombinedState { pub current: Option, pub staging: Vec>, + /// Legacy state keys that no longer identical to the latest serialized form (usually caused by code change). + /// They will be deleted when the next change is applied. pub legacy_state_key: Option, } impl CombinedState { + pub fn from_desired(desired: T) -> Self { + Self { + current: Some(desired), + staging: vec![], + legacy_state_key: None, + } + } + + pub fn from_change(prev: Option>, change: Option>) -> Self + where + T: Clone, + { + Self { + current: match change { + Some(Some(state)) => Some(state.clone()), + Some(None) => None, + None => prev.and_then(|v| v.current), + }, + staging: vec![], + legacy_state_key: None, + } + } + pub fn possible_versions(&self) -> impl Iterator { self.current .iter() @@ -196,12 +221,12 @@ impl PartialEq for FlowSetupState { } #[derive(Debug, Clone)] -pub struct AllSetupState { +pub struct AllSetupStates { pub has_metadata_table: bool, pub flows: BTreeMap>, } -impl Default for AllSetupState { +impl Default for AllSetupStates { fn default() -> Self { Self { has_metadata_table: false, @@ -341,16 +366,22 @@ impl ObjectSetupStatus for FlowSetupStatus { } #[derive(Debug)] -pub struct AllSetupStatus { +pub struct GlobalSetupStatus { pub metadata_table: ResourceSetupInfo<(), (), db_metadata::MetadataTableSetup>, - - pub flows: BTreeMap, } -impl AllSetupStatus { +impl GlobalSetupStatus { + pub fn from_setup_states(setup_states: &AllSetupStates) -> Self { + Self { + metadata_table: db_metadata::MetadataTableSetup { + metadata_table_missing: !setup_states.has_metadata_table, + } + .into_setup_info(), + } + } + pub fn is_up_to_date(&self) -> bool { self.metadata_table.is_up_to_date() - && self.flows.iter().all(|(_, flow)| flow.is_up_to_date()) } } @@ -375,7 +406,13 @@ impl std::fmt::Display for ObjectSetupStatusCode<'_, } } -pub struct FormattedFlowSetupStatus<'a>(&'a str, &'a FlowSetupStatus); +impl std::fmt::Display for GlobalSetupStatus { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + writeln!(f, "{}", self.metadata_table) + } +} + +pub struct FormattedFlowSetupStatus<'a>(pub &'a str, pub &'a FlowSetupStatus); impl std::fmt::Display for FormattedFlowSetupStatus<'_> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -383,11 +420,11 @@ impl std::fmt::Display for FormattedFlowSetupStatus<'_> { write!( f, - "{} {}\n", + "{} Flow: {}\n", ObjectSetupStatusCode(flow_ssc) .to_string() .color(AnsiColors::Cyan), - format!("Flow: {}", self.0) + self.0 )?; let mut f = indented(f).with_str(INDENT); @@ -404,13 +441,3 @@ impl std::fmt::Display for FormattedFlowSetupStatus<'_> { Ok(()) } } - -impl std::fmt::Display for AllSetupStatus { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - writeln!(f, "{}", self.metadata_table)?; - for (flow_name, flow_status) in &self.flows { - writeln!(f, "{}", FormattedFlowSetupStatus(flow_name, flow_status))?; - } - Ok(()) - } -}