Skip to content

Commit deeef7e

Browse files
authored
fix(setup): make sure legacy setup keys cleared from db (#398)
1 parent 0be50d6 commit deeef7e

File tree

7 files changed

+117
-55
lines changed

7 files changed

+117
-55
lines changed

src/execution/db_tracking_setup.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ impl TrackingTableSetupStatusCheck {
9797
state: self.desired_state.clone(),
9898
description: "Tracking Table".to_string(),
9999
status_check: Some(self),
100+
legacy_key: None,
100101
}
101102
}
102103
}

src/ops/factory_bases.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -420,7 +420,7 @@ impl<T: StorageFactoryBase> ExportTargetFactory for T {
420420
StorageFactoryBase::describe_resource(self, &key)
421421
}
422422

423-
fn normalize_setup_key(&self, key: serde_json::Value) -> Result<serde_json::Value> {
423+
fn normalize_setup_key(&self, key: &serde_json::Value) -> Result<serde_json::Value> {
424424
let key: T::Key = serde_json::from_value(key.clone())?;
425425
Ok(serde_json::to_value(key)?)
426426
}
@@ -478,5 +478,6 @@ fn from_json_combined_state<T: Debug + Clone + Serialize + DeserializeOwned>(
478478
})
479479
})
480480
.collect::<Result<_>>()?,
481+
legacy_state_key: existing_states.legacy_state_key,
481482
})
482483
}

src/ops/interface.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ pub trait ExportTargetFactory: Send + Sync {
200200

201201
/// Normalize the key: the JSON format may change after a code change (e.g. a new optional field or a different field ordering) even if the underlying value has not changed.
202202
/// This should always return the canonical serialized form.
203-
fn normalize_setup_key(&self, key: serde_json::Value) -> Result<serde_json::Value>;
203+
fn normalize_setup_key(&self, key: &serde_json::Value) -> Result<serde_json::Value>;
204204

205205
fn check_state_compatibility(
206206
&self,

src/setup/components.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ impl<D: Operator> StatusCheck<D> {
6565
}
6666
})
6767
.collect(),
68+
legacy_state_key: existing.legacy_state_key,
6869
};
6970
let mut keys_to_delete = IndexSet::new();
7071
let mut states_to_upsert = vec![];

src/setup/db_metadata.rs

Lines changed: 52 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -179,10 +179,30 @@ async fn delete_state(
179179
Ok(())
180180
}
181181

