From 5e3b2e3dfcec77506812283ce1e082f92de003b1 Mon Sep 17 00:00:00 2001
From: Dylan
Date: Tue, 25 Mar 2025 23:11:09 +0800
Subject: [PATCH] fix: cherry-pick #27 (#41)

---
 .../src/arrow/caching_delete_file_loader.rs   | 18 +++----
 .../iceberg/src/arrow/delete_file_loader.rs   | 13 +++--
 crates/iceberg/src/arrow/delete_filter.rs     | 53 ++++++++++---------
 crates/iceberg/src/arrow/reader.rs            | 52 +++++++++++-------
 crates/iceberg/src/delete_file_index.rs       | 17 +++---
 crates/iceberg/src/scan/context.rs            |  3 ++
 crates/iceberg/src/scan/mod.rs                |  8 +++
 crates/iceberg/src/scan/task.rs               | 36 ++++++++++++-
 crates/iceberg/src/spec/manifest/mod.rs       |  3 +-
 .../function_writer/equality_delta_writer.rs  | 31 ++++++++---
 .../iceberg/src/writer/function_writer/mod.rs |  1 +
 11 files changed, 157 insertions(+), 78 deletions(-)

diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs b/crates/iceberg/src/arrow/caching_delete_file_loader.rs
index f0dece75a4..b1d892f996 100644
--- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs
+++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs
@@ -19,14 +19,14 @@ use std::collections::HashMap;
 
 use arrow_array::{Int64Array, StringArray};
 use futures::{StreamExt, TryStreamExt};
-use tokio::sync::oneshot::{Receiver, channel};
+use tokio::sync::oneshot::{channel, Receiver};
 
 use super::delete_filter::DeleteFilter;
 use crate::arrow::delete_file_loader::BasicDeleteFileLoader;
 use crate::delete_vector::DeleteVector;
 use crate::expr::Predicate;
 use crate::io::FileIO;
-use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile};
+use crate::scan::{ArrowRecordBatchStream, FileScanTask};
 use crate::spec::{DataContentType, SchemaRef};
 use crate::{Error, ErrorKind, Result};
 
@@ -129,7 +129,7 @@ impl CachingDeleteFileLoader {
     /// ```
     pub(crate) fn load_deletes(
         &self,
-        delete_file_entries: &[FileScanTaskDeleteFile],
+        delete_file_entries: &[FileScanTask],
         schema: SchemaRef,
     ) -> Receiver<Result<DeleteFilter>> {
         let (tx, rx) = channel();
@@ -195,30 +195,30 @@ impl CachingDeleteFileLoader {
     }
 
     async fn load_file_for_task(
-        task: &FileScanTaskDeleteFile,
+        task: &FileScanTask,
         basic_delete_file_loader: BasicDeleteFileLoader,
         del_filter: DeleteFilter,
         schema: SchemaRef,
     ) -> Result<DeleteFileContext> {
-        match task.file_type {
+        match task.data_file_content {
             DataContentType::PositionDeletes => Ok(DeleteFileContext::PosDels(
                 basic_delete_file_loader
-                    .parquet_to_batch_stream(&task.file_path)
+                    .parquet_to_batch_stream(task.data_file_path())
                     .await?,
             )),
             DataContentType::EqualityDeletes => {
-                let Some(notify) = del_filter.try_start_eq_del_load(&task.file_path) else {
+                let Some(notify) = del_filter.try_start_eq_del_load(task.data_file_path()) else {
                     return Ok(DeleteFileContext::ExistingEqDel);
                 };
 
                 let (sender, receiver) = channel();
-                del_filter.insert_equality_delete(&task.file_path, receiver);
+                del_filter.insert_equality_delete(task.data_file_path(), receiver);
 
                 Ok(DeleteFileContext::FreshEqDel {
                     batch_stream: BasicDeleteFileLoader::evolve_schema(
                         basic_delete_file_loader
-                            .parquet_to_batch_stream(&task.file_path)
+                            .parquet_to_batch_stream(task.data_file_path())
                             .await?,
                         schema,
                     )
diff --git a/crates/iceberg/src/arrow/delete_file_loader.rs b/crates/iceberg/src/arrow/delete_file_loader.rs
index 592ef2eb4a..80d5623e7b 100644
--- a/crates/iceberg/src/arrow/delete_file_loader.rs
+++ b/crates/iceberg/src/arrow/delete_file_loader.rs
@@ -115,6 +115,7 @@ mod tests {
 
     use super::*;
     use crate::arrow::delete_filter::tests::setup;
+    use crate::spec::DataContentType;
 
     #[tokio::test]
    async fn
test_basic_delete_file_loader_read_delete_file() { @@ -129,11 +130,15 @@ mod tests { let file_scan_tasks = setup(table_location); + let delete_task = FileScanTaskDeleteFile { + file_path: file_scan_tasks[0].deletes[0].data_file_path.clone(), + file_type: DataContentType::PositionDeletes, + partition_spec_id: 0, + equality_ids: vec![], + }; + let result = delete_file_loader - .read_delete_file( - &file_scan_tasks[0].deletes[0], - file_scan_tasks[0].schema_ref(), - ) + .read_delete_file(&delete_task, file_scan_tasks[0].schema_ref()) .await .unwrap(); diff --git a/crates/iceberg/src/arrow/delete_filter.rs b/crates/iceberg/src/arrow/delete_filter.rs index 0dd53a34fa..d82f6827aa 100644 --- a/crates/iceberg/src/arrow/delete_filter.rs +++ b/crates/iceberg/src/arrow/delete_filter.rs @@ -18,13 +18,13 @@ use std::collections::HashMap; use std::sync::{Arc, Mutex, RwLock}; -use tokio::sync::Notify; use tokio::sync::oneshot::Receiver; +use tokio::sync::Notify; use crate::delete_vector::DeleteVector; use crate::expr::Predicate::AlwaysTrue; use crate::expr::{Bind, BoundPredicate, Predicate}; -use crate::scan::{FileScanTask, FileScanTaskDeleteFile}; +use crate::scan::FileScanTask; use crate::spec::DataContentType; use crate::{Error, ErrorKind, Result}; @@ -120,14 +120,14 @@ impl DeleteFilter { } let Some(predicate) = self - .get_equality_delete_predicate_for_delete_file_path(&delete.file_path) + .get_equality_delete_predicate_for_delete_file_path(delete.data_file_path()) .await else { return Err(Error::new( ErrorKind::Unexpected, format!( "Missing predicate for equality delete file '{}'", - delete.file_path + delete.data_file_path() ), )); }; @@ -190,8 +190,8 @@ impl DeleteFilter { } } -pub(crate) fn is_equality_delete(f: &FileScanTaskDeleteFile) -> bool { - matches!(f.file_type, DataContentType::EqualityDeletes) +pub(crate) fn is_equality_delete(f: &FileScanTask) -> bool { + matches!(f.data_file_content, DataContentType::EqualityDeletes) } #[cfg(test)] @@ -307,24 +307,19 @@ pub(crate) mod tests { writer.close().unwrap(); } - let pos_del_1 = FileScanTaskDeleteFile { - file_path: format!("{}/pos-del-1.parquet", table_location.to_str().unwrap()), - file_type: DataContentType::PositionDeletes, - partition_spec_id: 0, - equality_ids: vec![], - }; - - let pos_del_2 = FileScanTaskDeleteFile { - file_path: format!("{}/pos-del-2.parquet", table_location.to_str().unwrap()), - file_type: DataContentType::PositionDeletes, - partition_spec_id: 0, - equality_ids: vec![], - }; - - let pos_del_3 = FileScanTaskDeleteFile { - file_path: format!("{}/pos-del-3.parquet", table_location.to_str().unwrap()), - file_type: DataContentType::PositionDeletes, - partition_spec_id: 0, + // Helper to build a positional delete task with minimal fields + let make_pos_del_task = |n: u8| FileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: format!("{}/pos-del-{}.parquet", table_location.to_str().unwrap(), n), + data_file_content: DataContentType::PositionDeletes, + data_file_format: DataFileFormat::Parquet, + schema: data_file_schema.clone(), + project_field_ids: vec![], + predicate: None, + deletes: vec![], + sequence_number: 0, equality_ids: vec![], }; @@ -334,22 +329,28 @@ pub(crate) mod tests { length: 0, record_count: None, data_file_path: format!("{}/1.parquet", table_location.to_str().unwrap()), + data_file_content: DataContentType::Data, data_file_format: DataFileFormat::Parquet, schema: data_file_schema.clone(), project_field_ids: vec![], predicate: None, - deletes: vec![pos_del_1, pos_del_2.clone()], + 
deletes: vec![make_pos_del_task(1), make_pos_del_task(2)], + sequence_number: 0, + equality_ids: vec![], }, FileScanTask { start: 0, length: 0, record_count: None, data_file_path: format!("{}/2.parquet", table_location.to_str().unwrap()), + data_file_content: DataContentType::Data, data_file_format: DataFileFormat::Parquet, schema: data_file_schema.clone(), project_field_ids: vec![], predicate: None, - deletes: vec![pos_del_3], + deletes: vec![make_pos_del_task(3)], + sequence_number: 0, + equality_ids: vec![], }, ]; diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 4327184058..35b8ac4709 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -33,12 +33,12 @@ use arrow_string::like::starts_with; use bytes::Bytes; use fnv::FnvHashSet; use futures::future::BoxFuture; -use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, try_join}; +use futures::{try_join, FutureExt, StreamExt, TryFutureExt, TryStreamExt}; use parquet::arrow::arrow_reader::{ ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection, RowSelector, }; use parquet::arrow::async_reader::AsyncFileReader; -use parquet::arrow::{PARQUET_FIELD_ID_META_KEY, ParquetRecordBatchStreamBuilder, ProjectionMask}; +use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY}; use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData}; use parquet::schema::types::{SchemaDescriptor, Type as ParquetType}; @@ -47,13 +47,13 @@ use crate::arrow::record_batch_transformer::RecordBatchTransformer; use crate::arrow::{arrow_schema_to_schema, get_arrow_datum}; use crate::delete_vector::DeleteVector; use crate::error::Result; -use crate::expr::visitors::bound_predicate_visitor::{BoundPredicateVisitor, visit}; +use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor}; use crate::expr::visitors::page_index_evaluator::PageIndexEvaluator; use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator; use crate::expr::{BoundPredicate, BoundReference}; use crate::io::{FileIO, FileMetadata, FileRead}; use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream}; -use crate::spec::{Datum, NestedField, PrimitiveType, Schema, Type}; +use crate::spec::{DataContentType, Datum, NestedField, PrimitiveType, Schema, Type}; use crate::utils::available_parallelism; use crate::{Error, ErrorKind}; @@ -312,13 +312,16 @@ impl ArrowReader { // Build the batch stream and send all the RecordBatches that it generates // to the requester. - let record_batch_stream = - record_batch_stream_builder - .build()? - .map(move |batch| match batch { + let record_batch_stream = record_batch_stream_builder.build()?.map(move |batch| { + if matches!(task.data_file_content, DataContentType::PositionDeletes) { + Ok(batch?) 
+            } else {
+                match batch {
                     Ok(batch) => record_batch_transformer.process_record_batch(batch),
                     Err(err) => Err(err.into()),
-                });
+                }
+            }
+        });
 
         Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
     }
@@ -1443,7 +1446,6 @@ mod tests {
     use roaring::RoaringTreemap;
     use tempfile::TempDir;
 
-    use crate::ErrorKind;
     use crate::arrow::reader::{CollectFieldIdVisitor, PARQUET_FIELD_ID_META_KEY};
     use crate::arrow::{ArrowReader, ArrowReaderBuilder};
     use crate::delete_vector::DeleteVector;
@@ -1451,7 +1453,8 @@ mod tests {
     use crate::expr::{Bind, Predicate, Reference};
     use crate::io::FileIO;
     use crate::scan::{FileScanTask, FileScanTaskStream};
-    use crate::spec::{DataFileFormat, Datum, NestedField, PrimitiveType, Schema, SchemaRef, Type};
+    use crate::spec::{DataContentType, DataFileFormat, Datum, NestedField, PrimitiveType, Schema, SchemaRef, Type};
+    use crate::ErrorKind;
 
     fn table_schema_simple() -> SchemaRef {
         Arc::new(
@@ -1740,11 +1743,14 @@ message schema {
             length: 0,
             record_count: None,
             data_file_path: format!("{}/1.parquet", table_location),
+            data_file_content: DataContentType::Data,
             data_file_format: DataFileFormat::Parquet,
             schema: schema.clone(),
             project_field_ids: vec![1],
             predicate: Some(predicate.bind(schema, true).unwrap()),
             deletes: vec![],
+            sequence_number: 0,
+            equality_ids: vec![],
         })]
         .into_iter(),
     )) as FileScanTaskStream;
@@ -1774,19 +1780,25 @@ message schema {
         let schema = Arc::new(
             Schema::builder()
                 .with_schema_id(1)
-                .with_fields(vec![
-                    NestedField::optional(1, "a", Type::Primitive(PrimitiveType::String)).into(),
-                ])
+                .with_fields(vec![NestedField::optional(
+                    1,
+                    "a",
+                    Type::Primitive(PrimitiveType::String),
+                )
+                .into()])
                 .build()
                 .unwrap(),
         );
 
-        let arrow_schema = Arc::new(ArrowSchema::new(vec![
-            Field::new("a", col_a_type.clone(), true).with_metadata(HashMap::from([(
-                PARQUET_FIELD_ID_META_KEY.to_string(),
-                "1".to_string(),
-            )])),
-        ]));
+        let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
+            "a",
+            col_a_type.clone(),
+            true,
+        )
+        .with_metadata(HashMap::from([(
+            PARQUET_FIELD_ID_META_KEY.to_string(),
+            "1".to_string(),
+        )]))]));
 
         let tmp_dir = TempDir::new().unwrap();
         let table_location = tmp_dir.path().to_str().unwrap().to_string();
diff --git a/crates/iceberg/src/delete_file_index.rs b/crates/iceberg/src/delete_file_index.rs
index d8f7a872e1..71bf169dc6 100644
--- a/crates/iceberg/src/delete_file_index.rs
+++ b/crates/iceberg/src/delete_file_index.rs
@@ -19,12 +19,12 @@ use std::collections::HashMap;
 use std::ops::Deref;
 use std::sync::{Arc, RwLock};
 
+use futures::channel::mpsc::{channel, Sender};
 use futures::StreamExt;
-use futures::channel::mpsc::{Sender, channel};
 use tokio::sync::Notify;
 
 use crate::runtime::spawn;
-use crate::scan::{DeleteFileContext, FileScanTaskDeleteFile};
+use crate::scan::{DeleteFileContext, FileScanTask};
 use crate::spec::{DataContentType, DataFile, Struct};
 
 /// Index of delete files
@@ -85,7 +85,7 @@ impl DeleteFileIndex {
         &self,
         data_file: &DataFile,
         seq_num: Option<i64>,
-    ) -> Vec<FileScanTaskDeleteFile> {
+    ) -> Vec<FileScanTask> {
         let notifier = {
             let guard = self.state.read().unwrap();
             match *guard {
@@ -132,10 +132,11 @@ impl PopulatedDeleteFileIndex {
         // The spec states that "Equality delete files stored with an unpartitioned spec are applied as global deletes".
        if partition.fields().is_empty() {
             // TODO: confirm we're good to skip here if we encounter a pos del
-            if arc_ctx.manifest_entry.content_type() != DataContentType::PositionDeletes {
-                global_deletes.push(arc_ctx);
-                return;
-            }
+            // FIXME(Dylan): allow putting position deletes into global deletes.
+            // if arc_ctx.manifest_entry.content_type() != DataContentType::PositionDeletes {
+            global_deletes.push(arc_ctx);
+            return;
+            // }
         }
 
         let destination_map = match arc_ctx.manifest_entry.content_type() {
@@ -164,7 +165,7 @@ impl PopulatedDeleteFileIndex {
         &self,
         data_file: &DataFile,
         seq_num: Option<i64>,
-    ) -> Vec<FileScanTaskDeleteFile> {
+    ) -> Vec<FileScanTask> {
         let mut results = vec![];
 
         self.global_deletes
diff --git a/crates/iceberg/src/scan/context.rs b/crates/iceberg/src/scan/context.rs
index 1f7d2827e2..0e60ca3e18 100644
--- a/crates/iceberg/src/scan/context.rs
+++ b/crates/iceberg/src/scan/context.rs
@@ -127,6 +127,7 @@ impl ManifestEntryContext {
             record_count: Some(self.manifest_entry.record_count()),
 
             data_file_path: self.manifest_entry.file_path().to_string(),
+            data_file_content: self.manifest_entry.data_file().content_type(),
             data_file_format: self.manifest_entry.file_format(),
 
             schema: self.snapshot_schema,
@@ -136,6 +137,8 @@ impl ManifestEntryContext {
                 .map(|x| x.as_ref().snapshot_bound_predicate.clone()),
 
             deletes,
+            sequence_number: self.manifest_entry.sequence_number().unwrap_or(0),
+            equality_ids: self.manifest_entry.data_file().equality_ids().to_vec(),
         })
     }
 }
diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs
index fc98e1f6ce..0b4c3976e4 100644
--- a/crates/iceberg/src/scan/mod.rs
+++ b/crates/iceberg/src/scan/mod.rs
@@ -585,6 +585,8 @@ impl TableScan {
                 .send(DeleteFileContext {
                     manifest_entry: manifest_entry_context.manifest_entry.clone(),
                     partition_spec_id: manifest_entry_context.partition_spec_id,
+                    snapshot_schema: manifest_entry_context.snapshot_schema.clone(),
+                    field_ids: manifest_entry_context.field_ids.clone(),
                 })
                 .await?;
 
@@ -1821,8 +1823,11 @@ pub mod tests {
             predicate: None,
             schema: schema.clone(),
             record_count: Some(100),
+            data_file_content: DataContentType::Data,
             data_file_format: DataFileFormat::Parquet,
             deletes: vec![],
+            sequence_number: 0,
+            equality_ids: vec![],
         };
         test_fn(task);
 
@@ -1835,8 +1840,11 @@ pub mod tests {
             predicate: Some(BoundPredicate::AlwaysTrue),
             schema,
             record_count: None,
+            data_file_content: DataContentType::Data,
             data_file_format: DataFileFormat::Avro,
             deletes: vec![],
+            sequence_number: 0,
+            equality_ids: vec![],
         };
         test_fn(task);
     }
diff --git a/crates/iceberg/src/scan/task.rs b/crates/iceberg/src/scan/task.rs
index 7b111e4f04..8abffc4932 100644
--- a/crates/iceberg/src/scan/task.rs
+++ b/crates/iceberg/src/scan/task.rs
@@ -15,12 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::sync::Arc;
+
 use futures::stream::BoxStream;
 use serde::{Deserialize, Serialize};
 
-use crate::Result;
 use crate::expr::BoundPredicate;
 use crate::spec::{DataContentType, DataFileFormat, ManifestEntryRef, Schema, SchemaRef};
+use crate::Result;
 
 /// A stream of [`FileScanTask`].
 pub type FileScanTaskStream = BoxStream<'static, Result<FileScanTask>>;
@@ -41,6 +43,9 @@ pub struct FileScanTask {
     /// The data file path corresponding to the task.
     pub data_file_path: String,
 
+    /// The content type of the file to scan.
+    pub data_file_content: DataContentType,
+
     /// The format of the file to scan.
    pub data_file_format: DataFileFormat,
 
@@ -53,7 +58,11 @@ pub struct FileScanTask {
     pub predicate: Option<BoundPredicate>,
 
     /// The list of delete files that may need to be applied to this data file
-    pub deletes: Vec<FileScanTaskDeleteFile>,
+    pub deletes: Vec<FileScanTask>,
+    /// The sequence number of the file that this task will scan.
+    pub sequence_number: i64,
+    /// The field ids used by equality delete files, if any.
+    pub equality_ids: Vec<i32>,
 }
 
 impl FileScanTask {
@@ -87,6 +96,8 @@
 pub(crate) struct DeleteFileContext {
     pub(crate) manifest_entry: ManifestEntryRef,
     pub(crate) partition_spec_id: i32,
+    pub(crate) snapshot_schema: SchemaRef,
+    pub(crate) field_ids: Arc<Vec<i32>>,
 }
 
 impl From<&DeleteFileContext> for FileScanTaskDeleteFile {
@@ -100,6 +111,27 @@ impl From<&DeleteFileContext> for FileScanTaskDeleteFile {
     }
 }
 
+impl From<&DeleteFileContext> for FileScanTask {
+    fn from(ctx: &DeleteFileContext) -> Self {
+        FileScanTask {
+            start: 0,
+            length: ctx.manifest_entry.file_size_in_bytes(),
+            record_count: Some(ctx.manifest_entry.record_count()),
+
+            data_file_path: ctx.manifest_entry.file_path().to_string(),
+            data_file_content: ctx.manifest_entry.content_type(),
+            data_file_format: ctx.manifest_entry.file_format(),
+
+            schema: ctx.snapshot_schema.clone(),
+            project_field_ids: ctx.field_ids.to_vec(),
+            predicate: None,
+            deletes: vec![],
+            sequence_number: ctx.manifest_entry.sequence_number().unwrap_or(0),
+            equality_ids: ctx.manifest_entry.data_file().equality_ids().to_vec(),
+        }
+    }
+}
+
 /// A task to scan part of file.
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
 pub struct FileScanTaskDeleteFile {
diff --git a/crates/iceberg/src/spec/manifest/mod.rs b/crates/iceberg/src/spec/manifest/mod.rs
index ba7928f4f3..d0e8418066 100644
--- a/crates/iceberg/src/spec/manifest/mod.rs
+++ b/crates/iceberg/src/spec/manifest/mod.rs
@@ -1111,8 +1111,7 @@ mod tests {
         // Create a partition spec
         let partition_spec = PartitionSpec::builder(schema.clone())
             .with_spec_id(1)
-            .add_partition_field("id", "id_partition", Transform::Identity)
-            .unwrap()
+            // No partition fields for this serialization round-trip test
             .build()
             .unwrap();
diff --git a/crates/iceberg/src/writer/function_writer/equality_delta_writer.rs b/crates/iceberg/src/writer/function_writer/equality_delta_writer.rs
index f28dd44ce9..a668e8981c 100644
--- a/crates/iceberg/src/writer/function_writer/equality_delta_writer.rs
+++ b/crates/iceberg/src/writer/function_writer/equality_delta_writer.rs
@@ -255,12 +255,14 @@ where
 
 #[cfg(test)]
 mod test {
+    use std::collections::HashMap;
     use std::sync::Arc;
 
     use arrow_array::{Int32Array, Int64Array, RecordBatch, StringArray};
     use arrow_schema::{DataType, Field, Schema as ArrowSchema};
     use arrow_select::concat::concat_batches;
     use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
+    use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
     use parquet::file::properties::WriterProperties;
     use tempfile::TempDir;
@@ -318,7 +320,7 @@ mod test {
                 location_gen.clone(),
                 file_name_gen.clone(),
             );
-            DataFileWriterBuilder::new(pw.clone(), None, None)
+            DataFileWriterBuilder::new(pw.clone(), None, 0)
         };
         let position_delete_writer_builder = {
             let pw = ParquetWriterBuilder::new(
@@ -331,7 +333,7 @@ mod test {
             SortPositionDeleteWriterBuilder::new(pw.clone(), 100, None, None)
         };
         let equality_delete_writer_builder = {
-            let config = EqualityDeleteWriterConfig::new(vec![1, 2], schema, None, None)?;
+            let config = EqualityDeleteWriterConfig::new(vec![1, 2], schema, None, 0)?;
             let pw = ParquetWriterBuilder::new(
                 WriterProperties::builder().build(),
                 arrow_schema_to_schema(config.projected_arrow_schema_ref())
@@ -355,9 +357,18 @@
mod test { // write data let schema = Arc::new(ArrowSchema::new(vec![ - Field::new("id", DataType::Int64, true), - Field::new("data", DataType::Utf8, true), - Field::new("op", DataType::Int32, false), + Field::new("id", DataType::Int64, true).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + 1.to_string(), + )])), + Field::new("data", DataType::Utf8, true).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + 2.to_string(), + )])), + Field::new("op", DataType::Int32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + 3.to_string(), + )])), ])); { let id_array = Int64Array::from(vec![1, 2, 1, 3, 2, 3, 1]); @@ -388,8 +399,14 @@ mod test { assert_eq!(data_files.len(), 3); // data file let data_schema = Arc::new(ArrowSchema::new(vec![ - Field::new("id", DataType::Int64, true), - Field::new("data", DataType::Utf8, true), + Field::new("id", DataType::Int64, true).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + 1.to_string(), + )])), + Field::new("data", DataType::Utf8, true).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + 2.to_string(), + )])), ])); let data_file = data_files .iter() diff --git a/crates/iceberg/src/writer/function_writer/mod.rs b/crates/iceberg/src/writer/function_writer/mod.rs index 4d608d6571..58e2f7671c 100644 --- a/crates/iceberg/src/writer/function_writer/mod.rs +++ b/crates/iceberg/src/writer/function_writer/mod.rs @@ -17,5 +17,6 @@ //! This module contains the functional writer. +pub mod equality_delta_writer; pub mod fanout_partition_writer; pub mod precompute_partition_writer;
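
Note (illustrative, not part of the patch): after this change, the delete files attached to a data file's scan task are themselves `FileScanTask` values, so a consumer can branch on the new public `data_file_content` field instead of the old `FileScanTaskDeleteFile::file_type`. A minimal consumer-side sketch using the types touched by this patch; the helper name `split_deletes` is hypothetical:

    use iceberg::scan::FileScanTask;
    use iceberg::spec::DataContentType;

    // Partition a task's delete files into positional and equality deletes.
    // (Hypothetical helper; only the public FileScanTask fields are from the patch.)
    fn split_deletes(task: &FileScanTask) -> (Vec<&FileScanTask>, Vec<&FileScanTask>) {
        let mut pos = Vec::new();
        let mut eq = Vec::new();
        for del in &task.deletes {
            match del.data_file_content {
                DataContentType::PositionDeletes => pos.push(del),
                DataContentType::EqualityDeletes => eq.push(del),
                // Data files never appear in `deletes`.
                DataContentType::Data => {}
            }
        }
        (pos, eq)
    }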