Skip to content

Commit 76d8e2d

Browse files
fix: support reading compressed metadata (#1802)
The spec mentions this naming convention here: https://iceberg.apache.org/spec/#naming-for-gzip-compressed-metadata-json-files ## Which issue does this PR close? - Closes #1801 ## What changes are included in this PR? Support for reading compressed metadata. ## Are these changes tested? Yes. Co-authored-by: Renjie Liu <[email protected]>
1 parent a970a0c commit 76d8e2d

File tree

5 files changed

+53
-4
lines changed

5 files changed

+53
-4
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ enum-ordinalize = "4.3.0"
7171
env_logger = "0.11.8"
7272
expect-test = "1"
7373
faststr = "0.2.31"
74+
flate2 = "1.1.5"
7475
fnv = "1.0.7"
7576
fs-err = "3.1.0"
7677
futures = "0.3"

crates/iceberg/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ bytes = { workspace = true }
6363
chrono = { workspace = true }
6464
derive_builder = { workspace = true }
6565
expect-test = { workspace = true }
66+
flate2 = { workspace = true }
6667
fnv = { workspace = true }
6768
futures = { workspace = true }
6869
itertools = { workspace = true }

crates/iceberg/src/spec/table_metadata.rs

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,12 @@ use std::cmp::Ordering;
2222
use std::collections::HashMap;
2323
use std::fmt::{Display, Formatter};
2424
use std::hash::Hash;
25+
use std::io::Read as _;
2526
use std::sync::Arc;
2627

2728
use _serde::TableMetadataEnum;
2829
use chrono::{DateTime, Utc};
30+
use flate2::read::GzDecoder;
2931
use serde::{Deserialize, Serialize};
3032
use serde_repr::{Deserialize_repr, Serialize_repr};
3133
use uuid::Uuid;
@@ -426,9 +428,30 @@ impl TableMetadata {
426428
file_io: &FileIO,
427429
metadata_location: impl AsRef<str>,
428430
) -> Result<TableMetadata> {
431+
let metadata_location = metadata_location.as_ref();
429432
let input_file = file_io.new_input(metadata_location)?;
430433
let metadata_content = input_file.read().await?;
431-
let metadata = serde_json::from_slice::<TableMetadata>(&metadata_content)?;
434+
435+
// Check if the file is compressed by looking for the gzip "magic number".
436+
let metadata = if metadata_content.len() > 2
437+
&& metadata_content[0] == 0x1F
438+
&& metadata_content[1] == 0x8B
439+
{
440+
let mut decoder = GzDecoder::new(metadata_content.as_ref());
441+
let mut decompressed_data = Vec::new();
442+
decoder.read_to_end(&mut decompressed_data).map_err(|e| {
443+
Error::new(
444+
ErrorKind::DataInvalid,
445+
"Trying to read compressed metadata file",
446+
)
447+
.with_context("file_path", metadata_location)
448+
.with_source(e)
449+
})?;
450+
serde_json::from_slice(&decompressed_data)?
451+
} else {
452+
serde_json::from_slice(&metadata_content)?
453+
};
454+
432455
Ok(metadata)
433456
}
434457

@@ -1516,6 +1539,7 @@ impl SnapshotLog {
15161539
mod tests {
15171540
use std::collections::HashMap;
15181541
use std::fs;
1542+
use std::io::Write as _;
15191543
use std::sync::Arc;
15201544

15211545
use anyhow::Result;
@@ -3524,6 +3548,30 @@ mod tests {
35243548
assert_eq!(read_metadata, original_metadata);
35253549
}
35263550

3551+
#[tokio::test]
3552+
async fn test_table_metadata_read_compressed() {
3553+
let temp_dir = TempDir::new().unwrap();
3554+
let metadata_location = temp_dir.path().join("v1.gz.metadata.json");
3555+
3556+
let original_metadata: TableMetadata = get_test_table_metadata("TableMetadataV2Valid.json");
3557+
let json = serde_json::to_string(&original_metadata).unwrap();
3558+
3559+
let mut encoder = flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::default());
3560+
encoder.write_all(json.as_bytes()).unwrap();
3561+
std::fs::write(&metadata_location, encoder.finish().unwrap())
3562+
.expect("failed to write metadata");
3563+
3564+
// Read the metadata back
3565+
let file_io = FileIOBuilder::new_fs_io().build().unwrap();
3566+
let metadata_location = metadata_location.to_str().unwrap();
3567+
let read_metadata = TableMetadata::read_from(&file_io, metadata_location)
3568+
.await
3569+
.unwrap();
3570+
3571+
// Verify the metadata matches
3572+
assert_eq!(read_metadata, original_metadata);
3573+
}
3574+
35273575
#[tokio::test]
35283576
async fn test_table_metadata_read_nonexistent_file() {
35293577
// Create a FileIO instance

crates/iceberg/src/table.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -297,9 +297,7 @@ impl StaticTable {
297297
table_ident: TableIdent,
298298
file_io: FileIO,
299299
) -> Result<Self> {
300-
let metadata_file = file_io.new_input(metadata_location)?;
301-
let metadata_file_content = metadata_file.read().await?;
302-
let metadata = serde_json::from_slice::<TableMetadata>(&metadata_file_content)?;
300+
let metadata = TableMetadata::read_from(&file_io, metadata_location).await?;
303301

304302
let table = Table::builder()
305303
.metadata(metadata)

0 commit comments

Comments
 (0)