diff --git a/crates/iceberg/src/catalog/memory/catalog.rs b/crates/iceberg/src/catalog/memory/catalog.rs index 6ce3111419..894a312d90 100644 --- a/crates/iceberg/src/catalog/memory/catalog.rs +++ b/crates/iceberg/src/catalog/memory/catalog.rs @@ -377,7 +377,7 @@ impl Catalog for MemoryCatalog { } #[cfg(test)] -mod tests { +pub(crate) mod tests { use std::collections::HashSet; use std::hash::Hash; use std::iter::FromIterator; @@ -396,7 +396,7 @@ mod tests { temp_dir.path().to_str().unwrap().to_string() } - async fn new_memory_catalog() -> impl Catalog { + pub(crate) async fn new_memory_catalog() -> impl Catalog { let warehouse_location = temp_path(); MemoryCatalogBuilder::default() .load( diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index 77b171ee30..25fffd31b1 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -28,7 +28,7 @@ use std::ops::Deref; use std::str::FromStr; use std::sync::Arc; -use _serde::deserialize_snapshot; +use _serde::{deserialize_snapshot, serialize_snapshot}; use async_trait::async_trait; pub use memory::MemoryCatalog; pub use metadata_location::*; @@ -39,9 +39,9 @@ use typed_builder::TypedBuilder; use uuid::Uuid; use crate::spec::{ - FormatVersion, PartitionStatisticsFile, Schema, SchemaId, Snapshot, SnapshotReference, - SortOrder, StatisticsFile, TableMetadata, TableMetadataBuilder, UnboundPartitionSpec, - ViewFormatVersion, ViewRepresentations, ViewVersion, + EncryptedKey, FormatVersion, PartitionStatisticsFile, Schema, SchemaId, Snapshot, + SnapshotReference, SortOrder, StatisticsFile, TableMetadata, TableMetadataBuilder, + UnboundPartitionSpec, ViewFormatVersion, ViewRepresentations, ViewVersion, }; use crate::table::Table; use crate::{Error, ErrorKind, Result}; @@ -291,6 +291,9 @@ pub struct TableCreation { props.into_iter().collect() }))] pub properties: HashMap, + /// Format version of the table. Defaults to V2. + #[builder(default = FormatVersion::V2)] + pub format_version: FormatVersion, } /// TableCommit represents the commit of a table in the catalog. @@ -479,7 +482,10 @@ pub enum TableUpdate { #[serde(rename_all = "kebab-case")] AddSnapshot { /// Snapshot to add. - #[serde(deserialize_with = "deserialize_snapshot")] + #[serde( + deserialize_with = "deserialize_snapshot", + serialize_with = "serialize_snapshot" + )] snapshot: Snapshot, }, /// Set table's snapshot ref. @@ -554,6 +560,18 @@ pub enum TableUpdate { /// Schema IDs to remove. schema_ids: Vec, }, + /// Add an encryption key + #[serde(rename_all = "kebab-case")] + AddEncryptionKey { + /// The encryption key to add. + encryption_key: EncryptedKey, + }, + /// Remove an encryption key + #[serde(rename_all = "kebab-case")] + RemoveEncryptionKey { + /// The id of the encryption key to remove. 
+        key_id: String,
+    },
}

impl TableUpdate {
@@ -598,6 +616,12 @@ impl TableUpdate {
                Ok(builder.remove_partition_statistics(snapshot_id))
            }
            TableUpdate::RemoveSchemas { schema_ids } => builder.remove_schemas(&schema_ids),
+            TableUpdate::AddEncryptionKey { encryption_key } => {
+                Ok(builder.add_encryption_key(encryption_key))
+            }
+            TableUpdate::RemoveEncryptionKey { key_id } => {
+                Ok(builder.remove_encryption_key(&key_id))
+            }
        }
    }
}

@@ -746,7 +770,7 @@ impl TableRequirement {
}

pub(super) mod _serde {
-    use serde::{Deserialize as _, Deserializer};
+    use serde::{Deserialize as _, Deserializer, Serialize as _};

    use super::*;
    use crate::spec::{SchemaId, Summary};
@@ -759,7 +783,18 @@ pub(super) mod _serde {
        Ok(buf.into())
    }

-    #[derive(Debug, Deserialize, PartialEq, Eq)]
+    pub(super) fn serialize_snapshot<S>(
+        snapshot: &Snapshot,
+        serializer: S,
+    ) -> std::result::Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        let buf: CatalogSnapshot = snapshot.clone().into();
+        buf.serialize(serializer)
+    }
+
+    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(rename_all = "kebab-case")]
    /// Defines the structure of a v2 snapshot for the catalog.
    /// The main difference from SnapshotV2 is that sequence-number is optional.
@@ -775,6 +810,12 @@ pub(super) mod _serde {
        summary: Summary,
        #[serde(skip_serializing_if = "Option::is_none")]
        schema_id: Option<SchemaId>,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        first_row_id: Option<u64>,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        added_rows: Option<u64>,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        key_id: Option<String>,
    }

    impl From<CatalogSnapshot> for Snapshot {
@@ -787,6 +828,9 @@ pub(super) mod _serde {
                manifest_list,
                schema_id,
                summary,
+                first_row_id,
+                added_rows,
+                key_id,
            } = snapshot;
            let builder = Snapshot::builder()
                .with_snapshot_id(snapshot_id)
@@ -794,11 +838,49 @@ pub(super) mod _serde {
                .with_sequence_number(sequence_number)
                .with_timestamp_ms(timestamp_ms)
                .with_manifest_list(manifest_list)
-                .with_summary(summary);
-            if let Some(schema_id) = schema_id {
-                builder.with_schema_id(schema_id).build()
-            } else {
-                builder.build()
+                .with_summary(summary)
+                .with_encryption_key_id(key_id);
+            let row_range = first_row_id.zip(added_rows);
+            match (schema_id, row_range) {
+                (None, None) => builder.build(),
+                (Some(schema_id), None) => builder.with_schema_id(schema_id).build(),
+                (None, Some((first_row_id, added_rows))) => {
+                    builder.with_row_range(first_row_id, added_rows).build()
+                }
+                (Some(schema_id), Some((first_row_id, added_rows))) => builder
+                    .with_schema_id(schema_id)
+                    .with_row_range(first_row_id, added_rows)
+                    .build(),
+            }
+        }
+    }
+
+    impl From<Snapshot> for CatalogSnapshot {
+        fn from(snapshot: Snapshot) -> Self {
+            let first_row_id = snapshot.first_row_id();
+            let added_rows = snapshot.added_rows_count();
+            let Snapshot {
+                snapshot_id,
+                parent_snapshot_id,
+                sequence_number,
+                timestamp_ms,
+                manifest_list,
+                summary,
+                schema_id,
+                row_range: _,
+                encryption_key_id: key_id,
+            } = snapshot;
+            CatalogSnapshot {
+                snapshot_id,
+                parent_snapshot_id,
+                sequence_number,
+                timestamp_ms,
+                manifest_list,
+                summary,
+                schema_id,
+                first_row_id,
+                added_rows,
+                key_id,
            }
        }
    }
@@ -942,6 +1024,7 @@ mod tests {
    use std::fs::File;
    use std::io::BufReader;

+    use base64::Engine as _;
    use serde::Serialize;
    use serde::de::DeserializeOwned;
    use uuid::uuid;
@@ -949,7 +1032,7 @@ mod tests {
    use super::ViewUpdate;
    use crate::io::FileIOBuilder;
    use crate::spec::{
-        BlobMetadata, FormatVersion, MAIN_BRANCH, NestedField, NullOrder, Operation,
+        BlobMetadata, EncryptedKey,
FormatVersion, MAIN_BRANCH, NestedField, NullOrder, Operation, PartitionStatisticsFile, PrimitiveType, Schema, Snapshot, SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, SqlViewRepresentation, StatisticsFile, Summary, TableMetadata, TableMetadataBuilder, Transform, Type, @@ -1079,20 +1162,18 @@ mod tests { assert!(requirement.check(Some(&metadata)).is_ok()); // Add snapshot - let record = r#" - { - "snapshot-id": 3051729675574597004, - "sequence-number": 10, - "timestamp-ms": 9992191116217, - "summary": { - "operation": "append" - }, - "manifest-list": "s3://b/wh/.../s1.avro", - "schema-id": 0 - } - "#; + let snapshot = Snapshot::builder() + .with_snapshot_id(3051729675574597004) + .with_sequence_number(10) + .with_timestamp_ms(9992191116217) + .with_manifest_list("s3://b/wh/.../s1.avro".to_string()) + .with_schema_id(0) + .with_summary(Summary { + operation: Operation::Append, + additional_properties: HashMap::new(), + }) + .build(); - let snapshot = serde_json::from_str::(record).unwrap(); let builder = metadata.into_builder(None); let builder = TableUpdate::AddSnapshot { snapshot: snapshot.clone(), @@ -1670,6 +1751,50 @@ mod tests { assert_eq!(actual, update, "Parsed value is not equal to expected"); } + #[test] + fn test_add_snapshot_v3() { + let json = serde_json::json!( + { + "action": "add-snapshot", + "snapshot": { + "snapshot-id": 3055729675574597000i64, + "parent-snapshot-id": 3051729675574597000i64, + "timestamp-ms": 1555100955770i64, + "first-row-id":0, + "added-rows":2, + "key-id":"key123", + "summary": { + "operation": "append" + }, + "manifest-list": "s3://a/b/2.avro" + } + }); + + let update = TableUpdate::AddSnapshot { + snapshot: Snapshot::builder() + .with_snapshot_id(3055729675574597000) + .with_parent_snapshot_id(Some(3051729675574597000)) + .with_timestamp_ms(1555100955770) + .with_sequence_number(0) + .with_manifest_list("s3://a/b/2.avro") + .with_row_range(0, 2) + .with_encryption_key_id(Some("key123".to_string())) + .with_summary(Summary { + operation: Operation::Append, + additional_properties: HashMap::default(), + }) + .build(), + }; + + let actual: TableUpdate = serde_json::from_value(json).expect("Failed to parse from json"); + assert_eq!(actual, update, "Parsed value is not equal to expected"); + let restored: TableUpdate = serde_json::from_str( + &serde_json::to_string(&actual).expect("Failed to serialize to json"), + ) + .expect("Failed to parse from serialized json"); + assert_eq!(restored, update); + } + #[test] fn test_remove_snapshots() { let json = r#" @@ -2173,6 +2298,48 @@ mod tests { ); } + #[test] + fn test_add_encryption_key() { + let key_bytes = "key".as_bytes(); + let encoded_key = base64::engine::general_purpose::STANDARD.encode(key_bytes); + test_serde_json( + format!( + r#" + {{ + "action": "add-encryption-key", + "encryption-key": {{ + "key-id": "a", + "encrypted-key-metadata": "{encoded_key}", + "encrypted-by-id": "b" + }} + }} + "# + ), + TableUpdate::AddEncryptionKey { + encryption_key: EncryptedKey::builder() + .key_id("a") + .encrypted_key_metadata(key_bytes.to_vec()) + .encrypted_by_id("b") + .build(), + }, + ); + } + + #[test] + fn test_remove_encryption_key() { + test_serde_json( + r#" + { + "action": "remove-encryption-key", + "key-id": "a" + } + "#, + TableUpdate::RemoveEncryptionKey { + key_id: "a".to_string(), + }, + ); + } + #[test] fn test_table_commit() { let table = { diff --git a/crates/iceberg/src/expr/visitors/manifest_evaluator.rs b/crates/iceberg/src/expr/visitors/manifest_evaluator.rs index 
903c9ea4a7..abbd136cb1 100644
--- a/crates/iceberg/src/expr/visitors/manifest_evaluator.rs
+++ b/crates/iceberg/src/expr/visitors/manifest_evaluator.rs
@@ -703,6 +703,7 @@ mod test {
            deleted_rows_count: None,
            partitions: Some(partitions),
            key_metadata: None,
+            first_row_id: None,
        }
    }

diff --git a/crates/iceberg/src/spec/encrypted_key.rs b/crates/iceberg/src/spec/encrypted_key.rs
index db19a023b2..6908ade1d5 100644
--- a/crates/iceberg/src/spec/encrypted_key.rs
+++ b/crates/iceberg/src/spec/encrypted_key.rs
@@ -26,16 +26,16 @@ use serde::{Deserialize, Serialize};
pub struct EncryptedKey {
    /// Unique identifier for the key
    #[builder(setter(into))]
-    key_id: String,
+    pub(crate) key_id: String,
    /// Encrypted key metadata as binary data
    #[builder(setter(into))]
-    encrypted_key_metadata: Vec<u8>,
+    pub(crate) encrypted_key_metadata: Vec<u8>,
    /// Identifier of the entity that encrypted this key
-    #[builder(setter(into))]
-    encrypted_by_id: String,
+    #[builder(default, setter(into, strip_option))]
+    pub(crate) encrypted_by_id: Option<String>,
    /// Additional properties associated with the key
    #[builder(default)]
-    properties: HashMap<String, String>,
+    pub(crate) properties: HashMap<String, String>,
}

impl EncryptedKey {
@@ -50,8 +50,8 @@ impl EncryptedKey {
    }

    /// Returns the ID of the entity that encrypted this key
-    pub fn encrypted_by_id(&self) -> &str {
-        &self.encrypted_by_id
+    pub fn encrypted_by_id(&self) -> Option<&str> {
+        self.encrypted_by_id.as_deref()
    }

    /// Returns the properties map
@@ -72,7 +72,7 @@ pub(super) mod _serde {
    pub(super) struct EncryptedKeySerde {
        pub key_id: String,
        pub encrypted_key_metadata: String, // Base64 encoded
-        pub encrypted_by_id: String,
+        pub encrypted_by_id: Option<String>,
        #[serde(default, skip_serializing_if = "HashMap::is_empty")]
        pub properties: HashMap<String, String>,
    }

diff --git a/crates/iceberg/src/spec/manifest/data_file.rs b/crates/iceberg/src/spec/manifest/data_file.rs
index 931f9441e9..e854e12f44 100644
--- a/crates/iceberg/src/spec/manifest/data_file.rs
+++ b/crates/iceberg/src/spec/manifest/data_file.rs
@@ -24,7 +24,9 @@ use serde_derive::{Deserialize, Serialize};
use serde_with::{DeserializeFromStr, SerializeDisplay};

use super::_serde::DataFileSerde;
-use super::{Datum, FormatVersion, Schema, data_file_schema_v1, data_file_schema_v2};
+use super::{
+    Datum, FormatVersion, Schema, data_file_schema_v1, data_file_schema_v2, data_file_schema_v3,
+};
use crate::error::Result;
use crate::spec::{Struct, StructType};
use crate::{Error, ErrorKind};
@@ -293,6 +295,7 @@ pub fn write_data_files_to_avro(
    let avro_schema = match version {
        FormatVersion::V1 => data_file_schema_v1(partition_type).unwrap(),
        FormatVersion::V2 => data_file_schema_v2(partition_type).unwrap(),
+        FormatVersion::V3 => data_file_schema_v3(partition_type).unwrap(),
    };
    let mut writer = AvroWriter::new(&avro_schema, writer);

@@ -320,6 +323,7 @@ pub fn read_data_files_from_avro(
    let avro_schema = match version {
        FormatVersion::V1 => data_file_schema_v1(partition_type).unwrap(),
        FormatVersion::V2 => data_file_schema_v2(partition_type).unwrap(),
+        FormatVersion::V3 => data_file_schema_v3(partition_type).unwrap(),
    };

    let reader = AvroReader::with_schema(&avro_schema, reader)?;

diff --git a/crates/iceberg/src/spec/manifest/entry.rs b/crates/iceberg/src/spec/manifest/entry.rs
index 7ba9efb3b9..c1cb229c6e 100644
--- a/crates/iceberg/src/spec/manifest/entry.rs
+++ b/crates/iceberg/src/spec/manifest/entry.rs
@@ -509,6 +509,42 @@ static CONTENT_SIZE_IN_BYTES: Lazy<NestedFieldRef> = {
    })
};

+fn data_file_fields_v3(partition_type: &StructType) -> Vec<NestedFieldRef> {
+    vec![
+        CONTENT.clone(),
+        FILE_PATH.clone(),
+        FILE_FORMAT.clone(),
+        Arc::new(NestedField::required(
+            102,
+            "partition",
+            Type::Struct(partition_type.clone()),
+        )),
+        RECORD_COUNT.clone(),
+        FILE_SIZE_IN_BYTES.clone(),
+        COLUMN_SIZES.clone(),
+        VALUE_COUNTS.clone(),
+        NULL_VALUE_COUNTS.clone(),
+        NAN_VALUE_COUNTS.clone(),
+        LOWER_BOUNDS.clone(),
+        UPPER_BOUNDS.clone(),
+        KEY_METADATA.clone(),
+        SPLIT_OFFSETS.clone(),
+        EQUALITY_IDS.clone(),
+        SORT_ORDER_ID.clone(),
+        FIRST_ROW_ID.clone(),
+        REFERENCE_DATA_FILE.clone(),
+        CONTENT_OFFSET.clone(),
+        CONTENT_SIZE_IN_BYTES.clone(),
+    ]
+}
+
+pub(super) fn data_file_schema_v3(partition_type: &StructType) -> Result<AvroSchema> {
+    let schema = Schema::builder()
+        .with_fields(data_file_fields_v3(partition_type))
+        .build()?;
+    schema_to_avro_schema("data_file", &schema)
+}
+
fn data_file_fields_v2(partition_type: &StructType) -> Vec<NestedFieldRef> {
    vec![
        CONTENT.clone(),
@@ -533,6 +569,8 @@ fn data_file_fields_v2(partition_type: &StructType) -> Vec<NestedFieldRef> {
        SORT_ORDER_ID.clone(),
        FIRST_ROW_ID.clone(),
        REFERENCE_DATA_FILE.clone(),
+        // Why are the following two fields here in the existing v2 schema?
+        // In the spec, they are not even listed as optional for v2.
        CONTENT_OFFSET.clone(),
        CONTENT_SIZE_IN_BYTES.clone(),
    ]

diff --git a/crates/iceberg/src/spec/manifest/mod.rs b/crates/iceberg/src/spec/manifest/mod.rs
index a1a5612c36..51219bfdb7 100644
--- a/crates/iceberg/src/spec/manifest/mod.rs
+++ b/crates/iceberg/src/spec/manifest/mod.rs
@@ -70,7 +70,8 @@ impl Manifest {
                    })
                    .collect::<Result<Vec<_>>>()?
            }
-            FormatVersion::V2 => {
+            // Manifest Schema & Manifest Entry did not change between V2 and V3
+            FormatVersion::V2 | FormatVersion::V3 => {
                let schema = manifest_schema_v2(&partition_type)?;
                let reader = AvroReader::with_schema(&schema, bs)?;
                reader

diff --git a/crates/iceberg/src/spec/manifest/writer.rs b/crates/iceberg/src/spec/manifest/writer.rs
index 673f8b5d2a..ebb0590bcf 100644
--- a/crates/iceberg/src/spec/manifest/writer.rs
+++ b/crates/iceberg/src/spec/manifest/writer.rs
@@ -72,7 +72,13 @@ impl ManifestWriterBuilder {
            .format_version(FormatVersion::V1)
            .content(ManifestContentType::Data)
            .build();
-        ManifestWriter::new(self.output, self.snapshot_id, self.key_metadata, metadata)
+        ManifestWriter::new(
+            self.output,
+            self.snapshot_id,
+            self.key_metadata,
+            metadata,
+            None,
+        )
    }

    /// Build a [`ManifestWriter`] for format version 2, data content.
@@ -84,7 +90,13 @@ impl ManifestWriterBuilder {
            .format_version(FormatVersion::V2)
            .content(ManifestContentType::Data)
            .build();
-        ManifestWriter::new(self.output, self.snapshot_id, self.key_metadata, metadata)
+        ManifestWriter::new(
+            self.output,
+            self.snapshot_id,
+            self.key_metadata,
+            metadata,
+            None,
+        )
    }

    /// Build a [`ManifestWriter`] for format version 2, deletes content.
@@ -96,7 +108,51 @@ impl ManifestWriterBuilder {
            .format_version(FormatVersion::V2)
            .content(ManifestContentType::Deletes)
            .build();
-        ManifestWriter::new(self.output, self.snapshot_id, self.key_metadata, metadata)
+        ManifestWriter::new(
+            self.output,
+            self.snapshot_id,
+            self.key_metadata,
+            metadata,
+            None,
+        )
+    }
+
+    /// Build a [`ManifestWriter`] for format version 3, data content.
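+    ///
+    /// A minimal usage sketch (hedged: `output`, `schema` and `spec` are placeholder
+    /// values, and the builder construction is assumed from the surrounding API rather
+    /// than defined by this change):
+    ///
+    /// ```ignore
+    /// let manifest_writer =
+    ///     ManifestWriterBuilder::new(output, Some(snapshot_id), None, schema, spec)
+    ///         .build_v3_data();
+    /// // The ManifestFile produced by this writer carries `first_row_id: None`;
+    /// // the id is only assigned once the file is added via ManifestListWriter::v3.
+    /// ```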
+    pub fn build_v3_data(self) -> ManifestWriter {
+        let metadata = ManifestMetadata::builder()
+            .schema_id(self.schema.schema_id())
+            .schema(self.schema)
+            .partition_spec(self.partition_spec)
+            .format_version(FormatVersion::V3)
+            .content(ManifestContentType::Data)
+            .build();
+        ManifestWriter::new(
+            self.output,
+            self.snapshot_id,
+            self.key_metadata,
+            metadata,
+            // First row id is assigned by the [`ManifestListWriter`] when the manifest
+            // is added to the list.
+            None,
+        )
+    }
+
+    /// Build a [`ManifestWriter`] for format version 3, deletes content.
+    pub fn build_v3_deletes(self) -> ManifestWriter {
+        let metadata = ManifestMetadata::builder()
+            .schema_id(self.schema.schema_id())
+            .schema(self.schema)
+            .partition_spec(self.partition_spec)
+            .format_version(FormatVersion::V3)
+            .content(ManifestContentType::Deletes)
+            .build();
+        ManifestWriter::new(
+            self.output,
+            self.snapshot_id,
+            self.key_metadata,
+            metadata,
+            None,
+        )
+    }
}

@@ -112,6 +168,7 @@ pub struct ManifestWriter {
    existing_rows: u64,
    deleted_files: u32,
    deleted_rows: u64,
+    first_row_id: Option<u64>,

    min_seq_num: Option<i64>,

@@ -129,6 +186,7 @@ impl ManifestWriter {
        snapshot_id: Option<i64>,
        key_metadata: Option<Vec<u8>>,
        metadata: ManifestMetadata,
+        first_row_id: Option<u64>,
    ) -> Self {
        Self {
            output,
@@ -139,6 +197,7 @@ impl ManifestWriter {
            existing_rows: 0,
            deleted_files: 0,
            deleted_rows: 0,
+            first_row_id,
            min_seq_num: None,
            key_metadata,
            manifest_entries: Vec::new(),
@@ -348,7 +407,8 @@ impl ManifestWriter {
        let table_schema = &self.metadata.schema;
        let avro_schema = match self.metadata.format_version {
            FormatVersion::V1 => manifest_schema_v1(&partition_type)?,
-            FormatVersion::V2 => manifest_schema_v2(&partition_type)?,
+            // Manifest schema did not change between V2 and V3
+            FormatVersion::V2 | FormatVersion::V3 => manifest_schema_v2(&partition_type)?,
        };
        let mut avro_writer = AvroWriter::new(&avro_schema, Vec::new());
        avro_writer.add_user_metadata(
@@ -388,8 +448,11 @@ impl ManifestWriter {
            let value = match self.metadata.format_version {
                FormatVersion::V1 => to_value(ManifestEntryV1::try_from(entry, &partition_type)?)?
                    .resolve(&avro_schema)?,
-                FormatVersion::V2 => to_value(ManifestEntryV2::try_from(entry, &partition_type)?)?
-                    .resolve(&avro_schema)?,
+                // Manifest entry format did not change between V2 and V3
+                FormatVersion::V2 | FormatVersion::V3 => {
+                    to_value(ManifestEntryV2::try_from(entry, &partition_type)?)?
+                        .resolve(&avro_schema)?
+                }
            };

            avro_writer.append(value)?;
@@ -417,6 +480,7 @@ impl ManifestWriter {
            deleted_rows_count: Some(self.deleted_rows),
            partitions: Some(partition_summary),
            key_metadata: self.key_metadata,
+            first_row_id: self.first_row_id,
        })
    }
}

diff --git a/crates/iceberg/src/spec/manifest_list.rs b/crates/iceberg/src/spec/manifest_list.rs
index 43808bb249..8cba4a2232 100644
--- a/crates/iceberg/src/spec/manifest_list.rs
+++ b/crates/iceberg/src/spec/manifest_list.rs
@@ -31,6 +31,8 @@ use self::_serde::{ManifestFileV1, ManifestFileV2};
use super::{FormatVersion, Manifest};
use crate::error::Result;
use crate::io::{FileIO, OutputFile};
+use crate::spec::manifest_list::_const_schema::MANIFEST_LIST_AVRO_SCHEMA_V3;
+use crate::spec::manifest_list::_serde::ManifestFileV3;
use crate::{Error, ErrorKind};

/// Placeholder for sequence number. The field with this value must be replaced with the actual sequence number before it is written.
@@ -69,6 +71,11 @@ impl ManifestList {
                let values = Value::Array(reader.collect::<Result<Vec<_>, _>>()?);
                from_value::<_serde::ManifestListV2>(&values)?.try_into()
            }
+            FormatVersion::V3 => {
+                let reader = Reader::new(bs)?;
+                let values = Value::Array(reader.collect::<Result<Vec<_>, _>>()?);
+                from_value::<_serde::ManifestListV3>(&values)?.try_into()
+            }
        }
    }

@@ -90,6 +97,7 @@ pub struct ManifestListWriter {
    avro_writer: Writer<'static, Vec<u8>>,
    sequence_number: i64,
    snapshot_id: i64,
+    next_row_id: Option<u64>,
}

impl std::fmt::Debug for ManifestListWriter {
@@ -103,6 +111,11 @@ impl std::fmt::Debug for ManifestListWriter {
}

impl ManifestListWriter {
+    /// Get the next row ID that will be assigned to the next data manifest added.
+    pub fn next_row_id(&self) -> Option<u64> {
+        self.next_row_id
+    }
+
    /// Construct a v1 [`ManifestListWriter`] that writes to a provided [`OutputFile`].
    pub fn v1(output_file: OutputFile, snapshot_id: i64, parent_snapshot_id: Option<i64>) -> Self {
        let mut metadata = HashMap::from_iter([
@@ -115,7 +128,14 @@ impl ManifestListWriter {
                parent_snapshot_id.to_string(),
            );
        }
-        Self::new(FormatVersion::V1, output_file, metadata, 0, snapshot_id)
+        Self::new(
+            FormatVersion::V1,
+            output_file,
+            metadata,
+            0,
+            snapshot_id,
+            None,
+        )
    }

    /// Construct a v2 [`ManifestListWriter`] that writes to a provided [`OutputFile`].
@@ -142,6 +162,42 @@ impl ManifestListWriter {
            metadata,
            sequence_number,
            snapshot_id,
+            None,
        )
    }

+    /// Construct a v3 [`ManifestListWriter`] that writes to a provided [`OutputFile`].
+    pub fn v3(
+        output_file: OutputFile,
+        snapshot_id: i64,
+        parent_snapshot_id: Option<i64>,
+        sequence_number: i64,
+        first_row_id: Option<u64>, // Always None for delete manifests
+    ) -> Self {
+        let mut metadata = HashMap::from_iter([
+            ("snapshot-id".to_string(), snapshot_id.to_string()),
+            ("sequence-number".to_string(), sequence_number.to_string()),
+            ("format-version".to_string(), "3".to_string()),
+        ]);
+        metadata.insert(
+            "parent-snapshot-id".to_string(),
+            parent_snapshot_id
+                .map(|v| v.to_string())
+                .unwrap_or("null".to_string()),
+        );
+        metadata.insert(
+            "first-row-id".to_string(),
+            first_row_id
+                .map(|v| v.to_string())
+                .unwrap_or("null".to_string()),
+        );
+        Self::new(
+            FormatVersion::V3,
+            output_file,
+            metadata,
+            sequence_number,
+            snapshot_id,
+            first_row_id,
        )
    }

@@ -151,10 +207,12 @@ impl ManifestListWriter {
        metadata: HashMap<String, String>,
        sequence_number: i64,
        snapshot_id: i64,
+        first_row_id: Option<u64>,
    ) -> Self {
        let avro_schema = match format_version {
            FormatVersion::V1 => &MANIFEST_LIST_AVRO_SCHEMA_V1,
            FormatVersion::V2 => &MANIFEST_LIST_AVRO_SCHEMA_V2,
+            FormatVersion::V3 => &MANIFEST_LIST_AVRO_SCHEMA_V3,
        };
        let mut avro_writer = Writer::new(avro_schema, Vec::new());
        for (key, value) in metadata {
@@ -168,46 +226,35 @@ impl ManifestListWriter {
            avro_writer,
            sequence_number,
            snapshot_id,
+            next_row_id: first_row_id,
        }
    }

    /// Append manifests to be written.
+    ///
+    /// If v3 manifests are added and the `first_row_id` of a data manifest is unassigned,
+    /// it is assigned from the writer's `next_row_id`, and the writer's `next_row_id` is
+    /// advanced accordingly. If `first_row_id` is already assigned, it is left unchanged.
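+    ///
+    /// A worked example of the assignment (illustrative values only): given a writer
+    /// constructed with `first_row_id = Some(10)` and two data manifests that each
+    /// report 0 existing and 3 added rows:
+    ///
+    /// ```ignore
+    /// // manifest_a: first_row_id None -> Some(10); next_row_id = 10 + 0 + 3 = 13
+    /// // manifest_b: first_row_id None -> Some(13); next_row_id = 13 + 0 + 3 = 16
+    /// writer.add_manifests(vec![manifest_a, manifest_b].into_iter())?;
+    /// assert_eq!(writer.next_row_id(), Some(16));
+    /// ```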
    pub fn add_manifests(&mut self, manifests: impl Iterator<Item = ManifestFile>) -> Result<()> {
        match self.format_version {
            FormatVersion::V1 => {
                for manifest in manifests {
-                    let manifes: ManifestFileV1 = manifest.try_into()?;
-                    self.avro_writer.append_ser(manifes)?;
+                    let manifest_entry: ManifestFileV1 = manifest.try_into()?;
+                    self.avro_writer.append_ser(manifest_entry)?;
                }
            }
-            FormatVersion::V2 => {
+            FormatVersion::V2 | FormatVersion::V3 => {
                for mut manifest in manifests {
-                    if manifest.sequence_number == UNASSIGNED_SEQUENCE_NUMBER {
-                        if manifest.added_snapshot_id != self.snapshot_id {
-                            return Err(Error::new(
-                                ErrorKind::DataInvalid,
-                                format!(
-                                    "Found unassigned sequence number for a manifest from snapshot {}.",
-                                    manifest.added_snapshot_id
-                                ),
-                            ));
-                        }
-                        manifest.sequence_number = self.sequence_number;
+                    self.assign_sequence_numbers(&mut manifest)?;
+
+                    if self.format_version == FormatVersion::V2 {
+                        let manifest_entry: ManifestFileV2 = manifest.try_into()?;
+                        self.avro_writer.append_ser(manifest_entry)?;
+                    } else if self.format_version == FormatVersion::V3 {
+                        self.assign_first_row_id(&mut manifest)?;
+                        let manifest_entry: ManifestFileV3 = manifest.try_into()?;
+                        self.avro_writer.append_ser(manifest_entry)?;
                    }
-                    if manifest.min_sequence_number == UNASSIGNED_SEQUENCE_NUMBER {
-                        if manifest.added_snapshot_id != self.snapshot_id {
-                            return Err(Error::new(
-                                ErrorKind::DataInvalid,
-                                format!(
-                                    "Found unassigned sequence number for a manifest from snapshot {}.",
-                                    manifest.added_snapshot_id
-                                ),
-                            ));
-                        }
-                        manifest.min_sequence_number = self.sequence_number;
-                    }
-                    let manifest_entry: ManifestFileV2 = manifest.try_into()?;
-                    self.avro_writer.append_ser(manifest_entry)?;
                }
            }
        }
@@ -222,6 +269,112 @@ impl ManifestListWriter {
        writer.close().await?;
        Ok(())
    }
+
+    /// Assign sequence numbers to the manifest if they are unassigned
+    fn assign_sequence_numbers(&self, manifest: &mut ManifestFile) -> Result<()> {
+        if manifest.sequence_number == UNASSIGNED_SEQUENCE_NUMBER {
+            if manifest.added_snapshot_id != self.snapshot_id {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Found unassigned sequence number for a manifest from snapshot {}.",
+                        manifest.added_snapshot_id
+                    ),
+                ));
+            }
+            manifest.sequence_number = self.sequence_number;
+        }
+
+        if manifest.min_sequence_number == UNASSIGNED_SEQUENCE_NUMBER {
+            if manifest.added_snapshot_id != self.snapshot_id {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Found unassigned sequence number for a manifest from snapshot {}.",
+                        manifest.added_snapshot_id
+                    ),
+                ));
+            }
+            manifest.min_sequence_number = self.sequence_number;
+        }
+
+        Ok(())
+    }
+
+    /// Assign a first-row-id to a data manifest if one is required, advancing the writer's `next_row_id`.
+    fn assign_first_row_id(&mut self, manifest: &mut ManifestFile) -> Result<()> {
+        match manifest.content {
+            ManifestContentType::Data => {
+                match (self.next_row_id, manifest.first_row_id) {
+                    (Some(_), Some(_)) => {
+                        // Case: Manifest with already assigned first row ID.
+                        // No need to increase next_row_id, as this manifest is already assigned.
+                    }
+                    (None, Some(manifest_first_row_id)) => {
+                        // Case: Assigned first row ID for data manifest, but the writer does not have a next-row-id assigned.
+                        return Err(Error::new(
+                            ErrorKind::Unexpected,
+                            format!(
+                                "Found invalid first-row-id assignment for Manifest {}. Writer does not have a next-row-id assigned, but the manifest has first-row-id assigned to {}.",
+                                manifest.manifest_path, manifest_first_row_id,
+                            ),
+                        ));
+                    }
+                    (Some(writer_next_row_id), None) => {
+                        // Case: Unassigned first row ID for data manifest. This is either a new
+                        // manifest, or a manifest from a pre-v3 snapshot. We need to assign one.
+                        let (existing_rows_count, added_rows_count) =
+                            require_row_counts_in_manifest(manifest)?;
+                        manifest.first_row_id = Some(writer_next_row_id);
+
+                        self.next_row_id = writer_next_row_id
+                            .checked_add(existing_rows_count)
+                            .and_then(|sum| sum.checked_add(added_rows_count))
+                            .ok_or_else(|| {
+                                Error::new(
+                                    ErrorKind::DataInvalid,
+                                    format!(
+                                        "Row ID overflow when computing next row ID for Manifest {}. Next Row ID: {writer_next_row_id}, Existing Rows Count: {existing_rows_count}, Added Rows Count: {added_rows_count}",
+                                        manifest.manifest_path
+                                    ),
+                                )
+                            }).map(Some)?;
+                    }
+                    (None, None) => {
+                        // Case: Table without row lineage. No action needed.
+                    }
+                }
+            }
+            ManifestContentType::Deletes => {
+                // Deletes never have a first-row-id assigned.
+                manifest.first_row_id = None;
+            }
+        };
+
+        Ok(())
+    }
+}
+
+fn require_row_counts_in_manifest(manifest: &ManifestFile) -> Result<(u64, u64)> {
+    let existing_rows_count = manifest.existing_rows_count.ok_or_else(|| {
+        Error::new(
+            ErrorKind::DataInvalid,
+            format!(
+                "Cannot include a manifest without existing-rows-count in a table with row lineage enabled. Manifest path: {}",
+                manifest.manifest_path,
+            ),
+        )
+    })?;
+    let added_rows_count = manifest.added_rows_count.ok_or_else(|| {
+        Error::new(
+            ErrorKind::DataInvalid,
+            format!(
+                "Cannot include a manifest without added-rows-count in a table with row lineage enabled. Manifest path: {}",
+                manifest.manifest_path,
+            ),
+        )
+    })?;
+    Ok((existing_rows_count, added_rows_count))
}

/// This is a helper module that defines the schema field of the manifest list entry.
@@ -453,6 +606,15 @@ mod _const_schema {
            ))
        })
    };
+    static FIRST_ROW_ID: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::optional(
+                520,
+                "first_row_id",
+                Type::Primitive(PrimitiveType::Long),
+            ))
+        })
+    };

    static V1_SCHEMA: Lazy<Schema> = {
        Lazy::new(|| {
@@ -497,11 +659,38 @@ mod _const_schema {
        })
    };

+    static V3_SCHEMA: Lazy<Schema> = {
+        Lazy::new(|| {
+            let fields = vec![
+                MANIFEST_PATH.clone(),
+                MANIFEST_LENGTH.clone(),
+                PARTITION_SPEC_ID.clone(),
+                CONTENT.clone(),
+                SEQUENCE_NUMBER.clone(),
+                MIN_SEQUENCE_NUMBER.clone(),
+                ADDED_SNAPSHOT_ID.clone(),
+                ADDED_FILES_COUNT_V2.clone(),
+                EXISTING_FILES_COUNT_V2.clone(),
+                DELETED_FILES_COUNT_V2.clone(),
+                ADDED_ROWS_COUNT_V2.clone(),
+                EXISTING_ROWS_COUNT_V2.clone(),
+                DELETED_ROWS_COUNT_V2.clone(),
+                PARTITIONS.clone(),
+                KEY_METADATA.clone(),
+                FIRST_ROW_ID.clone(),
+            ];
+            Schema::builder().with_fields(fields).build().unwrap()
+        })
+    };
+
    pub(super) static MANIFEST_LIST_AVRO_SCHEMA_V1: Lazy<AvroSchema> =
        Lazy::new(|| schema_to_avro_schema("manifest_file", &V1_SCHEMA).unwrap());

    pub(super) static MANIFEST_LIST_AVRO_SCHEMA_V2: Lazy<AvroSchema> =
        Lazy::new(|| schema_to_avro_schema("manifest_file", &V2_SCHEMA).unwrap());
+
+    pub(super) static MANIFEST_LIST_AVRO_SCHEMA_V3: Lazy<AvroSchema> =
+        Lazy::new(|| schema_to_avro_schema("manifest_file", &V3_SCHEMA).unwrap());
}

/// Entry in a manifest list.
@@ -580,6 +769,10 @@ pub struct ManifestFile {
    ///
    /// Implementation-specific key metadata for encryption
    pub key_metadata: Option<Vec<u8>>,
+    /// field 520
+    ///
+    /// The starting _row_id to assign to rows added by ADDED data files
+    pub first_row_id: Option<u64>,
}

impl ManifestFile {
@@ -706,6 +899,12 @@ pub(super) mod _serde {
    use crate::error::Result;
    use crate::spec::FieldSummary;

+    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
+    #[serde(transparent)]
+    pub(crate) struct ManifestListV3 {
+        entries: Vec<ManifestFileV3>,
+    }
+
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(transparent)]
    pub(crate) struct ManifestListV2 {
        entries: Vec<ManifestFileV2>,
    }
@@ -718,6 +917,33 @@ pub(super) mod _serde {
        entries: Vec<ManifestFileV1>,
    }

+    impl ManifestListV3 {
+        /// Converts the [ManifestListV3] into a [ManifestList].
+        pub fn try_into(self) -> Result<super::ManifestList> {
+            Ok(super::ManifestList {
+                entries: self
+                    .entries
+                    .into_iter()
+                    .map(|v| v.try_into())
+                    .collect::<Result<Vec<_>>>()?,
+            })
+        }
+    }
+
+    impl TryFrom<super::ManifestList> for ManifestListV3 {
+        type Error = Error;
+
+        fn try_from(value: super::ManifestList) -> std::result::Result<Self, Self::Error> {
+            Ok(Self {
+                entries: value
+                    .entries
+                    .into_iter()
+                    .map(|v| v.try_into())
+                    .collect::<std::result::Result<Vec<_>, _>>()?,
+            })
+        }
+    }
+
    impl ManifestListV2 {
        /// Converts the [ManifestListV2] into a [ManifestList].
        pub fn try_into(self) -> Result<super::ManifestList> {
@@ -816,6 +1042,58 @@ pub(super) mod _serde {
        pub key_metadata: Option<ByteBuf>,
    }

+    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
+    pub(super) struct ManifestFileV3 {
+        pub manifest_path: String,
+        pub manifest_length: i64,
+        pub partition_spec_id: i32,
+        #[serde(default = "v2_default_content_for_v1")]
+        pub content: i32,
+        #[serde(default = "v2_default_sequence_number_for_v1")]
+        pub sequence_number: i64,
+        #[serde(default = "v2_default_min_sequence_number_for_v1")]
+        pub min_sequence_number: i64,
+        pub added_snapshot_id: i64,
+        #[serde(alias = "added_data_files_count", alias = "added_files_count")]
+        pub added_files_count: i32,
+        #[serde(alias = "existing_data_files_count", alias = "existing_files_count")]
+        pub existing_files_count: i32,
+        #[serde(alias = "deleted_data_files_count", alias = "deleted_files_count")]
+        pub deleted_files_count: i32,
+        pub added_rows_count: i64,
+        pub existing_rows_count: i64,
+        pub deleted_rows_count: i64,
+        pub partitions: Option<Vec<FieldSummary>>,
+        pub key_metadata: Option<ByteBuf>,
+        pub first_row_id: Option<u64>,
+    }
+
+    impl ManifestFileV3 {
+        /// Converts the [ManifestFileV3] into a [ManifestFile].
+        pub fn try_into(self) -> Result<ManifestFile> {
+            let manifest_file = ManifestFile {
+                manifest_path: self.manifest_path,
+                manifest_length: self.manifest_length,
+                partition_spec_id: self.partition_spec_id,
+                content: self.content.try_into()?,
+                sequence_number: self.sequence_number,
+                min_sequence_number: self.min_sequence_number,
+                added_snapshot_id: self.added_snapshot_id,
+                added_files_count: Some(self.added_files_count.try_into()?),
+                existing_files_count: Some(self.existing_files_count.try_into()?),
+                deleted_files_count: Some(self.deleted_files_count.try_into()?),
+                added_rows_count: Some(self.added_rows_count.try_into()?),
+                existing_rows_count: Some(self.existing_rows_count.try_into()?),
+                deleted_rows_count: Some(self.deleted_rows_count.try_into()?),
+                partitions: self.partitions,
+                key_metadata: self.key_metadata.map(|b| b.into_vec()),
+                first_row_id: self.first_row_id,
+            };
+
+            Ok(manifest_file)
+        }
+    }
+
    impl ManifestFileV2 {
        /// Converts the [ManifestFileV2] into a [ManifestFile].
        pub fn try_into(self) -> Result<ManifestFile> {
@@ -835,6 +1113,7 @@ pub(super) mod _serde {
                deleted_rows_count: Some(self.deleted_rows_count.try_into()?),
                partitions: self.partitions,
                key_metadata: self.key_metadata.map(|b| b.into_vec()),
+                first_row_id: None,
            })
        }
    }
@@ -884,6 +1163,7 @@ pub(super) mod _serde {
                content: super::ManifestContentType::Data,
                sequence_number: 0,
                min_sequence_number: 0,
+                first_row_id: None,
            })
        }
    }
@@ -895,6 +1175,80 @@ pub(super) mod _serde {
        }
    }

+    impl TryFrom<ManifestFile> for ManifestFileV3 {
+        type Error = Error;
+
+        fn try_from(value: ManifestFile) -> std::result::Result<Self, Self::Error> {
+            let key_metadata = convert_to_serde_key_metadata(value.key_metadata);
+            Ok(Self {
+                manifest_path: value.manifest_path,
+                manifest_length: value.manifest_length,
+                partition_spec_id: value.partition_spec_id,
+                content: value.content as i32,
+                sequence_number: value.sequence_number,
+                min_sequence_number: value.min_sequence_number,
+                added_snapshot_id: value.added_snapshot_id,
+                added_files_count: value
+                    .added_files_count
+                    .ok_or_else(|| {
+                        Error::new(
+                            crate::ErrorKind::DataInvalid,
+                            "added_files_count in ManifestFileV3 is required",
+                        )
+                    })?
+                    .try_into()?,
+                existing_files_count: value
+                    .existing_files_count
+                    .ok_or_else(|| {
+                        Error::new(
+                            crate::ErrorKind::DataInvalid,
+                            "existing_files_count in ManifestFileV3 is required",
+                        )
+                    })?
+                    .try_into()?,
+                deleted_files_count: value
+                    .deleted_files_count
+                    .ok_or_else(|| {
+                        Error::new(
+                            crate::ErrorKind::DataInvalid,
+                            "deleted_files_count in ManifestFileV3 is required",
+                        )
+                    })?
+                    .try_into()?,
+                added_rows_count: value
+                    .added_rows_count
+                    .ok_or_else(|| {
+                        Error::new(
+                            crate::ErrorKind::DataInvalid,
+                            "added_rows_count in ManifestFileV3 is required",
+                        )
+                    })?
+                    .try_into()?,
+                existing_rows_count: value
+                    .existing_rows_count
+                    .ok_or_else(|| {
+                        Error::new(
+                            crate::ErrorKind::DataInvalid,
+                            "existing_rows_count in ManifestFileV3 is required",
+                        )
+                    })?
+                    .try_into()?,
+                deleted_rows_count: value
+                    .deleted_rows_count
+                    .ok_or_else(|| {
+                        Error::new(
+                            crate::ErrorKind::DataInvalid,
+                            "deleted_rows_count in ManifestFileV3 is required",
+                        )
+                    })?
+ .try_into()?, + partitions: value.partitions, + key_metadata, + first_row_id: value.first_row_id, + }) + } + } + impl TryFrom for ManifestFileV2 { type Error = Error; @@ -1015,7 +1369,7 @@ mod test { use super::_serde::ManifestListV2; use crate::io::FileIOBuilder; - use crate::spec::manifest_list::_serde::ManifestListV1; + use crate::spec::manifest_list::_serde::{ManifestListV1, ManifestListV3}; use crate::spec::{ Datum, FieldSummary, ManifestContentType, ManifestFile, ManifestList, ManifestListWriter, UNASSIGNED_SEQUENCE_NUMBER, @@ -1041,6 +1395,7 @@ mod test { deleted_rows_count: Some(0), partitions: Some(vec![]), key_metadata: None, + first_row_id: None, } ] }; @@ -1092,6 +1447,7 @@ mod test { vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}] ), key_metadata: None, + first_row_id: None, }, ManifestFile { manifest_path: "s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m1.avro".to_string(), @@ -1111,6 +1467,7 @@ mod test { vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::float(1.1).to_bytes().unwrap()), upper_bound: Some(Datum::float(2.1).to_bytes().unwrap())}] ), key_metadata: None, + first_row_id: None, } ] }; @@ -1141,6 +1498,80 @@ mod test { assert_eq!(manifest_list, parsed_manifest_list); } + #[tokio::test] + async fn test_parse_manifest_list_v3() { + let manifest_list = ManifestList { + entries: vec![ + ManifestFile { + manifest_path: "s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro".to_string(), + manifest_length: 6926, + partition_spec_id: 1, + content: ManifestContentType::Data, + sequence_number: 1, + min_sequence_number: 1, + added_snapshot_id: 377075049360453639, + added_files_count: Some(1), + existing_files_count: Some(0), + deleted_files_count: Some(0), + added_rows_count: Some(3), + existing_rows_count: Some(0), + deleted_rows_count: Some(0), + partitions: Some( + vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}] + ), + key_metadata: None, + first_row_id: Some(10), + }, + ManifestFile { + manifest_path: "s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m1.avro".to_string(), + manifest_length: 6926, + partition_spec_id: 2, + content: ManifestContentType::Data, + sequence_number: 1, + min_sequence_number: 1, + added_snapshot_id: 377075049360453639, + added_files_count: Some(1), + existing_files_count: Some(0), + deleted_files_count: Some(0), + added_rows_count: Some(3), + existing_rows_count: Some(0), + deleted_rows_count: Some(0), + partitions: Some( + vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::float(1.1).to_bytes().unwrap()), upper_bound: Some(Datum::float(2.1).to_bytes().unwrap())}] + ), + key_metadata: None, + first_row_id: Some(13), + } + ] + }; + + let file_io = FileIOBuilder::new_fs_io().build().unwrap(); + + let tmp_dir = TempDir::new().unwrap(); + let file_name = "simple_manifest_list_v3.avro"; + let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name); + + let mut writer = ManifestListWriter::v3( + file_io.new_output(full_path.clone()).unwrap(), + 377075049360453639, + Some(377075049360453639), + 1, + Some(10), + ); + + writer + .add_manifests(manifest_list.entries.clone().into_iter()) + .unwrap(); + 
writer.close().await.unwrap(); + + let bs = fs::read(full_path).expect("read_file must succeed"); + + let parsed_manifest_list = + ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V3).unwrap(); + + assert_eq!(manifest_list, parsed_manifest_list); + } + #[test] fn test_serialize_manifest_list_v1() { let manifest_list:ManifestListV1 = ManifestList { @@ -1160,6 +1591,7 @@ mod test { deleted_rows_count: Some(0), partitions: None, key_metadata: None, + first_row_id: None, }] }.try_into().unwrap(); let result = serde_json::to_string(&manifest_list).unwrap(); @@ -1190,6 +1622,7 @@ mod test { vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}] ), key_metadata: None, + first_row_id: None, }] }.try_into().unwrap(); let result = serde_json::to_string(&manifest_list).unwrap(); @@ -1199,6 +1632,37 @@ mod test { ); } + #[test] + fn test_serialize_manifest_list_v3() { + let manifest_list: ManifestListV3 = ManifestList { + entries: vec![ManifestFile { + manifest_path: "s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro".to_string(), + manifest_length: 6926, + partition_spec_id: 1, + content: ManifestContentType::Data, + sequence_number: 1, + min_sequence_number: 1, + added_snapshot_id: 377075049360453639, + added_files_count: Some(1), + existing_files_count: Some(0), + deleted_files_count: Some(0), + added_rows_count: Some(3), + existing_rows_count: Some(0), + deleted_rows_count: Some(0), + partitions: Some( + vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}] + ), + key_metadata: None, + first_row_id: Some(10), + }] + }.try_into().unwrap(); + let result = serde_json::to_string(&manifest_list).unwrap(); + assert_eq!( + result, + r#"[{"manifest_path":"s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro","manifest_length":6926,"partition_spec_id":1,"content":0,"sequence_number":1,"min_sequence_number":1,"added_snapshot_id":377075049360453639,"added_files_count":1,"existing_files_count":0,"deleted_files_count":0,"added_rows_count":3,"existing_rows_count":0,"deleted_rows_count":0,"partitions":[{"contains_null":false,"contains_nan":false,"lower_bound":[1,0,0,0,0,0,0,0],"upper_bound":[1,0,0,0,0,0,0,0]}],"key_metadata":null,"first_row_id":10}]"# + ); + } + #[tokio::test] async fn test_manifest_list_writer_v1() { let expected_manifest_list = ManifestList { @@ -1220,6 +1684,7 @@ mod test { vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}], ), key_metadata: None, + first_row_id: None, }] }; @@ -1266,6 +1731,7 @@ mod test { vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}] ), key_metadata: None, + first_row_id: None, }] }; @@ -1290,6 +1756,56 @@ mod test { temp_dir.close().unwrap(); } + #[tokio::test] + async fn test_manifest_list_writer_v3() { + let snapshot_id = 377075049360453639; + let seq_num = 1; + let mut expected_manifest_list = ManifestList { + entries: vec![ManifestFile { + manifest_path: "s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro".to_string(), + manifest_length: 6926, + 
partition_spec_id: 1,
+                content: ManifestContentType::Data,
+                sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
+                min_sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
+                added_snapshot_id: snapshot_id,
+                added_files_count: Some(1),
+                existing_files_count: Some(0),
+                deleted_files_count: Some(0),
+                added_rows_count: Some(3),
+                existing_rows_count: Some(0),
+                deleted_rows_count: Some(0),
+                partitions: Some(
+                    vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
+                ),
+                key_metadata: None,
+                first_row_id: Some(10),
+            }]
+        };
+
+        let temp_dir = TempDir::new().unwrap();
+        let path = temp_dir.path().join("manifest_list_v3.avro");
+        let io = FileIOBuilder::new_fs_io().build().unwrap();
+        let output_file = io.new_output(path.to_str().unwrap()).unwrap();
+
+        let mut writer =
+            ManifestListWriter::v3(output_file, snapshot_id, Some(0), seq_num, Some(10));
+        writer
+            .add_manifests(expected_manifest_list.entries.clone().into_iter())
+            .unwrap();
+        writer.close().await.unwrap();
+
+        let bs = fs::read(path).unwrap();
+        let manifest_list =
+            ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V3).unwrap();
+        expected_manifest_list.entries[0].sequence_number = seq_num;
+        expected_manifest_list.entries[0].min_sequence_number = seq_num;
+        expected_manifest_list.entries[0].first_row_id = Some(10);
+        assert_eq!(manifest_list, expected_manifest_list);
+
+        temp_dir.close().unwrap();
+    }
+
    #[tokio::test]
    async fn test_manifest_list_writer_v1_as_v2() {
        let expected_manifest_list = ManifestList {
@@ -1311,6 +1827,7 @@ mod test {
                    vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
                ),
                key_metadata: None,
+                first_row_id: None,
            }]
        };

@@ -1334,6 +1851,100 @@ mod test {
        temp_dir.close().unwrap();
    }

+    #[tokio::test]
+    async fn test_manifest_list_writer_v1_as_v3() {
+        let expected_manifest_list = ManifestList {
+            entries: vec![ManifestFile {
+                manifest_path: "/opt/bitnami/spark/warehouse/db/table/metadata/10d28031-9739-484c-92db-cdf2975cead4-m0.avro".to_string(),
+                manifest_length: 5806,
+                partition_spec_id: 1,
+                content: ManifestContentType::Data,
+                sequence_number: 0,
+                min_sequence_number: 0,
+                added_snapshot_id: 1646658105718557341,
+                added_files_count: Some(3),
+                existing_files_count: Some(0),
+                deleted_files_count: Some(0),
+                added_rows_count: Some(3),
+                existing_rows_count: Some(0),
+                deleted_rows_count: Some(0),
+                partitions: Some(
+                    vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
+                ),
+                key_metadata: None,
+                first_row_id: None,
+            }]
+        };
+
+        let temp_dir = TempDir::new().unwrap();
+        let path = temp_dir.path().join("manifest_list_v1.avro");
+        let io = FileIOBuilder::new_fs_io().build().unwrap();
+        let output_file = io.new_output(path.to_str().unwrap()).unwrap();
+
+        let mut writer = ManifestListWriter::v1(output_file, 1646658105718557341, Some(0));
+        writer
+            .add_manifests(expected_manifest_list.entries.clone().into_iter())
+            .unwrap();
+        writer.close().await.unwrap();
+
+        let bs = fs::read(path).unwrap();
+
+        let manifest_list =
+            ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V3).unwrap();
+        assert_eq!(manifest_list, expected_manifest_list);
+
+        temp_dir.close().unwrap();
+    }
+
+    #[tokio::test]
+    async fn test_manifest_list_writer_v2_as_v3() {
+        let snapshot_id = 377075049360453639;
+        let seq_num = 1;
+        let mut expected_manifest_list = ManifestList {
+            entries: vec![ManifestFile {
+                manifest_path: "s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro".to_string(),
+                manifest_length: 6926,
+                partition_spec_id: 1,
+                content: ManifestContentType::Data,
+                sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
+                min_sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
+                added_snapshot_id: snapshot_id,
+                added_files_count: Some(1),
+                existing_files_count: Some(0),
+                deleted_files_count: Some(0),
+                added_rows_count: Some(3),
+                existing_rows_count: Some(0),
+                deleted_rows_count: Some(0),
+                partitions: Some(
+                    vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
+                ),
+                key_metadata: None,
+                first_row_id: None,
+            }]
+        };
+
+        let temp_dir = TempDir::new().unwrap();
+        let path = temp_dir.path().join("manifest_list_v2.avro");
+        let io = FileIOBuilder::new_fs_io().build().unwrap();
+        let output_file = io.new_output(path.to_str().unwrap()).unwrap();
+
+        let mut writer = ManifestListWriter::v2(output_file, snapshot_id, Some(0), seq_num);
+        writer
+            .add_manifests(expected_manifest_list.entries.clone().into_iter())
+            .unwrap();
+        writer.close().await.unwrap();
+
+        let bs = fs::read(path).unwrap();
+
+        let manifest_list =
+            ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V3).unwrap();
+        expected_manifest_list.entries[0].sequence_number = seq_num;
+        expected_manifest_list.entries[0].min_sequence_number = seq_num;
+        assert_eq!(manifest_list, expected_manifest_list);
+
+        temp_dir.close().unwrap();
+    }
+
    #[tokio::test]
    async fn test_manifest_list_v2_deserializer_aliases() {
        // reading avro manifest file generated by iceberg 1.4.0

diff --git a/crates/iceberg/src/spec/snapshot.rs b/crates/iceberg/src/spec/snapshot.rs
index 04a9e15b34..1b58562122 100644
--- a/crates/iceberg/src/spec/snapshot.rs
+++ b/crates/iceberg/src/spec/snapshot.rs
@@ -21,7 +21,6 @@
use std::collections::HashMap;
use std::sync::Arc;

-use _serde::SnapshotV2;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use typed_builder::TypedBuilder;
@@ -82,33 +81,52 @@ impl Default for Operation {
    }
}

-#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, TypedBuilder)]
-#[serde(from = "SnapshotV2", into = "SnapshotV2")]
+#[derive(Debug, PartialEq, Eq, Clone)]
+/// Row range of a snapshot, containing first_row_id and added_rows_count.
+pub struct SnapshotRowRange {
+    /// The first _row_id assigned to the first row in the first data file in the first manifest.
+    pub first_row_id: u64,
+    /// The upper bound of the number of rows with assigned row IDs.
+    pub added_rows: u64,
+}
+
+#[derive(Debug, PartialEq, Eq, Clone, TypedBuilder)]
#[builder(field_defaults(setter(prefix = "with_")))]
/// A snapshot represents the state of a table at some time and is used to access the complete set of data files in the table.
pub struct Snapshot {
    /// A unique long ID
-    snapshot_id: i64,
+    pub(crate) snapshot_id: i64,
    /// The snapshot ID of the snapshot’s parent.
    /// Omitted for any snapshot with no parent
    #[builder(default = None)]
-    parent_snapshot_id: Option<i64>,
+    pub(crate) parent_snapshot_id: Option<i64>,
    /// A monotonically increasing long that tracks the order of
    /// changes to a table.
-    sequence_number: i64,
+    pub(crate) sequence_number: i64,
    /// A timestamp when the snapshot was created, used for garbage
    /// collection and table inspection
-    timestamp_ms: i64,
+    pub(crate) timestamp_ms: i64,
    /// The location of a manifest list for this snapshot that
    /// tracks manifest files with additional metadata.
    /// Currently we only support manifest list file, and manifest files are not supported.
    #[builder(setter(into))]
-    manifest_list: String,
+    pub(crate) manifest_list: String,
    /// A string map that summarizes the snapshot changes, including operation.
-    summary: Summary,
+    pub(crate) summary: Summary,
    /// ID of the table’s current schema when the snapshot was created.
    #[builder(setter(strip_option(fallback = schema_id_opt)), default = None)]
-    schema_id: Option<SchemaId>,
+    pub(crate) schema_id: Option<SchemaId>,
+    /// Encryption Key ID
+    #[builder(default)]
+    pub(crate) encryption_key_id: Option<String>,
+    /// Row range of this snapshot, required when the table version supports row lineage.
+    /// Specify as a tuple of (first_row_id, added_rows_count).
+    #[builder(default, setter(!strip_option, transform = |first_row_id: u64, added_rows: u64| Some(SnapshotRowRange { first_row_id, added_rows })))]
+    // This is specified as a struct instead of two separate fields to ensure that both fields are either set or not set.
+    // The Java implementation uses two separate fields, then sets `added_row_counts` to null if `first_row_id` is null.
+    // It throws an error if `added_row_counts` is set but `first_row_id` is not set, or if either of the two is negative.
+    // We handle all of these cases infallibly using the Rust type system.
+    pub(crate) row_range: Option<SnapshotRowRange>,
}

impl Snapshot {
@@ -205,6 +223,37 @@ impl Snapshot {
            snapshot_id: self.snapshot_id,
        }
    }
+
+    /// The row-id of the first newly added row in this snapshot. All rows added in this snapshot
+    /// have a row-id assigned to them that is greater than or equal to this value. All rows with
+    /// a row-id less than this value were created in a snapshot that was added to the table (but
+    /// not necessarily committed to this branch) in the past.
+    ///
+    /// This field is optional but is required when the table version supports row lineage.
+    pub fn first_row_id(&self) -> Option<u64> {
+        self.row_range.as_ref().map(|r| r.first_row_id)
+    }
+
+    /// The total number of newly added rows in this snapshot. It should be the sum of
+    /// `added_rows_count` over every manifest added in this snapshot.
+    ///
+    /// This field is optional but is required when the table version supports row lineage.
+    pub fn added_rows_count(&self) -> Option<u64> {
+        self.row_range.as_ref().map(|r| r.added_rows)
+    }
+
+    /// Returns the row range of this snapshot, if available.
+    /// This is a tuple containing (first_row_id, added_rows_count).
+    pub fn row_range(&self) -> Option<(u64, u64)> {
+        self.row_range
+            .as_ref()
+            .map(|r| (r.first_row_id, r.added_rows))
+    }
+
+    /// Get encryption key id, if available.
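+    ///
+    /// A small sketch of the accessors (illustrative values, mirroring the builder
+    /// calls used in the tests of this change):
+    ///
+    /// ```ignore
+    /// let snapshot = Snapshot::builder()
+    ///     .with_snapshot_id(1)
+    ///     .with_sequence_number(0)
+    ///     .with_timestamp_ms(0)
+    ///     .with_manifest_list("s3://a/b/1.avro")
+    ///     .with_summary(Summary {
+    ///         operation: Operation::Append,
+    ///         additional_properties: HashMap::new(),
+    ///     })
+    ///     .with_row_range(0, 2)
+    ///     .with_encryption_key_id(Some("key123".to_string()))
+    ///     .build();
+    /// assert_eq!(snapshot.row_range(), Some((0, 2)));
+    /// assert_eq!(snapshot.encryption_key_id(), Some("key123"));
+    /// ```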
+    pub fn encryption_key_id(&self) -> Option<&str> {
+        self.encryption_key_id.as_deref()
+    }
}

pub(super) mod _serde {
@@ -219,6 +268,26 @@ pub(super) mod _serde {
    use super::{Operation, Snapshot, Summary};
    use crate::Error;
    use crate::spec::SchemaId;
+    use crate::spec::snapshot::SnapshotRowRange;
+
+    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
+    #[serde(rename_all = "kebab-case")]
+    /// Defines the structure of a v3 snapshot for serialization/deserialization
+    pub(crate) struct SnapshotV3 {
+        pub snapshot_id: i64,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub parent_snapshot_id: Option<i64>,
+        pub sequence_number: i64,
+        pub timestamp_ms: i64,
+        pub manifest_list: String,
+        pub summary: Summary,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub schema_id: Option<SchemaId>,
+        pub first_row_id: u64,
+        pub added_rows: u64,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub key_id: Option<String>,
+    }

    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(rename_all = "kebab-case")]
@@ -253,6 +322,51 @@ pub(super) mod _serde {
        pub schema_id: Option<SchemaId>,
    }

+    impl From<SnapshotV3> for Snapshot {
+        fn from(s: SnapshotV3) -> Self {
+            Snapshot {
+                snapshot_id: s.snapshot_id,
+                parent_snapshot_id: s.parent_snapshot_id,
+                sequence_number: s.sequence_number,
+                timestamp_ms: s.timestamp_ms,
+                manifest_list: s.manifest_list,
+                summary: s.summary,
+                schema_id: s.schema_id,
+                encryption_key_id: s.key_id,
+                row_range: Some(SnapshotRowRange {
+                    first_row_id: s.first_row_id,
+                    added_rows: s.added_rows,
+                }),
+            }
+        }
+    }
+
+    impl TryFrom<Snapshot> for SnapshotV3 {
+        type Error = Error;
+
+        fn try_from(s: Snapshot) -> Result<Self, Self::Error> {
+            let row_range = s.row_range.ok_or_else(|| {
+                Error::new(
+                    crate::ErrorKind::DataInvalid,
+                    "v3 snapshots must have the first-row-id and added-rows fields set.".to_string(),
+                )
+            })?;
+
+            Ok(SnapshotV3 {
+                snapshot_id: s.snapshot_id,
+                parent_snapshot_id: s.parent_snapshot_id,
+                sequence_number: s.sequence_number,
+                timestamp_ms: s.timestamp_ms,
+                manifest_list: s.manifest_list,
+                summary: s.summary,
+                schema_id: s.schema_id,
+                first_row_id: row_range.first_row_id,
+                added_rows: row_range.added_rows,
+                key_id: s.encryption_key_id,
+            })
+        }
+    }
+
    impl From<SnapshotV2> for Snapshot {
        fn from(v2: SnapshotV2) -> Self {
            Snapshot {
@@ -263,6 +377,8 @@ pub(super) mod _serde {
                manifest_list: v2.manifest_list,
                summary: v2.summary,
                schema_id: v2.schema_id,
+                encryption_key_id: None,
+                row_range: None,
            }
        }
    }
@@ -300,6 +416,8 @@ pub(super) mod _serde {
                    additional_properties: HashMap::new(),
                }),
                schema_id: v1.schema_id,
+                encryption_key_id: None,
+                row_range: None,
            })
        }
    }

diff --git a/crates/iceberg/src/spec/snapshot_summary.rs b/crates/iceberg/src/spec/snapshot_summary.rs
index 1b07ce3fa0..3d8f7de5b5 100644
--- a/crates/iceberg/src/spec/snapshot_summary.rs
+++ b/crates/iceberg/src/spec/snapshot_summary.rs
@@ -850,6 +850,7 @@ mod tests {
            deleted_rows_count: Some(50),
            partitions: Some(Vec::new()),
            key_metadata: None,
+            first_row_id: None,
        };

        collector
@@ -974,6 +975,7 @@ mod tests {
            deleted_rows_count: Some(0),
            partitions: Some(Vec::new()),
            key_metadata: None,
+            first_row_id: None,
        });

        summary_four.add_file(

diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs
index d7347d50ee..f8b36b83bb 100644
--- a/crates/iceberg/src/spec/table_metadata.rs
+++ b/crates/iceberg/src/spec/table_metadata.rs
@@ -38,6 +38,7 @@ use super::{
};
use crate::error::{Result, timestamp_ms_to_utc};
use crate::io::FileIO;
+use crate::spec::EncryptedKey;
use crate::{Error, ErrorKind};

static MAIN_BRANCH: &str = "main";
@@ -131,6 +132,11 @@ pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES: &str = "write.target-file-size-
/// Default target file size
pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT: usize = 512 * 1024 * 1024; // 512 MB

+/// Initial row id for row lineage for new v3 tables and older tables upgrading to v3.
+pub const INITIAL_ROW_ID: u64 = 0;
+/// Minimum format version that supports row lineage (v3).
+pub const MIN_FORMAT_VERSION_ROW_LINEAGE: FormatVersion = FormatVersion::V3;
+
/// Reference to [`TableMetadata`].
pub type TableMetadataRef = Arc<TableMetadata>;

@@ -208,8 +214,10 @@ pub struct TableMetadata {
    pub(crate) statistics: HashMap<i64, StatisticsFile>,
    /// Mapping of snapshot ids to partition statistics files.
    pub(crate) partition_statistics: HashMap<i64, PartitionStatisticsFile>,
-    /// Encryption Keys
-    pub(crate) encryption_keys: HashMap<String, String>,
+    /// Encryption keys: a map from key id to the actual key
+    pub(crate) encryption_keys: HashMap<String, EncryptedKey>,
+    /// Next row id to be assigned for row lineage (v3)
+    pub(crate) next_row_id: u64,
}

impl TableMetadata {
@@ -483,16 +491,22 @@ impl TableMetadata {
    /// Iterate over all encryption keys
    #[inline]
-    pub fn encryption_keys_iter(&self) -> impl ExactSizeIterator<Item = (&String, &String)> {
-        self.encryption_keys.iter()
+    pub fn encryption_keys_iter(&self) -> impl ExactSizeIterator<Item = &EncryptedKey> {
+        self.encryption_keys.values()
    }

    /// Get the encryption key for a given key id
    #[inline]
-    pub fn encryption_key(&self, key_id: &str) -> Option<&String> {
+    pub fn encryption_key(&self, key_id: &str) -> Option<&EncryptedKey> {
        self.encryption_keys.get(key_id)
    }

+    /// Get the next row id to be assigned
+    #[inline]
+    pub fn next_row_id(&self) -> u64 {
+        self.next_row_id
+    }
+
    /// Read table metadata from the given location.
    pub async fn read_from(
        file_io: &FileIO,
@@ -759,16 +773,18 @@ pub(super) mod _serde {
        TableMetadata,
    };
    use crate::spec::schema::_serde::{SchemaV1, SchemaV2};
-    use crate::spec::snapshot::_serde::{SnapshotV1, SnapshotV2};
+    use crate::spec::snapshot::_serde::{SnapshotV1, SnapshotV2, SnapshotV3};
    use crate::spec::{
-        PartitionField, PartitionSpec, PartitionSpecRef, PartitionStatisticsFile, Schema,
-        SchemaRef, Snapshot, SnapshotReference, SnapshotRetention, SortOrder, StatisticsFile,
+        EncryptedKey, INITIAL_ROW_ID, PartitionField, PartitionSpec, PartitionSpecRef,
+        PartitionStatisticsFile, Schema, SchemaRef, Snapshot, SnapshotReference, SnapshotRetention,
+        SortOrder, StatisticsFile,
    };
    use crate::{Error, ErrorKind};

    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(untagged)]
    pub(super) enum TableMetadataEnum {
+        V3(TableMetadataV3),
        V2(TableMetadataV2),
        V1(TableMetadataV1),
    }
@@ -776,8 +792,21 @@ pub(super) mod _serde {
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(rename_all = "kebab-case")]
-    /// Defines the structure of a v3 table metadata for serialization/deserialization
-    pub(super) struct TableMetadataV2 {
-        pub format_version: VersionNumber<2>,
+    pub(super) struct TableMetadataV3 {
+        pub format_version: VersionNumber<3>,
+        #[serde(flatten)]
+        pub shared: TableMetadataV2V3Shared,
+        pub next_row_id: u64,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub encryption_keys: Option<Vec<EncryptedKey>>,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub snapshots: Option<Vec<SnapshotV3>>,
+    }
+
+    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
+    #[serde(rename_all = "kebab-case")]
+    /// Defines the fields shared between v2 and v3 table metadata for serialization/deserialization
+    pub(super) struct TableMetadataV2V3Shared {
        pub table_uuid: Uuid,
        pub location: String,
        pub last_sequence_number: i64,
@@ -793,8 +822,6 @@ pub(super) mod _serde {
+822,6 @@ pub(super) mod _serde { #[serde(skip_serializing_if = "Option::is_none")] pub current_snapshot_id: Option<i64>, #[serde(skip_serializing_if = "Option::is_none")] - pub snapshots: Option<Vec<SnapshotV2>>, - #[serde(skip_serializing_if = "Option::is_none")] pub snapshot_log: Option<Vec<SnapshotLog>>, #[serde(skip_serializing_if = "Option::is_none")] pub metadata_log: Option<Vec<MetadataLog>>, @@ -808,6 +835,17 @@ pub(super) mod _serde { pub partition_statistics: Vec<PartitionStatisticsFile>, } + #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] + #[serde(rename_all = "kebab-case")] + /// Defines the structure of a v2 table metadata for serialization/deserialization + pub(super) struct TableMetadataV2 { + pub format_version: VersionNumber<2>, + #[serde(flatten)] + pub shared: TableMetadataV2V3Shared, + #[serde(skip_serializing_if = "Option::is_none")] + pub snapshots: Option<Vec<SnapshotV2>>, + } + #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "kebab-case")] /// Defines the structure of a v1 table metadata for serialization/deserialization @@ -888,6 +926,7 @@ pub(super) mod _serde { type Error = Error; fn try_from(value: TableMetadataEnum) -> Result<Self, Error> { match value { + TableMetadataEnum::V3(value) => value.try_into(), TableMetadataEnum::V2(value) => value.try_into(), TableMetadataEnum::V1(value) => value.try_into(), } @@ -898,15 +937,136 @@ pub(super) mod _serde { type Error = Error; fn try_from(value: TableMetadata) -> Result<Self, Error> { Ok(match value.format_version { + FormatVersion::V3 => TableMetadataEnum::V3(value.try_into()?), FormatVersion::V2 => TableMetadataEnum::V2(value.into()), FormatVersion::V1 => TableMetadataEnum::V1(value.try_into()?), }) } } + impl TryFrom<TableMetadataV3> for TableMetadata { + type Error = Error; + fn try_from(value: TableMetadataV3) -> Result<Self, Error> { + let TableMetadataV3 { + format_version: _, + shared: value, + next_row_id, + encryption_keys, + snapshots, + } = value; + let current_snapshot_id = if let &Some(-1) = &value.current_snapshot_id { + None + } else { + value.current_snapshot_id + }; + let schemas = HashMap::from_iter( + value + .schemas + .into_iter() + .map(|schema| Ok((schema.schema_id, Arc::new(schema.try_into()?)))) + .collect::<Result<Vec<_>, Error>>()?, + ); + + let current_schema: &SchemaRef = + schemas.get(&value.current_schema_id).ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!( + "No schema exists with the current schema id {}.", + value.current_schema_id + ), + ) + })?; + let partition_specs = HashMap::from_iter( + value + .partition_specs + .into_iter() + .map(|x| (x.spec_id(), Arc::new(x))), + ); + let default_spec_id = value.default_spec_id; + let default_spec: PartitionSpecRef = partition_specs + .get(&value.default_spec_id) + .map(|spec| (**spec).clone()) + .or_else(|| { + (DEFAULT_PARTITION_SPEC_ID == default_spec_id) + .then(PartitionSpec::unpartition_spec) + }) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Default partition spec {default_spec_id} not found"), + ) + })? 
+ .into(); + let default_partition_type = default_spec.partition_type(current_schema)?; + + let mut metadata = TableMetadata { + format_version: FormatVersion::V3, + table_uuid: value.table_uuid, + location: value.location, + last_sequence_number: value.last_sequence_number, + last_updated_ms: value.last_updated_ms, + last_column_id: value.last_column_id, + current_schema_id: value.current_schema_id, + schemas, + partition_specs, + default_partition_type, + default_spec, + last_partition_id: value.last_partition_id, + properties: value.properties.unwrap_or_default(), + current_snapshot_id, + snapshots: snapshots + .map(|snapshots| { + HashMap::from_iter( + snapshots + .into_iter() + .map(|x| (x.snapshot_id, Arc::new(x.into()))), + ) + }) + .unwrap_or_default(), + snapshot_log: value.snapshot_log.unwrap_or_default(), + metadata_log: value.metadata_log.unwrap_or_default(), + sort_orders: HashMap::from_iter( + value + .sort_orders + .into_iter() + .map(|x| (x.order_id, Arc::new(x))), + ), + default_sort_order_id: value.default_sort_order_id, + refs: value.refs.unwrap_or_else(|| { + if let Some(snapshot_id) = current_snapshot_id { + HashMap::from_iter(vec![(MAIN_BRANCH.to_string(), SnapshotReference { + snapshot_id, + retention: SnapshotRetention::Branch { + min_snapshots_to_keep: None, + max_snapshot_age_ms: None, + max_ref_age_ms: None, + }, + })]) + } else { + HashMap::new() + } + }), + statistics: index_statistics(value.statistics), + partition_statistics: index_partition_statistics(value.partition_statistics), + encryption_keys: encryption_keys + .map(|keys| { + HashMap::from_iter(keys.into_iter().map(|key| (key.key_id.clone(), key))) + }) + .unwrap_or_default(), + next_row_id, + }; + + metadata.borrow_mut().try_normalize()?; + Ok(metadata) + } + } + impl TryFrom<TableMetadataV2> for TableMetadata { type Error = Error; fn try_from(value: TableMetadataV2) -> Result<Self, Error> { + let snapshots = value.snapshots; + let value = value.shared; let current_snapshot_id = if let &Some(-1) = &value.current_snapshot_id { None } else { @@ -968,8 +1128,7 @@ pub(super) mod _serde { last_partition_id: value.last_partition_id, properties: value.properties.unwrap_or_default(), current_snapshot_id, - snapshots: value - .snapshots + snapshots: snapshots .map(|snapshots| { HashMap::from_iter( snapshots @@ -1004,6 +1163,7 @@ pub(super) mod _serde { statistics: index_statistics(value.statistics), partition_statistics: index_partition_statistics(value.partition_statistics), encryption_keys: HashMap::new(), + next_row_id: INITIAL_ROW_ID, }; metadata.borrow_mut().try_normalize()?; @@ -1161,6 +1321,7 @@ pub(super) mod _serde { statistics: index_statistics(value.statistics), partition_statistics: index_partition_statistics(value.partition_statistics), encryption_keys: HashMap::new(), + next_row_id: INITIAL_ROW_ID, // v1 has no row lineage }; metadata.borrow_mut().try_normalize()?; @@ -1168,10 +1329,63 @@ pub(super) mod _serde { } } + impl TryFrom<TableMetadata> for TableMetadataV3 { + type Error = Error; + + fn try_from(mut v: TableMetadata) -> Result<Self, Error> { + let next_row_id = v.next_row_id; + let encryption_keys = std::mem::take(&mut v.encryption_keys); + let snapshots = std::mem::take(&mut v.snapshots); + let shared = v.into(); + + Ok(TableMetadataV3 { + format_version: VersionNumber::<3>, + shared, + next_row_id, + encryption_keys: if encryption_keys.is_empty() { + None + } else { + Some(encryption_keys.into_values().collect()) + }, + snapshots: if snapshots.is_empty() { + None + } else { + Some( + snapshots + .into_values() + .map(|s| 
SnapshotV3::try_from(Arc::unwrap_or_clone(s))) + .collect::<Result<_, Error>>()?, + ) + }, + }) + } + } + impl From<TableMetadata> for TableMetadataV2 { - fn from(v: TableMetadata) -> Self { + fn from(mut v: TableMetadata) -> Self { + let snapshots = std::mem::take(&mut v.snapshots); + let shared = v.into(); + TableMetadataV2 { format_version: VersionNumber::<2>, + shared, + snapshots: if snapshots.is_empty() { + None + } else { + Some( + snapshots + .into_values() + .map(|s| SnapshotV2::from(Arc::unwrap_or_clone(s))) + .collect(), + ) + }, + } + } + } + + impl From<TableMetadata> for TableMetadataV2V3Shared { + fn from(v: TableMetadata) -> Self { + TableMetadataV2V3Shared { table_uuid: v.table_uuid, location: v.location, last_sequence_number: v.last_sequence_number, @@ -1200,20 +1414,6 @@ pub(super) mod _serde { Some(v.properties) }, current_snapshot_id: v.current_snapshot_id, - snapshots: if v.snapshots.is_empty() { - None - } else { - Some( - v.snapshots - .into_values() - .map(|x| { - Arc::try_unwrap(x) - .unwrap_or_else(|snapshot| snapshot.as_ref().clone()) - .into() - }) - .collect(), - ) - }, snapshot_log: if v.snapshot_log.is_empty() { None } else { @@ -1343,6 +1543,8 @@ pub enum FormatVersion { V1 = 1u8, /// Iceberg spec version 2 V2 = 2u8, + /// Iceberg spec version 3 + V3 = 3u8, } impl PartialOrd for FormatVersion { @@ -1362,6 +1564,7 @@ impl Display for FormatVersion { match self { FormatVersion::V1 => write!(f, "v1"), FormatVersion::V2 => write!(f, "v2"), + FormatVersion::V3 => write!(f, "v3"), } } } @@ -1406,6 +1609,7 @@ mod tests { use std::sync::Arc; use anyhow::Result; + use base64::Engine as _; use pretty_assertions::assert_eq; use tempfile::TempDir; use uuid::Uuid; @@ -1415,9 +1619,10 @@ mod tests { use crate::io::FileIOBuilder; use crate::spec::table_metadata::TableMetadata; use crate::spec::{ - BlobMetadata, NestedField, NullOrder, Operation, PartitionSpec, PartitionStatisticsFile, - PrimitiveType, Schema, Snapshot, SnapshotReference, SnapshotRetention, SortDirection, - SortField, SortOrder, StatisticsFile, Summary, Transform, Type, UnboundPartitionField, + BlobMetadata, EncryptedKey, INITIAL_ROW_ID, Literal, NestedField, NullOrder, Operation, + PartitionSpec, PartitionStatisticsFile, PrimitiveLiteral, PrimitiveType, Schema, Snapshot, + SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, StatisticsFile, + Summary, Transform, Type, UnboundPartitionField, }; fn check_table_metadata_serde(json: &str, expected_type: TableMetadata) { @@ -1563,6 +1768,183 @@ mod tests { statistics: HashMap::new(), partition_statistics: HashMap::new(), encryption_keys: HashMap::new(), + next_row_id: INITIAL_ROW_ID, + }; + + let expected_json_value = serde_json::to_value(&expected).unwrap(); + check_table_metadata_serde(data, expected); + + let json_value = serde_json::from_str::<serde_json::Value>(data).unwrap(); + assert_eq!(json_value, expected_json_value); + } + + #[test] + fn test_table_data_v3() { + let data = r#" + { + "format-version" : 3, + "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94", + "location": "s3://b/wh/data.db/table", + "last-sequence-number" : 1, + "last-updated-ms": 1515100955770, + "last-column-id": 1, + "next-row-id": 5, + "schemas": [ + { + "schema-id" : 1, + "type" : "struct", + "fields" :[ + { + "id": 4, + "name": "ts", + "required": true, + "type": "timestamp" + } + ] + } + ], + "current-schema-id" : 1, + "partition-specs": [ + { + "spec-id": 0, + "fields": [ + { + "source-id": 4, + "field-id": 1000, + "name": "ts_day", + "transform": "day" + } + ] + } + ], + "default-spec-id": 0, + "last-partition-id": 
1000, + "properties": { + "commit.retry.num-retries": "1" + }, + "metadata-log": [ + { + "metadata-file": "s3://bucket/.../v1.json", + "timestamp-ms": 1515100 + } + ], + "refs": {}, + "snapshots" : [ { + "snapshot-id" : 1, + "timestamp-ms" : 1662532818843, + "sequence-number" : 0, + "first-row-id" : 0, + "added-rows" : 4, + "key-id" : "key1", + "summary" : { + "operation" : "append" + }, + "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro", + "schema-id" : 0 + } + ], + "encryption-keys": [ + { + "key-id": "key1", + "encrypted-by-id": "KMS", + "encrypted-key-metadata": "c29tZS1lbmNyeXB0aW9uLWtleQ==", + "properties": { + "p1": "v1" + } + } + ], + "sort-orders": [ + { + "order-id": 0, + "fields": [] + } + ], + "default-sort-order-id": 0 + } + "#; + + let schema = Schema::builder() + .with_schema_id(1) + .with_fields(vec![Arc::new(NestedField::required( + 4, + "ts", + Type::Primitive(PrimitiveType::Timestamp), + ))]) + .build() + .unwrap(); + + let partition_spec = PartitionSpec::builder(schema.clone()) + .with_spec_id(0) + .add_unbound_field(UnboundPartitionField { + name: "ts_day".to_string(), + transform: Transform::Day, + source_id: 4, + field_id: Some(1000), + }) + .unwrap() + .build() + .unwrap(); + + let snapshot = Snapshot::builder() + .with_snapshot_id(1) + .with_timestamp_ms(1662532818843) + .with_sequence_number(0) + .with_row_range(0, 4) + .with_encryption_key_id(Some("key1".to_string())) + .with_summary(Summary { + operation: Operation::Append, + additional_properties: HashMap::new(), + }) + .with_manifest_list("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro".to_string()) + .with_schema_id(0) + .build(); + + let encryption_key = EncryptedKey::builder() + .key_id("key1".to_string()) + .encrypted_by_id("KMS".to_string()) + .encrypted_key_metadata( + base64::prelude::BASE64_STANDARD + .decode("c29tZS1lbmNyeXB0aW9uLWtleQ==") + .unwrap(), + ) + .properties(HashMap::from_iter(vec![( + "p1".to_string(), + "v1".to_string(), + )])) + .build(); + + let default_partition_type = partition_spec.partition_type(&schema).unwrap(); + let expected = TableMetadata { + format_version: FormatVersion::V3, + table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(), + location: "s3://b/wh/data.db/table".to_string(), + last_updated_ms: 1515100955770, + last_column_id: 1, + schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]), + current_schema_id: 1, + partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]), + default_partition_type, + default_spec: partition_spec.into(), + last_partition_id: 1000, + default_sort_order_id: 0, + sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]), + snapshots: HashMap::from_iter(vec![(1, snapshot.into())]), + current_snapshot_id: None, + last_sequence_number: 1, + properties: HashMap::from_iter(vec![( + "commit.retry.num-retries".to_string(), + "1".to_string(), + )]), + snapshot_log: Vec::new(), + metadata_log: vec![MetadataLog { + metadata_file: "s3://bucket/.../v1.json".to_string(), + timestamp_ms: 1515100, + }], + refs: HashMap::new(), + statistics: HashMap::new(), + partition_statistics: HashMap::new(), + encryption_keys: HashMap::from_iter(vec![("key1".to_string(), encryption_key)]), + next_row_id: 5, }; let expected_json_value = serde_json::to_value(&expected).unwrap(); @@ -1739,6 +2121,7 @@ mod tests { statistics: HashMap::new(), partition_statistics: 
HashMap::new(), encryption_keys: HashMap::new(), + next_row_id: INITIAL_ROW_ID, }; check_table_metadata_serde(data, expected); @@ -1837,6 +2220,7 @@ mod tests { statistics: HashMap::new(), partition_statistics: HashMap::new(), encryption_keys: HashMap::new(), + next_row_id: INITIAL_ROW_ID, }; let expected_json_value = serde_json::to_value(&expected).unwrap(); @@ -2372,6 +2756,7 @@ mod tests { }, })]), encryption_keys: HashMap::new(), + next_row_id: INITIAL_ROW_ID, }; check_table_metadata_serde(data, expected); @@ -2507,6 +2892,7 @@ mod tests { }, })]), encryption_keys: HashMap::new(), + next_row_id: INITIAL_ROW_ID, }; check_table_metadata_serde(data, expected); @@ -2535,6 +2921,95 @@ mod tests { Ok(()) } + #[test] + fn test_table_metadata_v3_valid_minimal() { + let metadata_str = + fs::read_to_string("testdata/table_metadata/TableMetadataV3ValidMinimal.json").unwrap(); + + let table_metadata = serde_json::from_str::<TableMetadata>(&metadata_str).unwrap(); + assert_eq!(table_metadata.format_version, FormatVersion::V3); + + let schema = Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + Arc::new( + NestedField::required(1, "x", Type::Primitive(PrimitiveType::Long)) + .with_initial_default(Literal::Primitive(PrimitiveLiteral::Long(1))) + .with_write_default(Literal::Primitive(PrimitiveLiteral::Long(1))), + ), + Arc::new( + NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long)) + .with_doc("comment"), + ), + Arc::new(NestedField::required( + 3, + "z", + Type::Primitive(PrimitiveType::Long), + )), + ]) + .build() + .unwrap(); + + let partition_spec = PartitionSpec::builder(schema.clone()) + .with_spec_id(0) + .add_unbound_field(UnboundPartitionField { + name: "x".to_string(), + transform: Transform::Identity, + source_id: 1, + field_id: Some(1000), + }) + .unwrap() + .build() + .unwrap(); + + let sort_order = SortOrder::builder() + .with_order_id(3) + .with_sort_field(SortField { + source_id: 2, + transform: Transform::Identity, + direction: SortDirection::Ascending, + null_order: NullOrder::First, + }) + .with_sort_field(SortField { + source_id: 3, + transform: Transform::Bucket(4), + direction: SortDirection::Descending, + null_order: NullOrder::Last, + }) + .build_unbound() + .unwrap(); + + let default_partition_type = partition_spec.partition_type(&schema).unwrap(); + let expected = TableMetadata { + format_version: FormatVersion::V3, + table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(), + location: "s3://bucket/test/location".to_string(), + last_updated_ms: 1602638573590, + last_column_id: 3, + schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]), + current_schema_id: 0, + partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]), + default_spec: Arc::new(partition_spec), + default_partition_type, + last_partition_id: 1000, + default_sort_order_id: 3, + sort_orders: HashMap::from_iter(vec![(3, sort_order.into())]), + snapshots: HashMap::default(), + current_snapshot_id: None, + last_sequence_number: 34, + properties: HashMap::new(), + snapshot_log: Vec::new(), + metadata_log: Vec::new(), + refs: HashMap::new(), + statistics: HashMap::new(), + partition_statistics: HashMap::new(), + encryption_keys: HashMap::new(), + next_row_id: 0, // v3-specific field from the JSON + }; + + check_table_metadata_serde(&metadata_str, expected); + } + #[test] fn test_table_metadata_v2_file_valid() { let metadata = @@ -2669,6 +3144,7 @@ mod tests { statistics: HashMap::new(), partition_statistics: HashMap::new(), encryption_keys: HashMap::new(), + 
next_row_id: INITIAL_ROW_ID, }; check_table_metadata_serde(&metadata, expected); @@ -2754,6 +3230,7 @@ mod tests { statistics: HashMap::new(), partition_statistics: HashMap::new(), encryption_keys: HashMap::new(), + next_row_id: INITIAL_ROW_ID, }; check_table_metadata_serde(&metadata, expected); @@ -2823,6 +3300,7 @@ mod tests { statistics: HashMap::new(), partition_statistics: HashMap::new(), encryption_keys: HashMap::new(), + next_row_id: INITIAL_ROW_ID, }; check_table_metadata_serde(&metadata, expected); diff --git a/crates/iceberg/src/spec/table_metadata_builder.rs b/crates/iceberg/src/spec/table_metadata_builder.rs index 068f2002f9..50a1d18fd1 100644 --- a/crates/iceberg/src/spec/table_metadata_builder.rs +++ b/crates/iceberg/src/spec/table_metadata_builder.rs @@ -29,6 +29,7 @@ use super::{ TableMetadata, UNPARTITIONED_LAST_ASSIGNED_ID, UnboundPartitionSpec, }; use crate::error::{Error, ErrorKind, Result}; +use crate::spec::{EncryptedKey, INITIAL_ROW_ID, MIN_FORMAT_VERSION_ROW_LINEAGE}; use crate::{TableCreation, TableUpdate}; const FIRST_FIELD_ID: u32 = 1; @@ -121,6 +122,7 @@ impl TableMetadataBuilder { statistics: HashMap::new(), partition_statistics: HashMap::new(), encryption_keys: HashMap::new(), + next_row_id: INITIAL_ROW_ID, }, last_updated_ms: None, changes: vec![], @@ -171,6 +173,7 @@ impl TableMetadataBuilder { partition_spec, sort_order, properties, + format_version, } = table_creation; let location = location.ok_or_else(|| { @@ -189,7 +192,7 @@ impl TableMetadataBuilder { partition_spec, sort_order.unwrap_or(SortOrder::unsorted_order()), location, - FormatVersion::V2, + format_version, properties, ) } @@ -229,6 +232,11 @@ impl TableMetadataBuilder { self.changes .push(TableUpdate::UpgradeFormatVersion { format_version }); } + FormatVersion::V3 => { + self.metadata.format_version = format_version; + self.changes + .push(TableUpdate::UpgradeFormatVersion { format_version }); + } } } @@ -330,6 +338,9 @@ impl TableMetadataBuilder { /// # Errors /// - Snapshot id already exists. /// - For format version > 1: the sequence number of the snapshot is lower than the highest sequence number specified so far. + /// - For format version >= 3: the first-row-id of the snapshot is lower than the next-row-id of the table. + /// - For format version >= 3: added-rows is null or first-row-id is null. + /// - For format version >= 3: next-row-id would overflow when adding added-rows. pub fn add_snapshot(mut self, snapshot: Snapshot) -> Result { if self .metadata @@ -386,6 +397,43 @@ impl TableMetadataBuilder { )); } + let mut added_rows = None; + if self.metadata.format_version >= MIN_FORMAT_VERSION_ROW_LINEAGE { + if let Some((first_row_id, added_rows_count)) = snapshot.row_range() { + if first_row_id < self.metadata.next_row_id { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot add a snapshot, first-row-id is behind table next-row-id: {first_row_id} < {}", + self.metadata.next_row_id + ), + )); + } + + added_rows = Some(added_rows_count); + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot add a snapshot: first-row-id is null. 
first-row-id must be set for format version >= {MIN_FORMAT_VERSION_ROW_LINEAGE}", + ), + )); + } + } + + if let Some(added_rows) = added_rows { + self.metadata.next_row_id = self + .metadata + .next_row_id + .checked_add(added_rows) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "Cannot add snapshot: next-row-id overflowed when adding added-rows", + ) + })?; + } + // Mutation happens in next line - must be infallible from here self.changes.push(TableUpdate::AddSnapshot { snapshot: snapshot.clone(), @@ -1019,6 +1067,31 @@ impl TableMetadataBuilder { .set_default_sort_order(Self::LAST_ADDED as i64) } + /// Add an encryption key to the table metadata. + pub fn add_encryption_key(mut self, key: EncryptedKey) -> Self { + let key_id = key.key_id().to_string(); + if self.metadata.encryption_keys.contains_key(&key_id) { + // already exists + return self; + } + + self.metadata.encryption_keys.insert(key_id, key.clone()); + self.changes.push(TableUpdate::AddEncryptionKey { + encryption_key: key, + }); + self + } + + /// Remove an encryption key from the table metadata. + pub fn remove_encryption_key(mut self, key_id: &str) -> Self { + if self.metadata.encryption_keys.remove(key_id).is_some() { + self.changes.push(TableUpdate::RemoveEncryptionKey { + key_id: key_id.to_string(), + }); + } + self + } + /// Build the table metadata. pub fn build(mut self) -> Result { self.metadata.last_updated_ms = self @@ -2994,4 +3067,382 @@ mod tests { assert!(result.is_ok()); } + + #[test] + fn test_row_lineage_addition() { + let new_rows = 30; + let base = builder_without_changes(FormatVersion::V3) + .build() + .unwrap() + .metadata; + let add_rows = Snapshot::builder() + .with_snapshot_id(0) + .with_timestamp_ms(base.last_updated_ms + 1) + .with_sequence_number(0) + .with_schema_id(0) + .with_manifest_list("foo") + .with_parent_snapshot_id(None) + .with_summary(Summary { + operation: Operation::Append, + additional_properties: HashMap::new(), + }) + .with_row_range(base.next_row_id(), new_rows) + .build(); + + let first_addition = base + .into_builder(None) + .add_snapshot(add_rows.clone()) + .unwrap() + .build() + .unwrap() + .metadata; + + assert_eq!(first_addition.next_row_id(), new_rows); + + let add_more_rows = Snapshot::builder() + .with_snapshot_id(1) + .with_timestamp_ms(first_addition.last_updated_ms + 1) + .with_sequence_number(1) + .with_schema_id(0) + .with_manifest_list("foo") + .with_parent_snapshot_id(Some(0)) + .with_summary(Summary { + operation: Operation::Append, + additional_properties: HashMap::new(), + }) + .with_row_range(first_addition.next_row_id(), new_rows) + .build(); + + let second_addition = first_addition + .into_builder(None) + .add_snapshot(add_more_rows) + .unwrap() + .build() + .unwrap() + .metadata; + assert_eq!(second_addition.next_row_id(), new_rows * 2); + } + + #[test] + fn test_row_lineage_invalid_snapshot() { + let new_rows = 30; + let base = builder_without_changes(FormatVersion::V3) + .build() + .unwrap() + .metadata; + + // add rows to check TableMetadata validation; Snapshot rejects negative next-row-id + let add_rows = Snapshot::builder() + .with_snapshot_id(0) + .with_timestamp_ms(base.last_updated_ms + 1) + .with_sequence_number(0) + .with_schema_id(0) + .with_manifest_list("foo") + .with_parent_snapshot_id(None) + .with_summary(Summary { + operation: Operation::Append, + additional_properties: HashMap::new(), + }) + .with_row_range(base.next_row_id(), new_rows) + .build(); + + let added = base + .into_builder(None) + .add_snapshot(add_rows) + 
.unwrap() + .build() + .unwrap() + .metadata; + + let invalid_new_rows = Snapshot::builder() + .with_snapshot_id(1) + .with_timestamp_ms(added.last_updated_ms + 1) + .with_sequence_number(1) + .with_schema_id(0) + .with_manifest_list("foo") + .with_parent_snapshot_id(Some(0)) + .with_summary(Summary { + operation: Operation::Append, + additional_properties: HashMap::new(), + }) + // first_row_id is behind table next_row_id + .with_row_range(added.next_row_id() - 1, 10) + .build(); + + let err = added + .into_builder(None) + .add_snapshot(invalid_new_rows) + .unwrap_err(); + assert!( + err.to_string().contains( + "Cannot add a snapshot, first-row-id is behind table next-row-id: 29 < 30" + ) + ); + } + + #[test] + fn test_row_lineage_append_branch() { + // Appends to a branch should still change last-row-id even if not on main, these changes + // should also affect commits to main + + let branch = "some_branch"; + + // Start with V3 metadata to support row lineage + let base = builder_without_changes(FormatVersion::V3) + .build() + .unwrap() + .metadata; + + // Initial next_row_id should be 0 + assert_eq!(base.next_row_id(), 0); + + // Write to Branch - append 30 rows + let branch_snapshot_1 = Snapshot::builder() + .with_snapshot_id(1) + .with_timestamp_ms(base.last_updated_ms + 1) + .with_sequence_number(0) + .with_schema_id(0) + .with_manifest_list("foo") + .with_parent_snapshot_id(None) + .with_summary(Summary { + operation: Operation::Append, + additional_properties: HashMap::new(), + }) + .with_row_range(base.next_row_id(), 30) + .build(); + + let table_after_branch_1 = base + .into_builder(None) + .set_branch_snapshot(branch_snapshot_1.clone(), branch) + .unwrap() + .build() + .unwrap() + .metadata; + + // Current snapshot should be null (no main branch snapshot yet) + assert!(table_after_branch_1.current_snapshot().is_none()); + + // Branch snapshot should have first_row_id = 0 + let branch_ref = table_after_branch_1.refs.get(branch).unwrap(); + let branch_snap_1 = table_after_branch_1 + .snapshots + .get(&branch_ref.snapshot_id) + .unwrap(); + assert_eq!(branch_snap_1.first_row_id(), Some(0)); + + // Next row id should be 30 + assert_eq!(table_after_branch_1.next_row_id(), 30); + + // Write to Main - append 28 rows + let main_snapshot = Snapshot::builder() + .with_snapshot_id(2) + .with_timestamp_ms(table_after_branch_1.last_updated_ms + 1) + .with_sequence_number(1) + .with_schema_id(0) + .with_manifest_list("bar") + .with_parent_snapshot_id(None) + .with_summary(Summary { + operation: Operation::Append, + additional_properties: HashMap::new(), + }) + .with_row_range(table_after_branch_1.next_row_id(), 28) + .build(); + + let table_after_main = table_after_branch_1 + .into_builder(None) + .add_snapshot(main_snapshot.clone()) + .unwrap() + .set_ref(MAIN_BRANCH, SnapshotReference { + snapshot_id: main_snapshot.snapshot_id(), + retention: SnapshotRetention::Branch { + min_snapshots_to_keep: None, + max_snapshot_age_ms: None, + max_ref_age_ms: None, + }, + }) + .unwrap() + .build() + .unwrap() + .metadata; + + // Main snapshot should have first_row_id = 30 + let current_snapshot = table_after_main.current_snapshot().unwrap(); + assert_eq!(current_snapshot.first_row_id(), Some(30)); + + // Next row id should be 58 (30 + 28) + assert_eq!(table_after_main.next_row_id(), 58); + + // Write again to branch - append 21 rows + let branch_snapshot_2 = Snapshot::builder() + .with_snapshot_id(3) + .with_timestamp_ms(table_after_main.last_updated_ms + 1) + .with_sequence_number(2) + 
.with_schema_id(0) + .with_manifest_list("baz") + .with_parent_snapshot_id(Some(branch_snapshot_1.snapshot_id())) + .with_summary(Summary { + operation: Operation::Append, + additional_properties: HashMap::new(), + }) + .with_row_range(table_after_main.next_row_id(), 21) + .build(); + + let table_after_branch_2 = table_after_main + .into_builder(None) + .set_branch_snapshot(branch_snapshot_2.clone(), branch) + .unwrap() + .build() + .unwrap() + .metadata; + + // Branch snapshot should have first_row_id = 58 (30 + 28) + let branch_ref_2 = table_after_branch_2.refs.get(branch).unwrap(); + let branch_snap_2 = table_after_branch_2 + .snapshots + .get(&branch_ref_2.snapshot_id) + .unwrap(); + assert_eq!(branch_snap_2.first_row_id(), Some(58)); + + // Next row id should be 79 (30 + 28 + 21) + assert_eq!(table_after_branch_2.next_row_id(), 79); + } + + #[test] + fn test_encryption_keys() { + let builder = builder_without_changes(FormatVersion::V2); + + // Create test encryption keys + let encryption_key_1 = EncryptedKey::builder() + .key_id("key-1") + .encrypted_key_metadata(vec![1, 2, 3, 4]) + .encrypted_by_id("encryption-service-1") + .properties(HashMap::from_iter(vec![( + "algorithm".to_string(), + "AES-256".to_string(), + )])) + .build(); + + let encryption_key_2 = EncryptedKey::builder() + .key_id("key-2") + .encrypted_key_metadata(vec![5, 6, 7, 8]) + .encrypted_by_id("encryption-service-2") + .properties(HashMap::new()) + .build(); + + // Add first encryption key + let build_result = builder + .add_encryption_key(encryption_key_1.clone()) + .build() + .unwrap(); + + assert_eq!(build_result.changes.len(), 1); + assert_eq!(build_result.metadata.encryption_keys.len(), 1); + assert_eq!( + build_result.metadata.encryption_key("key-1"), + Some(&encryption_key_1) + ); + assert_eq!(build_result.changes[0], TableUpdate::AddEncryptionKey { + encryption_key: encryption_key_1.clone() + }); + + // Add second encryption key + let build_result = build_result + .metadata + .into_builder(Some( + "s3://bucket/test/location/metadata/metadata1.json".to_string(), + )) + .add_encryption_key(encryption_key_2.clone()) + .build() + .unwrap(); + + assert_eq!(build_result.changes.len(), 1); + assert_eq!(build_result.metadata.encryption_keys.len(), 2); + assert_eq!( + build_result.metadata.encryption_key("key-1"), + Some(&encryption_key_1) + ); + assert_eq!( + build_result.metadata.encryption_key("key-2"), + Some(&encryption_key_2) + ); + assert_eq!(build_result.changes[0], TableUpdate::AddEncryptionKey { + encryption_key: encryption_key_2.clone() + }); + + // Try to add duplicate key - should not create a change + let build_result = build_result + .metadata + .into_builder(Some( + "s3://bucket/test/location/metadata/metadata2.json".to_string(), + )) + .add_encryption_key(encryption_key_1.clone()) + .build() + .unwrap(); + + assert_eq!(build_result.changes.len(), 0); + assert_eq!(build_result.metadata.encryption_keys.len(), 2); + + // Remove first encryption key + let build_result = build_result + .metadata + .into_builder(Some( + "s3://bucket/test/location/metadata/metadata3.json".to_string(), + )) + .remove_encryption_key("key-1") + .build() + .unwrap(); + + assert_eq!(build_result.changes.len(), 1); + assert_eq!(build_result.metadata.encryption_keys.len(), 1); + assert_eq!(build_result.metadata.encryption_key("key-1"), None); + assert_eq!( + build_result.metadata.encryption_key("key-2"), + Some(&encryption_key_2) + ); + assert_eq!(build_result.changes[0], TableUpdate::RemoveEncryptionKey { + key_id: 
"key-1".to_string() + }); + + // Try to remove non-existent key - should not create a change + let build_result = build_result + .metadata + .into_builder(Some( + "s3://bucket/test/location/metadata/metadata4.json".to_string(), + )) + .remove_encryption_key("non-existent-key") + .build() + .unwrap(); + + assert_eq!(build_result.changes.len(), 0); + assert_eq!(build_result.metadata.encryption_keys.len(), 1); + + // Test encryption_keys_iter() + let keys = build_result + .metadata + .encryption_keys_iter() + .collect::>(); + assert_eq!(keys.len(), 1); + assert_eq!(keys[0], &encryption_key_2); + + // Remove last encryption key + let build_result = build_result + .metadata + .into_builder(Some( + "s3://bucket/test/location/metadata/metadata5.json".to_string(), + )) + .remove_encryption_key("key-2") + .build() + .unwrap(); + + assert_eq!(build_result.changes.len(), 1); + assert_eq!(build_result.metadata.encryption_keys.len(), 0); + assert_eq!(build_result.metadata.encryption_key("key-2"), None); + assert_eq!(build_result.changes[0], TableUpdate::RemoveEncryptionKey { + key_id: "key-2".to_string() + }); + + // Verify empty encryption_keys_iter() + let keys = build_result.metadata.encryption_keys_iter(); + assert_eq!(keys.len(), 0); + } } diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index 06549a95c5..919946cc0e 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -284,7 +284,7 @@ mod tests { use crate::spec::TableMetadata; use crate::table::Table; use crate::transaction::{ApplyTransactionAction, Transaction}; - use crate::{Error, ErrorKind, TableIdent}; + use crate::{Catalog, Error, ErrorKind, TableCreation, TableIdent}; pub fn make_v1_table() -> Table { let file = File::open(format!( @@ -343,8 +343,41 @@ mod tests { .unwrap() } + pub(crate) async fn make_v3_minimal_table_in_catalog(catalog: &impl Catalog) -> Table { + let table_ident = + TableIdent::from_strs([format!("ns1-{}", uuid::Uuid::new_v4()), "test1".to_string()]) + .unwrap(); + + catalog + .create_namespace(table_ident.namespace(), HashMap::new()) + .await + .unwrap(); + + let file = File::open(format!( + "{}/testdata/table_metadata/{}", + env!("CARGO_MANIFEST_DIR"), + "TableMetadataV3ValidMinimal.json" + )) + .unwrap(); + let reader = BufReader::new(file); + let base_metadata = serde_json::from_reader::<_, TableMetadata>(reader).unwrap(); + + let table_creation = TableCreation::builder() + .schema((**base_metadata.current_schema()).clone()) + .partition_spec((**base_metadata.default_partition_spec()).clone()) + .sort_order((**base_metadata.default_sort_order()).clone()) + .name(table_ident.name().to_string()) + .format_version(crate::spec::FormatVersion::V3) + .build(); + + catalog + .create_table(table_ident.namespace(), table_creation) + .await + .unwrap() + } + /// Helper function to create a test table with retry properties - fn setup_test_table(num_retries: &str) -> Table { + pub(super) fn setup_test_table(num_retries: &str) -> Table { let table = make_v2_table(); // Set retry properties @@ -510,3 +543,88 @@ mod tests { } } } + +#[cfg(test)] +mod test_row_lineage { + use crate::memory::tests::new_memory_catalog; + use crate::spec::{ + DataContentType, DataFile, DataFileBuilder, DataFileFormat, Literal, Struct, + }; + use crate::transaction::tests::make_v3_minimal_table_in_catalog; + use crate::transaction::{ApplyTransactionAction, Transaction}; + + #[tokio::test] + async fn test_fast_append_with_row_lineage() { + // Helper function to create 
a data file with specified number of rows + fn file_with_rows(record_count: u64) -> DataFile { + DataFileBuilder::default() + .content(DataContentType::Data) + .file_path(format!("test/{}.parquet", record_count)) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(record_count) + .partition(Struct::from_iter([Some(Literal::long(0))])) + .partition_spec_id(0) + .build() + .unwrap() + } + let catalog = new_memory_catalog().await; + + let table = make_v3_minimal_table_in_catalog(&catalog).await; + + // Check initial state - next_row_id should be 0 + assert_eq!(table.metadata().next_row_id(), 0); + + // First fast append with 30 rows + let tx = Transaction::new(&table); + let data_file_30 = file_with_rows(30); + let action = tx.fast_append().add_data_files(vec![data_file_30]); + let tx = action.apply(tx).unwrap(); + let table = tx.commit(&catalog).await.unwrap(); + + // Check snapshot and table state after first append + let snapshot = table.metadata().current_snapshot().unwrap(); + assert_eq!(snapshot.first_row_id(), Some(0)); + assert_eq!(table.metadata().next_row_id(), 30); + + // Check written manifest for first_row_id + let manifest_list = table + .metadata() + .current_snapshot() + .unwrap() + .load_manifest_list(table.file_io(), table.metadata()) + .await + .unwrap(); + + assert_eq!(manifest_list.entries().len(), 1); + let manifest_file = &manifest_list.entries()[0]; + assert_eq!(manifest_file.first_row_id, Some(0)); + + // Second fast append with 17 and 11 rows + let tx = Transaction::new(&table); + let data_file_17 = file_with_rows(17); + let data_file_11 = file_with_rows(11); + let action = tx + .fast_append() + .add_data_files(vec![data_file_17, data_file_11]); + let tx = action.apply(tx).unwrap(); + let table = tx.commit(&catalog).await.unwrap(); + + // Check snapshot and table state after second append + let snapshot = table.metadata().current_snapshot().unwrap(); + assert_eq!(snapshot.first_row_id(), Some(30)); + assert_eq!(table.metadata().next_row_id(), 30 + 17 + 11); + + // Check written manifest for first_row_id + let manifest_list = table + .metadata() + .current_snapshot() + .unwrap() + .load_manifest_list(table.file_io(), table.metadata()) + .await + .unwrap(); + assert_eq!(manifest_list.entries().len(), 2); + let manifest_file = &manifest_list.entries()[1]; + assert_eq!(manifest_file.first_row_id, Some(30)); + } +} diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index 48dc2b5b90..5eb121e4b3 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -207,13 +207,16 @@ impl<'a> SnapshotProducer<'a> { .as_ref() .clone(), ); - if self.table.metadata().format_version() == FormatVersion::V1 { - Ok(builder.build_v1()) - } else { - match content { + match self.table.metadata().format_version() { + FormatVersion::V1 => Ok(builder.build_v1()), + FormatVersion::V2 => match content { ManifestContentType::Data => Ok(builder.build_v2_data()), ManifestContentType::Deletes => Ok(builder.build_v2_deletes()), - } + }, + FormatVersion::V3 => match content { + ManifestContentType::Data => Ok(builder.build_v3_data()), + ManifestContentType::Deletes => Ok(builder.build_v3_deletes()), + }, } } @@ -385,6 +388,7 @@ impl<'a> SnapshotProducer<'a> { .manifest_file(&snapshot_produce_operation, &process) .await?; let next_seq_num = self.table.metadata().next_sequence_number(); + let first_row_id = self.table.metadata().next_row_id(); let summary = 
self.summary(&snapshot_produce_operation).map_err(|err| { Error::new(ErrorKind::Unexpected, "Failed to create snapshot summary.").with_source(err) @@ -408,8 +412,18 @@ impl<'a> SnapshotProducer<'a> { self.table.metadata().current_snapshot_id(), next_seq_num, ), + FormatVersion::V3 => ManifestListWriter::v3( + self.table + .file_io() + .new_output(manifest_list_path.clone())?, + self.snapshot_id, + self.table.metadata().current_snapshot_id(), + next_seq_num, + Some(first_row_id), + ), }; manifest_list_writer.add_manifests(new_manifests.into_iter())?; + let writer_next_row_id = manifest_list_writer.next_row_id(); manifest_list_writer.close().await?; let commit_ts = chrono::Utc::now().timestamp_millis(); @@ -420,8 +434,16 @@ impl<'a> SnapshotProducer<'a> { .with_sequence_number(next_seq_num) .with_summary(summary) .with_schema_id(self.table.metadata().current_schema_id()) - .with_timestamp_ms(commit_ts) - .build(); + .with_timestamp_ms(commit_ts); + + let new_snapshot = if let Some(writer_next_row_id) = writer_next_row_id { + let assigned_rows = writer_next_row_id - self.table.metadata().next_row_id(); + new_snapshot + .with_row_range(first_row_id, assigned_rows) + .build() + } else { + new_snapshot.build() + }; let updates = vec![ TableUpdate::AddSnapshot { diff --git a/crates/iceberg/src/writer/file_writer/location_generator.rs b/crates/iceberg/src/writer/file_writer/location_generator.rs index 4a73fc148d..49f3fb32e9 100644 --- a/crates/iceberg/src/writer/file_writer/location_generator.rs +++ b/crates/iceberg/src/writer/file_writer/location_generator.rs @@ -183,6 +183,7 @@ pub(crate) mod test { statistics: HashMap::new(), partition_statistics: HashMap::new(), encryption_keys: HashMap::new(), + next_row_id: 0, }; let file_name_generator = super::DefaultFileNameGenerator::new( @@ -297,6 +298,7 @@ pub(crate) mod test { statistics: HashMap::new(), partition_statistics: HashMap::new(), encryption_keys: HashMap::new(), + next_row_id: 0, }; // Test with DefaultLocationGenerator diff --git a/crates/iceberg/testdata/table_metadata/TableMetadataV3ValidMinimal.json b/crates/iceberg/testdata/table_metadata/TableMetadataV3ValidMinimal.json new file mode 100644 index 0000000000..bf85114c02 --- /dev/null +++ b/crates/iceberg/testdata/table_metadata/TableMetadataV3ValidMinimal.json @@ -0,0 +1,74 @@ +{ + "format-version": 3, + "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1", + "location": "s3://bucket/test/location", + "last-sequence-number": 34, + "last-updated-ms": 1602638573590, + "last-column-id": 3, + "current-schema-id": 0, + "next-row-id": 0, + "schemas": [ + { + "type": "struct", + "schema-id": 0, + "fields": [ + { + "id": 1, + "name": "x", + "required": true, + "type": "long", + "initial-default": 1, + "write-default": 1 + }, + { + "id": 2, + "name": "y", + "required": true, + "type": "long", + "doc": "comment" + }, + { + "id": 3, + "name": "z", + "required": true, + "type": "long" + } + ] + } + ], + "default-spec-id": 0, + "partition-specs": [ + { + "spec-id": 0, + "fields": [ + { + "name": "x", + "transform": "identity", + "source-id": 1, + "field-id": 1000 + } + ] + } + ], + "last-partition-id": 1000, + "default-sort-order-id": 3, + "sort-orders": [ + { + "order-id": 3, + "fields": [ + { + "transform": "identity", + "source-id": 2, + "direction": "asc", + "null-order": "nulls-first" + }, + { + "transform": "bucket[4]", + "source-id": 3, + "direction": "desc", + "null-order": "nulls-last" + } + ] + } + ] +} \ No newline at end of file
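Note on the v3 row-lineage invariant enforced by add_snapshot in table_metadata_builder.rs above: a snapshot must carry a row range, its first-row-id may not lag the table's next-row-id, and next-row-id advances by added-rows with an explicit overflow check. A standalone sketch of that bookkeeping (RowLineageState and claim are illustrative names, not the crate's API):

// Standalone sketch of the invariant enforced by `add_snapshot` for v3
// tables; `RowLineageState` and `claim` are illustrative, not the crate.
struct RowLineageState {
    next_row_id: u64,
}

impl RowLineageState {
    /// Accept a snapshot claiming `added_rows` rows starting at `first_row_id`.
    fn claim(&mut self, first_row_id: u64, added_rows: u64) -> Result<(), String> {
        if first_row_id < self.next_row_id {
            return Err(format!(
                "first-row-id is behind table next-row-id: {first_row_id} < {}",
                self.next_row_id
            ));
        }
        // Mirrors the builder: next-row-id advances by added-rows, with
        // overflow treated as an error rather than wrapping.
        self.next_row_id = self
            .next_row_id
            .checked_add(added_rows)
            .ok_or_else(|| "next-row-id overflowed when adding added-rows".to_string())?;
        Ok(())
    }
}

fn main() {
    let mut state = RowLineageState { next_row_id: 0 };
    state.claim(0, 30).unwrap(); // first append: rows [0, 30)
    state.claim(30, 30).unwrap(); // second append: rows [30, 60)
    assert_eq!(state.next_row_id, 60);
    state.claim(59, 10).unwrap_err(); // lagging first-row-id is rejected
}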
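Relatedly, the commit path in transaction/snapshot.rs derives the new snapshot's row range from the v3 manifest-list writer: the table's next-row-id becomes the snapshot's first-row-id, and added-rows is the difference between the writer's reported next row id and that starting point. A minimal sketch of the arithmetic (free function for illustration; the real logic is inline in SnapshotProducer):

// (first-row-id, added-rows) derived the way the snapshot producer does it:
// the table's next-row-id becomes the snapshot's first-row-id, and whatever
// the manifest-list writer advanced to determines added-rows. Returns None
// for pre-v3 writers, which report no row ids.
fn snapshot_row_range(table_next_row_id: u64, writer_next_row_id: Option<u64>) -> Option<(u64, u64)> {
    writer_next_row_id.map(|next| (table_next_row_id, next - table_next_row_id))
}

fn main() {
    // Mirrors test_fast_append_with_row_lineage: 30 rows starting at row id 0,
    // then 17 + 11 rows starting at row id 30.
    assert_eq!(snapshot_row_range(0, Some(30)), Some((0, 30)));
    assert_eq!(snapshot_row_range(30, Some(58)), Some((30, 28)));
    assert_eq!(snapshot_row_range(58, None), None);
}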
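On the API surface, the pieces introduced here compose roughly as below: format_version on TableCreation selects a v3 table at creation time, and the metadata builder's add_encryption_key/remove_encryption_key manage keys idempotently. A sketch under those assumptions (the table name, key id, and key bytes are made up, and error handling is simplified):

use std::collections::HashMap;

use iceberg::spec::{EncryptedKey, FormatVersion, Schema, TableMetadata};
use iceberg::{Result, TableCreation};

// Request a v3 table instead of the v2 default that TableCreation now carries.
fn v3_creation(schema: Schema) -> TableCreation {
    TableCreation::builder()
        .name("events".to_string()) // made-up table name
        .schema(schema)
        .format_version(FormatVersion::V3)
        .build()
}

// Attach an encryption key through the metadata builder. Adding the same
// key id twice is a no-op and records no TableUpdate, per the builder above.
fn with_key(metadata: TableMetadata) -> Result<TableMetadata> {
    let key = EncryptedKey::builder()
        .key_id("key-1") // made-up key id
        .encrypted_key_metadata(vec![1, 2, 3, 4]) // made-up key bytes
        .encrypted_by_id("kms-main") // made-up KMS identifier
        .properties(HashMap::new())
        .build();
    Ok(metadata
        .into_builder(None)
        .add_encryption_key(key)
        .build()?
        .metadata)
}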
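Finally, the wire format: because TableMetadataEnum is untagged and tries V3 first, a document with "format-version": 3 deserializes through TableMetadataV3, and serializing back keeps next-row-id at the top level in kebab-case. A quick round-trip sketch using the fixture added in this diff (the path relative to the repo root is an assumption):

use iceberg::spec::{FormatVersion, TableMetadata};

fn main() {
    let json = std::fs::read_to_string(
        "crates/iceberg/testdata/table_metadata/TableMetadataV3ValidMinimal.json",
    )
    .unwrap();

    let metadata: TableMetadata = serde_json::from_str(&json).unwrap();
    assert_eq!(metadata.format_version(), FormatVersion::V3);
    assert_eq!(metadata.next_row_id(), 0);

    // Serializing back dispatches on format_version and keeps the
    // v3-only field at the top level, in kebab-case.
    let value = serde_json::to_value(&metadata).unwrap();
    assert_eq!(value["format-version"], 3);
    assert_eq!(value["next-row-id"], 0);
}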