Skip to content

Commit 3bf7511

Browse files
committed
rolling
1 parent 2c06cfa commit 3bf7511

File tree

3 files changed

+132
-16
lines changed

3 files changed

+132
-16
lines changed

crates/iceberg/src/writer/base_writer/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,4 @@
1919
2020
pub mod data_file_writer;
2121
pub mod equality_delete_writer;
22+
pub mod rolling_writer;
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::mem::take;
19+
20+
use arrow_array::RecordBatch;
21+
use async_trait::async_trait;
22+
23+
use crate::spec::DataFile;
24+
use crate::writer::base_writer::data_file_writer::DataFileWriter;
25+
use crate::writer::file_writer::FileWriterBuilder;
26+
use crate::writer::{IcebergWriter, IcebergWriterBuilder};
27+
use crate::{Error, ErrorKind, Result};
28+
29+
#[async_trait]
30+
pub trait RollingFileWriter: IcebergWriter {
31+
fn should_roll(&mut self, input_size: u64) -> bool;
32+
}
33+
34+
#[derive(Clone)]
35+
pub struct RollingDataFileWriterBuilder<B: FileWriterBuilder> {
36+
inner_builder: B,
37+
target_size: u64,
38+
}
39+
40+
impl<B: FileWriterBuilder> RollingDataFileWriterBuilder<B> {
41+
pub fn new(inner_builder: B, target_size: u64) -> Self {
42+
Self {
43+
inner_builder,
44+
target_size,
45+
}
46+
}
47+
}
48+
49+
#[async_trait]
50+
impl<B: FileWriterBuilder> IcebergWriterBuilder for RollingDataFileWriterBuilder<B> {
51+
type R = RollingDataFileWriter<B>;
52+
53+
async fn build(self) -> Result<Self::R> {
54+
Ok(RollingDataFileWriter {
55+
inner: None,
56+
inner_builder: self.inner_builder,
57+
target_size: self.target_size,
58+
written_size: 0,
59+
data_files: vec![],
60+
})
61+
}
62+
}
63+
64+
pub struct RollingDataFileWriter<B: FileWriterBuilder> {
65+
inner: Option<DataFileWriter<B::R>>,
66+
inner_builder: B,
67+
target_size: u64,
68+
written_size: u64,
69+
data_files: Vec<DataFile>,
70+
}
71+
72+
#[async_trait]
73+
impl<B: FileWriterBuilder> IcebergWriter for RollingDataFileWriter<B> {
74+
async fn write(&mut self, input: RecordBatch) -> Result<()> {
75+
let input_size = input.get_array_memory_size() as u64;
76+
if self.should_roll(input_size) {
77+
if let Some(mut inner) = self.inner.take() {
78+
// close the current writer, roll to a new file
79+
self.data_files.extend(inner.close().await?);
80+
}
81+
82+
// clear bytes written
83+
self.written_size = 0;
84+
}
85+
86+
if self.inner.is_none() {
87+
// start a new writer
88+
self.inner = Some(self.inner_builder.clone().build().await?);
89+
}
90+
91+
// write the input and count bytes written
92+
let Some(writer) = self.inner.as_mut() else {
93+
return Err(Error::new(
94+
ErrorKind::Unexpected,
95+
"Writer is not initialized!",
96+
));
97+
};
98+
writer.write(input).await?;
99+
self.written_size += input_size;
100+
Ok(())
101+
}
102+
103+
async fn close(&mut self) -> Result<Vec<DataFile>> {
104+
Ok(take(&mut self.data_files))
105+
}
106+
}
107+
108+
impl<B: FileWriterBuilder> RollingFileWriter for RollingDataFileWriter<B> {
109+
fn should_roll(&mut self, input_size: u64) -> bool {
110+
self.written_size + input_size > self.target_size
111+
}
112+
}

crates/integrations/datafusion/src/physical_plan/write.rs

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ use iceberg::spec::{
4242
};
4343
use iceberg::table::Table;
4444
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
45+
use iceberg::writer::base_writer::rolling_writer::RollingDataFileWriterBuilder;
4546
use iceberg::writer::file_writer::ParquetWriterBuilder;
4647
use iceberg::writer::file_writer::location_generator::{
4748
DefaultFileNameGenerator, DefaultLocationGenerator,
@@ -191,22 +192,24 @@ impl ExecutionPlan for IcebergWriteExec {
191192
}
192193

193194
// Create data file writer builder
194-
let data_file_writer_builder = DataFileWriterBuilder::new(
195-
ParquetWriterBuilder::new(
196-
WriterProperties::default(),
197-
self.table.metadata().current_schema().clone(),
198-
self.table.file_io().clone(),
199-
DefaultLocationGenerator::new(self.table.metadata().clone())
200-
.map_err(to_datafusion_error)?,
201-
// todo filename prefix/suffix should be configurable
202-
DefaultFileNameGenerator::new(
203-
"datafusion".to_string(),
204-
Some(Uuid::now_v7().to_string()),
205-
file_format,
206-
),
195+
let parquet_file_writer_builder = ParquetWriterBuilder::new(
196+
WriterProperties::default(),
197+
self.table.metadata().current_schema().clone(),
198+
self.table.file_io().clone(),
199+
DefaultLocationGenerator::new(self.table.metadata().clone())
200+
.map_err(to_datafusion_error)?,
201+
// todo filename prefix/suffix should be configurable
202+
DefaultFileNameGenerator::new(
203+
"datafusion".to_string(),
204+
Some(Uuid::now_v7().to_string()),
205+
file_format,
207206
),
208-
None,
209-
spec_id,
207+
);
208+
let data_file_writer_builder =
209+
DataFileWriterBuilder::new(parquet_file_writer_builder, None, spec_id);
210+
let rolling_writer_builder = RollingDataFileWriterBuilder::new(
211+
data_file_writer_builder,
212+
100 * 1024 * 1024, // todo use a config
210213
);
211214

212215
// Get input data
@@ -222,7 +225,7 @@ impl ExecutionPlan for IcebergWriteExec {
222225

223226
// Create write stream
224227
let stream = futures::stream::once(async move {
225-
let mut writer = data_file_writer_builder
228+
let mut writer = rolling_writer_builder
226229
.build()
227230
.await
228231
.map_err(to_datafusion_error)?;

0 commit comments

Comments
 (0)