From b44757e3ee8f3cb3ae6f5de66b937ed8e80d06b3 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Thu, 9 Oct 2025 20:58:53 -0700 Subject: [PATCH 1/3] Add clustered and fanout writer --- .../writer/base_writer/data_file_writer.rs | 26 +- .../base_writer/equality_delete_writer.rs | 42 +- crates/iceberg/src/writer/mod.rs | 34 +- .../partitioning/clustered_data_writer.rs | 600 ++++++++++++++++ .../writer/partitioning/fanout_data_writer.rs | 676 ++++++++++++++++++ crates/iceberg/src/writer/partitioning/mod.rs | 49 ++ .../datafusion/src/physical_plan/write.rs | 7 +- 7 files changed, 1378 insertions(+), 56 deletions(-) create mode 100644 crates/iceberg/src/writer/partitioning/clustered_data_writer.rs create mode 100644 crates/iceberg/src/writer/partitioning/fanout_data_writer.rs create mode 100644 crates/iceberg/src/writer/partitioning/mod.rs diff --git a/crates/iceberg/src/writer/base_writer/data_file_writer.rs b/crates/iceberg/src/writer/base_writer/data_file_writer.rs index a950547d3..5de93f98f 100644 --- a/crates/iceberg/src/writer/base_writer/data_file_writer.rs +++ b/crates/iceberg/src/writer/base_writer/data_file_writer.rs @@ -30,7 +30,6 @@ use crate::{Error, ErrorKind, Result}; #[derive(Clone, Debug)] pub struct DataFileWriterBuilder { inner: RollingFileWriterBuilder, - partition_key: Option, } impl DataFileWriterBuilder @@ -40,14 +39,8 @@ where F: FileNameGenerator, { /// Create a new `DataFileWriterBuilder` using a `RollingFileWriterBuilder`. - pub fn new( - inner_builder: RollingFileWriterBuilder, - partition_key: Option, - ) -> Self { - Self { - inner: inner_builder, - partition_key, - } + pub fn new(inner: RollingFileWriterBuilder) -> Self { + Self { inner } } } @@ -60,10 +53,10 @@ where { type R = DataFileWriter; - async fn build(self) -> Result { + async fn build_with_partition(self, partition_key: Option) -> Result { Ok(DataFileWriter { inner: Some(self.inner.clone().build()), - partition_key: self.partition_key, + partition_key, }) } } @@ -194,8 +187,8 @@ mod test { file_name_gen, ); - let mut data_file_writer = DataFileWriterBuilder::new(rolling_file_writer_builder, None) - .build() + let mut data_file_writer = DataFileWriterBuilder::new(rolling_file_writer_builder) + .build_with_partition(None) .await .unwrap(); @@ -280,10 +273,9 @@ mod test { file_name_gen, ); - let mut data_file_writer = - DataFileWriterBuilder::new(rolling_file_writer_builder, Some(partition_key)) - .build() - .await?; + let mut data_file_writer = DataFileWriterBuilder::new(rolling_file_writer_builder) + .build_with_partition(Some(partition_key)) + .await?; let arrow_schema = arrow_schema::Schema::new(vec![ Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([( diff --git a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs index 6740ed435..1d3bcc7db 100644 --- a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs +++ b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs @@ -66,16 +66,11 @@ pub struct EqualityDeleteWriterConfig { equality_ids: Vec, // Projector used to project the data chunk into specific fields. projector: RecordBatchProjector, - partition_key: Option, } impl EqualityDeleteWriterConfig { /// Create a new `DataFileWriterConfig` with equality ids. 
- pub fn new( - equality_ids: Vec, - original_schema: SchemaRef, - partition_key: Option, - ) -> Result { + pub fn new(equality_ids: Vec, original_schema: SchemaRef) -> Result { let original_arrow_schema = Arc::new(schema_to_arrow_schema(&original_schema)?); let projector = RecordBatchProjector::new( original_arrow_schema, @@ -110,7 +105,6 @@ impl EqualityDeleteWriterConfig { Ok(Self { equality_ids, projector, - partition_key, }) } @@ -129,12 +123,12 @@ where { type R = EqualityDeleteFileWriter; - async fn build(self) -> Result { + async fn build_with_partition(self, partition_key: Option) -> Result { Ok(EqualityDeleteFileWriter { - inner: Some(self.inner.clone().build()), // todo revisit this, probably still need a builder for rolling writer + inner: Some(self.inner.clone().build()), projector: self.config.projector, equality_ids: self.config.equality_ids, - partition_key: self.config.partition_key, + partition_key, }) } } @@ -428,7 +422,7 @@ mod test { let equality_ids = vec![0_i32, 8]; let equality_config = - EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema), None).unwrap(); + EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema)).unwrap(); let delete_schema = arrow_schema_to_schema(equality_config.projected_arrow_schema_ref()).unwrap(); let projector = equality_config.projector.clone(); @@ -444,7 +438,7 @@ mod test { ); let mut equality_delete_writer = EqualityDeleteFileWriterBuilder::new(rolling_writer_builder, equality_config) - .build() + .build_with_partition(None) .await?; // write @@ -531,19 +525,19 @@ mod test { .unwrap(), ); // Float and Double are not allowed to be used for equality delete - assert!(EqualityDeleteWriterConfig::new(vec![0], schema.clone(), None).is_err()); - assert!(EqualityDeleteWriterConfig::new(vec![1], schema.clone(), None).is_err()); + assert!(EqualityDeleteWriterConfig::new(vec![0], schema.clone()).is_err()); + assert!(EqualityDeleteWriterConfig::new(vec![1], schema.clone()).is_err()); // Struct is not allowed to be used for equality delete - assert!(EqualityDeleteWriterConfig::new(vec![3], schema.clone(), None).is_err()); + assert!(EqualityDeleteWriterConfig::new(vec![3], schema.clone()).is_err()); // Nested field of struct is allowed to be used for equality delete - assert!(EqualityDeleteWriterConfig::new(vec![4], schema.clone(), None).is_ok()); + assert!(EqualityDeleteWriterConfig::new(vec![4], schema.clone()).is_ok()); // Nested field of map is not allowed to be used for equality delete - assert!(EqualityDeleteWriterConfig::new(vec![7], schema.clone(), None).is_err()); - assert!(EqualityDeleteWriterConfig::new(vec![8], schema.clone(), None).is_err()); - assert!(EqualityDeleteWriterConfig::new(vec![9], schema.clone(), None).is_err()); + assert!(EqualityDeleteWriterConfig::new(vec![7], schema.clone()).is_err()); + assert!(EqualityDeleteWriterConfig::new(vec![8], schema.clone()).is_err()); + assert!(EqualityDeleteWriterConfig::new(vec![9], schema.clone()).is_err()); // Nested field of list is not allowed to be used for equality delete - assert!(EqualityDeleteWriterConfig::new(vec![10], schema.clone(), None).is_err()); - assert!(EqualityDeleteWriterConfig::new(vec![11], schema.clone(), None).is_err()); + assert!(EqualityDeleteWriterConfig::new(vec![10], schema.clone()).is_err()); + assert!(EqualityDeleteWriterConfig::new(vec![11], schema.clone()).is_err()); Ok(()) } @@ -597,7 +591,7 @@ mod test { .unwrap(), ); let equality_ids = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13]; - let config = 
EqualityDeleteWriterConfig::new(equality_ids, schema.clone(), None).unwrap(); + let config = EqualityDeleteWriterConfig::new(equality_ids, schema.clone()).unwrap(); let delete_arrow_schema = config.projected_arrow_schema_ref().clone(); let delete_schema = arrow_schema_to_schema(&delete_arrow_schema).unwrap(); @@ -611,7 +605,7 @@ mod test { ); let mut equality_delete_writer = EqualityDeleteFileWriterBuilder::new(rolling_writer_builder, config) - .build() + .build_with_partition(None) .await?; // prepare data @@ -795,7 +789,7 @@ mod test { let to_write = RecordBatch::try_new(arrow_schema.clone(), columns).unwrap(); let equality_ids = vec![0_i32, 2, 5]; let equality_config = - EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema), None).unwrap(); + EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema)).unwrap(); let projector = equality_config.projector.clone(); // check diff --git a/crates/iceberg/src/writer/mod.rs b/crates/iceberg/src/writer/mod.rs index d5a8a6686..1427fcac7 100644 --- a/crates/iceberg/src/writer/mod.rs +++ b/crates/iceberg/src/writer/mod.rs @@ -100,10 +100,9 @@ //! ); //! //! // Create a data file writer using parquet file writer builder. -//! let data_file_writer_builder = -//! DataFileWriterBuilder::new(rolling_file_writer_builder, None); +//! let data_file_writer_builder = DataFileWriterBuilder::new(rolling_file_writer_builder); //! // Build the data file writer -//! let mut data_file_writer = data_file_writer_builder.build().await?; +//! let mut data_file_writer = data_file_writer_builder.build_with_partition(None).await?; //! //! // Write the data using data_file_writer... //! @@ -122,7 +121,7 @@ //! use arrow_array::RecordBatch; //! use iceberg::io::FileIOBuilder; //! use iceberg::memory::MemoryCatalogBuilder; -//! use iceberg::spec::DataFile; +//! use iceberg::spec::{DataFile, PartitionKey}; //! use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder; //! use iceberg::writer::file_writer::ParquetWriterBuilder; //! use iceberg::writer::file_writer::location_generator::{ @@ -149,9 +148,15 @@ //! impl IcebergWriterBuilder for LatencyRecordWriterBuilder { //! type R = LatencyRecordWriter; //! -//! async fn build(self) -> Result { +//! async fn build_with_partition( +//! self, +//! partition_key: Option, +//! ) -> Result { //! Ok(LatencyRecordWriter { -//! inner_writer: self.inner_writer_builder.build().await?, +//! inner_writer: self +//! .inner_writer_builder +//! .build_with_partition(partition_key) +//! .await?, //! }) //! } //! } @@ -231,12 +236,14 @@ //! ); //! //! // Create a data file writer builder using rolling file writer. -//! let data_file_writer_builder = -//! DataFileWriterBuilder::new(rolling_file_writer_builder, Some(partition_key)); +//! let data_file_writer_builder = DataFileWriterBuilder::new(rolling_file_writer_builder); //! // Create latency record writer using data file writer builder. //! let latency_record_builder = LatencyRecordWriterBuilder::new(data_file_writer_builder); //! // Build the final writer -//! let mut latency_record_data_file_writer = latency_record_builder.build().await.unwrap(); +//! let mut latency_record_data_file_writer = latency_record_builder +//! .build_with_partition(Some(partition_key)) +//! .await +//! .unwrap(); //! //! Ok(()) //! 
} @@ -244,11 +251,14 @@ pub mod base_writer; pub mod file_writer; +/// Provides partition-aware writers +/// TODO examples +pub mod partitioning; use arrow_array::RecordBatch; use crate::Result; -use crate::spec::DataFile; +use crate::spec::{DataFile, PartitionKey}; type DefaultInput = RecordBatch; type DefaultOutput = Vec; @@ -260,8 +270,8 @@ pub trait IcebergWriterBuilder: { /// The associated writer type. type R: IcebergWriter; - /// Build the iceberg writer. - async fn build(self) -> Result; + /// Build the iceberg writer for an optional partition key. + async fn build_with_partition(self, partition_key: Option) -> Result; } /// The iceberg writer used to write data to iceberg table. diff --git a/crates/iceberg/src/writer/partitioning/clustered_data_writer.rs b/crates/iceberg/src/writer/partitioning/clustered_data_writer.rs new file mode 100644 index 000000000..f8f458d74 --- /dev/null +++ b/crates/iceberg/src/writer/partitioning/clustered_data_writer.rs @@ -0,0 +1,600 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This module provides the `ClusteredDataWriter` implementation. + +use std::collections::HashSet; + +use arrow_array::RecordBatch; +use async_trait::async_trait; + +use crate::spec::{DataFile, PartitionKey, Struct}; +use crate::writer::partitioning::PartitioningWriter; +use crate::writer::{IcebergWriter, IcebergWriterBuilder}; +use crate::{Error, ErrorKind, Result}; + +/// A writer that writes data to a single partition at a time. +#[derive(Clone)] +pub struct ClusteredDataWriter { + inner_builder: B, + current_writer: Option, + current_partition: Option, + closed_partitions: HashSet, + output: Vec, +} + +impl ClusteredDataWriter { + /// Create a new `ClusteredDataWriter`. + pub fn new(inner_builder: B) -> Self { + Self { + inner_builder, + current_writer: None, + current_partition: None, + closed_partitions: HashSet::new(), + output: Vec::new(), + } + } + + /// Closes the current writer if it exists, flushes the written data to output, and record closed partition. 
+ async fn close_current_writer(&mut self) -> Result<()> { + if let Some(mut writer) = self.current_writer.take() { + self.output.extend(writer.close().await?); + + // Add the current partition to the set of closed partitions + if let Some(current_partition) = self.current_partition.take() { + self.closed_partitions.insert(current_partition); + } + } + + Ok(()) + } +} + +#[async_trait] +impl PartitioningWriter for ClusteredDataWriter { + async fn write( + &mut self, + partition_key: Option, + input: RecordBatch, + ) -> Result<()> { + if let Some(partition_key) = partition_key { + let partition_value = partition_key.data(); + + // Check if this partition has been closed already + if self.closed_partitions.contains(partition_value) { + return Err(Error::new( + ErrorKind::Unexpected, + format!( + "The input is not sorted! Cannot write to partition that was previously closed: {:?}", + partition_key + ), + )); + } + + // Check if we need to switch to a new partition + let need_new_writer = match &self.current_partition { + Some(current) => current != partition_value, + None => true, + }; + + if need_new_writer { + self.close_current_writer().await?; + + // Create a new writer for the new partition + self.current_writer = Some( + self.inner_builder + .clone() + .build_with_partition(Some(partition_key.clone())) + .await?, + ); + self.current_partition = Some(partition_value.clone()); + } + } else if self.current_writer.is_none() { + // Unpartitioned data, initialize the writer here + self.current_writer = Some( + self.inner_builder + .clone() + .build_with_partition(None) + .await?, + ); + } + + // do write + if let Some(writer) = &mut self.current_writer { + writer.write(input).await?; + Ok(()) + } else { + Err(Error::new( + ErrorKind::Unexpected, + "Writer is not initialized!", + )) + } + } + + async fn close(&mut self) -> Result> { + self.close_current_writer().await?; + + // Return all collected data files + Ok(std::mem::take(&mut self.output)) + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::sync::Arc; + + use arrow_array::{Int32Array, StringArray}; + use arrow_schema::{DataType, Field, Schema}; + use parquet::arrow::PARQUET_FIELD_ID_META_KEY; + use parquet::file::properties::WriterProperties; + use tempfile::TempDir; + + use super::*; + use crate::io::FileIOBuilder; + use crate::spec::{DataFileFormat, NestedField, PrimitiveType, Type}; + use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder; + use crate::writer::file_writer::ParquetWriterBuilder; + use crate::writer::file_writer::location_generator::{ + DefaultFileNameGenerator, DefaultLocationGenerator, + }; + use crate::writer::file_writer::rolling_writer::RollingFileWriterBuilder; + + #[tokio::test] + async fn test_clustered_writer_unpartitioned() -> Result<()> { + let temp_dir = TempDir::new()?; + let file_io = FileIOBuilder::new_fs_io().build()?; + let location_gen = DefaultLocationGenerator::with_data_location( + temp_dir.path().to_str().unwrap().to_string(), + ); + let file_name_gen = + DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); + + // Create schema + let schema = Arc::new( + crate::spec::Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build()?, + ); + + // Create writer builder + let parquet_writer_builder = + 
ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone()); + + // Create rolling file writer builder + let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size( + parquet_writer_builder, + file_io.clone(), + location_gen, + file_name_gen, + ); + + // Create data file writer builder + let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder); + + // Create clustered writer + let mut writer = ClusteredDataWriter::new(data_file_writer_builder); + + // Create test data with proper field ID metadata + let arrow_schema = Schema::new(vec![ + Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + 1.to_string(), + )])), + Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + 2.to_string(), + )])), + ]); + + let batch1 = RecordBatch::try_new(Arc::new(arrow_schema.clone()), vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])), + ])?; + + let batch2 = RecordBatch::try_new(Arc::new(arrow_schema.clone()), vec![ + Arc::new(Int32Array::from(vec![4, 5])), + Arc::new(StringArray::from(vec!["Dave", "Eve"])), + ])?; + + // Write data without partitioning (pass None for partition_key) + writer.write(None, batch1).await?; + writer.write(None, batch2).await?; + + // Close writer and get data files + let data_files = writer.close().await?; + + // Verify at least one file was created + assert!( + !data_files.is_empty(), + "Expected at least one data file to be created" + ); + + Ok(()) + } + + #[tokio::test] + async fn test_clustered_writer_single_partition() -> Result<()> { + let temp_dir = TempDir::new()?; + let file_io = FileIOBuilder::new_fs_io().build()?; + let location_gen = DefaultLocationGenerator::with_data_location( + temp_dir.path().to_str().unwrap().to_string(), + ); + let file_name_gen = + DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); + + // Create schema with partition field + let schema = Arc::new( + crate::spec::Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(3, "region", Type::Primitive(PrimitiveType::String)) + .into(), + ]) + .build()?, + ); + + // Create partition spec and key + let partition_spec = crate::spec::PartitionSpec::builder(schema.clone()).build()?; + let partition_value = + crate::spec::Struct::from_iter([Some(crate::spec::Literal::string("US"))]); + let partition_key = + crate::spec::PartitionKey::new(partition_spec, schema.clone(), partition_value.clone()); + + // Create writer builder + let parquet_writer_builder = + ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone()); + + // Create rolling file writer builder + let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size( + parquet_writer_builder, + file_io.clone(), + location_gen, + file_name_gen, + ); + + // Create data file writer builder + let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder); + + // Create clustered writer + let mut writer = ClusteredDataWriter::new(data_file_writer_builder); + + // Create test data with proper field ID metadata + let arrow_schema = Schema::new(vec![ + Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([( + 
PARQUET_FIELD_ID_META_KEY.to_string(), + 1.to_string(), + )])), + Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + 2.to_string(), + )])), + Field::new("region", DataType::Utf8, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + 3.to_string(), + )])), + ]); + + let batch1 = RecordBatch::try_new(Arc::new(arrow_schema.clone()), vec![ + Arc::new(Int32Array::from(vec![1, 2])), + Arc::new(StringArray::from(vec!["Alice", "Bob"])), + Arc::new(StringArray::from(vec!["US", "US"])), + ])?; + + let batch2 = RecordBatch::try_new(Arc::new(arrow_schema.clone()), vec![ + Arc::new(Int32Array::from(vec![3, 4])), + Arc::new(StringArray::from(vec!["Charlie", "Dave"])), + Arc::new(StringArray::from(vec!["US", "US"])), + ])?; + + // Write data to the same partition (this should work) + writer.write(Some(partition_key.clone()), batch1).await?; + writer.write(Some(partition_key.clone()), batch2).await?; + + // Close writer and get data files + let data_files = writer.close().await?; + + // Verify at least one file was created + assert!( + !data_files.is_empty(), + "Expected at least one data file to be created" + ); + + // Verify that all data files have the correct partition value + for data_file in &data_files { + assert_eq!(data_file.partition, partition_value); + } + + Ok(()) + } + + #[tokio::test] + async fn test_clustered_writer_sorted_partitions() -> Result<()> { + let temp_dir = TempDir::new()?; + let file_io = FileIOBuilder::new_fs_io().build()?; + let location_gen = DefaultLocationGenerator::with_data_location( + temp_dir.path().to_str().unwrap().to_string(), + ); + let file_name_gen = + DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); + + // Create schema with partition field + let schema = Arc::new( + crate::spec::Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(3, "region", Type::Primitive(PrimitiveType::String)) + .into(), + ]) + .build()?, + ); + + // Create partition spec + let partition_spec = crate::spec::PartitionSpec::builder(schema.clone()).build()?; + + // Create partition keys for different regions (in sorted order) + let partition_value_asia = + crate::spec::Struct::from_iter([Some(crate::spec::Literal::string("ASIA"))]); + let partition_key_asia = crate::spec::PartitionKey::new( + partition_spec.clone(), + schema.clone(), + partition_value_asia.clone(), + ); + + let partition_value_eu = + crate::spec::Struct::from_iter([Some(crate::spec::Literal::string("EU"))]); + let partition_key_eu = crate::spec::PartitionKey::new( + partition_spec.clone(), + schema.clone(), + partition_value_eu.clone(), + ); + + let partition_value_us = + crate::spec::Struct::from_iter([Some(crate::spec::Literal::string("US"))]); + let partition_key_us = crate::spec::PartitionKey::new( + partition_spec.clone(), + schema.clone(), + partition_value_us.clone(), + ); + + // Create writer builder + let parquet_writer_builder = + ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone()); + + // Create rolling file writer builder + let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size( + parquet_writer_builder, + file_io.clone(), + location_gen, + file_name_gen, + ); + + // Create data file writer builder + let data_file_writer_builder = 
DataFileWriterBuilder::new(rolling_writer_builder); + + // Create clustered writer + let mut writer = ClusteredDataWriter::new(data_file_writer_builder); + + // Create test data with proper field ID metadata + let arrow_schema = Schema::new(vec![ + Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + 1.to_string(), + )])), + Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + 2.to_string(), + )])), + Field::new("region", DataType::Utf8, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + 3.to_string(), + )])), + ]); + + // Create batches for different partitions (in sorted order) + let batch_asia = RecordBatch::try_new(Arc::new(arrow_schema.clone()), vec![ + Arc::new(Int32Array::from(vec![1, 2])), + Arc::new(StringArray::from(vec!["Alice", "Bob"])), + Arc::new(StringArray::from(vec!["ASIA", "ASIA"])), + ])?; + + let batch_eu = RecordBatch::try_new(Arc::new(arrow_schema.clone()), vec![ + Arc::new(Int32Array::from(vec![3, 4])), + Arc::new(StringArray::from(vec!["Charlie", "Dave"])), + Arc::new(StringArray::from(vec!["EU", "EU"])), + ])?; + + let batch_us = RecordBatch::try_new(Arc::new(arrow_schema.clone()), vec![ + Arc::new(Int32Array::from(vec![5, 6])), + Arc::new(StringArray::from(vec!["Eve", "Frank"])), + Arc::new(StringArray::from(vec!["US", "US"])), + ])?; + + // Write data in sorted partition order (this should work) + writer + .write(Some(partition_key_asia.clone()), batch_asia) + .await?; + writer + .write(Some(partition_key_eu.clone()), batch_eu) + .await?; + writer + .write(Some(partition_key_us.clone()), batch_us) + .await?; + + // Close writer and get data files + let data_files = writer.close().await?; + + // Verify files were created for all partitions + assert!( + data_files.len() >= 3, + "Expected at least 3 data files (one per partition), got {}", + data_files.len() + ); + + // Verify that we have files for each partition + let mut partitions_found = std::collections::HashSet::new(); + for data_file in &data_files { + partitions_found.insert(data_file.partition.clone()); + } + + assert!( + partitions_found.contains(&partition_value_asia), + "Missing ASIA partition" + ); + assert!( + partitions_found.contains(&partition_value_eu), + "Missing EU partition" + ); + assert!( + partitions_found.contains(&partition_value_us), + "Missing US partition" + ); + + Ok(()) + } + + #[tokio::test] + async fn test_clustered_writer_unsorted_partitions_error() -> Result<()> { + let temp_dir = TempDir::new()?; + let file_io = FileIOBuilder::new_fs_io().build()?; + let location_gen = DefaultLocationGenerator::with_data_location( + temp_dir.path().to_str().unwrap().to_string(), + ); + let file_name_gen = + DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); + + // Create schema with partition field + let schema = Arc::new( + crate::spec::Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(3, "region", Type::Primitive(PrimitiveType::String)) + .into(), + ]) + .build()?, + ); + + // Create partition spec + let partition_spec = crate::spec::PartitionSpec::builder(schema.clone()).build()?; + + // Create partition keys for different regions + let partition_value_us = + 
crate::spec::Struct::from_iter([Some(crate::spec::Literal::string("US"))]); + let partition_key_us = crate::spec::PartitionKey::new( + partition_spec.clone(), + schema.clone(), + partition_value_us.clone(), + ); + + let partition_value_eu = + crate::spec::Struct::from_iter([Some(crate::spec::Literal::string("EU"))]); + let partition_key_eu = crate::spec::PartitionKey::new( + partition_spec.clone(), + schema.clone(), + partition_value_eu.clone(), + ); + + // Create writer builder + let parquet_writer_builder = + ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone()); + + // Create rolling file writer builder + let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size( + parquet_writer_builder, + file_io.clone(), + location_gen, + file_name_gen, + ); + + // Create data file writer builder + let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder); + + // Create clustered writer + let mut writer = ClusteredDataWriter::new(data_file_writer_builder); + + // Create test data with proper field ID metadata + let arrow_schema = Schema::new(vec![ + Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + 1.to_string(), + )])), + Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + 2.to_string(), + )])), + Field::new("region", DataType::Utf8, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + 3.to_string(), + )])), + ]); + + // Create batches for different partitions + let batch_us = RecordBatch::try_new(Arc::new(arrow_schema.clone()), vec![ + Arc::new(Int32Array::from(vec![1, 2])), + Arc::new(StringArray::from(vec!["Alice", "Bob"])), + Arc::new(StringArray::from(vec!["US", "US"])), + ])?; + + let batch_eu = RecordBatch::try_new(Arc::new(arrow_schema.clone()), vec![ + Arc::new(Int32Array::from(vec![3, 4])), + Arc::new(StringArray::from(vec!["Charlie", "Dave"])), + Arc::new(StringArray::from(vec!["EU", "EU"])), + ])?; + + let batch_us2 = RecordBatch::try_new(Arc::new(arrow_schema.clone()), vec![ + Arc::new(Int32Array::from(vec![5])), + Arc::new(StringArray::from(vec!["Eve"])), + Arc::new(StringArray::from(vec!["US"])), + ])?; + + // Write data to US partition first + writer + .write(Some(partition_key_us.clone()), batch_us) + .await?; + + // Write data to EU partition (this closes US partition) + writer + .write(Some(partition_key_eu.clone()), batch_eu) + .await?; + + // Try to write to US partition again - this should fail because data is not sorted + let result = writer + .write(Some(partition_key_us.clone()), batch_us2) + .await; + + assert!(result.is_err(), "Expected error when writing unsorted data"); + + let error = result.unwrap_err(); + assert!( + error.to_string().contains("The input is not sorted"), + "Expected 'input is not sorted' error, got: {}", + error + ); + + Ok(()) + } +} diff --git a/crates/iceberg/src/writer/partitioning/fanout_data_writer.rs b/crates/iceberg/src/writer/partitioning/fanout_data_writer.rs new file mode 100644 index 000000000..5f52fc235 --- /dev/null +++ b/crates/iceberg/src/writer/partitioning/fanout_data_writer.rs @@ -0,0 +1,676 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This module provides the `FanoutDataWriter` implementation. + +use std::collections::HashMap; + +use arrow_array::RecordBatch; +use async_trait::async_trait; + +use crate::spec::{DataFile, PartitionKey, Struct}; +use crate::writer::partitioning::PartitioningWriter; +use crate::writer::{IcebergWriter, IcebergWriterBuilder}; +use crate::{Error, ErrorKind, Result}; + +/// A writer that can write data to multiple partitions simultaneously. +/// +/// Unlike `ClusteredDataWriter` which expects sorted input and maintains only one active writer, +/// `FanoutDataWriter` can handle unsorted data by maintaining multiple active writers in a map. +/// This allows writing to any partition at any time, but uses more memory as all writers +/// remain active until the writer is closed. +#[derive(Clone)] +pub struct FanoutDataWriter { + inner_builder: B, + partition_writers: HashMap, + unpartitioned_writer: Option, + output: Vec, +} + +impl FanoutDataWriter { + /// Create a new `FanoutDataWriter`. + pub fn new(inner_builder: B) -> Self { + Self { + inner_builder, + partition_writers: HashMap::new(), + unpartitioned_writer: None, + output: Vec::new(), + } + } + + /// Get or create a writer for the specified partition. + async fn get_or_create_partition_writer( + &mut self, + partition_key: &PartitionKey, + ) -> Result<&mut B::R> { + if !self.partition_writers.contains_key(partition_key.data()) { + let writer = self + .inner_builder + .clone() + .build_with_partition(Some(partition_key.clone())) + .await?; + self.partition_writers + .insert(partition_key.data().clone(), writer); + } + + self.partition_writers + .get_mut(partition_key.data()) + .ok_or_else(|| { + Error::new( + ErrorKind::Unexpected, + "Failed to get partition writer after creation", + ) + }) + } + + /// Get or create the unpartitioned writer. 
+ async fn get_or_create_unpartitioned_writer(&mut self) -> Result<&mut B::R> { + if self.unpartitioned_writer.is_none() { + self.unpartitioned_writer = Some( + self.inner_builder + .clone() + .build_with_partition(None) + .await?, + ); + } + + self.unpartitioned_writer.as_mut().ok_or_else(|| { + Error::new( + ErrorKind::Unexpected, + "Failed to get unpartitioned writer after creation", + ) + }) + } +} + +#[async_trait] +impl PartitioningWriter for FanoutDataWriter { + async fn write( + &mut self, + partition_key: Option, + input: RecordBatch, + ) -> Result<()> { + if let Some(ref partition_key) = partition_key { + let writer = self.get_or_create_partition_writer(&partition_key).await?; + writer.write(input).await + } else { + let writer = self.get_or_create_unpartitioned_writer().await?; + writer.write(input).await + } + } + + async fn close(&mut self) -> Result> { + // Close all partition writers + for (_, mut writer) in std::mem::take(&mut self.partition_writers) { + self.output.extend(writer.close().await?); + } + + // Close unpartitioned writer if it exists + if let Some(mut writer) = self.unpartitioned_writer.take() { + self.output.extend(writer.close().await?); + } + + // Return all collected data files + Ok(std::mem::take(&mut self.output)) + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::sync::Arc; + + use arrow_array::{Int32Array, StringArray}; + use arrow_schema::{DataType, Field, Schema}; + use parquet::arrow::PARQUET_FIELD_ID_META_KEY; + use parquet::file::properties::WriterProperties; + use tempfile::TempDir; + + use super::*; + use crate::io::FileIOBuilder; + use crate::spec::{ + DataFileFormat, Literal, NestedField, PartitionKey, PartitionSpec, PrimitiveType, Struct, + Type, + }; + use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder; + use crate::writer::file_writer::ParquetWriterBuilder; + use crate::writer::file_writer::location_generator::{ + DefaultFileNameGenerator, DefaultLocationGenerator, + }; + use crate::writer::file_writer::rolling_writer::RollingFileWriterBuilder; + + #[tokio::test] + async fn test_fanout_writer_unpartitioned() -> Result<()> { + let temp_dir = TempDir::new()?; + let file_io = FileIOBuilder::new_fs_io().build()?; + let location_gen = DefaultLocationGenerator::with_data_location( + temp_dir.path().to_str().unwrap().to_string(), + ); + let file_name_gen = + DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); + + // Create schema + let schema = Arc::new( + crate::spec::Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build()?, + ); + + // Create writer builder + let parquet_writer_builder = + ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone()); + + // Create rolling file writer builder + let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size( + parquet_writer_builder, + file_io.clone(), + location_gen, + file_name_gen, + ); + + // Create data file writer builder + let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder); + + // Create fanout writer + let mut writer = FanoutDataWriter::new(data_file_writer_builder); + + // Create test data with proper field ID metadata + let arrow_schema = Schema::new(vec![ + Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([( + 
PARQUET_FIELD_ID_META_KEY.to_string(), + 1.to_string(), + )])), + Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + 2.to_string(), + )])), + ]); + + let batch1 = RecordBatch::try_new(Arc::new(arrow_schema.clone()), vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])), + ])?; + + let batch2 = RecordBatch::try_new(Arc::new(arrow_schema.clone()), vec![ + Arc::new(Int32Array::from(vec![4, 5])), + Arc::new(StringArray::from(vec!["Dave", "Eve"])), + ])?; + + // Write data without partitioning (pass None for partition_key) + writer.write(None, batch1).await?; + writer.write(None, batch2).await?; + + // Close writer and get data files + let data_files = writer.close().await?; + + // Verify at least one file was created + assert!( + !data_files.is_empty(), + "Expected at least one data file to be created" + ); + + Ok(()) + } + + #[tokio::test] + async fn test_fanout_writer_multiple_writes() -> Result<()> { + let temp_dir = TempDir::new()?; + let file_io = FileIOBuilder::new_fs_io().build()?; + let location_gen = DefaultLocationGenerator::with_data_location( + temp_dir.path().to_str().unwrap().to_string(), + ); + let file_name_gen = + DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); + + // Create schema + let schema = Arc::new( + crate::spec::Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build()?, + ); + + // Create writer builder + let parquet_writer_builder = + ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone()); + + // Create rolling file writer builder + let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size( + parquet_writer_builder, + file_io.clone(), + location_gen, + file_name_gen, + ); + + // Create data file writer builder + let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder); + + // Create fanout writer + let mut writer = FanoutDataWriter::new(data_file_writer_builder); + + // Create test data with proper field ID metadata + let arrow_schema = Schema::new(vec![ + Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + 1.to_string(), + )])), + Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + 2.to_string(), + )])), + ]); + + let batch1 = RecordBatch::try_new(Arc::new(arrow_schema.clone()), vec![ + Arc::new(Int32Array::from(vec![1, 2])), + Arc::new(StringArray::from(vec!["Alice", "Bob"])), + ])?; + + let batch2 = RecordBatch::try_new(Arc::new(arrow_schema.clone()), vec![ + Arc::new(Int32Array::from(vec![3, 4])), + Arc::new(StringArray::from(vec!["Charlie", "Dave"])), + ])?; + + let batch3 = RecordBatch::try_new(Arc::new(arrow_schema.clone()), vec![ + Arc::new(Int32Array::from(vec![5])), + Arc::new(StringArray::from(vec!["Eve"])), + ])?; + + // Write multiple batches to demonstrate fanout capability + // (all unpartitioned for simplicity) + writer.write(None, batch1).await?; + writer.write(None, batch2).await?; + writer.write(None, batch3).await?; + + // Close writer and get data files + let data_files = writer.close().await?; + + // Verify at least one file was created + assert!( + !data_files.is_empty(), + "Expected at least one data file to be 
created" + ); + + Ok(()) + } + + #[tokio::test] + async fn test_fanout_writer_single_partition() -> Result<()> { + let temp_dir = TempDir::new()?; + let file_io = FileIOBuilder::new_fs_io().build()?; + let location_gen = DefaultLocationGenerator::with_data_location( + temp_dir.path().to_str().unwrap().to_string(), + ); + let file_name_gen = + DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); + + // Create schema with partition field + let schema = Arc::new( + crate::spec::Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(3, "region", Type::Primitive(PrimitiveType::String)) + .into(), + ]) + .build()?, + ); + + // Create partition spec - using the same pattern as data_file_writer tests + let partition_spec = PartitionSpec::builder(schema.clone()).build()?; + let partition_value = Struct::from_iter([Some(Literal::string("US"))]); + let partition_key = + PartitionKey::new(partition_spec, schema.clone(), partition_value.clone()); + + // Create writer builder + let parquet_writer_builder = + ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone()); + + // Create rolling file writer builder + let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size( + parquet_writer_builder, + file_io.clone(), + location_gen, + file_name_gen, + ); + + // Create data file writer builder + let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder); + + // Create fanout writer + let mut writer = FanoutDataWriter::new(data_file_writer_builder); + + // Create test data with proper field ID metadata + let arrow_schema = Schema::new(vec![ + Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + 1.to_string(), + )])), + Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + 2.to_string(), + )])), + Field::new("region", DataType::Utf8, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + 3.to_string(), + )])), + ]); + + let batch1 = RecordBatch::try_new(Arc::new(arrow_schema.clone()), vec![ + Arc::new(Int32Array::from(vec![1, 2])), + Arc::new(StringArray::from(vec!["Alice", "Bob"])), + Arc::new(StringArray::from(vec!["US", "US"])), + ])?; + + let batch2 = RecordBatch::try_new(Arc::new(arrow_schema.clone()), vec![ + Arc::new(Int32Array::from(vec![3, 4])), + Arc::new(StringArray::from(vec!["Charlie", "Dave"])), + Arc::new(StringArray::from(vec!["US", "US"])), + ])?; + + // Write data to the same partition + writer.write(Some(partition_key.clone()), batch1).await?; + writer.write(Some(partition_key.clone()), batch2).await?; + + // Close writer and get data files + let data_files = writer.close().await?; + + // Verify at least one file was created + assert!( + !data_files.is_empty(), + "Expected at least one data file to be created" + ); + + // Verify that all data files have the correct partition value + for data_file in &data_files { + assert_eq!(data_file.partition, partition_value); + } + + Ok(()) + } + + #[tokio::test] + async fn test_fanout_writer_multiple_partitions() -> Result<()> { + let temp_dir = TempDir::new()?; + let file_io = FileIOBuilder::new_fs_io().build()?; + let location_gen = DefaultLocationGenerator::with_data_location( + 
temp_dir.path().to_str().unwrap().to_string(), + ); + let file_name_gen = + DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); + + // Create schema with partition field + let schema = Arc::new( + crate::spec::Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(3, "region", Type::Primitive(PrimitiveType::String)) + .into(), + ]) + .build()?, + ); + + // Create partition spec + let partition_spec = PartitionSpec::builder(schema.clone()).build()?; + + // Create partition keys for different regions + let partition_value_us = Struct::from_iter([Some(Literal::string("US"))]); + let partition_key_us = PartitionKey::new( + partition_spec.clone(), + schema.clone(), + partition_value_us.clone(), + ); + + let partition_value_eu = Struct::from_iter([Some(Literal::string("EU"))]); + let partition_key_eu = PartitionKey::new( + partition_spec.clone(), + schema.clone(), + partition_value_eu.clone(), + ); + + let partition_value_asia = Struct::from_iter([Some(Literal::string("ASIA"))]); + let partition_key_asia = PartitionKey::new( + partition_spec.clone(), + schema.clone(), + partition_value_asia.clone(), + ); + + // Create writer builder + let parquet_writer_builder = + ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone()); + + // Create rolling file writer builder + let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size( + parquet_writer_builder, + file_io.clone(), + location_gen, + file_name_gen, + ); + + // Create data file writer builder + let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder); + + // Create fanout writer + let mut writer = FanoutDataWriter::new(data_file_writer_builder); + + // Create test data with proper field ID metadata + let arrow_schema = Schema::new(vec![ + Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + 1.to_string(), + )])), + Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + 2.to_string(), + )])), + Field::new("region", DataType::Utf8, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + 3.to_string(), + )])), + ]); + + // Create batches for different partitions + let batch_us1 = RecordBatch::try_new(Arc::new(arrow_schema.clone()), vec![ + Arc::new(Int32Array::from(vec![1, 2])), + Arc::new(StringArray::from(vec!["Alice", "Bob"])), + Arc::new(StringArray::from(vec!["US", "US"])), + ])?; + + let batch_eu1 = RecordBatch::try_new(Arc::new(arrow_schema.clone()), vec![ + Arc::new(Int32Array::from(vec![3, 4])), + Arc::new(StringArray::from(vec!["Charlie", "Dave"])), + Arc::new(StringArray::from(vec!["EU", "EU"])), + ])?; + + let batch_us2 = RecordBatch::try_new(Arc::new(arrow_schema.clone()), vec![ + Arc::new(Int32Array::from(vec![5])), + Arc::new(StringArray::from(vec!["Eve"])), + Arc::new(StringArray::from(vec!["US"])), + ])?; + + let batch_asia1 = RecordBatch::try_new(Arc::new(arrow_schema.clone()), vec![ + Arc::new(Int32Array::from(vec![6, 7])), + Arc::new(StringArray::from(vec!["Frank", "Grace"])), + Arc::new(StringArray::from(vec!["ASIA", "ASIA"])), + ])?; + + // Write data in mixed partition order to demonstrate fanout capability + // This is the key difference from ClusteredWriter - we can write to any partition at 
any time + writer + .write(Some(partition_key_us.clone()), batch_us1) + .await?; + writer + .write(Some(partition_key_eu.clone()), batch_eu1) + .await?; + writer + .write(Some(partition_key_us.clone()), batch_us2) + .await?; // Back to US partition + writer + .write(Some(partition_key_asia.clone()), batch_asia1) + .await?; + + // Close writer and get data files + let data_files = writer.close().await?; + + // Verify files were created for all partitions + assert!( + data_files.len() >= 3, + "Expected at least 3 data files (one per partition), got {}", + data_files.len() + ); + + // Verify that we have files for each partition + let mut partitions_found = std::collections::HashSet::new(); + for data_file in &data_files { + partitions_found.insert(data_file.partition.clone()); + } + + assert!( + partitions_found.contains(&partition_value_us), + "Missing US partition" + ); + assert!( + partitions_found.contains(&partition_value_eu), + "Missing EU partition" + ); + assert!( + partitions_found.contains(&partition_value_asia), + "Missing ASIA partition" + ); + + Ok(()) + } + + #[tokio::test] + async fn test_fanout_writer_mixed_partitioned_unpartitioned() -> Result<()> { + let temp_dir = TempDir::new()?; + let file_io = FileIOBuilder::new_fs_io().build()?; + let location_gen = DefaultLocationGenerator::with_data_location( + temp_dir.path().to_str().unwrap().to_string(), + ); + let file_name_gen = + DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); + + // Create schema + let schema = Arc::new( + crate::spec::Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(3, "region", Type::Primitive(PrimitiveType::String)) + .into(), + ]) + .build()?, + ); + + // Create partition spec and key + let partition_spec = PartitionSpec::builder(schema.clone()).build()?; + let partition_value_us = Struct::from_iter([Some(Literal::string("US"))]); + let partition_key_us = + PartitionKey::new(partition_spec, schema.clone(), partition_value_us.clone()); + + // Create writer builder + let parquet_writer_builder = + ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone()); + + // Create rolling file writer builder + let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size( + parquet_writer_builder, + file_io.clone(), + location_gen, + file_name_gen, + ); + + // Create data file writer builder + let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder); + + // Create fanout writer + let mut writer = FanoutDataWriter::new(data_file_writer_builder); + + // Create test data with proper field ID metadata + let arrow_schema = Schema::new(vec![ + Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + 1.to_string(), + )])), + Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + 2.to_string(), + )])), + Field::new("region", DataType::Utf8, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + 3.to_string(), + )])), + ]); + + // Create batches + let batch_partitioned = RecordBatch::try_new(Arc::new(arrow_schema.clone()), vec![ + Arc::new(Int32Array::from(vec![1, 2])), + Arc::new(StringArray::from(vec!["Alice", "Bob"])), + Arc::new(StringArray::from(vec!["US", "US"])), + ])?; + + let 
batch_unpartitioned = RecordBatch::try_new(Arc::new(arrow_schema.clone()), vec![ + Arc::new(Int32Array::from(vec![3, 4])), + Arc::new(StringArray::from(vec!["Charlie", "Dave"])), + Arc::new(StringArray::from(vec!["UNKNOWN", "UNKNOWN"])), + ])?; + + // Write both partitioned and unpartitioned data + writer + .write(Some(partition_key_us), batch_partitioned) + .await?; + writer.write(None, batch_unpartitioned).await?; + + // Close writer and get data files + let data_files = writer.close().await?; + + // Verify files were created for both partitioned and unpartitioned data + assert!( + data_files.len() >= 2, + "Expected at least 2 data files (partitioned + unpartitioned), got {}", + data_files.len() + ); + + // Verify we have both partitioned and unpartitioned files + let mut has_partitioned = false; + let mut has_unpartitioned = false; + + for data_file in &data_files { + if data_file.partition == partition_value_us { + has_partitioned = true; + } else if data_file.partition == Struct::empty() { + has_unpartitioned = true; + } + } + + assert!(has_partitioned, "Missing partitioned data file"); + assert!(has_unpartitioned, "Missing unpartitioned data file"); + + Ok(()) + } +} diff --git a/crates/iceberg/src/writer/partitioning/mod.rs b/crates/iceberg/src/writer/partitioning/mod.rs new file mode 100644 index 000000000..751efb9cc --- /dev/null +++ b/crates/iceberg/src/writer/partitioning/mod.rs @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +pub mod clustered_data_writer; +pub mod fanout_data_writer; + +use crate::Result; +use crate::spec::PartitionKey; +use crate::writer::{DefaultInput, DefaultOutput}; + +/// A writer that can write data to partitioned tables. +/// +/// This trait provides methods for writing data with optional partition keys and +/// closing the writer to retrieve the output. +#[async_trait::async_trait] +pub trait PartitioningWriter: Send + 'static { + /// Write data with an optional partition key. + /// + /// # Parameters + /// + /// * `partition_key` - Optional partition key to determine which partition to write to + /// * `input` - The input data to write + /// + /// # Returns + /// + /// `Ok(())` on success, or an error if the write operation fails. + async fn write(&mut self, partition_key: Option, input: I) -> Result<()>; + + /// Close the writer and return the output. + /// + /// # Returns + /// + /// The accumulated output from all write operations. 
+ async fn close(&mut self) -> Result; +} diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs index 712da92b2..37a0d8c7a 100644 --- a/crates/integrations/datafusion/src/physical_plan/write.rs +++ b/crates/integrations/datafusion/src/physical_plan/write.rs @@ -256,6 +256,7 @@ impl ExecutionPlan for IcebergWriteExec { }; let file_io = self.table.file_io().clone(); + // todo location_gen and file_name_gen should be configurable let location_generator = DefaultLocationGenerator::new(self.table.metadata().clone()) .map_err(to_datafusion_error)?; // todo filename prefix/suffix should be configurable @@ -268,8 +269,7 @@ impl ExecutionPlan for IcebergWriteExec { location_generator, file_name_generator, ); - // todo specify partition key when partitioning writer is supported - let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder, None); + let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder); // Get input data let data = execute_input_stream( @@ -285,7 +285,8 @@ impl ExecutionPlan for IcebergWriteExec { // Create write stream let stream = futures::stream::once(async move { let mut writer = data_file_writer_builder - .build() + // todo specify partition key when partitioning writer is supported + .build_with_partition(None) .await .map_err(to_datafusion_error)?; let mut input_stream = data; From 36bac114bb7e4af02aae14e8d84f3c8b21749ba1 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Thu, 9 Oct 2025 21:34:14 -0700 Subject: [PATCH 2/3] fix usages --- .../src/writer/file_writer/rolling_writer.rs | 9 +++--- .../shared_tests/append_data_file_test.rs | 7 +++-- .../append_partition_data_file_test.rs | 30 ++++++++----------- .../shared_tests/conflict_commit_test.rs | 7 +++-- .../tests/shared_tests/scan_all_type.rs | 7 +++-- 5 files changed, 32 insertions(+), 28 deletions(-) diff --git a/crates/iceberg/src/writer/file_writer/rolling_writer.rs b/crates/iceberg/src/writer/file_writer/rolling_writer.rs index 0b9b105c5..0ffd48b39 100644 --- a/crates/iceberg/src/writer/file_writer/rolling_writer.rs +++ b/crates/iceberg/src/writer/file_writer/rolling_writer.rs @@ -329,11 +329,10 @@ mod tests { file_name_gen, ); - let data_file_writer_builder = - DataFileWriterBuilder::new(rolling_file_writer_builder, None); + let data_file_writer_builder = DataFileWriterBuilder::new(rolling_file_writer_builder); // Create writer - let mut writer = data_file_writer_builder.build().await?; + let mut writer = data_file_writer_builder.build_with_partition(None).await?; // Create test data let arrow_schema = make_test_arrow_schema(); @@ -388,10 +387,10 @@ mod tests { file_name_gen, ); - let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder, None); + let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder); // Create writer - let mut writer = data_file_writer_builder.build().await?; + let mut writer = data_file_writer_builder.build_with_partition(None).await?; // Create test data let arrow_schema = make_test_arrow_schema(); diff --git a/crates/integration_tests/tests/shared_tests/append_data_file_test.rs b/crates/integration_tests/tests/shared_tests/append_data_file_test.rs index f4cba959e..4488dfff7 100644 --- a/crates/integration_tests/tests/shared_tests/append_data_file_test.rs +++ b/crates/integration_tests/tests/shared_tests/append_data_file_test.rs @@ -82,8 +82,11 @@ async fn test_append_data_file() { location_generator.clone(), 
file_name_generator.clone(), ); - let data_file_writer_builder = DataFileWriterBuilder::new(rolling_file_writer_builder, None); - let mut data_file_writer = data_file_writer_builder.build().await.unwrap(); + let data_file_writer_builder = DataFileWriterBuilder::new(rolling_file_writer_builder); + let mut data_file_writer = data_file_writer_builder + .build_with_partition(None) + .await + .unwrap(); let col1 = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]); let col2 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]); let col3 = BooleanArray::from(vec![Some(true), Some(false), None, Some(false)]); diff --git a/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs b/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs index 0da88f1a0..47080e2e1 100644 --- a/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs +++ b/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs @@ -106,13 +106,11 @@ async fn test_append_partition_data_file() { file_name_generator.clone(), ); - let mut data_file_writer_valid = DataFileWriterBuilder::new( - rolling_file_writer_builder.clone(), - Some(partition_key.clone()), - ) - .build() - .await - .unwrap(); + let mut data_file_writer_valid = + DataFileWriterBuilder::new(rolling_file_writer_builder.clone()) + .build_with_partition(Some(partition_key.clone())) + .await + .unwrap(); let col1 = StringArray::from(vec![Some("foo1"), Some("foo2")]); let col2 = Int32Array::from(vec![ @@ -191,11 +189,10 @@ async fn test_schema_incompatible_partition_type( catalog: &dyn Catalog, ) { // test writing different "type" of partition than mentioned in schema - let mut data_file_writer_invalid = - DataFileWriterBuilder::new(rolling_file_writer_builder, Some(partition_key)) - .build() - .await - .unwrap(); + let mut data_file_writer_invalid = DataFileWriterBuilder::new(rolling_file_writer_builder) + .build_with_partition(Some(partition_key)) + .await + .unwrap(); data_file_writer_invalid.write(batch.clone()).await.unwrap(); let data_file_invalid = data_file_writer_invalid.close().await.unwrap(); @@ -221,11 +218,10 @@ async fn test_schema_incompatible_partition_fields( catalog: &dyn Catalog, ) { // test writing different number of partition fields than mentioned in schema - let mut data_file_writer_invalid = - DataFileWriterBuilder::new(rolling_file_writer_builder, Some(partition_key)) - .build() - .await - .unwrap(); + let mut data_file_writer_invalid = DataFileWriterBuilder::new(rolling_file_writer_builder) + .build_with_partition(Some(partition_key)) + .await + .unwrap(); data_file_writer_invalid.write(batch.clone()).await.unwrap(); let data_file_invalid = data_file_writer_invalid.close().await.unwrap(); diff --git a/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs b/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs index a248fa707..bc37ce2a2 100644 --- a/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs +++ b/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs @@ -81,8 +81,11 @@ async fn test_append_data_file_conflict() { location_generator.clone(), file_name_generator.clone(), ); - let data_file_writer_builder = DataFileWriterBuilder::new(rolling_file_writer_builder, None); - let mut data_file_writer = data_file_writer_builder.build().await.unwrap(); + let data_file_writer_builder = DataFileWriterBuilder::new(rolling_file_writer_builder); + let mut data_file_writer = 
data_file_writer_builder + .build_with_partition(None) + .await + .unwrap(); let col1 = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]); let col2 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]); let col3 = BooleanArray::from(vec![Some(true), Some(false), None, Some(false)]); diff --git a/crates/integration_tests/tests/shared_tests/scan_all_type.rs b/crates/integration_tests/tests/shared_tests/scan_all_type.rs index 1125de11a..870f7ef92 100644 --- a/crates/integration_tests/tests/shared_tests/scan_all_type.rs +++ b/crates/integration_tests/tests/shared_tests/scan_all_type.rs @@ -163,8 +163,11 @@ async fn test_scan_all_type() { location_generator.clone(), file_name_generator.clone(), ); - let data_file_writer_builder = DataFileWriterBuilder::new(rolling_file_writer_builder, None); - let mut data_file_writer = data_file_writer_builder.build().await.unwrap(); + let data_file_writer_builder = DataFileWriterBuilder::new(rolling_file_writer_builder); + let mut data_file_writer = data_file_writer_builder + .build_with_partition(None) + .await + .unwrap(); // Prepare data let col1 = Int32Array::from(vec![1, 2, 3, 4, 5]); From 34917d415c9b04d87766c6ee78af075680e677f1 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Thu, 9 Oct 2025 21:55:33 -0700 Subject: [PATCH 3/3] daily clippy fix --- crates/iceberg/src/writer/partitioning/fanout_data_writer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/iceberg/src/writer/partitioning/fanout_data_writer.rs b/crates/iceberg/src/writer/partitioning/fanout_data_writer.rs index 5f52fc235..ff7566fcc 100644 --- a/crates/iceberg/src/writer/partitioning/fanout_data_writer.rs +++ b/crates/iceberg/src/writer/partitioning/fanout_data_writer.rs @@ -104,7 +104,7 @@ impl PartitioningWriter for FanoutDataWriter { partition_key: Option, input: RecordBatch, ) -> Result<()> { - if let Some(ref partition_key) = partition_key { + if let Some(partition_key) = partition_key { let writer = self.get_or_create_partition_writer(&partition_key).await?; writer.write(input).await } else {
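
The new surface is easiest to read end to end, so here is a minimal sketch of how a caller outside the crate might drive the `PartitioningWriter` API this series introduces, mirroring the builder chain exercised by the new tests (Parquet writer -> rolling writer -> data file writer -> fanout writer). The helper name `write_unsorted_batches`, its parameters, and the `iceberg::…` import paths are assumptions for illustration, not part of this patch.

// Illustrative sketch only: assumes the public paths of the modules touched by this patch.
use std::sync::Arc;

use arrow_array::RecordBatch;
use iceberg::Result;
use iceberg::io::FileIO;
use iceberg::spec::{DataFile, DataFileFormat, PartitionKey, Schema};
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
use iceberg::writer::file_writer::ParquetWriterBuilder;
use iceberg::writer::file_writer::location_generator::{
    DefaultFileNameGenerator, DefaultLocationGenerator,
};
use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
use iceberg::writer::partitioning::PartitioningWriter;
use iceberg::writer::partitioning::fanout_data_writer::FanoutDataWriter;
use parquet::file::properties::WriterProperties;

/// Write (partition key, batch) pairs that may arrive in any partition order.
/// `FanoutDataWriter` keeps one inner writer per partition, so unsorted input is fine.
async fn write_unsorted_batches(
    schema: Arc<Schema>,
    file_io: FileIO,
    data_location: String,
    batches: Vec<(Option<PartitionKey>, RecordBatch)>,
) -> Result<Vec<DataFile>> {
    // Same builder chain as the tests added in this patch.
    let parquet_writer_builder =
        ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone());
    let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
        parquet_writer_builder,
        file_io,
        DefaultLocationGenerator::with_data_location(data_location),
        DefaultFileNameGenerator::new("data".to_string(), None, DataFileFormat::Parquet),
    );
    let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder);

    // The partition key is now supplied per write() call (via build_with_partition
    // inside the partitioning writer) instead of being baked into the builder.
    let mut writer = FanoutDataWriter::new(data_file_writer_builder);
    for (partition_key, batch) in batches {
        writer.write(partition_key, batch).await?;
    }
    writer.close().await
}

When the input is already clustered by partition, swapping `FanoutDataWriter::new` for `ClusteredDataWriter::new` trades the per-partition writer map for a single active writer, at the cost of rejecting out-of-order partitions.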