
Commit b78ca30

fix(bigquery): Allow missing table schema when truncate is issued
1 parent 2357826

4 files changed: +36 -9 lines changed


etl-destinations/src/bigquery/core.rs

Lines changed: 18 additions & 3 deletions
@@ -654,7 +654,7 @@ where
         }
 
         if !truncate_table_ids.is_empty() {
-            self.process_truncate_for_table_ids(truncate_table_ids.into_iter())
+            self.process_truncate_for_table_ids(truncate_table_ids.into_iter(), true)
                 .await?;
         }
     }
@@ -670,13 +670,28 @@ where
     async fn process_truncate_for_table_ids(
         &self,
         table_ids: impl IntoIterator<Item = TableId>,
+        is_cdc_truncate: bool,
     ) -> EtlResult<()> {
         // We want to lock for the entire processing to ensure that we don't have any race conditions
         // and possible errors are easier to reason about.
         let mut inner = self.inner.lock().await;
 
        for table_id in table_ids {
-            let table_schema = self.store.get_table_schema(&table_id).await?.ok_or_else(|| etl_error!(
+            let table_schema = self.store.get_table_schema(&table_id).await?;
+            // If we are not doing CDC, it means that this truncation has been issued while recovering
+            // from a failed data sync operation. In that case, we could have failed before table schemas
+            // were stored in the schema store, so we just continue and emit a warning. If we are doing
+            // CDC, it's a problem if the schema disappears while streaming, so we error out.
+            if !is_cdc_truncate {
+                warn!(
+                    "the table schema for table {table_id} was not found in the schema store while processing truncate events for BigQuery",
+                    table_id = table_id.to_string()
+                );
+
+                continue;
+            }
+
+            let table_schema = table_schema.ok_or_else(|| etl_error!(
                 ErrorKind::MissingTableSchema,
                 "Table not found in the schema store",
                 format!(
@@ -785,7 +800,7 @@ where
     S: StateStore + SchemaStore + Send + Sync,
 {
     async fn truncate_table(&self, table_id: TableId) -> EtlResult<()> {
-        self.process_truncate_for_table_ids(iter::once(table_id))
+        self.process_truncate_for_table_ids(iter::once(table_id), false)
             .await
     }
 
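
As a reading aid, a minimal sketch of the missing-schema decision described by the comment in this diff (names and types here are illustrative only, not the actual implementation):

// Sketch: what happens when the schema store has no entry for a table.
// is_cdc_truncate = true  -> the truncate came from a CDC TRUNCATE event.
// is_cdc_truncate = false -> the truncate came from recovery after a failed data sync.
fn handle_missing_schema(table_id: u32, is_cdc_truncate: bool) -> Result<(), String> {
    if is_cdc_truncate {
        // Schemas are not expected to disappear while streaming changes: hard error.
        return Err(format!("missing table schema for table {table_id}"));
    }
    // Recovery path: the schema may never have been stored, so warn and skip the table.
    eprintln!("warning: no schema for table {table_id}, skipping truncate");
    Ok(())
}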

etl/src/failpoints.rs

Lines changed: 2 additions & 1 deletion
@@ -8,7 +8,8 @@ use fail::fail_point;
 use crate::bail;
 use crate::error::{ErrorKind, EtlError, EtlResult};
 
-pub const START_TABLE_SYNC__AFTER_DATA_SYNC: &str = "start_table_sync.after_data_sync";
+pub const START_TABLE_SYNC__BEFORE_DATA_SYNC_SLOT_CREATION: &str =
+    "start_table_sync.befor_data_sync_slot_creation";
 pub const START_TABLE_SYNC__DURING_DATA_SYNC: &str = "start_table_sync.during_data_sync";
 
 /// Executes a configurable failpoint for testing error scenarios.
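
For orientation, the constants above name failpoints that tests arm via fail::cfg and that library code evaluates through the etl_fail_point helper documented above. A rough sketch of the underlying fail crate mechanism, as an assumption about how such a helper is typically built (its actual body is not part of this diff):

use fail::fail_point;

// Sketch: a step guarded by a named failpoint. When a test arms this name with
// fail::cfg(name, "1*return(...)"), the closure runs and its value is returned
// early from the enclosing function; when the failpoint is unarmed it is a no-op.
fn guarded_step() -> Result<(), String> {
    fail_point!("start_table_sync.befor_data_sync_slot_creation", |payload| {
        Err(format!("injected failure with payload {payload:?}"))
    });
    Ok(())
}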

etl/src/replication/table_sync.rs

Lines changed: 3 additions & 2 deletions
@@ -18,7 +18,8 @@ use crate::destination::Destination;
 use crate::error::{ErrorKind, EtlError, EtlResult};
 #[cfg(feature = "failpoints")]
 use crate::failpoints::{
-    START_TABLE_SYNC__AFTER_DATA_SYNC, START_TABLE_SYNC__DURING_DATA_SYNC, etl_fail_point,
+    START_TABLE_SYNC__BEFORE_DATA_SYNC_SLOT_CREATION, START_TABLE_SYNC__DURING_DATA_SYNC,
+    etl_fail_point,
 };
 use crate::metrics::{
     ETL_BATCH_SEND_MILLISECONDS_TOTAL, ETL_BATCH_SIZE, ETL_TABLE_SYNC_ROWS_COPIED_TOTAL,
@@ -165,7 +166,7 @@ where
 
         // Fail point to test when the table sync fails before copying data.
         #[cfg(feature = "failpoints")]
-        etl_fail_point(START_TABLE_SYNC__AFTER_DATA_SYNC)?;
+        etl_fail_point(START_TABLE_SYNC__BEFORE_DATA_SYNC_SLOT_CREATION)?;
 
         // We create the slot with a transaction, since we need to have a consistent snapshot of the database
         // before copying the schema and tables.

etl/tests/failpoints/pipeline_test.rs

Lines changed: 13 additions & 3 deletions
@@ -1,6 +1,8 @@
 use etl::destination::memory::MemoryDestination;
 use etl::error::ErrorKind;
-use etl::failpoints::{START_TABLE_SYNC__AFTER_DATA_SYNC, START_TABLE_SYNC__DURING_DATA_SYNC};
+use etl::failpoints::{
+    START_TABLE_SYNC__BEFORE_DATA_SYNC_SLOT_CREATION, START_TABLE_SYNC__DURING_DATA_SYNC,
+};
 use etl::state::table::TableReplicationPhaseType;
 use etl::test_utils::database::spawn_source_database;
 use etl::test_utils::notify::NotifyingStore;
@@ -15,7 +17,11 @@ use rand::random;
 #[tokio::test(flavor = "multi_thread")]
 async fn table_copy_fails_after_data_sync_threw_an_error_with_no_retry() {
     let _scenario = FailScenario::setup();
-    fail::cfg(START_TABLE_SYNC__AFTER_DATA_SYNC, "1*return(no_retry)").unwrap();
+    fail::cfg(
+        START_TABLE_SYNC__BEFORE_DATA_SYNC_SLOT_CREATION,
+        "1*return(no_retry)",
+    )
+    .unwrap();
 
     init_test_tracing();
 
@@ -73,7 +79,11 @@ async fn table_copy_fails_after_data_sync_threw_an_error_with_no_retry() {
 #[tokio::test(flavor = "multi_thread")]
 async fn table_copy_is_consistent_after_data_sync_threw_an_error_with_timed_retry() {
     let _scenario = FailScenario::setup();
-    fail::cfg(START_TABLE_SYNC__AFTER_DATA_SYNC, "1*return(timed_retry)").unwrap();
+    fail::cfg(
+        START_TABLE_SYNC__BEFORE_DATA_SYNC_SLOT_CREATION,
+        "1*return(timed_retry)",
+    )
+    .unwrap();
 
     init_test_tracing();
 
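For reference, the fail crate action string "1*return(no_retry)" used in these tests arms the failpoint to fire exactly once, taking the return action with the payload "no_retry"; "1*return(timed_retry)" does the same with the payload "timed_retry". The payload is what the failpoint hands to its closure, which is presumably how the ETL code selects between the non-retryable and timed-retry error paths.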