iambriccardo
Contributor

@iambriccardo iambriccardo commented Sep 29, 2025

This PR adds support for the Relation message; handling it can now emit RelationChange events that describe what changed in a table. It also defaults every ColumnSchema to nullable: Relation messages omit the nullability field, so it cannot be inferred, and treating all columns as nullable gives more flexibility.

The PR also adds schema change support for the BigQuery destination.

The main limitation of this approach is that column renames are not recognized, because of a constraint in the replication protocol. When a rename is performed, it is emitted as DROP + ADD.
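
To illustrate why a rename surfaces as DROP + ADD, here is a minimal sketch that diffs two column lists purely by name. The `NameChange` enum and `diff_column_names` function are hypothetical and only stand in for the PR's real `RelationChange` logic, which carries full column schemas.

```rust
use std::collections::BTreeSet;

/// Hypothetical, simplified change type used only for this illustration.
#[derive(Debug, PartialEq, Eq)]
enum NameChange {
    AddColumn(String),
    DropColumn(String),
}

/// Diffs two column-name lists by name alone. With no stable column identity
/// to match on, a renamed column looks like one missing name plus one new name.
fn diff_column_names(old: &[&str], new: &[&str]) -> Vec<NameChange> {
    let old_set: BTreeSet<&str> = old.iter().copied().collect();
    let new_set: BTreeSet<&str> = new.iter().copied().collect();

    let mut changes = Vec::new();
    // Names present only in the old schema are reported as drops.
    for name in old_set.difference(&new_set) {
        changes.push(NameChange::DropColumn((*name).to_owned()));
    }
    // Names present only in the new schema are reported as adds.
    for name in new_set.difference(&old_set) {
        changes.push(NameChange::AddColumn((*name).to_owned()));
    }
    changes
}
```

Calling `diff_column_names(&["id", "name"], &["id", "full_name"])` reports a drop of `name` followed by an add of `full_name`, even though the underlying DDL was a single rename.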

@iambriccardo
Contributor Author

@codex review


@chatgpt-codex-connector chatgpt-codex-connector bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.


Comment on lines 92 to 106
// We process all the changes that we want to dispatch to the destination.
let mut changes = vec![];
for column_schema in self.old_table_schema.column_schemas.iter() {
    let column_schema = IndexedColumnSchema(column_schema.clone());
    let latest_column_schema = new_indexed_column_schemas.take(&column_schema);
    match latest_column_schema {
        Some(latest_column_schema) => {
            let column_schema = column_schema.into_inner();
            let latest_column_schema = latest_column_schema.into_inner();

            if column_schema.name != latest_column_schema.name {
                // If we find a column with the same name but different fields, we assume it was changed. The only changes
                // that we detect are changes to the column but with preserved name.
                changes.push(RelationChange::AlterColumn(
                    column_schema,


P1: Altered columns never detected

In RelationEvent::build_changes the comparison inside the Some branch uses if column_schema.name != latest_column_schema.name. Because the HashSet lookup is keyed by column name, the retrieved column always has the same name, so this condition is never true even when type, nullability or other attributes changed. As a result no RelationChange::AlterColumn events are ever produced and destinations will miss in-place schema changes, leaving replicated tables out of sync.
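
One way to address this, sketched under assumptions: look up each old column by name, then compare the full schema rather than the name. The `ColumnSchema` fields, the `RelationChange` variant payloads, and the free-standing `build_changes` function below are hypothetical simplifications, not the PR's actual types.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified column schema; the real type has more fields.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ColumnSchema {
    name: String,
    type_name: String,
    nullable: bool,
}

/// Hypothetical change type; `AlterColumn` is assumed to carry (old, new).
#[derive(Debug, PartialEq, Eq)]
enum RelationChange {
    AddColumn(ColumnSchema),
    DropColumn(ColumnSchema),
    AlterColumn(ColumnSchema, ColumnSchema),
}

fn build_changes(old: &[ColumnSchema], new: &[ColumnSchema]) -> Vec<RelationChange> {
    // Index the new columns by name so each old column can claim its match.
    let mut new_by_name: HashMap<&str, &ColumnSchema> =
        new.iter().map(|c| (c.name.as_str(), c)).collect();

    let mut changes = Vec::new();
    for old_col in old {
        match new_by_name.remove(old_col.name.as_str()) {
            // The names are equal by construction, so compare the whole schema.
            Some(new_col) if new_col != old_col => {
                changes.push(RelationChange::AlterColumn(old_col.clone(), new_col.clone()));
            }
            // Same name and same attributes: nothing to report.
            Some(_) => {}
            // No column with this name remains in the new schema.
            None => changes.push(RelationChange::DropColumn(old_col.clone())),
        }
    }

    // Columns that were never claimed exist only in the new schema.
    // Iterating `new` keeps the additions in schema order.
    for new_col in new {
        if new_by_name.contains_key(new_col.name.as_str()) {
            changes.push(RelationChange::AddColumn(new_col.clone()));
        }
    }
    changes
}
```

With this comparison, a column whose type changes from `int4` to `int8` while keeping its name yields an `AlterColumn` change instead of being silently skipped.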


Comment on lines 652 to 677
// Preserve per-table ordering.
table_id_to_table_rows
    .entry(table_id)
    .or_default()
    .push(table_row);

// Ensure a single schema version per batch.
//
// We need to do this since we don't make any assumptions on relation events being there
// so we use the schema version of the first element that we find.
//
// The invariant that must be upheld is that for all events in a batch, they must all have
// the same schema version.
match batch_schema_version {
    Some(batch_schema_version) => {
        if schema_version != *batch_schema_version {
            bail!(
                ErrorKind::InvalidState,
                "Multiple schema versions in the same batch",
                "Multiple schema versions in the same batch were found while processing events for BigQuery"
            )
        }
    }
    None => {
        *batch_schema_version = Some(schema_version);
    }


P1: BigQuery batching assumes global schema version

The new batching logic records a single batch_schema_version for the entire batch and bails if any subsequent DML event carries a different version. Schema versions are scoped per table, so it’s normal for a batch to include events for two tables with different versions (e.g., one table had a schema change while another did not). Under such circumstances the code emits “Multiple schema versions in the same batch” and the destination fails to apply further events. The version tracking should be per table rather than a global constraint on the batch.
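
A minimal sketch of per-table tracking, assuming table ids and schema versions are plain integers: the check keeps one version per table in a map, so only a mismatch within the same table is an error. The `TableId` and `SchemaVersion` aliases, the `check_schema_version` function, and the `String` error type are hypothetical stand-ins for the crate's own types and its `bail!` macro.

```rust
use std::collections::HashMap;

// Hypothetical aliases; the real crate defines its own id and version types.
type TableId = u32;
type SchemaVersion = u64;

/// Records the first schema version seen for each table in the batch and
/// rejects any later event for the same table that carries a different one.
fn check_schema_version(
    versions: &mut HashMap<TableId, SchemaVersion>,
    table_id: TableId,
    schema_version: SchemaVersion,
) -> Result<(), String> {
    // `or_insert` stores the version on first sight and otherwise returns the stored one.
    let seen = *versions.entry(table_id).or_insert(schema_version);
    if seen != schema_version {
        return Err(format!(
            "Multiple schema versions in the same batch for table {table_id}: {seen} and {schema_version}"
        ));
    }
    Ok(())
}
```

Under this scheme a batch containing table 1 at version 3 and table 2 at version 7 passes, while two events for table 1 at versions 3 and 4 still fail.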


@coveralls

coveralls commented Oct 6, 2025

Pull Request Test Coverage Report for Build 18295505125

Details

  • 663 of 899 (73.75%) changed or added relevant lines in 19 files are covered.
  • 90 unchanged lines in 14 files lost coverage.
  • Overall coverage decreased (-0.9%) to 80.835%

| Changes Missing Coverage | Covered Lines | Changed/Added Lines | % |
|---|---|---|---|
| etl/src/test_utils/test_schema.rs | 5 | 7 | 71.43% |
| etl/src/replication/apply.rs | 50 | 55 | 90.91% |
| etl/src/destination/memory.rs | 5 | 11 | 45.45% |
| etl/src/test_utils/test_destination_wrapper.rs | 0 | 6 | 0.0% |
| etl/src/types/event.rs | 43 | 51 | 84.31% |
| etl/src/conversions/event.rs | 28 | 40 | 70.0% |
| etl/src/store/both/postgres.rs | 45 | 57 | 78.95% |
| etl-postgres/src/types/schema.rs | 23 | 38 | 60.53% |
| etl/src/store/both/memory.rs | 0 | 39 | 0.0% |
| etl-destinations/src/bigquery/core.rs | 241 | 285 | 84.56% |

| Files with Coverage Reduction | New Missed Lines | % |
|---|---|---|
| etl-destinations/src/bigquery/validation.rs | 1 | 82.0% |
| etl/src/destination/memory.rs | 1 | 60.0% |
| etl/src/test_utils/test_destination_wrapper.rs | 1 | 80.99% |
| etl/src/test_utils/test_schema.rs | 1 | 89.95% |
| etl/src/conversions/event.rs | 2 | 85.96% |
| etl/src/store/both/postgres.rs | 2 | 82.07% |
| etl-destinations/src/bigquery/client.rs | 3 | 72.29% |
| etl/src/conversions/numeric.rs | 3 | 87.01% |
| etl/src/replication/apply.rs | 3 | 88.34% |
| etl/src/state/table.rs | 4 | 60.49% |

| Totals | Coverage Status |
|---|---|
| Change from base Build 18271607905 | -0.9% |
| Covered Lines | 14210 |
| Relevant Lines | 17579 |

💛 - Coveralls
