Skip to content

Commit b6dc4cf

Browse files
c-thielFokkoXuanwo
authored
feat: Support for V3 Metadata (#1682)
## Which issue does this PR close? Towards V3 Support! ## What changes are included in this PR? Introduce V3 FormatVersion accross Iceberg Metadata. ## Are these changes tested? Yes. Java has [a few more tests](https://github.com/apache/iceberg/blob/ee90c10e39cec0ccceb9425e03a3e0b5690daf3b/core/src/test/java/org/apache/iceberg/TestRowLineageMetadata.java#L36) regarding other table operations (different deletes, remove file), but we don't have those yet in our Transaction interface. --------- Signed-off-by: Xuanwo <[email protected]> Co-authored-by: Fokko Driesprong <[email protected]> Co-authored-by: Xuanwo <[email protected]>
1 parent b3519d5 commit b6dc4cf

File tree

17 files changed

+2276
-126
lines changed

17 files changed

+2276
-126
lines changed

crates/iceberg/src/catalog/memory/catalog.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,7 @@ impl Catalog for MemoryCatalog {
377377
}
378378

379379
#[cfg(test)]
380-
mod tests {
380+
pub(crate) mod tests {
381381
use std::collections::HashSet;
382382
use std::hash::Hash;
383383
use std::iter::FromIterator;
@@ -396,7 +396,7 @@ mod tests {
396396
temp_dir.path().to_str().unwrap().to_string()
397397
}
398398

399-
async fn new_memory_catalog() -> impl Catalog {
399+
pub(crate) async fn new_memory_catalog() -> impl Catalog {
400400
let warehouse_location = temp_path();
401401
MemoryCatalogBuilder::default()
402402
.load(

crates/iceberg/src/catalog/mod.rs

Lines changed: 193 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use std::ops::Deref;
2828
use std::str::FromStr;
2929
use std::sync::Arc;
3030

31-
use _serde::deserialize_snapshot;
31+
use _serde::{deserialize_snapshot, serialize_snapshot};
3232
use async_trait::async_trait;
3333
pub use memory::MemoryCatalog;
3434
pub use metadata_location::*;
@@ -39,9 +39,9 @@ use typed_builder::TypedBuilder;
3939
use uuid::Uuid;
4040

4141
use crate::spec::{
42-
FormatVersion, PartitionStatisticsFile, Schema, SchemaId, Snapshot, SnapshotReference,
43-
SortOrder, StatisticsFile, TableMetadata, TableMetadataBuilder, UnboundPartitionSpec,
44-
ViewFormatVersion, ViewRepresentations, ViewVersion,
42+
EncryptedKey, FormatVersion, PartitionStatisticsFile, Schema, SchemaId, Snapshot,
43+
SnapshotReference, SortOrder, StatisticsFile, TableMetadata, TableMetadataBuilder,
44+
UnboundPartitionSpec, ViewFormatVersion, ViewRepresentations, ViewVersion,
4545
};
4646
use crate::table::Table;
4747
use crate::{Error, ErrorKind, Result};
@@ -291,6 +291,9 @@ pub struct TableCreation {
291291
props.into_iter().collect()
292292
}))]
293293
pub properties: HashMap<String, String>,
294+
/// Format version of the table. Defaults to V2.
295+
#[builder(default = FormatVersion::V2)]
296+
pub format_version: FormatVersion,
294297
}
295298

296299
/// TableCommit represents the commit of a table in the catalog.
@@ -479,7 +482,10 @@ pub enum TableUpdate {
479482
#[serde(rename_all = "kebab-case")]
480483
AddSnapshot {
481484
/// Snapshot to add.
482-
#[serde(deserialize_with = "deserialize_snapshot")]
485+
#[serde(
486+
deserialize_with = "deserialize_snapshot",
487+
serialize_with = "serialize_snapshot"
488+
)]
483489
snapshot: Snapshot,
484490
},
485491
/// Set table's snapshot ref.
@@ -554,6 +560,18 @@ pub enum TableUpdate {
554560
/// Schema IDs to remove.
555561
schema_ids: Vec<i32>,
556562
},
563+
/// Add an encryption key
564+
#[serde(rename_all = "kebab-case")]
565+
AddEncryptionKey {
566+
/// The encryption key to add.
567+
encryption_key: EncryptedKey,
568+
},
569+
/// Remove an encryption key
570+
#[serde(rename_all = "kebab-case")]
571+
RemoveEncryptionKey {
572+
/// The id of the encryption key to remove.
573+
key_id: String,
574+
},
557575
}
558576

