Skip to content

Commit fb6fef3

Browse files
authored
refactor: remove TrackWriter (#1575)
## Which issue does this PR close? - Closes #1574 ## What changes are included in this PR? 1. Remove `TrackWriter` 2. impl FileWrite for Box<dyn FileWrite> ## Are these changes tested? Covered by existing tests
1 parent 492c1a3 commit fb6fef3

File tree

4 files changed

+18
-70
lines changed

4 files changed

+18
-70
lines changed

crates/iceberg/src/io/file_io.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -390,6 +390,17 @@ impl FileWrite for opendal::Writer {
390390
}
391391
}
392392

393+
#[async_trait::async_trait]
394+
impl FileWrite for Box<dyn FileWrite> {
395+
async fn write(&mut self, bs: Bytes) -> crate::Result<()> {
396+
self.as_mut().write(bs).await
397+
}
398+
399+
async fn close(&mut self) -> crate::Result<()> {
400+
self.as_mut().close().await
401+
}
402+
}
403+
393404
/// Output file is used for writing to files..
394405
#[derive(Debug)]
395406
pub struct OutputFile {

crates/iceberg/src/writer/file_writer/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ use crate::spec::DataFileBuilder;
2626

2727
mod parquet_writer;
2828
pub use parquet_writer::{ParquetWriter, ParquetWriterBuilder};
29-
mod track_writer;
3029

3130
pub mod location_generator;
3231
/// Module providing writers that can automatically roll over to new files based on size thresholds.

crates/iceberg/src/writer/file_writer/parquet_writer.rs

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
2020
use std::collections::HashMap;
2121
use std::sync::Arc;
22-
use std::sync::atomic::AtomicI64;
2322

2423
use arrow_schema::SchemaRef as ArrowSchemaRef;
2524
use bytes::Bytes;
@@ -36,7 +35,6 @@ use parquet::thrift::{TCompactOutputProtocol, TSerializable};
3635
use thrift::protocol::TOutputProtocol;
3736

3837
use super::location_generator::{FileNameGenerator, LocationGenerator};
39-
use super::track_writer::TrackWriter;
4038
use super::{FileWriter, FileWriterBuilder};
4139
use crate::arrow::{
4240
ArrowFileReader, DEFAULT_MAP_FIELD_NAME, NanValueCountVisitor, get_parquet_stat_max_as_datum,
@@ -87,7 +85,6 @@ impl<T: LocationGenerator, F: FileNameGenerator> FileWriterBuilder for ParquetWr
8785
type R = ParquetWriter;
8886

8987
async fn build(self) -> Result<Self::R> {
90-
let written_size = Arc::new(AtomicI64::new(0));
9188
let out_file = self.file_io.new_output(
9289
self.location_generator
9390
.generate_location(&self.file_name_generator.generate_file_name()),
@@ -97,7 +94,6 @@ impl<T: LocationGenerator, F: FileNameGenerator> FileWriterBuilder for ParquetWr
9794
schema: self.schema.clone(),
9895
inner_writer: None,
9996
writer_properties: self.props,
100-
written_size,
10197
current_row_num: 0,
10298
out_file,
10399
nan_value_count_visitor: NanValueCountVisitor::new(),
@@ -227,11 +223,8 @@ impl SchemaVisitor for IndexByParquetPathName {
227223
pub struct ParquetWriter {
228224
schema: SchemaRef,
229225
out_file: OutputFile,
230-
inner_writer: Option<AsyncArrowWriter<AsyncFileWriter<TrackWriter>>>,
226+
inner_writer: Option<AsyncArrowWriter<AsyncFileWriter<Box<dyn FileWrite>>>>,
231227
writer_properties: WriterProperties,
232-
// written_size is only accurate after closing the inner writer,
233-
// because the inner writer flushes data asynchronously.
234-
written_size: Arc<AtomicI64>,
235228
current_row_num: usize,
236229
nan_value_count_visitor: NanValueCountVisitor,
237230
}
@@ -534,8 +527,7 @@ impl FileWriter for ParquetWriter {
534527
writer
535528
} else {
536529
let arrow_schema: ArrowSchemaRef = Arc::new(self.schema.as_ref().try_into()?);
537-
let inner_writer =
538-
TrackWriter::new(self.out_file.writer().await?, self.written_size.clone());
530+
let inner_writer = self.out_file.writer().await?;
539531
let async_writer = AsyncFileWriter::new(inner_writer);
540532
let writer = AsyncArrowWriter::try_new(
541533
async_writer,
@@ -562,16 +554,16 @@ impl FileWriter for ParquetWriter {
562554
}
563555

564556
async fn close(mut self) -> Result<Vec<DataFileBuilder>> {
565-
let writer = match self.inner_writer.take() {
557+
let mut writer = match self.inner_writer.take() {
566558
Some(writer) => writer,
567559
None => return Ok(vec![]),
568560
};
569561

570-
let metadata = writer.close().await.map_err(|err| {
571-
Error::new(ErrorKind::Unexpected, "Failed to close parquet writer.").with_source(err)
562+
let metadata = writer.finish().await.map_err(|err| {
563+
Error::new(ErrorKind::Unexpected, "Failed to finish parquet writer.").with_source(err)
572564
})?;
573565

574-
let written_size = self.written_size.load(std::sync::atomic::Ordering::Relaxed);
566+
let written_size = writer.bytes_written();
575567

576568
if self.current_row_num == 0 {
577569
self.out_file.delete().await.map_err(|err| {
@@ -595,7 +587,7 @@ impl FileWriter for ParquetWriter {
595587
Ok(vec![Self::parquet_to_data_file_builder(
596588
self.schema,
597589
parquet_metadata,
598-
written_size as usize,
590+
written_size,
599591
self.out_file.location().to_string(),
600592
self.nan_value_count_visitor.nan_value_counts,
601593
)?])

crates/iceberg/src/writer/file_writer/track_writer.rs

Lines changed: 0 additions & 54 deletions
This file was deleted.

0 commit comments

Comments
 (0)