feat: add register table for catalogs #1550

Open · wants to merge 1 commit into main
1 change: 1 addition & 0 deletions crates/iceberg/Cargo.toml
@@ -67,6 +67,7 @@ expect-test = { workspace = true }
fnv = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
lz4_flex = "0.11"
moka = { version = "0.12.10", features = ["future"] }
murmur3 = { workspace = true }
num-bigint = { workspace = true }
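The new lz4_flex dependency is presumably intended for LZ4-compressed Puffin blob payloads; none of the hunks below call it yet. A minimal round-trip sketch of the block-format helpers it provides (the payload bytes are illustrative):

```rust
// Sketch only: lz4_flex block-format round-trip; not called anywhere in this PR.
fn lz4_roundtrip() {
    let raw = b"deletion-vector blob payload";
    // compress_prepend_size stores the uncompressed length in a small prefix...
    let compressed = lz4_flex::compress_prepend_size(raw);
    // ...which decompress_size_prepended reads back to size the output buffer.
    let restored = lz4_flex::decompress_size_prepended(&compressed)
        .expect("valid lz4 block data");
    assert_eq!(raw.as_slice(), restored.as_slice());
}
```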
226 changes: 218 additions & 8 deletions crates/iceberg/src/arrow/caching_delete_file_loader.rs
@@ -17,7 +17,12 @@

use std::collections::HashMap;

use arrow_array::{Int64Array, StringArray};
use arrow_array::{
BinaryArray, BooleanArray, Date32Array, Decimal128Array, Float32Array, Float64Array,
Int32Array, Int64Array, LargeBinaryArray, LargeStringArray, StringArray,
TimestampMicrosecondArray,
};
use arrow_schema::{DataType, TimeUnit};
use futures::{StreamExt, TryStreamExt};
use tokio::sync::oneshot::{Receiver, channel};

@@ -38,7 +43,7 @@ pub(crate) struct CachingDeleteFileLoader {

// Intermediate context during processing of a delete file task.
enum DeleteFileContext {
// TODO: Delete Vector loader from Puffin files
DelVecs(HashMap<String, DeleteVector>),
ExistingEqDel,
PosDels(ArrowRecordBatchStream),
FreshEqDel {
@@ -200,6 +205,11 @@ impl CachingDeleteFileLoader {
del_filter: DeleteFilter,
schema: SchemaRef,
) -> Result<DeleteFileContext> {
// Check if the file is a Puffin file (by extension or by trying to read it as such)
if Self::is_puffin_file(&task.file_path) {
return Self::load_puffin_delete_vectors(&task.file_path, &basic_delete_file_loader).await;
}

match task.file_type {
DataContentType::PositionDeletes => Ok(DeleteFileContext::PosDels(
basic_delete_file_loader
@@ -238,6 +248,7 @@ impl CachingDeleteFileLoader {
ctx: DeleteFileContext,
) -> Result<ParsedDeleteFileContext> {
match ctx {
DeleteFileContext::DelVecs(hash_map) => Ok(ParsedDeleteFileContext::DelVecs(hash_map)),
DeleteFileContext::ExistingEqDel => Ok(ParsedDeleteFileContext::EqDel),
DeleteFileContext::PosDels(batch_stream) => {
let del_vecs =
@@ -311,15 +322,214 @@ impl CachingDeleteFileLoader {
/// Parses record batch streams from individual equality delete files
///
/// Returns an unbound Predicate for each batch stream
///
/// Equality delete files contain rows where each row represents values that should be deleted.
/// For example, if the equality IDs are [1, 2] representing columns "name" and "age",
/// and the file contains rows [("Alice", 25), ("Bob", 30)], then any data rows matching
/// (name="Alice" AND age=25) OR (name="Bob" AND age=30) should be deleted.
async fn parse_equality_deletes_record_batch_stream(
streams: ArrowRecordBatchStream,
mut stream: ArrowRecordBatchStream,
) -> Result<Predicate> {
// TODO
use crate::expr::Predicate::*;
use crate::expr::{Literal as ExprLiteral, Reference};
use crate::spec::Literal;
use arrow_array::Array;

let mut combined_predicates = Vec::new();

while let Some(batch) = stream.next().await {
let batch = batch?;
let schema = batch.schema();

// Process each row in the batch
for row_idx in 0..batch.num_rows() {
let mut row_conditions = Vec::new();

// For each column in the equality delete file, create an equality condition
for col_idx in 0..batch.num_columns() {
let column = batch.column(col_idx);
let field = schema.field(col_idx);

// Extract the field ID from metadata
let field_id = field
.metadata()
.get("parquet_field_id")
.or_else(|| field.metadata().get("PARQUET:field_id"))
.ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
format!("Missing field ID for column '{}'", field.name()),
)
})?
.parse::<i32>()
.map_err(|_| {
Error::new(
ErrorKind::DataInvalid,
"Invalid field ID format",
)
})?;

// Skip if the value is null
if column.is_null(row_idx) {
continue;
}

// Convert the Arrow value to an Iceberg Literal based on the column's data type
let literal = match field.data_type() {
DataType::Boolean => {
let array = column.as_any().downcast_ref::<BooleanArray>()
.ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Expected BooleanArray"))?;
Literal::bool(array.value(row_idx))
}
DataType::Int32 => {
let array = column.as_any().downcast_ref::<Int32Array>()
.ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Expected Int32Array"))?;
Literal::int(array.value(row_idx))
}
DataType::Int64 => {
let array = column.as_any().downcast_ref::<Int64Array>()
.ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Expected Int64Array"))?;
Literal::long(array.value(row_idx))
}
DataType::Float32 => {
let array = column.as_any().downcast_ref::<Float32Array>()
.ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Expected Float32Array"))?;
Literal::float(array.value(row_idx))
}
DataType::Float64 => {
let array = column.as_any().downcast_ref::<Float64Array>()
.ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Expected Float64Array"))?;
Literal::double(array.value(row_idx))
}
DataType::Utf8 => {
let array = column.as_any().downcast_ref::<StringArray>()
.ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Expected StringArray"))?;
Literal::string(array.value(row_idx))
}
DataType::LargeUtf8 => {
let array = column.as_any().downcast_ref::<LargeStringArray>()
.ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Expected LargeStringArray"))?;
Literal::string(array.value(row_idx))
}
DataType::Binary => {
let array = column.as_any().downcast_ref::<BinaryArray>()
.ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Expected BinaryArray"))?;
Literal::binary(array.value(row_idx).to_vec())
}
DataType::LargeBinary => {
let array = column.as_any().downcast_ref::<LargeBinaryArray>()
.ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Expected LargeBinaryArray"))?;
Literal::binary(array.value(row_idx).to_vec())
}
DataType::Date32 => {
let array = column.as_any().downcast_ref::<Date32Array>()
.ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Expected Date32Array"))?;
Literal::date(array.value(row_idx))
}
// Only microsecond timestamps downcast to TimestampMicrosecondArray;
// other units fall through to the unsupported-type error below.
DataType::Timestamp(TimeUnit::Microsecond, _) => {
let array = column.as_any().downcast_ref::<TimestampMicrosecondArray>()
.ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Expected TimestampMicrosecondArray"))?;
Literal::timestamp_micros(array.value(row_idx))
}
DataType::Decimal128(precision, scale) => {
let array = column.as_any().downcast_ref::<Decimal128Array>()
.ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Expected Decimal128Array"))?;
Literal::decimal_from_i128(array.value(row_idx), *precision as u32, *scale as u32)?
}
_ => {
return Err(Error::new(
ErrorKind::FeatureUnsupported,
format!("Unsupported data type for equality delete: {:?}", field.data_type()),
));
}
};

// Create equality condition: field_id = literal
let condition = Equal {
term: Box::new(Reference::new(field.name().to_string())),
literal: ExprLiteral::new(literal),
};

row_conditions.push(condition);
}

// If we have conditions for this row, combine them with AND
if !row_conditions.is_empty() {
let row_predicate = row_conditions.into_iter().reduce(|acc, condition| And {
left: Box::new(acc),
right: Box::new(condition),
}).unwrap();

combined_predicates.push(row_predicate);
}
}
}

// Combine all row predicates with OR (any matching row should be deleted)
if combined_predicates.is_empty() {
Ok(AlwaysFalse) // No rows to delete
} else {
let final_predicate = combined_predicates.into_iter().reduce(|acc, predicate| Or {
left: Box::new(acc),
right: Box::new(predicate),
}).unwrap();

Ok(final_predicate)
}
}
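To make the doc-comment example above concrete: for equality IDs [1, 2] over ("name", "age") with deleted rows ("Alice", 25) and ("Bob", 30), the loop builds conditions that AND within a row and OR across rows. A sketch of the same shape written against the crate's public builder API (assuming Reference::equal_to, Predicate::and/or, and the Datum constructors as in the published iceberg crate):

```rust
// Illustration only, not part of this diff: the predicate shape produced by
// parse_equality_deletes_record_batch_stream for the two example rows.
use iceberg::expr::{Predicate, Reference};
use iceberg::spec::Datum;

fn example_equality_delete_predicate() -> Predicate {
    // Conditions within one deleted row combine with AND...
    let alice = Reference::new("name")
        .equal_to(Datum::string("Alice"))
        .and(Reference::new("age").equal_to(Datum::int(25)));
    let bob = Reference::new("name")
        .equal_to(Datum::string("Bob"))
        .and(Reference::new("age").equal_to(Datum::int(30)));
    // ...and rows combine with OR: matching either row deletes the record.
    alice.or(bob)
}
```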

/// Check whether a file is a Puffin file, based on its file extension
fn is_puffin_file(file_path: &str) -> bool {
file_path.ends_with(".puffin") || file_path.ends_with(".bin")
}

/// Load Delete Vectors from a Puffin file
async fn load_puffin_delete_vectors(
file_path: &str,
basic_delete_file_loader: &BasicDeleteFileLoader,
) -> Result<DeleteFileContext> {
use crate::puffin::{PuffinReader, DELETION_VECTOR_V1};

let input_file = basic_delete_file_loader.file_io().new_input(file_path)?;
let puffin_reader = PuffinReader::new(input_file);
let file_metadata = puffin_reader.file_metadata().await?;

let mut delete_vectors = HashMap::new();

// Process each blob in the Puffin file
for blob_metadata in file_metadata.blobs() {
if blob_metadata.blob_type() == DELETION_VECTOR_V1 {
let blob = puffin_reader.blob(blob_metadata).await?;
let delete_vector = Self::parse_delete_vector_blob(&blob)?;

// Determine which data file this delete vector applies to from the
// blob metadata properties; blobs without a data-file-path property
// are skipped for now.
if let Some(data_file_path) = blob.properties().get("data-file-path") {
delete_vectors.insert(data_file_path.clone(), delete_vector);
}
}
}

Ok(DeleteFileContext::DelVecs(delete_vectors))
}

/// Parse a deletion vector blob from Puffin format into a DeleteVector
fn parse_delete_vector_blob(blob: &crate::puffin::Blob) -> Result<DeleteVector> {
use roaring::RoaringTreemap;

// According to the Iceberg spec, deletion vectors are stored as RoaringBitmap
// in the "portable" format for 64-bit implementations
let data = blob.data();

// Parse the RoaringTreemap from the blob data
let roaring_treemap = RoaringTreemap::deserialize_from(std::io::Cursor::new(data))
.map_err(|e| Error::new(
ErrorKind::DataInvalid,
format!("Failed to deserialize deletion vector: {}", e),
))?;

Ok(DeleteVector::new(roaring_treemap))
}
}
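The deserialize_from call above pins down what parse_delete_vector_blob expects the blob payload to be: a bare serialized RoaringTreemap with no additional framing (any length/magic/CRC wrapping a writer adds would have to be stripped first). A round-trip sketch with illustrative row positions:

```rust
// Sketch: the payload shape parse_delete_vector_blob can decode.
use roaring::RoaringTreemap;

fn treemap_roundtrip() -> std::io::Result<()> {
    let mut deleted_rows = RoaringTreemap::new();
    deleted_rows.insert(3); // row position 3 of the data file is deleted
    deleted_rows.insert(4_294_967_296); // positions are u64 and may exceed u32::MAX

    let mut payload = Vec::new();
    deleted_rows.serialize_into(&mut payload)?;

    let decoded = RoaringTreemap::deserialize_from(std::io::Cursor::new(payload))?;
    assert_eq!(deleted_rows, decoded);
    Ok(())
}
```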

5 changes: 5 additions & 0 deletions crates/iceberg/src/arrow/delete_file_loader.rs
@@ -50,6 +50,11 @@ impl BasicDeleteFileLoader {
pub fn new(file_io: FileIO) -> Self {
BasicDeleteFileLoader { file_io }
}

/// Get a reference to the FileIO instance
pub(crate) fn file_io(&self) -> &FileIO {
&self.file_io
}

/// Loads a RecordBatchStream for a given data file.
pub(crate) async fn parquet_to_batch_stream(
&self,
4 changes: 2 additions & 2 deletions crates/iceberg/src/arrow/delete_filter.rs
@@ -139,8 +139,8 @@ impl DeleteFilter {
return Ok(None);
}

// TODO: handle case-insensitive case
let bound_predicate = combined_predicate.bind(file_scan_task.schema.clone(), false)?;
// Use the case sensitivity setting from the file scan task
let bound_predicate = combined_predicate.bind(file_scan_task.schema.clone(), file_scan_task.case_sensitive)?;
Ok(Some(bound_predicate))
}

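A sketch of what the flag changes at bind time, assuming FileScanTask carries case_sensitive as the diff implies; the schema-builder calls follow the crate's public API and the field names are illustrative:

```rust
// Sketch: case sensitivity when binding an unbound predicate to a schema.
use std::sync::Arc;

use iceberg::expr::Reference;
use iceberg::spec::{Datum, NestedField, PrimitiveType, Schema, Type};

fn bind_example() -> iceberg::Result<()> {
    // A schema whose only column is literally named "Name".
    let schema = Arc::new(
        Schema::builder()
            .with_fields(vec![Arc::new(NestedField::required(
                1,
                "Name",
                Type::Primitive(PrimitiveType::String),
            ))])
            .build()?,
    );

    let pred = Reference::new("name").equal_to(Datum::string("Alice"));

    // case_sensitive = true: "name" does not resolve against "Name".
    assert!(pred.clone().bind(schema.clone(), true).is_err());
    // case_sensitive = false: "name" binds to the "Name" field.
    assert!(pred.bind(schema, false).is_ok());
    Ok(())
}
```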
28 changes: 21 additions & 7 deletions crates/iceberg/src/delete_file_index.rs
@@ -45,10 +45,9 @@ struct PopulatedDeleteFileIndex {
global_deletes: Vec<Arc<DeleteFileContext>>,
eq_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>>,
pos_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>>,
deletion_vectors_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>>,
// TODO: do we need this?
// pos_deletes_by_path: HashMap<String, Vec<Arc<DeleteFileContext>>>,

// TODO: Deletion Vector support
}

impl DeleteFileIndex {
@@ -121,6 +120,8 @@ impl PopulatedDeleteFileIndex {
HashMap::default();
let mut pos_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>> =
HashMap::default();
let mut deletion_vectors_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>> =
HashMap::default();

let mut global_deletes: Vec<Arc<DeleteFileContext>> = vec![];

@@ -141,7 +142,10 @@
let destination_map = match arc_ctx.manifest_entry.content_type() {
DataContentType::PositionDeletes => &mut pos_deletes_by_partition,
DataContentType::EqualityDeletes => &mut eq_deletes_by_partition,
_ => unreachable!(),
// Deletion vectors would need their own DataContentType variant, or
// detection from the file format/metadata; for now, route anything else
// into deletion_vectors_by_partition.
_ => &mut deletion_vectors_by_partition,
};

destination_map
@@ -156,6 +160,7 @@
global_deletes,
eq_deletes_by_partition,
pos_deletes_by_partition,
deletion_vectors_by_partition,
}
}

@@ -189,10 +194,8 @@ impl PopulatedDeleteFileIndex {
.for_each(|delete| results.push(delete.as_ref().into()));
}

// TODO: the spec states that:
// "The data file's file_path is equal to the delete file's referenced_data_file if it is non-null".
// we're not yet doing that here. The referenced data file's name will also be present in the positional
// delete file's file path column.
// Process positional delete files
// The spec states that "The data file's file_path is equal to the delete file's referenced_data_file if it is non-null".
if let Some(deletes) = self.pos_deletes_by_partition.get(data_file.partition()) {
deletes
.iter()
@@ -202,6 +205,17 @@
.map(|seq_num| delete.manifest_entry.sequence_number() >= Some(seq_num))
.unwrap_or_else(|| true)
})
// filter by referenced_data_file if present
.filter(|&delete| {
// If the delete file has a referenced_data_file set, only apply it to matching data files
if let Some(ref_data_file) = delete.manifest_entry.data_file().referenced_data_file() {
data_file.file_path().to_string() == ref_data_file
} else {
// If no referenced_data_file is set, apply to all data files (as the referenced data file
// paths will be present in the delete file's content)
true
}
})
.for_each(|delete| results.push(delete.as_ref().into()));
}

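The new filter implements the spec rule quoted above. Restated over plain types so both branches are easy to test in isolation (the function name is illustrative):

```rust
// Sketch: the referenced_data_file applicability rule from the filter above.
fn delete_applies_to(referenced_data_file: Option<&str>, data_file_path: &str) -> bool {
    match referenced_data_file {
        // A delete file that names a specific data file applies only to it.
        Some(referenced) => referenced == data_file_path,
        // Otherwise it may apply to any data file in the partition; per-row
        // file_path matching happens later when the delete file is read.
        None => true,
    }
}

#[test]
fn referenced_data_file_rule() {
    assert!(delete_applies_to(None, "s3://b/data/a.parquet"));
    assert!(delete_applies_to(Some("s3://b/data/a.parquet"), "s3://b/data/a.parquet"));
    assert!(!delete_applies_to(Some("s3://b/data/a.parquet"), "s3://b/data/b.parquet"));
}
```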