Skip to content

Commit 748356a

Browse files
Skip v0 metadata (#13)
1 parent b52fd1f commit 748356a

2 files changed

Lines changed: 28 additions & 32 deletions

File tree

src/iceberg_destination.rs

Lines changed: 26 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@ fn create_file_io(target_url: String) -> Result<FileIO, DataLoadingError> {
3838
Ok(file_io)
3939
}
4040

41-
// Create the v0 metadata object. This one will contain no snapshot
42-
fn create_metadata_v0(
41+
// Create an empty table metadata object that contains no snapshots
42+
fn create_empty_metadata(
4343
iceberg_schema: &iceberg::spec::Schema,
4444
target_url: String,
4545
) -> Result<TableMetadata, DataLoadingError> {
@@ -74,23 +74,25 @@ fn assign_field_ids(arrow_schema: Arc<Schema>) -> Schema {
7474
Schema::new_with_metadata(new_fields, arrow_schema.metadata.clone())
7575
}
7676

77-
// Create the v1 metadata object by adding a snapshot to the v0 metadata object
78-
fn create_metadata_v1(
79-
metadata_v0: &TableMetadata,
80-
metadata_v0_location: String,
77+
// Create a new TableMetadata object by updating the current snapshot of an existing TableMetadata
78+
fn update_metadata_snapshot(
79+
previous_metadata: &TableMetadata,
80+
previous_metadata_location: Option<String>,
8181
snapshot: Snapshot,
8282
) -> Result<TableMetadata, DataLoadingError> {
8383
let snapshot_id = snapshot.snapshot_id();
84-
let metadata_v1: TableMetadata =
85-
TableMetadataBuilder::new_from_metadata(metadata_v0.clone(), Some(metadata_v0_location))
86-
.add_snapshot(snapshot)?
87-
.set_ref(
88-
"main",
89-
SnapshotReference::new(snapshot_id, SnapshotRetention::branch(None, None, None)),
90-
)?
91-
.build()?
92-
.into();
93-
Ok(metadata_v1)
84+
let new_metadata: TableMetadata = TableMetadataBuilder::new_from_metadata(
85+
previous_metadata.clone(),
86+
previous_metadata_location,
87+
)
88+
.add_snapshot(snapshot)?
89+
.set_ref(
90+
"main",
91+
SnapshotReference::new(snapshot_id, SnapshotRetention::branch(None, None, None)),
92+
)?
93+
.build()?
94+
.into();
95+
Ok(new_metadata)
9496
}
9597

9698
const DEFAULT_SCHEMA_ID: i32 = 0;
@@ -135,7 +137,7 @@ pub async fn record_batches_to_iceberg(
135137
None
136138
};
137139

138-
let (old_metadata, old_metadata_location) = match old_version_hint {
140+
let (previous_metadata, previous_metadata_location) = match old_version_hint {
139141
Some(version_hint) => {
140142
let old_metadata_location =
141143
format!("{}/metadata/v{}.metadata.json", target_url, version_hint);
@@ -159,25 +161,19 @@ pub async fn record_batches_to_iceberg(
159161
"Schema changes not supported",
160162
)));
161163
}
162-
(old_metadata, old_metadata_location)
164+
(old_metadata, Some(old_metadata_location))
163165
}
164166
None => {
165-
let metadata_v0 = create_metadata_v0(&iceberg_schema, target_url.to_string())?;
166-
let metadata_v0_location = format!("{}/metadata/v0.metadata.json", target_url);
167-
file_io
168-
.new_output(&metadata_v0_location)?
169-
.write_exclusive(serde_json::to_vec(&metadata_v0).unwrap().into())
170-
.await?;
171-
info!("Wrote v0 metadata: {:?}", metadata_v0_location);
172-
(metadata_v0, metadata_v0_location)
167+
let empty_metadata = create_empty_metadata(&iceberg_schema, target_url.to_string())?;
168+
(empty_metadata, None)
173169
}
174170
};
175171

176172
let file_writer_builder = ParquetWriterBuilder::new(
177173
WriterProperties::builder().build(),
178174
iceberg_schema.clone(),
179175
file_io.clone(),
180-
DefaultLocationGenerator::new(old_metadata.clone()).unwrap(),
176+
DefaultLocationGenerator::new(previous_metadata.clone()).unwrap(),
181177
DefaultFileNameGenerator::new(
182178
"part".to_string(),
183179
Some(Uuid::new_v4().to_string()),
@@ -268,10 +264,11 @@ pub async fn record_batches_to_iceberg(
268264
})
269265
.build();
270266

271-
let new_metadata = create_metadata_v1(&old_metadata, old_metadata_location, snapshot)?;
267+
let new_metadata =
268+
update_metadata_snapshot(&previous_metadata, previous_metadata_location, snapshot)?;
272269
let new_version_hint = match old_version_hint {
273270
Some(x) => x + 1,
274-
None => 1,
271+
None => 0,
275272
};
276273
let new_metadata_location = format!(
277274
"{}/metadata/v{}.metadata.json",

tests/basic_integration.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,13 +89,12 @@ async fn test_pg_to_iceberg() {
8989
paths.sort();
9090

9191
// THEN iceberg data and metadata files are written
92-
assert_eq!(paths.len(), 6);
92+
assert_eq!(paths.len(), 5);
9393
assert!(Regex::new(r"^iceberg/data/part-00000-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}.parquet$").unwrap().is_match(paths[0].as_ref()));
9494
assert!(Regex::new(r"^iceberg/metadata/manifest-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}.avro$").unwrap().is_match(paths[1].as_ref()));
9595
assert!(Regex::new(r"^iceberg/metadata/manifest-list-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}.avro$").unwrap().is_match(paths[2].as_ref()));
9696
assert_eq!(&paths[3].to_string(), "iceberg/metadata/v0.metadata.json");
97-
assert_eq!(&paths[4].to_string(), "iceberg/metadata/v1.metadata.json");
98-
assert_eq!(&paths[5].to_string(), "iceberg/metadata/version-hint.text");
97+
assert_eq!(&paths[4].to_string(), "iceberg/metadata/version-hint.text");
9998

10099
// WHEN we try to write to an existing table without passing the overwrite flag
101100
// THEN the command errors out

0 commit comments

Comments
 (0)