diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index b945929193..15d5c99a1a 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -177,6 +177,7 @@ impl<'a> Transaction<'a> { /// FastAppendAction is a transaction action for fast append data files to the table. pub struct FastAppendAction<'a> { snapshot_produce_action: SnapshotProduceAction<'a>, + check_duplicate: bool, } impl<'a> FastAppendAction<'a> { @@ -196,9 +197,16 @@ impl<'a> FastAppendAction<'a> { commit_uuid, snapshot_properties, )?, + check_duplicate: true, }) } + /// Set whether to check duplicate files + pub fn with_check_duplicate(mut self, v: bool) -> Self { + self.check_duplicate = v; + self + } + /// Add data files to the snapshot. pub fn add_data_files( &mut self, @@ -242,51 +250,53 @@ impl<'a> FastAppendAction<'a> { /// Finished building the action and apply it to the transaction. pub async fn apply(self) -> Result> { // Checks duplicate files - let new_files: HashSet<&str> = self - .snapshot_produce_action - .added_data_files - .iter() - .map(|df| df.file_path.as_str()) - .collect(); - - let mut manifest_stream = self - .snapshot_produce_action - .tx - .table - .inspect() - .manifests() - .scan() - .await?; - let mut referenced_files = Vec::new(); - - while let Some(batch) = manifest_stream.try_next().await? { - let file_path_array = batch - .column(1) - .as_any() - .downcast_ref::() - .ok_or_else(|| { - Error::new( - ErrorKind::DataInvalid, - "Failed to downcast file_path column to StringArray", - ) - })?; - - for i in 0..batch.num_rows() { - let file_path = file_path_array.value(i); - if new_files.contains(file_path) { - referenced_files.push(file_path.to_string()); + if self.check_duplicate { + let new_files: HashSet<&str> = self + .snapshot_produce_action + .added_data_files + .iter() + .map(|df| df.file_path.as_str()) + .collect(); + + let mut manifest_stream = self + .snapshot_produce_action + .tx + .table + .inspect() + .manifests() + .scan() + .await?; + let mut referenced_files = Vec::new(); + + while let Some(batch) = manifest_stream.try_next().await? { + let file_path_array = batch + .column(1) + .as_any() + .downcast_ref::() + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "Failed to downcast file_path column to StringArray", + ) + })?; + + for i in 0..batch.num_rows() { + let file_path = file_path_array.value(i); + if new_files.contains(file_path) { + referenced_files.push(file_path.to_string()); + } } } - } - if !referenced_files.is_empty() { - return Err(Error::new( - ErrorKind::DataInvalid, - format!( - "Cannot add files that are already referenced by table, files: {}", - referenced_files.join(", ") - ), - )); + if !referenced_files.is_empty() { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot add files that are already referenced by table, files: {}", + referenced_files.join(", ") + ), + )); + } } self.snapshot_produce_action