Skip to content
Open
4 changes: 2 additions & 2 deletions crates/iceberg/src/catalog/memory/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ impl Catalog for MemoryCatalog {
}

#[cfg(test)]
mod tests {
pub(crate) mod tests {
use std::collections::HashSet;
use std::hash::Hash;
use std::iter::FromIterator;
Expand All @@ -396,7 +396,7 @@ mod tests {
temp_dir.path().to_str().unwrap().to_string()
}

async fn new_memory_catalog() -> impl Catalog {
pub(crate) async fn new_memory_catalog() -> impl Catalog {
let warehouse_location = temp_path();
MemoryCatalogBuilder::default()
.load(
Expand Down
73 changes: 69 additions & 4 deletions crates/iceberg/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ use typed_builder::TypedBuilder;
use uuid::Uuid;

use crate::spec::{
FormatVersion, PartitionStatisticsFile, Schema, SchemaId, Snapshot, SnapshotReference,
SortOrder, StatisticsFile, TableMetadata, TableMetadataBuilder, UnboundPartitionSpec,
ViewFormatVersion, ViewRepresentations, ViewVersion,
EncryptedKey, FormatVersion, PartitionStatisticsFile, Schema, SchemaId, Snapshot,
SnapshotReference, SortOrder, StatisticsFile, TableMetadata, TableMetadataBuilder,
UnboundPartitionSpec, ViewFormatVersion, ViewRepresentations, ViewVersion,
};
use crate::table::Table;
use crate::{Error, ErrorKind, Result};
Expand Down Expand Up @@ -291,6 +291,9 @@ pub struct TableCreation {
props.into_iter().collect()
}))]
pub properties: HashMap<String, String>,
/// Format version of the table. Defaults to V2.
#[builder(default = FormatVersion::V2)]
pub format_version: FormatVersion,
}

/// TableCommit represents the commit of a table in the catalog.
Expand Down Expand Up @@ -554,6 +557,18 @@ pub enum TableUpdate {
/// Schema IDs to remove.
schema_ids: Vec<i32>,
},
/// Add an encryption key
#[serde(rename_all = "kebab-case")]
AddEncryptionKey {
/// The encryption key to add.
encryption_key: EncryptedKey,
},
/// Remove an encryption key
#[serde(rename_all = "kebab-case")]
RemoveEncryptionKey {
/// The id of the encryption key to remove.
key_id: String,
},
}

