Skip to content

Commit 6846be4

Browse files
authored
fix(writer): Fix current_written_size for ParquetWriter (#1556)
## Which issue does this PR close? - Closes #1554 ## What changes are included in this PR? - Use `ParquetWriter`'s inner `AsyncArrowWriter`'s `bytes_written()` and `in_progress_size` to track current written size ## Are these changes tested? No new tests added
1 parent b68c2a7 commit 6846be4

File tree

1 file changed

+10
-1
lines changed

1 file changed

+10
-1
lines changed

crates/iceberg/src/writer/file_writer/parquet_writer.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,8 @@ pub struct ParquetWriter {
229229
out_file: OutputFile,
230230
inner_writer: Option<AsyncArrowWriter<AsyncFileWriter<TrackWriter>>>,
231231
writer_properties: WriterProperties,
232+
// written_size is only accurate after closing the inner writer,
233+
// because the inner writer flushes data asynchronously.
232234
written_size: Arc<AtomicI64>,
233235
current_row_num: usize,
234236
nan_value_count_visitor: NanValueCountVisitor,
@@ -611,7 +613,14 @@ impl CurrentFileStatus for ParquetWriter {
611613
}
612614

613615
fn current_written_size(&self) -> usize {
614-
self.written_size.load(std::sync::atomic::Ordering::Relaxed) as usize
616+
if let Some(inner) = self.inner_writer.as_ref() {
617+
// inner/AsyncArrowWriter contains sync and async writers
618+
// written size = bytes flushed to inner's async writer + bytes buffered in the inner's sync writer
619+
inner.bytes_written() + inner.in_progress_size()
620+
} else {
621+
// inner writer is not initialized yet
622+
0
623+
}
615624
}
616625
}
617626

0 commit comments

Comments
 (0)