Skip to content

Commit af2a28b

Browse files
committed
feat: Iceberg Metadata V3 Support
1 parent c04b7ac commit af2a28b

File tree

16 files changed

+1452
-63
lines changed

16 files changed

+1452
-63
lines changed

crates/iceberg/src/catalog/memory/catalog.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,7 @@ impl Catalog for MemoryCatalog {
377377
}
378378

379379
#[cfg(test)]
380-
mod tests {
380+
pub(crate) mod tests {
381381
use std::collections::HashSet;
382382
use std::hash::Hash;
383383
use std::iter::FromIterator;
@@ -396,7 +396,7 @@ mod tests {
396396
temp_dir.path().to_str().unwrap().to_string()
397397
}
398398

399-
async fn new_memory_catalog() -> impl Catalog {
399+
pub(crate) async fn new_memory_catalog() -> impl Catalog {
400400
let warehouse_location = temp_path();
401401
MemoryCatalogBuilder::default()
402402
.load(

crates/iceberg/src/catalog/mod.rs

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,9 @@ use typed_builder::TypedBuilder;
3939
use uuid::Uuid;
4040

4141
use crate::spec::{
42-
FormatVersion, PartitionStatisticsFile, Schema, SchemaId, Snapshot, SnapshotReference,
43-
SortOrder, StatisticsFile, TableMetadata, TableMetadataBuilder, UnboundPartitionSpec,
44-
ViewFormatVersion, ViewRepresentations, ViewVersion,
42+
EncryptedKey, FormatVersion, PartitionStatisticsFile, Schema, SchemaId, Snapshot,
43+
SnapshotReference, SortOrder, StatisticsFile, TableMetadata, TableMetadataBuilder,
44+
UnboundPartitionSpec, ViewFormatVersion, ViewRepresentations, ViewVersion,
4545
};
4646
use crate::table::Table;
4747
use crate::{Error, ErrorKind, Result};
@@ -291,6 +291,9 @@ pub struct TableCreation {
291291
props.into_iter().collect()
292292
}))]
293293
pub properties: HashMap<String, String>,
294+
/// Format version of the table. Defaults to V2.
295+
#[builder(default = FormatVersion::V2)]
296+
pub format_version: FormatVersion,
294297
}
295298

296299
/// TableCommit represents the commit of a table in the catalog.
@@ -554,6 +557,18 @@ pub enum TableUpdate {
554557
/// Schema IDs to remove.
555558
schema_ids: Vec<i32>,
556559
},
560+
/// Add an encryption key
561+
#[serde(rename_all = "kebab-case")]
562+
AddEncryptionKey {
563+
/// The encryption key to add.
564+
encryption_key: EncryptedKey,
565+
},
566+
/// Remove an encryption key
567+
#[serde(rename_all = "kebab-case")]
568+
RemoveEncryptionKey {
569+
/// The id of the encryption key to remove.
570+
key_id: String,
571+
},
557572
}
558573

559574
impl TableUpdate {
@@ -598,6 +613,12 @@ impl TableUpdate {
598613
Ok(builder.remove_partition_statistics(snapshot_id))
599614
}
600615
TableUpdate::RemoveSchemas { schema_ids } => builder.remove_schemas(&schema_ids),
616+
TableUpdate::AddEncryptionKey { encryption_key } => {
617+
Ok(builder.add_encryption_key(encryption_key))
618+
}
619+
TableUpdate::RemoveEncryptionKey { key_id } => {
620+
Ok(builder.remove_encryption_key(&key_id))
621+
}
601622
}
602623
}
603624
}

crates/iceberg/src/expr/visitors/manifest_evaluator.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -703,6 +703,7 @@ mod test {
703703
deleted_rows_count: None,
704704
partitions: Some(partitions),
705705
key_metadata: None,
706+
first_row_id: None,
706707
}
707708
}
708709

crates/iceberg/src/spec/manifest/data_file.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use serde_with::{DeserializeFromStr, SerializeDisplay};
2626
use super::_serde::DataFileSerde;
2727
use super::{Datum, FormatVersion, Schema, data_file_schema_v1, data_file_schema_v2};
2828
use crate::error::Result;
29+
use crate::spec::manifest::data_file_schema_v3;
2930
use crate::spec::{Struct, StructType};
3031
use crate::{Error, ErrorKind};
3132

@@ -293,6 +294,8 @@ pub fn write_data_files_to_avro<W: Write>(
293294
let avro_schema = match version {
294295
FormatVersion::V1 => data_file_schema_v1(partition_type).unwrap(),
295296
FormatVersion::V2 => data_file_schema_v2(partition_type).unwrap(),
297+
// TODO: `unwrap` panics if the Avro schema cannot be built; consider returning the error instead.
298+
FormatVersion::V3 => data_file_schema_v3(partition_type).unwrap(),
296299
};
297300
let mut writer = AvroWriter::new(&avro_schema, writer);
298301

@@ -320,6 +323,8 @@ pub fn read_data_files_from_avro<R: Read>(
320323
let avro_schema = match version {
321324
FormatVersion::V1 => data_file_schema_v1(partition_type).unwrap(),
322325
FormatVersion::V2 => data_file_schema_v2(partition_type).unwrap(),
326+
// TODO: `unwrap` panics if the Avro schema cannot be built; consider returning the error instead.
327+
FormatVersion::V3 => data_file_schema_v3(partition_type).unwrap(),
323328
};
324329

325330
let reader = AvroReader::with_schema(&avro_schema, reader)?;

crates/iceberg/src/spec/manifest/entry.rs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -509,6 +509,42 @@ static CONTENT_SIZE_IN_BYTES: Lazy<NestedFieldRef> = {
509509
})
510510
};
511511

512+
fn data_file_fields_v3(partition_type: &StructType) -> Vec<NestedFieldRef> {
513+
vec![
514+
CONTENT.clone(),
515+
FILE_PATH.clone(),
516+
FILE_FORMAT.clone(),
517+
Arc::new(NestedField::required(
518+
102,
519+
"partition",
520+
Type::Struct(partition_type.clone()),
521+
)),
522+
RECORD_COUNT.clone(),
523+
FILE_SIZE_IN_BYTES.clone(),
524+
COLUMN_SIZES.clone(),
525+
VALUE_COUNTS.clone(),
526+
NULL_VALUE_COUNTS.clone(),
527+
NAN_VALUE_COUNTS.clone(),
528+
LOWER_BOUNDS.clone(),
529+
UPPER_BOUNDS.clone(),
530+
KEY_METADATA.clone(),
531+
SPLIT_OFFSETS.clone(),
532+
EQUALITY_IDS.clone(),
533+
SORT_ORDER_ID.clone(),
534+
FIRST_ROW_ID.clone(),
535+
REFERENCE_DATA_FILE.clone(),
536+
CONTENT_OFFSET.clone(),
537+
CONTENT_SIZE_IN_BYTES.clone(),
538+
]
539+
}
540+
541+
pub(super) fn data_file_schema_v3(partition_type: &StructType) -> Result<AvroSchema> {
542+
let schema = Schema::builder()
543+
.with_fields(data_file_fields_v3(partition_type))
544+
.build()?;
545+
schema_to_avro_schema("data_file", &schema)
546+
}
547+
512548
fn data_file_fields_v2(partition_type: &StructType) -> Vec<NestedFieldRef> {
513549
vec![
514550
CONTENT.clone(),
@@ -532,6 +568,7 @@ fn data_file_fields_v2(partition_type: &StructType) -> Vec<NestedFieldRef> {
532568
EQUALITY_IDS.clone(),
533569
SORT_ORDER_ID.clone(),
534570
FIRST_ROW_ID.clone(),
571+
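// TODO(review): Why are the next three fields (REFERENCE_DATA_FILE, CONTENT_OFFSET, CONTENT_SIZE_IN_BYTES) in the V2 field list - shouldn't they be V3 only?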
535572
REFERENCE_DATA_FILE.clone(),
536573
CONTENT_OFFSET.clone(),
537574
CONTENT_SIZE_IN_BYTES.clone(),

crates/iceberg/src/spec/manifest/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,8 @@ impl Manifest {
7070
})
7171
.collect::<Result<Vec<_>>>()?
7272
}
73-
FormatVersion::V2 => {
73+
// Manifest Schema & Manifest Entry did not change between V2 and V3
74+
FormatVersion::V2 | FormatVersion::V3 => {
7475
let schema = manifest_schema_v2(&partition_type)?;
7576
let reader = AvroReader::with_schema(&schema, bs)?;
7677
reader

crates/iceberg/src/spec/manifest/writer.rs

Lines changed: 70 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,13 @@ impl ManifestWriterBuilder {
7272
.format_version(FormatVersion::V1)
7373
.content(ManifestContentType::Data)
7474
.build();
75-
ManifestWriter::new(self.output, self.snapshot_id, self.key_metadata, metadata)
75+
ManifestWriter::new(
76+
self.output,
77+
self.snapshot_id,
78+
self.key_metadata,
79+
metadata,
80+
None,
81+
)
7682
}
7783

7884
/// Build a [`ManifestWriter`] for format version 2, data content.
@@ -84,7 +90,13 @@ impl ManifestWriterBuilder {
8490
.format_version(FormatVersion::V2)
8591
.content(ManifestContentType::Data)
8692
.build();
87-
ManifestWriter::new(self.output, self.snapshot_id, self.key_metadata, metadata)
93+
ManifestWriter::new(
94+
self.output,
95+
self.snapshot_id,
96+
self.key_metadata,
97+
metadata,
98+
None,
99+
)
88100
}
89101

90102
/// Build a [`ManifestWriter`] for format version 2, deletes content.
@@ -96,7 +108,51 @@ impl ManifestWriterBuilder {
96108
.format_version(FormatVersion::V2)
97109
.content(ManifestContentType::Deletes)
98110
.build();
99-
ManifestWriter::new(self.output, self.snapshot_id, self.key_metadata, metadata)
111+
ManifestWriter::new(
112+
self.output,
113+
self.snapshot_id,
114+
self.key_metadata,
115+
metadata,
116+
None,
117+
)
118+
}
119+
120+
/// Build a [`ManifestWriter`] for format version 3, data content.
121+
pub fn build_v3_data(self) -> ManifestWriter {
122+
let metadata = ManifestMetadata::builder()
123+
.schema_id(self.schema.schema_id())
124+
.schema(self.schema)
125+
.partition_spec(self.partition_spec)
126+
.format_version(FormatVersion::V3)
127+
.content(ManifestContentType::Data)
128+
.build();
129+
ManifestWriter::new(
130+
self.output,
131+
self.snapshot_id,
132+
self.key_metadata,
133+
metadata,
134+
// First row id is assigned by the [`ManifestListWriter`] when the manifest
135+
// is added to the list.
136+
None,
137+
)
138+
}
139+
140+
/// Build a [`ManifestWriter`] for format version 3, deletes content.
141+
pub fn build_v3_deletes(self) -> ManifestWriter {
142+
let metadata = ManifestMetadata::builder()
143+
.schema_id(self.schema.schema_id())
144+
.schema(self.schema)
145+
.partition_spec(self.partition_spec)
146+
.format_version(FormatVersion::V3)
147+
.content(ManifestContentType::Deletes)
148+
.build();
149+
ManifestWriter::new(
150+
self.output,
151+
self.snapshot_id,
152+
self.key_metadata,
153+
metadata,
154+
None,
155+
)
100156
}
101157
}
102158

@@ -112,6 +168,7 @@ pub struct ManifestWriter {
112168
existing_rows: u64,
113169
deleted_files: u32,
114170
deleted_rows: u64,
171+
first_row_id: Option<u64>,
115172

116173
min_seq_num: Option<i64>,
117174

@@ -129,6 +186,7 @@ impl ManifestWriter {
129186
snapshot_id: Option<i64>,
130187
key_metadata: Option<Vec<u8>>,
131188
metadata: ManifestMetadata,
189+
first_row_id: Option<u64>,
132190
) -> Self {
133191
Self {
134192
output,
@@ -139,6 +197,7 @@ impl ManifestWriter {
139197
existing_rows: 0,
140198
deleted_files: 0,
141199
deleted_rows: 0,
200+
first_row_id,
142201
min_seq_num: None,
143202
key_metadata,
144203
manifest_entries: Vec::new(),
@@ -348,7 +407,8 @@ impl ManifestWriter {
348407
let table_schema = &self.metadata.schema;
349408
let avro_schema = match self.metadata.format_version {
350409
FormatVersion::V1 => manifest_schema_v1(&partition_type)?,
351-
FormatVersion::V2 => manifest_schema_v2(&partition_type)?,
410+
// Manifest schema did not change between V2 and V3
411+
FormatVersion::V2 | FormatVersion::V3 => manifest_schema_v2(&partition_type)?,
352412
};
353413
let mut avro_writer = AvroWriter::new(&avro_schema, Vec::new());
354414
avro_writer.add_user_metadata(
@@ -388,8 +448,11 @@ impl ManifestWriter {
388448
let value = match self.metadata.format_version {
389449
FormatVersion::V1 => to_value(ManifestEntryV1::try_from(entry, &partition_type)?)?
390450
.resolve(&avro_schema)?,
391-
FormatVersion::V2 => to_value(ManifestEntryV2::try_from(entry, &partition_type)?)?
392-
.resolve(&avro_schema)?,
451+
// Manifest entry format did not change between V2 and V3
452+
FormatVersion::V2 | FormatVersion::V3 => {
453+
to_value(ManifestEntryV2::try_from(entry, &partition_type)?)?
454+
.resolve(&avro_schema)?
455+
}
393456
};
394457

395458
avro_writer.append(value)?;
@@ -417,6 +480,7 @@ impl ManifestWriter {
417480
deleted_rows_count: Some(self.deleted_rows),
418481
partitions: Some(partition_summary),
419482
key_metadata: self.key_metadata,
483+
first_row_id: self.first_row_id,
420484
})
421485
}
422486
}

0 commit comments

Comments
 (0)