Skip to content

Commit fa12150

Browse files
Fix race condition related to schema initialization
1 parent 3b4eb6d commit fa12150

File tree

2 files changed

+54
-15
lines changed

2 files changed

+54
-15
lines changed

etl/src/replication/client.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -379,12 +379,19 @@ impl PgReplicationClient {
379379
}
380380

381381
/// Retrieves the names of all tables included in a publication.
382+
///
383+
/// Note: This excludes parent partitioned tables since they contain no data
384+
/// and should not be processed. Only child partitions and regular tables are returned.
382385
pub async fn get_publication_table_names(
383386
&self,
384387
publication_name: &str,
385388
) -> EtlResult<Vec<TableName>> {
386389
let publication_query = format!(
387-
"select schemaname, tablename from pg_publication_tables where pubname = {};",
390+
"select pt.schemaname, pt.tablename from pg_publication_tables pt
391+
join pg_class c on c.relname = pt.tablename
392+
join pg_namespace n on n.oid = c.relnamespace AND n.nspname = pt.schemaname
393+
where pt.pubname = {}
394+
and c.relkind != 'p'", /* Exclude parent partitioned tables */
388395
quote_literal(publication_name)
389396
);
390397

@@ -406,6 +413,10 @@ impl PgReplicationClient {
406413
}
407414

408415
/// Retrieves the OIDs of all tables included in a publication.
416+
///
417+
/// Note: This excludes parent partitioned tables (relkind = 'p') since they
418+
/// contain no data and should not be processed. Only child partitions and
419+
/// regular tables are returned.
409420
pub async fn get_publication_table_ids(
410421
&self,
411422
publication_name: &str,
@@ -414,7 +425,8 @@ impl PgReplicationClient {
414425
"select c.oid from pg_publication_tables pt
415426
join pg_class c on c.relname = pt.tablename
416427
join pg_namespace n on n.oid = c.relnamespace AND n.nspname = pt.schemaname
417-
where pt.pubname = {};",
428+
where pt.pubname = {}
429+
and c.relkind != 'p'", /* Exclude parent partitioned tables */
418430
quote_literal(publication_name)
419431
);
420432

etl/tests/integration/partitioned_table_test.rs

Lines changed: 40 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ async fn partitioned_table_sync_succeeds_with_inherited_primary_keys() {
2222
("p3", "FROM (200) TO (300)"),
2323
];
2424

25-
let (_parent_table_id, partition_table_ids) =
25+
let (parent_table_id, partition_table_ids) =
2626
create_partitioned_table(&database, table_name.clone(), &partition_specs)
2727
.await
2828
.expect("Failed to create partitioned table");
@@ -80,17 +80,44 @@ async fn partitioned_table_sync_succeeds_with_inherited_primary_keys() {
8080
);
8181

8282
let table_states = state_store.get_table_replication_states().await;
83-
for (table_id, state) in &table_states {
84-
if partition_table_ids.contains(table_id) {
85-
assert!(
86-
matches!(
87-
state.as_type(),
88-
TableReplicationPhaseType::SyncDone | TableReplicationPhaseType::Ready
89-
),
90-
"Partition {} should be in SyncDone or Ready state, but was in {:?}",
91-
table_id,
92-
state.as_type()
93-
);
94-
}
83+
84+
assert_eq!(
85+
table_states.len(),
86+
partition_table_ids.len(),
87+
"Expected {} partition states, but found {}",
88+
partition_table_ids.len(),
89+
table_states.len()
90+
);
91+
92+
for &partition_id in &partition_table_ids {
93+
let state = table_states
94+
.get(&partition_id)
95+
.expect(&format!("Partition {} should have a state", partition_id));
96+
assert!(
97+
matches!(
98+
state.as_type(),
99+
TableReplicationPhaseType::SyncDone | TableReplicationPhaseType::Ready
100+
),
101+
"Partition {} should be in SyncDone or Ready state, but was in {:?}",
102+
partition_id,
103+
state.as_type()
104+
);
95105
}
106+
107+
assert!(
108+
!table_states.contains_key(&parent_table_id),
109+
"Parent table {} should not be tracked since parent partitioned tables are excluded from processing",
110+
parent_table_id
111+
);
112+
113+
let parent_table_rows = table_rows
114+
.iter()
115+
.filter(|(table_id, _)| **table_id == parent_table_id)
116+
.map(|(_, rows)| rows.len())
117+
.sum::<usize>();
118+
assert_eq!(
119+
parent_table_rows, 0,
120+
"Parent table {} should have no data since it's excluded from processing and all data goes to partitions, but found {} rows",
121+
parent_table_id, parent_table_rows
122+
);
96123
}

0 commit comments

Comments
 (0)