182+
pub struct StateUpdateInfo {
183+
pub desired_state: Option<serde_json::Value>,
184+
pub legacy_key: Option<ResourceTypeKey>,
185+
}
186+
187+
impl StateUpdateInfo {
188+
pub fn new(
189+
desired_state: Option<&impl Serialize>,
190+
legacy_key: Option<ResourceTypeKey>,
191+
) -> Result<Self> {
192+
Ok(Self {
193+
desired_state: desired_state
194+
.as_ref()
195+
.map(serde_json::to_value)
196+
.transpose()?,
197+
legacy_key,
198+
})
199+
}
200+
}
201+
182202
pub async fn stage_changes_for_flow(
183203
flow_name: &str,
184204
seen_metadata_version: Option<u64>,
185-
state_updates: &HashMap<ResourceTypeKey, Option<serde_json::Value>>,
205+
resource_update_info: &HashMap<ResourceTypeKey, StateUpdateInfo>,
186206
pool: &PgPool,
187207
) -> Result<u64> {
188208
let mut txn = pool.begin().await?;
@@ -210,39 +230,43 @@ pub async fn stage_changes_for_flow(
210230
)
211231
.await?;
212232

213-
for (type_id, desired_state) in state_updates {
233+
for (type_id, update_info) in resource_update_info {
214234
let existing = existing_records.remove(type_id);
215-
let change = match desired_state {
235+
let change = match &update_info.desired_state {
216236
Some(desired_state) => StateChange::Upsert(desired_state.clone()),
217237
None => StateChange::Delete,
218238
};
219-
match existing {
239+
let mut new_staging_changes = vec![];
240+
if let Some(legacy_key) = &update_info.legacy_key {
241+
if let Some(legacy_record) = existing_records.remove(&legacy_key) {
242+
new_staging_changes.extend(legacy_record.staging_changes.0);
243+
delete_state(flow_name, legacy_key, &mut *txn).await?;
244+
}
245+
}
246+
let (action, existing_staging_changes) = match existing {
220247
Some(existing) => {
221-
if existing.staging_changes.0.iter().all(|c| c != &change) {
222-
let mut staging_changes = existing.staging_changes.0;
223-
staging_changes.push(change);
224-
upsert_staging_changes(
225-
flow_name,
226-
type_id,
227-
staging_changes,
228-
&mut *txn,
229-
WriteAction::Update,
230-
)
231-
.await?;
248+
let existing_staging_changes = existing.staging_changes.0;
249+
if existing_staging_changes.iter().all(|c| c != &change) {
250+
new_staging_changes.push(change);
232251
}
252+
(WriteAction::Update, existing_staging_changes)
233253
}
234254
None => {
235-
if desired_state.is_some() {
236-
upsert_staging_changes(
237-
flow_name,
238-
type_id,
239-
vec![change],
240-
&mut *txn,
241-
WriteAction::Insert,
242-
)
243-
.await?;
255+
if update_info.desired_state.is_some() {
256+
new_staging_changes.push(change);
244257
}
258+
(WriteAction::Insert, vec![])
245259
}
260+
};
261+
if !new_staging_changes.is_empty() {
262+
upsert_staging_changes(
263+
flow_name,
264+
type_id,
265+
[existing_staging_changes, new_staging_changes].concat(),
266+
&mut *txn,
267+
action,
268+
)
269+
.await?;
246270
}
247271
}
248272
txn.commit().await?;
@@ -252,7 +276,7 @@ pub async fn stage_changes_for_flow(
252276
pub async fn commit_changes_for_flow(
253277
flow_name: &str,
254278
curr_metadata_version: u64,
255-
state_updates: HashMap<ResourceTypeKey, Option<serde_json::Value>>,
279+
state_updates: HashMap<ResourceTypeKey, StateUpdateInfo>,
256280
delete_version: bool,
257281
pool: &PgPool,
258282
) -> Result<()> {
@@ -265,8 +289,8 @@ pub async fn commit_changes_for_flow(
265289
StatusCode::CONFLICT,
266290
))?;
267291
}
268-
for (type_id, desired_state) in state_updates {
269-
match desired_state {
292+
for (type_id, update_info) in state_updates {
293+
match update_info.desired_state {
270294
Some(desired_state) => {
271295
upsert_state(
272296
flow_name,
@@ -301,6 +325,7 @@ impl MetadataTableSetup {
301325
state: None,
302326
description: "CocoIndex Metadata Table".to_string(),
303327
status_check: Some(self),
328+
legacy_key: None,
304329
}
305330
}
306331
}

src/setup/driver.rs

Lines changed: 56 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ impl std::str::FromStr for MetadataRecordType {
5757
fn from_metadata_record<S: DeserializeOwned + Debug + Clone>(
5858
state: Option<serde_json::Value>,
5959
staging_changes: sqlx::types::Json<Vec<StateChange<serde_json::Value>>>,
60+
legacy_state_key: Option<serde_json::Value>,
6061
) -> Result<CombinedState<S>> {
6162
let current: Option<S> = state.map(serde_json::from_value).transpose()?;
6263
let staging: Vec<StateChange<S>> = (staging_changes.0.into_iter())
@@ -67,7 +68,11 @@ fn from_metadata_record<S: DeserializeOwned + Debug + Clone>(
6768
})
6869
})
6970
.collect::<Result<_>>()?;
70-
Ok(CombinedState { current, staging })
71+
Ok(CombinedState {
72+
current,
73+
staging,
74+
legacy_state_key,
75+
})
7176
}
7277

7378
pub async fn get_existing_setup_state(pool: &PgPool) -> Result<AllSetupState<ExistingMode>> {
@@ -103,27 +108,33 @@ pub async fn get_existing_setup_state(pool: &PgPool) -> Result<AllSetupState<Exi
103108
db_metadata::parse_flow_version(&state);
104109
}
105110
MetadataRecordType::FlowMetadata => {
106-
flow_ss.metadata = from_metadata_record(state, staging_changes)?;
111+
flow_ss.metadata = from_metadata_record(state, staging_changes, None)?;
107112
}
108113
MetadataRecordType::TrackingTable => {
109-
flow_ss.tracking_table = from_metadata_record(state, staging_changes)?;
114+
flow_ss.tracking_table =
115+
from_metadata_record(state, staging_changes, None)?;
110116
}
111117
MetadataRecordType::Target(target_type) => {
112118
let normalized_key = {
113119
let registry = executor_factory_registry();
114120
match registry.get(&target_type) {
115121
Some(ExecutorFactory::ExportTarget(factory)) => {
116-
factory.normalize_setup_key(metadata_record.key)?
122+
factory.normalize_setup_key(&metadata_record.key)?
117123
}
118124
_ => metadata_record.key.clone(),
119125
}
120126
};
127+
let combined_state = from_metadata_record(
128+
state,
129+
staging_changes,
130+
(normalized_key != metadata_record.key).then_some(metadata_record.key),
131+
)?;
121132
flow_ss.targets.insert(
122133
super::ResourceIdentifier {
123134
key: normalized_key,
124135
target_kind: target_type,
125136
},
126-
from_metadata_record(state, staging_changes)?,
137+
combined_state,
127138
);
128139
}
129140
}
@@ -203,6 +214,20 @@ fn group_resource_states<'a>(
203214
if let Some(current) = &state.current {
204215
entry.existing.current = Some(current.clone());
205216
}
217+
if let Some(legacy_state_key) = &state.legacy_state_key {
218+
if !entry
219+
.existing
220+
.legacy_state_key
221+
.as_ref()
222+
.map_or(false, |v| v == legacy_state_key)
223+
{
224+
warn!(
225+
"inconsistent legacy key: {:?}, {:?}",
226+
key, entry.existing.legacy_state_key
227+
);
228+
}
229+
entry.existing.legacy_state_key = Some(legacy_state_key.clone());
230+
}
206231
for s in state.staging.iter() {
207232
match s {
208233
StateChange::Upsert(v) => {
@@ -288,6 +313,7 @@ pub async fn check_flow_setup_status(
288313
StateChange::Delete => Some(StateChange::Delete),
289314
})
290315
.collect(),
316+
legacy_state_key: v.existing.legacy_state_key.clone(),
291317
};
292318
let never_setup_by_sys = target_state.is_none()
293319
&& existing_without_setup_by_user.current.is_none()
@@ -311,6 +337,13 @@ pub async fn check_flow_setup_status(
311337
state,
312338
description: factory.describe_resource(&resource_id.key)?,
313339
status_check,
340+
legacy_key: v
341+
.existing
342+
.legacy_state_key
343+
.map(|legacy_state_key| ResourceIdentifier {
344+
target_kind: resource_id.target_kind.clone(),
345+
key: legacy_state_key,
346+
}),
314347
});
315348
}
316349
Ok(FlowSetupStatusCheck {
@@ -406,19 +439,16 @@ pub async fn apply_changes(
406439
};
407440
write!(write, "\n{verb} flow {flow_name}:\n")?;
408441

409-
let mut state_updates =
410-
HashMap::<db_metadata::ResourceTypeKey, Option<serde_json::Value>>::new();
442+
let mut update_info =
443+
HashMap::<db_metadata::ResourceTypeKey, db_metadata::StateUpdateInfo>::new();
411444

412445
if let Some(metadata_change) = &flow_status.metadata_change {
413-
state_updates.insert(
446+
update_info.insert(
414447
db_metadata::ResourceTypeKey::new(
415448
MetadataRecordType::FlowMetadata.to_string(),
416449
serde_json::Value::Null,
417450
),
418-
metadata_change
419-
.desired_state()
420-
.map(serde_json::to_value)
421-
.transpose()?,
451+
db_metadata::StateUpdateInfo::new(metadata_change.desired_state(), None)?,
422452
);
423453
}
424454
if let Some(tracking_table) = &flow_status.tracking_table {
@@ -428,37 +458,37 @@ pub async fn apply_changes(
428458
.map(|c| c.change_type() != SetupChangeType::NoChange)
429459
.unwrap_or_default()
430460
{
431-
state_updates.insert(
461+
update_info.insert(
432462
db_metadata::ResourceTypeKey::new(
433463
MetadataRecordType::TrackingTable.to_string(),
434464
serde_json::Value::Null,
435465
),
436-
tracking_table
437-
.state
438-
.as_ref()
439-
.map(serde_json::to_value)
440-
.transpose()?,
466+
db_metadata::StateUpdateInfo::new(tracking_table.state.as_ref(), None)?,
441467
);
442468
}
443469
}
444470
for target_resource in &flow_status.target_resources {
445-
state_updates.insert(
471+
update_info.insert(
446472
db_metadata::ResourceTypeKey::new(
447473
MetadataRecordType::Target(target_resource.key.target_kind.clone()).to_string(),
448474
target_resource.key.key.clone(),
449475
),
450-
target_resource
451-
.state
452-
.as_ref()
453-
.map(serde_json::to_value)
454-
.transpose()?,
476+
db_metadata::StateUpdateInfo::new(
477+
target_resource.state.as_ref(),
478+
target_resource.legacy_key.as_ref().map(|k| {
479+
db_metadata::ResourceTypeKey::new(
480+
MetadataRecordType::Target(k.target_kind.clone()).to_string(),
481+
k.key.clone(),
482+
)
483+
}),
484+
)?,
455485
);
456486
}
457487

458488
let new_version_id = db_metadata::stage_changes_for_flow(
459489
flow_name,
460490
flow_status.seen_flow_metadata_version,
461-
&state_updates,
491+
&update_info,
462492
pool,
463493
)
464494
.await?;
@@ -474,7 +504,7 @@ pub async fn apply_changes(
474504
db_metadata::commit_changes_for_flow(
475505
flow_name,
476506
new_version_id,
477-
state_updates,
507+
update_info,
478508
is_deletion,
479509
pool,
480510
)

src/setup/states.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ impl StateMode for DesiredMode {
4242
pub struct CombinedState<T> {
4343
pub current: Option<T>,
4444
pub staging: Vec<StateChange<T>>,
45+
pub legacy_state_key: Option<serde_json::Value>,
4546
}
4647

4748
impl<T> CombinedState<T> {
@@ -73,6 +74,7 @@ impl<T: Debug + Clone> Default for CombinedState<T> {
7374
Self {
7475
current: None,
7576
staging: vec![],
77+
legacy_state_key: None,
7678
}
7779
}
7880
}
@@ -263,6 +265,8 @@ pub struct ResourceSetupInfo<K, S, C: ResourceSetupStatusCheck> {
263265

264266
/// If `None`, the resource is managed by users.
265267
pub status_check: Option<C>,
268+
269+
pub legacy_key: Option<K>,
266270
}
267271

268272
impl<K, S, C: ResourceSetupStatusCheck> std::fmt::Display for ResourceSetupInfo<K, S, C> {

0 commit comments

Comments
 (0)