
Commit a32a7e7

feat: add register table for catalogs
Signed-off-by: Mustafa Elbehery <[email protected]>
1 parent b3ea8d1 commit a32a7e7

14 files changed (+1025, -76 lines)

crates/iceberg/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -67,6 +67,7 @@ expect-test = { workspace = true }
 fnv = { workspace = true }
 futures = { workspace = true }
 itertools = { workspace = true }
+lz4_flex = "0.11"
 moka = { version = "0.12.10", features = ["future"] }
 murmur3 = { workspace = true }
 num-bigint = { workspace = true }
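
The new lz4_flex dependency is presumably there for compressed Puffin blobs: the Puffin spec allows a blob to be stored as a single LZ4 compression frame, declared per blob in the footer metadata. A minimal sketch of decompressing such a payload (a hypothetical helper, not part of this commit):

use std::io::Read;

use lz4_flex::frame::FrameDecoder;

// Decompress a Puffin blob payload whose metadata declares the "lz4" codec
// (the payload is a single LZ4 frame).
fn decompress_lz4_blob(compressed: &[u8]) -> std::io::Result<Vec<u8>> {
    let mut decoder = FrameDecoder::new(compressed);
    let mut decompressed = Vec::new();
    decoder.read_to_end(&mut decompressed)?;
    Ok(decompressed)
}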

crates/iceberg/src/arrow/caching_delete_file_loader.rs

Lines changed: 218 additions & 8 deletions
@@ -17,7 +17,12 @@
 
 use std::collections::HashMap;
 
-use arrow_array::{Int64Array, StringArray};
+use arrow_array::{
+    BinaryArray, BooleanArray, Date32Array, Decimal128Array, Float32Array, Float64Array,
+    Int32Array, Int64Array, LargeBinaryArray, LargeStringArray, StringArray,
+    TimestampMicrosecondArray,
+};
+use arrow_schema;
 use futures::{StreamExt, TryStreamExt};
 use tokio::sync::oneshot::{Receiver, channel};
 
@@ -38,7 +43,7 @@ pub(crate) struct CachingDeleteFileLoader {
 
 // Intermediate context during processing of a delete file task.
 enum DeleteFileContext {
-    // TODO: Delete Vector loader from Puffin files
+    DelVecs(HashMap<String, DeleteVector>),
     ExistingEqDel,
     PosDels(ArrowRecordBatchStream),
     FreshEqDel {
@@ -200,6 +205,11 @@ impl CachingDeleteFileLoader {
         del_filter: DeleteFilter,
         schema: SchemaRef,
     ) -> Result<DeleteFileContext> {
+        // Check if the file is a Puffin file (by extension or by trying to read it as such)
+        if Self::is_puffin_file(&task.file_path) {
+            return Self::load_puffin_delete_vectors(&task.file_path, &basic_delete_file_loader).await;
+        }
+
         match task.file_type {
             DataContentType::PositionDeletes => Ok(DeleteFileContext::PosDels(
                 basic_delete_file_loader
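
The extension check used here is a heuristic: as the diff below shows, is_puffin_file only matches ".puffin" and ".bin" names. A stricter probe could read the file header instead; the sketch below assumes Puffin files begin with the 4-byte magic "PFA1" (per the Puffin spec) and, for brevity, reads the whole file rather than only the first bytes. The helper name is hypothetical and not part of this commit.

use iceberg::Result;
use iceberg::io::FileIO;

// Probe a file for the Puffin magic bytes instead of relying on its extension.
async fn looks_like_puffin(file_io: &FileIO, path: &str) -> Result<bool> {
    let bytes = file_io.new_input(path)?.read().await?;
    Ok(bytes.starts_with(b"PFA1"))
}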
@@ -238,6 +248,7 @@ impl CachingDeleteFileLoader {
         ctx: DeleteFileContext,
     ) -> Result<ParsedDeleteFileContext> {
         match ctx {
+            DeleteFileContext::DelVecs(hash_map) => Ok(ParsedDeleteFileContext::DelVecs(hash_map)),
             DeleteFileContext::ExistingEqDel => Ok(ParsedDeleteFileContext::EqDel),
             DeleteFileContext::PosDels(batch_stream) => {
                 let del_vecs =
@@ -311,15 +322,214 @@ impl CachingDeleteFileLoader {
     /// Parses record batch streams from individual equality delete files
     ///
     /// Returns an unbound Predicate for each batch stream
+    ///
+    /// Equality delete files contain rows where each row represents values that should be deleted.
+    /// For example, if the equality IDs are [1, 2] representing columns "name" and "age",
+    /// and the file contains rows [("Alice", 25), ("Bob", 30)], then any data rows matching
+    /// (name="Alice" AND age=25) OR (name="Bob" AND age=30) should be deleted.
     async fn parse_equality_deletes_record_batch_stream(
-        streams: ArrowRecordBatchStream,
+        mut stream: ArrowRecordBatchStream,
     ) -> Result<Predicate> {
-        // TODO
+        use crate::expr::Predicate::*;
+        use crate::expr::{Reference, Literal as ExprLiteral};
+        use crate::spec::{Literal, PrimitiveLiteral};
+        use arrow_array::Array;
+
+        let mut combined_predicates = Vec::new();
+
+        while let Some(batch) = stream.next().await {
+            let batch = batch?;
+            let schema = batch.schema();
+
+            // Process each row in the batch
+            for row_idx in 0..batch.num_rows() {
+                let mut row_conditions = Vec::new();
+
+                // For each column in the equality delete file, create an equality condition
+                for col_idx in 0..batch.num_columns() {
+                    let column = batch.column(col_idx);
+                    let field = schema.field(col_idx);
+
+                    // Extract the field ID from metadata
+                    let field_id = field
+                        .metadata()
+                        .get("parquet_field_id")
+                        .or_else(|| field.metadata().get("PARQUET:field_id"))
+                        .ok_or_else(|| {
+                            Error::new(
+                                ErrorKind::DataInvalid,
+                                format!("Missing field ID for column '{}'", field.name()),
+                            )
+                        })?
+                        .parse::<i32>()
+                        .map_err(|_| {
+                            Error::new(
+                                ErrorKind::DataInvalid,
+                                "Invalid field ID format",
+                            )
+                        })?;
+
+                    // Skip if the value is null
+                    if column.is_null(row_idx) {
+                        continue;
+                    }
+
+                    // Convert Arrow value to Iceberg Literal based on data type
+                    let literal = match field.data_type() {
+                        arrow_schema::DataType::Boolean => {
+                            let array = column.as_any().downcast_ref::<arrow_array::BooleanArray>()
+                                .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Expected BooleanArray"))?;
+                            Literal::bool(array.value(row_idx))
+                        },
+                        arrow_schema::DataType::Int32 => {
+                            let array = column.as_any().downcast_ref::<arrow_array::Int32Array>()
+                                .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Expected Int32Array"))?;
+                            Literal::int(array.value(row_idx))
+                        },
+                        arrow_schema::DataType::Int64 => {
+                            let array = column.as_any().downcast_ref::<arrow_array::Int64Array>()
+                                .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Expected Int64Array"))?;
+                            Literal::long(array.value(row_idx))
+                        },
+                        arrow_schema::DataType::Float32 => {
+                            let array = column.as_any().downcast_ref::<arrow_array::Float32Array>()
+                                .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Expected Float32Array"))?;
+                            Literal::float(array.value(row_idx))
+                        },
+                        arrow_schema::DataType::Float64 => {
+                            let array = column.as_any().downcast_ref::<arrow_array::Float64Array>()
+                                .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Expected Float64Array"))?;
+                            Literal::double(array.value(row_idx))
+                        },
+                        arrow_schema::DataType::Utf8 => {
+                            let array = column.as_any().downcast_ref::<arrow_array::StringArray>()
+                                .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Expected StringArray"))?;
+                            Literal::string(array.value(row_idx))
+                        },
+                        arrow_schema::DataType::LargeUtf8 => {
+                            let array = column.as_any().downcast_ref::<arrow_array::LargeStringArray>()
+                                .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Expected LargeStringArray"))?;
+                            Literal::string(array.value(row_idx))
+                        },
+                        arrow_schema::DataType::Binary => {
+                            let array = column.as_any().downcast_ref::<arrow_array::BinaryArray>()
+                                .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Expected BinaryArray"))?;
+                            Literal::binary(array.value(row_idx).to_vec())
+                        },
+                        arrow_schema::DataType::LargeBinary => {
+                            let array = column.as_any().downcast_ref::<arrow_array::LargeBinaryArray>()
+                                .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Expected LargeBinaryArray"))?;
+                            Literal::binary(array.value(row_idx).to_vec())
+                        },
+                        arrow_schema::DataType::Date32 => {
+                            let array = column.as_any().downcast_ref::<arrow_array::Date32Array>()
+                                .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Expected Date32Array"))?;
+                            Literal::date(array.value(row_idx))
+                        },
+                        arrow_schema::DataType::Timestamp(_, _) => {
+                            let array = column.as_any().downcast_ref::<arrow_array::TimestampMicrosecondArray>()
+                                .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Expected TimestampMicrosecondArray"))?;
+                            Literal::timestamp_micros(array.value(row_idx))
+                        },
+                        arrow_schema::DataType::Decimal128(precision, scale) => {
+                            let array = column.as_any().downcast_ref::<arrow_array::Decimal128Array>()
+                                .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Expected Decimal128Array"))?;
+                            Literal::decimal_from_i128(array.value(row_idx), *precision as u32, *scale as u32)?
+                        },
+                        _ => {
+                            return Err(Error::new(
+                                ErrorKind::FeatureUnsupported,
+                                format!("Unsupported data type for equality delete: {:?}", field.data_type()),
+                            ));
+                        }
+                    };
+
+                    // Create equality condition: field_id = literal
+                    let condition = Equal {
+                        term: Box::new(Reference::new(field.name().to_string())),
+                        literal: ExprLiteral::new(literal),
+                    };
+
+                    row_conditions.push(condition);
+                }
+
+                // If we have conditions for this row, combine them with AND
+                if !row_conditions.is_empty() {
+                    let row_predicate = row_conditions.into_iter().reduce(|acc, condition| And {
+                        left: Box::new(acc),
+                        right: Box::new(condition),
+                    }).unwrap();
+
+                    combined_predicates.push(row_predicate);
+                }
+            }
+        }
+
+        // Combine all row predicates with OR (any matching row should be deleted)
+        if combined_predicates.is_empty() {
+            Ok(AlwaysFalse) // No rows to delete
+        } else {
+            let final_predicate = combined_predicates.into_iter().reduce(|acc, predicate| Or {
+                left: Box::new(acc),
+                right: Box::new(predicate),
+            }).unwrap();
+
+            Ok(final_predicate)
+        }
+    }
+
+    /// Check if a file is a Puffin file based on file extension or magic bytes
+    fn is_puffin_file(file_path: &str) -> bool {
+        file_path.ends_with(".puffin") || file_path.ends_with(".bin")
+    }
+
+    /// Load Delete Vectors from a Puffin file
+    async fn load_puffin_delete_vectors(
+        file_path: &str,
+        basic_delete_file_loader: &BasicDeleteFileLoader,
+    ) -> Result<DeleteFileContext> {
+        use crate::puffin::{PuffinReader, DELETION_VECTOR_V1};
+
+        let input_file = basic_delete_file_loader.file_io().new_input(file_path)?;
+        let puffin_reader = PuffinReader::new(input_file);
+        let file_metadata = puffin_reader.file_metadata().await?;
+
+        let mut delete_vectors = HashMap::new();
+
+        // Process each blob in the Puffin file
+        for blob_metadata in file_metadata.blobs() {
+            if blob_metadata.blob_type() == DELETION_VECTOR_V1 {
+                let blob = puffin_reader.blob(blob_metadata).await?;
+                let delete_vector = Self::parse_delete_vector_blob(&blob)?;
+
+                // For now, we'll assume the delete vector applies to all files
+                // In a real implementation, we would need to determine which data files
+                // this delete vector applies to based on the blob metadata properties
+                if let Some(data_file_path) = blob.properties().get("data-file-path") {
+                    delete_vectors.insert(data_file_path.clone(), delete_vector);
+                }
+            }
+        }
+
+        Ok(DeleteFileContext::DelVecs(delete_vectors))
+    }
 
-        Err(Error::new(
-            ErrorKind::FeatureUnsupported,
-            "parsing of equality deletes is not yet supported",
-        ))
+    /// Parse a deletion vector blob from Puffin format into a DeleteVector
+    fn parse_delete_vector_blob(blob: &crate::puffin::Blob) -> Result<DeleteVector> {
+        use roaring::RoaringTreemap;
+
+        // According to the Iceberg spec, deletion vectors are stored as RoaringBitmap
+        // in the "portable" format for 64-bit implementations
+        let data = blob.data();
+
+        // Parse the RoaringTreemap from the blob data
+        let roaring_treemap = RoaringTreemap::deserialize_from(std::io::Cursor::new(data))
+            .map_err(|e| Error::new(
+                ErrorKind::DataInvalid,
+                format!("Failed to deserialize deletion vector: {}", e),
+            ))?;
+
+        Ok(DeleteVector::new(roaring_treemap))
     }
 }
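
To make the doc comment's example concrete: for equality IDs pointing at "name" and "age" with delete rows ("Alice", 25) and ("Bob", 30), the parser should end up with a predicate equivalent to the one below. This sketch uses the crate's public expression builder (Reference and Datum) rather than the enum variants constructed inside the function, so treat it as an illustration of the predicate shape, not of the exact code path:

use iceberg::expr::{Predicate, Reference};
use iceberg::spec::Datum;

// (name = "Alice" AND age = 25) OR (name = "Bob" AND age = 30)
fn example_equality_delete_predicate() -> Predicate {
    let row1 = Reference::new("name")
        .equal_to(Datum::string("Alice"))
        .and(Reference::new("age").equal_to(Datum::int(25)));
    let row2 = Reference::new("name")
        .equal_to(Datum::string("Bob"))
        .and(Reference::new("age").equal_to(Datum::int(30)));
    row1.or(row2)
}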

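parse_delete_vector_blob treats the blob payload as a serialized 64-bit roaring bitmap of deleted row positions. A small sketch of consuming such a bitmap with the roaring crate (a standalone illustration, independent of the DeleteVector wrapper used in the diff):

use roaring::RoaringTreemap;

// Deserialize a 64-bit roaring bitmap and list which of the first `num_rows`
// positions of a data file it marks as deleted.
fn deleted_positions(serialized: &[u8], num_rows: u64) -> std::io::Result<Vec<u64>> {
    let dv = RoaringTreemap::deserialize_from(std::io::Cursor::new(serialized))?;
    Ok((0..num_rows).filter(|pos| dv.contains(*pos)).collect())
}
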
crates/iceberg/src/arrow/delete_file_loader.rs

Lines changed: 5 additions & 0 deletions
@@ -50,6 +50,11 @@ impl BasicDeleteFileLoader {
     pub fn new(file_io: FileIO) -> Self {
         BasicDeleteFileLoader { file_io }
     }
+
+    /// Get a reference to the FileIO instance
+    pub(crate) fn file_io(&self) -> &FileIO {
+        &self.file_io
+    }
     /// Loads a RecordBatchStream for a given datafile.
     pub(crate) async fn parquet_to_batch_stream(
         &self,

crates/iceberg/src/arrow/delete_filter.rs

Lines changed: 2 additions & 2 deletions
@@ -139,8 +139,8 @@ impl DeleteFilter {
             return Ok(None);
         }
 
-        // TODO: handle case-insensitive case
-        let bound_predicate = combined_predicate.bind(file_scan_task.schema.clone(), false)?;
+        // Use the case sensitivity setting from the file scan task
+        let bound_predicate = combined_predicate.bind(file_scan_task.schema.clone(), file_scan_task.case_sensitive)?;
         Ok(Some(bound_predicate))
     }
 
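
The flag's effect when binding: with case_sensitive = false, name lookup against the schema ignores case; with true, a mismatched case fails to bind. A sketch under the assumption that Predicate::bind (via the Bind trait) and the schema builder behave as in the crate's public API:

use std::sync::Arc;

use iceberg::expr::{Bind, Reference};
use iceberg::spec::{Datum, NestedField, PrimitiveType, Schema, Type};

fn bind_example() -> iceberg::Result<()> {
    // Schema with a single column named "id".
    let schema = Arc::new(
        Schema::builder()
            .with_fields(vec![Arc::new(NestedField::required(
                1,
                "id",
                Type::Primitive(PrimitiveType::Long),
            ))])
            .build()?,
    );

    let predicate = Reference::new("ID").equal_to(Datum::long(1));
    // Case-insensitive binding resolves "ID" to the "id" column.
    assert!(predicate.clone().bind(schema.clone(), false).is_ok());
    // Case-sensitive binding cannot find a column literally named "ID".
    assert!(predicate.bind(schema, true).is_err());
    Ok(())
}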

crates/iceberg/src/delete_file_index.rs

Lines changed: 21 additions & 7 deletions
@@ -45,10 +45,9 @@ struct PopulatedDeleteFileIndex {
     global_deletes: Vec<Arc<DeleteFileContext>>,
     eq_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>>,
     pos_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>>,
+    deletion_vectors_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>>,
     // TODO: do we need this?
     // pos_deletes_by_path: HashMap<String, Vec<Arc<DeleteFileContext>>>,
-
-    // TODO: Deletion Vector support
 }
 
 impl DeleteFileIndex {
@@ -121,6 +120,8 @@ impl PopulatedDeleteFileIndex {
             HashMap::default();
         let mut pos_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>> =
             HashMap::default();
+        let mut deletion_vectors_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>> =
+            HashMap::default();
 
         let mut global_deletes: Vec<Arc<DeleteFileContext>> = vec![];
 
@@ -141,7 +142,10 @@
             let destination_map = match arc_ctx.manifest_entry.content_type() {
                 DataContentType::PositionDeletes => &mut pos_deletes_by_partition,
                 DataContentType::EqualityDeletes => &mut eq_deletes_by_partition,
-                _ => unreachable!(),
+                // For Deletion Vectors, we would need a new DataContentType variant or
+                // detect them based on file format/metadata
+                // For now, we'll assume they go in the deletion_vectors_by_partition
+                _ => &mut deletion_vectors_by_partition,
             };
 
             destination_map
@@ -156,6 +160,7 @@ impl PopulatedDeleteFileIndex {
             global_deletes,
             eq_deletes_by_partition,
             pos_deletes_by_partition,
+            deletion_vectors_by_partition,
         }
     }
 
@@ -189,10 +194,8 @@
                 .for_each(|delete| results.push(delete.as_ref().into()));
         }
 
-        // TODO: the spec states that:
-        // "The data file's file_path is equal to the delete file's referenced_data_file if it is non-null".
-        // we're not yet doing that here. The referenced data file's name will also be present in the positional
-        // delete file's file path column.
+        // Process positional delete files
+        // The spec states that "The data file's file_path is equal to the delete file's referenced_data_file if it is non-null".
        if let Some(deletes) = self.pos_deletes_by_partition.get(data_file.partition()) {
            deletes
                .iter()
@@ -202,6 +205,17 @@
                            .map(|seq_num| delete.manifest_entry.sequence_number() >= Some(seq_num))
                            .unwrap_or_else(|| true)
                    })
+                    // filter by referenced_data_file if present
+                    .filter(|&delete| {
+                        // If the delete file has a referenced_data_file set, only apply it to matching data files
+                        if let Some(ref_data_file) = delete.manifest_entry.data_file().referenced_data_file() {
+                            data_file.file_path().to_string() == ref_data_file
+                        } else {
+                            // If no referenced_data_file is set, apply to all data files (as the referenced data file
+                            // paths will be present in the delete file's content)
+                            true
+                        }
+                    })
                    .for_each(|delete| results.push(delete.as_ref().into()));
        }
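
The intent of the two filters above, written out as a standalone rule (a hypothetical free function with illustrative names): a positional delete applies to a data file when its sequence number is at least the data file's, and, if referenced_data_file is set, only when the paths match.

// Selection rule sketch for positional deletes.
fn positional_delete_applies(
    data_file_path: &str,
    data_file_seq_num: Option<i64>,
    delete_file_seq_num: Option<i64>,
    referenced_data_file: Option<&str>,
) -> bool {
    // Mirror of the diff's sequence-number check, including the
    // `unwrap_or_else(|| true)` fallback when the data file has no sequence number.
    let seq_ok = match data_file_seq_num {
        Some(data_seq) => delete_file_seq_num >= Some(data_seq),
        None => true,
    };
    // Mirror of the referenced_data_file filter: restrict to the named data file
    // when present, otherwise apply to every data file in the partition.
    let path_ok = referenced_data_file.map_or(true, |r| r == data_file_path);
    seq_ok && path_ok
}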
