Skip to content

Commit a314752

Browse files
authored
feat(setup): cleanup source states after incompatible change (#906)
1 parent 47ce7dd commit a314752

File tree

5 files changed

+91
-35
lines changed

5 files changed

+91
-35
lines changed

src/builder/exec_ctx.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ pub struct AnalyzedSetupState {
3131
}
3232

3333
fn build_import_op_exec_ctx(
34-
import_field_name: &spec::FieldName,
34+
import_op: &spec::NamedSpec<spec::ImportOpSpec>,
3535
import_op_output_type: &schema::EnrichedValueType,
3636
existing_source_states: Option<&Vec<&setup::SourceSetupState>>,
3737
metadata: &mut setup::FlowSetupMetadata,
@@ -64,10 +64,11 @@ fn build_import_op_exec_ctx(
6464
metadata.last_source_id
6565
};
6666
metadata.sources.insert(
67-
import_field_name.clone(),
67+
import_op.name.clone(),
6868
setup::SourceSetupState {
6969
source_id,
7070
key_schema: key_schema_no_attrs,
71+
source_kind: import_op.spec.source.kind.clone(),
7172
},
7273
);
7374
Ok(ImportOpExecutionContext { source_id })
@@ -232,7 +233,7 @@ pub fn build_flow_setup_execution_context(
232233
.get(&import_op.name)
233234
.ok_or_else(invariance_violation)?;
234235
build_import_op_exec_ctx(
235-
&import_op.name,
236+
&import_op,
236237
output_type,
237238
source_states_by_name.get(&import_op.name.as_str()),
238239
&mut metadata,

src/execution/db_tracking_setup.rs

Lines changed: 36 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,16 @@ async fn create_source_state_table(pool: &PgPool, table_name: &str) -> Result<()
7272
Ok(())
7373
}
7474

75+
/// Deletes the rows of a source-state table that belong to the given sources.
///
/// # Arguments
/// * `pool` - Postgres connection pool the statement is executed on.
/// * `table_name` - Name of the source-state table. It is interpolated directly
///   into the SQL text (identifiers cannot be bound as parameters).
///   NOTE(review): assumes callers only pass names taken from setup state, never
///   user-supplied input — confirm at the call sites.
/// * `source_ids` - IDs of the sources whose rows are removed. Bound as one array
///   parameter and matched with `source_id = ANY($1)`.
///
/// # Errors
/// Propagates any error raised while executing the `DELETE` statement.
async fn delete_source_states_for_sources(
    pool: &PgPool,
    table_name: &str,
    source_ids: &[i32],
) -> Result<()> {
    // Only the table name is spliced into the statement text; the IDs travel as a
    // bound array parameter.
    let query = format!("DELETE FROM {table_name} WHERE source_id = ANY($1)");
    sqlx::query(&query).bind(source_ids).execute(pool).await?;
    Ok(())
}
84+
7585
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
7686
pub struct TrackingTableSetupState {
7787
pub table_name: String,
@@ -92,14 +102,14 @@ pub struct TrackingTableSetupChange {
92102
pub source_state_table_always_exists: bool,
93103
pub legacy_source_state_table_names: BTreeSet<String>,
94104

95-
pub source_ids_to_delete: Vec<i32>,
105+
pub source_names_need_state_cleanup: BTreeMap<i32, BTreeSet<String>>,
96106
}
97107

98108
impl TrackingTableSetupChange {
99109
pub fn new(
100110
desired: Option<&TrackingTableSetupState>,
101111
existing: &CombinedState<TrackingTableSetupState>,
102-
source_ids_to_delete: Vec<i32>,
112+
source_names_need_state_cleanup: BTreeMap<i32, BTreeSet<String>>,
103113
) -> Option<Self> {
104114
let legacy_tracking_table_names = existing
105115
.legacy_values(desired, |v| &v.table_name)
@@ -125,7 +135,7 @@ impl TrackingTableSetupChange {
125135
.all(|v| v.source_state_table_name.is_some()),
126136
legacy_source_state_table_names,
127137
min_existing_version_id,
128-
source_ids_to_delete,
138+
source_names_need_state_cleanup,
129139
})
130140
} else {
131141
None
@@ -201,29 +211,28 @@ impl ResourceSetupChange for TrackingTableSetupChange {
201211
)));
202212
}
203213

204-
if !self.source_ids_to_delete.is_empty() {
214+
if !self.source_names_need_state_cleanup.is_empty() {
205215
changes.push(setup::ChangeDescription::Action(format!(
206-
"Delete source IDs: {}. ",
207-
self.source_ids_to_delete
208-
.iter()
209-
.map(|id| id.to_string())
210-
.collect::<Vec<String>>()
216+
"Clean up legacy source states: {}. ",
217+
self.source_names_need_state_cleanup
218+
.values()
219+
.flatten()
220+
.dedup()
211221
.join(", ")
212222
)));
213223
}
214224
changes
215225
}
216226

217227
fn change_type(&self) -> SetupChangeType {
218-
let source_state_table_up_to_date = self.legacy_source_state_table_names.is_empty()
219-
&& (self.source_state_table_always_exists
220-
|| self
221-
.desired_state
222-
.as_ref()
223-
.map_or(true, |v| v.source_state_table_name.is_none()));
224228
match (self.min_existing_version_id, &self.desired_state) {
225229
(None, Some(_)) => SetupChangeType::Create,
226230
(Some(min_version_id), Some(desired)) => {
231+
let source_state_table_up_to_date = self.legacy_source_state_table_names.is_empty()
232+
&& self.source_names_need_state_cleanup.is_empty()
233+
&& (self.source_state_table_always_exists
234+
|| desired.source_state_table_name.is_none());
235+
227236
if min_version_id == desired.version_id
228237
&& self.legacy_tracking_table_names.is_empty()
229238
&& source_state_table_up_to_date
@@ -279,6 +288,18 @@ impl TrackingTableSetupChange {
279288
if !self.source_state_table_always_exists {
280289
create_source_state_table(pool, source_state_table_name).await?;
281290
}
291+
if !self.source_names_need_state_cleanup.is_empty() {
292+
delete_source_states_for_sources(
293+
pool,
294+
source_state_table_name,
295+
&self
296+
.source_names_need_state_cleanup
297+
.keys()
298+
.map(|v| *v)
299+
.collect::<Vec<_>>(),
300+
)
301+
.await?;
302+
}
282303
} else {
283304
for lagacy_name in self.legacy_source_state_table_names.iter() {
284305
let query = format!("DROP TABLE IF EXISTS {lagacy_name}");

src/py/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ use pyo3::IntoPyObjectExt;
1313
use pyo3::{exceptions::PyException, prelude::*};
1414
use pyo3_async_runtimes::tokio::future_into_py;
1515
use std::fmt::Write;
16-
use std::io::Cursor;
1716
use std::sync::Arc;
1817

1918
mod convert;

src/setup/driver.rs

Lines changed: 47 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -261,27 +261,58 @@ pub async fn diff_flow_setup_states(
261261
|_, desired_state| Some(StateChange::Upsert(desired_state.clone())),
262262
);
263263

264-
let new_source_ids = desired_state
265-
.iter()
266-
.flat_map(|d| d.metadata.sources.values().map(|v| v.source_id))
267-
.collect::<HashSet<i32>>();
264+
// If the source kind has changed, we need to clean the source states.
265+
let source_names_needs_states_cleanup: BTreeMap<i32, BTreeSet<String>> =
266+
if let Some(desired_state) = desired_state
267+
&& let Some(existing_state) = existing_state
268+
{
269+
let new_source_id_to_kind = desired_state
270+
.metadata
271+
.sources
272+
.values()
273+
.map(|v| (v.source_id, &v.source_kind))
274+
.collect::<HashMap<i32, &String>>();
275+
276+
let mut existing_source_id_to_name_kind =
277+
BTreeMap::<i32, Vec<(&String, &String)>>::new();
278+
for (name, setup_state) in existing_state
279+
.metadata
280+
.possible_versions()
281+
.flat_map(|v| v.sources.iter())
282+
{
283+
existing_source_id_to_name_kind
284+
.entry(setup_state.source_id)
285+
.or_default()
286+
.push((&name, &setup_state.source_kind));
287+
}
288+
289+
(existing_source_id_to_name_kind.into_iter())
290+
.map(|(id, name_kinds)| {
291+
let new_kind = new_source_id_to_kind.get(&id).map(|v| *v);
292+
let source_names_for_legacy_states = name_kinds
293+
.into_iter()
294+
.filter_map(|(name, kind)| {
295+
if Some(kind) != new_kind {
296+
Some(name.clone())
297+
} else {
298+
None
299+
}
300+
})
301+
.collect::<BTreeSet<_>>();
302+
(id, source_names_for_legacy_states)
303+
})
304+
.filter(|(_, v)| !v.is_empty())
305+
.collect::<BTreeMap<_, _>>()
306+
} else {
307+
BTreeMap::new()
308+
};
309+
268310
let tracking_table_change = db_tracking_setup::TrackingTableSetupChange::new(
269311
desired_state.map(|d| &d.tracking_table),
270312
&existing_state
271313
.map(|e| Cow::Borrowed(&e.tracking_table))
272314
.unwrap_or_default(),
273-
(existing_state.iter())
274-
.flat_map(|state| state.metadata.possible_versions())
275-
.flat_map(|metadata| {
276-
metadata
277-
.sources
278-
.values()
279-
.map(|v| v.source_id)
280-
.filter(|id| !new_source_ids.contains(id))
281-
})
282-
.collect::<BTreeSet<i32>>()
283-
.into_iter()
284-
.collect(),
315+
source_names_needs_states_cleanup,
285316
);
286317

287318
let mut target_resources = Vec::new();

src/setup/states.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,10 @@ impl<State> StateChange<State> {
148148
pub struct SourceSetupState {
149149
pub source_id: i32,
150150
pub key_schema: schema::ValueType,
151+
152+
// Allow empty string during deserialization for backward compatibility.
153+
#[serde(default)]
154+
pub source_kind: String,
151155
}
152156

153157
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]

0 commit comments

Comments
 (0)