Skip to content

Commit 82436de

Browse files
Add test about dropping partitions
Signed-off-by: Abhi Agarwal <[email protected]>
1 parent 5ef0c6d commit 82436de

File tree

1 file changed

+95
-0
lines changed

1 file changed

+95
-0
lines changed

etl/tests/partitioned_table_test.rs

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,3 +203,98 @@ async fn partitioned_table_copy_and_streams_new_data_from_new_partition() {
203203
.unwrap_or_default();
204204
assert_eq!(parent_inserts.len(), 1);
205205
}
206+
207+
/// Dropping a child partition must not emit DELETE/TRUNCATE events.
208+
#[tokio::test(flavor = "multi_thread")]
209+
async fn partition_drop_does_not_emit_delete_or_truncate() {
210+
init_test_tracing();
211+
let database = spawn_source_database().await;
212+
213+
let table_name = test_table_name("partitioned_events_drop");
214+
let partition_specs = [("p1", "from (1) to (100)"), ("p2", "from (100) to (200)")];
215+
216+
let (parent_table_id, _partition_table_ids) =
217+
create_partitioned_table(&database, table_name.clone(), &partition_specs)
218+
.await
219+
.expect("Failed to create partitioned table");
220+
221+
database
222+
.run_sql(&format!(
223+
"insert into {} (data, partition_key) values \
224+
('event1', 50), ('event2', 150)",
225+
table_name.as_quoted_identifier()
226+
))
227+
.await
228+
.unwrap();
229+
230+
let publication_name = "test_partitioned_pub_drop".to_string();
231+
database
232+
.create_publication(&publication_name, std::slice::from_ref(&table_name))
233+
.await
234+
.expect("Failed to create publication");
235+
236+
let state_store = NotifyingStore::new();
237+
let destination = TestDestinationWrapper::wrap(MemoryDestination::new());
238+
239+
let parent_sync_done = state_store
240+
.notify_on_table_state_type(parent_table_id, TableReplicationPhaseType::SyncDone)
241+
.await;
242+
243+
let pipeline_id: PipelineId = random();
244+
let mut pipeline = create_pipeline(
245+
&database.config,
246+
pipeline_id,
247+
publication_name,
248+
state_store.clone(),
249+
destination.clone(),
250+
);
251+
252+
pipeline.start().await.unwrap();
253+
parent_sync_done.notified().await;
254+
255+
let events_before = destination.get_events().await;
256+
let grouped_before = group_events_by_type_and_table_id(&events_before);
257+
let del_before = grouped_before
258+
.get(&(EventType::Delete, parent_table_id))
259+
.map(|v| v.len())
260+
.unwrap_or(0);
261+
let trunc_before = grouped_before
262+
.get(&(EventType::Truncate, parent_table_id))
263+
.map(|v| v.len())
264+
.unwrap_or(0);
265+
266+
// Detach and drop one child partition (DDL should not generate DML events)
267+
let child_p1_name = format!("{}_{}", table_name.name, "p1");
268+
let child_p1_qualified = format!("{}.{}", table_name.schema, child_p1_name);
269+
database
270+
.run_sql(&format!(
271+
"alter table {} detach partition {}",
272+
table_name.as_quoted_identifier(),
273+
child_p1_qualified
274+
))
275+
.await
276+
.unwrap();
277+
database
278+
.run_sql(&format!("drop table {}", child_p1_qualified))
279+
.await
280+
.unwrap();
281+
282+
let _ = pipeline.shutdown_and_wait().await;
283+
284+
let events_after = destination.get_events().await;
285+
let grouped_after = group_events_by_type_and_table_id(&events_after);
286+
let del_after = grouped_after
287+
.get(&(EventType::Delete, parent_table_id))
288+
.map(|v| v.len())
289+
.unwrap_or(0);
290+
let trunc_after = grouped_after
291+
.get(&(EventType::Truncate, parent_table_id))
292+
.map(|v| v.len())
293+
.unwrap_or(0);
294+
295+
assert_eq!(del_after, del_before, "Partition drop must not emit DELETE events");
296+
assert_eq!(
297+
trunc_after, trunc_before,
298+
"Partition drop must not emit TRUNCATE events"
299+
);
300+
}

0 commit comments

Comments
 (0)