Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 23 additions & 5 deletions src/sql-server-util/src/inspect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ impl DDLEvent {
/// 2. ALTER TABLE .. DROP COLUMN
///
/// See <https://learn.microsoft.com/en-us/sql/t-sql/statements/alter-table-transact-sql?view=sql-server-ver17>
pub fn is_compatible(&self) -> bool {
pub fn is_compatible(&self, exclude_columns: &Vec<String>) -> bool {
// TODO (maz): This is currently a basic check that doesn't take into account type changes.
// At some point, we will need to move this to SqlServerTableDesc and expand it.
let mut words = self.ddl_command.split_ascii_whitespace();
Expand All @@ -631,13 +631,31 @@ impl DDLEvent {
words.next().map(str::to_ascii_lowercase).as_deref(),
) {
(Some("alter"), Some("table")) => {
let mut peekable = words.peekable();
let mut peekable = words.multipeek();
let mut compatible = true;
while compatible && let Some(token) = peekable.next() {
compatible = match token.to_ascii_lowercase().as_str() {
"alter" | "drop" => peekable
.peek()
.is_some_and(|next_tok| !next_tok.eq_ignore_ascii_case("column")),
"alter" | "drop" => {
let target = peekable.peek();
match target {
Some(t) if t.eq_ignore_ascii_case("column") => {
// Consume the "column" token
let col_name = peekable.peek();
if let Some(col_name) = col_name {
!exclude_columns
.iter()
.any(|excluded| excluded.eq_ignore_ascii_case(col_name))
} else {
// No column name found after "column" keyword
false
}
}
// No target token after "alter" or "drop"
None => false,
// Other targets are considered compatible
_ => true,
}
}
_ => true,
}
}
Expand Down
3 changes: 3 additions & 0 deletions src/storage/src/source/sql_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ struct SourceOutputInfo {
partition_index: u64,
/// The basis for the resumption LSN when snapshotting.
initial_lsn: Lsn,
/// The columns in the upstream table that have been excluded from this source.
exclude_columns: Vec<String>,
}

#[derive(Debug, Clone, thiserror::Error)]
Expand Down Expand Up @@ -153,6 +155,7 @@ impl SourceRender for SqlServerSourceConnection {
resume_upper: Antichain::from_iter(resume_upper),
partition_index: u64::cast_from(idx),
initial_lsn: details.initial_lsn,
exclude_columns: details.exclude_columns.clone(),
};
source_outputs.insert(*id, output_info);
}
Expand Down
66 changes: 35 additions & 31 deletions src/storage/src/source/sql_server/replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ pub(crate) fn render<G: Scope<Timestamp = Lsn>>(
let mut capture_instances: BTreeMap<Arc<str>, Vec<_>> = BTreeMap::new();
// Export statistics for a given capture instance
let mut export_statistics: BTreeMap<_, Vec<_>> = BTreeMap::new();
// Maps the exclude columns for each output index so we can check whether schema updates are valid on a per-output basis
let mut exclude_columns: BTreeMap<u64, &Vec<String>> = BTreeMap::new();

for (export_id, output) in outputs.iter() {
if decoder_map.insert(output.partition_index, Arc::clone(&output.decoder)).is_some() {
Expand All @@ -118,6 +120,8 @@ pub(crate) fn render<G: Scope<Timestamp = Lsn>>(
.or_default()
.push(output.partition_index);

exclude_columns.insert(output.partition_index, &output.exclude_columns);

if *output.resume_upper == [Lsn::minimum()] {
capture_instance_to_snapshot
.entry(Arc::clone(&output.capture_instance))
Expand Down Expand Up @@ -359,7 +363,7 @@ pub(crate) fn render<G: Scope<Timestamp = Lsn>>(
.into_stream();
let mut cdc_stream = std::pin::pin!(cdc_stream);

let mut errored_instances = BTreeSet::new();
let mut errored_partitions = BTreeSet::new();

// TODO(sql_server2): We should emit `ProgressStatisticsUpdate::SteadyState` messages
// here, when we receive progress events. What stops us from doing this now is our
Expand Down Expand Up @@ -424,12 +428,6 @@ pub(crate) fn render<G: Scope<Timestamp = Lsn>>(
lsn,
changes,
} => {
if errored_instances.contains(&capture_instance) {
// outputs for this captured instance are in an errored state, so they are not
// emitted
metrics.ignored.inc_by(u64::cast_from(changes.len()));
}

let Some(partition_indexes) = capture_instances.get(&capture_instance) else {
let definite_error = DefiniteError::ProgrammingError(format!(
"capture instance didn't exist: '{capture_instance}'"
Expand All @@ -446,10 +444,17 @@ pub(crate) fn render<G: Scope<Timestamp = Lsn>>(
return Ok(());
};

let (valid_partitions, err_partitions) = partition_indexes.iter().partition::<Vec<u64>, _>(|&partition_idx| {
!errored_partitions.contains(partition_idx)
});

if err_partitions.len() > 0 {
metrics.ignored.inc_by(u64::cast_from(changes.len()));
}

handle_data_event(
changes,
partition_indexes,
&valid_partitions,
&decoder_map,
lsn,
&rewinds,
Expand All @@ -460,36 +465,35 @@ pub(crate) fn render<G: Scope<Timestamp = Lsn>>(
).await?
},
CdcEvent::SchemaUpdate { capture_instance, table, ddl_event } => {
if !errored_instances.contains(&capture_instance)
&& !ddl_event.is_compatible() {
let Some(partition_indexes) = capture_instances.get(&capture_instance) else {
let definite_error = DefiniteError::ProgrammingError(format!(
"capture instance didn't exist: '{capture_instance}'"
));
return_definite_error(
definite_error,
capture_instances.values().flat_map(|indexes| indexes.iter().copied()),
data_output,
data_cap_set,
definite_error_handle,
definite_error_cap_set,
)
.await;
return Ok(());
};
let error = DefiniteError::IncompatibleSchemaChange(
capture_instance.to_string(),
table.to_string()
);
for partition_idx in partition_indexes {
let Some(partition_indexes) = capture_instances.get(&capture_instance) else {
let definite_error = DefiniteError::ProgrammingError(format!(
"capture instance didn't exist: '{capture_instance}'"
));
return_definite_error(
definite_error,
capture_instances.values().flat_map(|indexes| indexes.iter().copied()),
data_output,
data_cap_set,
definite_error_handle,
definite_error_cap_set,
)
.await;
return Ok(());
};
let error = DefiniteError::IncompatibleSchemaChange(
capture_instance.to_string(),
table.to_string()
);
for partition_idx in partition_indexes {
if !errored_partitions.contains(partition_idx) && !ddl_event.is_compatible(exclude_columns.get(partition_idx).unwrap_or_else(|| panic!("Partition index didn't exist: '{partition_idx}'"))) {
data_output
.give_fueled(
&data_cap_set[0],
((*partition_idx, Err(error.clone().into())), ddl_event.lsn, Diff::ONE),
)
.await;
errored_partitions.insert(*partition_idx);
}
errored_instances.insert(capture_instance);
}
}
};
Expand Down