Skip to content

Commit 8faa3c2

Browse files
authored
fix: avoid showing tracking table in drop if there's no such thing (#376)
1 parent f04b793 commit 8faa3c2

File tree

3 files changed

+52
-43
lines changed

3 files changed

+52
-43
lines changed

src/execution/db_tracking_setup.rs

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -67,19 +67,25 @@ impl TrackingTableSetupStatusCheck {
6767
desired: Option<&TrackingTableSetupState>,
6868
existing: &CombinedState<TrackingTableSetupState>,
6969
source_ids_to_delete: Vec<i32>,
70-
) -> Self {
71-
Self {
72-
desired_state: desired.cloned(),
73-
legacy_table_names: existing
74-
.legacy_values(desired, |v| &v.table_name)
75-
.into_iter()
76-
.cloned()
77-
.collect(),
78-
min_existing_version_id: existing
79-
.always_exists()
80-
.then(|| existing.possible_versions().map(|v| v.version_id).min())
81-
.flatten(),
82-
source_ids_to_delete,
70+
) -> Option<Self> {
71+
let legacy_table_names = existing
72+
.legacy_values(desired, |v| &v.table_name)
73+
.into_iter()
74+
.cloned()
75+
.collect();
76+
let min_existing_version_id = existing
77+
.always_exists()
78+
.then(|| existing.possible_versions().map(|v| v.version_id).min())
79+
.flatten();
80+
if desired.is_some() || min_existing_version_id.is_some() {
81+
Some(Self {
82+
desired_state: desired.cloned(),
83+
legacy_table_names,
84+
min_existing_version_id,
85+
source_ids_to_delete,
86+
})
87+
} else {
88+
None
8389
}
8490
}
8591

src/setup/driver.rs

Lines changed: 22 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,7 @@ pub fn check_flow_setup_status(
308308
status: to_object_status(existing_state, desired_state)?,
309309
seen_flow_metadata_version: existing_state.and_then(|s| s.seen_flow_metadata_version),
310310
metadata_change,
311-
tracking_table: tracking_table_change.into_setup_info(),
311+
tracking_table: tracking_table_change.map(|c| c.into_setup_info()),
312312
target_resources,
313313
unknown_resources,
314314
})
@@ -412,25 +412,25 @@ pub async fn apply_changes(
412412
.transpose()?,
413413
);
414414
}
415-
if flow_status
416-
.tracking_table
417-
.status_check
418-
.as_ref()
419-
.map(|c| c.change_type() != SetupChangeType::NoChange)
420-
.unwrap_or_default()
421-
{
422-
state_updates.insert(
423-
db_metadata::ResourceTypeKey::new(
424-
MetadataRecordType::TrackingTable.to_string(),
425-
serde_json::Value::Null,
426-
),
427-
flow_status
428-
.tracking_table
429-
.state
430-
.as_ref()
431-
.map(serde_json::to_value)
432-
.transpose()?,
433-
);
415+
if let Some(tracking_table) = &flow_status.tracking_table {
416+
if tracking_table
417+
.status_check
418+
.as_ref()
419+
.map(|c| c.change_type() != SetupChangeType::NoChange)
420+
.unwrap_or_default()
421+
{
422+
state_updates.insert(
423+
db_metadata::ResourceTypeKey::new(
424+
MetadataRecordType::TrackingTable.to_string(),
425+
serde_json::Value::Null,
426+
),
427+
tracking_table
428+
.state
429+
.as_ref()
430+
.map(serde_json::to_value)
431+
.transpose()?,
432+
);
433+
}
434434
}
435435
for target_resource in &flow_status.target_resources {
436436
state_updates.insert(
@@ -454,10 +454,8 @@ pub async fn apply_changes(
454454
)
455455
.await?;
456456

457-
maybe_update_resource_setup(write, &flow_status.tracking_table).await?;
458-
459-
for target_resource in &flow_status.target_resources {
460-
maybe_update_resource_setup(write, target_resource).await?;
457+
if let Some(tracking_table) = &flow_status.tracking_table {
458+
maybe_update_resource_setup(write, tracking_table).await?;
461459
}
462460

463461
let is_deletion = flow_status.status == ObjectStatus::Deleted;

src/setup/states.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,7 @@ pub struct FlowSetupStatusCheck {
323323
pub metadata_change: Option<StateChange<FlowSetupMetadata>>,
324324

325325
pub tracking_table:
326-
ResourceSetupInfo<(), TrackingTableSetupState, TrackingTableSetupStatusCheck>,
326+
Option<ResourceSetupInfo<(), TrackingTableSetupState, TrackingTableSetupStatusCheck>>,
327327
pub target_resources: Vec<
328328
ResourceSetupInfo<ResourceIdentifier, TargetSetupState, Box<dyn ResourceSetupStatusCheck>>,
329329
>,
@@ -338,7 +338,10 @@ impl ObjectSetupStatusCheck for FlowSetupStatusCheck {
338338

339339
fn is_up_to_date(&self) -> bool {
340340
self.metadata_change.is_none()
341-
&& self.tracking_table.is_up_to_date()
341+
&& self
342+
.tracking_table
343+
.as_ref()
344+
.is_none_or(|t| t.is_up_to_date())
342345
&& self
343346
.target_resources
344347
.iter()
@@ -392,17 +395,19 @@ impl std::fmt::Display for FormattedFlowSetupStatusCheck<'_> {
392395
write!(
393396
f,
394397
"{} {}\n",
395-
ObjectSetupStatusCode(flow_ssc).to_string().color(AnsiColors::Cyan),
398+
ObjectSetupStatusCode(flow_ssc)
399+
.to_string()
400+
.color(AnsiColors::Cyan),
396401
format!("Flow: {}", self.0)
397402
)?;
398403

399404
let mut f = indented(f).with_str(INDENT);
400-
write!(f, "{}", flow_ssc.tracking_table)?;
401-
405+
if let Some(tracking_table) = &flow_ssc.tracking_table {
406+
write!(f, "{}", tracking_table)?;
407+
}
402408
for target_resource in &flow_ssc.target_resources {
403409
write!(f, "{}", target_resource)?;
404410
}
405-
406411
for resource in &flow_ssc.unknown_resources {
407412
writeln!(f, "[ UNKNOWN ] {resource}")?;
408413
}

0 commit comments

Comments
 (0)