From 841c8ea4a8867935ce7bd4a496f670073839d952 Mon Sep 17 00:00:00 2001 From: Jiangzhou He Date: Mon, 18 Aug 2025 18:44:48 -0700 Subject: [PATCH] refactor: rename target setup related stuffs for ease of understanding --- src/builder/plan.rs | 4 +- src/execution/db_tracking_setup.rs | 14 ++-- src/lib_context.rs | 26 +++---- src/ops/factory_bases.rs | 52 +++++++------- src/ops/interface.rs | 12 ++-- src/ops/py_factory.rs | 26 +++---- src/ops/targets/kuzu.rs | 32 ++++----- src/ops/targets/neo4j.rs | 28 ++++---- src/ops/targets/postgres.rs | 20 +++--- src/ops/targets/qdrant.rs | 26 +++---- src/setup/components.rs | 12 ++-- src/setup/db_metadata.rs | 6 +- src/setup/driver.rs | 112 ++++++++++++++--------------- src/setup/states.rs | 60 ++++++++-------- 14 files changed, 215 insertions(+), 215 deletions(-) diff --git a/src/builder/plan.rs b/src/builder/plan.rs index 03de2b78a..665c907c5 100644 --- a/src/builder/plan.rs +++ b/src/builder/plan.rs @@ -101,7 +101,7 @@ pub enum AnalyzedPrimaryKeyDef { pub struct AnalyzedExportOp { pub name: String, pub input: AnalyzedLocalCollectorReference, - pub export_target_factory: Arc, + pub export_target_factory: Arc, pub export_context: Arc, pub primary_key_def: AnalyzedPrimaryKeyDef, pub primary_key_type: schema::ValueType, @@ -113,7 +113,7 @@ pub struct AnalyzedExportOp { } pub struct AnalyzedExportTargetOpGroup { - pub target_factory: Arc, + pub target_factory: Arc, pub op_idx: Vec, } diff --git a/src/execution/db_tracking_setup.rs b/src/execution/db_tracking_setup.rs index b55646768..aacd48c4e 100644 --- a/src/execution/db_tracking_setup.rs +++ b/src/execution/db_tracking_setup.rs @@ -1,6 +1,6 @@ use crate::prelude::*; -use crate::setup::{CombinedState, ResourceSetupInfo, ResourceSetupStatus, SetupChangeType}; +use crate::setup::{CombinedState, ResourceSetupChange, ResourceSetupInfo, SetupChangeType}; use serde::{Deserialize, Serialize}; use sqlx::PgPool; @@ -76,7 +76,7 @@ pub struct TrackingTableSetupState { } #[derive(Debug)] -pub struct TrackingTableSetupStatus { +pub struct TrackingTableSetupChange { pub desired_state: Option, pub min_existing_version_id: Option, @@ -88,7 +88,7 @@ pub struct TrackingTableSetupStatus { pub source_ids_to_delete: Vec, } -impl TrackingTableSetupStatus { +impl TrackingTableSetupChange { pub fn new( desired: Option<&TrackingTableSetupState>, existing: &CombinedState, @@ -127,18 +127,18 @@ impl TrackingTableSetupStatus { pub fn into_setup_info( self, - ) -> ResourceSetupInfo<(), TrackingTableSetupState, TrackingTableSetupStatus> { + ) -> ResourceSetupInfo<(), TrackingTableSetupState, TrackingTableSetupChange> { ResourceSetupInfo { key: (), state: self.desired_state.clone(), description: "Internal Storage for Tracking".to_string(), - setup_status: Some(self), + setup_change: Some(self), legacy_key: None, } } } -impl ResourceSetupStatus for TrackingTableSetupStatus { +impl ResourceSetupChange for TrackingTableSetupChange { fn describe_changes(&self) -> Vec { let mut changes: Vec = vec![]; if self.desired_state.is_some() && !self.legacy_tracking_table_names.is_empty() { @@ -234,7 +234,7 @@ impl ResourceSetupStatus for TrackingTableSetupStatus { } } -impl TrackingTableSetupStatus { +impl TrackingTableSetupChange { pub async fn apply_change(&self) -> Result<()> { let lib_context = get_lib_context()?; let pool = lib_context.require_builtin_db_pool()?; diff --git a/src/lib_context.rs b/src/lib_context.rs index c9c17c2f7..647052e77 100644 --- a/src/lib_context.rs +++ b/src/lib_context.rs @@ -6,7 +6,7 @@ use crate::builder::AnalyzedFlow; use crate::execution::source_indexer::SourceIndexingContext; use crate::service::error::ApiError; use crate::settings; -use crate::setup::ObjectSetupStatus; +use crate::setup::ObjectSetupChange; use axum::http::StatusCode; use sqlx::PgPool; use sqlx::postgres::{PgConnectOptions, PgPoolOptions}; @@ -14,7 +14,7 @@ use tokio::runtime::Runtime; pub struct FlowExecutionContext { pub setup_execution_context: Arc, - pub setup_status: setup::FlowSetupStatus, + pub setup_change: setup::FlowSetupChange, source_indexing_contexts: Vec>>, } @@ -23,7 +23,7 @@ async fn build_setup_context( existing_flow_ss: Option<&setup::FlowSetupState>, ) -> Result<( Arc, - setup::FlowSetupStatus, + setup::FlowSetupChange, )> { let setup_execution_context = Arc::new(exec_ctx::build_flow_setup_execution_context( &analyzed_flow.flow_instance, @@ -32,14 +32,14 @@ async fn build_setup_context( existing_flow_ss, )?); - let setup_status = setup::check_flow_setup_status( + let setup_change = setup::diff_flow_setup_states( Some(&setup_execution_context.setup_state), existing_flow_ss, &analyzed_flow.flow_instance_ctx, ) .await?; - Ok((setup_execution_context, setup_status)) + Ok((setup_execution_context, setup_change)) } impl FlowExecutionContext { @@ -47,7 +47,7 @@ impl FlowExecutionContext { analyzed_flow: &AnalyzedFlow, existing_flow_ss: Option<&setup::FlowSetupState>, ) -> Result { - let (setup_execution_context, setup_status) = + let (setup_execution_context, setup_change) = build_setup_context(analyzed_flow, existing_flow_ss).await?; let mut source_indexing_contexts = Vec::new(); @@ -57,7 +57,7 @@ impl FlowExecutionContext { Ok(Self { setup_execution_context, - setup_status, + setup_change, source_indexing_contexts, }) } @@ -67,11 +67,11 @@ impl FlowExecutionContext { analyzed_flow: &AnalyzedFlow, existing_flow_ss: Option<&setup::FlowSetupState>, ) -> Result<()> { - let (setup_execution_context, setup_status) = + let (setup_execution_context, setup_change) = build_setup_context(analyzed_flow, existing_flow_ss).await?; self.setup_execution_context = setup_execution_context; - self.setup_status = setup_status; + self.setup_change = setup_change; Ok(()) } @@ -124,7 +124,7 @@ impl FlowContext { &self, ) -> Result> { let execution_ctx = self.execution_ctx.read().await; - if !execution_ctx.setup_status.is_up_to_date() { + if !execution_ctx.setup_change.is_up_to_date() { api_bail!( "Setup for flow `{}` is not up-to-date. Please run `cocoindex setup` to update the setup.", self.flow_name() @@ -137,7 +137,7 @@ impl FlowContext { &self, ) -> Result> { let execution_ctx = self.execution_ctx.clone().read_owned().await; - if !execution_ctx.setup_status.is_up_to_date() { + if !execution_ctx.setup_change.is_up_to_date() { api_bail!( "Setup for flow `{}` is not up-to-date. Please run `cocoindex setup` to update the setup.", self.flow_name() @@ -212,7 +212,7 @@ impl DbPools { pub struct LibSetupContext { pub all_setup_states: setup::AllSetupStates, - pub global_setup_status: setup::GlobalSetupStatus, + pub global_setup_change: setup::GlobalSetupChange, } pub struct PersistenceContext { pub builtin_db_pool: PgPool, @@ -286,7 +286,7 @@ pub fn create_lib_context(settings: settings::Settings) -> Result { Some(PersistenceContext { builtin_db_pool: pool, setup_ctx: tokio::sync::RwLock::new(LibSetupContext { - global_setup_status: setup::GlobalSetupStatus::from_setup_states(&all_setup_states), + global_setup_change: setup::GlobalSetupChange::from_setup_states(&all_setup_states), all_setup_states, }), }) diff --git a/src/ops/factory_bases.rs b/src/ops/factory_bases.rs index 880f4444f..30d96ea2e 100644 --- a/src/ops/factory_bases.rs +++ b/src/ops/factory_bases.rs @@ -1,5 +1,5 @@ use crate::prelude::*; -use crate::setup::ResourceSetupStatus; +use crate::setup::ResourceSetupChange; use std::fmt::Debug; use std::hash::Hash; @@ -350,12 +350,12 @@ impl SimpleFunctionFactory for T { } } -pub struct TypedExportDataCollectionBuildOutput { +pub struct TypedExportDataCollectionBuildOutput { pub export_context: BoxFuture<'static, Result>>, pub setup_key: F::Key, pub desired_setup_state: F::SetupState, } -pub struct TypedExportDataCollectionSpec { +pub struct TypedExportDataCollectionSpec { pub name: String, pub spec: F::Spec, pub key_fields_schema: Vec, @@ -363,18 +363,18 @@ pub struct TypedExportDataCollectionSpec { pub index_options: IndexOptions, } -pub struct TypedResourceSetupChangeItem<'a, F: StorageFactoryBase + ?Sized> { +pub struct TypedResourceSetupChangeItem<'a, F: TargetFactoryBase + ?Sized> { pub key: F::Key, - pub setup_status: &'a F::SetupStatus, + pub setup_change: &'a F::SetupChange, } #[async_trait] -pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static { +pub trait TargetFactoryBase: TargetFactory + Send + Sync + 'static { type Spec: DeserializeOwned + Send + Sync; type DeclarationSpec: DeserializeOwned + Send + Sync; type Key: Debug + Clone + Serialize + DeserializeOwned + Eq + Hash + Send + Sync; type SetupState: Debug + Clone + Serialize + DeserializeOwned + Send + Sync; - type SetupStatus: ResourceSetupStatus; + type SetupChange: ResourceSetupChange; type ExportContext: Send + Sync + 'static; fn name(&self) -> &str; @@ -397,13 +397,13 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static { /// Will not be called if it's setup by user. /// It returns an error if the target only supports setup by user. - async fn check_setup_status( + async fn diff_setup_states( &self, key: Self::Key, desired_state: Option, existing_states: setup::CombinedState, flow_instance_ctx: Arc, - ) -> Result; + ) -> Result; fn check_state_compatibility( &self, @@ -439,13 +439,13 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static { async fn apply_setup_changes( &self, - setup_status: Vec>, + setup_change: Vec>, context: Arc, ) -> Result<()>; } #[async_trait] -impl ExportTargetFactory for T { +impl TargetFactory for T { async fn build( self: Arc, data_collections: Vec, @@ -455,7 +455,7 @@ impl ExportTargetFactory for T { Vec, Vec<(serde_json::Value, serde_json::Value)>, )> { - let (data_coll_output, decl_output) = StorageFactoryBase::build( + let (data_coll_output, decl_output) = TargetFactoryBase::build( self, data_collections .into_iter() @@ -497,19 +497,19 @@ impl ExportTargetFactory for T { Ok((data_coll_output, decl_output)) } - async fn check_setup_status( + async fn diff_setup_states( &self, key: &serde_json::Value, desired_state: Option, existing_states: setup::CombinedState, flow_instance_ctx: Arc, - ) -> Result> { + ) -> Result> { let key: T::Key = Self::deserialize_setup_key(key.clone())?; let desired_state: Option = desired_state .map(|v| serde_json::from_value(v.clone())) .transpose()?; let existing_states = from_json_combined_state(existing_states)?; - let setup_status = StorageFactoryBase::check_setup_status( + let setup_change = TargetFactoryBase::diff_setup_states( self, key, desired_state, @@ -517,12 +517,12 @@ impl ExportTargetFactory for T { flow_instance_ctx, ) .await?; - Ok(Box::new(setup_status)) + Ok(Box::new(setup_change)) } fn describe_resource(&self, key: &serde_json::Value) -> Result { let key: T::Key = Self::deserialize_setup_key(key.clone())?; - StorageFactoryBase::describe_resource(self, &key) + TargetFactoryBase::describe_resource(self, &key) } fn normalize_setup_key(&self, key: &serde_json::Value) -> Result { @@ -535,7 +535,7 @@ impl ExportTargetFactory for T { desired_state: &serde_json::Value, existing_state: &serde_json::Value, ) -> Result { - let result = StorageFactoryBase::check_state_compatibility( + let result = TargetFactoryBase::check_state_compatibility( self, &serde_json::from_value(desired_state.clone())?, &serde_json::from_value(existing_state.clone())?, @@ -543,13 +543,15 @@ impl ExportTargetFactory for T { Ok(result) } + /// Extract additional keys that are passed through as part of the mutation to `apply_mutation()`. + /// This is useful for targets that need to use additional parts as key for the target (which is not considered as part of the key for cocoindex). fn extract_additional_key( &self, key: &value::KeyValue, value: &value::FieldValues, export_context: &(dyn Any + Send + Sync), ) -> Result { - StorageFactoryBase::extract_additional_key( + TargetFactoryBase::extract_additional_key( self, key, value, @@ -575,23 +577,23 @@ impl ExportTargetFactory for T { }) }) .collect::>()?; - StorageFactoryBase::apply_mutation(self, mutations).await + TargetFactoryBase::apply_mutation(self, mutations).await } async fn apply_setup_changes( &self, - setup_status: Vec>, + setup_change: Vec>, context: Arc, ) -> Result<()> { - StorageFactoryBase::apply_setup_changes( + TargetFactoryBase::apply_setup_changes( self, - setup_status + setup_change .into_iter() .map(|item| -> anyhow::Result<_> { Ok(TypedResourceSetupChangeItem { key: serde_json::from_value(item.key.clone())?, - setup_status: (item.setup_status as &dyn Any) - .downcast_ref::() + setup_change: (item.setup_change as &dyn Any) + .downcast_ref::() .ok_or_else(invariance_violation)?, }) }) diff --git a/src/ops/interface.rs b/src/ops/interface.rs index d896b60bf..1e865d0ac 100644 --- a/src/ops/interface.rs +++ b/src/ops/interface.rs @@ -232,7 +232,7 @@ pub struct ExportTargetMutationWithContext<'ctx, T: ?Sized + Send + Sync> { pub struct ResourceSetupChangeItem<'a> { pub key: &'a serde_json::Value, - pub setup_status: &'a dyn setup::ResourceSetupStatus, + pub setup_change: &'a dyn setup::ResourceSetupChange, } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -263,7 +263,7 @@ pub struct ExportDataCollectionSpec { } #[async_trait] -pub trait ExportTargetFactory: Send + Sync { +pub trait TargetFactory: Send + Sync { async fn build( self: Arc, data_collections: Vec, @@ -276,13 +276,13 @@ pub trait ExportTargetFactory: Send + Sync { /// Will not be called if it's setup by user. /// It returns an error if the target only supports setup by user. - async fn check_setup_status( + async fn diff_setup_states( &self, key: &serde_json::Value, desired_state: Option, existing_states: setup::CombinedState, context: Arc, - ) -> Result>; + ) -> Result>; /// Normalize the key. e.g. the JSON format may change (after code change, e.g. new optional field or field ordering), even if the underlying value is not changed. /// This should always return the canonical serialized form. @@ -310,7 +310,7 @@ pub trait ExportTargetFactory: Send + Sync { async fn apply_setup_changes( &self, - setup_status: Vec>, + setup_change: Vec>, context: Arc, ) -> Result<()>; } @@ -319,5 +319,5 @@ pub trait ExportTargetFactory: Send + Sync { pub enum ExecutorFactory { Source(Arc), SimpleFunction(Arc), - ExportTarget(Arc), + ExportTarget(Arc), } diff --git a/src/ops/py_factory.rs b/src/ops/py_factory.rs index 4938d5844..d02f811a3 100644 --- a/src/ops/py_factory.rs +++ b/src/ops/py_factory.rs @@ -232,12 +232,12 @@ struct PyTargetExecutorContext { } #[derive(Debug)] -struct PyTargetResourceSetupStatus { +struct PyTargetResourceSetupChange { stale_existing_states: IndexSet>, desired_state: Option, } -impl setup::ResourceSetupStatus for PyTargetResourceSetupStatus { +impl setup::ResourceSetupChange for PyTargetResourceSetupChange { fn describe_changes(&self) -> Vec { vec![] } @@ -262,7 +262,7 @@ impl setup::ResourceSetupStatus for PyTargetResourceSetupStatus { } #[async_trait] -impl interface::ExportTargetFactory for PyExportTargetFactory { +impl interface::TargetFactory for PyExportTargetFactory { async fn build( self: Arc, data_collections: Vec, @@ -326,13 +326,13 @@ impl interface::ExportTargetFactory for PyExportTargetFactory { Ok((build_outputs, vec![])) } - async fn check_setup_status( + async fn diff_setup_states( &self, _key: &serde_json::Value, desired_state: Option, existing_states: setup::CombinedState, _context: Arc, - ) -> Result> { + ) -> Result> { // Collect all possible existing states that are not the desired state. let mut stale_existing_states = IndexSet::new(); if !existing_states.always_exists() && desired_state.is_some() { @@ -344,7 +344,7 @@ impl interface::ExportTargetFactory for PyExportTargetFactory { } } - Ok(Box::new(PyTargetResourceSetupStatus { + Ok(Box::new(PyTargetResourceSetupChange { stale_existing_states, desired_state, })) @@ -385,23 +385,23 @@ impl interface::ExportTargetFactory for PyExportTargetFactory { async fn apply_setup_changes( &self, - setup_status: Vec>, + setup_change: Vec>, context: Arc, ) -> Result<()> { // Filter the setup changes that are not NoChange, and flatten to // `list[tuple[key, list[stale_existing_states | None], desired_state | None]]` for Python. let mut setup_changes = Vec::new(); - for item in setup_status.into_iter() { - let decoded_setup_status = (item.setup_status as &dyn Any) - .downcast_ref::() + for item in setup_change.into_iter() { + let decoded_setup_change = (item.setup_change as &dyn Any) + .downcast_ref::() .ok_or_else(invariance_violation)?; - if ::change_type(decoded_setup_status) + if ::change_type(decoded_setup_change) != setup::SetupChangeType::NoChange { setup_changes.push(( item.key, - &decoded_setup_status.stale_existing_states, - &decoded_setup_status.desired_state, + &decoded_setup_change.stale_existing_states, + &decoded_setup_change.desired_state, )); } } diff --git a/src/ops/targets/kuzu.rs b/src/ops/targets/kuzu.rs index 283034018..0144293cd 100644 --- a/src/ops/targets/kuzu.rs +++ b/src/ops/targets/kuzu.rs @@ -180,13 +180,13 @@ impl<'a> From<&'a SetupState> for Cow<'a, TableColumnsSchema> { } #[derive(Debug)] -struct GraphElementDataSetupStatus { +struct GraphElementDataSetupChange { actions: TableMainSetupAction, referenced_node_tables: Option<(String, String)>, drop_affected_referenced_node_tables: IndexSet, } -impl setup::ResourceSetupStatus for GraphElementDataSetupStatus { +impl setup::ResourceSetupChange for GraphElementDataSetupChange { fn describe_changes(&self) -> Vec { self.actions.describe_changes() } @@ -198,10 +198,10 @@ impl setup::ResourceSetupStatus for GraphElementDataSetupStatus { fn append_drop_table( cypher: &mut CypherBuilder, - setup_status: &GraphElementDataSetupStatus, + setup_change: &GraphElementDataSetupChange, elem_type: &ElementType, ) -> Result<()> { - if !setup_status.actions.drop_existing { + if !setup_change.actions.drop_existing { return Ok(()); } writeln!( @@ -222,10 +222,10 @@ fn append_delete_orphaned_nodes(cypher: &mut CypherBuilder, node_table: &str) -> fn append_upsert_table( cypher: &mut CypherBuilder, - setup_status: &GraphElementDataSetupStatus, + setup_change: &GraphElementDataSetupChange, elem_type: &ElementType, ) -> Result<()> { - let table_upsertion = if let Some(table_upsertion) = &setup_status.actions.table_upsertion { + let table_upsertion = if let Some(table_upsertion) = &setup_change.actions.table_upsertion { table_upsertion } else { return Ok(()); @@ -238,7 +238,7 @@ fn append_upsert_table( kuzu_table_type = kuzu_table_type(elem_type), table_name = elem_type.label(), )?; - if let Some((src, tgt)) = &setup_status.referenced_node_tables { + if let Some((src, tgt)) = &setup_change.referenced_node_tables { write!(cypher.query_mut(), "FROM {src} TO {tgt}, ")?; } cypher.query_mut().push_str( @@ -739,11 +739,11 @@ struct Factory { } #[async_trait] -impl StorageFactoryBase for Factory { +impl TargetFactoryBase for Factory { type Spec = Spec; type DeclarationSpec = Declaration; type SetupState = SetupState; - type SetupStatus = GraphElementDataSetupStatus; + type SetupChange = GraphElementDataSetupChange; type Key = KuzuGraphElement; type ExportContext = ExportContext; @@ -842,13 +842,13 @@ impl StorageFactoryBase for Factory { Ok((data_coll_outputs, decl_output)) } - async fn check_setup_status( + async fn diff_setup_states( &self, _key: KuzuGraphElement, desired: Option, existing: CombinedState, _flow_instance_ctx: Arc, - ) -> Result { + ) -> Result { let existing_invalidated = desired.as_ref().is_some_and(|desired| { existing .possible_versions() @@ -865,7 +865,7 @@ impl StorageFactoryBase for Factory { } else { IndexSet::new() }; - Ok(GraphElementDataSetupStatus { + Ok(GraphElementDataSetupChange { actions, referenced_node_tables: desired .and_then(|desired| desired.referenced_node_tables) @@ -1063,14 +1063,14 @@ impl StorageFactoryBase for Factory { let mut cypher = CypherBuilder::new(); // Relationships first when dropping. for change in rel_changes.iter().chain(node_changes.iter()) { - if !change.setup_status.actions.drop_existing { + if !change.setup_change.actions.drop_existing { continue; } - append_drop_table(&mut cypher, change.setup_status, &change.key.typ)?; + append_drop_table(&mut cypher, change.setup_change, &change.key.typ)?; partial_affected_node_tables.extend( change - .setup_status + .setup_change .drop_affected_referenced_node_tables .iter(), ); @@ -1080,7 +1080,7 @@ impl StorageFactoryBase for Factory { } // Nodes first when creating. for change in node_changes.iter().chain(rel_changes.iter()) { - append_upsert_table(&mut cypher, change.setup_status, &change.key.typ)?; + append_upsert_table(&mut cypher, change.setup_change, &change.key.typ)?; } for table in partial_affected_node_tables { diff --git a/src/ops/targets/neo4j.rs b/src/ops/targets/neo4j.rs index 0acf2d7b1..f8a88e847 100644 --- a/src/ops/targets/neo4j.rs +++ b/src/ops/targets/neo4j.rs @@ -3,7 +3,7 @@ use crate::prelude::*; use super::shared::property_graph::*; use crate::setup::components::{self, State, apply_component_changes}; -use crate::setup::{ResourceSetupStatus, SetupChangeType}; +use crate::setup::{ResourceSetupChange, SetupChangeType}; use crate::{ops::sdk::*, setup::CombinedState}; use indoc::formatdoc; @@ -800,12 +800,12 @@ fn build_composite_field_names(qualifier: &str, field_names: &[String]) -> Strin } } #[derive(Debug)] -pub struct GraphElementDataSetupStatus { +pub struct GraphElementDataSetupChange { data_clear: Option, change_type: SetupChangeType, } -impl GraphElementDataSetupStatus { +impl GraphElementDataSetupChange { fn new(desired_state: Option<&SetupState>, existing: &CombinedState) -> Self { let mut data_clear: Option = None; for v in existing.possible_versions() { @@ -839,7 +839,7 @@ impl GraphElementDataSetupStatus { } } -impl ResourceSetupStatus for GraphElementDataSetupStatus { +impl ResourceSetupChange for GraphElementDataSetupChange { fn describe_changes(&self) -> Vec { let mut result = vec![]; if let Some(data_clear) = &self.data_clear { @@ -918,13 +918,13 @@ impl Factory { } #[async_trait] -impl StorageFactoryBase for Factory { +impl TargetFactoryBase for Factory { type Spec = Spec; type DeclarationSpec = Declaration; type SetupState = SetupState; - type SetupStatus = ( - GraphElementDataSetupStatus, - components::SetupStatus, + type SetupChange = ( + GraphElementDataSetupChange, + components::SetupChange, ); type Key = Neo4jGraphElement; type ExportContext = ExportContext; @@ -1004,18 +1004,18 @@ impl StorageFactoryBase for Factory { Ok((data_coll_output, decl_output)) } - async fn check_setup_status( + async fn diff_setup_states( &self, key: Neo4jGraphElement, desired: Option, existing: CombinedState, flow_instance_ctx: Arc, - ) -> Result { + ) -> Result { let conn_spec = flow_instance_ctx .auth_registry .get::(&key.connection)?; - let data_status = GraphElementDataSetupStatus::new(desired.as_ref(), &existing); - let components = components::SetupStatus::create( + let data_status = GraphElementDataSetupChange::new(desired.as_ref(), &existing); + let components = components::SetupChange::create( SetupComponentOperator { graph_pool: self.graph_pool.clone(), conn_spec, @@ -1093,7 +1093,7 @@ impl StorageFactoryBase for Factory { let mut components = vec![]; for change in changes.iter() { - if let Some(data_clear) = &change.setup_status.0.data_clear { + if let Some(data_clear) = &change.setup_change.0.data_clear { match &change.key.typ { ElementType::Relationship(_) => { relationship_types.insert(&change.key); @@ -1109,7 +1109,7 @@ impl StorageFactoryBase for Factory { } } } - components.push(&change.setup_status.1); + components.push(&change.setup_change.1); } // Relationships have no dependency, so can be cleared first. diff --git a/src/ops/targets/postgres.rs b/src/ops/targets/postgres.rs index 7ba6ceb2f..80a1580a7 100644 --- a/src/ops/targets/postgres.rs +++ b/src/ops/targets/postgres.rs @@ -423,13 +423,13 @@ pub struct TableSetupAction { } #[derive(Debug)] -pub struct SetupStatus { +pub struct SetupChange { create_pgvector_extension: bool, actions: TableSetupAction, vector_as_jsonb_columns: Vec<(String, ValueType)>, } -impl SetupStatus { +impl SetupChange { fn new(desired_state: Option, existing: setup::CombinedState) -> Self { let table_action = TableMainSetupAction::from_states(desired_state.as_ref(), &existing, false); @@ -533,7 +533,7 @@ fn describe_index_spec(index_name: &str, index_spec: &VectorIndexDef) -> String format!("{} {}", index_name, to_index_spec_sql(index_spec)) } -impl setup::ResourceSetupStatus for SetupStatus { +impl setup::ResourceSetupChange for SetupChange { fn describe_changes(&self) -> Vec { let mut descriptions = self.actions.table_action.describe_changes(); for (column_name, schema) in self.vector_as_jsonb_columns.iter() { @@ -574,7 +574,7 @@ impl setup::ResourceSetupStatus for SetupStatus { } } -impl SetupStatus { +impl SetupChange { async fn apply_change(&self, db_pool: &PgPool, table_name: &str) -> Result<()> { if self.actions.table_action.drop_existing { sqlx::query(&format!("DROP TABLE IF EXISTS {table_name}")) @@ -651,11 +651,11 @@ async fn get_db_pool( } #[async_trait] -impl StorageFactoryBase for Factory { +impl TargetFactoryBase for Factory { type Spec = Spec; type DeclarationSpec = (); type SetupState = SetupState; - type SetupStatus = SetupStatus; + type SetupChange = SetupChange; type Key = TableId; type ExportContext = ExportContext; @@ -714,14 +714,14 @@ impl StorageFactoryBase for Factory { Ok((data_coll_output, vec![])) } - async fn check_setup_status( + async fn diff_setup_states( &self, _key: TableId, desired: Option, existing: setup::CombinedState, _flow_instance_ctx: Arc, - ) -> Result { - Ok(SetupStatus::new(desired, existing)) + ) -> Result { + Ok(SetupChange::new(desired, existing)) } fn check_state_compatibility( @@ -782,7 +782,7 @@ impl StorageFactoryBase for Factory { for change in changes.iter() { let db_pool = get_db_pool(change.key.database.as_ref(), &context.auth_registry).await?; change - .setup_status + .setup_change .apply_change(&db_pool, &change.key.table_name) .await?; } diff --git a/src/ops/targets/qdrant.rs b/src/ops/targets/qdrant.rs index 414291f84..a7e9006b8 100644 --- a/src/ops/targets/qdrant.rs +++ b/src/ops/targets/qdrant.rs @@ -152,12 +152,12 @@ struct SetupState { } #[derive(Debug)] -struct SetupStatus { +struct SetupChange { delete_collection: bool, add_collection: Option, } -impl setup::ResourceSetupStatus for SetupStatus { +impl setup::ResourceSetupChange for SetupChange { fn describe_changes(&self) -> Vec { let mut result = vec![]; if self.delete_collection { @@ -205,7 +205,7 @@ impl setup::ResourceSetupStatus for SetupStatus { } } -impl SetupStatus { +impl SetupChange { async fn apply_delete(&self, collection_name: &String, qdrant_client: &Qdrant) -> Result<()> { if self.delete_collection { qdrant_client.delete_collection(collection_name).await?; @@ -363,11 +363,11 @@ impl Display for CollectionId { } #[async_trait] -impl StorageFactoryBase for Factory { +impl TargetFactoryBase for Factory { type Spec = Spec; type DeclarationSpec = (); type SetupState = SetupState; - type SetupStatus = SetupStatus; + type SetupChange = SetupChange; type Key = CollectionKey; type ExportContext = ExportContext; @@ -484,13 +484,13 @@ impl StorageFactoryBase for Factory { }) } - async fn check_setup_status( + async fn diff_setup_states( &self, _key: CollectionKey, desired: Option, existing: setup::CombinedState, _flow_instance_ctx: Arc, - ) -> Result { + ) -> Result { let desired_exists = desired.is_some(); let add_collection = desired.filter(|state| { !existing.always_exists() @@ -500,7 +500,7 @@ impl StorageFactoryBase for Factory { }); let delete_collection = existing.possible_versions().next().is_some() && (!desired_exists || add_collection.is_some()); - Ok(SetupStatus { + Ok(SetupChange { delete_collection, add_collection, }) @@ -543,22 +543,22 @@ impl StorageFactoryBase for Factory { async fn apply_setup_changes( &self, - setup_status: Vec>, + setup_change: Vec>, context: Arc, ) -> Result<()> { - for setup_change in setup_status.iter() { + for setup_change in setup_change.iter() { let qdrant_client = self.get_qdrant_client(&setup_change.key.connection, &context.auth_registry)?; setup_change - .setup_status + .setup_change .apply_delete(&setup_change.key.collection_name, &qdrant_client) .await?; } - for setup_change in setup_status.iter() { + for setup_change in setup_change.iter() { let qdrant_client = self.get_qdrant_client(&setup_change.key.connection, &context.auth_registry)?; setup_change - .setup_status + .setup_change .apply_create(&setup_change.key.collection_name, &qdrant_client) .await?; } diff --git a/src/setup/components.rs b/src/setup/components.rs index fdb55d750..956e18bd7 100644 --- a/src/setup/components.rs +++ b/src/setup/components.rs @@ -1,4 +1,4 @@ -use super::{CombinedState, ResourceSetupStatus, SetupChangeType, StateChange}; +use super::{CombinedState, ResourceSetupChange, SetupChangeType, StateChange}; use crate::prelude::*; use std::fmt::Debug; @@ -37,14 +37,14 @@ struct CompositeStateUpsert { #[derive(Derivative)] #[derivative(Debug)] -pub struct SetupStatus { +pub struct SetupChange { #[derivative(Debug = "ignore")] desc: D, keys_to_delete: IndexSet, states_to_upsert: Vec>, } -impl SetupStatus { +impl SetupChange { pub fn create( desc: D, desired: Option, @@ -109,7 +109,7 @@ impl SetupStatus { } } -impl ResourceSetupStatus for SetupStatus { +impl ResourceSetupChange for SetupChange { fn describe_changes(&self) -> Vec { let mut result = vec![]; @@ -149,7 +149,7 @@ impl ResourceSetupStatus for SetupStatus { } pub async fn apply_component_changes( - changes: Vec<&SetupStatus>, + changes: Vec<&SetupChange>, context: &D::Context, ) -> Result<()> { // First delete components that need to be removed @@ -173,7 +173,7 @@ pub async fn apply_component_changes( Ok(()) } -impl ResourceSetupStatus for (A, B) { +impl ResourceSetupChange for (A, B) { fn describe_changes(&self) -> Vec { let mut result = vec![]; result.extend(self.0.describe_changes()); diff --git a/src/setup/db_metadata.rs b/src/setup/db_metadata.rs index fd13d1676..bbe743cc6 100644 --- a/src/setup/db_metadata.rs +++ b/src/setup/db_metadata.rs @@ -1,6 +1,6 @@ use crate::prelude::*; -use super::{ResourceSetupInfo, ResourceSetupStatus, SetupChangeType, StateChange}; +use super::{ResourceSetupChange, ResourceSetupInfo, SetupChangeType, StateChange}; use crate::utils::db::WriteAction; use axum::http::StatusCode; use sqlx::PgPool; @@ -324,13 +324,13 @@ impl MetadataTableSetup { key: (), state: None, description: "CocoIndex Metadata Table".to_string(), - setup_status: Some(self), + setup_change: Some(self), legacy_key: None, } } } -impl ResourceSetupStatus for MetadataTableSetup { +impl ResourceSetupChange for MetadataTableSetup { fn describe_changes(&self) -> Vec { if self.metadata_table_missing { vec![setup::ChangeDescription::Action(format!( diff --git a/src/setup/driver.rs b/src/setup/driver.rs index 4ed9d3f3b..2b50f69cc 100644 --- a/src/setup/driver.rs +++ b/src/setup/driver.rs @@ -2,7 +2,7 @@ use crate::{ lib_context::{FlowContext, FlowExecutionContext, LibSetupContext}, ops::{ get_optional_executor_factory, - interface::{ExportTargetFactory, FlowInstanceContext}, + interface::{FlowInstanceContext, TargetFactory}, }, prelude::*, }; @@ -13,10 +13,10 @@ use std::{ str::FromStr, }; -use super::{AllSetupStates, GlobalSetupStatus}; +use super::{AllSetupStates, GlobalSetupChange}; use super::{ - CombinedState, DesiredMode, ExistingMode, FlowSetupState, FlowSetupStatus, ObjectSetupStatus, - ObjectStatus, ResourceIdentifier, ResourceSetupInfo, ResourceSetupStatus, SetupChangeType, + CombinedState, DesiredMode, ExistingMode, FlowSetupChange, FlowSetupState, ObjectSetupChange, + ObjectStatus, ResourceIdentifier, ResourceSetupChange, ResourceSetupInfo, SetupChangeType, StateChange, TargetSetupState, db_metadata, }; use crate::execution::db_tracking_setup; @@ -80,9 +80,7 @@ fn from_metadata_record( }) } -fn get_export_target_factory( - target_type: &str, -) -> Option> { +fn get_export_target_factory(target_type: &str) -> Option> { match get_optional_executor_factory(target_type) { Some(ExecutorFactory::ExportTarget(factory)) => Some(factory), _ => None, @@ -252,11 +250,11 @@ fn group_resource_states<'a>( Ok(grouped) } -pub async fn check_flow_setup_status( +pub async fn diff_flow_setup_states( desired_state: Option<&FlowSetupState>, existing_state: Option<&FlowSetupState>, flow_instance_ctx: &Arc, -) -> Result { +) -> Result { let metadata_change = diff_state( existing_state.map(|e| &e.metadata), desired_state.map(|d| &d.metadata), @@ -267,7 +265,7 @@ pub async fn check_flow_setup_status( .iter() .flat_map(|d| d.metadata.sources.values().map(|v| v.source_id)) .collect::>(); - let tracking_table_change = db_tracking_setup::TrackingTableSetupStatus::new( + let tracking_table_change = db_tracking_setup::TrackingTableSetupChange::new( desired_state.map(|d| &d.tracking_table), &existing_state .map(|e| Cow::Borrowed(&e.tracking_table)) @@ -326,12 +324,12 @@ pub async fn check_flow_setup_status( let never_setup_by_sys = target_state.is_none() && existing_without_setup_by_user.current.is_none() && existing_without_setup_by_user.staging.is_empty(); - let setup_status = if never_setup_by_sys { + let setup_change = if never_setup_by_sys { None } else { Some( factory - .check_setup_status( + .diff_setup_states( &resource_id.key, target_state, existing_without_setup_by_user, @@ -344,7 +342,7 @@ pub async fn check_flow_setup_status( key: resource_id.clone(), state, description: factory.describe_resource(&resource_id.key)?, - setup_status, + setup_change, legacy_key: v .existing .legacy_state_key @@ -354,7 +352,7 @@ pub async fn check_flow_setup_status( }), }); } - Ok(FlowSetupStatus { + Ok(FlowSetupChange { status: to_object_status(existing_state, desired_state), seen_flow_metadata_version: existing_state.and_then(|s| s.seen_flow_metadata_version), metadata_change, @@ -364,16 +362,16 @@ pub async fn check_flow_setup_status( }) } -struct ResourceSetupChangeItem<'a, K: 'a, C: ResourceSetupStatus> { +struct ResourceSetupChangeItem<'a, K: 'a, C: ResourceSetupChange> { key: &'a K, - setup_status: &'a C, + setup_change: &'a C, } async fn maybe_update_resource_setup< 'a, K: 'a, S: 'a, - C: ResourceSetupStatus, + C: ResourceSetupChange, ChangeApplierResultFut: Future>, >( resource_kind: &str, @@ -383,14 +381,14 @@ async fn maybe_update_resource_setup< ) -> Result<()> { let mut changes = Vec::new(); for resource in resources { - if let Some(setup_status) = &resource.setup_status { - if setup_status.change_type() != SetupChangeType::NoChange { + if let Some(setup_change) = &resource.setup_change { + if setup_change.change_type() != SetupChangeType::NoChange { changes.push(ResourceSetupChangeItem { key: &resource.key, - setup_status, + setup_change, }); writeln!(write, "{}:", resource.description)?; - for change in setup_status.describe_changes() { + for change in setup_change.describe_changes() { match change { setup::ChangeDescription::Action(action) => { writeln!(write, " - {action}")?; @@ -412,7 +410,7 @@ async fn maybe_update_resource_setup< async fn apply_changes_for_flow( write: &mut (dyn std::io::Write + Send), flow_ctx: &FlowContext, - flow_status: &FlowSetupStatus, + flow_status: &FlowSetupChange, existing_setup_state: &mut Option>, pool: &PgPool, ) -> Result<()> { @@ -441,7 +439,7 @@ async fn apply_changes_for_flow( } if let Some(tracking_table) = &flow_status.tracking_table { if tracking_table - .setup_status + .setup_change .as_ref() .map(|c| c.change_type() != SetupChangeType::NoChange) .unwrap_or_default() @@ -487,34 +485,34 @@ async fn apply_changes_for_flow( "tracking table", write, std::iter::once(tracking_table), - |setup_status| setup_status[0].setup_status.apply_change(), + |setup_change| setup_change[0].setup_change.apply_change(), ) .await?; } - let mut setup_status_by_target_kind = IndexMap::<&str, Vec<_>>::new(); + let mut setup_change_by_target_kind = IndexMap::<&str, Vec<_>>::new(); for target_resource in &flow_status.target_resources { - setup_status_by_target_kind + setup_change_by_target_kind .entry(target_resource.key.target_kind.as_str()) .or_default() .push(target_resource); } - for (target_kind, resources) in setup_status_by_target_kind.into_iter() { + for (target_kind, resources) in setup_change_by_target_kind.into_iter() { maybe_update_resource_setup( target_kind, write, resources.into_iter(), - |setup_status| async move { + |setup_change| async move { let factory = get_export_target_factory(target_kind).ok_or_else(|| { anyhow::anyhow!("No factory found for target kind: {}", target_kind) })?; factory .apply_setup_changes( - setup_status + setup_change .into_iter() .map(|s| interface::ResourceSetupChangeItem { key: &s.key.key, - setup_status: s.setup_status.as_ref(), + setup_change: s.setup_change.as_ref(), }) .collect(), flow_ctx.flow.flow_instance_ctx.clone(), @@ -553,7 +551,7 @@ async fn apply_changes_for_flow( let tracking_table = CombinedState::from_change( existing_tracking_table, flow_status.tracking_table.as_ref().map(|c| { - c.setup_status + c.setup_change .as_ref() .and_then(|c| c.desired_state.as_ref()) }), @@ -586,20 +584,20 @@ async fn apply_changes_for_flow( async fn apply_global_changes( write: &mut (dyn std::io::Write + Send), - setup_status: &GlobalSetupStatus, + setup_change: &GlobalSetupChange, all_setup_states: &mut AllSetupStates, ) -> Result<()> { maybe_update_resource_setup( "metadata table", write, - std::iter::once(&setup_status.metadata_table), - |setup_status| setup_status[0].setup_status.apply_change(), + std::iter::once(&setup_change.metadata_table), + |setup_change| setup_change[0].setup_change.apply_change(), ) .await?; - if setup_status + if setup_change .metadata_table - .setup_status + .setup_change .as_ref() .is_some_and(|c| c.change_type() == SetupChangeType::Create) { @@ -620,19 +618,19 @@ pub struct SetupChangeBundle { } impl SetupChangeBundle { - async fn get_flow_setup_status<'a>( + async fn get_flow_setup_change<'a>( setup_ctx: &LibSetupContext, flow_ctx: &'a FlowContext, flow_exec_ctx: &'a FlowExecutionContext, action: &FlowSetupChangeAction, - buffer: &'a mut Option, - ) -> Result<&'a FlowSetupStatus> { + buffer: &'a mut Option, + ) -> Result<&'a FlowSetupChange> { let result = match action { - FlowSetupChangeAction::Setup => &flow_exec_ctx.setup_status, + FlowSetupChangeAction::Setup => &flow_exec_ctx.setup_change, FlowSetupChangeAction::Drop => { let existing_state = setup_ctx.all_setup_states.flows.get(flow_ctx.flow_name()); buffer.insert( - check_flow_setup_status(None, existing_state, &flow_ctx.flow.flow_instance_ctx) + diff_flow_setup_states(None, existing_state, &flow_ctx.flow.flow_instance_ctx) .await?, ) } @@ -652,8 +650,8 @@ impl SetupChangeBundle { let setup_ctx = &*setup_ctx; if self.action == FlowSetupChangeAction::Setup { - is_up_to_date = is_up_to_date && setup_ctx.global_setup_status.is_up_to_date(); - write!(&mut text, "{}", setup_ctx.global_setup_status)?; + is_up_to_date = is_up_to_date && setup_ctx.global_setup_change.is_up_to_date(); + write!(&mut text, "{}", setup_ctx.global_setup_change)?; } for flow_name in &self.flow_names { @@ -666,21 +664,21 @@ impl SetupChangeBundle { }; let flow_exec_ctx = flow_ctx.get_execution_ctx_for_setup().read().await; - let mut setup_status_buffer = None; - let setup_status = Self::get_flow_setup_status( + let mut setup_change_buffer = None; + let setup_change = Self::get_flow_setup_change( setup_ctx, &flow_ctx, &flow_exec_ctx, &self.action, - &mut setup_status_buffer, + &mut setup_change_buffer, ) .await?; - is_up_to_date = is_up_to_date && setup_status.is_up_to_date(); + is_up_to_date = is_up_to_date && setup_change.is_up_to_date(); write!( &mut text, "{}", - setup::FormattedFlowSetupStatus(flow_name, setup_status) + setup::FormattedFlowSetupChange(flow_name, setup_change) )?; } Ok((text, is_up_to_date)) @@ -696,16 +694,16 @@ impl SetupChangeBundle { let setup_ctx = &mut *setup_ctx; if self.action == FlowSetupChangeAction::Setup - && !setup_ctx.global_setup_status.is_up_to_date() + && !setup_ctx.global_setup_change.is_up_to_date() { apply_global_changes( write, - &setup_ctx.global_setup_status, + &setup_ctx.global_setup_change, &mut setup_ctx.all_setup_states, ) .await?; - setup_ctx.global_setup_status = - GlobalSetupStatus::from_setup_states(&setup_ctx.all_setup_states); + setup_ctx.global_setup_change = + GlobalSetupChange::from_setup_states(&setup_ctx.all_setup_states); } for flow_name in &self.flow_names { @@ -718,16 +716,16 @@ impl SetupChangeBundle { }; let mut flow_exec_ctx = flow_ctx.get_execution_ctx_for_setup().write().await; - let mut setup_status_buffer = None; - let setup_status = Self::get_flow_setup_status( + let mut setup_change_buffer = None; + let setup_change = Self::get_flow_setup_change( setup_ctx, &flow_ctx, &flow_exec_ctx, &self.action, - &mut setup_status_buffer, + &mut setup_change_buffer, ) .await?; - if setup_status.is_up_to_date() { + if setup_change.is_up_to_date() { continue; } @@ -735,7 +733,7 @@ impl SetupChangeBundle { apply_changes_for_flow( write, &flow_ctx, - setup_status, + setup_change, &mut flow_states, &persistence_ctx.builtin_db_pool, ) diff --git a/src/setup/states.rs b/src/setup/states.rs index 4bb1c2a14..340f6aff2 100644 --- a/src/setup/states.rs +++ b/src/setup/states.rs @@ -22,7 +22,7 @@ use std::hash::Hash; use super::db_metadata; use crate::execution::db_tracking_setup::{ - self, TrackingTableSetupState, TrackingTableSetupStatus, + self, TrackingTableSetupChange, TrackingTableSetupState, }; const INDENT: &str = " "; @@ -251,13 +251,13 @@ pub enum ChangeDescription { Note(String), } -pub trait ResourceSetupStatus: Send + Sync + Debug + Any + 'static { +pub trait ResourceSetupChange: Send + Sync + Debug + Any + 'static { fn describe_changes(&self) -> Vec; fn change_type(&self) -> SetupChangeType; } -impl ResourceSetupStatus for Box { +impl ResourceSetupChange for Box { fn describe_changes(&self) -> Vec { self.as_ref().describe_changes() } @@ -267,7 +267,7 @@ impl ResourceSetupStatus for Box { } } -impl ResourceSetupStatus for std::convert::Infallible { +impl ResourceSetupChange for std::convert::Infallible { fn describe_changes(&self) -> Vec { unreachable!() } @@ -278,20 +278,20 @@ impl ResourceSetupStatus for std::convert::Infallible { } #[derive(Debug)] -pub struct ResourceSetupInfo { +pub struct ResourceSetupInfo { pub key: K, pub state: Option, pub description: String, /// If `None`, the resource is managed by users. - pub setup_status: Option, + pub setup_change: Option, pub legacy_key: Option, } -impl std::fmt::Display for ResourceSetupInfo { +impl std::fmt::Display for ResourceSetupInfo { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let status_code = match self.setup_status.as_ref().map(|c| c.change_type()) { + let status_code = match self.setup_change.as_ref().map(|c| c.change_type()) { Some(SetupChangeType::NoChange) => "READY", Some(SetupChangeType::Create) => "TO CREATE", Some(SetupChangeType::Update) => "TO UPDATE", @@ -303,8 +303,8 @@ impl std::fmt::Display for ResourceSetupInfo std::fmt::Display for ResourceSetupInfo ResourceSetupInfo { +impl ResourceSetupInfo { pub fn is_up_to_date(&self) -> bool { - self.setup_status + self.setup_change .as_ref() .is_none_or(|c| c.change_type() == SetupChangeType::NoChange) } @@ -351,27 +351,27 @@ pub enum ObjectStatus { Deleted, } -pub trait ObjectSetupStatus { +pub trait ObjectSetupChange { fn status(&self) -> Option; fn is_up_to_date(&self) -> bool; } #[derive(Debug)] -pub struct FlowSetupStatus { +pub struct FlowSetupChange { pub status: Option, pub seen_flow_metadata_version: Option, pub metadata_change: Option>, pub tracking_table: - Option>, + Option>, pub target_resources: - Vec>>, + Vec>>, pub unknown_resources: Vec, } -impl ObjectSetupStatus for FlowSetupStatus { +impl ObjectSetupChange for FlowSetupChange { fn status(&self) -> Option { self.status } @@ -390,11 +390,11 @@ impl ObjectSetupStatus for FlowSetupStatus { } #[derive(Debug)] -pub struct GlobalSetupStatus { +pub struct GlobalSetupChange { pub metadata_table: ResourceSetupInfo<(), (), db_metadata::MetadataTableSetup>, } -impl GlobalSetupStatus { +impl GlobalSetupChange { pub fn from_setup_states(setup_states: &AllSetupStates) -> Self { Self { metadata_table: db_metadata::MetadataTableSetup { @@ -409,8 +409,8 @@ impl GlobalSetupStatus { } } -pub struct ObjectSetupStatusCode<'a, Status: ObjectSetupStatus>(&'a Status); -impl std::fmt::Display for ObjectSetupStatusCode<'_, Status> { +pub struct ObjectSetupChangeCode<'a, Status: ObjectSetupChange>(&'a Status); +impl std::fmt::Display for ObjectSetupChangeCode<'_, Status> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let Some(status) = self.0.status() else { return Ok(()); @@ -433,38 +433,38 @@ impl std::fmt::Display for ObjectSetupStatusCode<'_, } } -impl std::fmt::Display for GlobalSetupStatus { +impl std::fmt::Display for GlobalSetupChange { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { writeln!(f, "{}", self.metadata_table) } } -pub struct FormattedFlowSetupStatus<'a>(pub &'a str, pub &'a FlowSetupStatus); +pub struct FormattedFlowSetupChange<'a>(pub &'a str, pub &'a FlowSetupChange); -impl std::fmt::Display for FormattedFlowSetupStatus<'_> { +impl std::fmt::Display for FormattedFlowSetupChange<'_> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let flow_ssc = self.1; - if flow_ssc.status.is_none() { + let flow_setup_change = self.1; + if flow_setup_change.status.is_none() { return Ok(()); } writeln!( f, "{} Flow: {}", - ObjectSetupStatusCode(flow_ssc) + ObjectSetupChangeCode(flow_setup_change) .to_string() .color(AnsiColors::Cyan), self.0 )?; let mut f = indented(f).with_str(INDENT); - if let Some(tracking_table) = &flow_ssc.tracking_table { + if let Some(tracking_table) = &flow_setup_change.tracking_table { write!(f, "{tracking_table}")?; } - for target_resource in &flow_ssc.target_resources { + for target_resource in &flow_setup_change.target_resources { write!(f, "{target_resource}")?; } - for resource in &flow_ssc.unknown_resources { + for resource in &flow_setup_change.unknown_resources { writeln!(f, "[ UNKNOWN ] {resource}")?; }