Skip to content

Commit 25548da

Browse files
Create replication for partitioned table issue
1 parent 3a7ba8b commit 25548da

File tree

2 files changed

+218
-0
lines changed

2 files changed

+218
-0
lines changed

etl/tests/integration/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
mod partitioned_table_test;
12
mod pipeline_test;
23
mod postgres_store;
34
mod replica_identity;
Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,217 @@
1+
use etl::destination::memory::MemoryDestination;
2+
use etl::state::table::TableReplicationPhaseType;
3+
use etl::test_utils::database::{spawn_source_database, test_table_name};
4+
use etl::test_utils::notify::NotifyingStore;
5+
use etl::test_utils::pipeline::create_pipeline;
6+
use etl::test_utils::test_destination_wrapper::TestDestinationWrapper;
7+
use etl::types::PipelineId;
8+
use etl_postgres::tokio::test_utils::PgDatabase;
9+
use tokio_postgres::GenericClient;
10+
use etl_postgres::types::{TableId, TableName};
11+
use etl_telemetry::tracing::init_test_tracing;
12+
use rand::random;
13+
14+
/// Creates a partitioned table with the given name and partitions.
15+
///
16+
/// This function creates:
17+
/// 1. A parent partitioned table with a primary key
18+
/// 2. Several child partitions based on the provided partition specifications
19+
///
20+
/// Returns the table ID of the parent table and a list of partition table IDs.
21+
pub async fn create_partitioned_table<G: GenericClient>(
22+
database: &PgDatabase<G>,
23+
table_name: TableName,
24+
partition_specs: &[(&str, &str)], // (partition_name, partition_constraint)
25+
) -> Result<(TableId, Vec<TableId>), tokio_postgres::Error> {
26+
// Create the parent partitioned table with a primary key
27+
let create_parent_query = format!(
28+
"CREATE TABLE {} (
29+
id bigserial,
30+
data text NOT NULL,
31+
partition_key integer NOT NULL,
32+
PRIMARY KEY (id, partition_key)
33+
) PARTITION BY RANGE (partition_key)",
34+
table_name.as_quoted_identifier()
35+
);
36+
37+
database.run_sql(&create_parent_query).await?;
38+
39+
// Get the OID of the parent table
40+
let parent_row = database
41+
.client
42+
.as_ref()
43+
.unwrap()
44+
.query_one(
45+
"SELECT c.oid FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace
46+
WHERE n.nspname = $1 AND c.relname = $2",
47+
&[&table_name.schema, &table_name.name],
48+
)
49+
.await?;
50+
51+
let parent_table_id: TableId = parent_row.get(0);
52+
let mut partition_table_ids = Vec::new();
53+
54+
// Create child partitions
55+
for (partition_name, partition_constraint) in partition_specs {
56+
let partition_table_name = TableName::new(
57+
table_name.schema.clone(),
58+
format!("{}_{}", table_name.name, partition_name),
59+
);
60+
61+
let create_partition_query = format!(
62+
"CREATE TABLE {} PARTITION OF {} FOR VALUES {}",
63+
partition_table_name.as_quoted_identifier(),
64+
table_name.as_quoted_identifier(),
65+
partition_constraint
66+
);
67+
68+
database.run_sql(&create_partition_query).await?;
69+
70+
// Get the OID of the partition table
71+
let partition_row = database
72+
.client
73+
.as_ref()
74+
.unwrap()
75+
.query_one(
76+
"SELECT c.oid FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace
77+
WHERE n.nspname = $1 AND c.relname = $2",
78+
&[&partition_table_name.schema, &partition_table_name.name],
79+
)
80+
.await?;
81+
82+
let partition_table_id: TableId = partition_row.get(0);
83+
partition_table_ids.push(partition_table_id);
84+
}
85+
86+
Ok((parent_table_id, partition_table_ids))
87+
}
88+
89+
/// Test that verifies partitioned tables with inherited primary keys work correctly.
90+
///
91+
/// This test validates the fix for GitHub issue #296 where partitioned tables
92+
/// failed during sync because the ETL system didn't recognize that leaf partitions
93+
/// inherit primary key constraints from their parent table.
94+
#[tokio::test(flavor = "multi_thread")]
95+
async fn partitioned_table_sync_succeeds_with_inherited_primary_keys() {
96+
init_test_tracing();
97+
let database = spawn_source_database().await;
98+
99+
let table_name = test_table_name("partitioned_events");
100+
101+
// Create a partitioned table with three partitions based on partition_key ranges
102+
let partition_specs = [
103+
("p1", "FROM (1) TO (100)"),
104+
("p2", "FROM (100) TO (200)"),
105+
("p3", "FROM (200) TO (300)"),
106+
];
107+
108+
let (parent_table_id, partition_table_ids) = create_partitioned_table(
109+
&database,
110+
table_name.clone(),
111+
&partition_specs
112+
).await.expect("Failed to create partitioned table");
113+
114+
// Insert some test data into the partitioned table
115+
database
116+
.run_sql(&format!(
117+
"INSERT INTO {} (data, partition_key) VALUES
118+
('event1', 50), ('event2', 150), ('event3', 250)",
119+
table_name.as_quoted_identifier()
120+
))
121+
.await
122+
.unwrap();
123+
124+
let publication_name = "test_partitioned_pub".to_string();
125+
database
126+
.create_publication(&publication_name, std::slice::from_ref(&table_name))
127+
.await
128+
.expect("Failed to create publication");
129+
130+
let state_store = NotifyingStore::new();
131+
let destination = TestDestinationWrapper::wrap(MemoryDestination::new());
132+
133+
let pipeline_id: PipelineId = random();
134+
let mut pipeline = create_pipeline(
135+
&database.config,
136+
pipeline_id,
137+
publication_name,
138+
state_store.clone(),
139+
destination.clone(),
140+
);
141+
142+
pipeline.start().await.unwrap();
143+
144+
// Wait for all partitions to complete their sync (should succeed now)
145+
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
146+
147+
// Check that all partitions successfully synced without primary key errors
148+
let table_states = state_store.get_table_replication_states().await;
149+
150+
let mut sync_done_count = 0;
151+
let mut error_count = 0;
152+
153+
for (table_id, state) in &table_states {
154+
match state.as_type() {
155+
TableReplicationPhaseType::SyncDone | TableReplicationPhaseType::Ready => {
156+
sync_done_count += 1;
157+
println!("✓ Partition {} successfully synced", table_id);
158+
}
159+
TableReplicationPhaseType::Errored => {
160+
error_count += 1;
161+
let reason = match state {
162+
etl::state::table::TableReplicationPhase::Errored { reason, .. } => reason,
163+
_ => unreachable!(),
164+
};
165+
println!("✗ Partition {} failed with error: {}", table_id, reason);
166+
}
167+
other_state => {
168+
println!("? Partition {} in state: {:?}", table_id, other_state);
169+
}
170+
}
171+
}
172+
173+
// Shutdown the pipeline (should succeed now)
174+
let shutdown_result = pipeline.shutdown_and_wait().await;
175+
176+
// Verify all 3 partitions successfully synced without errors
177+
assert_eq!(
178+
error_count, 0,
179+
"Expected no partitions to fail, but got {} errors",
180+
error_count
181+
);
182+
183+
assert!(
184+
sync_done_count >= 3,
185+
"Expected all 3 partitions to sync successfully, but only {} completed",
186+
sync_done_count
187+
);
188+
189+
// The pipeline should succeed (or fail for reasons other than primary key issues)
190+
if let Err(e) = shutdown_result {
191+
// If there's an error, it shouldn't be about missing primary keys
192+
let error_str = format!("{:?}", e);
193+
assert!(
194+
!error_str.contains("Missing primary key"),
195+
"Pipeline failed with primary key error despite fix: {}",
196+
error_str
197+
);
198+
}
199+
200+
// Verify that data was synced successfully
201+
let table_rows = destination.get_table_rows().await;
202+
203+
// We expect data to be synced from all 3 partitions
204+
let total_rows: usize = table_rows.values().map(|rows| rows.len()).sum();
205+
assert!(
206+
total_rows >= 3,
207+
"Expected at least 3 rows synced (one per partition), but got {}",
208+
total_rows
209+
);
210+
211+
println!(
212+
"✓ Successfully verified GitHub issue #296 fix: Partitioned table sync works with inherited primary keys"
213+
);
214+
println!("✓ Parent table ID: {}", parent_table_id);
215+
println!("✓ Successfully synced partition IDs: {:?}", partition_table_ids);
216+
println!("✓ Total rows synced: {}", total_rows);
217+
}

0 commit comments

Comments
 (0)