diff --git a/src/execution/db_tracking_setup.rs b/src/execution/db_tracking_setup.rs index 89fdc8737..403999220 100644 --- a/src/execution/db_tracking_setup.rs +++ b/src/execution/db_tracking_setup.rs @@ -97,6 +97,7 @@ impl TrackingTableSetupStatusCheck { state: self.desired_state.clone(), description: "Tracking Table".to_string(), status_check: Some(self), + legacy_key: None, } } } diff --git a/src/ops/factory_bases.rs b/src/ops/factory_bases.rs index cf2aa345b..b255f00f8 100644 --- a/src/ops/factory_bases.rs +++ b/src/ops/factory_bases.rs @@ -420,7 +420,7 @@ impl ExportTargetFactory for T { StorageFactoryBase::describe_resource(self, &key) } - fn normalize_setup_key(&self, key: serde_json::Value) -> Result { + fn normalize_setup_key(&self, key: &serde_json::Value) -> Result { let key: T::Key = serde_json::from_value(key.clone())?; Ok(serde_json::to_value(key)?) } @@ -478,5 +478,6 @@ fn from_json_combined_state( }) }) .collect::>()?, + legacy_state_key: existing_states.legacy_state_key, }) } diff --git a/src/ops/interface.rs b/src/ops/interface.rs index 3598a3a70..68e1f2bf0 100644 --- a/src/ops/interface.rs +++ b/src/ops/interface.rs @@ -200,7 +200,7 @@ pub trait ExportTargetFactory: Send + Sync { /// Normalize the key. e.g. the JSON format may change (after code change, e.g. new optional field or field ordering), even if the underlying value is not changed. /// This should always return the canonical serialized form. - fn normalize_setup_key(&self, key: serde_json::Value) -> Result; + fn normalize_setup_key(&self, key: &serde_json::Value) -> Result; fn check_state_compatibility( &self, diff --git a/src/setup/components.rs b/src/setup/components.rs index 8d07ce429..5cd817cf3 100644 --- a/src/setup/components.rs +++ b/src/setup/components.rs @@ -65,6 +65,7 @@ impl StatusCheck { } }) .collect(), + legacy_state_key: existing.legacy_state_key, }; let mut keys_to_delete = IndexSet::new(); let mut states_to_upsert = vec![]; diff --git a/src/setup/db_metadata.rs b/src/setup/db_metadata.rs index 6e64949f6..5e40ba9c0 100644 --- a/src/setup/db_metadata.rs +++ b/src/setup/db_metadata.rs @@ -179,10 +179,30 @@ async fn delete_state( Ok(()) } +pub struct StateUpdateInfo { + pub desired_state: Option, + pub legacy_key: Option, +} + +impl StateUpdateInfo { + pub fn new( + desired_state: Option<&impl Serialize>, + legacy_key: Option, + ) -> Result { + Ok(Self { + desired_state: desired_state + .as_ref() + .map(serde_json::to_value) + .transpose()?, + legacy_key, + }) + } +} + pub async fn stage_changes_for_flow( flow_name: &str, seen_metadata_version: Option, - state_updates: &HashMap>, + resource_update_info: &HashMap, pool: &PgPool, ) -> Result { let mut txn = pool.begin().await?; @@ -210,39 +230,43 @@ pub async fn stage_changes_for_flow( ) .await?; - for (type_id, desired_state) in state_updates { + for (type_id, update_info) in resource_update_info { let existing = existing_records.remove(type_id); - let change = match desired_state { + let change = match &update_info.desired_state { Some(desired_state) => StateChange::Upsert(desired_state.clone()), None => StateChange::Delete, }; - match existing { + let mut new_staging_changes = vec![]; + if let Some(legacy_key) = &update_info.legacy_key { + if let Some(legacy_record) = existing_records.remove(&legacy_key) { + new_staging_changes.extend(legacy_record.staging_changes.0); + delete_state(flow_name, legacy_key, &mut *txn).await?; + } + } + let (action, existing_staging_changes) = match existing { Some(existing) => { - if existing.staging_changes.0.iter().all(|c| c != &change) { - let mut staging_changes = existing.staging_changes.0; - staging_changes.push(change); - upsert_staging_changes( - flow_name, - type_id, - staging_changes, - &mut *txn, - WriteAction::Update, - ) - .await?; + let existing_staging_changes = existing.staging_changes.0; + if existing_staging_changes.iter().all(|c| c != &change) { + new_staging_changes.push(change); } + (WriteAction::Update, existing_staging_changes) } None => { - if desired_state.is_some() { - upsert_staging_changes( - flow_name, - type_id, - vec![change], - &mut *txn, - WriteAction::Insert, - ) - .await?; + if update_info.desired_state.is_some() { + new_staging_changes.push(change); } + (WriteAction::Insert, vec![]) } + }; + if !new_staging_changes.is_empty() { + upsert_staging_changes( + flow_name, + type_id, + [existing_staging_changes, new_staging_changes].concat(), + &mut *txn, + action, + ) + .await?; } } txn.commit().await?; @@ -252,7 +276,7 @@ pub async fn stage_changes_for_flow( pub async fn commit_changes_for_flow( flow_name: &str, curr_metadata_version: u64, - state_updates: HashMap>, + state_updates: HashMap, delete_version: bool, pool: &PgPool, ) -> Result<()> { @@ -265,8 +289,8 @@ pub async fn commit_changes_for_flow( StatusCode::CONFLICT, ))?; } - for (type_id, desired_state) in state_updates { - match desired_state { + for (type_id, update_info) in state_updates { + match update_info.desired_state { Some(desired_state) => { upsert_state( flow_name, @@ -301,6 +325,7 @@ impl MetadataTableSetup { state: None, description: "CocoIndex Metadata Table".to_string(), status_check: Some(self), + legacy_key: None, } } } diff --git a/src/setup/driver.rs b/src/setup/driver.rs index f5e2b52ed..c5735f62a 100644 --- a/src/setup/driver.rs +++ b/src/setup/driver.rs @@ -57,6 +57,7 @@ impl std::str::FromStr for MetadataRecordType { fn from_metadata_record( state: Option, staging_changes: sqlx::types::Json>>, + legacy_state_key: Option, ) -> Result> { let current: Option = state.map(serde_json::from_value).transpose()?; let staging: Vec> = (staging_changes.0.into_iter()) @@ -67,7 +68,11 @@ fn from_metadata_record( }) }) .collect::>()?; - Ok(CombinedState { current, staging }) + Ok(CombinedState { + current, + staging, + legacy_state_key, + }) } pub async fn get_existing_setup_state(pool: &PgPool) -> Result> { @@ -103,27 +108,33 @@ pub async fn get_existing_setup_state(pool: &PgPool) -> Result { - flow_ss.metadata = from_metadata_record(state, staging_changes)?; + flow_ss.metadata = from_metadata_record(state, staging_changes, None)?; } MetadataRecordType::TrackingTable => { - flow_ss.tracking_table = from_metadata_record(state, staging_changes)?; + flow_ss.tracking_table = + from_metadata_record(state, staging_changes, None)?; } MetadataRecordType::Target(target_type) => { let normalized_key = { let registry = executor_factory_registry(); match registry.get(&target_type) { Some(ExecutorFactory::ExportTarget(factory)) => { - factory.normalize_setup_key(metadata_record.key)? + factory.normalize_setup_key(&metadata_record.key)? } _ => metadata_record.key.clone(), } }; + let combined_state = from_metadata_record( + state, + staging_changes, + (normalized_key != metadata_record.key).then_some(metadata_record.key), + )?; flow_ss.targets.insert( super::ResourceIdentifier { key: normalized_key, target_kind: target_type, }, - from_metadata_record(state, staging_changes)?, + combined_state, ); } } @@ -203,6 +214,20 @@ fn group_resource_states<'a>( if let Some(current) = &state.current { entry.existing.current = Some(current.clone()); } + if let Some(legacy_state_key) = &state.legacy_state_key { + if !entry + .existing + .legacy_state_key + .as_ref() + .map_or(false, |v| v == legacy_state_key) + { + warn!( + "inconsistent legacy key: {:?}, {:?}", + key, entry.existing.legacy_state_key + ); + } + entry.existing.legacy_state_key = Some(legacy_state_key.clone()); + } for s in state.staging.iter() { match s { StateChange::Upsert(v) => { @@ -288,6 +313,7 @@ pub async fn check_flow_setup_status( StateChange::Delete => Some(StateChange::Delete), }) .collect(), + legacy_state_key: v.existing.legacy_state_key.clone(), }; let never_setup_by_sys = target_state.is_none() && existing_without_setup_by_user.current.is_none() @@ -311,6 +337,13 @@ pub async fn check_flow_setup_status( state, description: factory.describe_resource(&resource_id.key)?, status_check, + legacy_key: v + .existing + .legacy_state_key + .map(|legacy_state_key| ResourceIdentifier { + target_kind: resource_id.target_kind.clone(), + key: legacy_state_key, + }), }); } Ok(FlowSetupStatusCheck { @@ -406,19 +439,16 @@ pub async fn apply_changes( }; write!(write, "\n{verb} flow {flow_name}:\n")?; - let mut state_updates = - HashMap::>::new(); + let mut update_info = + HashMap::::new(); if let Some(metadata_change) = &flow_status.metadata_change { - state_updates.insert( + update_info.insert( db_metadata::ResourceTypeKey::new( MetadataRecordType::FlowMetadata.to_string(), serde_json::Value::Null, ), - metadata_change - .desired_state() - .map(serde_json::to_value) - .transpose()?, + db_metadata::StateUpdateInfo::new(metadata_change.desired_state(), None)?, ); } if let Some(tracking_table) = &flow_status.tracking_table { @@ -428,37 +458,37 @@ pub async fn apply_changes( .map(|c| c.change_type() != SetupChangeType::NoChange) .unwrap_or_default() { - state_updates.insert( + update_info.insert( db_metadata::ResourceTypeKey::new( MetadataRecordType::TrackingTable.to_string(), serde_json::Value::Null, ), - tracking_table - .state - .as_ref() - .map(serde_json::to_value) - .transpose()?, + db_metadata::StateUpdateInfo::new(tracking_table.state.as_ref(), None)?, ); } } for target_resource in &flow_status.target_resources { - state_updates.insert( + update_info.insert( db_metadata::ResourceTypeKey::new( MetadataRecordType::Target(target_resource.key.target_kind.clone()).to_string(), target_resource.key.key.clone(), ), - target_resource - .state - .as_ref() - .map(serde_json::to_value) - .transpose()?, + db_metadata::StateUpdateInfo::new( + target_resource.state.as_ref(), + target_resource.legacy_key.as_ref().map(|k| { + db_metadata::ResourceTypeKey::new( + MetadataRecordType::Target(k.target_kind.clone()).to_string(), + k.key.clone(), + ) + }), + )?, ); } let new_version_id = db_metadata::stage_changes_for_flow( flow_name, flow_status.seen_flow_metadata_version, - &state_updates, + &update_info, pool, ) .await?; @@ -474,7 +504,7 @@ pub async fn apply_changes( db_metadata::commit_changes_for_flow( flow_name, new_version_id, - state_updates, + update_info, is_deletion, pool, ) diff --git a/src/setup/states.rs b/src/setup/states.rs index b61488dea..8924ea692 100644 --- a/src/setup/states.rs +++ b/src/setup/states.rs @@ -42,6 +42,7 @@ impl StateMode for DesiredMode { pub struct CombinedState { pub current: Option, pub staging: Vec>, + pub legacy_state_key: Option, } impl CombinedState { @@ -73,6 +74,7 @@ impl Default for CombinedState { Self { current: None, staging: vec![], + legacy_state_key: None, } } } @@ -263,6 +265,8 @@ pub struct ResourceSetupInfo { /// If `None`, the resource is managed by users. pub status_check: Option, + + pub legacy_key: Option, } impl std::fmt::Display for ResourceSetupInfo {