diff --git a/src/builder/exec_ctx.rs b/src/builder/exec_ctx.rs index 47af1ae6b..c47a058fb 100644 --- a/src/builder/exec_ctx.rs +++ b/src/builder/exec_ctx.rs @@ -31,7 +31,7 @@ pub struct AnalyzedSetupState { } fn build_import_op_exec_ctx( - import_field_name: &spec::FieldName, + import_op: &spec::NamedSpec, import_op_output_type: &schema::EnrichedValueType, existing_source_states: Option<&Vec<&setup::SourceSetupState>>, metadata: &mut setup::FlowSetupMetadata, @@ -64,10 +64,11 @@ fn build_import_op_exec_ctx( metadata.last_source_id }; metadata.sources.insert( - import_field_name.clone(), + import_op.name.clone(), setup::SourceSetupState { source_id, key_schema: key_schema_no_attrs, + source_kind: import_op.spec.source.kind.clone(), }, ); Ok(ImportOpExecutionContext { source_id }) @@ -232,7 +233,7 @@ pub fn build_flow_setup_execution_context( .get(&import_op.name) .ok_or_else(invariance_violation)?; build_import_op_exec_ctx( - &import_op.name, + &import_op, output_type, source_states_by_name.get(&import_op.name.as_str()), &mut metadata, diff --git a/src/execution/db_tracking_setup.rs b/src/execution/db_tracking_setup.rs index b9a64514c..547bb5f87 100644 --- a/src/execution/db_tracking_setup.rs +++ b/src/execution/db_tracking_setup.rs @@ -72,6 +72,16 @@ async fn create_source_state_table(pool: &PgPool, table_name: &str) -> Result<() Ok(()) } +async fn delete_source_states_for_sources( + pool: &PgPool, + table_name: &str, + source_ids: &Vec, +) -> Result<()> { + let query = format!("DELETE FROM {} WHERE source_id = ANY($1)", table_name,); + sqlx::query(&query).bind(source_ids).execute(pool).await?; + Ok(()) +} + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct TrackingTableSetupState { pub table_name: String, @@ -92,14 +102,14 @@ pub struct TrackingTableSetupChange { pub source_state_table_always_exists: bool, pub legacy_source_state_table_names: BTreeSet, - pub source_ids_to_delete: Vec, + pub source_names_need_state_cleanup: BTreeMap>, } impl TrackingTableSetupChange { pub fn new( desired: Option<&TrackingTableSetupState>, existing: &CombinedState, - source_ids_to_delete: Vec, + source_names_need_state_cleanup: BTreeMap>, ) -> Option { let legacy_tracking_table_names = existing .legacy_values(desired, |v| &v.table_name) @@ -125,7 +135,7 @@ impl TrackingTableSetupChange { .all(|v| v.source_state_table_name.is_some()), legacy_source_state_table_names, min_existing_version_id, - source_ids_to_delete, + source_names_need_state_cleanup, }) } else { None @@ -201,13 +211,13 @@ impl ResourceSetupChange for TrackingTableSetupChange { ))); } - if !self.source_ids_to_delete.is_empty() { + if !self.source_names_need_state_cleanup.is_empty() { changes.push(setup::ChangeDescription::Action(format!( - "Delete source IDs: {}. ", - self.source_ids_to_delete - .iter() - .map(|id| id.to_string()) - .collect::>() + "Clean up legacy source states: {}. ", + self.source_names_need_state_cleanup + .values() + .flatten() + .dedup() .join(", ") ))); } @@ -215,15 +225,14 @@ impl ResourceSetupChange for TrackingTableSetupChange { } fn change_type(&self) -> SetupChangeType { - let source_state_table_up_to_date = self.legacy_source_state_table_names.is_empty() - && (self.source_state_table_always_exists - || self - .desired_state - .as_ref() - .map_or(true, |v| v.source_state_table_name.is_none())); match (self.min_existing_version_id, &self.desired_state) { (None, Some(_)) => SetupChangeType::Create, (Some(min_version_id), Some(desired)) => { + let source_state_table_up_to_date = self.legacy_source_state_table_names.is_empty() + && self.source_names_need_state_cleanup.is_empty() + && (self.source_state_table_always_exists + || desired.source_state_table_name.is_none()); + if min_version_id == desired.version_id && self.legacy_tracking_table_names.is_empty() && source_state_table_up_to_date @@ -279,6 +288,18 @@ impl TrackingTableSetupChange { if !self.source_state_table_always_exists { create_source_state_table(pool, source_state_table_name).await?; } + if !self.source_names_need_state_cleanup.is_empty() { + delete_source_states_for_sources( + pool, + source_state_table_name, + &self + .source_names_need_state_cleanup + .keys() + .map(|v| *v) + .collect::>(), + ) + .await?; + } } else { for lagacy_name in self.legacy_source_state_table_names.iter() { let query = format!("DROP TABLE IF EXISTS {lagacy_name}"); diff --git a/src/py/mod.rs b/src/py/mod.rs index 538f1c185..818619ed6 100644 --- a/src/py/mod.rs +++ b/src/py/mod.rs @@ -13,7 +13,6 @@ use pyo3::IntoPyObjectExt; use pyo3::{exceptions::PyException, prelude::*}; use pyo3_async_runtimes::tokio::future_into_py; use std::fmt::Write; -use std::io::Cursor; use std::sync::Arc; mod convert; diff --git a/src/setup/driver.rs b/src/setup/driver.rs index 5fa0a68ad..2a8723fb4 100644 --- a/src/setup/driver.rs +++ b/src/setup/driver.rs @@ -261,27 +261,58 @@ pub async fn diff_flow_setup_states( |_, desired_state| Some(StateChange::Upsert(desired_state.clone())), ); - let new_source_ids = desired_state - .iter() - .flat_map(|d| d.metadata.sources.values().map(|v| v.source_id)) - .collect::>(); + // If the source kind has changed, we need to clean the source states. + let source_names_needs_states_cleanup: BTreeMap> = + if let Some(desired_state) = desired_state + && let Some(existing_state) = existing_state + { + let new_source_id_to_kind = desired_state + .metadata + .sources + .values() + .map(|v| (v.source_id, &v.source_kind)) + .collect::>(); + + let mut existing_source_id_to_name_kind = + BTreeMap::>::new(); + for (name, setup_state) in existing_state + .metadata + .possible_versions() + .flat_map(|v| v.sources.iter()) + { + existing_source_id_to_name_kind + .entry(setup_state.source_id) + .or_default() + .push((&name, &setup_state.source_kind)); + } + + (existing_source_id_to_name_kind.into_iter()) + .map(|(id, name_kinds)| { + let new_kind = new_source_id_to_kind.get(&id).map(|v| *v); + let source_names_for_legacy_states = name_kinds + .into_iter() + .filter_map(|(name, kind)| { + if Some(kind) != new_kind { + Some(name.clone()) + } else { + None + } + }) + .collect::>(); + (id, source_names_for_legacy_states) + }) + .filter(|(_, v)| !v.is_empty()) + .collect::>() + } else { + BTreeMap::new() + }; + let tracking_table_change = db_tracking_setup::TrackingTableSetupChange::new( desired_state.map(|d| &d.tracking_table), &existing_state .map(|e| Cow::Borrowed(&e.tracking_table)) .unwrap_or_default(), - (existing_state.iter()) - .flat_map(|state| state.metadata.possible_versions()) - .flat_map(|metadata| { - metadata - .sources - .values() - .map(|v| v.source_id) - .filter(|id| !new_source_ids.contains(id)) - }) - .collect::>() - .into_iter() - .collect(), + source_names_needs_states_cleanup, ); let mut target_resources = Vec::new(); diff --git a/src/setup/states.rs b/src/setup/states.rs index 579159d8d..e49c41402 100644 --- a/src/setup/states.rs +++ b/src/setup/states.rs @@ -148,6 +148,10 @@ impl StateChange { pub struct SourceSetupState { pub source_id: i32, pub key_schema: schema::ValueType, + + // Allow empty string during deserialization for backward compatibility. + #[serde(default)] + pub source_kind: String, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]