Skip to content
Open
4 changes: 2 additions & 2 deletions crates/iceberg/src/catalog/memory/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ impl Catalog for MemoryCatalog {
}

#[cfg(test)]
mod tests {
pub(crate) mod tests {
use std::collections::HashSet;
use std::hash::Hash;
use std::iter::FromIterator;
Expand All @@ -396,7 +396,7 @@ mod tests {
temp_dir.path().to_str().unwrap().to_string()
}

async fn new_memory_catalog() -> impl Catalog {
pub(crate) async fn new_memory_catalog() -> impl Catalog {
let warehouse_location = temp_path();
MemoryCatalogBuilder::default()
.load(
Expand Down
219 changes: 193 additions & 26 deletions crates/iceberg/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use std::ops::Deref;
use std::str::FromStr;
use std::sync::Arc;

use _serde::deserialize_snapshot;
use _serde::{deserialize_snapshot, serialize_snapshot};
use async_trait::async_trait;
pub use memory::MemoryCatalog;
pub use metadata_location::*;
Expand All @@ -39,9 +39,9 @@ use typed_builder::TypedBuilder;
use uuid::Uuid;

use crate::spec::{
FormatVersion, PartitionStatisticsFile, Schema, SchemaId, Snapshot, SnapshotReference,
SortOrder, StatisticsFile, TableMetadata, TableMetadataBuilder, UnboundPartitionSpec,
ViewFormatVersion, ViewRepresentations, ViewVersion,
EncryptedKey, FormatVersion, PartitionStatisticsFile, Schema, SchemaId, Snapshot,
SnapshotReference, SortOrder, StatisticsFile, TableMetadata, TableMetadataBuilder,
UnboundPartitionSpec, ViewFormatVersion, ViewRepresentations, ViewVersion,
};
use crate::table::Table;
use crate::{Error, ErrorKind, Result};
Expand Down Expand Up @@ -291,6 +291,9 @@ pub struct TableCreation {
props.into_iter().collect()
}))]
pub properties: HashMap<String, String>,
/// Format version of the table. Defaults to V2.
#[builder(default = FormatVersion::V2)]
pub format_version: FormatVersion,
}

/// TableCommit represents the commit of a table in the catalog.
Expand Down Expand Up @@ -479,7 +482,10 @@ pub enum TableUpdate {
#[serde(rename_all = "kebab-case")]
AddSnapshot {
/// Snapshot to add.
#[serde(deserialize_with = "deserialize_snapshot")]
#[serde(
deserialize_with = "deserialize_snapshot",
serialize_with = "serialize_snapshot"
)]
snapshot: Snapshot,
},
/// Set table's snapshot ref.
Expand Down Expand Up @@ -554,6 +560,18 @@ pub enum TableUpdate {
/// Schema IDs to remove.
schema_ids: Vec<i32>,
},
/// Add an encryption key
#[serde(rename_all = "kebab-case")]
AddEncryptionKey {
/// The encryption key to add.
encryption_key: EncryptedKey,
},
/// Remove an encryption key
#[serde(rename_all = "kebab-case")]
RemoveEncryptionKey {
/// The id of the encryption key to remove.
key_id: String,
},
}

impl TableUpdate {
Expand Down Expand Up @@ -598,6 +616,12 @@ impl TableUpdate {
Ok(builder.remove_partition_statistics(snapshot_id))
}
TableUpdate::RemoveSchemas { schema_ids } => builder.remove_schemas(&schema_ids),
TableUpdate::AddEncryptionKey { encryption_key } => {
Ok(builder.add_encryption_key(encryption_key))
}
TableUpdate::RemoveEncryptionKey { key_id } => {
Ok(builder.remove_encryption_key(&key_id))
}
}
}
}
Expand Down Expand Up @@ -746,7 +770,7 @@ impl TableRequirement {
}

pub(super) mod _serde {
use serde::{Deserialize as _, Deserializer};
use serde::{Deserialize as _, Deserializer, Serialize as _};

use super::*;
use crate::spec::{SchemaId, Summary};
Expand All @@ -759,7 +783,18 @@ pub(super) mod _serde {
Ok(buf.into())
}

#[derive(Debug, Deserialize, PartialEq, Eq)]
pub(super) fn serialize_snapshot<S>(
snapshot: &Snapshot,
serializer: S,
) -> std::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let buf: CatalogSnapshot = snapshot.clone().into();
buf.serialize(serializer)
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "kebab-case")]
/// Defines the structure of a v2 snapshot for the catalog.
/// Main difference to SnapshotV2 is that sequence-number is optional
Expand All @@ -775,6 +810,12 @@ pub(super) mod _serde {
summary: Summary,
#[serde(skip_serializing_if = "Option::is_none")]
schema_id: Option<SchemaId>,
#[serde(skip_serializing_if = "Option::is_none")]
first_row_id: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
added_rows: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
key_id: Option<String>,
}

