
Commit 0f9bce0

rolling in the deep
1 parent c5b1c38 commit 0f9bce0

3 files changed: +56 −16 lines

crates/iceberg/src/writer/base_writer/mod.rs
1 addition & 0 deletions

@@ -19,4 +19,5 @@
 
 pub mod data_file_writer;
 pub mod equality_delete_writer;
+/// Module providing writers that can automatically roll over to new files based on size thresholds.
 pub mod rolling_writer;

crates/iceberg/src/writer/base_writer/rolling_writer.rs
52 additions & 13 deletions

@@ -19,25 +19,46 @@ use std::mem::take;
 
 use arrow_array::RecordBatch;
 use async_trait::async_trait;
+use futures::future::try_join_all;
 
+use crate::runtime::{JoinHandle, spawn};
 use crate::spec::DataFile;
-use crate::writer::base_writer::data_file_writer::DataFileWriter;
-use crate::writer::file_writer::FileWriterBuilder;
 use crate::writer::{IcebergWriter, IcebergWriterBuilder};
 use crate::{Error, ErrorKind, Result};
 
+/// A writer that can roll over to a new file when certain conditions are met.
+///
+/// This trait extends `IcebergWriter` with the ability to determine when to start
+/// writing to a new file based on the size of incoming data.
 #[async_trait]
 pub trait RollingFileWriter: IcebergWriter {
+    /// Determines if the writer should roll over to a new file.
+    ///
+    /// # Arguments
+    ///
+    /// * `input_size` - The size in bytes of the incoming data
+    ///
+    /// # Returns
+    ///
+    /// `true` if a new file should be started, `false` otherwise
     fn should_roll(&mut self, input_size: u64) -> bool;
 }
 
+/// Builder for creating a `RollingDataFileWriter` that rolls over to a new file
+/// when the data size exceeds a target threshold.
 #[derive(Clone)]
-pub struct RollingDataFileWriterBuilder<B: FileWriterBuilder> {
+pub struct RollingDataFileWriterBuilder<B: IcebergWriterBuilder> {
     inner_builder: B,
     target_size: u64,
 }
 
-impl<B: FileWriterBuilder> RollingDataFileWriterBuilder<B> {
+impl<B: IcebergWriterBuilder> RollingDataFileWriterBuilder<B> {
+    /// Creates a new `RollingDataFileWriterBuilder` with the specified inner builder and target size.
+    ///
+    /// # Arguments
+    ///
+    /// * `inner_builder` - The builder for the underlying file writer
+    /// * `target_size` - The target size in bytes before rolling over to a new file
     pub fn new(inner_builder: B, target_size: u64) -> Self {
         Self {
             inner_builder,

@@ -47,7 +68,7 @@ impl<B: FileWriterBuilder> RollingDataFileWriterBuilder<B> {
 }
 
 #[async_trait]
-impl<B: FileWriterBuilder> IcebergWriterBuilder for RollingDataFileWriterBuilder<B> {
+impl<B: IcebergWriterBuilder> IcebergWriterBuilder for RollingDataFileWriterBuilder<B> {
     type R = RollingDataFileWriter<B>;
 
     async fn build(self) -> Result<Self::R> {

@@ -56,27 +77,34 @@ impl<B: FileWriterBuilder> IcebergWriterBuilder for RollingDataFileWriterBuilder
             inner_builder: self.inner_builder,
             target_size: self.target_size,
             written_size: 0,
-            data_files: vec![],
+            close_handles: vec![],
         })
     }
 }
 
-pub struct RollingDataFileWriter<B: FileWriterBuilder> {
-    inner: Option<DataFileWriter<B::R>>,
+/// A writer that automatically rolls over to a new file when the data size
+/// exceeds a target threshold.
+///
+/// This writer wraps another file writer and tracks the amount of data written.
+/// When the data size exceeds the target size, it closes the current file and
+/// starts writing to a new one.
+pub struct RollingDataFileWriter<B: IcebergWriterBuilder> {
+    inner: Option<B::R>,
     inner_builder: B,
     target_size: u64,
     written_size: u64,
-    data_files: Vec<DataFile>,
+    close_handles: Vec<JoinHandle<Result<Vec<DataFile>>>>,
 }
 
 #[async_trait]
-impl<B: FileWriterBuilder> IcebergWriter for RollingDataFileWriter<B> {
+impl<B: IcebergWriterBuilder> IcebergWriter for RollingDataFileWriter<B> {
     async fn write(&mut self, input: RecordBatch) -> Result<()> {
         let input_size = input.get_array_memory_size() as u64;
         if self.should_roll(input_size) {
             if let Some(mut inner) = self.inner.take() {
                 // close the current writer, roll to a new file
-                self.data_files.extend(inner.close().await?);
+                let handle = spawn(async move { inner.close().await });
+                self.close_handles.push(handle)
             }
 
             // clear bytes written

@@ -101,11 +129,22 @@ impl<B: FileWriterBuilder> IcebergWriter for RollingDataFileWriter<B> {
     }
 
     async fn close(&mut self) -> Result<Vec<DataFile>> {
-        Ok(take(&mut self.data_files))
+        let mut data_files = try_join_all(take(&mut self.close_handles))
+            .await?
+            .into_iter()
+            .flatten()
+            .collect::<Vec<DataFile>>();
+
+        // close the current writer and merge the output
+        if let Some(mut current_writer) = take(&mut self.inner) {
+            data_files.extend(current_writer.close().await?);
+        }
+
+        Ok(data_files)
     }
 }
 
-impl<B: FileWriterBuilder> RollingFileWriter for RollingDataFileWriter<B> {
+impl<B: IcebergWriterBuilder> RollingFileWriter for RollingDataFileWriter<B> {
     fn should_roll(&mut self, input_size: u64) -> bool {
         self.written_size + input_size > self.target_size
     }
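
For context, a usage sketch (not part of the diff; the module path and the generic setup are assumptions based on the file layout above): any IcebergWriterBuilder can be wrapped so that output rolls to a new data file once the configured byte threshold is crossed.

    use arrow_array::RecordBatch;
    use iceberg::Result;
    use iceberg::spec::DataFile;
    use iceberg::writer::base_writer::rolling_writer::RollingDataFileWriterBuilder;
    use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};

    // `inner` is any IcebergWriterBuilder (e.g. a data file writer builder
    // configured elsewhere); `batches` is the data to persist.
    async fn write_rolling<B: IcebergWriterBuilder>(
        inner: B,
        batches: Vec<RecordBatch>,
    ) -> Result<Vec<DataFile>> {
        // Roll to a new file once roughly 512 MiB of batch memory is written.
        let mut writer = RollingDataFileWriterBuilder::new(inner, 512 * 1024 * 1024)
            .build()
            .await?;
        for batch in batches {
            writer.write(batch).await?; // may close the current file in the background and roll
        }
        writer.close().await // joins all background closes, then closes the tail file
    }

Note the design choice visible in the diff: rolled files are closed on a background task via crate::runtime::spawn, so close() must drain those handles with try_join_all before merging in the final writer's output.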

crates/integrations/datafusion/src/physical_plan/write.rs
3 additions & 3 deletions

@@ -235,10 +235,10 @@ impl ExecutionPlan for IcebergWriteExec {
                 writer.write(batch?).await.map_err(to_datafusion_error)?;
             }
 
-            let data_file_builders = writer.close().await.map_err(to_datafusion_error)?;
+            let data_files = writer.close().await.map_err(to_datafusion_error)?;
 
             // Convert builders to data files and then to JSON strings
-            let data_files: Vec<String> = data_file_builders
+            let data_files_strs: Vec<String> = data_files
                 .into_iter()
                 .map(|data_file| -> DFResult<String> {
                     // Serialize to JSON

@@ -251,7 +251,7 @@
                 })
                 .collect::<DFResult<Vec<String>>>()?;
 
-            Self::make_result_batch(data_files)
+            Self::make_result_batch(data_files_strs)
         })
         .boxed();
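
The rename tracks the writer change: close() now yields DataFile values directly rather than builders, so only the serialized strings keep a distinct name. A minimal, self-contained sketch of the collect-to-JSON pattern used here (Record is a made-up stand-in for DataFile, and serde_json stands in for the crate's own serialization helper; the error type is simplified):

    use serde::Serialize;

    #[derive(Serialize)]
    struct Record {
        path: String,
        record_count: u64,
    }

    // Map each value to a JSON string, short-circuiting on the first error,
    // mirroring the .map(...).collect::<DFResult<Vec<String>>>() chain above.
    fn to_json_strings(records: Vec<Record>) -> Result<Vec<String>, serde_json::Error> {
        records
            .into_iter()
            .map(|r| serde_json::to_string(&r))
            .collect()
    }

    fn main() {
        let rows = vec![Record { path: "a.parquet".into(), record_count: 3 }];
        println!("{:?}", to_json_strings(rows).unwrap());
    }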
