diff --git a/Cargo.lock b/Cargo.lock index 911ddde8b8..b3b58bc0b7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3503,6 +3503,7 @@ dependencies = [ "arrow-buffer", "arrow-cast", "arrow-ord", + "arrow-row", "arrow-schema", "arrow-select", "arrow-string", diff --git a/Cargo.toml b/Cargo.toml index c10c01d94a..faeb866b38 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,6 +47,7 @@ arrow-array = "56.2" arrow-buffer = "56.2" arrow-cast = "56.2" arrow-ord = "56.2" +arrow-row = "56.2" arrow-schema = "56.2" arrow-select = "56.2" arrow-string = "56.2" diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index f0540e06e5..c130b5a6d0 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -52,6 +52,7 @@ arrow-array = { workspace = true } arrow-buffer = { workspace = true } arrow-cast = { workspace = true } arrow-ord = { workspace = true } +arrow-row = { workspace = true } arrow-schema = { workspace = true } arrow-select = { workspace = true } arrow-string = { workspace = true } diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs index ee5661b8dd..58be7a080f 100644 --- a/crates/iceberg/src/transaction/append.rs +++ b/crates/iceberg/src/transaction/append.rs @@ -68,6 +68,7 @@ impl FastAppendAction { self } + /// Set target branch for the snapshot. pub fn set_target_branch(mut self, target_branch: String) -> Self { self.target_branch = Some(target_branch); self @@ -106,6 +107,7 @@ impl FastAppendAction { self } + /// Set snapshot id for the snapshot. pub fn set_snapshot_id(mut self, snapshot_id: i64) -> Self { self.snapshot_id = Some(snapshot_id); self diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index 99271f1823..2a31c83e7c 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -54,6 +54,8 @@ mod action; pub use action::*; mod append; + +pub use append::FastAppendAction; mod manifest_filter; pub use manifest_filter::*; @@ -77,7 +79,7 @@ use rewrite_files::RewriteFilesAction; use crate::error::Result; use crate::spec::TableProperties; use crate::table::Table; -use crate::transaction::append::{FastAppendAction, MergeAppendAction}; +use crate::transaction::append::MergeAppendAction; use crate::transaction::overwrite_files::OverwriteFilesAction; use crate::transaction::sort_order::ReplaceSortOrderAction; use crate::transaction::update_location::UpdateLocationAction; diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index c5a725e574..f53d6a5361 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -708,14 +708,17 @@ impl<'a> SnapshotProducer<'a> { Ok(ActionCommit::new(updates, requirements)) } + /// Set the new data file sequence number for this snapshot pub fn set_new_data_file_sequence_number(&mut self, new_data_file_sequence_number: i64) { self.new_data_file_sequence_number = Some(new_data_file_sequence_number); } + /// Set the target branch for this snapshot pub fn set_target_branch(&mut self, target_branch: String) { self.target_branch = target_branch; } + /// Get the target branch for this snapshot pub fn target_branch(&self) -> &str { &self.target_branch } diff --git a/crates/iceberg/src/writer/base_writer/data_file_writer.rs b/crates/iceberg/src/writer/base_writer/data_file_writer.rs index dcaa56cc97..46b2c9f442 100644 --- a/crates/iceberg/src/writer/base_writer/data_file_writer.rs +++ 
b/crates/iceberg/src/writer/base_writer/data_file_writer.rs @@ -126,11 +126,15 @@ where } fn current_row_num(&self) -> usize { - self.inner.as_ref().unwrap().current_row_num() + self.inner + .as_ref() + .map_or(0, |inner| inner.current_row_num()) } fn current_written_size(&self) -> usize { - self.inner.as_ref().unwrap().current_written_size() + self.inner + .as_ref() + .map_or(0, |inner| inner.current_written_size()) } } diff --git a/crates/iceberg/src/writer/base_writer/position_delete_file_writer.rs b/crates/iceberg/src/writer/base_writer/position_delete_file_writer.rs index 7db8e71497..153c1a7940 100644 --- a/crates/iceberg/src/writer/base_writer/position_delete_file_writer.rs +++ b/crates/iceberg/src/writer/base_writer/position_delete_file_writer.rs @@ -54,7 +54,8 @@ static DELETE_FILE_POS: Lazy = Lazy::new(|| { Type::Primitive(PrimitiveType::Long), )) }); -static POSITION_DELETE_SCHEMA: Lazy = Lazy::new(|| { +/// Iceberg schema used for position delete files (file_path, pos). +pub static POSITION_DELETE_SCHEMA: Lazy = Lazy::new(|| { Schema::builder() .with_fields(vec![DELETE_FILE_PATH.clone(), DELETE_FILE_POS.clone()]) .build() @@ -199,11 +200,15 @@ where } fn current_row_num(&self) -> usize { - self.inner.as_ref().unwrap().current_row_num() + self.inner + .as_ref() + .map_or(0, |inner| inner.current_row_num()) } fn current_written_size(&self) -> usize { - self.inner.as_ref().unwrap().current_written_size() + self.inner + .as_ref() + .map_or(0, |inner| inner.current_written_size()) } } diff --git a/crates/iceberg/src/writer/file_writer/mod.rs b/crates/iceberg/src/writer/file_writer/mod.rs index 2ed6414ce8..cb2f52179d 100644 --- a/crates/iceberg/src/writer/file_writer/mod.rs +++ b/crates/iceberg/src/writer/file_writer/mod.rs @@ -24,7 +24,7 @@ use super::CurrentFileStatus; use crate::Result; use crate::spec::DataFileBuilder; -mod parquet_writer; +pub mod parquet_writer; pub use parquet_writer::{ParquetWriter, ParquetWriterBuilder}; use crate::io::OutputFile; diff --git a/crates/iceberg/src/writer/file_writer/rolling_writer.rs b/crates/iceberg/src/writer/file_writer/rolling_writer.rs index 8f03654786..3a950bf640 100644 --- a/crates/iceberg/src/writer/file_writer/rolling_writer.rs +++ b/crates/iceberg/src/writer/file_writer/rolling_writer.rs @@ -249,11 +249,15 @@ impl CurrentFi } fn current_row_num(&self) -> usize { - self.inner.as_ref().unwrap().current_row_num() + self.inner + .as_ref() + .map_or(0, |inner| inner.current_row_num()) } fn current_written_size(&self) -> usize { - self.inner.as_ref().unwrap().current_written_size() + self.inner + .as_ref() + .map_or(0, |inner| inner.current_written_size()) } } diff --git a/crates/iceberg/src/writer/function_writer/equality_delta_writer.rs b/crates/iceberg/src/writer/function_writer/equality_delta_writer.rs new file mode 100644 index 0000000000..2b6e9af3ab --- /dev/null +++ b/crates/iceberg/src/writer/function_writer/equality_delta_writer.rs @@ -0,0 +1,608 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Equality delta writer that produces data files, position delete files and equality delete files +//! in a single pass. + +use std::collections::HashMap; +use std::sync::Arc; + +use arrow_array::builder::BooleanBuilder; +use arrow_array::{Array, Int32Array, RecordBatch}; +use arrow_ord::partition::partition; +use arrow_row::{OwnedRow, RowConverter, Rows, SortField}; +use arrow_select::filter::filter_record_batch; +use itertools::Itertools; +use parquet::arrow::PARQUET_FIELD_ID_META_KEY; + +use crate::arrow::record_batch_projector::RecordBatchProjector; +use crate::arrow::schema_to_arrow_schema; +use crate::spec::{DataFile, PartitionKey, SchemaRef}; +use crate::writer::base_writer::position_delete_file_writer::PositionDeleteInput; +use crate::writer::{CurrentFileStatus, IcebergWriter, IcebergWriterBuilder}; +use crate::{Error, ErrorKind, Result}; + +/// Insert operation marker. +pub const INSERT_OP: i32 = 1; +/// Delete operation marker. +pub const DELETE_OP: i32 = 2; + +/// Builder for [`EqualityDeltaWriter`]. +#[derive(Clone)] +pub struct EqualityDeltaWriterBuilder<DB, PDB, EDB> { + data_writer_builder: DB, + position_delete_writer_builder: PDB, + equality_delete_writer_builder: EDB, + unique_column_ids: Vec<i32>, + schema: SchemaRef, +} + +impl<DB, PDB, EDB> EqualityDeltaWriterBuilder<DB, PDB, EDB> { + /// Create a new `EqualityDeltaWriterBuilder`. + pub fn new( + data_writer_builder: DB, + position_delete_writer_builder: PDB, + equality_delete_writer_builder: EDB, + unique_column_ids: Vec<i32>, + schema: SchemaRef, + ) -> Self { + Self { + data_writer_builder, + position_delete_writer_builder, + equality_delete_writer_builder, + unique_column_ids, + schema, + } + } +} + +#[async_trait::async_trait] +impl<DB, PDB, EDB> IcebergWriterBuilder for EqualityDeltaWriterBuilder<DB, PDB, EDB> +where + DB: IcebergWriterBuilder, + PDB: IcebergWriterBuilder<Vec<PositionDeleteInput>>, + EDB: IcebergWriterBuilder, + DB::R: CurrentFileStatus, +{ + type R = EqualityDeltaWriter<DB::R, PDB::R, EDB::R>; + + async fn build(self, partition_key: Option<PartitionKey>) -> Result<Self::R> { + Self::R::try_new( + self.data_writer_builder + .build(partition_key.clone()) + .await?, + self.position_delete_writer_builder + .build(partition_key.clone()) + .await?, + self.equality_delete_writer_builder + .build(partition_key) + .await?, + self.schema, + self.unique_column_ids, + ) + } +} + +/// Writer that handles insert and delete operations in a single stream.
+pub struct EqualityDeltaWriter<D, PD, ED> { + data_writer: D, + position_delete_writer: PD, + equality_delete_writer: ED, + projector: RecordBatchProjector, + inserted_row: HashMap<OwnedRow, PositionDeleteInput>, + row_converter: RowConverter, +} + +impl<D, PD, ED> EqualityDeltaWriter<D, PD, ED> +where + D: IcebergWriter + CurrentFileStatus, + PD: IcebergWriter<Vec<PositionDeleteInput>>, + ED: IcebergWriter, +{ + pub(crate) fn try_new( + data_writer: D, + position_delete_writer: PD, + equality_delete_writer: ED, + schema: SchemaRef, + unique_column_ids: Vec<i32>, + ) -> Result<Self> { + let projector = RecordBatchProjector::new( + Arc::new(schema_to_arrow_schema(&schema)?), + &unique_column_ids, + |field| { + if field.data_type().is_nested() { + return Ok(None); + } + field + .metadata() + .get(PARQUET_FIELD_ID_META_KEY) + .map(|s| { + s.parse::<i32>() + .map_err(|e| Error::new(ErrorKind::Unexpected, e.to_string())) + }) + .transpose() + }, + |_| true, + )?; + let row_converter = RowConverter::new( + projector + .projected_schema_ref() + .fields() + .iter() + .map(|field| SortField::new(field.data_type().clone())) + .collect(), + )?; + Ok(Self { + data_writer, + position_delete_writer, + equality_delete_writer, + projector, + inserted_row: HashMap::new(), + row_converter, + }) + } + + async fn insert(&mut self, batch: RecordBatch) -> Result<()> { + let rows = self.extract_unique_column(&batch)?; + let row_count = batch.num_rows(); + + // Initialise the writer by writing the batch first; offsets are derived from the end state. + self.data_writer.write(batch).await?; + + let end_offset = self.data_writer.current_row_num(); + let start_offset = end_offset.saturating_sub(row_count); + let current_file_path: Arc<str> = Arc::from(self.data_writer.current_file_path()); + + let mut position_deletes = Vec::new(); + for (idx, row) in rows.iter().enumerate() { + let previous_input = self.inserted_row.insert( + row.owned(), + PositionDeleteInput::new(current_file_path.clone(), (start_offset + idx) as i64), + ); + if let Some(previous_input) = previous_input { + position_deletes.push(previous_input); + } + } + + self.write_position_deletes(position_deletes).await + } + + async fn delete(&mut self, batch: RecordBatch) -> Result<()> { + let rows = self.extract_unique_column(&batch)?; + let mut delete_row = BooleanBuilder::with_capacity(rows.num_rows()); + let mut position_deletes = Vec::new(); + for row in rows.iter() { + if let Some(previous_input) = self.inserted_row.remove(&row.owned()) { + position_deletes.push(previous_input); + delete_row.append_value(false); + } else { + delete_row.append_value(true); + } + } + + self.write_position_deletes(position_deletes).await?; + + let delete_mask = delete_row.finish(); + if delete_mask.null_count() == delete_mask.len() { + return Ok(()); + } + + let delete_batch = filter_record_batch(&batch, &delete_mask).map_err(|err| { + Error::new( + ErrorKind::DataInvalid, + format!("Failed to filter record batch, error: {err}"), + ) + })?; + if delete_batch.num_rows() == 0 { + return Ok(()); + } + self.equality_delete_writer.write(delete_batch).await + } + + fn extract_unique_column(&mut self, batch: &RecordBatch) -> Result<Rows> { + self.row_converter + .convert_columns(&self.projector.project_column(batch.columns())?)
+ .map_err(|err| { + Error::new( + ErrorKind::DataInvalid, + format!("Failed to convert columns, error: {err}"), + ) + }) + } + + async fn write_position_deletes( + &mut self, + mut deletes: Vec<PositionDeleteInput>, + ) -> Result<()> { + if deletes.is_empty() { + return Ok(()); + } + deletes.sort_by(|a, b| { + let path_cmp = a.path.as_ref().cmp(b.path.as_ref()); + if path_cmp == std::cmp::Ordering::Equal { + a.pos.cmp(&b.pos) + } else { + path_cmp + } + }); + self.position_delete_writer.write(deletes).await + } +} + +#[async_trait::async_trait] +impl<D, PD, ED> IcebergWriter for EqualityDeltaWriter<D, PD, ED> +where + D: IcebergWriter + CurrentFileStatus, + PD: IcebergWriter<Vec<PositionDeleteInput>>, + ED: IcebergWriter, +{ + async fn write(&mut self, batch: RecordBatch) -> Result<()> { + if batch.num_columns() == 0 { + return Err(Error::new( + ErrorKind::DataInvalid, + "Equality delta writer expects at least one column for operation markers", + )); + } + + let ops = batch + .column(batch.num_columns() - 1) + .as_any() + .downcast_ref::<Int32Array>() + .ok_or(Error::new( + ErrorKind::DataInvalid, + "The last column must be an Int32Array of operation markers", + ))?; + + let partitions = + partition(&[batch.column(batch.num_columns() - 1).clone()]).map_err(|err| { + Error::new( + ErrorKind::DataInvalid, + format!("Failed to partition ops, error: {err}"), + ) + })?; + for range in partitions.ranges() { + let batch = batch + .project(&(0..batch.num_columns() - 1).collect_vec()) + .unwrap() + .slice(range.start, range.end - range.start); + match ops.value(range.start) { + INSERT_OP => self.insert(batch).await?, + DELETE_OP => self.delete(batch).await?, + op => { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Invalid ops: {op}"), + )); + } + } + } + Ok(()) + } + + async fn close(&mut self) -> Result<Vec<DataFile>> { + let data_files = self.data_writer.close().await?; + let position_delete_files = self.position_delete_writer.close().await?; + let equality_delete_files = self.equality_delete_writer.close().await?; + Ok(data_files + .into_iter() + .chain(position_delete_files) + .chain(equality_delete_files) + .collect()) + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::sync::Arc; + + use arrow_array::{Int32Array, Int64Array, RecordBatch, StringArray}; + use arrow_schema::{DataType, Field, Schema as ArrowSchema}; + use arrow_select::concat::concat_batches; + use parquet::arrow::PARQUET_FIELD_ID_META_KEY; + use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; + use parquet::file::properties::WriterProperties; + use tempfile::TempDir; + + use crate::Result; + use crate::arrow::{arrow_schema_to_schema, schema_to_arrow_schema}; + use crate::io::FileIOBuilder; + use crate::spec::{DataContentType, DataFileFormat, NestedField, PrimitiveType, Schema, Type}; + use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder; + use crate::writer::base_writer::equality_delete_writer::{ + EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig, + }; + use crate::writer::base_writer::position_delete_file_writer::PositionDeleteFileWriterBuilder; + use crate::writer::file_writer::location_generator::{ + DefaultFileNameGenerator, DefaultLocationGenerator, + }; + use crate::writer::file_writer::parquet_writer::ParquetWriterBuilder; + use crate::writer::file_writer::rolling_writer::RollingFileWriterBuilder; + use crate::writer::function_writer::equality_delta_writer::{ + DELETE_OP, EqualityDeltaWriterBuilder, INSERT_OP, + }; + use crate::writer::{IcebergWriter, IcebergWriterBuilder}; + + type WriterBuildersResult = (
DataFileWriterBuilder< + ParquetWriterBuilder, + DefaultLocationGenerator, + DefaultFileNameGenerator, + >, + PositionDeleteFileWriterBuilder< + ParquetWriterBuilder, + DefaultLocationGenerator, + DefaultFileNameGenerator, + >, + EqualityDeleteFileWriterBuilder< + ParquetWriterBuilder, + DefaultLocationGenerator, + DefaultFileNameGenerator, + >, + EqualityDeleteWriterConfig, + ); + + fn position_delete_arrow_schema() -> ArrowSchema { + schema_to_arrow_schema( + &Schema::builder() + .with_fields(vec![ + NestedField::required( + 2147483546, + "file_path", + Type::Primitive(PrimitiveType::String), + ) + .into(), + NestedField::required(2147483545, "pos", Type::Primitive(PrimitiveType::Long)) + .into(), + ]) + .build() + .unwrap(), + ) + .unwrap() + } + + fn create_writer_builders( + data_schema: Arc, + file_io: &crate::io::FileIO, + location_gen: DefaultLocationGenerator, + file_name_gen: DefaultFileNameGenerator, + ) -> Result { + let parquet_writer_builder = + ParquetWriterBuilder::new(WriterProperties::builder().build(), data_schema.clone()); + let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size( + parquet_writer_builder, + file_io.clone(), + location_gen.clone(), + file_name_gen.clone(), + ); + let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder); + + let position_delete_parquet_builder = ParquetWriterBuilder::new( + WriterProperties::builder().build(), + Arc::new( + Schema::builder() + .with_fields(vec![ + NestedField::required( + 2147483546, + "file_path", + Type::Primitive(PrimitiveType::String), + ) + .into(), + NestedField::required( + 2147483545, + "pos", + Type::Primitive(PrimitiveType::Long), + ) + .into(), + ]) + .build() + .unwrap(), + ), + ); + let position_delete_writer_builder = PositionDeleteFileWriterBuilder::new( + RollingFileWriterBuilder::new_with_default_file_size( + position_delete_parquet_builder, + file_io.clone(), + location_gen.clone(), + file_name_gen.clone(), + ), + ); + + let equality_config = EqualityDeleteWriterConfig::new(vec![1, 2], data_schema.clone())?; + let equality_delete_writer_builder = { + let schema = + arrow_schema_to_schema(equality_config.projected_arrow_schema_ref())?.into(); + let parquet_writer_builder = + ParquetWriterBuilder::new(WriterProperties::builder().build(), schema); + let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size( + parquet_writer_builder, + file_io.clone(), + location_gen, + file_name_gen, + ); + EqualityDeleteFileWriterBuilder::new(rolling_writer_builder, equality_config.clone()) + }; + + Ok(( + data_file_writer_builder, + position_delete_writer_builder, + equality_delete_writer_builder, + equality_config, + )) + } + + #[tokio::test] + async fn test_equality_delta_writer() -> Result<()> { + let schema = Arc::new( + Schema::builder() + .with_fields(vec![ + Arc::new(NestedField::required( + 1, + "id".to_string(), + Type::Primitive(PrimitiveType::Long), + )), + Arc::new(NestedField::required( + 2, + "name".to_string(), + Type::Primitive(PrimitiveType::String), + )), + ]) + .build() + .unwrap(), + ); + let temp_dir = TempDir::new().unwrap(); + let file_io = FileIOBuilder::new_fs_io().build()?; + let location_gen = + DefaultLocationGenerator::with_data_location(temp_dir.path().to_string_lossy().into()); + let file_name_gen = + DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); + + let ( + data_file_writer_builder, + position_delete_writer_builder, + equality_delete_writer_builder, + equality_config, + ) = 
create_writer_builders( + schema.clone(), + &file_io, + location_gen.clone(), + file_name_gen.clone(), + )?; + + let mut equality_delta_writer = EqualityDeltaWriterBuilder::new( + data_file_writer_builder, + position_delete_writer_builder, + equality_delete_writer_builder, + vec![1, 2], + schema.clone(), + ) + .build(None) + .await?; + + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int64, true).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + 1.to_string(), + )])), + Field::new("data", DataType::Utf8, true).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + 2.to_string(), + )])), + Field::new("op", DataType::Int32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + 3.to_string(), + )])), + ])); + + let batch_one = RecordBatch::try_new(arrow_schema.clone(), vec![ + Arc::new(Int64Array::from(vec![1, 2, 1, 3, 2, 3, 1])), + Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e", "f", "g"])), + Arc::new(Int32Array::from(vec![INSERT_OP; 7])), + ])?; + equality_delta_writer.write(batch_one).await?; + + let batch_two = RecordBatch::try_new(arrow_schema.clone(), vec![ + Arc::new(Int64Array::from(vec![1, 2, 3, 4])), + Arc::new(StringArray::from(vec!["a", "b", "k", "l"])), + Arc::new(Int32Array::from(vec![ + DELETE_OP, DELETE_OP, DELETE_OP, INSERT_OP, + ])), + ])?; + equality_delta_writer.write(batch_two).await?; + + let data_files = equality_delta_writer.close().await?; + assert_eq!(data_files.len(), 3); + + let data_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int64, true).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + 1.to_string(), + )])), + Field::new("data", DataType::Utf8, true).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + 2.to_string(), + )])), + ])); + + let data_file = data_files + .iter() + .find(|file| file.content == DataContentType::Data) + .unwrap(); + let data_file_path = data_file.file_path().to_string(); + let input_content = file_io.new_input(data_file_path.clone())?.read().await?; + let reader = ParquetRecordBatchReaderBuilder::try_new(input_content)? + .build() + .unwrap(); + let batches = reader.map(|batch| batch.unwrap()).collect::>(); + let res = concat_batches(&data_schema, &batches).unwrap(); + let expected_batches = RecordBatch::try_new(data_schema.clone(), vec![ + Arc::new(Int64Array::from(vec![1, 2, 1, 3, 2, 3, 1, 4])), + Arc::new(StringArray::from(vec![ + "a", "b", "c", "d", "e", "f", "g", "l", + ])), + ])?; + assert_eq!(expected_batches, res); + + let position_delete_file = data_files + .iter() + .find(|file| file.content == DataContentType::PositionDeletes) + .unwrap(); + let position_input = file_io + .new_input(position_delete_file.file_path.clone())? + .read() + .await?; + let reader = ParquetRecordBatchReaderBuilder::try_new(position_input)? 
+ .build() + .unwrap(); + let batches = reader.map(|batch| batch.unwrap()).collect::>(); + let position_schema = Arc::new(position_delete_arrow_schema()); + let res = concat_batches(&position_schema, &batches).unwrap(); + let expected_batches = RecordBatch::try_new(position_schema.clone(), vec![ + Arc::new(StringArray::from(vec![ + data_file_path.clone(), + data_file_path.clone(), + ])), + Arc::new(Int64Array::from(vec![0, 1])), + ])?; + assert_eq!(expected_batches, res); + + let equality_delete_file = data_files + .iter() + .find(|file| file.content == DataContentType::EqualityDeletes) + .unwrap(); + let equality_input = file_io + .new_input(equality_delete_file.file_path.clone())? + .read() + .await?; + let reader = ParquetRecordBatchReaderBuilder::try_new(equality_input)? + .build() + .unwrap(); + let batches = reader.map(|batch| batch.unwrap()).collect::>(); + let equality_schema = Arc::new(arrow_schema_to_schema( + equality_config.projected_arrow_schema_ref(), + )?); + let equality_arrow_schema = Arc::new(schema_to_arrow_schema(&equality_schema)?); + let res = concat_batches(&equality_arrow_schema, &batches).unwrap(); + let expected_batches = RecordBatch::try_new(equality_arrow_schema.clone(), vec![ + Arc::new(Int64Array::from(vec![3])), + Arc::new(StringArray::from(vec!["k"])), + ])?; + assert_eq!(expected_batches, res); + + Ok(()) + } +} diff --git a/crates/iceberg/src/writer/function_writer/mod.rs b/crates/iceberg/src/writer/function_writer/mod.rs new file mode 100644 index 0000000000..d6dfda6d9d --- /dev/null +++ b/crates/iceberg/src/writer/function_writer/mod.rs @@ -0,0 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Higher-level functional writers built on top of base Iceberg writers. + +pub mod equality_delta_writer; diff --git a/crates/iceberg/src/writer/mod.rs b/crates/iceberg/src/writer/mod.rs index a7892d49e1..03273d7153 100644 --- a/crates/iceberg/src/writer/mod.rs +++ b/crates/iceberg/src/writer/mod.rs @@ -386,7 +386,9 @@ pub mod base_writer; pub mod file_writer; +pub mod function_writer; pub mod partitioning; +pub mod task_writer; use arrow_array::RecordBatch; @@ -419,6 +421,8 @@ pub trait IcebergWriter: Send + 'static { async fn close(&mut self) -> Result; } +pub use task_writer::TaskWriter; + /// The current file status of the Iceberg writer. /// This is implemented for writers that write a single file at a time. pub trait CurrentFileStatus { diff --git a/crates/iceberg/src/writer/task_writer.rs b/crates/iceberg/src/writer/task_writer.rs new file mode 100644 index 0000000000..1bb527192e --- /dev/null +++ b/crates/iceberg/src/writer/task_writer.rs @@ -0,0 +1,527 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! High-level Iceberg writer that coordinates partition routing for `RecordBatch` input. +//! +//! `TaskWriter` sits on top of the generic writer abstractions and provides a convenient entry +//! point for users that start from Arrow `RecordBatch` values. It lazily constructs the +//! appropriate partitioning writer (unpartitioned, fanout, or clustered) and routes batches to it. +//! +//! # Example +//! +//! ```rust,ignore +//! use iceberg::spec::{PartitionSpec, Schema}; +//! use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder; +//! use iceberg::writer::file_writer::ParquetWriterBuilder; +//! use iceberg::writer::file_writer::location_generator::{ +//! DefaultFileNameGenerator, DefaultLocationGenerator, +//! }; +//! use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder; +//! use iceberg::writer::task_writer::TaskWriter; +//! use parquet::file::properties::WriterProperties; +//! +//! # async fn build_task_writer( +//! # data_file_writer_builder: +//! # DataFileWriterBuilder, +//! # schema: Schema, +//! # partition_spec: PartitionSpec, +//! # ) -> iceberg::Result<()> { +//! let mut task_writer = TaskWriter::new( +//! data_file_writer_builder, +//! false, // fanout_enabled +//! Schema::from(schema).into(), +//! PartitionSpec::from(partition_spec).into(), +//! ); +//! +//! // task_writer.write(record_batch).await?; +//! // let data_files = task_writer.close().await?; +//! # Ok(()) +//! # } +//! ``` + +use arrow_array::RecordBatch; + +use crate::Result; +use crate::arrow::RecordBatchPartitionSplitter; +use crate::spec::{DataFile, PartitionSpecRef, SchemaRef}; +use crate::writer::partitioning::PartitioningWriter; +use crate::writer::partitioning::clustered_writer::ClusteredWriter; +use crate::writer::partitioning::fanout_writer::FanoutWriter; +use crate::writer::partitioning::unpartitioned_writer::UnpartitionedWriter; +use crate::writer::{IcebergWriter, IcebergWriterBuilder}; + +/// High-level writer that handles partitioning and routing of `RecordBatch` data to Iceberg tables. +pub struct TaskWriter { + /// The underlying writer (unpartitioned, fanout, or clustered) + writer: Option>, + /// Lazily initialized partition splitter for partitioned tables + partition_splitter: Option, + /// Iceberg schema reference used for partition splitting + schema: SchemaRef, + /// Partition specification reference used for partition splitting + partition_spec: PartitionSpecRef, +} + +/// Internal enum holding the writer implementation for each partitioning strategy. 
+enum SupportedWriter { + /// Writer for unpartitioned tables + Unpartitioned(UnpartitionedWriter), + /// Writer for partitioned tables with unsorted data (maintains multiple active writers) + Fanout(FanoutWriter), + /// Writer for partitioned tables with sorted data (maintains a single active writer) + Clustered(ClusteredWriter), +} + +impl TaskWriter { + /// Create a new `TaskWriter`. + /// + /// * `writer_builder` - The writer builder used to create underlying writers + /// * `fanout_enabled` - Controls whether `FanoutWriter` is used for partitioned tables; when + /// `false` the `ClusteredWriter` is selected instead + /// * `schema` - The Iceberg schema reference for the incoming `RecordBatch` + /// * `partition_spec` - The partition specification reference for the target table + pub fn new( + writer_builder: B, + fanout_enabled: bool, + schema: SchemaRef, + partition_spec: PartitionSpecRef, + ) -> Self { + Self::new_with_partition_splitter( + writer_builder, + fanout_enabled, + schema, + partition_spec, + None, + ) + } + + /// Create a new `TaskWriter` with a pre-configured partition splitter. + /// + /// This allows callers to provide a custom [`RecordBatchPartitionSplitter`], enabling use cases + /// such as computing partition values at runtime rather than expecting a pre-computed + /// `_partition` column in incoming batches. + pub fn new_with_partition_splitter( + writer_builder: B, + fanout_enabled: bool, + schema: SchemaRef, + partition_spec: PartitionSpecRef, + partition_splitter: Option, + ) -> Self { + let writer = if partition_spec.is_unpartitioned() { + SupportedWriter::Unpartitioned(UnpartitionedWriter::new(writer_builder)) + } else if fanout_enabled { + SupportedWriter::Fanout(FanoutWriter::new(writer_builder)) + } else { + SupportedWriter::Clustered(ClusteredWriter::new(writer_builder)) + }; + + Self { + writer: Some(writer), + partition_splitter, + schema, + partition_spec, + } + } + + /// Write a `RecordBatch` to the `TaskWriter`. + /// + /// For the first write against a partitioned table, the partition splitter is initialised + /// lazily. Unpartitioned tables bypass the splitter entirely. + pub async fn write(&mut self, batch: RecordBatch) -> Result<()> { + let writer = self.writer.as_mut().ok_or_else(|| { + crate::Error::new( + crate::ErrorKind::Unexpected, + "TaskWriter has been closed and cannot be used", + ) + })?; + + match writer { + SupportedWriter::Unpartitioned(writer) => writer.write(batch).await, + SupportedWriter::Fanout(writer) => { + if self.partition_splitter.is_none() { + self.partition_splitter = + Some(RecordBatchPartitionSplitter::new_with_precomputed_values( + self.schema.clone(), + self.partition_spec.clone(), + )?); + } + + Self::write_partitioned_batches(writer, &self.partition_splitter, &batch).await + } + SupportedWriter::Clustered(writer) => { + if self.partition_splitter.is_none() { + self.partition_splitter = + Some(RecordBatchPartitionSplitter::new_with_precomputed_values( + self.schema.clone(), + self.partition_spec.clone(), + )?); + } + + Self::write_partitioned_batches(writer, &self.partition_splitter, &batch).await + } + } + } + + /// Close the `TaskWriter` and return all written data files. 
+ pub async fn close(self) -> Result> { + if let Some(writer) = self.writer { + match writer { + SupportedWriter::Unpartitioned(writer) => writer.close().await, + SupportedWriter::Fanout(writer) => writer.close().await, + SupportedWriter::Clustered(writer) => writer.close().await, + } + } else { + Err(crate::Error::new( + crate::ErrorKind::Unexpected, + "TaskWriter has already been closed", + )) + } + } + + async fn write_partitioned_batches( + writer: &mut W, + partition_splitter: &Option, + batch: &RecordBatch, + ) -> Result<()> { + let splitter = partition_splitter + .as_ref() + .expect("partition splitter must be initialised before use"); + let partitioned_batches = splitter.split(batch)?; + + for (partition_key, partition_batch) in partitioned_batches { + writer.write(partition_key, partition_batch).await?; + } + + Ok(()) + } +} + +#[async_trait::async_trait] +impl IcebergWriter for TaskWriter { + async fn write(&mut self, input: RecordBatch) -> Result<()> { + self.write(input).await + } + + async fn close(&mut self) -> Result> { + if let Some(writer) = self.writer.take() { + match writer { + SupportedWriter::Unpartitioned(writer) => writer.close().await, + SupportedWriter::Fanout(writer) => writer.close().await, + SupportedWriter::Clustered(writer) => writer.close().await, + } + } else { + Err(crate::Error::new( + crate::ErrorKind::Unexpected, + "TaskWriter has already been closed", + )) + } + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::sync::Arc; + + use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray, StructArray}; + use arrow_schema::{DataType, Field, Schema}; + use parquet::arrow::PARQUET_FIELD_ID_META_KEY; + use parquet::file::properties::WriterProperties; + use tempfile::TempDir; + + use crate::Result; + use crate::arrow::{PROJECTED_PARTITION_VALUE_COLUMN, RecordBatchPartitionSplitter}; + use crate::io::FileIOBuilder; + use crate::spec::{ + DataFile, DataFileFormat, NestedField, PartitionSpec, PrimitiveLiteral, PrimitiveType, Type, + }; + use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder; + use crate::writer::file_writer::ParquetWriterBuilder; + use crate::writer::file_writer::location_generator::{ + DefaultFileNameGenerator, DefaultLocationGenerator, + }; + use crate::writer::file_writer::rolling_writer::RollingFileWriterBuilder; + use crate::writer::task_writer::TaskWriter; + + fn create_test_schema() -> Result> { + Ok(Arc::new( + crate::spec::Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(3, "region", Type::Primitive(PrimitiveType::String)) + .into(), + ]) + .build()?, + )) + } + + fn create_arrow_schema() -> Arc { + Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "1".to_string(), + )])), + Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "2".to_string(), + )])), + Field::new("region", DataType::Utf8, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "3".to_string(), + )])), + ])) + } + + fn create_arrow_schema_with_partition() -> Arc { + let partition_field = Field::new("region", DataType::Utf8, false).with_metadata( + HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1000".to_string())]), + ); + let 
partition_struct_field = Field::new( + PROJECTED_PARTITION_VALUE_COLUMN, + DataType::Struct(vec![partition_field.clone()].into()), + false, + ); + + Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "1".to_string(), + )])), + Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "2".to_string(), + )])), + Field::new("region", DataType::Utf8, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "3".to_string(), + )])), + partition_struct_field, + ])) + } + + fn create_writer_builder( + temp_dir: &TempDir, + schema: Arc, + ) -> Result< + DataFileWriterBuilder< + ParquetWriterBuilder, + DefaultLocationGenerator, + DefaultFileNameGenerator, + >, + > { + let file_io = FileIOBuilder::new_fs_io().build()?; + let location_gen = DefaultLocationGenerator::with_data_location( + temp_dir.path().to_string_lossy().into_owned(), + ); + let file_name_gen = + DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); + let parquet_writer_builder = + ParquetWriterBuilder::new(WriterProperties::builder().build(), schema); + let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size( + parquet_writer_builder, + file_io, + location_gen, + file_name_gen, + ); + Ok(DataFileWriterBuilder::new(rolling_writer_builder)) + } + + #[tokio::test] + async fn test_task_writer_unpartitioned() -> Result<()> { + let temp_dir = TempDir::new()?; + let schema = create_test_schema()?; + let arrow_schema = create_arrow_schema(); + + let partition_spec = Arc::new(PartitionSpec::builder(schema.clone()).build()?); + + let writer_builder = create_writer_builder(&temp_dir, schema.clone())?; + let mut task_writer = TaskWriter::new(writer_builder, false, schema, partition_spec); + + let batch = RecordBatch::try_new(arrow_schema, vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef, + Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])) as ArrayRef, + Arc::new(StringArray::from(vec!["US", "EU", "US"])) as ArrayRef, + ])?; + + task_writer.write(batch).await?; + let data_files = task_writer.close().await?; + + assert!(!data_files.is_empty()); + assert_eq!(data_files[0].record_count(), 3); + + Ok(()) + } + + fn verify_partition_files( + data_files: &[DataFile], + expected_total: u64, + ) -> HashMap { + let total_records: u64 = data_files.iter().map(|f| f.record_count()).sum(); + assert_eq!(total_records, expected_total, "total record count mismatch"); + + let mut partition_counts = HashMap::new(); + for data_file in data_files { + let partition_value = data_file.partition(); + let region_literal = partition_value.fields()[0] + .as_ref() + .expect("partition value should not be null"); + let region = match region_literal + .as_primitive_literal() + .expect("expected primitive literal") + { + PrimitiveLiteral::String(s) => s.clone(), + _ => panic!("expected string partition value"), + }; + + *partition_counts.entry(region.clone()).or_insert(0) += data_file.record_count(); + + assert!( + data_file.file_path().contains("region="), + "file path should encode partition info" + ); + } + partition_counts + } + + #[tokio::test] + async fn test_task_writer_partitioned_with_computed_partitions() -> Result<()> { + let temp_dir = TempDir::new()?; + let schema = create_test_schema()?; + let arrow_schema = create_arrow_schema(); + + let partition_spec = Arc::new( + PartitionSpec::builder(schema.clone()) + 
.with_spec_id(1) + .add_partition_field("region", "region", crate::spec::Transform::Identity)? + .build()?, + ); + let partition_splitter = RecordBatchPartitionSplitter::new_with_computed_values( + schema.clone(), + partition_spec.clone(), + )?; + + let writer_builder = create_writer_builder(&temp_dir, schema.clone())?; + let mut task_writer = TaskWriter::new_with_partition_splitter( + writer_builder, + true, + schema, + partition_spec, + Some(partition_splitter), + ); + + let batch = RecordBatch::try_new(arrow_schema, vec![ + Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as ArrayRef, + Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie", "Dave"])) as ArrayRef, + Arc::new(StringArray::from(vec!["US", "EU", "US", "EU"])) as ArrayRef, + ])?; + + task_writer.write(batch).await?; + let data_files = task_writer.close().await?; + + let partition_counts = verify_partition_files(&data_files, 4); + assert_eq!(partition_counts.get("US"), Some(&2)); + assert_eq!(partition_counts.get("EU"), Some(&2)); + + Ok(()) + } + + #[tokio::test] + async fn test_task_writer_partitioned_fanout() -> Result<()> { + let temp_dir = TempDir::new()?; + let schema = create_test_schema()?; + let arrow_schema = create_arrow_schema_with_partition(); + + let partition_spec = Arc::new( + PartitionSpec::builder(schema.clone()) + .with_spec_id(1) + .add_partition_field("region", "region", crate::spec::Transform::Identity)? + .build()?, + ); + + let writer_builder = create_writer_builder(&temp_dir, schema.clone())?; + let mut task_writer = TaskWriter::new(writer_builder, true, schema, partition_spec); + + let partition_field = Field::new("region", DataType::Utf8, false).with_metadata( + HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1000".to_string())]), + ); + let partition_values = StringArray::from(vec!["US", "EU", "US", "EU"]); + let partition_struct = StructArray::from(vec![( + Arc::new(partition_field), + Arc::new(partition_values) as ArrayRef, + )]); + + let batch = RecordBatch::try_new(arrow_schema, vec![ + Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as ArrayRef, + Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie", "Dave"])) as ArrayRef, + Arc::new(StringArray::from(vec!["US", "EU", "US", "EU"])) as ArrayRef, + Arc::new(partition_struct) as ArrayRef, + ])?; + + task_writer.write(batch).await?; + let data_files = task_writer.close().await?; + + let partition_counts = verify_partition_files(&data_files, 4); + assert_eq!(partition_counts.get("US"), Some(&2)); + assert_eq!(partition_counts.get("EU"), Some(&2)); + + Ok(()) + } + + #[tokio::test] + async fn test_task_writer_partitioned_clustered() -> Result<()> { + let temp_dir = TempDir::new()?; + let schema = create_test_schema()?; + let arrow_schema = create_arrow_schema_with_partition(); + + let partition_spec = Arc::new( + PartitionSpec::builder(schema.clone()) + .with_spec_id(1) + .add_partition_field("region", "region", crate::spec::Transform::Identity)? 
+ .build()?, + ); + + let writer_builder = create_writer_builder(&temp_dir, schema.clone())?; + let mut task_writer = TaskWriter::new(writer_builder, false, schema, partition_spec); + + let partition_field = Field::new("region", DataType::Utf8, false).with_metadata( + HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1000".to_string())]), + ); + let partition_values = StringArray::from(vec!["ASIA", "ASIA", "EU", "EU"]); + let partition_struct = StructArray::from(vec![( + Arc::new(partition_field), + Arc::new(partition_values) as ArrayRef, + )]); + + let batch = RecordBatch::try_new(arrow_schema, vec![ + Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as ArrayRef, + Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie", "Dave"])) as ArrayRef, + Arc::new(StringArray::from(vec!["ASIA", "ASIA", "EU", "EU"])) as ArrayRef, + Arc::new(partition_struct) as ArrayRef, + ])?; + + task_writer.write(batch).await?; + let data_files = task_writer.close().await?; + + let partition_counts = verify_partition_files(&data_files, 4); + assert_eq!(partition_counts.get("ASIA"), Some(&2)); + assert_eq!(partition_counts.get("EU"), Some(&2)); + + Ok(()) + } +} diff --git a/crates/integrations/datafusion/src/lib.rs b/crates/integrations/datafusion/src/lib.rs index 4b0ea8606d..09d1cac4ce 100644 --- a/crates/integrations/datafusion/src/lib.rs +++ b/crates/integrations/datafusion/src/lib.rs @@ -26,5 +26,3 @@ mod schema; pub mod table; pub use table::table_provider_factory::IcebergTableProviderFactory; pub use table::*; - -pub(crate) mod task_writer; diff --git a/crates/integrations/datafusion/src/task_writer.rs b/crates/integrations/datafusion/src/task_writer.rs index d27b2e6fbf..30013fe5e6 100644 --- a/crates/integrations/datafusion/src/task_writer.rs +++ b/crates/integrations/datafusion/src/task_writer.rs @@ -15,520 +15,5 @@ // specific language governing permissions and limitations // under the License. -//! TaskWriter for DataFusion integration. -//! -//! This module provides a high-level writer that handles partitioning and routing -//! of RecordBatch data to Iceberg tables. - -use datafusion::arrow::array::RecordBatch; -use iceberg::Result; -use iceberg::arrow::RecordBatchPartitionSplitter; -use iceberg::spec::{DataFile, PartitionSpecRef, SchemaRef}; -use iceberg::writer::IcebergWriterBuilder; -use iceberg::writer::partitioning::PartitioningWriter; -use iceberg::writer::partitioning::clustered_writer::ClusteredWriter; -use iceberg::writer::partitioning::fanout_writer::FanoutWriter; -use iceberg::writer::partitioning::unpartitioned_writer::UnpartitionedWriter; - /// High-level writer for DataFusion that handles partitioning and routing of RecordBatch data. 
-/// -/// TaskWriter coordinates writing data to Iceberg tables by: -/// - Selecting the appropriate partitioning strategy (unpartitioned, fanout, or clustered) -/// - Lazily initializing the partition splitter on first write -/// - Routing data to the underlying writer -/// - Collecting all written data files -/// -/// # Type Parameters -/// -/// * `B` - The IcebergWriterBuilder type used to create underlying writers -/// -/// # Example -/// -/// ```rust,ignore -/// use iceberg::spec::{PartitionSpec, Schema}; -/// use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder; -/// use iceberg_datafusion::writer::task_writer::TaskWriter; -/// -/// // Create a TaskWriter for an unpartitioned table -/// let task_writer = TaskWriter::new( -/// data_file_writer_builder, -/// false, // fanout_enabled -/// schema, -/// partition_spec, -/// ); -/// -/// // Write data -/// task_writer.write(record_batch).await?; -/// -/// // Close and get data files -/// let data_files = task_writer.close().await?; -/// ``` -#[allow(dead_code)] -pub(crate) struct TaskWriter { - /// The underlying writer (UnpartitionedWriter, FanoutWriter, or ClusteredWriter) - writer: SupportedWriter, - /// Lazily initialized partition splitter for partitioned tables - partition_splitter: Option, - /// Iceberg schema reference - schema: SchemaRef, - /// Partition specification reference - partition_spec: PartitionSpecRef, -} - -/// Internal enum to hold the different writer types. -/// -/// This enum allows TaskWriter to work with different partitioning strategies -/// while maintaining a unified interface. -#[allow(dead_code)] -enum SupportedWriter { - /// Writer for unpartitioned tables - Unpartitioned(UnpartitionedWriter), - /// Writer for partitioned tables with unsorted data (maintains multiple active writers) - Fanout(FanoutWriter), - /// Writer for partitioned tables with sorted data (maintains single active writer) - Clustered(ClusteredWriter), -} - -#[allow(dead_code)] -impl TaskWriter { - /// Create a new TaskWriter. - /// - /// # Parameters - /// - /// * `writer_builder` - The IcebergWriterBuilder to use for creating underlying writers - /// * `fanout_enabled` - If true, use FanoutWriter for partitioned tables; otherwise use ClusteredWriter - /// * `schema` - The Iceberg schema reference - /// * `partition_spec` - The partition specification reference - /// - /// # Returns - /// - /// Returns a new TaskWriter instance. 
- /// - /// # Writer Selection Logic - /// - /// - If partition_spec is unpartitioned: creates UnpartitionedWriter - /// - If partition_spec is partitioned AND fanout_enabled is true: creates FanoutWriter - /// - If partition_spec is partitioned AND fanout_enabled is false: creates ClusteredWriter - /// - /// # Example - /// - /// ```rust,ignore - /// use iceberg::spec::{PartitionSpec, Schema}; - /// use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder; - /// use iceberg_datafusion::writer::task_writer::TaskWriter; - /// - /// // Create a TaskWriter for an unpartitioned table - /// let task_writer = TaskWriter::new( - /// data_file_writer_builder, - /// false, // fanout_enabled - /// schema, - /// partition_spec, - /// ); - /// ``` - pub fn new( - writer_builder: B, - fanout_enabled: bool, - schema: SchemaRef, - partition_spec: PartitionSpecRef, - ) -> Self { - let writer = if partition_spec.is_unpartitioned() { - SupportedWriter::Unpartitioned(UnpartitionedWriter::new(writer_builder)) - } else if fanout_enabled { - SupportedWriter::Fanout(FanoutWriter::new(writer_builder)) - } else { - SupportedWriter::Clustered(ClusteredWriter::new(writer_builder)) - }; - - Self { - writer, - partition_splitter: None, - schema, - partition_spec, - } - } - - /// Write a RecordBatch to the TaskWriter. - /// - /// For the first write to a partitioned table, this method initializes the partition splitter. - /// For unpartitioned tables, data is written directly without splitting. - /// - /// # Parameters - /// - /// * `batch` - The RecordBatch to write - /// - /// # Returns - /// - /// Returns `Ok(())` on success, or an error if the write fails. - /// - /// # Errors - /// - /// This method will return an error if: - /// - Partition splitter initialization fails - /// - Splitting the batch by partition fails - /// - Writing to the underlying writer fails - /// - /// # Example - /// - /// ```rust,ignore - /// use arrow_array::RecordBatch; - /// use iceberg_datafusion::writer::task_writer::TaskWriter; - /// - /// // Write a RecordBatch - /// task_writer.write(record_batch).await?; - /// ``` - pub async fn write(&mut self, batch: RecordBatch) -> Result<()> { - match &mut self.writer { - SupportedWriter::Unpartitioned(writer) => { - // Unpartitioned: write directly without splitting - writer.write(batch).await - } - SupportedWriter::Fanout(writer) => { - // Initialize splitter on first write if needed - if self.partition_splitter.is_none() { - self.partition_splitter = - Some(RecordBatchPartitionSplitter::new_with_precomputed_values( - self.schema.clone(), - self.partition_spec.clone(), - )?); - } - - // Split and write partitioned data - Self::write_partitioned_batches(writer, &self.partition_splitter, &batch).await - } - SupportedWriter::Clustered(writer) => { - // Initialize splitter on first write if needed - if self.partition_splitter.is_none() { - self.partition_splitter = - Some(RecordBatchPartitionSplitter::new_with_precomputed_values( - self.schema.clone(), - self.partition_spec.clone(), - )?); - } - - // Split and write partitioned data - Self::write_partitioned_batches(writer, &self.partition_splitter, &batch).await - } - } - } - - /// Helper method to split and write partitioned data. 
- /// - /// This method handles the common logic for both FanoutWriter and ClusteredWriter: - /// - Splits the batch by partition key using the provided splitter - /// - Writes each partition to the underlying writer - /// - /// # Parameters - /// - /// * `writer` - The underlying PartitioningWriter (FanoutWriter or ClusteredWriter) - /// * `partition_splitter` - The partition splitter (must be initialized) - /// * `batch` - The RecordBatch to write - /// - /// # Returns - /// - /// Returns `Ok(())` on success, or an error if the operation fails. - async fn write_partitioned_batches( - writer: &mut W, - partition_splitter: &Option, - batch: &RecordBatch, - ) -> Result<()> { - // Split batch by partition - let splitter = partition_splitter - .as_ref() - .expect("Partition splitter should be initialized"); - let partitioned_batches = splitter.split(batch)?; - - // Write each partition - for (partition_key, partition_batch) in partitioned_batches { - writer.write(partition_key, partition_batch).await?; - } - - Ok(()) - } - - /// Close the TaskWriter and return all written data files. - /// - /// This method consumes the TaskWriter to prevent further use. - /// - /// # Returns - /// - /// Returns a `Vec` containing all written files, or an error if closing fails. - /// - /// # Errors - /// - /// This method will return an error if: - /// - Closing the underlying writer fails - /// - Any I/O operation fails during the close process - /// - /// # Example - /// - /// ```rust,ignore - /// use iceberg_datafusion::writer::task_writer::TaskWriter; - /// - /// // Close the writer and get all data files - /// let data_files = task_writer.close().await?; - /// ``` - pub async fn close(self) -> Result> { - match self.writer { - SupportedWriter::Unpartitioned(writer) => writer.close().await, - SupportedWriter::Fanout(writer) => writer.close().await, - SupportedWriter::Clustered(writer) => writer.close().await, - } - } -} - -#[cfg(test)] -mod tests { - use std::collections::HashMap; - use std::sync::Arc; - - use datafusion::arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray, StructArray}; - use datafusion::arrow::datatypes::{DataType, Field, Schema}; - use iceberg::arrow::PROJECTED_PARTITION_VALUE_COLUMN; - use iceberg::io::FileIOBuilder; - use iceberg::spec::{DataFileFormat, NestedField, PartitionSpec, PrimitiveType, Type}; - use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder; - use iceberg::writer::file_writer::ParquetWriterBuilder; - use iceberg::writer::file_writer::location_generator::{ - DefaultFileNameGenerator, DefaultLocationGenerator, - }; - use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder; - use parquet::arrow::PARQUET_FIELD_ID_META_KEY; - use parquet::file::properties::WriterProperties; - use tempfile::TempDir; - - use super::*; - - fn create_test_schema() -> Result> { - Ok(Arc::new( - iceberg::spec::Schema::builder() - .with_schema_id(1) - .with_fields(vec![ - NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), - NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(), - NestedField::required(3, "region", Type::Primitive(PrimitiveType::String)) - .into(), - ]) - .build()?, - )) - } - - fn create_arrow_schema() -> Arc { - Arc::new(Schema::new(vec![ - Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "1".to_string(), - )])), - Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([( - 
PARQUET_FIELD_ID_META_KEY.to_string(), - "2".to_string(), - )])), - Field::new("region", DataType::Utf8, false).with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "3".to_string(), - )])), - ])) - } - - fn create_arrow_schema_with_partition() -> Arc { - let partition_field = Field::new("region", DataType::Utf8, false).with_metadata( - HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1000".to_string())]), - ); - let partition_struct_field = Field::new( - PROJECTED_PARTITION_VALUE_COLUMN, - DataType::Struct(vec![partition_field].into()), - false, - ); - - Arc::new(Schema::new(vec![ - Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "1".to_string(), - )])), - Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "2".to_string(), - )])), - Field::new("region", DataType::Utf8, false).with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "3".to_string(), - )])), - partition_struct_field, - ])) - } - - fn create_writer_builder( - temp_dir: &TempDir, - schema: Arc, - ) -> Result< - DataFileWriterBuilder< - ParquetWriterBuilder, - DefaultLocationGenerator, - DefaultFileNameGenerator, - >, - > { - let file_io = FileIOBuilder::new_fs_io().build()?; - let location_gen = DefaultLocationGenerator::with_data_location( - temp_dir.path().to_str().unwrap().to_string(), - ); - let file_name_gen = - DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); - let parquet_writer_builder = - ParquetWriterBuilder::new(WriterProperties::builder().build(), schema); - let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size( - parquet_writer_builder, - file_io, - location_gen, - file_name_gen, - ); - Ok(DataFileWriterBuilder::new(rolling_writer_builder)) - } - - #[tokio::test] - async fn test_task_writer_unpartitioned() -> Result<()> { - let temp_dir = TempDir::new()?; - let schema = create_test_schema()?; - let arrow_schema = create_arrow_schema(); - - // Create unpartitioned spec - let partition_spec = Arc::new(PartitionSpec::builder(schema.clone()).build()?); - - let writer_builder = create_writer_builder(&temp_dir, schema.clone())?; - let mut task_writer = TaskWriter::new(writer_builder, false, schema, partition_spec); - - // Write data - let batch = RecordBatch::try_new(arrow_schema, vec![ - Arc::new(Int32Array::from(vec![1, 2, 3])), - Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])), - Arc::new(StringArray::from(vec!["US", "EU", "US"])), - ])?; - - task_writer.write(batch).await?; - let data_files = task_writer.close().await?; - - // Verify results - assert!(!data_files.is_empty()); - assert_eq!(data_files[0].record_count(), 3); - - Ok(()) - } - - /// Helper to verify partition data files - fn verify_partition_files( - data_files: &[iceberg::spec::DataFile], - expected_total: u64, - ) -> HashMap { - let total_records: u64 = data_files.iter().map(|f| f.record_count()).sum(); - assert_eq!(total_records, expected_total, "Total record count mismatch"); - - let mut partition_counts = HashMap::new(); - for data_file in data_files { - let partition_value = data_file.partition(); - let region_literal = partition_value.fields()[0] - .as_ref() - .expect("Partition value should not be null"); - let region = match region_literal - .as_primitive_literal() - .expect("Should be primitive literal") - { - iceberg::spec::PrimitiveLiteral::String(s) => s.clone(), - _ => panic!("Expected string 
partition value"), - }; - - *partition_counts.entry(region.clone()).or_insert(0) += data_file.record_count(); - - // Verify file path contains partition information - assert!( - data_file.file_path().contains("region="), - "File path should contain partition info" - ); - } - partition_counts - } - - #[tokio::test] - async fn test_task_writer_partitioned_fanout() -> Result<()> { - let temp_dir = TempDir::new()?; - let schema = create_test_schema()?; - let arrow_schema = create_arrow_schema_with_partition(); - - let partition_spec = Arc::new( - PartitionSpec::builder(schema.clone()) - .with_spec_id(1) - .add_partition_field("region", "region", iceberg::spec::Transform::Identity)? - .build()?, - ); - - let writer_builder = create_writer_builder(&temp_dir, schema.clone())?; - let mut task_writer = TaskWriter::new(writer_builder, true, schema, partition_spec); - - // Create partition column - let partition_field = Field::new("region", DataType::Utf8, false).with_metadata( - HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1000".to_string())]), - ); - let partition_values = StringArray::from(vec!["US", "EU", "US", "EU"]); - let partition_struct = StructArray::from(vec![( - Arc::new(partition_field), - Arc::new(partition_values) as ArrayRef, - )]); - - let batch = RecordBatch::try_new(arrow_schema, vec![ - Arc::new(Int32Array::from(vec![1, 2, 3, 4])), - Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie", "Dave"])), - Arc::new(StringArray::from(vec!["US", "EU", "US", "EU"])), - Arc::new(partition_struct), - ])?; - - task_writer.write(batch).await?; - let data_files = task_writer.close().await?; - - let partition_counts = verify_partition_files(&data_files, 4); - assert_eq!(partition_counts.get("US"), Some(&2)); - assert_eq!(partition_counts.get("EU"), Some(&2)); - - Ok(()) - } - - #[tokio::test] - async fn test_task_writer_partitioned_clustered() -> Result<()> { - let temp_dir = TempDir::new()?; - let schema = create_test_schema()?; - let arrow_schema = create_arrow_schema_with_partition(); - - let partition_spec = Arc::new( - PartitionSpec::builder(schema.clone()) - .with_spec_id(1) - .add_partition_field("region", "region", iceberg::spec::Transform::Identity)? - .build()?, - ); - - let writer_builder = create_writer_builder(&temp_dir, schema.clone())?; - let mut task_writer = TaskWriter::new(writer_builder, false, schema, partition_spec); - - // Create partition column - let partition_field = Field::new("region", DataType::Utf8, false).with_metadata( - HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1000".to_string())]), - ); - let partition_values = StringArray::from(vec!["ASIA", "ASIA", "EU", "EU"]); - let partition_struct = StructArray::from(vec![( - Arc::new(partition_field), - Arc::new(partition_values) as ArrayRef, - )]); - - // ClusteredWriter expects data to be pre-sorted by partition - let batch = RecordBatch::try_new(arrow_schema, vec![ - Arc::new(Int32Array::from(vec![1, 2, 3, 4])), - Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie", "Dave"])), - Arc::new(StringArray::from(vec!["ASIA", "ASIA", "EU", "EU"])), - Arc::new(partition_struct), - ])?; - - task_writer.write(batch).await?; - let data_files = task_writer.close().await?; - - let partition_counts = verify_partition_files(&data_files, 4); - assert_eq!(partition_counts.get("ASIA"), Some(&2)); - assert_eq!(partition_counts.get("EU"), Some(&2)); - - Ok(()) - } -} +pub use iceberg::writer::task_writer::TaskWriter;
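
The diff above fixes the contract for `EqualityDeltaWriter::write`: the last column of every incoming `RecordBatch` must be an `Int32Array` of operation markers, and the writer splits the batch on runs of that column, routing `INSERT_OP` rows to the data writer and `DELETE_OP` rows to the position/equality delete writers. A minimal sketch of driving the writer that way, modelled on the PR's own unit test; the `writer` value is assumed to come out of `EqualityDeltaWriterBuilder::build` wired up as in that test, and the schema and field ids here are illustrative:

```rust,ignore
use std::collections::HashMap;
use std::sync::Arc;

use arrow_array::{Int32Array, Int64Array, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema as ArrowSchema};
use iceberg::writer::IcebergWriter;
use iceberg::writer::function_writer::equality_delta_writer::{DELETE_OP, INSERT_OP};
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;

// `writer` is assumed to be an EqualityDeltaWriter built with unique column ids [1, 2],
// i.e. the (id, data) pair is the equality key.
async fn insert_then_delete(mut writer: impl IcebergWriter) -> iceberg::Result<()> {
    let field_id =
        |id: i32| HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), id.to_string())]);
    let arrow_schema = Arc::new(ArrowSchema::new(vec![
        Field::new("id", DataType::Int64, true).with_metadata(field_id(1)),
        Field::new("data", DataType::Utf8, true).with_metadata(field_id(2)),
        // Trailing column: per-row operation marker, stripped by the writer before projection.
        Field::new("op", DataType::Int32, false).with_metadata(field_id(3)),
    ]));

    // Three inserts followed by a delete of a row written earlier in the same stream;
    // the writer emits a position delete for it rather than an equality delete.
    let batch = RecordBatch::try_new(arrow_schema, vec![
        Arc::new(Int64Array::from(vec![1, 2, 3, 2])),
        Arc::new(StringArray::from(vec!["a", "b", "c", "b"])),
        Arc::new(Int32Array::from(vec![INSERT_OP, INSERT_OP, INSERT_OP, DELETE_OP])),
    ])?;
    writer.write(batch).await?;

    // Data files, position delete files and equality delete files come back together.
    let _files = writer.close().await?;
    Ok(())
}
```

On `close`, the writer simply chains the files produced by its three inner writers into a single `Vec<DataFile>`, which is why the PR's test sees one file of each content type.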
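The other half of the PR moves `TaskWriter` from the DataFusion crate into `iceberg::writer` and adds `new_with_partition_splitter`, which lets callers supply a `RecordBatchPartitionSplitter` that computes partition values at write time instead of requiring a pre-computed `_partition` column. A hedged sketch of that path, mirroring the PR's `test_task_writer_partitioned_with_computed_partitions` test and assuming `TaskWriter` is generic over the builder type, as the rest of the diff implies:

```rust,ignore
use std::sync::Arc;

use iceberg::Result;
use iceberg::arrow::RecordBatchPartitionSplitter;
use iceberg::spec::{PartitionSpec, Schema};
use iceberg::writer::{IcebergWriterBuilder, TaskWriter};

// `writer_builder` is assumed to be a data-file writer builder wired up as in the PR's tests
// (ParquetWriterBuilder + RollingFileWriterBuilder + DataFileWriterBuilder).
fn partitioned_task_writer<B: IcebergWriterBuilder>(
    writer_builder: B,
    schema: Arc<Schema>,
    partition_spec: Arc<PartitionSpec>,
) -> Result<TaskWriter<B>> {
    // Compute partition values from the data columns at write time.
    let splitter = RecordBatchPartitionSplitter::new_with_computed_values(
        schema.clone(),
        partition_spec.clone(),
    )?;

    // fanout_enabled = true keeps one open writer per partition, so the input does not
    // need to be sorted by partition; pass false to use the clustered writer instead.
    Ok(TaskWriter::new_with_partition_splitter(
        writer_builder,
        true,
        schema,
        partition_spec,
        Some(splitter),
    ))
}
```

The plain `TaskWriter::new` path instead builds a splitter lazily with `new_with_precomputed_values` on first write, which is what the fanout and clustered tests in the diff exercise via the `_partition` struct column.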