
Commit 3ab45ee

CTTY and liurenjie1024 authored
feat(core): Implement RollingFileWriter to help split data into multiple files (#1547)
## Which issue does this PR close?

- Closes #1541

## What changes are included in this PR?

- Added `RollingFileWriter`
- Fixed some minor typos in the writer mod

## Are these changes tested?

Added unit tests.

---------

Co-authored-by: Renjie Liu <[email protected]>
1 parent 2d7bae9 commit 3ab45ee
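
For context, the new `RollingFileWriterBuilder` implements `FileWriterBuilder` itself, so it can wrap any inner file writer builder and slot into the existing writer stack unchanged. A minimal sketch of the intended composition, condensed from `test_rolling_writer_basic` in this commit (the `schema`, `file_io`, `location_gen`, and `file_name_gen` values are set up as in that test; the 1 MiB threshold is illustrative):

```rust
// Sketch condensed from the unit tests in this commit; setup values elided.
let parquet_writer_builder = ParquetWriterBuilder::new(
    WriterProperties::builder().build(),
    Arc::new(schema),
    file_io.clone(),
    location_gen,
    file_name_gen,
);

// Wrap the Parquet builder: roll to a new file once ~1 MiB has been written.
let rolling_writer_builder =
    RollingFileWriterBuilder::new(parquet_writer_builder, 1024 * 1024);

// RollingFileWriterBuilder is itself a FileWriterBuilder, so it composes
// with the base data file writer as usual.
let mut writer = DataFileWriterBuilder::new(rolling_writer_builder, None, 0)
    .build()
    .await?;

writer.write(batch).await?;
let data_files = writer.close().await?; // one entry per rolled file
```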

File tree

5 files changed: +339 −11 lines changed

crates/iceberg/src/writer/file_writer/location_generator.rs

Lines changed: 1 addition & 1 deletion

@@ -26,7 +26,7 @@ use crate::spec::{DataFileFormat, TableMetadata};
 /// `LocationGenerator` used to generate the location of data file.
 pub trait LocationGenerator: Clone + Send + 'static {
     /// Generate an absolute path for the given file name.
-    /// e.g
+    /// e.g.
     /// For file name "part-00000.parquet", the generated location maybe "/table/data/part-00000.parquet"
     fn generate_location(&self, file_name: &str) -> String;
 }

crates/iceberg/src/writer/file_writer/mod.rs

Lines changed: 2 additions & 0 deletions

@@ -29,6 +29,8 @@ pub use parquet_writer::{ParquetWriter, ParquetWriterBuilder};
 mod track_writer;
 
 pub mod location_generator;
+/// Module providing writers that can automatically roll over to new files based on size thresholds.
+pub mod rolling_writer;
 
 type DefaultOutput = Vec<DataFileBuilder>;
 

crates/iceberg/src/writer/file_writer/parquet_writer.rs

Lines changed: 2 additions & 2 deletions

@@ -86,7 +86,7 @@ impl<T: LocationGenerator, F: FileNameGenerator> ParquetWriterBuilder<T, F> {
 impl<T: LocationGenerator, F: FileNameGenerator> FileWriterBuilder for ParquetWriterBuilder<T, F> {
     type R = ParquetWriter;
 
-    async fn build(self) -> crate::Result<Self::R> {
+    async fn build(self) -> Result<Self::R> {
         let written_size = Arc::new(AtomicI64::new(0));
         let out_file = self.file_io.new_output(
             self.location_generator
@@ -517,7 +517,7 @@ impl ParquetWriter {
 }
 
 impl FileWriter for ParquetWriter {
-    async fn write(&mut self, batch: &arrow_array::RecordBatch) -> crate::Result<()> {
+    async fn write(&mut self, batch: &arrow_array::RecordBatch) -> Result<()> {
         // Skip empty batch
         if batch.num_rows() == 0 {
             return Ok(());
crates/iceberg/src/writer/file_writer/rolling_writer.rs

Lines changed: 325 additions & 0 deletions

@@ -0,0 +1,325 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow_array::RecordBatch;

use crate::spec::DataFileBuilder;
use crate::writer::CurrentFileStatus;
use crate::writer::file_writer::{FileWriter, FileWriterBuilder};
use crate::{Error, ErrorKind, Result};

/// Builder for creating a `RollingFileWriter` that rolls over to a new file
/// when the data size exceeds a target threshold.
#[derive(Clone)]
pub struct RollingFileWriterBuilder<B: FileWriterBuilder> {
    inner_builder: B,
    target_file_size: usize,
}

impl<B: FileWriterBuilder> RollingFileWriterBuilder<B> {
    /// Creates a new `RollingFileWriterBuilder` with the specified inner builder and target size.
    ///
    /// # Arguments
    ///
    /// * `inner_builder` - The builder for the underlying file writer
    /// * `target_file_size` - The target size in bytes before rolling over to a new file
    ///
    /// NOTE: The `target_file_size` does not exactly reflect the final size on physical storage.
    /// This is because the input size is based on the Arrow in-memory format and cannot precisely control rollover behavior.
    /// The actual file size on disk is expected to be slightly larger than `target_file_size`.
    pub fn new(inner_builder: B, target_file_size: usize) -> Self {
        Self {
            inner_builder,
            target_file_size,
        }
    }
}

impl<B: FileWriterBuilder> FileWriterBuilder for RollingFileWriterBuilder<B> {
    type R = RollingFileWriter<B>;

    async fn build(self) -> Result<Self::R> {
        Ok(RollingFileWriter {
            inner: None,
            inner_builder: self.inner_builder,
            target_file_size: self.target_file_size,
            data_file_builders: vec![],
        })
    }
}

/// A writer that automatically rolls over to a new file when the data size
/// exceeds a target threshold.
///
/// This writer wraps another file writer that tracks the amount of data written.
/// When the data size exceeds the target size, it closes the current file and
/// starts writing to a new one.
pub struct RollingFileWriter<B: FileWriterBuilder> {
    inner: Option<B::R>,
    inner_builder: B,
    target_file_size: usize,
    data_file_builders: Vec<DataFileBuilder>,
}

impl<B: FileWriterBuilder> RollingFileWriter<B> {
    /// Determines if the writer should roll over to a new file.
    ///
    /// # Returns
    ///
    /// `true` if a new file should be started, `false` otherwise
    fn should_roll(&self) -> bool {
        self.current_written_size() > self.target_file_size
    }
}

impl<B: FileWriterBuilder> FileWriter for RollingFileWriter<B> {
    async fn write(&mut self, input: &RecordBatch) -> Result<()> {
        if self.inner.is_none() {
            // initialize inner writer
            self.inner = Some(self.inner_builder.clone().build().await?);
        }

        if self.should_roll() {
            if let Some(inner) = self.inner.take() {
                // close the current writer, roll to a new file
                self.data_file_builders.extend(inner.close().await?);

                // start a new writer
                self.inner = Some(self.inner_builder.clone().build().await?);
            }
        }

        // write the input
        if let Some(writer) = self.inner.as_mut() {
            Ok(writer.write(input).await?)
        } else {
            Err(Error::new(
                ErrorKind::Unexpected,
                "Writer is not initialized!",
            ))
        }
    }

    async fn close(mut self) -> Result<Vec<DataFileBuilder>> {
        // close the current writer and merge the output
        if let Some(current_writer) = self.inner {
            self.data_file_builders
                .extend(current_writer.close().await?);
        }

        Ok(self.data_file_builders)
    }
}

impl<B: FileWriterBuilder> CurrentFileStatus for RollingFileWriter<B> {
    fn current_file_path(&self) -> String {
        self.inner.as_ref().unwrap().current_file_path()
    }

    fn current_row_num(&self) -> usize {
        self.inner.as_ref().unwrap().current_row_num()
    }

    fn current_written_size(&self) -> usize {
        self.inner.as_ref().unwrap().current_written_size()
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use arrow_array::{ArrayRef, Int32Array, StringArray};
    use arrow_schema::{DataType, Field, Schema as ArrowSchema};
    use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
    use parquet::file::properties::WriterProperties;
    use rand::prelude::IteratorRandom;
    use tempfile::TempDir;

    use super::*;
    use crate::io::FileIOBuilder;
    use crate::spec::{DataFileFormat, NestedField, PrimitiveType, Schema, Type};
    use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder;
    use crate::writer::file_writer::ParquetWriterBuilder;
    use crate::writer::file_writer::location_generator::DefaultFileNameGenerator;
    use crate::writer::file_writer::location_generator::test::MockLocationGenerator;
    use crate::writer::tests::check_parquet_data_file;
    use crate::writer::{IcebergWriter, IcebergWriterBuilder, RecordBatch};

    fn make_test_schema() -> Result<Schema> {
        Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![
                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
            ])
            .build()
    }

    fn make_test_arrow_schema() -> ArrowSchema {
        ArrowSchema::new(vec![
            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                1.to_string(),
            )])),
            Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                2.to_string(),
            )])),
        ])
    }

    #[tokio::test]
    async fn test_rolling_writer_basic() -> Result<()> {
        let temp_dir = TempDir::new()?;
        let file_io = FileIOBuilder::new_fs_io().build()?;
        let location_gen =
            MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string());
        let file_name_gen =
            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);

        // Create schema
        let schema = make_test_schema()?;

        // Create writer builders
        let parquet_writer_builder = ParquetWriterBuilder::new(
            WriterProperties::builder().build(),
            Arc::new(schema),
            file_io.clone(),
            location_gen,
            file_name_gen,
        );

        // Set a large target size so no rolling occurs
        let rolling_writer_builder = RollingFileWriterBuilder::new(
            parquet_writer_builder,
            1024 * 1024, // 1MB, large enough to not trigger rolling
        );

        let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder, None, 0);

        // Create writer
        let mut writer = data_file_writer_builder.build().await?;

        // Create test data
        let arrow_schema = make_test_arrow_schema();

        let batch = RecordBatch::try_new(Arc::new(arrow_schema), vec![
            Arc::new(Int32Array::from(vec![1, 2, 3])),
            Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])),
        ])?;

        // Write data
        writer.write(batch.clone()).await?;

        // Close writer and get data files
        let data_files = writer.close().await?;

        // Verify only one file was created
        assert_eq!(
            data_files.len(),
            1,
            "Expected only one data file to be created"
        );

        // Verify file content
        check_parquet_data_file(&file_io, &data_files[0], &batch).await;

        Ok(())
    }

    #[tokio::test]
    async fn test_rolling_writer_with_rolling() -> Result<()> {
        let temp_dir = TempDir::new()?;
        let file_io = FileIOBuilder::new_fs_io().build()?;
        let location_gen =
            MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string());
        let file_name_gen =
            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);

        // Create schema
        let schema = make_test_schema()?;

        // Create writer builders
        let parquet_writer_builder = ParquetWriterBuilder::new(
            WriterProperties::builder().build(),
            Arc::new(schema),
            file_io.clone(),
            location_gen,
            file_name_gen,
        );

        // Set a very small target size to trigger rolling
        let rolling_writer_builder = RollingFileWriterBuilder::new(parquet_writer_builder, 1024);

        let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder, None, 0);

        // Create writer
        let mut writer = data_file_writer_builder.build().await?;

        // Create test data
        let arrow_schema = make_test_arrow_schema();
        let arrow_schema_ref = Arc::new(arrow_schema.clone());

        let names = vec![
            "Alice", "Bob", "Charlie", "Dave", "Eve", "Frank", "Grace", "Heidi", "Ivan", "Judy",
            "Kelly", "Larry", "Mallory", "Shawn",
        ];

        let mut rng = rand::thread_rng();
        let batch_num = 10;
        let batch_rows = 100;
        let expected_rows = batch_num * batch_rows;

        for i in 0..batch_num {
            let int_values: Vec<i32> = (0..batch_rows).map(|row| i * batch_rows + row).collect();
            let str_values: Vec<&str> = (0..batch_rows)
                .map(|_| *names.iter().choose(&mut rng).unwrap())
                .collect();

            let int_array = Arc::new(Int32Array::from(int_values)) as ArrayRef;
            let str_array = Arc::new(StringArray::from(str_values)) as ArrayRef;

            let batch =
                RecordBatch::try_new(Arc::clone(&arrow_schema_ref), vec![int_array, str_array])
                    .expect("Failed to create RecordBatch");

            writer.write(batch).await?;
        }

        // Close writer and get data files
        let data_files = writer.close().await?;

        // Verify multiple files were created (at least 4)
        assert!(
            data_files.len() > 4,
            "Expected at least 4 data files to be created, but got {}",
            data_files.len()
        );

        // Verify total record count across all files
        let total_records: u64 = data_files.iter().map(|file| file.record_count).sum();
        assert_eq!(
            total_records, expected_rows as u64,
            "Expected {} total records across all files",
            expected_rows
        );

        Ok(())
    }
}
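
One consequence of the control flow above is worth noting: `should_roll` is checked before each batch is written, so a file is only closed once a previous batch has already pushed it past `target_file_size`. Combined with the Arrow in-memory sizing caveat in the builder docs, files can therefore overshoot the target by up to one batch. A minimal standalone model of that check-then-write loop (hypothetical sizes; plain integers stand in for the real writer types):

```rust
fn main() {
    let target_file_size = 1024; // bytes; hypothetical threshold
    let batch_sizes = [400, 400, 400, 400]; // hypothetical in-memory batch sizes
    let mut current = 0;
    let mut files = Vec::new();
    for size in batch_sizes {
        // The roll check happens BEFORE the write, mirroring RollingFileWriter::write.
        if current > target_file_size {
            files.push(current); // close the current file, roll to a new one
            current = 0;
        }
        current += size; // write the batch
    }
    files.push(current); // final close
    // Prints [1200, 400]: the first file overshoots the 1024-byte target by one batch.
    println!("{files:?}");
}
```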