impl From<CatalogSnapshot> for Snapshot {
Expand All @@ -787,18 +828,59 @@ pub(super) mod _serde {
manifest_list,
schema_id,
summary,
first_row_id,
added_rows,
key_id,
} = snapshot;
let builder = Snapshot::builder()
.with_snapshot_id(snapshot_id)
.with_parent_snapshot_id(parent_snapshot_id)
.with_sequence_number(sequence_number)
.with_timestamp_ms(timestamp_ms)
.with_manifest_list(manifest_list)
.with_summary(summary);
if let Some(schema_id) = schema_id {
builder.with_schema_id(schema_id).build()
} else {
builder.build()
.with_summary(summary)
.with_encryption_key_id(key_id);
let row_range = first_row_id.zip(added_rows);
match (schema_id, row_range) {
(None, None) => builder.build(),
(Some(schema_id), None) => builder.with_schema_id(schema_id).build(),
(None, Some((first_row_id, last_row_id))) => {
builder.with_row_range(first_row_id, last_row_id).build()
}
(Some(schema_id), Some((first_row_id, last_row_id))) => builder
.with_schema_id(schema_id)
.with_row_range(first_row_id, last_row_id)
.build(),
}
}
}

impl From<Snapshot> for CatalogSnapshot {
fn from(snapshot: Snapshot) -> Self {
let first_row_id = snapshot.first_row_id();
let added_rows = snapshot.added_rows_count();
let Snapshot {
snapshot_id,
parent_snapshot_id,
sequence_number,
timestamp_ms,
manifest_list,
summary,
schema_id,
row_range: _,
encryption_key_id: key_id,
} = snapshot;
CatalogSnapshot {
snapshot_id,
parent_snapshot_id,
sequence_number,
timestamp_ms,
manifest_list,
summary,
schema_id,
first_row_id,
added_rows,
key_id,
}
}
}
Expand Down Expand Up @@ -942,14 +1024,15 @@ mod tests {
use std::fs::File;
use std::io::BufReader;

use base64::Engine as _;
use serde::Serialize;
use serde::de::DeserializeOwned;
use uuid::uuid;

use super::ViewUpdate;
use crate::io::FileIOBuilder;
use crate::spec::{
BlobMetadata, FormatVersion, MAIN_BRANCH, NestedField, NullOrder, Operation,
BlobMetadata, EncryptedKey, FormatVersion, MAIN_BRANCH, NestedField, NullOrder, Operation,
PartitionStatisticsFile, PrimitiveType, Schema, Snapshot, SnapshotReference,
SnapshotRetention, SortDirection, SortField, SortOrder, SqlViewRepresentation,
StatisticsFile, Summary, TableMetadata, TableMetadataBuilder, Transform, Type,
Expand Down Expand Up @@ -1079,20 +1162,18 @@ mod tests {
assert!(requirement.check(Some(&metadata)).is_ok());

// Add snapshot
let record = r#"
{
"snapshot-id": 3051729675574597004,
"sequence-number": 10,
"timestamp-ms": 9992191116217,
"summary": {
"operation": "append"
},
"manifest-list": "s3://b/wh/.../s1.avro",
"schema-id": 0
}
"#;
let snapshot = Snapshot::builder()
.with_snapshot_id(3051729675574597004)
.with_sequence_number(10)
.with_timestamp_ms(9992191116217)
.with_manifest_list("s3://b/wh/.../s1.avro".to_string())
.with_schema_id(0)
.with_summary(Summary {
operation: Operation::Append,
additional_properties: HashMap::new(),
})
.build();

let snapshot = serde_json::from_str::<Snapshot>(record).unwrap();
let builder = metadata.into_builder(None);
let builder = TableUpdate::AddSnapshot {
snapshot: snapshot.clone(),
Expand Down Expand Up @@ -1670,6 +1751,50 @@ mod tests {
assert_eq!(actual, update, "Parsed value is not equal to expected");
}

#[test]
fn test_add_snapshot_v3() {
let json = serde_json::json!(
{
"action": "add-snapshot",
"snapshot": {
"snapshot-id": 3055729675574597000i64,
"parent-snapshot-id": 3051729675574597000i64,
"timestamp-ms": 1555100955770i64,
"first-row-id":0,
"added-rows":2,
"key-id":"key123",
"summary": {
"operation": "append"
},
"manifest-list": "s3://a/b/2.avro"
}
});

let update = TableUpdate::AddSnapshot {
snapshot: Snapshot::builder()
.with_snapshot_id(3055729675574597000)
.with_parent_snapshot_id(Some(3051729675574597000))
.with_timestamp_ms(1555100955770)
.with_sequence_number(0)
.with_manifest_list("s3://a/b/2.avro")
.with_row_range(0, 2)
.with_encryption_key_id(Some("key123".to_string()))
.with_summary(Summary {
operation: Operation::Append,
additional_properties: HashMap::default(),
})
.build(),
};

let actual: TableUpdate = serde_json::from_value(json).expect("Failed to parse from json");
assert_eq!(actual, update, "Parsed value is not equal to expected");
let restored: TableUpdate = serde_json::from_str(
&serde_json::to_string(&actual).expect("Failed to serialize to json"),
)
.expect("Failed to parse from serialized json");
assert_eq!(restored, update);
}

#[test]
fn test_remove_snapshots() {
let json = r#"
Expand Down Expand Up @@ -2173,6 +2298,48 @@ mod tests {
);
}

#[test]
fn test_add_encryption_key() {
let key_bytes = "key".as_bytes();
let encoded_key = base64::engine::general_purpose::STANDARD.encode(key_bytes);
test_serde_json(
format!(
r#"
{{
"action": "add-encryption-key",
"encryption-key": {{
"key-id": "a",
"encrypted-key-metadata": "{encoded_key}",
"encrypted-by-id": "b"
}}
}}
"#
),
TableUpdate::AddEncryptionKey {
encryption_key: EncryptedKey::builder()
.key_id("a")
.encrypted_key_metadata(key_bytes.to_vec())
.encrypted_by_id("b")
.build(),
},
);
}

#[test]
fn test_remove_encryption_key() {
test_serde_json(
r#"
{
"action": "remove-encryption-key",
"key-id": "a"
}
"#,
TableUpdate::RemoveEncryptionKey {
key_id: "a".to_string(),
},
);
}

#[test]
fn test_table_commit() {
let table = {
Expand Down
1 change: 1 addition & 0 deletions crates/iceberg/src/expr/visitors/manifest_evaluator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,7 @@ mod test {
deleted_rows_count: None,
partitions: Some(partitions),
key_metadata: None,
first_row_id: None,
}
}

Expand Down
16 changes: 8 additions & 8 deletions crates/iceberg/src/spec/encrypted_key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,16 @@ use serde::{Deserialize, Serialize};
pub struct EncryptedKey {
/// Unique identifier for the key
#[builder(setter(into))]
key_id: String,
pub(crate) key_id: String,
/// Encrypted key metadata as binary data
#[builder(setter(into))]
encrypted_key_metadata: Vec<u8>,
pub(crate) encrypted_key_metadata: Vec<u8>,
/// Identifier of the entity that encrypted this key
#[builder(setter(into))]
encrypted_by_id: String,
#[builder(default, setter(into, strip_option))]
pub(crate) encrypted_by_id: Option<String>,
/// Additional properties associated with the key
#[builder(default)]
properties: HashMap<String, String>,
pub(crate) properties: HashMap<String, String>,
}

impl EncryptedKey {
Expand All @@ -50,8 +50,8 @@ impl EncryptedKey {
}

/// Returns the ID of the entity that encrypted this key
pub fn encrypted_by_id(&self) -> &str {
&self.encrypted_by_id
pub fn encrypted_by_id(&self) -> Option<&str> {
self.encrypted_by_id.as_deref()
}

/// Returns the properties map
Expand All @@ -72,7 +72,7 @@ pub(super) mod _serde {
pub(super) struct EncryptedKeySerde {
pub key_id: String,
pub encrypted_key_metadata: String, // Base64 encoded
pub encrypted_by_id: String,
pub encrypted_by_id: Option<String>,
#[serde(default, skip_serializing_if = "HashMap::is_empty")]
pub properties: HashMap<String, String>,
}
Expand Down
Loading
Loading