
Commit 3cc1a68

feat: Add conversion from FileMetaData to ParquetMetadata (#1074)
## Which issue does this PR close?

- Closes #1033 and #1004.

## What changes are included in this PR?

Add a conversion from the thrift `FileMetaData` to `ParquetMetaData` using `ParquetMetaDataReader::decode_metadata`.
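The core of the change is a two-step conversion: serialize the thrift `FileMetaData` footer with the compact protocol, then decode those bytes back with `ParquetMetaDataReader::decode_metadata`. A minimal standalone sketch of that idea (simplified error handling; not the exact crate code, which wraps errors in iceberg's `Error` type) could look like:

```rust
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
use parquet::format::FileMetaData;
use parquet::thrift::{TCompactOutputProtocol, TSerializable};
use thrift::protocol::TOutputProtocol;

/// Convert the thrift `FileMetaData` returned by the Arrow writer into
/// parquet's in-memory `ParquetMetaData` by round-tripping through bytes.
fn file_metadata_to_parquet_metadata(
    file_metadata: FileMetaData,
) -> Result<ParquetMetaData, Box<dyn std::error::Error>> {
    let mut buffer = Vec::new();
    {
        // Encode the thrift struct with the compact protocol, the same
        // encoding a Parquet footer uses on disk.
        let mut protocol = TCompactOutputProtocol::new(&mut buffer);
        file_metadata.write_to_out_protocol(&mut protocol)?;
        protocol.flush()?;
    }
    // Decode the buffer back into the richer ParquetMetaData type.
    Ok(ParquetMetaDataReader::decode_metadata(&buffer)?)
}
```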
1 parent bf149d3 commit 3cc1a68


4 files changed (+70, -89 lines)


Cargo.lock

Lines changed: 29 additions & 1 deletion
Generated file; diff not rendered.

Cargo.toml

Lines changed: 1 addition & 0 deletions
```diff
@@ -94,6 +94,7 @@ serde_json = "1.0.138"
 serde_repr = "0.1.16"
 serde_with = "3.4"
 tempfile = "3.18"
+thrift = "0.17.0"
 tokio = { version = "1.36", default-features = false }
 typed-builder = "0.20"
 url = "2.5.4"
```

crates/iceberg/Cargo.toml

Lines changed: 1 addition & 0 deletions
```diff
@@ -79,6 +79,7 @@ serde_derive = { workspace = true }
 serde_json = { workspace = true }
 serde_repr = { workspace = true }
 serde_with = { workspace = true }
+thrift = { workspace = true }
 tokio = { workspace = true, optional = true, features = ["sync"] }
 typed-builder = { workspace = true }
 url = { workspace = true }
```

crates/iceberg/src/writer/file_writer/parquet_writer.rs

Lines changed: 39 additions & 88 deletions
```diff
@@ -28,10 +28,12 @@ use itertools::Itertools;
 use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::async_writer::AsyncFileWriter as ArrowAsyncFileWriter;
 use parquet::arrow::AsyncArrowWriter;
-use parquet::file::metadata::ParquetMetaData;
+use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
 use parquet::file::properties::WriterProperties;
-use parquet::file::statistics::{from_thrift, Statistics};
+use parquet::file::statistics::Statistics;
 use parquet::format::FileMetaData;
+use parquet::thrift::{TCompactOutputProtocol, TSerializable};
+use thrift::protocol::TOutputProtocol;

 use super::location_generator::{FileNameGenerator, LocationGenerator};
 use super::track_writer::TrackWriter;
@@ -352,89 +354,27 @@ impl ParquetWriter {
         Ok(data_files)
     }

-    fn to_data_file_builder(
-        schema: SchemaRef,
-        metadata: FileMetaData,
-        written_size: usize,
-        file_path: String,
-    ) -> Result<DataFileBuilder> {
-        let index_by_parquet_path = {
-            let mut visitor = IndexByParquetPathName::new();
-            visit_schema(&schema, &mut visitor)?;
-            visitor
-        };
-
-        let (column_sizes, value_counts, null_value_counts, (lower_bounds, upper_bounds)) = {
-            let mut per_col_size: HashMap<i32, u64> = HashMap::new();
-            let mut per_col_val_num: HashMap<i32, u64> = HashMap::new();
-            let mut per_col_null_val_num: HashMap<i32, u64> = HashMap::new();
-            let mut min_max_agg = MinMaxColAggregator::new(schema);
-
-            for row_group in &metadata.row_groups {
-                for column_chunk in row_group.columns.iter() {
-                    let Some(column_chunk_metadata) = &column_chunk.meta_data else {
-                        continue;
-                    };
-                    let physical_type = column_chunk_metadata.type_;
-                    let Some(&field_id) =
-                        index_by_parquet_path.get(&column_chunk_metadata.path_in_schema.join("."))
-                    else {
-                        // Following java implementation: https://github.com/apache/iceberg/blob/29a2c456353a6120b8c882ed2ab544975b168d7b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java#L163
-                        // Ignore the field if it is not in schema.
-                        continue;
-                    };
-                    *per_col_size.entry(field_id).or_insert(0) +=
-                        column_chunk_metadata.total_compressed_size as u64;
-                    *per_col_val_num.entry(field_id).or_insert(0) +=
-                        column_chunk_metadata.num_values as u64;
-                    if let Some(null_count) = column_chunk_metadata
-                        .statistics
-                        .as_ref()
-                        .and_then(|s| s.null_count)
-                    {
-                        *per_col_null_val_num.entry(field_id).or_insert(0_u64) += null_count as u64;
-                    }
-                    if let Some(statistics) = &column_chunk_metadata.statistics {
-                        min_max_agg.update(
-                            field_id,
-                            from_thrift(physical_type.try_into()?, Some(statistics.clone()))?
-                                .unwrap(),
-                        )?;
-                    }
-                }
-            }
+    fn thrift_to_parquet_metadata(&self, file_metadata: FileMetaData) -> Result<ParquetMetaData> {
+        let mut buffer = Vec::new();
+        {
+            let mut protocol = TCompactOutputProtocol::new(&mut buffer);
+            file_metadata
+                .write_to_out_protocol(&mut protocol)
+                .map_err(|err| {
+                    Error::new(ErrorKind::Unexpected, "Failed to write parquet metadata")
+                        .with_source(err)
+                })?;
+
+            protocol.flush().map_err(|err| {
+                Error::new(ErrorKind::Unexpected, "Failed to flush protocol").with_source(err)
+            })?;
+        }

-            (
-                per_col_size,
-                per_col_val_num,
-                per_col_null_val_num,
-                min_max_agg.produce(),
-            )
-        };
+        let parquet_metadata = ParquetMetaDataReader::decode_metadata(&buffer).map_err(|err| {
+            Error::new(ErrorKind::Unexpected, "Failed to decode parquet metadata").with_source(err)
+        })?;

-        let mut builder = DataFileBuilder::default();
-        builder
-            .file_path(file_path)
-            .file_format(DataFileFormat::Parquet)
-            .record_count(metadata.num_rows as u64)
-            .file_size_in_bytes(written_size as u64)
-            .column_sizes(column_sizes)
-            .value_counts(value_counts)
-            .null_value_counts(null_value_counts)
-            .lower_bounds(lower_bounds)
-            .upper_bounds(upper_bounds)
-            // # TODO(#417)
-            // - nan_value_counts
-            // - distinct_counts
-            .key_metadata(metadata.footer_signing_key_metadata)
-            .split_offsets(
-                metadata
-                    .row_groups
-                    .iter()
-                    .filter_map(|group| group.file_offset)
-                    .collect(),
-            );
-        Ok(builder)
+        Ok(parquet_metadata)
     }

     /// `ParquetMetadata` to data file builder
@@ -551,19 +491,30 @@ impl FileWriter for ParquetWriter {
         Ok(())
     }

-    async fn close(self) -> crate::Result<Vec<crate::spec::DataFileBuilder>> {
-        let Some(writer) = self.inner_writer else {
-            return Ok(vec![]);
+    async fn close(mut self) -> crate::Result<Vec<crate::spec::DataFileBuilder>> {
+        let writer = match self.inner_writer.take() {
+            Some(writer) => writer,
+            None => return Ok(vec![]),
         };
+
         let metadata = writer.close().await.map_err(|err| {
             Error::new(ErrorKind::Unexpected, "Failed to close parquet writer.").with_source(err)
         })?;

         let written_size = self.written_size.load(std::sync::atomic::Ordering::Relaxed);

-        Ok(vec![Self::to_data_file_builder(
+        let parquet_metadata =
+            Arc::new(self.thrift_to_parquet_metadata(metadata).map_err(|err| {
+                Error::new(
+                    ErrorKind::Unexpected,
+                    "Failed to convert metadata from thrift to parquet.",
+                )
+                .with_source(err)
+            })?);
+
+        Ok(vec![Self::parquet_to_data_file_builder(
             self.schema,
-            metadata,
+            parquet_metadata,
             written_size as usize,
             self.out_file.location().to_string(),
         )?])
```
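As a usage illustration (not a test included in this commit), one could round-trip a small file through the blocking `ArrowWriter` and check that the converted metadata agrees on the row count. This assumes a parquet release where `ArrowWriter::close` still returns the thrift `format::FileMetaData`, as the async writer does in this diff; `file_metadata_to_parquet_metadata` is the sketch shown earlier, and the `arrow_array`/`arrow_schema` crate names are assumptions:

```rust
use std::sync::Arc;

use arrow_array::{Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::ArrowWriter;

fn round_trip_row_count() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;

    // Write one batch to an in-memory buffer.
    let mut buf = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buf, schema, None)?;
    writer.write(&batch)?;
    // `close` hands back the thrift `parquet::format::FileMetaData`.
    let thrift_metadata = writer.close()?;

    // Convert with the sketch above and compare row counts.
    let parquet_metadata = file_metadata_to_parquet_metadata(thrift_metadata)?;
    assert_eq!(parquet_metadata.file_metadata().num_rows(), 3);
    Ok(())
}
```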
