Commit e735f7f

fix(test): Fix missing primary key test (#301)
1 parent e194841 commit e735f7f

6 files changed: +72 −120 lines

etl/src/replication/table_sync.rs

Lines changed: 0 additions & 14 deletions
@@ -26,7 +26,6 @@ use crate::metrics::{
 };
 use crate::replication::client::PgReplicationClient;
 use crate::replication::stream::TableCopyStream;
-use crate::state::table::RetryPolicy;
 use crate::state::table::{TableReplicationPhase, TableReplicationPhaseType};
 use crate::store::schema::SchemaStore;
 use crate::store::state::StateStore;
@@ -192,19 +191,6 @@ where
         .await?;

     if !table_schema.has_primary_keys() {
-        store
-            .update_table_replication_state(
-                table_id,
-                TableReplicationPhase::Errored {
-                    reason: "The table has no primary keys".to_string(),
-                    solution: Some(format!(
-                        "You should set at least one primary key on the table {table_id}"
-                    )),
-                    retry_policy: RetryPolicy::ManualRetry,
-                },
-            )
-            .await?;
         bail!(
             ErrorKind::SourceSchemaError,
             "Missing primary key",

etl/src/workers/pool.rs

Lines changed: 4 additions & 9 deletions
@@ -21,7 +21,7 @@ pub struct TableSyncWorkerPoolInner {
     /// Completed or failed table sync workers, preserving history for inspection.
     finished: HashMap<TableId, Vec<TableSyncWorkerHandle>>,
     /// Notification mechanism for pool state changes.
-    pool_update: Option<Arc<Notify>>,
+    pool_update: Arc<Notify>,
 }

 impl TableSyncWorkerPoolInner {
@@ -33,7 +33,7 @@ impl TableSyncWorkerPoolInner {
         Self {
             active: HashMap::new(),
             finished: HashMap::new(),
-            pool_update: None,
+            pool_update: Arc::new(Notify::new()),
         }
     }

@@ -75,9 +75,7 @@
     pub fn mark_worker_finished(&mut self, table_id: TableId) {
         let removed_worker = self.active.remove(&table_id);

-        if let Some(waiting) = self.pool_update.take() {
-            waiting.notify_one();
-        }
+        self.pool_update.notify_waiters();

         if let Some(removed_worker) = removed_worker {
             self.finished
@@ -117,10 +115,7 @@
         // worker within the `ReactiveFuture` will not be able to hold the lock onto the pool to
         // mark itself as finished.
         if !self.active.is_empty() {
-            let notify = Arc::new(Notify::new());
-            self.pool_update = Some(notify.clone());
-
-            return Ok(Some(notify));
+            return Ok(Some(self.pool_update.clone()));
         }

         let mut errors = Vec::new();
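For context on the `pool_update` change: instead of lazily creating a `Notify` for each wait and consuming it with `take()`/`notify_one()`, the pool now keeps one `Arc<Notify>` for its whole lifetime and wakes everyone with `notify_waiters()`. The standalone sketch below only illustrates that `tokio::sync::Notify` behaviour (using the documented `enable()` pattern to register waiters up front); it does not reproduce the pool types.

// Minimal sketch of tokio::sync::Notify::notify_waiters() semantics.
use std::sync::Arc;
use tokio::sync::Notify;

#[tokio::main]
async fn main() {
    let pool_update = Arc::new(Notify::new());

    // Two "waiters": creating the futures and enabling them registers their
    // interest, so a single later `notify_waiters()` call wakes both of them.
    let first = pool_update.notified();
    let second = pool_update.notified();
    tokio::pin!(first, second);
    first.as_mut().enable();
    second.as_mut().enable();

    // Wakes everything currently registered; unlike `notify_one()`, nothing
    // is stored for waiters that register only after this call.
    pool_update.notify_waiters();

    first.await;
    second.await;
    println!("both waiters were woken by a single notify_waiters() call");
}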

etl/src/workers/table_sync.rs

Lines changed: 7 additions & 7 deletions
@@ -406,15 +406,15 @@ where
         let store = self.store.clone();
         let config = self.config.clone();

-        // Clone all the fields we need for retries
+        // Clone all the fields we need for retries.
         let pipeline_id = self.pipeline_id;
         let destination = self.destination.clone();
         let shutdown_rx = self.shutdown_rx.clone();
         let force_syncing_tables_tx = self.force_syncing_tables_tx.clone();
         let run_permit = self.run_permit.clone();

         loop {
-            // Recreate the worker for each attempt
+            // Recreate the worker for each attempt.
             let worker = TableSyncWorker {
                 pipeline_id,
                 config: config.clone(),
@@ -431,7 +431,7 @@ where

             match result {
                 Ok(_) => {
-                    // Worker completed successfully, mark as finished
+                    // Worker completed successfully, mark as finished.
                     let mut pool = pool.lock().await;
                     pool.mark_worker_finished(table_id);

@@ -440,16 +440,16 @@ where
                 Err(err) => {
                     error!("table sync worker failed for table {}: {}", table_id, err);

-                    // Convert error to table replication error to determine retry policy
+                    // Convert error to table replication error to determine retry policy.
                     let table_error =
                         TableReplicationError::from_etl_error(&config, table_id, &err);
                     let retry_policy = table_error.retry_policy().clone();

-                    // We lock both the pool and the table sync worker state to be consistent
+                    // We lock both the pool and the table sync worker state to be consistent.
                     let mut pool_guard = pool.lock().await;
                     let mut state_guard = state.lock().await;

-                    // Update the state and store with the error
+                    // Update the state and store with the error.
                     if let Err(err) = state_guard.set_and_store(table_error.into(), &store).await {
                         error!(
                             "failed to update table sync worker state for table {}: {}",
@@ -488,7 +488,7 @@ where
                         );
                     }

-                    // Before rolling back, we acquire the pool lock again for consistency
+                    // Before rolling back, we acquire the pool lock again for consistency.
                     let mut pool_guard = pool.lock().await;

                     // After sleeping, we rollback to the previous state and retry.
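The comment fixes above sit inside the worker's retry loop, whose context lines show the overall shape: each attempt rebuilds the `TableSyncWorker` from cloned inputs, and a failure is converted into a `TableReplicationError` whose retry policy decides what happens next. Below is a rough, crate-agnostic sketch of that loop shape; `Worker`, `RetryPolicy`, and `classify` are illustrative stand-ins rather than the etl types.

// Hypothetical retry loop: rebuild the worker per attempt, classify failures
// into a retry policy, and stop when the policy says not to retry.
#[derive(Debug, Clone, PartialEq)]
enum RetryPolicy {
    ManualRetry,
    TimedRetry,
}

struct Worker {
    attempt: u32,
}

impl Worker {
    fn run(&self) -> Result<(), String> {
        if self.attempt < 3 {
            Err(format!("transient failure on attempt {}", self.attempt))
        } else {
            Ok(())
        }
    }
}

fn classify(_err: &str) -> RetryPolicy {
    // A real implementation would inspect the error kind here.
    RetryPolicy::TimedRetry
}

fn main() {
    let mut attempt = 0;
    loop {
        // Recreate the worker for each attempt, as the real loop does.
        let worker = Worker { attempt };
        match worker.run() {
            Ok(()) => {
                println!("worker finished after {attempt} retries");
                break;
            }
            Err(err) => {
                let policy = classify(&err);
                println!("attempt {attempt} failed ({err}), policy {policy:?}");
                if policy == RetryPolicy::ManualRetry {
                    break;
                }
                attempt += 1;
            }
        }
    }
}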

etl/tests/integration/mod.rs

Lines changed: 0 additions & 1 deletion
@@ -1,4 +1,3 @@
-mod no_primary_key_test;
 mod pipeline_test;
 mod postgres_store;
 mod replica_identity;

etl/tests/integration/no_primary_key_test.rs

Lines changed: 0 additions & 89 deletions
This file was deleted.

etl/tests/integration/pipeline_test.rs

Lines changed: 61 additions & 0 deletions
@@ -1,4 +1,5 @@
 use etl::destination::memory::MemoryDestination;
+use etl::error::ErrorKind;
 use etl::state::table::TableReplicationPhaseType;
 use etl::test_utils::database::{spawn_source_database, test_table_name};
 use etl::test_utils::event::group_events_by_type_and_table_id;
@@ -931,3 +932,63 @@ async fn table_processing_with_schema_change_errors_table() {
     );
     assert_events_equal(orders_inserts, &expected_orders_inserts);
 }
+
+#[tokio::test(flavor = "multi_thread")]
+async fn table_without_primary_key_is_errored() {
+    init_test_tracing();
+    let database = spawn_source_database().await;
+
+    let table_name = test_table_name("no_primary_key_table");
+    let table_id = database
+        .create_table(table_name.clone(), false, &[("name", "text")])
+        .await
+        .unwrap();
+
+    let publication_name = "test_pub".to_string();
+    database
+        .create_publication(&publication_name, std::slice::from_ref(&table_name))
+        .await
+        .expect("Failed to create publication");
+
+    // Insert a row to later check that this doesn't appear in destination's table rows.
+    database
+        .insert_values(table_name.clone(), &["name"], &[&"abc"])
+        .await
+        .unwrap();
+
+    let state_store = NotifyingStore::new();
+    let destination = TestDestinationWrapper::wrap(MemoryDestination::new());
+
+    let pipeline_id: PipelineId = random();
+    let mut pipeline = create_pipeline(
+        &database.config,
+        pipeline_id,
+        publication_name,
+        state_store.clone(),
+        destination.clone(),
+    );
+
+    // We wait for the table to be errored.
+    let errored_state = state_store
+        .notify_on_table_state(table_id, TableReplicationPhaseType::Errored)
+        .await;
+
+    pipeline.start().await.unwrap();
+
+    // Insert a row to later check that it is not processed by the apply worker.
+    database
+        .insert_values(table_name.clone(), &["name"], &[&"abc1"])
+        .await
+        .unwrap();
+
+    errored_state.notified().await;
+
+    // Wait for the pipeline expecting an error to be returned.
+    let err = pipeline.shutdown_and_wait().await.err().unwrap();
+    assert_eq!(err.kinds().len(), 1);
+    assert_eq!(err.kinds()[0], ErrorKind::SourceSchemaError);
+
+    // We expect no events to be saved.
+    let events = destination.get_events().await;
+    assert!(events.is_empty());
+}
