diff --git a/crates/iceberg/src/spec/partition.rs b/crates/iceberg/src/spec/partition.rs
index fff9b62d7..d6b83ea33 100644
--- a/crates/iceberg/src/spec/partition.rs
+++ b/crates/iceberg/src/spec/partition.rs
@@ -178,7 +178,7 @@ impl PartitionSpec {
 
 /// A partition key represents a specific partition in a table, containing the partition spec,
 /// schema, and the actual partition values.
-#[derive(Clone, Debug)]
+#[derive(Clone, Debug, PartialEq, Eq)]
 pub struct PartitionKey {
     /// The partition spec that contains the partition fields.
     spec: PartitionSpec,
diff --git a/crates/iceberg/src/writer/file_writer/clustered_writer.rs b/crates/iceberg/src/writer/file_writer/clustered_writer.rs
new file mode 100644
index 000000000..3922e00bd
--- /dev/null
+++ b/crates/iceberg/src/writer/file_writer/clustered_writer.rs
@@ -0,0 +1,155 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow_array::RecordBatch;
+use arrow_schema::SchemaRef as ArrowSchemaRef;
+
+use crate::arrow::record_batch_partition_splitter::RecordBatchPartitionSplitter;
+use crate::spec::{DataFileBuilder, PartitionKey, PartitionSpecRef, SchemaRef};
+use crate::writer::CurrentFileStatus;
+use crate::writer::file_writer::{FileWriter, FileWriterBuilder};
+use crate::{Error, ErrorKind, Result};
+
+/// Builds a [`ClusteredWriter`], which splits incoming record batches by
+/// partition and writes each partition's rows through an inner [`FileWriter`],
+/// rolling over to a new file whenever the partition key changes.
+#[derive(Clone)]
+pub struct ClusteredWriterBuilder<B: FileWriterBuilder> {
+    inner_builder: B,
+    partition_spec: PartitionSpecRef,
+    table_schema: SchemaRef,
+    arrow_schema: ArrowSchemaRef,
+}
+
+impl<B: FileWriterBuilder> ClusteredWriterBuilder<B> {
+    #[allow(dead_code)]
+    pub fn new(
+        inner_builder: B,
+        partition_spec: PartitionSpecRef,
+        table_schema: SchemaRef,
+        arrow_schema: ArrowSchemaRef,
+    ) -> Self {
+        Self {
+            inner_builder,
+            partition_spec,
+            table_schema,
+            arrow_schema,
+        }
+    }
+}
+
+impl<B: FileWriterBuilder> FileWriterBuilder for ClusteredWriterBuilder<B> {
+    type R = ClusteredWriter<B>;
+
+    async fn build(self) -> Result<Self::R> {
+        let splitter = RecordBatchPartitionSplitter::new(
+            self.arrow_schema,
+            self.table_schema.clone(),
+            self.partition_spec.clone(),
+        )?;
+
+        Ok(ClusteredWriter {
+            inner: None,
+            inner_builder: self.inner_builder,
+            splitter,
+            table_schema: self.table_schema,
+            partition_spec: self.partition_spec,
+            current_partition_key: None,
+            data_file_builders: vec![],
+        })
+    }
+}
+
+pub struct ClusteredWriter<B: FileWriterBuilder> {
+    inner: Option<B::R>,
+    inner_builder: B,
+    splitter: RecordBatchPartitionSplitter,
+    table_schema: SchemaRef,
+    partition_spec: PartitionSpecRef,
+    current_partition_key: Option<PartitionKey>,
+    data_file_builders: Vec<DataFileBuilder>,
+}
+
+impl<B: FileWriterBuilder> FileWriter for ClusteredWriter<B> {
+    async fn write(&mut self, batch: &RecordBatch) -> Result<()> {
+        let splits = self.splitter.split(batch)?;
+        if splits.len() > 1 {
+            // TODO: revisit this; should we assume one batch can contain at most
+            // one partition's data?
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                "Records from multiple partitions found in one record batch!",
+            ));
+        }
+
+        if let Some((partition_value, record_batch)) = splits.first() {
+            let partition_key = PartitionKey::new(
+                self.partition_spec.as_ref().clone(),
+                self.table_schema.clone(),
+                partition_value.clone(),
+            );
+
+            if self
+                .current_partition_key
+                .as_ref()
+                .is_none_or(|pk| pk != &partition_key)
+            {
+                // Close the current writer, if any, and roll to a new file
+                if let Some(inner) = self.inner.take() {
+                    self.data_file_builders.extend(inner.close().await?);
+                }
+
+                // Start a new writer for the new partition
+                // TODO how to pass partition key to inner builder??
+                self.inner = Some(self.inner_builder.clone().build().await?);
+                self.current_partition_key = Some(partition_key);
+            }
+
+            if let Some(writer) = self.inner.as_mut() {
+                writer.write(record_batch).await
+            } else {
+                Err(Error::new(
+                    ErrorKind::Unexpected,
+                    "Writer is not initialized!",
+                ))
+            }
+        } else {
+            // The input batch is empty
+            // TODO: should we fail instead?
+            Ok(())
+        }
+    }
+
+    async fn close(mut self) -> Result<Vec<DataFileBuilder>> {
+        // Close the current writer and merge its output into the collected builders
+        if let Some(current_writer) = self.inner {
+            self.data_file_builders
+                .extend(current_writer.close().await?);
+        }
+        Ok(self.data_file_builders)
+    }
+}
+
+impl<B: FileWriterBuilder> CurrentFileStatus for ClusteredWriter<B> {
+    // Calling these before the first write is a programming error, hence the unwraps.
+    fn current_file_path(&self) -> String {
+        self.inner.as_ref().unwrap().current_file_path()
+    }
+
+    fn current_row_num(&self) -> usize {
+        self.inner.as_ref().unwrap().current_row_num()
+    }
+
+    fn current_written_size(&self) -> usize {
+        self.inner.as_ref().unwrap().current_written_size()
+    }
+}
diff --git a/crates/iceberg/src/writer/file_writer/mod.rs b/crates/iceberg/src/writer/file_writer/mod.rs
index 2a5a73553..808b6ab53 100644
--- a/crates/iceberg/src/writer/file_writer/mod.rs
+++ b/crates/iceberg/src/writer/file_writer/mod.rs
@@ -27,6 +27,7 @@ use crate::spec::DataFileBuilder;
 
 mod parquet_writer;
 pub use parquet_writer::{ParquetWriter, ParquetWriterBuilder};
+mod clustered_writer;
 pub mod location_generator;
 /// Module providing writers that can automatically roll over to new files based on size thresholds.
 pub mod rolling_writer;
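
For context, here is a minimal sketch of the intended call pattern, inferred only from the API introduced in this diff; `parquet_builder`, `partition_spec`, `table_schema`, `arrow_schema`, and `partition_sorted_batches` are hypothetical placeholders, not names from this change:

```rust
// A sketch only: all lowercase variable names below are assumed to exist.
let mut writer = ClusteredWriterBuilder::new(
    parquet_builder,   // any FileWriterBuilder, e.g. a Parquet-backed one
    partition_spec,    // PartitionSpecRef for the table
    table_schema,      // iceberg SchemaRef
    arrow_schema,      // ArrowSchemaRef matching the incoming batches
)
.build()
.await?;

// The writer expects input clustered by partition: each RecordBatch must
// contain rows from a single partition, and all batches for one partition
// must be contiguous. A partition change closes the current file and opens
// a new one via the inner builder.
for batch in partition_sorted_batches {
    writer.write(&batch).await?;
}

// Flushes the last open file and returns one DataFileBuilder per file
// written across all partitions seen.
let data_files = writer.close().await?;
```

Note the `DataInvalid` path in `write`: batches that mix partitions are rejected rather than re-split, so upstream operators are expected to pre-cluster rows by partition before handing batches to this writer.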