Commit 2a41356

ref(tests): Fix flaky integration test (#314)
1 parent 07e27b5 commit 2a41356

5 files changed (+85 -45 lines changed)

AGENTS.md

Lines changed: 2 additions & 0 deletions
@@ -8,6 +8,8 @@
 ## Build and Test
 - Build: `cargo build --workspace --all-targets --all-features`.
 - Lint/format: `cargo fmt`; `cargo clippy --all-targets --all-features -- -D warnings`.
+- Use `ENABLE_TRACING=1` when running integration tests to see the logs.
+- Use `RUST_LOG=[log-level]` if you need to see the logs with a specific log level.
 
 ## Coding Style & Naming
 - Rust 2024 edition; keep formatter clean and warnings denied.
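
For illustration, the two variables compose on a single invocation, e.g. `ENABLE_TRACING=1 RUST_LOG=debug cargo test --workspace` (the exact test target is illustrative; the commit itself doesn't prescribe one).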

etl/src/test_utils/event.rs

Lines changed: 25 additions & 8 deletions
@@ -41,15 +41,32 @@ pub fn group_events_by_type_and_table_id(
 
 pub fn check_events_count(events: &[Event], conditions: Vec<(EventType, u64)>) -> bool {
     let grouped_events = group_events_by_type(events);
-    for (event_type, count) in conditions {
-        let Some(inner_events) = grouped_events.get(&event_type) else {
-            return false;
-        };
 
-        if inner_events.len() != count as usize {
-            return false;
+    conditions.into_iter().all(|(event_type, count)| {
+        grouped_events
+            .get(&event_type)
+            .map(|inner| inner.len() == count as usize)
+            .unwrap_or(false)
+    })
+}
+
+/// Returns a new Vec of events with duplicates removed, based on full equality of events.
+///
+/// Two events are considered the same if all their fields are equal. The first
+/// occurrence is kept and subsequent duplicates are dropped; surviving events keep
+/// their original order.
+///
+/// The rationale for having this method is that the pipeline doesn't guarantee exactly-once
+/// delivery, thus in some tests we might have to exclude duplicates while performing assertions.
+pub fn deduplicate_events(events: &[Event]) -> Vec<Event> {
+    let mut result: Vec<Event> = Vec::with_capacity(events.len());
+    for e in events.iter().cloned() {
+        if !result.contains(&e) {
+            result.push(e);
         }
     }
-
-    true
+    result
 }
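
The helper added above is a plain first-occurrence filter built on `PartialEq`. A minimal, self-contained sketch of the same technique, using a hypothetical `FakeEvent` stand-in rather than the crate's `Event` type:

// Standalone illustration of the first-occurrence de-duplication above.
// `FakeEvent` is a hypothetical stand-in for the crate's `Event` type.
#[derive(Debug, Clone, PartialEq)]
struct FakeEvent {
    kind: &'static str,
    lsn: u64,
}

fn deduplicate<T: Clone + PartialEq>(items: &[T]) -> Vec<T> {
    let mut result = Vec::with_capacity(items.len());
    for item in items.iter().cloned() {
        // Keep only the first occurrence; O(n^2) overall, fine for test-sized inputs.
        if !result.contains(&item) {
            result.push(item);
        }
    }
    result
}

fn main() {
    let events = vec![
        FakeEvent { kind: "insert", lsn: 1 },
        FakeEvent { kind: "insert", lsn: 1 }, // duplicate: dropped
        FakeEvent { kind: "insert", lsn: 2 },
    ];
    assert_eq!(deduplicate(&events).len(), 2);
}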

etl/src/test_utils/test_destination_wrapper.rs

Lines changed: 20 additions & 2 deletions
@@ -7,7 +7,7 @@ use tokio::sync::{Notify, RwLock};
 
 use crate::destination::Destination;
 use crate::error::EtlResult;
-use crate::test_utils::event::check_events_count;
+use crate::test_utils::event::{check_events_count, deduplicate_events};
 use crate::types::{Event, EventType, TableRow};
 
 type EventCondition = Box<dyn Fn(&[Event]) -> bool + Send + Sync>;
@@ -100,6 +100,12 @@ impl<D> TestDestinationWrapper<D> {
         self.inner.read().await.events.clone()
     }
 
+    /// Get all events that have been written, de-duplicated by full event equality.
+    pub async fn get_events_deduped(&self) -> Vec<Event> {
+        let events = self.inner.read().await.events.clone();
+        deduplicate_events(&events)
+    }
+
     /// Wait for a specific condition on events
     pub async fn notify_on_events<F>(&self, condition: F) -> Arc<Notify>
     where
@@ -114,12 +120,24 @@ impl<D> TestDestinationWrapper<D> {
         notify
     }
 
-    /// Wait for a specific number of events of given types
+    /// Wait for a specific number of events of given types.
    pub async fn wait_for_events_count(&self, conditions: Vec<(EventType, u64)>) -> Arc<Notify> {
         self.notify_on_events(move |events| check_events_count(events, conditions.clone()))
             .await
     }
 
+    /// Wait for a specific number of events of given types after de-duplicating by full event equality.
+    pub async fn wait_for_events_count_deduped(
+        &self,
+        conditions: Vec<(EventType, u64)>,
+    ) -> Arc<Notify> {
+        self.notify_on_events(move |events| {
+            let deduped = deduplicate_events(events);
+            check_events_count(&deduped, conditions.clone())
+        })
+        .await
+    }
+
     pub async fn clear_events(&self) {
         let mut inner = self.inner.write().await;
         inner.events.clear();
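
For context, `notify_on_events` registers a predicate plus a `tokio::sync::Notify` that fires once the predicate holds over the accumulated events. Below is a simplified sketch of that condition/notify pattern, assuming tokio; it is an assumption about the general mechanism, not the crate's actual implementation:

// Hypothetical, simplified sketch of the condition/notify pattern used by
// TestDestinationWrapper; not the real implementation.
use std::sync::Arc;
use tokio::sync::{Notify, RwLock};

type Condition<E> = Box<dyn Fn(&[E]) -> bool + Send + Sync>;

struct EventHub<E> {
    events: RwLock<Vec<E>>,
    waiters: RwLock<Vec<(Condition<E>, Arc<Notify>)>>,
}

impl<E> EventHub<E> {
    // Register a condition and hand back the Notify a test can await.
    async fn notify_on(&self, condition: Condition<E>) -> Arc<Notify> {
        let notify = Arc::new(Notify::new());
        self.waiters.write().await.push((condition, notify.clone()));
        notify
    }

    // Record an event and wake every waiter whose condition now holds.
    async fn push(&self, event: E) {
        let mut events = self.events.write().await;
        events.push(event);
        for (condition, notify) in self.waiters.read().await.iter() {
            if condition(&events) {
                notify.notify_waiters();
            }
        }
    }
}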

etl/src/types/event.rs

Lines changed: 7 additions & 7 deletions
@@ -49,7 +49,7 @@ pub struct CommitEvent {
 pub struct RelationEvent {
     /// LSN position where the event started.
     pub start_lsn: PgLsn,
-    /// LSN position where the event will commit.
+    /// LSN position where the transaction of this event will commit.
     pub commit_lsn: PgLsn,
     /// Complete table schema including columns and types.
     pub table_schema: TableSchema,
@@ -63,7 +63,7 @@ pub struct RelationEvent {
 pub struct InsertEvent {
     /// LSN position where the event started.
     pub start_lsn: PgLsn,
-    /// LSN position where the event will commit.
+    /// LSN position where the transaction of this event will commit.
     pub commit_lsn: PgLsn,
     /// ID of the table where the row was inserted.
     pub table_id: TableId,
@@ -80,7 +80,7 @@ pub struct InsertEvent {
 pub struct UpdateEvent {
     /// LSN position where the event started.
     pub start_lsn: PgLsn,
-    /// LSN position where the event will commit.
+    /// LSN position where the transaction of this event will commit.
     pub commit_lsn: PgLsn,
     /// ID of the table where the row was updated.
     pub table_id: TableId,
@@ -100,11 +100,11 @@ pub struct UpdateEvent {
 /// information about the deleted row for proper cleanup in the destination system.
 #[derive(Debug, Clone, PartialEq)]
 pub struct DeleteEvent {
-    /// LSN position where the event started
+    /// LSN position where the event started.
     pub start_lsn: PgLsn,
-    /// LSN position where the event will commit
+    /// LSN position where the transaction of this event will commit.
     pub commit_lsn: PgLsn,
-    /// ID of the table where the row was deleted
+    /// ID of the table where the row was deleted.
     pub table_id: TableId,
     /// Data from the deleted row.
     ///
@@ -123,7 +123,7 @@ pub struct DeleteEvent {
 pub struct TruncateEvent {
     /// LSN position where the event started.
     pub start_lsn: PgLsn,
-    /// LSN position where the event will commit.
+    /// LSN position where the transaction of this event will commit.
     pub commit_lsn: PgLsn,
     /// Truncate operation options from Postgres.
     pub options: i8,
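
A hypothetical sketch of the `commit_lsn` semantics the reworded docs describe: every event emitted within a single transaction carries that transaction's commit LSN, while `start_lsn` varies per event (`Lsn` and `Change` are illustrative stand-ins, not the crate's types):

// Illustrative only: all changes from one transaction share a commit_lsn.
type Lsn = u64;

struct Change {
    start_lsn: Lsn,
    commit_lsn: Lsn,
}

fn main() {
    let commit_lsn: Lsn = 0x16B_3750; // the transaction's single commit point
    let tx_changes = vec![
        Change { start_lsn: 0x16B_3700, commit_lsn },
        Change { start_lsn: 0x16B_3720, commit_lsn },
    ];
    assert!(tx_changes.iter().all(|c| c.commit_lsn == commit_lsn));
}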

etl/tests/pipeline.rs

Lines changed: 31 additions & 28 deletions
@@ -149,6 +149,15 @@ async fn publication_changes_are_correctly_handled() {
 
     let database = spawn_source_database().await;
 
+    if let Some(server_version) = database.server_version()
+        && server_version.get() <= 150000
+    {
+        println!(
+            "Skipping test for PostgreSQL version <= 15, CREATE PUBLICATION FOR TABLES IN SCHEMA is not supported"
+        );
+        return;
+    }
+
     // Create two tables in the test schema and a publication for that schema.
     let table_1 = test_table_name("table_1");
     let table_1_id = database
@@ -192,15 +201,6 @@ async fn publication_changes_are_correctly_handled() {
     table_1_done.notified().await;
     table_2_done.notified().await;
 
-    if let Some(server_version) = database.server_version()
-        && server_version.get() <= 150000
-    {
-        println!(
-            "Skipping test for PostgreSQL version <= 15, CREATE PUBLICATION FOR TABLES IN SCHEMA is not supported"
-        );
-        return;
-    }
-
     // Insert one row in each table and wait for two insert events.
     let inserts_notify = destination
         .wait_for_events_count(vec![(EventType::Insert, 2)])
@@ -215,10 +215,7 @@ async fn publication_changes_are_correctly_handled() {
         .unwrap();
     inserts_notify.notified().await;
 
-    // Shutdown pipeline before altering publication (by dropping one table)
-    pipeline.shutdown_and_wait().await.unwrap();
-
-    // Drop table_2 so it's no longer part of the publication
+    // Drop table_2 so it's no longer part of the publication.
     database
         .client
         .as_ref()
@@ -230,6 +227,10 @@ async fn publication_changes_are_correctly_handled() {
         .await
         .unwrap();
 
+    // Shutdown pipeline after the table was dropped. We do this to show that dropping a table
+    // doesn't cause issues with the pipeline, since the change is picked up on pipeline restart.
+    pipeline.shutdown_and_wait().await.unwrap();
+
     // Create table_3 which is going to be added to the publication.
     let table_3 = test_table_name("table_3");
     let table_3_id = database
@@ -258,7 +259,7 @@ async fn publication_changes_are_correctly_handled() {
     // Insert one row in table_1 and wait for it. (We wait for 4 inserts since it keeps the previous
     // ones).
     let inserts_notify = destination
-        .wait_for_events_count(vec![(EventType::Insert, 4)])
+        .wait_for_events_count_deduped(vec![(EventType::Insert, 4)])
         .await;
 
     database
@@ -274,30 +275,32 @@ async fn publication_changes_are_correctly_handled() {
 
     pipeline.shutdown_and_wait().await.unwrap();
 
-    // Assert that table_2 state is gone but destination data remains
+    // Assert that table_2 state is gone but destination data remains.
     let states = store.get_table_replication_states().await;
     assert!(states.contains_key(&table_1_id));
     assert!(!states.contains_key(&table_2_id));
     assert!(states.contains_key(&table_3_id));
 
     // The destination should have the 2 events of the first table, the 1 event of the removed table
     // and the 1 event of the new table.
-    let events = destination.get_events().await;
+    // Use de-duplicated events for assertions to be robust to potential duplicates
+    // on restart where confirmed_flush_lsn may not have been stored.
+    let events = destination.get_events_deduped().await;
     let grouped = group_events_by_type_and_table_id(&events);
     let table_1_inserts = grouped
         .get(&(EventType::Insert, table_1_id))
         .cloned()
-        .unwrap_or_default();
+        .unwrap();
     assert_eq!(table_1_inserts.len(), 2);
     let table_2_inserts = grouped
         .get(&(EventType::Insert, table_2_id))
         .cloned()
-        .unwrap_or_default();
+        .unwrap();
     assert_eq!(table_2_inserts.len(), 1);
     let table_3_inserts = grouped
         .get(&(EventType::Insert, table_3_id))
         .cloned()
-        .unwrap_or_default();
+        .unwrap();
     assert_eq!(table_3_inserts.len(), 1);
 }
 
@@ -307,6 +310,15 @@ async fn publication_for_all_tables_in_schema_ignores_new_tables_until_restart()
 
     let database = spawn_source_database().await;
 
+    if let Some(server_version) = database.server_version()
+        && server_version.get() <= 150000
+    {
+        println!(
+            "Skipping test for PostgreSQL version <= 15, CREATE PUBLICATION FOR TABLES IN SCHEMA is not supported"
+        );
+        return;
+    }
+
     // Create first table.
     let table_1 = test_table_name("table_1");
     let table_1_id = database
@@ -364,15 +376,6 @@ async fn publication_for_all_tables_in_schema_ignores_new_tables_until_restart()
     assert!(table_schemas.contains_key(&table_1_id));
     assert!(!table_schemas.contains_key(&table_2_id));
 
-    if let Some(server_version) = database.server_version()
-        && server_version.get() <= 150000
-    {
-        println!(
-            "Skipping test for PostgreSQL version <= 15, CREATE PUBLICATION FOR TABLES IN SCHEMA is not supported"
-        );
-        return;
-    }
-
     // We restart the pipeline and verify that the new table is now processed.
     let mut pipeline = create_pipeline(
         &database.config,
