From 2d701c672f049eb66f8845130d79fb5cf8a10e88 Mon Sep 17 00:00:00 2001 From: Jiangzhou He Date: Wed, 13 Aug 2025 22:20:54 -0700 Subject: [PATCH] feat(source-state-table): create source state table for new flows --- src/builder/exec_ctx.rs | 93 ++++++++++++++-------- src/execution/db_tracking_setup.rs | 120 +++++++++++++++++++++++++---- src/setup/flow_features.rs | 7 ++ src/setup/mod.rs | 1 + src/setup/states.rs | 2 + 5 files changed, 175 insertions(+), 48 deletions(-) create mode 100644 src/setup/flow_features.rs diff --git a/src/builder/exec_ctx.rs b/src/builder/exec_ctx.rs index 41065760e..09f7c95e7 100644 --- a/src/builder/exec_ctx.rs +++ b/src/builder/exec_ctx.rs @@ -76,7 +76,8 @@ fn build_import_op_exec_ctx( fn build_target_id( analyzed_target_ss: &AnalyzedTargetSetupState, existing_target_states: &HashMap<&setup::ResourceIdentifier, Vec<&setup::TargetSetupState>>, - flow_setup_state: &mut setup::FlowSetupState, + metadata: &mut setup::FlowSetupMetadata, + target_states: &mut IndexMap, ) -> Result { let interface::ExecutorFactory::ExportTarget(target_factory) = get_executor_factory(&analyzed_target_ss.target_kind)? @@ -125,8 +126,8 @@ fn build_target_id( None }; let target_id = target_id.unwrap_or_else(|| { - flow_setup_state.metadata.last_target_id += 1; - flow_setup_state.metadata.last_target_id + metadata.last_target_id += 1; + metadata.last_target_id }); let max_schema_version_id = existing_target_states .iter() @@ -143,7 +144,7 @@ fn build_target_id( } else { max_schema_version_id + 1 }; - match flow_setup_state.targets.entry(resource_id) { + match target_states.entry(resource_id) { indexmap::map::Entry::Occupied(entry) => { api_bail!( "Target resource already exists: kind = {}, key = {}", @@ -199,34 +200,27 @@ pub fn build_flow_setup_execution_context( } } - let mut setup_state = setup::FlowSetupState:: { - seen_flow_metadata_version: existing_flow_ss - .and_then(|flow_ss| flow_ss.seen_flow_metadata_version), - metadata: setup::FlowSetupMetadata { - last_source_id: existing_metadata_versions() - .map(|metadata| metadata.last_source_id) - .max() - .unwrap_or(0), - last_target_id: existing_metadata_versions() - .map(|metadata| metadata.last_target_id) - .max() - .unwrap_or(0), - sources: BTreeMap::new(), - }, - tracking_table: db_tracking_setup::TrackingTableSetupState { - table_name: existing_flow_ss - .and_then(|flow_ss| { - flow_ss - .tracking_table - .current - .as_ref() - .map(|v| v.table_name.clone()) - }) - .unwrap_or_else(|| db_tracking_setup::default_tracking_table_name(&flow_inst.name)), - version_id: db_tracking_setup::CURRENT_TRACKING_TABLE_VERSION, - }, - targets: IndexMap::new(), + let mut metadata = setup::FlowSetupMetadata { + last_source_id: existing_metadata_versions() + .map(|metadata| metadata.last_source_id) + .max() + .unwrap_or(0), + last_target_id: existing_metadata_versions() + .map(|metadata| metadata.last_target_id) + .max() + .unwrap_or(0), + sources: BTreeMap::new(), + features: existing_flow_ss + .map(|m| { + m.metadata + .possible_versions() + .flat_map(|v| v.features.iter()) + .cloned() + .collect::>() + }) + .unwrap_or_else(setup::flow_features::default_features), }; + let mut target_states = IndexMap::new(); let import_op_exec_ctx = flow_inst .import_ops @@ -241,7 +235,7 @@ pub fn build_flow_setup_execution_context( &import_op.name, output_type, source_states_by_name.get(&import_op.name.as_str()), - &mut setup_state.metadata, + &mut metadata, ) }) .collect::>>()?; @@ -253,7 +247,8 @@ pub fn build_flow_setup_execution_context( let target_id = build_target_id( analyzed_target_ss, &target_states_by_name_type, - &mut setup_state, + &mut metadata, + &mut target_states, )?; Ok(ExportOpExecutionContext { target_id }) }) @@ -263,10 +258,40 @@ pub fn build_flow_setup_execution_context( build_target_id( analyzed_target_ss, &target_states_by_name_type, - &mut setup_state, + &mut metadata, + &mut target_states, )?; } + let setup_state = setup::FlowSetupState:: { + seen_flow_metadata_version: existing_flow_ss + .and_then(|flow_ss| flow_ss.seen_flow_metadata_version), + tracking_table: db_tracking_setup::TrackingTableSetupState { + table_name: existing_flow_ss + .and_then(|flow_ss| { + flow_ss + .tracking_table + .current + .as_ref() + .map(|v| v.table_name.clone()) + }) + .unwrap_or_else(|| db_tracking_setup::default_tracking_table_name(&flow_inst.name)), + version_id: db_tracking_setup::CURRENT_TRACKING_TABLE_VERSION, + source_state_table_name: metadata + .features + .contains(setup::flow_features::SOURCE_STATE_TABLE) + .then(|| { + existing_flow_ss + .and_then(|flow_ss| flow_ss.tracking_table.current.as_ref()) + .and_then(|v| v.source_state_table_name.clone()) + .unwrap_or_else(|| { + db_tracking_setup::default_source_state_table_name(&flow_inst.name) + }) + }), + }, + targets: target_states, + metadata, + }; Ok(FlowSetupExecutionContext { setup_state, import_ops: import_op_exec_ctx, diff --git a/src/execution/db_tracking_setup.rs b/src/execution/db_tracking_setup.rs index be1100f65..b55646768 100644 --- a/src/execution/db_tracking_setup.rs +++ b/src/execution/db_tracking_setup.rs @@ -11,6 +11,13 @@ pub fn default_tracking_table_name(flow_name: &str) -> String { ) } +pub fn default_source_state_table_name(flow_name: &str) -> String { + format!( + "{}__cocoindex_srcstate", + utils::db::sanitize_identifier(flow_name) + ) +} + pub const CURRENT_TRACKING_TABLE_VERSION: i32 = 1; async fn upgrade_tracking_table( @@ -46,19 +53,38 @@ async fn upgrade_tracking_table( Ok(()) } +async fn create_source_state_table(pool: &PgPool, table_name: &str) -> Result<()> { + let query = format!( + "CREATE TABLE IF NOT EXISTS {table_name} ( + source_id INTEGER NOT NULL, + key JSONB NOT NULL, + value JSONB NOT NULL, + + PRIMARY KEY (source_id, key) + )" + ); + sqlx::query(&query).execute(pool).await?; + Ok(()) +} + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct TrackingTableSetupState { pub table_name: String, pub version_id: i32, + #[serde(default)] + pub source_state_table_name: Option, } #[derive(Debug)] pub struct TrackingTableSetupStatus { pub desired_state: Option, - pub legacy_table_names: Vec, - pub min_existing_version_id: Option, + pub legacy_tracking_table_names: BTreeSet, + + pub source_state_table_always_exists: bool, + pub legacy_source_state_table_names: BTreeSet, + pub source_ids_to_delete: Vec, } @@ -68,11 +94,16 @@ impl TrackingTableSetupStatus { existing: &CombinedState, source_ids_to_delete: Vec, ) -> Option { - let legacy_table_names = existing + let legacy_tracking_table_names = existing .legacy_values(desired, |v| &v.table_name) .into_iter() .cloned() - .collect(); + .collect::>(); + let legacy_source_state_table_names = existing + .legacy_values(desired, |v| &v.source_state_table_name) + .into_iter() + .filter_map(|v| v.clone()) + .collect::>(); let min_existing_version_id = existing .always_exists() .then(|| existing.possible_versions().map(|v| v.version_id).min()) @@ -80,7 +111,12 @@ impl TrackingTableSetupStatus { if desired.is_some() || min_existing_version_id.is_some() { Some(Self { desired_state: desired.cloned(), - legacy_table_names, + legacy_tracking_table_names, + source_state_table_always_exists: existing.always_exists() + && existing + .possible_versions() + .all(|v| v.source_state_table_name.is_some()), + legacy_source_state_table_names, min_existing_version_id, source_ids_to_delete, }) @@ -95,7 +131,7 @@ impl TrackingTableSetupStatus { ResourceSetupInfo { key: (), state: self.desired_state.clone(), - description: "Tracking Table".to_string(), + description: "Internal Storage for Tracking".to_string(), setup_status: Some(self), legacy_key: None, } @@ -105,10 +141,10 @@ impl TrackingTableSetupStatus { impl ResourceSetupStatus for TrackingTableSetupStatus { fn describe_changes(&self) -> Vec { let mut changes: Vec = vec![]; - if self.desired_state.is_some() && !self.legacy_table_names.is_empty() { + if self.desired_state.is_some() && !self.legacy_tracking_table_names.is_empty() { changes.push(setup::ChangeDescription::Action(format!( "Rename legacy tracking tables: {}. ", - self.legacy_table_names.join(", ") + self.legacy_tracking_table_names.iter().join(", ") ))); } match (self.min_existing_version_id, &self.desired_state) { @@ -127,10 +163,37 @@ impl ResourceSetupStatus for TrackingTableSetupStatus { } (Some(_), None) => changes.push(setup::ChangeDescription::Action(format!( "Drop existing tracking table: {}. ", - self.legacy_table_names.join(", ") + self.legacy_tracking_table_names.iter().join(", ") ))), (None, None) => (), } + + let source_state_table_name = self + .desired_state + .as_ref() + .and_then(|v| v.source_state_table_name.as_ref()); + if let Some(source_state_table_name) = source_state_table_name { + if !self.legacy_source_state_table_names.is_empty() { + changes.push(setup::ChangeDescription::Action(format!( + "Rename legacy source state tables: {}. ", + self.legacy_source_state_table_names.iter().join(", ") + ))); + } + if !self.source_state_table_always_exists { + changes.push(setup::ChangeDescription::Action(format!( + "Create the source state table: {}. ", + source_state_table_name + ))); + } + } else if !self.source_state_table_always_exists + && !self.legacy_source_state_table_names.is_empty() + { + changes.push(setup::ChangeDescription::Action(format!( + "Drop existing source state table: {}. ", + self.legacy_source_state_table_names.iter().join(", ") + ))); + } + if !self.source_ids_to_delete.is_empty() { changes.push(setup::ChangeDescription::Action(format!( "Delete source IDs: {}. ", @@ -145,12 +208,21 @@ impl ResourceSetupStatus for TrackingTableSetupStatus { } fn change_type(&self) -> SetupChangeType { + let source_state_table_up_to_date = self.legacy_source_state_table_names.is_empty() + && (self.source_state_table_always_exists + || self + .desired_state + .as_ref() + .map_or(true, |v| v.source_state_table_name.is_none())); match (self.min_existing_version_id, &self.desired_state) { (None, Some(_)) => SetupChangeType::Create, (Some(min_version_id), Some(desired)) => { - if min_version_id == desired.version_id && self.legacy_table_names.is_empty() { + if min_version_id == desired.version_id + && self.legacy_tracking_table_names.is_empty() + && source_state_table_up_to_date + { SetupChangeType::NoChange - } else if min_version_id < desired.version_id { + } else if min_version_id < desired.version_id || !source_state_table_up_to_date { SetupChangeType::Update } else { SetupChangeType::Invalid @@ -167,7 +239,7 @@ impl TrackingTableSetupStatus { let lib_context = get_lib_context()?; let pool = lib_context.require_builtin_db_pool()?; if let Some(desired) = &self.desired_state { - for lagacy_name in self.legacy_table_names.iter() { + for lagacy_name in self.legacy_tracking_table_names.iter() { let query = format!( "ALTER TABLE IF EXISTS {} RENAME TO {}", lagacy_name, desired.table_name @@ -185,11 +257,31 @@ impl TrackingTableSetupStatus { .await?; } } else { - for lagacy_name in self.legacy_table_names.iter() { + for lagacy_name in self.legacy_tracking_table_names.iter() { + let query = format!("DROP TABLE IF EXISTS {lagacy_name}"); + sqlx::query(&query).execute(pool).await?; + } + } + + let source_state_table_name = self + .desired_state + .as_ref() + .and_then(|v| v.source_state_table_name.as_ref()); + if let Some(source_state_table_name) = source_state_table_name { + for lagacy_name in self.legacy_source_state_table_names.iter() { + let query = format!( + "ALTER TABLE IF EXISTS {lagacy_name} RENAME TO {source_state_table_name}" + ); + sqlx::query(&query).execute(pool).await?; + } + if !self.source_state_table_always_exists { + create_source_state_table(pool, source_state_table_name).await?; + } + } else { + for lagacy_name in self.legacy_source_state_table_names.iter() { let query = format!("DROP TABLE IF EXISTS {lagacy_name}"); sqlx::query(&query).execute(pool).await?; } - return Ok(()); } Ok(()) } diff --git a/src/setup/flow_features.rs b/src/setup/flow_features.rs new file mode 100644 index 000000000..80e036621 --- /dev/null +++ b/src/setup/flow_features.rs @@ -0,0 +1,7 @@ +use crate::prelude::*; + +pub const SOURCE_STATE_TABLE: &str = "source_state_table"; + +pub fn default_features() -> BTreeSet { + vec![SOURCE_STATE_TABLE.to_string()].into_iter().collect() +} diff --git a/src/setup/mod.rs b/src/setup/mod.rs index 6acf7a3c3..099541843 100644 --- a/src/setup/mod.rs +++ b/src/setup/mod.rs @@ -4,6 +4,7 @@ mod driver; mod states; pub mod components; +pub mod flow_features; pub use auth_registry::AuthRegistry; pub use driver::*; diff --git a/src/setup/states.rs b/src/setup/states.rs index 98b118675..4bb1c2a14 100644 --- a/src/setup/states.rs +++ b/src/setup/states.rs @@ -190,6 +190,8 @@ pub struct FlowSetupMetadata { pub last_source_id: i32, pub last_target_id: i32, pub sources: BTreeMap, + #[serde(default)] + pub features: BTreeSet, } #[derive(Debug, Clone)]