Skip to content

Commit 42191e9

Browse files
authored
refactor(writer): Refactor writers for the future partitioning writers (#1657)
## Which issue does this PR close? - Closes #1650 ## What changes are included in this PR? Refactored the writer layers; from a bird’s-eye view, the structure now looks like this: ```mermaid flowchart TD subgraph PartitioningWriter PW[PartitioningWriter] subgraph DataFileWriter RW[DataFileWriter] subgraph RollingWriter DFW[RollingWriter] subgraph FileWriter FW[FileWriter] end DFW --> FW end RW --> DFW end PW --> RW end ``` ### Key Changes - Modified `RollingFileWriter` to handle location generator, file name generator, and partition keys directly - Simplified `ParquetWriterBuilder` interface to accept output files during build - Restructured `DataFileWriterBuilder` to use `RollingFileWriter` with partition keys - Updated DataFusion integration to work with the new writer architecture - NOTE: Technically DataFusion or any engine should use `TaskWriter` -> `PartitioningWriter` -> `RollingWriter` -> ..., but `TaskWriter` and `PartitioningWriter` are not included in this draft so far ## Are these changes tested? Not yet, but changing the existing tests accordingly should be enough
1 parent 3ba59f7 commit 42191e9

File tree

13 files changed

+570
-375
lines changed

13 files changed

+570
-375
lines changed

crates/iceberg/src/spec/manifest/data_file.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use serde_with::{DeserializeFromStr, SerializeDisplay};
2626
use super::_serde::DataFileSerde;
2727
use super::{Datum, FormatVersion, Schema, data_file_schema_v1, data_file_schema_v2};
2828
use crate::error::Result;
29-
use crate::spec::{Struct, StructType};
29+
use crate::spec::{DEFAULT_PARTITION_SPEC_ID, Struct, StructType};
3030
use crate::{Error, ErrorKind};
3131

3232
/// Data file carries data file path, partition tuple, metrics, …
@@ -49,6 +49,7 @@ pub struct DataFile {
4949
///
5050
/// Partition data tuple, schema based on the partition spec output using
5151
/// partition field ids for the struct field ids
52+
#[builder(default = "Struct::empty()")]
5253
pub(crate) partition: Struct,
5354
/// field id: 103
5455
///
@@ -156,6 +157,7 @@ pub struct DataFile {
156157
pub(crate) first_row_id: Option<i64>,
157158
/// This field is not included in spec. It is just stored in the in-memory representation used
158159
/// in process.
160+
#[builder(default = "DEFAULT_PARTITION_SPEC_ID")]
159161
pub(crate) partition_spec_id: i32,
160162
/// field id: 143
161163
///

crates/iceberg/src/spec/partition.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,15 @@ impl PartitionKey {
194194
Self { spec, schema, data }
195195
}
196196

197+
/// Creates a new partition key from another partition key, with a new data field.
198+
pub fn copy_with_data(&self, data: Struct) -> Self {
199+
Self {
200+
spec: self.spec.clone(),
201+
schema: self.schema.clone(),
202+
data,
203+
}
204+
}
205+
197206
/// Generates a partition path based on the partition values.
198207
pub fn to_path(&self) -> String {
199208
self.spec.partition_to_path(&self.data, self.schema.clone())
@@ -207,6 +216,21 @@ impl PartitionKey {
207216
Some(pk) => pk.spec.is_unpartitioned(),
208217
}
209218
}
219+
220+
/// Returns the associated [`PartitionSpec`].
221+
pub fn spec(&self) -> &PartitionSpec {
222+
&self.spec
223+
}
224+
225+
/// Returns the associated [`SchemaRef`].
226+
pub fn schema(&self) -> &SchemaRef {
227+
&self.schema
228+
}
229+
230+
/// Returns the associated [`Struct`].
231+
pub fn data(&self) -> &Struct {
232+
&self.data
233+
}
210234
}
211235

212236
/// Reference to [`UnboundPartitionSpec`].

crates/iceberg/src/writer/base_writer/data_file_writer.rs

Lines changed: 101 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -18,86 +18,126 @@
1818
//! This module provide `DataFileWriter`.
1919
2020
use arrow_array::RecordBatch;
21-
use itertools::Itertools;
2221

23-
use crate::Result;
24-
use crate::spec::{DataContentType, DataFile, Struct};
25-
use crate::writer::file_writer::{FileWriter, FileWriterBuilder};
22+
use crate::spec::{DataContentType, DataFile, PartitionKey};
23+
use crate::writer::file_writer::FileWriterBuilder;
24+
use crate::writer::file_writer::location_generator::{FileNameGenerator, LocationGenerator};
25+
use crate::writer::file_writer::rolling_writer::{RollingFileWriter, RollingFileWriterBuilder};
2626
use crate::writer::{CurrentFileStatus, IcebergWriter, IcebergWriterBuilder};
27+
use crate::{Error, ErrorKind, Result};
2728

2829
/// Builder for `DataFileWriter`.
2930
#[derive(Clone, Debug)]
30-
pub struct DataFileWriterBuilder<B: FileWriterBuilder> {
31-
inner: B,
32-
partition_value: Option<Struct>,
33-
partition_spec_id: i32,
31+
pub struct DataFileWriterBuilder<B: FileWriterBuilder, L: LocationGenerator, F: FileNameGenerator> {
32+
inner: RollingFileWriterBuilder<B, L, F>,
33+
partition_key: Option<PartitionKey>,
3434
}
3535

36-
impl<B: FileWriterBuilder> DataFileWriterBuilder<B> {
37-
/// Create a new `DataFileWriterBuilder` using a `FileWriterBuilder`.
38-
pub fn new(inner: B, partition_value: Option<Struct>, partition_spec_id: i32) -> Self {
36+
impl<B, L, F> DataFileWriterBuilder<B, L, F>
37+
where
38+
B: FileWriterBuilder,
39+
L: LocationGenerator,
40+
F: FileNameGenerator,
41+
{
42+
/// Create a new `DataFileWriterBuilder` using a `RollingFileWriterBuilder`.
43+
pub fn new(
44+
inner_builder: RollingFileWriterBuilder<B, L, F>,
45+
partition_key: Option<PartitionKey>,
46+
) -> Self {
3947
Self {
40-
inner,
41-
partition_value,
42-
partition_spec_id,
48+
inner: inner_builder,
49+
partition_key,
4350
}
4451
}
4552
}
4653

4754
#[async_trait::async_trait]
48-
impl<B: FileWriterBuilder> IcebergWriterBuilder for DataFileWriterBuilder<B> {
49-
type R = DataFileWriter<B>;
55+
impl<B, L, F> IcebergWriterBuilder for DataFileWriterBuilder<B, L, F>
56+
where
57+
B: FileWriterBuilder,
58+
L: LocationGenerator,
59+
F: FileNameGenerator,
60+
{
61+
type R = DataFileWriter<B, L, F>;
5062

5163
async fn build(self) -> Result<Self::R> {
5264
Ok(DataFileWriter {
53-
inner_writer: Some(self.inner.clone().build().await?),
54-
partition_value: self.partition_value.unwrap_or(Struct::empty()),
55-
partition_spec_id: self.partition_spec_id,
65+
inner: Some(self.inner.clone().build()),
66+
partition_key: self.partition_key,
5667
})
5768
}
5869
}
5970

6071
/// A writer that writes data within one spec/partition.
6172
#[derive(Debug)]
62-
pub struct DataFileWriter<B: FileWriterBuilder> {
63-
inner_writer: Option<B::R>,
64-
partition_value: Struct,
65-
partition_spec_id: i32,
73+
pub struct DataFileWriter<B: FileWriterBuilder, L: LocationGenerator, F: FileNameGenerator> {
74+
inner: Option<RollingFileWriter<B, L, F>>,
75+
partition_key: Option<PartitionKey>,
6676
}
6777

6878
#[async_trait::async_trait]
69-
impl<B: FileWriterBuilder> IcebergWriter for DataFileWriter<B> {
79+
impl<B, L, F> IcebergWriter for DataFileWriter<B, L, F>
80+
where
81+
B: FileWriterBuilder,
82+
L: LocationGenerator,
83+
F: FileNameGenerator,
84+
{
7085
async fn write(&mut self, batch: RecordBatch) -> Result<()> {
71-
self.inner_writer.as_mut().unwrap().write(&batch).await
86+
if let Some(writer) = self.inner.as_mut() {
87+
writer.write(&self.partition_key, &batch).await
88+
} else {
89+
Err(Error::new(
90+
ErrorKind::Unexpected,
91+
"Writer is not initialized!",
92+
))
93+
}
7294
}
7395

7496
async fn close(&mut self) -> Result<Vec<DataFile>> {
75-
let writer = self.inner_writer.take().unwrap();
76-
Ok(writer
77-
.close()
78-
.await?
79-
.into_iter()
80-
.map(|mut res| {
81-
res.content(DataContentType::Data);
82-
res.partition(self.partition_value.clone());
83-
res.partition_spec_id(self.partition_spec_id);
84-
res.build().expect("Guaranteed to be valid")
85-
})
86-
.collect_vec())
97+
if let Some(writer) = self.inner.take() {
98+
writer
99+
.close()
100+
.await?
101+
.into_iter()
102+
.map(|mut res| {
103+
res.content(DataContentType::Data);
104+
if let Some(pk) = self.partition_key.as_ref() {
105+
res.partition(pk.data().clone());
106+
res.partition_spec_id(pk.spec().spec_id());
107+
}
108+
res.build().map_err(|e| {
109+
Error::new(
110+
ErrorKind::DataInvalid,
111+
format!("Failed to build data file: {}", e),
112+
)
113+
})
114+
})
115+
.collect()
116+
} else {
117+
Err(Error::new(
118+
ErrorKind::Unexpected,
119+
"Data file writer has been closed.",
120+
))
121+
}
87122
}
88123
}
89124

90-
impl<B: FileWriterBuilder> CurrentFileStatus for DataFileWriter<B> {
125+
impl<B, L, F> CurrentFileStatus for DataFileWriter<B, L, F>
126+
where
127+
B: FileWriterBuilder,
128+
L: LocationGenerator,
129+
F: FileNameGenerator,
130+
{
91131
fn current_file_path(&self) -> String {
92-
self.inner_writer.as_ref().unwrap().current_file_path()
132+
self.inner.as_ref().unwrap().current_file_path()
93133
}
94134

95135
fn current_row_num(&self) -> usize {
96-
self.inner_writer.as_ref().unwrap().current_row_num()
136+
self.inner.as_ref().unwrap().current_row_num()
97137
}
98138

99139
fn current_written_size(&self) -> usize {
100-
self.inner_writer.as_ref().unwrap().current_written_size()
140+
self.inner.as_ref().unwrap().current_written_size()
101141
}
102142
}
103143

@@ -116,13 +156,15 @@ mod test {
116156
use crate::Result;
117157
use crate::io::FileIOBuilder;
118158
use crate::spec::{
119-
DataContentType, DataFileFormat, Literal, NestedField, PrimitiveType, Schema, Struct, Type,
159+
DataContentType, DataFileFormat, Literal, NestedField, PartitionKey, PartitionSpec,
160+
PrimitiveType, Schema, Struct, Type,
120161
};
121162
use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder;
122163
use crate::writer::file_writer::ParquetWriterBuilder;
123164
use crate::writer::file_writer::location_generator::{
124165
DefaultFileNameGenerator, DefaultLocationGenerator,
125166
};
167+
use crate::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
126168
use crate::writer::{IcebergWriter, IcebergWriterBuilder, RecordBatch};
127169

128170
#[tokio::test]
@@ -143,16 +185,16 @@ mod test {
143185
])
144186
.build()?;
145187

146-
let pw = ParquetWriterBuilder::new(
147-
WriterProperties::builder().build(),
148-
Arc::new(schema),
149-
None,
188+
let pw = ParquetWriterBuilder::new(WriterProperties::builder().build(), Arc::new(schema));
189+
190+
let rolling_file_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
191+
pw,
150192
file_io.clone(),
151193
location_gen,
152194
file_name_gen,
153195
);
154196

155-
let mut data_file_writer = DataFileWriterBuilder::new(pw, None, 0)
197+
let mut data_file_writer = DataFileWriterBuilder::new(rolling_file_writer_builder, None)
156198
.build()
157199
.await
158200
.unwrap();
@@ -219,20 +261,27 @@ mod test {
219261
NestedField::required(6, "name", Type::Primitive(PrimitiveType::String)).into(),
220262
])
221263
.build()?;
264+
let schema_ref = Arc::new(schema);
222265

223266
let partition_value = Struct::from_iter([Some(Literal::int(1))]);
267+
let partition_key = PartitionKey::new(
268+
PartitionSpec::builder(schema_ref.clone()).build()?,
269+
schema_ref.clone(),
270+
partition_value.clone(),
271+
);
224272

225-
let parquet_writer_builder = ParquetWriterBuilder::new(
226-
WriterProperties::builder().build(),
227-
Arc::new(schema.clone()),
228-
None,
273+
let parquet_writer_builder =
274+
ParquetWriterBuilder::new(WriterProperties::builder().build(), schema_ref.clone());
275+
276+
let rolling_file_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
277+
parquet_writer_builder,
229278
file_io.clone(),
230279
location_gen,
231280
file_name_gen,
232281
);
233282

234283
let mut data_file_writer =
235-
DataFileWriterBuilder::new(parquet_writer_builder, Some(partition_value.clone()), 0)
284+
DataFileWriterBuilder::new(rolling_file_writer_builder, Some(partition_key))
236285
.build()
237286
.await?;
238287

0 commit comments

Comments
 (0)