Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions src/builder/exec_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub struct AnalyzedSetupState {
}

fn build_import_op_exec_ctx(
import_field_name: &spec::FieldName,
import_op: &spec::NamedSpec<spec::ImportOpSpec>,
import_op_output_type: &schema::EnrichedValueType,
existing_source_states: Option<&Vec<&setup::SourceSetupState>>,
metadata: &mut setup::FlowSetupMetadata,
Expand Down Expand Up @@ -64,10 +64,11 @@ fn build_import_op_exec_ctx(
metadata.last_source_id
};
metadata.sources.insert(
import_field_name.clone(),
import_op.name.clone(),
setup::SourceSetupState {
source_id,
key_schema: key_schema_no_attrs,
source_kind: import_op.spec.source.kind.clone(),
},
);
Ok(ImportOpExecutionContext { source_id })
Expand Down Expand Up @@ -232,7 +233,7 @@ pub fn build_flow_setup_execution_context(
.get(&import_op.name)
.ok_or_else(invariance_violation)?;
build_import_op_exec_ctx(
&import_op.name,
&import_op,
output_type,
source_states_by_name.get(&import_op.name.as_str()),
&mut metadata,
Expand Down
51 changes: 36 additions & 15 deletions src/execution/db_tracking_setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,16 @@ async fn create_source_state_table(pool: &PgPool, table_name: &str) -> Result<()
Ok(())
}

/// Deletes every row in `table_name` whose `source_id` is one of `source_ids`.
///
/// The statement issued is `DELETE FROM <table_name> WHERE source_id = ANY($1)`;
/// the IDs are bound as a single array parameter rather than spliced into the SQL.
///
/// # Arguments
/// * `pool` - Postgres connection pool the statement is executed on.
/// * `table_name` - Name of the table to delete from. It is interpolated verbatim
///   into the SQL text.
///   NOTE(review): assumes callers only pass internally generated table names — confirm.
/// * `source_ids` - IDs of the sources whose rows should be removed.
///
/// # Errors
/// Propagates any error returned while executing the statement.
async fn delete_source_states_for_sources(
    pool: &PgPool,
    table_name: &str,
    source_ids: &[i32],
) -> Result<()> {
    let query = format!("DELETE FROM {table_name} WHERE source_id = ANY($1)");
    sqlx::query(&query).bind(source_ids).execute(pool).await?;
    Ok(())
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct TrackingTableSetupState {
pub table_name: String,
Expand All @@ -92,14 +102,14 @@ pub struct TrackingTableSetupChange {
pub source_state_table_always_exists: bool,
pub legacy_source_state_table_names: BTreeSet<String>,

pub source_ids_to_delete: Vec<i32>,
pub source_names_need_state_cleanup: BTreeMap<i32, BTreeSet<String>>,
}

impl TrackingTableSetupChange {
pub fn new(
desired: Option<&TrackingTableSetupState>,
existing: &CombinedState<TrackingTableSetupState>,
source_ids_to_delete: Vec<i32>,
source_names_need_state_cleanup: BTreeMap<i32, BTreeSet<String>>,
) -> Option<Self> {
let legacy_tracking_table_names = existing
.legacy_values(desired, |v| &v.table_name)
Expand All @@ -125,7 +135,7 @@ impl TrackingTableSetupChange {
.all(|v| v.source_state_table_name.is_some()),
legacy_source_state_table_names,
min_existing_version_id,
source_ids_to_delete,
source_names_need_state_cleanup,
})
} else {
None
Expand Down Expand Up @@ -201,29 +211,28 @@ impl ResourceSetupChange for TrackingTableSetupChange {
)));
}

if !self.source_ids_to_delete.is_empty() {
if !self.source_names_need_state_cleanup.is_empty() {
changes.push(setup::ChangeDescription::Action(format!(
"Delete source IDs: {}. ",
self.source_ids_to_delete
.iter()
.map(|id| id.to_string())
.collect::<Vec<String>>()
"Clean up legacy source states: {}. ",
self.source_names_need_state_cleanup
.values()
.flatten()
.dedup()
.join(", ")
)));
}
changes
}

fn change_type(&self) -> SetupChangeType {
let source_state_table_up_to_date = self.legacy_source_state_table_names.is_empty()
&& (self.source_state_table_always_exists
|| self
.desired_state
.as_ref()
.map_or(true, |v| v.source_state_table_name.is_none()));
match (self.min_existing_version_id, &self.desired_state) {
(None, Some(_)) => SetupChangeType::Create,
(Some(min_version_id), Some(desired)) => {
let source_state_table_up_to_date = self.legacy_source_state_table_names.is_empty()
&& self.source_names_need_state_cleanup.is_empty()
&& (self.source_state_table_always_exists
|| desired.source_state_table_name.is_none());

if min_version_id == desired.version_id
&& self.legacy_tracking_table_names.is_empty()
&& source_state_table_up_to_date
Expand Down Expand Up @@ -279,6 +288,18 @@ impl TrackingTableSetupChange {
if !self.source_state_table_always_exists {
create_source_state_table(pool, source_state_table_name).await?;
}
if !self.source_names_need_state_cleanup.is_empty() {
delete_source_states_for_sources(
pool,
source_state_table_name,
&self
.source_names_need_state_cleanup
.keys()
.map(|v| *v)
.collect::<Vec<_>>(),
)
.await?;
}
} else {
for lagacy_name in self.legacy_source_state_table_names.iter() {
let query = format!("DROP TABLE IF EXISTS {lagacy_name}");
Expand Down
1 change: 0 additions & 1 deletion src/py/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use pyo3::IntoPyObjectExt;
use pyo3::{exceptions::PyException, prelude::*};
use pyo3_async_runtimes::tokio::future_into_py;
use std::fmt::Write;
use std::io::Cursor;
use std::sync::Arc;

mod convert;
Expand Down
63 changes: 47 additions & 16 deletions src/setup/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,27 +261,58 @@ pub async fn diff_flow_setup_states(
|_, desired_state| Some(StateChange::Upsert(desired_state.clone())),
);

let new_source_ids = desired_state
.iter()
.flat_map(|d| d.metadata.sources.values().map(|v| v.source_id))
.collect::<HashSet<i32>>();
// If the source kind has changed, we need to clean the source states.
let source_names_needs_states_cleanup: BTreeMap<i32, BTreeSet<String>> =
if let Some(desired_state) = desired_state
&& let Some(existing_state) = existing_state
{
let new_source_id_to_kind = desired_state
.metadata
.sources
.values()
.map(|v| (v.source_id, &v.source_kind))
.collect::<HashMap<i32, &String>>();

let mut existing_source_id_to_name_kind =
BTreeMap::<i32, Vec<(&String, &String)>>::new();
for (name, setup_state) in existing_state
.metadata
.possible_versions()
.flat_map(|v| v.sources.iter())
{
existing_source_id_to_name_kind
.entry(setup_state.source_id)
.or_default()
.push((&name, &setup_state.source_kind));
}

(existing_source_id_to_name_kind.into_iter())
.map(|(id, name_kinds)| {
let new_kind = new_source_id_to_kind.get(&id).map(|v| *v);
let source_names_for_legacy_states = name_kinds
.into_iter()
.filter_map(|(name, kind)| {
if Some(kind) != new_kind {
Some(name.clone())
} else {
None
}
})
.collect::<BTreeSet<_>>();
(id, source_names_for_legacy_states)
})
.filter(|(_, v)| !v.is_empty())
.collect::<BTreeMap<_, _>>()
} else {
BTreeMap::new()
};

let tracking_table_change = db_tracking_setup::TrackingTableSetupChange::new(
desired_state.map(|d| &d.tracking_table),
&existing_state
.map(|e| Cow::Borrowed(&e.tracking_table))
.unwrap_or_default(),
(existing_state.iter())
.flat_map(|state| state.metadata.possible_versions())
.flat_map(|metadata| {
metadata
.sources
.values()
.map(|v| v.source_id)
.filter(|id| !new_source_ids.contains(id))
})
.collect::<BTreeSet<i32>>()
.into_iter()
.collect(),
source_names_needs_states_cleanup,
);

let mut target_resources = Vec::new();
Expand Down
4 changes: 4 additions & 0 deletions src/setup/states.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ impl<State> StateChange<State> {
/// Setup state recorded for a single source: its numeric ID, its key schema,
/// and the kind of source it was created from.
pub struct SourceSetupState {
/// Numeric identifier assigned to the source.
pub source_id: i32,
/// Schema of the source's key.
pub key_schema: schema::ValueType,

// Allow empty string during deserialization for backward compatibility.
// `#[serde(default)]` fills in `String::default()` (an empty string) when the
// field is missing from the serialized form.
#[serde(default)]
/// Kind of the source.
pub source_kind: String,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
Expand Down
Loading