26 changes: 9 additions & 17 deletions crates/iceberg/src/writer/base_writer/data_file_writer.rs
@@ -30,7 +30,6 @@ use crate::{Error, ErrorKind, Result};
 #[derive(Clone, Debug)]
 pub struct DataFileWriterBuilder<B: FileWriterBuilder, L: LocationGenerator, F: FileNameGenerator> {
     inner: RollingFileWriterBuilder<B, L, F>,
-    partition_key: Option<PartitionKey>,
 }

 impl<B, L, F> DataFileWriterBuilder<B, L, F>
@@ -40,14 +39,8 @@ where
     F: FileNameGenerator,
 {
     /// Create a new `DataFileWriterBuilder` using a `RollingFileWriterBuilder`.
-    pub fn new(
-        inner_builder: RollingFileWriterBuilder<B, L, F>,
-        partition_key: Option<PartitionKey>,
-    ) -> Self {
-        Self {
-            inner: inner_builder,
-            partition_key,
-        }
+    pub fn new(inner: RollingFileWriterBuilder<B, L, F>) -> Self {
+        Self { inner }
     }
 }

@@ -60,10 +53,10 @@
 {
     type R = DataFileWriter<B, L, F>;

-    async fn build(self) -> Result<Self::R> {
+    async fn build_with_partition(self, partition_key: Option<PartitionKey>) -> Result<Self::R> {
         Ok(DataFileWriter {
             inner: Some(self.inner.clone().build()),
-            partition_key: self.partition_key,
+            partition_key,
         })
     }
 }
@@ -194,8 +187,8 @@ mod test {
             file_name_gen,
         );

-        let mut data_file_writer = DataFileWriterBuilder::new(rolling_file_writer_builder, None)
-            .build()
+        let mut data_file_writer = DataFileWriterBuilder::new(rolling_file_writer_builder)
+            .build_with_partition(None)
             .await
             .unwrap();

@@ -280,10 +273,9 @@
             file_name_gen,
         );

-        let mut data_file_writer =
-            DataFileWriterBuilder::new(rolling_file_writer_builder, Some(partition_key))
-                .build()
-                .await?;
+        let mut data_file_writer = DataFileWriterBuilder::new(rolling_file_writer_builder)
+            .build_with_partition(Some(partition_key))
+            .await?;

         let arrow_schema = arrow_schema::Schema::new(vec![
             Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
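In short, the call-site change for data files looks like this (a minimal sketch based on the test changes above; `rolling_file_writer_builder` and `partition_key` are assumed to be set up as in those tests):

// DataFileWriterBuilder derives Clone and no longer captures a partition key.
let builder = DataFileWriterBuilder::new(rolling_file_writer_builder);

// The partition key (or None for an unpartitioned table) is supplied at build time.
let mut data_file_writer = builder.build_with_partition(Some(partition_key)).await?;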
42 changes: 18 additions & 24 deletions crates/iceberg/src/writer/base_writer/equality_delete_writer.rs
@@ -66,16 +66,11 @@ pub struct EqualityDeleteWriterConfig {
     equality_ids: Vec<i32>,
     // Projector used to project the data chunk into specific fields.
     projector: RecordBatchProjector,
-    partition_key: Option<PartitionKey>,
 }

 impl EqualityDeleteWriterConfig {
     /// Create a new `DataFileWriterConfig` with equality ids.
-    pub fn new(
-        equality_ids: Vec<i32>,
-        original_schema: SchemaRef,
-        partition_key: Option<PartitionKey>,
-    ) -> Result<Self> {
+    pub fn new(equality_ids: Vec<i32>, original_schema: SchemaRef) -> Result<Self> {
         let original_arrow_schema = Arc::new(schema_to_arrow_schema(&original_schema)?);
         let projector = RecordBatchProjector::new(
             original_arrow_schema,
@@ -110,7 +105,6 @@ impl EqualityDeleteWriterConfig {
         Ok(Self {
             equality_ids,
             projector,
-            partition_key,
         })
     }

@@ -129,12 +123,12 @@
 {
     type R = EqualityDeleteFileWriter<B, L, F>;

-    async fn build(self) -> Result<Self::R> {
+    async fn build_with_partition(self, partition_key: Option<PartitionKey>) -> Result<Self::R> {
         Ok(EqualityDeleteFileWriter {
-            inner: Some(self.inner.clone().build()), // todo revisit this, probably still need a builder for rolling writer
+            inner: Some(self.inner.clone().build()),
             projector: self.config.projector,
             equality_ids: self.config.equality_ids,
-            partition_key: self.config.partition_key,
+            partition_key,
         })
     }
 }
@@ -428,7 +422,7 @@ mod test {

         let equality_ids = vec![0_i32, 8];
         let equality_config =
-            EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema), None).unwrap();
+            EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema)).unwrap();
         let delete_schema =
             arrow_schema_to_schema(equality_config.projected_arrow_schema_ref()).unwrap();
         let projector = equality_config.projector.clone();
@@ -444,7 +438,7 @@
         );
         let mut equality_delete_writer =
             EqualityDeleteFileWriterBuilder::new(rolling_writer_builder, equality_config)
-                .build()
+                .build_with_partition(None)
                 .await?;

         // write
@@ -531,19 +525,19 @@
             .unwrap(),
         );
         // Float and Double are not allowed to be used for equality delete
-        assert!(EqualityDeleteWriterConfig::new(vec![0], schema.clone(), None).is_err());
-        assert!(EqualityDeleteWriterConfig::new(vec![1], schema.clone(), None).is_err());
+        assert!(EqualityDeleteWriterConfig::new(vec![0], schema.clone()).is_err());
+        assert!(EqualityDeleteWriterConfig::new(vec![1], schema.clone()).is_err());
         // Struct is not allowed to be used for equality delete
-        assert!(EqualityDeleteWriterConfig::new(vec![3], schema.clone(), None).is_err());
+        assert!(EqualityDeleteWriterConfig::new(vec![3], schema.clone()).is_err());
         // Nested field of struct is allowed to be used for equality delete
-        assert!(EqualityDeleteWriterConfig::new(vec![4], schema.clone(), None).is_ok());
+        assert!(EqualityDeleteWriterConfig::new(vec![4], schema.clone()).is_ok());
         // Nested field of map is not allowed to be used for equality delete
-        assert!(EqualityDeleteWriterConfig::new(vec![7], schema.clone(), None).is_err());
-        assert!(EqualityDeleteWriterConfig::new(vec![8], schema.clone(), None).is_err());
-        assert!(EqualityDeleteWriterConfig::new(vec![9], schema.clone(), None).is_err());
+        assert!(EqualityDeleteWriterConfig::new(vec![7], schema.clone()).is_err());
+        assert!(EqualityDeleteWriterConfig::new(vec![8], schema.clone()).is_err());
+        assert!(EqualityDeleteWriterConfig::new(vec![9], schema.clone()).is_err());
         // Nested field of list is not allowed to be used for equality delete
-        assert!(EqualityDeleteWriterConfig::new(vec![10], schema.clone(), None).is_err());
-        assert!(EqualityDeleteWriterConfig::new(vec![11], schema.clone(), None).is_err());
+        assert!(EqualityDeleteWriterConfig::new(vec![10], schema.clone()).is_err());
+        assert!(EqualityDeleteWriterConfig::new(vec![11], schema.clone()).is_err());

         Ok(())
     }
@@ -597,7 +591,7 @@
             .unwrap(),
         );
         let equality_ids = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13];
-        let config = EqualityDeleteWriterConfig::new(equality_ids, schema.clone(), None).unwrap();
+        let config = EqualityDeleteWriterConfig::new(equality_ids, schema.clone()).unwrap();
         let delete_arrow_schema = config.projected_arrow_schema_ref().clone();
         let delete_schema = arrow_schema_to_schema(&delete_arrow_schema).unwrap();

@@ -611,7 +605,7 @@
         );
         let mut equality_delete_writer =
             EqualityDeleteFileWriterBuilder::new(rolling_writer_builder, config)
-                .build()
+                .build_with_partition(None)
                 .await?;

         // prepare data
@@ -795,7 +789,7 @@ mod test {
         let to_write = RecordBatch::try_new(arrow_schema.clone(), columns).unwrap();
         let equality_ids = vec![0_i32, 2, 5];
         let equality_config =
-            EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema), None).unwrap();
+            EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema)).unwrap();
         let projector = equality_config.projector.clone();

         // check
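The equality-delete path follows the same shape (a sketch; `schema` and `rolling_writer_builder` are assumed to be set up as in the tests above):

// The config no longer carries a partition key, only the equality ids and schema.
let config = EqualityDeleteWriterConfig::new(vec![0_i32, 8], Arc::new(schema))?;
let mut equality_delete_writer =
    EqualityDeleteFileWriterBuilder::new(rolling_writer_builder, config)
        .build_with_partition(None)
        .await?;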
34 changes: 22 additions & 12 deletions crates/iceberg/src/writer/mod.rs
@@ -100,10 +100,9 @@
 //! );
 //!
 //! // Create a data file writer using parquet file writer builder.
-//! let data_file_writer_builder =
-//!     DataFileWriterBuilder::new(rolling_file_writer_builder, None);
+//! let data_file_writer_builder = DataFileWriterBuilder::new(rolling_file_writer_builder);
 //! // Build the data file writer
-//! let mut data_file_writer = data_file_writer_builder.build().await?;
+//! let mut data_file_writer = data_file_writer_builder.build_with_partition(None).await?;
 //!
 //! // Write the data using data_file_writer...
 //!
@@ -122,7 +121,7 @@
 //! use arrow_array::RecordBatch;
 //! use iceberg::io::FileIOBuilder;
 //! use iceberg::memory::MemoryCatalogBuilder;
-//! use iceberg::spec::DataFile;
+//! use iceberg::spec::{DataFile, PartitionKey};
 //! use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
 //! use iceberg::writer::file_writer::ParquetWriterBuilder;
 //! use iceberg::writer::file_writer::location_generator::{
@@ -149,9 +148,15 @@
 //! impl<B: IcebergWriterBuilder> IcebergWriterBuilder for LatencyRecordWriterBuilder<B> {
 //!     type R = LatencyRecordWriter<B::R>;
 //!
-//!     async fn build(self) -> Result<Self::R> {
+//!     async fn build_with_partition(
+//!         self,
+//!         partition_key: Option<PartitionKey>,
+//!     ) -> Result<Self::R> {
 //!         Ok(LatencyRecordWriter {
-//!             inner_writer: self.inner_writer_builder.build().await?,
+//!             inner_writer: self
+//!                 .inner_writer_builder
+//!                 .build_with_partition(partition_key)
+//!                 .await?,
 //!         })
 //!     }
 //! }
@@ -231,24 +236,29 @@
 //! );
 //!
 //! // Create a data file writer builder using rolling file writer.
-//! let data_file_writer_builder =
-//!     DataFileWriterBuilder::new(rolling_file_writer_builder, Some(partition_key));
+//! let data_file_writer_builder = DataFileWriterBuilder::new(rolling_file_writer_builder);
 //! // Create latency record writer using data file writer builder.
 //! let latency_record_builder = LatencyRecordWriterBuilder::new(data_file_writer_builder);
 //! // Build the final writer
-//! let mut latency_record_data_file_writer = latency_record_builder.build().await.unwrap();
+//! let mut latency_record_data_file_writer = latency_record_builder
+//!     .build_with_partition(Some(partition_key))
+//!     .await
+//!     .unwrap();
 //!
 //! Ok(())
 //! }
 //! ```

 pub mod base_writer;
 pub mod file_writer;
+/// Provides partition-aware writers
+/// TODO examples
+pub mod partitioning;

 use arrow_array::RecordBatch;

 use crate::Result;
-use crate::spec::DataFile;
+use crate::spec::{DataFile, PartitionKey};

 type DefaultInput = RecordBatch;
 type DefaultOutput = Vec<DataFile>;
@@ -260,8 +270,8 @@ pub trait IcebergWriterBuilder<I = DefaultInput, O = DefaultOutput>:
 {
     /// The associated writer type.
     type R: IcebergWriter<I, O>;
-    /// Build the iceberg writer.
-    async fn build(self) -> Result<Self::R>;
+    /// Build the iceberg writer for an optional partition key.
+    async fn build_with_partition(self, partition_key: Option<PartitionKey>) -> Result<Self::R>;
 }

Review comment from @CTTY (Contributor, Author), Oct 10, 2025, on the new `build_with_partition` signature:
This is a breaking change. I believe this is necessary because:

1. IcebergWriter is supposed to generate DataFiles that always hold a partition value, according to the Iceberg spec.

2. The existing code stores the partition value directly in the builder, making builder.clone() useless:

let builder = IcebergWriterBuilder::new(partition_A);
let writer_A = builder.build();
... // write to partition A

// done with partition A and now we need to write to partition B
// this is wrong because partition value A is still stored in the builder
let writer_B = builder.clone().build();

An alternative is to add a new method clone_with_partition(), but that would also be a breaking change, and it's less clean compared to build_with_partition().
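For contrast, the pattern this change enables (a sketch in the same spirit as the snippet above; `partition_a`, `partition_b`, and the concrete builder are illustrative):

// The builder carries no partition state, so clone() is now meaningful.
let builder = DataFileWriterBuilder::new(rolling_file_writer_builder);

let mut writer_a = builder.clone().build_with_partition(Some(partition_a)).await?;
// ... write to partition A, then close ...

// Reusing the builder for partition B is sound: no stale key is left behind.
let mut writer_b = builder.build_with_partition(Some(partition_b)).await?;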


 /// The iceberg writer used to write data to iceberg table.