Skip to content

Commit 7c77ce9

Browse files
Fix race condition related to schema initialization
1 parent 880f3cd commit 7c77ce9

File tree

2 files changed

+54
-15
lines changed

2 files changed

+54
-15
lines changed

etl/src/replication/client.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -375,12 +375,19 @@ impl PgReplicationClient {
375375
}
376376

377377
/// Retrieves the names of all tables included in a publication.
378+
///
379+
/// Note: This excludes parent partitioned tables since they contain no data
380+
/// and should not be processed. Only child partitions and regular tables are returned.
378381
pub async fn get_publication_table_names(
379382
&self,
380383
publication_name: &str,
381384
) -> EtlResult<Vec<TableName>> {
382385
let publication_query = format!(
383-
"select schemaname, tablename from pg_publication_tables where pubname = {};",
386+
"select pt.schemaname, pt.tablename from pg_publication_tables pt
387+
join pg_class c on c.relname = pt.tablename
388+
join pg_namespace n on n.oid = c.relnamespace AND n.nspname = pt.schemaname
389+
where pt.pubname = {}
390+
and c.relkind != 'p'", /* Exclude parent partitioned tables */
384391
quote_literal(publication_name)
385392
);
386393

@@ -402,6 +409,10 @@ impl PgReplicationClient {
402409
}
403410

404411
/// Retrieves the OIDs of all tables included in a publication.
412+
///
413+
/// Note: This excludes parent partitioned tables (relkind = 'p') since they
414+
/// contain no data and should not be processed. Only child partitions and
415+
/// regular tables are returned.
405416
pub async fn get_publication_table_ids(
406417
&self,
407418
publication_name: &str,
@@ -410,7 +421,8 @@ impl PgReplicationClient {
410421
"select c.oid from pg_publication_tables pt
411422
join pg_class c on c.relname = pt.tablename
412423
join pg_namespace n on n.oid = c.relnamespace AND n.nspname = pt.schemaname
413-
where pt.pubname = {};",
424+
where pt.pubname = {}
425+
and c.relkind != 'p'", /* Exclude parent partitioned tables */
414426
quote_literal(publication_name)
415427
);
416428

etl/tests/integration/partitioned_table_test.rs

Lines changed: 40 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ async fn partitioned_table_sync_succeeds_with_inherited_primary_keys() {
2222
("p3", "FROM (200) TO (300)"),
2323
];
2424

25-
let (_parent_table_id, partition_table_ids) =
25+
let (parent_table_id, partition_table_ids) =
2626
create_partitioned_table(&database, table_name.clone(), &partition_specs)
2727
.await
2828
.expect("Failed to create partitioned table");
@@ -80,17 +80,44 @@ async fn partitioned_table_sync_succeeds_with_inherited_primary_keys() {
8080
);
8181

8282
let table_states = state_store.get_table_replication_states().await;
83-
for (table_id, state) in &table_states {
84-
if partition_table_ids.contains(table_id) {
85-
assert!(
86-
matches!(
87-
state.as_type(),
88-
TableReplicationPhaseType::SyncDone | TableReplicationPhaseType::Ready
89-
),
90-
"Partition {} should be in SyncDone or Ready state, but was in {:?}",
91-
table_id,
92-
state.as_type()
93-
);
94-
}
83+
84+
assert_eq!(
85+
table_states.len(),
86+
partition_table_ids.len(),
87+
"Expected {} partition states, but found {}",
88+
partition_table_ids.len(),
89+
table_states.len()
90+
);
91+
92+
for &partition_id in &partition_table_ids {
93+
let state = table_states
94+
.get(&partition_id)
95+
.expect(&format!("Partition {} should have a state", partition_id));
96+
assert!(
97+
matches!(
98+
state.as_type(),
99+
TableReplicationPhaseType::SyncDone | TableReplicationPhaseType::Ready
100+
),
101+
"Partition {} should be in SyncDone or Ready state, but was in {:?}",
102+
partition_id,
103+
state.as_type()
104+
);
95105
}
106+
107+
assert!(
108+
!table_states.contains_key(&parent_table_id),
109+
"Parent table {} should not be tracked since parent partitioned tables are excluded from processing",
110+
parent_table_id
111+
);
112+
113+
let parent_table_rows = table_rows
114+
.iter()
115+
.filter(|(table_id, _)| **table_id == parent_table_id)
116+
.map(|(_, rows)| rows.len())
117+
.sum::<usize>();
118+
assert_eq!(
119+
parent_table_rows, 0,
120+
"Parent table {} should have no data since it's excluded from processing and all data goes to partitions, but found {} rows",
121+
parent_table_id, parent_table_rows
122+
);
96123
}

0 commit comments

Comments
 (0)