559577
impl TableUpdate {
@@ -598,6 +616,12 @@ impl TableUpdate {
598616
Ok(builder.remove_partition_statistics(snapshot_id))
599617
}
600618
TableUpdate::RemoveSchemas { schema_ids } => builder.remove_schemas(&schema_ids),
619+
TableUpdate::AddEncryptionKey { encryption_key } => {
620+
Ok(builder.add_encryption_key(encryption_key))
621+
}
622+
TableUpdate::RemoveEncryptionKey { key_id } => {
623+
Ok(builder.remove_encryption_key(&key_id))
624+
}
601625
}
602626
}
603627
}
@@ -742,7 +766,7 @@ impl TableRequirement {
742766
}
743767

744768
pub(super) mod _serde {
745-
use serde::{Deserialize as _, Deserializer};
769+
use serde::{Deserialize as _, Deserializer, Serialize as _};
746770

747771
use super::*;
748772
use crate::spec::{SchemaId, Summary};
@@ -755,7 +779,18 @@ pub(super) mod _serde {
755779
Ok(buf.into())
756780
}
757781

758-
#[derive(Debug, Deserialize, PartialEq, Eq)]
782+
pub(super) fn serialize_snapshot<S>(
783+
snapshot: &Snapshot,
784+
serializer: S,
785+
) -> std::result::Result<S::Ok, S::Error>
786+
where
787+
S: serde::Serializer,
788+
{
789+
let buf: CatalogSnapshot = snapshot.clone().into();
790+
buf.serialize(serializer)
791+
}
792+
793+
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
759794
#[serde(rename_all = "kebab-case")]
760795
/// Defines the structure of a v2 snapshot for the catalog.
761796
/// Main difference to SnapshotV2 is that sequence-number is optional
@@ -771,6 +806,12 @@ pub(super) mod _serde {
771806
summary: Summary,
772807
#[serde(skip_serializing_if = "Option::is_none")]
773808
schema_id: Option<SchemaId>,
809+
#[serde(skip_serializing_if = "Option::is_none")]
810+
first_row_id: Option<u64>,
811+
#[serde(skip_serializing_if = "Option::is_none")]
812+
added_rows: Option<u64>,
813+
#[serde(skip_serializing_if = "Option::is_none")]
814+
key_id: Option<String>,
774815
}
775816

776817
impl From<CatalogSnapshot> for Snapshot {
@@ -783,18 +824,59 @@ pub(super) mod _serde {
783824
manifest_list,
784825
schema_id,
785826
summary,
827+
first_row_id,
828+
added_rows,
829+
key_id,
786830
} = snapshot;
787831
let builder = Snapshot::builder()
788832
.with_snapshot_id(snapshot_id)
789833
.with_parent_snapshot_id(parent_snapshot_id)
790834
.with_sequence_number(sequence_number)
791835
.with_timestamp_ms(timestamp_ms)
792836
.with_manifest_list(manifest_list)
793-
.with_summary(summary);
794-
if let Some(schema_id) = schema_id {
795-
builder.with_schema_id(schema_id).build()
796-
} else {
797-
builder.build()
837+
.with_summary(summary)
838+
.with_encryption_key_id(key_id);
839+
let row_range = first_row_id.zip(added_rows);
840+
match (schema_id, row_range) {
841+
(None, None) => builder.build(),
842+
(Some(schema_id), None) => builder.with_schema_id(schema_id).build(),
843+
(None, Some((first_row_id, last_row_id))) => {
844+
builder.with_row_range(first_row_id, last_row_id).build()
845+
}
846+
(Some(schema_id), Some((first_row_id, last_row_id))) => builder
847+
.with_schema_id(schema_id)
848+
.with_row_range(first_row_id, last_row_id)
849+
.build(),
850+
}
851+
}
852+
}
853+
854+
impl From<Snapshot> for CatalogSnapshot {
855+
fn from(snapshot: Snapshot) -> Self {
856+
let first_row_id = snapshot.first_row_id();
857+
let added_rows = snapshot.added_rows_count();
858+
let Snapshot {
859+
snapshot_id,
860+
parent_snapshot_id,
861+
sequence_number,
862+
timestamp_ms,
863+
manifest_list,
864+
summary,
865+
schema_id,
866+
row_range: _,
867+
encryption_key_id: key_id,
868+
} = snapshot;
869+
CatalogSnapshot {
870+
snapshot_id,
871+
parent_snapshot_id,
872+
sequence_number,
873+
timestamp_ms,
874+
manifest_list,
875+
summary,
876+
schema_id,
877+
first_row_id,
878+
added_rows,
879+
key_id,
798880
}
799881
}
800882
}
@@ -938,14 +1020,15 @@ mod tests {
9381020
use std::fs::File;
9391021
use std::io::BufReader;
9401022

1023+
use base64::Engine as _;
9411024
use serde::Serialize;
9421025
use serde::de::DeserializeOwned;
9431026
use uuid::uuid;
9441027

9451028
use super::ViewUpdate;
9461029
use crate::io::FileIOBuilder;
9471030
use crate::spec::{
948-
BlobMetadata, FormatVersion, MAIN_BRANCH, NestedField, NullOrder, Operation,
1031+
BlobMetadata, EncryptedKey, FormatVersion, MAIN_BRANCH, NestedField, NullOrder, Operation,
9491032
PartitionStatisticsFile, PrimitiveType, Schema, Snapshot, SnapshotReference,
9501033
SnapshotRetention, SortDirection, SortField, SortOrder, SqlViewRepresentation,
9511034
StatisticsFile, Summary, TableMetadata, TableMetadataBuilder, Transform, Type,
@@ -1075,20 +1158,18 @@ mod tests {
10751158
assert!(requirement.check(Some(&metadata)).is_ok());
10761159

10771160
// Add snapshot
1078-
let record = r#"
1079-
{
1080-
"snapshot-id": 3051729675574597004,
1081-
"sequence-number": 10,
1082-
"timestamp-ms": 9992191116217,
1083-
"summary": {
1084-
"operation": "append"
1085-
},
1086-
"manifest-list": "s3://b/wh/.../s1.avro",
1087-
"schema-id": 0
1088-
}
1089-
"#;
1161+
let snapshot = Snapshot::builder()
1162+
.with_snapshot_id(3051729675574597004)
1163+
.with_sequence_number(10)
1164+
.with_timestamp_ms(9992191116217)
1165+
.with_manifest_list("s3://b/wh/.../s1.avro".to_string())
1166+
.with_schema_id(0)
1167+
.with_summary(Summary {
1168+
operation: Operation::Append,
1169+
additional_properties: HashMap::new(),
1170+
})
1171+
.build();
10901172

1091-
let snapshot = serde_json::from_str::<Snapshot>(record).unwrap();
10921173
let builder = metadata.into_builder(None);
10931174
let builder = TableUpdate::AddSnapshot {
10941175
snapshot: snapshot.clone(),
@@ -1666,6 +1747,50 @@ mod tests {
16661747
assert_eq!(actual, update, "Parsed value is not equal to expected");
16671748
}
16681749

1750+
#[test]
1751+
fn test_add_snapshot_v3() {
1752+
let json = serde_json::json!(
1753+
{
1754+
"action": "add-snapshot",
1755+
"snapshot": {
1756+
"snapshot-id": 3055729675574597000i64,
1757+
"parent-snapshot-id": 3051729675574597000i64,
1758+
"timestamp-ms": 1555100955770i64,
1759+
"first-row-id":0,
1760+
"added-rows":2,
1761+
"key-id":"key123",
1762+
"summary": {
1763+
"operation": "append"
1764+
},
1765+
"manifest-list": "s3://a/b/2.avro"
1766+
}
1767+
});
1768+
1769+
let update = TableUpdate::AddSnapshot {
1770+
snapshot: Snapshot::builder()
1771+
.with_snapshot_id(3055729675574597000)
1772+
.with_parent_snapshot_id(Some(3051729675574597000))
1773+
.with_timestamp_ms(1555100955770)
1774+
.with_sequence_number(0)
1775+
.with_manifest_list("s3://a/b/2.avro")
1776+
.with_row_range(0, 2)
1777+
.with_encryption_key_id(Some("key123".to_string()))
1778+
.with_summary(Summary {
1779+
operation: Operation::Append,
1780+
additional_properties: HashMap::default(),
1781+
})
1782+
.build(),
1783+
};
1784+
1785+
let actual: TableUpdate = serde_json::from_value(json).expect("Failed to parse from json");
1786+
assert_eq!(actual, update, "Parsed value is not equal to expected");
1787+
let restored: TableUpdate = serde_json::from_str(
1788+
&serde_json::to_string(&actual).expect("Failed to serialize to json"),
1789+
)
1790+
.expect("Failed to parse from serialized json");
1791+
assert_eq!(restored, update);
1792+
}
1793+
16691794
#[test]
16701795
fn test_remove_snapshots() {
16711796
let json = r#"
@@ -2169,6 +2294,48 @@ mod tests {
21692294
);
21702295
}
21712296

2297+
#[test]
2298+
fn test_add_encryption_key() {
2299+
let key_bytes = "key".as_bytes();
2300+
let encoded_key = base64::engine::general_purpose::STANDARD.encode(key_bytes);
2301+
test_serde_json(
2302+
format!(
2303+
r#"
2304+
{{
2305+
"action": "add-encryption-key",
2306+
"encryption-key": {{
2307+
"key-id": "a",
2308+
"encrypted-key-metadata": "{encoded_key}",
2309+
"encrypted-by-id": "b"
2310+
}}
2311+
}}
2312+
"#
2313+
),
2314+
TableUpdate::AddEncryptionKey {
2315+
encryption_key: EncryptedKey::builder()
2316+
.key_id("a")
2317+
.encrypted_key_metadata(key_bytes.to_vec())
2318+
.encrypted_by_id("b")
2319+
.build(),
2320+
},
2321+
);
2322+
}
2323+
2324+
#[test]
2325+
fn test_remove_encryption_key() {
2326+
test_serde_json(
2327+
r#"
2328+
{
2329+
"action": "remove-encryption-key",
2330+
"key-id": "a"
2331+
}
2332+
"#,
2333+
TableUpdate::RemoveEncryptionKey {
2334+
key_id: "a".to_string(),
2335+
},
2336+
);
2337+
}
2338+
21722339
#[test]
21732340
fn test_table_commit() {
21742341
let table = {

crates/iceberg/src/expr/visitors/manifest_evaluator.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -703,6 +703,7 @@ mod test {
703703
deleted_rows_count: None,
704704
partitions: Some(partitions),
705705
key_metadata: None,
706+
first_row_id: None,
706707
}
707708
}
708709

crates/iceberg/src/spec/encrypted_key.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,16 @@ use serde::{Deserialize, Serialize};
2626
pub struct EncryptedKey {
2727
/// Unique identifier for the key
2828
#[builder(setter(into))]
29-
key_id: String,
29+
pub(crate) key_id: String,
3030
/// Encrypted key metadata as binary data
3131
#[builder(setter(into))]
32-
encrypted_key_metadata: Vec<u8>,
32+
pub(crate) encrypted_key_metadata: Vec<u8>,
3333
/// Identifier of the entity that encrypted this key
34-
#[builder(setter(into))]
35-
encrypted_by_id: String,
34+
#[builder(default, setter(into, strip_option))]
35+
pub(crate) encrypted_by_id: Option<String>,
3636
/// Additional properties associated with the key
3737
#[builder(default)]
38-
properties: HashMap<String, String>,
38+
pub(crate) properties: HashMap<String, String>,
3939
}
4040

4141
impl EncryptedKey {
@@ -50,8 +50,8 @@ impl EncryptedKey {
5050
}
5151

5252
/// Returns the ID of the entity that encrypted this key
53-
pub fn encrypted_by_id(&self) -> &str {
54-
&self.encrypted_by_id
53+
pub fn encrypted_by_id(&self) -> Option<&str> {
54+
self.encrypted_by_id.as_deref()
5555
}
5656

5757
/// Returns the properties map
@@ -72,7 +72,7 @@ pub(super) mod _serde {
7272
pub(super) struct EncryptedKeySerde {
7373
pub key_id: String,
7474
pub encrypted_key_metadata: String, // Base64 encoded
75-
pub encrypted_by_id: String,
75+
pub encrypted_by_id: Option<String>,
7676
#[serde(default, skip_serializing_if = "HashMap::is_empty")]
7777
pub properties: HashMap<String, String>,
7878
}

0 commit comments

Comments
 (0)