impl TableUpdate {
Expand Down Expand Up @@ -598,6 +613,12 @@ impl TableUpdate {
Ok(builder.remove_partition_statistics(snapshot_id))
}
TableUpdate::RemoveSchemas { schema_ids } => builder.remove_schemas(&schema_ids),
TableUpdate::AddEncryptionKey { encryption_key } => {
Ok(builder.add_encryption_key(encryption_key))
}
TableUpdate::RemoveEncryptionKey { key_id } => {
Ok(builder.remove_encryption_key(&key_id))
}
}
}
}
Expand Down Expand Up @@ -942,14 +963,15 @@ mod tests {
use std::fs::File;
use std::io::BufReader;

use base64::Engine as _;
use serde::Serialize;
use serde::de::DeserializeOwned;
use uuid::uuid;

use super::ViewUpdate;
use crate::io::FileIOBuilder;
use crate::spec::{
BlobMetadata, FormatVersion, MAIN_BRANCH, NestedField, NullOrder, Operation,
BlobMetadata, EncryptedKey, FormatVersion, MAIN_BRANCH, NestedField, NullOrder, Operation,
PartitionStatisticsFile, PrimitiveType, Schema, Snapshot, SnapshotReference,
SnapshotRetention, SortDirection, SortField, SortOrder, SqlViewRepresentation,
StatisticsFile, Summary, TableMetadata, TableMetadataBuilder, Transform, Type,
Expand Down Expand Up @@ -2173,6 +2195,49 @@ mod tests {
);
}

#[test]
fn test_add_encryption_key() {
let key_bytes = "key".as_bytes();
let encoded_key = base64::engine::general_purpose::STANDARD.encode(key_bytes);
test_serde_json(
format!(
r#"
{{
"action": "add-encryption-key",
"encryption-key": {{
"key-id": "a",
"encrypted-key-metadata": "{encoded_key}",
"encrypted-by-id": "b"
}}
}}
"#
),
TableUpdate::AddEncryptionKey {
encryption_key: EncryptedKey {
key_id: "a".to_string(),
encrypted_key_metadata: key_bytes.to_vec(),
encrypted_by_id: "b".to_string(),
properties: HashMap::new(),
},
},
);
}

#[test]
fn test_remove_encryption_key() {
test_serde_json(
r#"
{
"action": "remove-encryption-key",
"key-id": "a"
}
"#,
TableUpdate::RemoveEncryptionKey {
key_id: "a".to_string(),
},
);
}

#[test]
fn test_table_commit() {
let table = {
Expand Down
1 change: 1 addition & 0 deletions crates/iceberg/src/expr/visitors/manifest_evaluator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,7 @@ mod test {
deleted_rows_count: None,
partitions: Some(partitions),
key_metadata: None,
first_row_id: None,
}
}

Expand Down
8 changes: 4 additions & 4 deletions crates/iceberg/src/spec/encrypted_key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,16 @@ use serde::{Deserialize, Serialize};
pub struct EncryptedKey {
/// Unique identifier for the key
#[builder(setter(into))]
key_id: String,
pub(crate) key_id: String,
/// Encrypted key metadata as binary data
#[builder(setter(into))]
encrypted_key_metadata: Vec<u8>,
pub(crate) encrypted_key_metadata: Vec<u8>,
/// Identifier of the entity that encrypted this key
#[builder(setter(into))]
encrypted_by_id: String,
pub(crate) encrypted_by_id: String,
/// Additional properties associated with the key
#[builder(default)]
properties: HashMap<String, String>,
pub(crate) properties: HashMap<String, String>,
}

impl EncryptedKey {
Expand Down
6 changes: 5 additions & 1 deletion crates/iceberg/src/spec/manifest/data_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ use serde_derive::{Deserialize, Serialize};
use serde_with::{DeserializeFromStr, SerializeDisplay};

use super::_serde::DataFileSerde;
use super::{Datum, FormatVersion, Schema, data_file_schema_v1, data_file_schema_v2};
use super::{
Datum, FormatVersion, Schema, data_file_schema_v1, data_file_schema_v2, data_file_schema_v3,
};
use crate::error::Result;
use crate::spec::{Struct, StructType};
use crate::{Error, ErrorKind};
Expand Down Expand Up @@ -293,6 +295,7 @@ pub fn write_data_files_to_avro<W: Write>(
let avro_schema = match version {
FormatVersion::V1 => data_file_schema_v1(partition_type).unwrap(),
FormatVersion::V2 => data_file_schema_v2(partition_type).unwrap(),
FormatVersion::V3 => data_file_schema_v3(partition_type).unwrap(),
};
let mut writer = AvroWriter::new(&avro_schema, writer);

Expand Down Expand Up @@ -320,6 +323,7 @@ pub fn read_data_files_from_avro<R: Read>(
let avro_schema = match version {
FormatVersion::V1 => data_file_schema_v1(partition_type).unwrap(),
FormatVersion::V2 => data_file_schema_v2(partition_type).unwrap(),
FormatVersion::V3 => data_file_schema_v3(partition_type).unwrap(),
};

let reader = AvroReader::with_schema(&avro_schema, reader)?;
Expand Down
38 changes: 38 additions & 0 deletions crates/iceberg/src/spec/manifest/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,42 @@ static CONTENT_SIZE_IN_BYTES: Lazy<NestedFieldRef> = {
})
};

fn data_file_fields_v3(partition_type: &StructType) -> Vec<NestedFieldRef> {
vec![
CONTENT.clone(),
FILE_PATH.clone(),
FILE_FORMAT.clone(),
Arc::new(NestedField::required(
102,
"partition",
Type::Struct(partition_type.clone()),
)),
RECORD_COUNT.clone(),
FILE_SIZE_IN_BYTES.clone(),
COLUMN_SIZES.clone(),
VALUE_COUNTS.clone(),
NULL_VALUE_COUNTS.clone(),
NAN_VALUE_COUNTS.clone(),
LOWER_BOUNDS.clone(),
UPPER_BOUNDS.clone(),
KEY_METADATA.clone(),
SPLIT_OFFSETS.clone(),
EQUALITY_IDS.clone(),
SORT_ORDER_ID.clone(),
FIRST_ROW_ID.clone(),
REFERENCE_DATA_FILE.clone(),
CONTENT_OFFSET.clone(),
CONTENT_SIZE_IN_BYTES.clone(),
]
}

pub(super) fn data_file_schema_v3(partition_type: &StructType) -> Result<AvroSchema> {
let schema = Schema::builder()
.with_fields(data_file_fields_v3(partition_type))
.build()?;
schema_to_avro_schema("data_file", &schema)
}

fn data_file_fields_v2(partition_type: &StructType) -> Vec<NestedFieldRef> {
vec![
CONTENT.clone(),
Expand All @@ -533,6 +569,8 @@ fn data_file_fields_v2(partition_type: &StructType) -> Vec<NestedFieldRef> {
SORT_ORDER_ID.clone(),
FIRST_ROW_ID.clone(),
REFERENCE_DATA_FILE.clone(),
// Why are the following two fields here in the existing v2 schema?
// In the spec, they are not even listed as optional for v2.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have the same doubt and this question also applies to the FIRST_ROW_ID above

CONTENT_OFFSET.clone(),
CONTENT_SIZE_IN_BYTES.clone(),
]
Expand Down
3 changes: 2 additions & 1 deletion crates/iceberg/src/spec/manifest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ impl Manifest {
})
.collect::<Result<Vec<_>>>()?
}
FormatVersion::V2 => {
// Manifest Schema & Manifest Entry did not change between V2 and V3
FormatVersion::V2 | FormatVersion::V3 => {
let schema = manifest_schema_v2(&partition_type)?;
let reader = AvroReader::with_schema(&schema, bs)?;
reader
Expand Down
76 changes: 70 additions & 6 deletions crates/iceberg/src/spec/manifest/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,13 @@ impl ManifestWriterBuilder {
.format_version(FormatVersion::V1)
.content(ManifestContentType::Data)
.build();
ManifestWriter::new(self.output, self.snapshot_id, self.key_metadata, metadata)
ManifestWriter::new(
self.output,
self.snapshot_id,
self.key_metadata,
metadata,
None,
)
}

/// Build a [`ManifestWriter`] for format version 2, data content.
Expand All @@ -84,7 +90,13 @@ impl ManifestWriterBuilder {
.format_version(FormatVersion::V2)
.content(ManifestContentType::Data)
.build();
ManifestWriter::new(self.output, self.snapshot_id, self.key_metadata, metadata)
ManifestWriter::new(
self.output,
self.snapshot_id,
self.key_metadata,
metadata,
None,
)
}

/// Build a [`ManifestWriter`] for format version 2, deletes content.
Expand All @@ -96,7 +108,51 @@ impl ManifestWriterBuilder {
.format_version(FormatVersion::V2)
.content(ManifestContentType::Deletes)
.build();
ManifestWriter::new(self.output, self.snapshot_id, self.key_metadata, metadata)
ManifestWriter::new(
self.output,
self.snapshot_id,
self.key_metadata,
metadata,
None,
)
}

/// Build a [`ManifestWriter`] for format version 2, data content.
pub fn build_v3_data(self) -> ManifestWriter {
let metadata = ManifestMetadata::builder()
.schema_id(self.schema.schema_id())
.schema(self.schema)
.partition_spec(self.partition_spec)
.format_version(FormatVersion::V3)
.content(ManifestContentType::Data)
.build();
ManifestWriter::new(
self.output,
self.snapshot_id,
self.key_metadata,
metadata,
// First row id is assigned by the [`ManifestListWriter`] when the manifest
// is added to the list.
None,
)
}

/// Build a [`ManifestWriter`] for format version 2, deletes content.
pub fn build_v3_deletes(self) -> ManifestWriter {
let metadata = ManifestMetadata::builder()
.schema_id(self.schema.schema_id())
.schema(self.schema)
.partition_spec(self.partition_spec)
.format_version(FormatVersion::V3)
.content(ManifestContentType::Deletes)
.build();
ManifestWriter::new(
self.output,
self.snapshot_id,
self.key_metadata,
metadata,
None,
)
}
}

Expand All @@ -112,6 +168,7 @@ pub struct ManifestWriter {
existing_rows: u64,
deleted_files: u32,
deleted_rows: u64,
first_row_id: Option<u64>,

min_seq_num: Option<i64>,

Expand All @@ -129,6 +186,7 @@ impl ManifestWriter {
snapshot_id: Option<i64>,
key_metadata: Option<Vec<u8>>,
metadata: ManifestMetadata,
first_row_id: Option<u64>,
) -> Self {
Self {
output,
Expand All @@ -139,6 +197,7 @@ impl ManifestWriter {
existing_rows: 0,
deleted_files: 0,
deleted_rows: 0,
first_row_id,
min_seq_num: None,
key_metadata,
manifest_entries: Vec::new(),
Expand Down Expand Up @@ -348,7 +407,8 @@ impl ManifestWriter {
let table_schema = &self.metadata.schema;
let avro_schema = match self.metadata.format_version {
FormatVersion::V1 => manifest_schema_v1(&partition_type)?,
FormatVersion::V2 => manifest_schema_v2(&partition_type)?,
// Manifest schema did not change between V2 and V3
FormatVersion::V2 | FormatVersion::V3 => manifest_schema_v2(&partition_type)?,
};
let mut avro_writer = AvroWriter::new(&avro_schema, Vec::new());
avro_writer.add_user_metadata(
Expand Down Expand Up @@ -388,8 +448,11 @@ impl ManifestWriter {
let value = match self.metadata.format_version {
FormatVersion::V1 => to_value(ManifestEntryV1::try_from(entry, &partition_type)?)?
.resolve(&avro_schema)?,
FormatVersion::V2 => to_value(ManifestEntryV2::try_from(entry, &partition_type)?)?
.resolve(&avro_schema)?,
// Manifest entry format did not change between V2 and V3
FormatVersion::V2 | FormatVersion::V3 => {
to_value(ManifestEntryV2::try_from(entry, &partition_type)?)?
.resolve(&avro_schema)?
}
};

avro_writer.append(value)?;
Expand Down Expand Up @@ -417,6 +480,7 @@ impl ManifestWriter {
deleted_rows_count: Some(self.deleted_rows),
partitions: Some(partition_summary),
key_metadata: self.key_metadata,
first_row_id: self.first_row_id,
})
}
}
Expand Down
Loading
Loading