|
17 | 17 |
|
18 | 18 | use std::collections::{HashMap, HashSet}; |
19 | 19 |
|
20 | | -use arrow_array::StringArray; |
21 | | -use futures::TryStreamExt; |
22 | 20 | use uuid::Uuid; |
23 | 21 |
|
24 | 22 | use crate::error::Result; |
@@ -129,32 +127,19 @@ impl<'a> FastAppendAction<'a> { |
129 | 127 | .map(|df| df.file_path.as_str()) |
130 | 128 | .collect(); |
131 | 129 |
|
132 | | - let mut manifest_stream = self |
133 | | - .snapshot_produce_action |
134 | | - .tx |
135 | | - .current_table |
136 | | - .inspect() |
137 | | - .manifests() |
138 | | - .scan() |
139 | | - .await?; |
140 | 130 | let mut referenced_files = Vec::new(); |
141 | | - |
142 | | - while let Some(batch) = manifest_stream.try_next().await? { |
143 | | - let file_path_array = batch |
144 | | - .column(1) |
145 | | - .as_any() |
146 | | - .downcast_ref::<StringArray>() |
147 | | - .ok_or_else(|| { |
148 | | - Error::new( |
149 | | - ErrorKind::DataInvalid, |
150 | | - "Failed to downcast file_path column to StringArray", |
151 | | - ) |
152 | | - })?; |
153 | | - |
154 | | - for i in 0..batch.num_rows() { |
155 | | - let file_path = file_path_array.value(i); |
156 | | - if new_files.contains(file_path) { |
157 | | - referenced_files.push(file_path.to_string()); |
| 131 | + let table = &self.snapshot_produce_action.tx.current_table; |
| 132 | + if let Some(current_snapshot) = table.metadata().current_snapshot() { |
| 133 | + let manifest_list = current_snapshot |
| 134 | + .load_manifest_list(table.file_io(), &table.metadata_ref()) |
| 135 | + .await?; |
| 136 | + for manifest_list_entry in manifest_list.entries() { |
| 137 | + let manifest = manifest_list_entry.load_manifest(table.file_io()).await?; |
| 138 | + for entry in manifest.entries() { |
| 139 | + let file_path = entry.file_path(); |
| 140 | + if new_files.contains(file_path) && entry.is_alive() { |
| 141 | + referenced_files.push(file_path.to_string()); |
| 142 | + } |
158 | 143 | } |
159 | 144 | } |
160 | 145 | } |
@@ -364,81 +349,39 @@ mod tests { |
364 | 349 | } |
365 | 350 |
|
366 | 351 | #[tokio::test] |
367 | | - async fn test_add_existing_parquet_files_to_unpartitioned_table() { |
| 352 | + async fn test_add_duplicated_parquet_files_to_unpartitioned_table() { |
368 | 353 | let mut fixture = TableTestFixture::new_unpartitioned(); |
369 | 354 | fixture.setup_unpartitioned_manifest_files().await; |
370 | 355 | let tx = crate::transaction::Transaction::new(&fixture.table); |
371 | 356 |
|
372 | 357 | let file_paths = vec![ |
373 | 358 | format!("{}/1.parquet", &fixture.table_location), |
374 | | - format!("{}/2.parquet", &fixture.table_location), |
375 | 359 | format!("{}/3.parquet", &fixture.table_location), |
376 | 360 | ]; |
377 | 361 |
|
378 | 362 | let fast_append_action = tx.fast_append(None, vec![]).unwrap(); |
379 | 363 |
|
380 | | - // Attempt to add the existing Parquet files with fast append. |
381 | | - let new_tx = fast_append_action |
382 | | - .add_parquet_files(file_paths.clone()) |
383 | | - .await |
384 | | - .expect("Adding existing Parquet files should succeed"); |
385 | | - |
386 | | - let mut found_add_snapshot = false; |
387 | | - let mut found_set_snapshot_ref = false; |
388 | | - for update in new_tx.updates.iter() { |
389 | | - match update { |
390 | | - TableUpdate::AddSnapshot { .. } => { |
391 | | - found_add_snapshot = true; |
392 | | - } |
393 | | - TableUpdate::SetSnapshotRef { |
394 | | - ref_name, |
395 | | - reference, |
396 | | - } => { |
397 | | - found_set_snapshot_ref = true; |
398 | | - assert_eq!(ref_name, MAIN_BRANCH); |
399 | | - assert!(reference.snapshot_id > 0); |
400 | | - } |
401 | | - _ => {} |
402 | | - } |
403 | | - } |
404 | | - assert!(found_add_snapshot); |
405 | | - assert!(found_set_snapshot_ref); |
406 | | - |
407 | | - let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &new_tx.updates[0] { |
408 | | - snapshot |
409 | | - } else { |
410 | | - panic!("Expected the first update to be an AddSnapshot update"); |
411 | | - }; |
412 | | - |
413 | | - let manifest_list = new_snapshot |
414 | | - .load_manifest_list(fixture.table.file_io(), fixture.table.metadata()) |
415 | | - .await |
416 | | - .expect("Failed to load manifest list"); |
417 | | - |
418 | | - assert_eq!( |
419 | | - manifest_list.entries().len(), |
420 | | - 2, |
421 | | - "Expected 2 manifest list entries, got {}", |
422 | | - manifest_list.entries().len() |
| 364 | + // Attempt to add duplicated Parquet files with fast append. |
| 365 | + assert!( |
| 366 | + fast_append_action |
| 367 | + .add_parquet_files(file_paths.clone()) |
| 368 | + .await |
| 369 | + .is_err(), |
| 370 | + "file already in table" |
423 | 371 | ); |
424 | 372 |
|
425 | | - // Load the manifest from the manifest list |
426 | | - let manifest = manifest_list.entries()[0] |
427 | | - .load_manifest(fixture.table.file_io()) |
428 | | - .await |
429 | | - .expect("Failed to load manifest"); |
| 373 | + let file_paths = vec![format!("{}/2.parquet", &fixture.table_location)]; |
430 | 374 |
|
431 | | - // Check that the manifest contains three entries. |
432 | | - assert_eq!(manifest.entries().len(), 3); |
| 375 | + let tx = crate::transaction::Transaction::new(&fixture.table); |
| 376 | + let fast_append_action = tx.fast_append(None, vec![]).unwrap(); |
433 | 377 |
|
434 | | - // Verify each file path appears in manifest. |
435 | | - let manifest_paths: Vec<String> = manifest |
436 | | - .entries() |
437 | | - .iter() |
438 | | - .map(|entry| entry.data_file().file_path.clone()) |
439 | | - .collect(); |
440 | | - for path in file_paths { |
441 | | - assert!(manifest_paths.contains(&path)); |
442 | | - } |
| 378 | + // Attempt to add Parquet file which was deleted from table. |
| 379 | + assert!( |
| 380 | + fast_append_action |
| 381 | + .add_parquet_files(file_paths.clone()) |
| 382 | + .await |
| 383 | + .is_ok(), |
| 384 | + "file not in table" |
| 385 | + ); |
443 | 386 | } |
444 | 387 | } |
0 commit comments