diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index fe4fdf73d6..f710e408fe 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -67,6 +67,7 @@ expect-test = { workspace = true }
 fnv = { workspace = true }
 futures = { workspace = true }
 itertools = { workspace = true }
+lz4_flex = "0.11"
 moka = { version = "0.12.10", features = ["future"] }
 murmur3 = { workspace = true }
 num-bigint = { workspace = true }
diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs b/crates/iceberg/src/arrow/caching_delete_file_loader.rs
index f0dece75a4..a262fb2493 100644
--- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs
+++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs
@@ -17,7 +17,12 @@
 use std::collections::HashMap;
 
-use arrow_array::{Int64Array, StringArray};
+use arrow_array::{
+    BinaryArray, BooleanArray, Date32Array, Decimal128Array, Float32Array, Float64Array,
+    Int32Array, Int64Array, LargeBinaryArray, LargeStringArray, StringArray,
+    TimestampMicrosecondArray,
+};
+use arrow_schema;
 use futures::{StreamExt, TryStreamExt};
 use tokio::sync::oneshot::{Receiver, channel};
 
@@ -38,7 +43,7 @@ pub(crate) struct CachingDeleteFileLoader {
 
 // Intermediate context during processing of a delete file task.
 enum DeleteFileContext {
-    // TODO: Delete Vector loader from Puffin files
+    DelVecs(HashMap<String, DeleteVector>),
     ExistingEqDel,
     PosDels(ArrowRecordBatchStream),
     FreshEqDel {
@@ -200,6 +205,11 @@ impl CachingDeleteFileLoader {
         del_filter: DeleteFilter,
         schema: SchemaRef,
     ) -> Result<DeleteFileContext> {
+        // Check if the file is a Puffin file (by extension or by trying to read it as such)
+        if Self::is_puffin_file(&task.file_path) {
+            return Self::load_puffin_delete_vectors(&task.file_path, &basic_delete_file_loader).await;
+        }
+
         match task.file_type {
             DataContentType::PositionDeletes => Ok(DeleteFileContext::PosDels(
                 basic_delete_file_loader
@@ -238,6 +248,7 @@ impl CachingDeleteFileLoader {
         ctx: DeleteFileContext,
     ) -> Result<ParsedDeleteFileContext> {
         match ctx {
+            DeleteFileContext::DelVecs(hash_map) => Ok(ParsedDeleteFileContext::DelVecs(hash_map)),
             DeleteFileContext::ExistingEqDel => Ok(ParsedDeleteFileContext::EqDel),
             DeleteFileContext::PosDels(batch_stream) => {
                 let del_vecs =
@@ -311,15 +322,214 @@ impl CachingDeleteFileLoader {
     /// Parses record batch streams from individual equality delete files
     ///
     /// Returns an unbound Predicate for each batch stream
+    ///
+    /// Equality delete files contain rows where each row represents values that should be deleted.
+    /// For example, if the equality IDs are [1, 2] representing columns "name" and "age",
+    /// and the file contains rows [("Alice", 25), ("Bob", 30)], then any data rows matching
+    /// (name="Alice" AND age=25) OR (name="Bob" AND age=30) should be deleted.
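+    ///
+    /// Illustrative sketch only (not exercised by this code): for that example the returned
+    /// predicate is equivalent to the following, written with the `Reference`/`Datum` builder API:
+    ///
+    /// ```ignore
+    /// let row_1 = Reference::new("name")
+    ///     .equal_to(Datum::string("Alice"))
+    ///     .and(Reference::new("age").equal_to(Datum::int(25)));
+    /// let row_2 = Reference::new("name")
+    ///     .equal_to(Datum::string("Bob"))
+    ///     .and(Reference::new("age").equal_to(Datum::int(30)));
+    /// let delete_predicate = row_1.or(row_2);
+    /// ```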
     async fn parse_equality_deletes_record_batch_stream(
-        streams: ArrowRecordBatchStream,
+        mut stream: ArrowRecordBatchStream,
     ) -> Result<Predicate> {
-        // TODO
+        use crate::expr::Predicate::*;
+        use crate::expr::{Reference, Literal as ExprLiteral};
+        use crate::spec::{Literal, PrimitiveLiteral};
+        use arrow_array::Array;
+
+        let mut combined_predicates = Vec::new();
+
+        while let Some(batch) = stream.next().await {
+            let batch = batch?;
+            let schema = batch.schema();
+
+            // Process each row in the batch
+            for row_idx in 0..batch.num_rows() {
+                let mut row_conditions = Vec::new();
+
+                // For each column in the equality delete file, create an equality condition
+                for col_idx in 0..batch.num_columns() {
+                    let column = batch.column(col_idx);
+                    let field = schema.field(col_idx);
+
+                    // Extract the field ID from metadata
+                    let field_id = field
+                        .metadata()
+                        .get("parquet_field_id")
+                        .or_else(|| field.metadata().get("PARQUET:field_id"))
+                        .ok_or_else(|| {
+                            Error::new(
+                                ErrorKind::DataInvalid,
+                                format!("Missing field ID for column '{}'", field.name()),
+                            )
+                        })?
+                        .parse::<i32>()
+                        .map_err(|_| {
+                            Error::new(
+                                ErrorKind::DataInvalid,
+                                "Invalid field ID format",
+                            )
+                        })?;
+
+                    // Skip if the value is null
+                    if column.is_null(row_idx) {
+                        continue;
+                    }
+
+                    // Convert Arrow value to Iceberg Literal based on data type
+                    let literal = match field.data_type() {
+                        arrow_schema::DataType::Boolean => {
+                            let array = column.as_any().downcast_ref::<BooleanArray>()
+                                .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Expected BooleanArray"))?;
+                            Literal::bool(array.value(row_idx))
+                        },
+                        arrow_schema::DataType::Int32 => {
+                            let array = column.as_any().downcast_ref::<Int32Array>()
+                                .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Expected Int32Array"))?;
+                            Literal::int(array.value(row_idx))
+                        },
+                        arrow_schema::DataType::Int64 => {
+                            let array = column.as_any().downcast_ref::<Int64Array>()
+                                .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Expected Int64Array"))?;
+                            Literal::long(array.value(row_idx))
+                        },
+                        arrow_schema::DataType::Float32 => {
+                            let array = column.as_any().downcast_ref::<Float32Array>()
+                                .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Expected Float32Array"))?;
+                            Literal::float(array.value(row_idx))
+                        },
+                        arrow_schema::DataType::Float64 => {
+                            let array = column.as_any().downcast_ref::<Float64Array>()
+                                .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Expected Float64Array"))?;
+                            Literal::double(array.value(row_idx))
+                        },
+                        arrow_schema::DataType::Utf8 => {
+                            let array = column.as_any().downcast_ref::<StringArray>()
+                                .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Expected StringArray"))?;
+                            Literal::string(array.value(row_idx))
+                        },
+                        arrow_schema::DataType::LargeUtf8 => {
+                            let array = column.as_any().downcast_ref::<LargeStringArray>()
+                                .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Expected LargeStringArray"))?;
+                            Literal::string(array.value(row_idx))
+                        },
+                        arrow_schema::DataType::Binary => {
+                            let array = column.as_any().downcast_ref::<BinaryArray>()
+                                .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Expected BinaryArray"))?;
+                            Literal::binary(array.value(row_idx).to_vec())
+                        },
+                        arrow_schema::DataType::LargeBinary => {
+                            let array = column.as_any().downcast_ref::<LargeBinaryArray>()
+                                .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Expected LargeBinaryArray"))?;
+                            Literal::binary(array.value(row_idx).to_vec())
+                        },
+                        arrow_schema::DataType::Date32 => {
+                            let array = column.as_any().downcast_ref::<Date32Array>()
+                                .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Expected Date32Array"))?;
+                            Literal::date(array.value(row_idx))
+                        },
+                        arrow_schema::DataType::Timestamp(_, _) => {
+                            let array = column.as_any().downcast_ref::<TimestampMicrosecondArray>()
+                                .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Expected TimestampMicrosecondArray"))?;
+                            Literal::timestamp_micros(array.value(row_idx))
+                        },
+                        arrow_schema::DataType::Decimal128(precision, scale) => {
+                            let array = column.as_any().downcast_ref::<Decimal128Array>()
+                                .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Expected Decimal128Array"))?;
+                            Literal::decimal_from_i128(array.value(row_idx), *precision as u32, *scale as u32)?
+                        },
+                        _ => {
+                            return Err(Error::new(
+                                ErrorKind::FeatureUnsupported,
+                                format!("Unsupported data type for equality delete: {:?}", field.data_type()),
+                            ));
+                        }
+                    };
+
+                    // Create equality condition: field = literal
+                    let condition = Equal {
+                        term: Box::new(Reference::new(field.name().to_string())),
+                        literal: ExprLiteral::new(literal),
+                    };
+
+                    row_conditions.push(condition);
+                }
+
+                // If we have conditions for this row, combine them with AND
+                if !row_conditions.is_empty() {
+                    let row_predicate = row_conditions.into_iter().reduce(|acc, condition| And {
+                        left: Box::new(acc),
+                        right: Box::new(condition),
+                    }).unwrap();
+
+                    combined_predicates.push(row_predicate);
+                }
+            }
+        }
+
+        // Combine all row predicates with OR (any matching row should be deleted)
+        if combined_predicates.is_empty() {
+            Ok(AlwaysFalse) // No rows to delete
+        } else {
+            let final_predicate = combined_predicates.into_iter().reduce(|acc, predicate| Or {
+                left: Box::new(acc),
+                right: Box::new(predicate),
+            }).unwrap();
+
+            Ok(final_predicate)
+        }
+    }
+
+    /// Check if a file is a Puffin file based on file extension or magic bytes
+    fn is_puffin_file(file_path: &str) -> bool {
+        file_path.ends_with(".puffin") || file_path.ends_with(".bin")
+    }
+
+    /// Load Delete Vectors from a Puffin file
+    async fn load_puffin_delete_vectors(
+        file_path: &str,
+        basic_delete_file_loader: &BasicDeleteFileLoader,
+    ) -> Result<DeleteFileContext> {
+        use crate::puffin::{PuffinReader, DELETION_VECTOR_V1};
+
+        let input_file = basic_delete_file_loader.file_io().new_input(file_path)?;
+        let puffin_reader = PuffinReader::new(input_file);
+        let file_metadata = puffin_reader.file_metadata().await?;
+
+        let mut delete_vectors = HashMap::new();
+
+        // Process each blob in the Puffin file
+        for blob_metadata in file_metadata.blobs() {
+            if blob_metadata.blob_type() == DELETION_VECTOR_V1 {
+                let blob = puffin_reader.blob(blob_metadata).await?;
+                let delete_vector = Self::parse_delete_vector_blob(&blob)?;
+
+                // For now, we'll assume the delete vector applies to all files
+                // In a real implementation, we would need to determine which data files
+                // this delete vector applies to based on the blob metadata properties
+                if let Some(data_file_path) = blob.properties().get("data-file-path") {
+                    delete_vectors.insert(data_file_path.clone(), delete_vector);
+                }
+            }
+        }
+
+        Ok(DeleteFileContext::DelVecs(delete_vectors))
+    }
-
-        Err(Error::new(
-            ErrorKind::FeatureUnsupported,
-            "parsing of equality deletes is not yet supported",
-        ))
+
+    /// Parse a deletion vector blob from Puffin format into a DeleteVector
+    fn parse_delete_vector_blob(blob: &crate::puffin::Blob) -> Result<DeleteVector> {
+        use roaring::RoaringTreemap;
+
+        // According to the Iceberg spec, deletion vectors are stored as RoaringBitmap
+        // in the "portable" format for 64-bit implementations
+        let data = blob.data();
+
+        // Parse the RoaringTreemap from the blob data
+        let roaring_treemap = RoaringTreemap::deserialize_from(std::io::Cursor::new(data))
+            .map_err(|e| Error::new(
+                ErrorKind::DataInvalid,
+                format!("Failed to deserialize deletion vector: {}", e),
+            ))?;
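+        // `RoaringTreemap` is the 64-bit bitmap type from the `roaring` crate, so row positions
+        // larger than u32::MAX are representable; the blob bytes are expected to have been
+        // decompressed already by the Puffin blob reader before we get here.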
+
+        Ok(DeleteVector::new(roaring_treemap))
     }
 }
diff --git a/crates/iceberg/src/arrow/delete_file_loader.rs b/crates/iceberg/src/arrow/delete_file_loader.rs
index 592ef2eb4a..ebb7654ab7 100644
--- a/crates/iceberg/src/arrow/delete_file_loader.rs
+++ b/crates/iceberg/src/arrow/delete_file_loader.rs
@@ -50,6 +50,11 @@ impl BasicDeleteFileLoader {
     pub fn new(file_io: FileIO) -> Self {
         BasicDeleteFileLoader { file_io }
     }
+
+    /// Get a reference to the FileIO instance
+    pub(crate) fn file_io(&self) -> &FileIO {
+        &self.file_io
+    }
 
     /// Loads a RecordBatchStream for a given datafile.
     pub(crate) async fn parquet_to_batch_stream(
         &self,
diff --git a/crates/iceberg/src/arrow/delete_filter.rs b/crates/iceberg/src/arrow/delete_filter.rs
index 0dd53a34fa..af760756b6 100644
--- a/crates/iceberg/src/arrow/delete_filter.rs
+++ b/crates/iceberg/src/arrow/delete_filter.rs
@@ -139,8 +139,8 @@ impl DeleteFilter {
             return Ok(None);
         }
 
-        // TODO: handle case-insensitive case
-        let bound_predicate = combined_predicate.bind(file_scan_task.schema.clone(), false)?;
+        // Use the case sensitivity setting from the file scan task
+        let bound_predicate = combined_predicate.bind(file_scan_task.schema.clone(), file_scan_task.case_sensitive)?;
 
         Ok(Some(bound_predicate))
     }
diff --git a/crates/iceberg/src/delete_file_index.rs b/crates/iceberg/src/delete_file_index.rs
index d8f7a872e1..fe0e8baaa4 100644
--- a/crates/iceberg/src/delete_file_index.rs
+++ b/crates/iceberg/src/delete_file_index.rs
@@ -45,10 +45,9 @@ struct PopulatedDeleteFileIndex {
     global_deletes: Vec<Arc<DeleteFileContext>>,
     eq_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>>,
     pos_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>>,
+    deletion_vectors_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>>,
     // TODO: do we need this?
     // pos_deletes_by_path: HashMap<String, Vec<Arc<DeleteFileContext>>>,
-
-    // TODO: Deletion Vector support
 }
 
 impl DeleteFileIndex {
@@ -121,6 +120,8 @@ impl PopulatedDeleteFileIndex {
             HashMap::default();
         let mut pos_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>> =
             HashMap::default();
+        let mut deletion_vectors_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>> =
+            HashMap::default();
 
         let mut global_deletes: Vec<Arc<DeleteFileContext>> = vec![];
 
@@ -141,7 +142,10 @@ impl PopulatedDeleteFileIndex {
                 let destination_map = match arc_ctx.manifest_entry.content_type() {
                     DataContentType::PositionDeletes => &mut pos_deletes_by_partition,
                     DataContentType::EqualityDeletes => &mut eq_deletes_by_partition,
-                    _ => unreachable!(),
+                    // For Deletion Vectors, we would need a new DataContentType variant or
+                    // detect them based on file format/metadata
+                    // For now, we'll assume they go in the deletion_vectors_by_partition
+                    _ => &mut deletion_vectors_by_partition,
                 };
 
                 destination_map
@@ -156,6 +160,7 @@ impl PopulatedDeleteFileIndex {
             global_deletes,
             eq_deletes_by_partition,
             pos_deletes_by_partition,
+            deletion_vectors_by_partition,
         }
     }
 
@@ -189,10 +194,8 @@ impl PopulatedDeleteFileIndex {
                 .for_each(|delete| results.push(delete.as_ref().into()));
         }
 
-        // TODO: the spec states that:
-        // "The data file's file_path is equal to the delete file's referenced_data_file if it is non-null".
-        // we're not yet doing that here. The referenced data file's name will also be present in the positional
-        // delete file's file path column.
+        // Process positional delete files
+        // The spec states that "The data file's file_path is equal to the delete file's referenced_data_file if it is non-null".
         if let Some(deletes) = self.pos_deletes_by_partition.get(data_file.partition()) {
             deletes
                 .iter()
                 .filter(|&delete| {
                    seq_num
                        .map(|seq_num| delete.manifest_entry.sequence_number() >= Some(seq_num))
                        .unwrap_or_else(|| true)
                })
+                // filter by referenced_data_file if present
+                .filter(|&delete| {
+                    // If the delete file has a referenced_data_file set, only apply it to matching data files
+                    if let Some(ref_data_file) = delete.manifest_entry.data_file().referenced_data_file() {
+                        data_file.file_path().to_string() == ref_data_file
+                    } else {
+                        // If no referenced_data_file is set, apply to all data files (as the referenced data file
+                        // paths will be present in the delete file's content)
+                        true
+                    }
+                })
                 .for_each(|delete| results.push(delete.as_ref().into()));
         }
 
diff --git a/crates/iceberg/src/puffin/compression.rs b/crates/iceberg/src/puffin/compression.rs
index a9a56ef12c..70882ffe26 100644
--- a/crates/iceberg/src/puffin/compression.rs
+++ b/crates/iceberg/src/puffin/compression.rs
@@ -36,10 +36,14 @@ impl CompressionCodec {
     pub(crate) fn decompress(&self, bytes: Vec<u8>) -> Result<Vec<u8>> {
         match self {
             CompressionCodec::None => Ok(bytes),
-            CompressionCodec::Lz4 => Err(Error::new(
-                ErrorKind::FeatureUnsupported,
-                "LZ4 decompression is not supported currently",
-            )),
+            CompressionCodec::Lz4 => {
+                let decompressed = lz4_flex::decompress_size_prepended(&bytes)
+                    .map_err(|e| Error::new(
+                        ErrorKind::DataInvalid,
+                        format!("LZ4 decompression failed: {}", e),
+                    ))?;
+                Ok(decompressed)
+            },
             CompressionCodec::Zstd => {
                 let decompressed = zstd::stream::decode_all(&bytes[..])?;
                 Ok(decompressed)
@@ -50,10 +54,10 @@ impl CompressionCodec {
     pub(crate) fn compress(&self, bytes: Vec<u8>) -> Result<Vec<u8>> {
         match self {
             CompressionCodec::None => Ok(bytes),
-            CompressionCodec::Lz4 => Err(Error::new(
-                ErrorKind::FeatureUnsupported,
-                "LZ4 compression is not supported currently",
-            )),
+            CompressionCodec::Lz4 => {
+                let compressed = lz4_flex::compress_prepend_size(&bytes);
+                Ok(compressed)
+            },
             CompressionCodec::Zstd => {
                 let writer = Vec::<u8>::new();
                 let mut encoder = zstd::stream::Encoder::new(writer, 3)?;
@@ -92,21 +96,11 @@ mod tests {
         let compression_codec = CompressionCodec::Lz4;
         let bytes_vec = [0_u8; 100].to_vec();
 
-        assert_eq!(
-            compression_codec
-                .compress(bytes_vec.clone())
-                .unwrap_err()
-                .to_string(),
-            "FeatureUnsupported => LZ4 compression is not supported currently",
-        );
-
-        assert_eq!(
-            compression_codec
-                .decompress(bytes_vec.clone())
-                .unwrap_err()
-                .to_string(),
-            "FeatureUnsupported => LZ4 decompression is not supported currently",
-        )
+        let compressed = compression_codec.compress(bytes_vec.clone()).unwrap();
+        assert!(compressed.len() < bytes_vec.len()); // LZ4 should compress repeated bytes
+
+        let decompressed = compression_codec.decompress(compressed.clone()).unwrap();
+        assert_eq!(decompressed, bytes_vec);
     }
 
     #[tokio::test]
diff --git a/crates/iceberg/src/puffin/metadata.rs b/crates/iceberg/src/puffin/metadata.rs
index 6039c7f820..13b0640d5e 100644
--- a/crates/iceberg/src/puffin/metadata.rs
+++ b/crates/iceberg/src/puffin/metadata.rs
@@ -548,26 +548,26 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn test_lz4_compressed_footer_returns_error() {
+    async fn test_lz4_compressed_footer_can_be_read() {
         let temp_dir = TempDir::new().unwrap();
 
+        // Create a valid LZ4 compressed footer
+        let footer_payload = empty_footer_payload_bytes();
+        let compressed_footer = lz4_flex::compress_prepend_size(&footer_payload);
+
         let mut bytes = vec![];
         bytes.extend(FileMetadata::MAGIC.to_vec());
         bytes.extend(FileMetadata::MAGIC.to_vec());
-        bytes.extend(empty_footer_payload_bytes());
-        bytes.extend(empty_footer_payload_bytes_length_bytes());
-        bytes.extend(vec![0b00000001, 0, 0, 0]);
+        bytes.extend(&compressed_footer);
+        bytes.extend((compressed_footer.len() as u32).to_le_bytes());
+        bytes.extend(vec![0b00000001, 0, 0, 0]); // LZ4 compression flag
         bytes.extend(FileMetadata::MAGIC.to_vec());
 
         let input_file = input_file_with_bytes(&temp_dir, &bytes).await;
 
-        assert_eq!(
-            FileMetadata::read(&input_file)
-                .await
-                .unwrap_err()
-                .to_string(),
-            "FeatureUnsupported => LZ4 decompression is not supported currently",
-        )
+        // LZ4 decompression should now work successfully
+        let file_metadata = FileMetadata::read(&input_file).await.unwrap();
+        assert_eq!(file_metadata.blobs.len(), 0); // Empty footer should have no blobs
     }
 
     #[tokio::test]
diff --git a/crates/iceberg/src/puffin/writer.rs b/crates/iceberg/src/puffin/writer.rs
index f68efda889..261e730b37 100644
--- a/crates/iceberg/src/puffin/writer.rs
+++ b/crates/iceberg/src/puffin/writer.rs
@@ -270,13 +270,19 @@ mod tests {
         let blobs = vec![blob_0(), blob_1()];
         let blobs_with_compression = blobs_with_compression(blobs.clone(), CompressionCodec::Lz4);
 
-        assert_eq!(
-            write_puffin_file(&temp_dir, blobs_with_compression, file_properties())
-                .await
-                .unwrap_err()
-                .to_string(),
-            "FeatureUnsupported => LZ4 compression is not supported currently"
-        );
+        // LZ4 compression should now work successfully
+        let file_path = write_puffin_file(&temp_dir, blobs_with_compression, file_properties())
+            .await
+            .unwrap();
+
+        // Verify the file was created and has content
+        let file_io = FileIO::from_path(temp_dir.path().as_os_str().to_str().unwrap())
+            .unwrap()
+            .build()
+            .unwrap();
+        let input_file = file_io.new_input(&file_path).unwrap();
+        let file_bytes = get_file_as_byte_vec(input_file).await;
+        assert!(!file_bytes.is_empty(), "LZ4 compressed file should not be empty");
     }
 
     async fn get_file_as_byte_vec(input_file: InputFile) -> Vec<u8> {
diff --git a/crates/iceberg/src/scan/context.rs b/crates/iceberg/src/scan/context.rs
index 1e4ef41b2b..7c7746676f 100644
--- a/crates/iceberg/src/scan/context.rs
+++ b/crates/iceberg/src/scan/context.rs
@@ -44,6 +44,7 @@ pub(crate) struct ManifestFileContext {
     bound_predicates: Option<Arc<BoundPredicates>>,
     object_cache: Arc<ObjectCache>,
     snapshot_schema: SchemaRef,
+    case_sensitive: bool,
     expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
     delete_file_index: DeleteFileIndex,
 }
@@ -58,6 +59,7 @@ pub(crate) struct ManifestEntryContext {
     pub bound_predicates: Option<Arc<BoundPredicates>>,
     pub partition_spec_id: i32,
     pub snapshot_schema: SchemaRef,
+    pub case_sensitive: bool,
     pub delete_file_index: DeleteFileIndex,
 }
 
@@ -80,16 +82,17 @@ impl ManifestFileContext {
         let manifest = object_cache.get_manifest(&manifest_file).await?;
 
         for manifest_entry in manifest.entries() {
-            let manifest_entry_context = ManifestEntryContext {
-                // TODO: refactor to avoid the expensive ManifestEntry clone
-                manifest_entry: manifest_entry.clone(),
-                expression_evaluator_cache: expression_evaluator_cache.clone(),
-                field_ids: field_ids.clone(),
-                partition_spec_id: manifest_file.partition_spec_id,
-                bound_predicates: bound_predicates.clone(),
-                snapshot_schema: snapshot_schema.clone(),
-                delete_file_index: delete_file_index.clone(),
-            };
+            let manifest_entry_context = ManifestEntryContext {
+                // TODO: refactor to avoid the expensive ManifestEntry clone
+                manifest_entry: manifest_entry.clone(),
+                expression_evaluator_cache: expression_evaluator_cache.clone(),
+                field_ids: field_ids.clone(),
+                partition_spec_id: manifest_file.partition_spec_id,
+                bound_predicates: bound_predicates.clone(),
+                snapshot_schema: snapshot_schema.clone(),
+                case_sensitive: self.case_sensitive,
+                delete_file_index: delete_file_index.clone(),
+            };
 
             sender
                 .send(manifest_entry_context)
@@ -113,6 +116,9 @@ impl ManifestEntryContext {
             )
             .await;
 
+        // Use case sensitivity from the context
+        let case_sensitive = self.case_sensitive;
+
         Ok(FileScanTask {
             start: 0,
             length: self.manifest_entry.file_size_in_bytes(),
@@ -127,6 +133,7 @@ impl ManifestEntryContext {
                 .bound_predicates
                 .map(|x| x.as_ref().snapshot_bound_predicate.clone()),
 
+            case_sensitive,
             deletes,
         })
     }
diff --git a/crates/iceberg/src/scan/task.rs b/crates/iceberg/src/scan/task.rs
index 7b111e4f04..2448628831 100644
--- a/crates/iceberg/src/scan/task.rs
+++ b/crates/iceberg/src/scan/task.rs
@@ -52,6 +52,9 @@ pub struct FileScanTask {
     #[serde(skip_serializing_if = "Option::is_none")]
     pub predicate: Option<BoundPredicate>,
 
+    /// Whether column name matching should be case-sensitive
+    pub case_sensitive: bool,
+
     /// The list of delete files that may need to be applied to this data file
     pub deletes: Vec<FileScanTaskDeleteFile>,
 }
diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs
index 48dc2b5b90..32c84afe8d 100644
--- a/crates/iceberg/src/transaction/snapshot.rs
+++ b/crates/iceberg/src/transaction/snapshot.rs
@@ -75,6 +75,7 @@ pub(crate) struct SnapshotProducer<'a> {
     key_metadata: Option<Vec<u8>>,
     snapshot_properties: HashMap<String, String>,
     added_data_files: Vec<DataFile>,
+    added_delete_files: Vec<DataFile>,
     // A counter used to generate unique manifest file names.
     // It starts from 0 and increments for each new manifest file.
     // Note: This counter is limited to the range of (0..u64::MAX).
@@ -96,6 +97,7 @@ impl<'a> SnapshotProducer<'a> {
             key_metadata,
             snapshot_properties,
             added_data_files,
+            added_delete_files: Vec::new(),
             manifest_counter: (0..),
         }
     }
@@ -124,6 +126,35 @@ impl<'a> SnapshotProducer<'a> {
         Ok(())
     }
+    pub(crate) fn add_delete_files(&mut self, delete_files: Vec<DataFile>) -> Result<()> {
+        // Validate delete files
+        for delete_file in &delete_files {
+            if !matches!(
+                delete_file.content_type(),
+                crate::spec::DataContentType::PositionDeletes | crate::spec::DataContentType::EqualityDeletes
+            ) {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "Only position deletes and equality deletes content types are allowed for delete files",
+                ));
+            }
+            // Check that the delete file's partition spec id matches the table's default partition spec id.
+            if self.table.metadata().default_partition_spec_id() != delete_file.partition_spec_id {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "Delete file partition spec id does not match table default partition spec id",
+                ));
+            }
+            Self::validate_partition_value(
+                delete_file.partition(),
+                self.table.metadata().default_partition_type(),
+            )?;
+        }
+
+        self.added_delete_files.extend(delete_files);
+        Ok(())
+    }
+
     pub(crate) async fn validate_duplicate_files(
         &self,
         added_data_files: &[DataFile],
@@ -279,6 +310,36 @@ impl<'a> SnapshotProducer<'a> {
         writer.write_manifest_file().await
     }
 
+    async fn write_delete_manifest(&mut self) -> Result<ManifestFile> {
+        let added_delete_files = std::mem::take(&mut self.added_delete_files);
+        if added_delete_files.is_empty() {
+            return Err(Error::new(
+                ErrorKind::PreconditionFailed,
+                "No added delete files found when writing a delete manifest file",
+            ));
+        }
+
+        let snapshot_id = self.snapshot_id;
+        let format_version = self.table.metadata().format_version();
+        let manifest_entries = added_delete_files.into_iter().map(|delete_file| {
+            let builder = ManifestEntry::builder()
+                .status(crate::spec::ManifestStatus::Added)
+                .data_file(delete_file);
+            if format_version == FormatVersion::V1 {
+                builder.snapshot_id(snapshot_id).build()
+            } else {
+                // For format version > 1, leave the snapshot id unset so it is inherited at read
+                // time; this avoids rewriting the manifest file if the commit fails.
+                builder.build()
+            }
+        });
+        let mut writer = self.new_manifest_writer(ManifestContentType::Deletes)?;
+        for entry in manifest_entries {
+            writer.add_entry(entry)?;
+        }
+        writer.write_manifest_file().await
+    }
+
     async fn manifest_file<OP: SnapshotProduceOperation, MP: ManifestProcess>(
         &mut self,
         snapshot_produce_operation: &OP,
@@ -289,10 +350,10 @@ impl<'a> SnapshotProducer<'a> {
         // TODO: Allowing snapshot property setup with no added data files is a workaround.
         // We should clean it up after all necessary actions are supported.
         // For details, please refer to https://github.com/apache/iceberg-rust/issues/1548
-        if self.added_data_files.is_empty() && self.snapshot_properties.is_empty() {
+        if self.added_data_files.is_empty() && self.added_delete_files.is_empty() && self.snapshot_properties.is_empty() {
             return Err(Error::new(
                 ErrorKind::PreconditionFailed,
-                "No added data files or added snapshot properties found when write a manifest file",
+                "No added data files, delete files, or snapshot properties found when writing a manifest file",
             ));
         }
 
@@ -305,8 +366,11 @@ impl<'a> SnapshotProducer<'a> {
             manifest_files.push(added_manifest);
         }
 
-        // # TODO
-        // Support process delete entries.
+        // Process delete entries.
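+        // Delete files are written to a manifest of their own (see write_delete_manifest above,
+        // which uses ManifestContentType::Deletes), so the manifest list keeps delete manifests
+        // distinct from data manifests for readers.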
+ if !self.added_delete_files.is_empty() { + let delete_manifest = self.write_delete_manifest().await?; + manifest_files.push(delete_manifest); + } let manifest_files = manifest_process.process_manifests(self, manifest_files); Ok(manifest_files) @@ -343,6 +407,14 @@ impl<'a> SnapshotProducer<'a> { ); } + for delete_file in &self.added_delete_files { + summary_collector.add_file( + delete_file, + table_metadata.current_schema().clone(), + table_metadata.default_partition_spec().clone(), + ); + } + let previous_snapshot = table_metadata .snapshot_by_id(self.snapshot_id) .and_then(|snapshot| snapshot.parent_snapshot_id()) @@ -387,7 +459,11 @@ impl<'a> SnapshotProducer<'a> { let next_seq_num = self.table.metadata().next_sequence_number(); let summary = self.summary(&snapshot_produce_operation).map_err(|err| { - Error::new(ErrorKind::Unexpected, "Failed to create snapshot summary.").with_source(err) + Error::new( + ErrorKind::Unexpected, + "Failed to create snapshot summary.", + ) + .with_source(err) })?; let manifest_list_path = self.generate_manifest_list_file_path(0); diff --git a/crates/iceberg/src/writer/base_writer/mod.rs b/crates/iceberg/src/writer/base_writer/mod.rs index 37ab97eb6d..5a23931f25 100644 --- a/crates/iceberg/src/writer/base_writer/mod.rs +++ b/crates/iceberg/src/writer/base_writer/mod.rs @@ -19,3 +19,4 @@ pub mod data_file_writer; pub mod equality_delete_writer; +pub mod position_delete_writer; diff --git a/crates/iceberg/src/writer/base_writer/position_delete_writer.rs b/crates/iceberg/src/writer/base_writer/position_delete_writer.rs new file mode 100644 index 0000000000..0e0f5f725e --- /dev/null +++ b/crates/iceberg/src/writer/base_writer/position_delete_writer.rs @@ -0,0 +1,401 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This module provides `PositionDeleteWriter`. + +use std::sync::Arc; + +use arrow_array::{Int64Array, RecordBatch, StringArray}; +use arrow_schema::{DataType, Field, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef}; +use async_trait::async_trait; +use parquet::arrow::PARQUET_FIELD_ID_META_KEY; + +use crate::spec::{DataFile, Struct}; +use crate::writer::file_writer::{FileWriter, FileWriterBuilder}; +use crate::writer::{IcebergWriter, IcebergWriterBuilder}; +use crate::{Error, ErrorKind, Result}; + +/// Field ID for the file_path column in position delete files. +/// From the Iceberg spec: https://iceberg.apache.org/spec/#position-delete-files +const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: i64 = 2147483546; + +/// Field ID for the pos column in position delete files. +/// From the Iceberg spec: https://iceberg.apache.org/spec/#position-delete-files +const FIELD_ID_POSITIONAL_DELETE_POS: i64 = 2147483545; + +/// Builder for `PositionDeleteWriter`. 
+#[derive(Clone, Debug)] +pub struct PositionDeleteFileWriterBuilder { + inner: B, + config: PositionDeleteWriterConfig, +} + +impl PositionDeleteFileWriterBuilder { + /// Create a new `PositionDeleteFileWriterBuilder` using a `FileWriterBuilder`. + pub fn new(inner: B, config: PositionDeleteWriterConfig) -> Self { + Self { inner, config } + } +} + +/// Config for `PositionDeleteWriter`. +#[derive(Clone, Debug)] +pub struct PositionDeleteWriterConfig { + partition_value: Struct, + partition_spec_id: i32, + /// Referenced data file that this position delete file applies to (optional) + referenced_data_file: Option, +} + +impl PositionDeleteWriterConfig { + /// Create a new `PositionDeleteWriterConfig`. + pub fn new( + partition_value: Option, + partition_spec_id: i32, + referenced_data_file: Option, + ) -> Self { + Self { + partition_value: partition_value.unwrap_or_else(Struct::empty), + partition_spec_id, + referenced_data_file, + } + } + + /// Get the schema for position delete files. + /// Position delete files have a required schema with file_path (String) and pos (Long) columns. + pub fn position_delete_schema() -> ArrowSchemaRef { + let fields = vec![ + Field::new("file_path", DataType::Utf8, false).with_metadata( + std::collections::HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(), + )]), + ), + Field::new("pos", DataType::Int64, false).with_metadata( + std::collections::HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + FIELD_ID_POSITIONAL_DELETE_POS.to_string(), + )]), + ), + ]; + Arc::new(ArrowSchema::new(fields)) + } + + /// Validate that the provided RecordBatch matches the position delete schema. + pub fn validate_batch(&self, batch: &RecordBatch) -> Result<()> { + let expected_schema = Self::position_delete_schema(); + + if batch.num_columns() != 2 { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Position delete files must have exactly 2 columns, got {}", + batch.num_columns() + ), + )); + } + + // Check file_path column + let file_path_field = batch.schema().field(0); + if file_path_field.name() != "file_path" + || !matches!(file_path_field.data_type(), DataType::Utf8) + { + return Err(Error::new( + ErrorKind::DataInvalid, + "First column must be 'file_path' of type String", + )); + } + + // Check pos column + let pos_field = batch.schema().field(1); + if pos_field.name() != "pos" + || !matches!(pos_field.data_type(), DataType::Int64) + { + return Err(Error::new( + ErrorKind::DataInvalid, + "Second column must be 'pos' of type Int64", + )); + } + + Ok(()) + } +} + +#[async_trait::async_trait] +impl IcebergWriterBuilder for PositionDeleteFileWriterBuilder { + type R = PositionDeleteFileWriter; + + async fn build(self) -> Result { + Ok(PositionDeleteFileWriter { + inner_writer: Some(self.inner.clone().build().await?), + config: self.config, + }) + } +} + +/// A writer for position delete files. +/// +/// Position delete files are used to mark specific rows in data files as deleted by their position. 
+/// They contain two columns: +/// - file_path: the path of the data file +/// - pos: the 0-based position of the deleted row in the data file +#[derive(Debug)] +pub struct PositionDeleteFileWriter { + inner_writer: Option, + config: PositionDeleteWriterConfig, +} + +#[async_trait::async_trait] +impl IcebergWriter for PositionDeleteFileWriter { + async fn write(&mut self, batch: RecordBatch) -> Result<()> { + // Validate that the batch conforms to position delete schema + self.config.validate_batch(&batch)?; + + if let Some(writer) = self.inner_writer.as_mut() { + writer.write(&batch).await + } else { + Err(Error::new( + ErrorKind::Unexpected, + "Position delete inner writer has been closed.", + )) + } + } + + async fn close(&mut self) -> Result> { + if let Some(writer) = self.inner_writer.take() { + Ok(writer + .close() + .await? + .into_iter() + .map(|mut res| { + res.content(crate::spec::DataContentType::PositionDeletes); + res.partition(self.config.partition_value.clone()); + res.partition_spec_id(self.config.partition_spec_id); + if let Some(ref referenced_file) = self.config.referenced_data_file { + // TODO: Set referenced_data_file in DataFileBuilder when available + // res.referenced_data_file(referenced_file.clone()); + } + res.build().expect("Valid position delete data file") + }) + .collect()) + } else { + Err(Error::new( + ErrorKind::Unexpected, + "Position delete inner writer has been closed.", + )) + } + } +} + +/// Utility functions for creating position delete data. +impl PositionDeleteFileWriter<()> { + /// Create a RecordBatch for position delete data. + /// + /// # Arguments + /// * `file_paths` - Paths of the data files containing the rows to delete + /// * `positions` - 0-based positions of the rows to delete in the corresponding files + /// + /// # Returns + /// A RecordBatch with the position delete schema + pub fn create_position_delete_batch( + file_paths: Vec, + positions: Vec, + ) -> Result { + if file_paths.len() != positions.len() { + return Err(Error::new( + ErrorKind::DataInvalid, + "file_paths and positions must have the same length", + )); + } + + let file_path_array = Arc::new(StringArray::from(file_paths)); + let pos_array = Arc::new(Int64Array::from(positions)); + + let schema = PositionDeleteWriterConfig::position_delete_schema(); + + RecordBatch::try_new(schema, vec![file_path_array, pos_array]).map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + format!("Failed to create position delete batch: {}", e), + ) + })? 
+ } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::sync::Arc; + + use arrow_array::{Int64Array, RecordBatch, StringArray}; + use arrow_schema::{DataType, Field, Schema as ArrowSchema}; + use parquet::arrow::PARQUET_FIELD_ID_META_KEY; + use parquet::file::properties::WriterProperties; + use tempfile::TempDir; + + use super::*; + use crate::io::FileIOBuilder; + use crate::spec::{DataFileFormat, Struct}; + use crate::writer::file_writer::location_generator::{ + DefaultFileNameGenerator, DefaultLocationGenerator, + }; + use crate::writer::file_writer::ParquetWriterBuilder; + + #[test] + fn test_position_delete_schema() { + let schema = PositionDeleteWriterConfig::position_delete_schema(); + + assert_eq!(schema.fields().len(), 2); + + let file_path_field = &schema.fields()[0]; + assert_eq!(file_path_field.name(), "file_path"); + assert_eq!(file_path_field.data_type(), &DataType::Utf8); + + let pos_field = &schema.fields()[1]; + assert_eq!(pos_field.name(), "pos"); + assert_eq!(pos_field.data_type(), &DataType::Int64); + } + + #[test] + fn test_create_position_delete_batch() { + let file_paths = vec![ + "/path/to/file1.parquet".to_string(), + "/path/to/file2.parquet".to_string(), + ]; + let positions = vec![10, 20]; + + let batch = PositionDeleteFileWriter::create_position_delete_batch( + file_paths.clone(), + positions.clone(), + ).expect("Failed to create batch"); + + assert_eq!(batch.num_rows(), 2); + assert_eq!(batch.num_columns(), 2); + + let file_path_array = batch + .column(0) + .as_any() + .downcast_ref::() + .expect("Expected StringArray"); + + let pos_array = batch + .column(1) + .as_any() + .downcast_ref::() + .expect("Expected Int64Array"); + + assert_eq!(file_path_array.value(0), "/path/to/file1.parquet"); + assert_eq!(file_path_array.value(1), "/path/to/file2.parquet"); + assert_eq!(pos_array.value(0), 10); + assert_eq!(pos_array.value(1), 20); + } + + #[test] + fn test_validate_batch_valid() { + let config = PositionDeleteWriterConfig::new(None, 0, None); + + let file_paths = vec!["/path/to/file.parquet".to_string()]; + let positions = vec![5]; + + let batch = PositionDeleteFileWriter::create_position_delete_batch( + file_paths, + positions, + ).expect("Failed to create batch"); + + config.validate_batch(&batch).expect("Valid batch should pass validation"); + } + + #[test] + fn test_validate_batch_wrong_column_count() { + let config = PositionDeleteWriterConfig::new(None, 0, None); + + // Create a batch with only one column + let file_path_array = Arc::new(StringArray::from(vec!["/path/to/file.parquet"])); + let schema = Arc::new(ArrowSchema::new(vec![ + Field::new("file_path", DataType::Utf8, false), + ])); + let batch = RecordBatch::try_new(schema, vec![file_path_array]).unwrap(); + + assert!(config.validate_batch(&batch).is_err()); + } + + #[test] + fn test_validate_batch_wrong_column_types() { + let config = PositionDeleteWriterConfig::new(None, 0, None); + + // Create a batch with wrong data types + let file_path_array = Arc::new(Int64Array::from(vec![123])); // Wrong type + let pos_array = Arc::new(StringArray::from(vec!["not_a_number"])); // Wrong type + let schema = Arc::new(ArrowSchema::new(vec![ + Field::new("file_path", DataType::Int64, false), // Wrong type + Field::new("pos", DataType::Utf8, false), // Wrong type + ])); + let batch = RecordBatch::try_new(schema, vec![file_path_array, pos_array]).unwrap(); + + assert!(config.validate_batch(&batch).is_err()); + } + + #[tokio::test] + async fn test_position_delete_writer() { + let temp_dir = 
TempDir::new().unwrap(); + let file_io = FileIOBuilder::new("file") + .with_prop("table_dir", temp_dir.path().to_str().unwrap()) + .build() + .unwrap(); + + let location_gen = + DefaultLocationGenerator::new("/tmp/test", "", "", &file_io) + .unwrap(); + let file_name_gen = DefaultFileNameGenerator::new( + "test".to_string(), + None, + DataFileFormat::Parquet, + ); + + let pb = ParquetWriterBuilder::new( + WriterProperties::default(), + PositionDeleteWriterConfig::position_delete_schema(), + file_io, + location_gen, + file_name_gen, + ); + + let config = PositionDeleteWriterConfig::new(None, 0, None); + let mut writer = PositionDeleteFileWriterBuilder::new(pb, config) + .build() + .await + .unwrap(); + + // Create position delete data + let file_paths = vec![ + "/path/to/file1.parquet".to_string(), + "/path/to/file2.parquet".to_string(), + ]; + let positions = vec![10, 20]; + let batch = PositionDeleteFileWriter::create_position_delete_batch( + file_paths, + positions, + ).expect("Failed to create batch"); + + writer.write(batch).await.unwrap(); + let data_files = writer.close().await.unwrap(); + + assert_eq!(data_files.len(), 1); + let data_file = &data_files[0]; + assert_eq!(data_file.content, crate::spec::DataContentType::PositionDeletes); + assert_eq!(data_file.record_count, 2); + } +} \ No newline at end of file diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index 75b3d9244a..44c18bb335 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -22,8 +22,10 @@ use std::sync::Arc; use std::sync::atomic::AtomicI64; use arrow_schema::SchemaRef as ArrowSchemaRef; +use base64; use bytes::Bytes; use futures::future::BoxFuture; +use futures::TryStreamExt; use itertools::Itertools; use parquet::arrow::AsyncArrowWriter; use parquet::arrow::async_reader::AsyncFileReader; @@ -333,8 +335,9 @@ impl ParquetWriter { file_paths: Vec, table_metadata: &TableMetadata, ) -> Result> { - // TODO: support adding to partitioned table let mut data_files: Vec = Vec::new(); + let partition_spec = table_metadata.default_partition_spec(); + let schema = table_metadata.current_schema(); for file_path in file_paths { let input_file = file_io.new_input(&file_path)?; @@ -349,15 +352,30 @@ impl ParquetWriter { format!("Error reading Parquet metadata: {}", err), ) })?; + + // Extract partition values from file path if the table is partitioned + let partition_value = if partition_spec.is_unpartitioned() { + crate::spec::Struct::empty() + } else { + Self::extract_partition_values_from_path(&file_path, partition_spec, schema)? 
+ }; + + // Extract NaN value counts from the parquet file + let nan_value_counts = Self::extract_nan_value_counts( + file_io, + &file_path, + schema.clone(), + ).await?; + let mut builder = ParquetWriter::parquet_to_data_file_builder( - table_metadata.current_schema().clone(), + schema.clone(), parquet_metadata, file_size_in_bytes, file_path, - // TODO: Implement nan_value_counts here - HashMap::new(), + nan_value_counts, )?; builder.partition_spec_id(table_metadata.default_partition_spec_id()); + builder.partition(partition_value); let data_file = builder.build().unwrap(); data_files.push(data_file); } @@ -365,6 +383,266 @@ impl ParquetWriter { Ok(data_files) } + /// Extract NaN value counts from parquet file + async fn extract_nan_value_counts( + file_io: &FileIO, + file_path: &str, + schema: SchemaRef, + ) -> Result> { + let input_file = file_io.new_input(file_path)?; + let file_metadata = input_file.metadata().await?; + let reader = input_file.reader().await?; + + let mut parquet_reader = ArrowFileReader::new(file_metadata, reader); + let arrow_schema = parquet_reader.get_schema().await.map_err(|err| { + Error::new( + ErrorKind::DataInvalid, + "Failed to get Arrow schema from parquet file", + ) + .with_source(err) + })?; + + let mut nan_visitor = crate::arrow::NanValueCountVisitor::new(); + + // Read all row groups and compute NaN counts + let num_row_groups = parquet_reader.get_metadata(None).await?.num_row_groups(); + for row_group_idx in 0..num_row_groups { + let record_batch_stream = parquet_reader + .get_row_group(row_group_idx) + .map_err(|err| { + Error::new( + ErrorKind::DataInvalid, + format!("Failed to read row group {}", row_group_idx), + ) + .with_source(err) + })?; + + let record_batch = record_batch_stream.try_collect::>().await.map_err(|err| { + Error::new( + ErrorKind::DataInvalid, + "Failed to collect record batches from parquet file", + ) + .with_source(err) + })?; + + for batch in record_batch { + crate::spec::visit_schema_with_partner(&schema, &batch, &mut nan_visitor)?; + } + } + + Ok(nan_visitor.nan_value_counts) + } + + /// Extract partition values from file path using Hive-style partitioning + /// + /// Supports paths like: `/table_location/year=2023/month=01/day=15/file.parquet` + /// where `year`, `month`, and `day` are partition columns + fn extract_partition_values_from_path( + file_path: &str, + partition_spec: &crate::spec::PartitionSpec, + schema: &crate::spec::Schema, + ) -> Result { + use std::collections::HashMap; + use crate::spec::{Literal, PrimitiveType}; + + let mut partition_values: HashMap> = HashMap::new(); + + // Parse Hive-style partition path segments like "year=2023" + let path_segments: Vec<&str> = file_path.split('/').collect(); + let mut found_partition_values: HashMap = HashMap::new(); + + for segment in path_segments { + if let Some((key, value)) = segment.split_once('=') { + found_partition_values.insert(key.to_string(), value.to_string()); + } + } + + // Process each partition field + for partition_field in partition_spec.fields() { + let source_field_id = partition_field.source_id; + let source_field = schema.field_by_id(source_field_id).ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Partition source field with id {} not found in schema", source_field_id), + ) + })?; + + let field_name = source_field.name.clone(); + + if let Some(value_str) = found_partition_values.get(&field_name) { + // Parse the value based on the field type + let literal = Self::parse_partition_value(value_str, 
source_field.field_type.as_primitive_type())?; + partition_values.insert(field_name, Some(literal)); + } else { + // If partition value is not found in path, default to null + partition_values.insert(field_name, None); + } + } + + // Convert to ordered values according to partition spec + let mut ordered_values = Vec::new(); + for partition_field in partition_spec.fields() { + let source_field_id = partition_field.source_id; + let source_field = schema.field_by_id(source_field_id).ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Partition source field with id {} not found in schema", source_field_id), + ) + })?; + + let field_name = &source_field.name; + ordered_values.push(partition_values.get(field_name).cloned().unwrap_or(None)); + } + + Ok(crate::spec::Struct::from_iter(ordered_values)) + } + + /// Parse a partition value string based on the field type + fn parse_partition_value(value_str: &str, primitive_type: Option<&PrimitiveType>) -> Result { + use crate::spec::{Literal, PrimitiveType}; + + let Some(primitive_type) = primitive_type else { + return Err(Error::new( + ErrorKind::DataInvalid, + "Partition field must be of primitive type", + )); + }; + + match primitive_type { + PrimitiveType::Boolean => { + let val = value_str.parse::().map_err(|_| { + Error::new( + ErrorKind::DataInvalid, + format!("Invalid boolean value: {}", value_str), + ) + })?; + Ok(Literal::bool(val)) + }, + PrimitiveType::Int => { + let val = value_str.parse::().map_err(|_| { + Error::new( + ErrorKind::DataInvalid, + format!("Invalid int value: {}", value_str), + ) + })?; + Ok(Literal::int(val)) + }, + PrimitiveType::Long => { + let val = value_str.parse::().map_err(|_| { + Error::new( + ErrorKind::DataInvalid, + format!("Invalid long value: {}", value_str), + ) + })?; + Ok(Literal::long(val)) + }, + PrimitiveType::Float => { + let val = value_str.parse::().map_err(|_| { + Error::new( + ErrorKind::DataInvalid, + format!("Invalid float value: {}", value_str), + ) + })?; + Ok(Literal::float(val)) + }, + PrimitiveType::Double => { + let val = value_str.parse::().map_err(|_| { + Error::new( + ErrorKind::DataInvalid, + format!("Invalid double value: {}", value_str), + ) + })?; + Ok(Literal::double(val)) + }, + PrimitiveType::String => { + Ok(Literal::string(value_str)) + }, + PrimitiveType::Date => { + // Parse date in YYYY-MM-DD format + let parsed_date = chrono::NaiveDate::parse_from_str(value_str, "%Y-%m-%d") + .map_err(|_| { + Error::new( + ErrorKind::DataInvalid, + format!("Invalid date value: {} (expected YYYY-MM-DD)", value_str), + ) + })?; + let days_since_epoch = parsed_date.signed_duration_since(chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()).num_days(); + Ok(Literal::date(days_since_epoch as i32)) + }, + PrimitiveType::Timestamp => { + // Parse timestamp - support common formats + let timestamp_micros = if let Ok(timestamp_secs) = value_str.parse::() { + // Unix timestamp in seconds + timestamp_secs * 1_000_000 + } else if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(value_str) { + // RFC3339 format + dt.timestamp_micros() + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Invalid timestamp value: {}", value_str), + )); + }; + Ok(Literal::timestamp_micros(timestamp_micros)) + }, + PrimitiveType::Timestamptz => { + // Parse timestamptz - support common formats + let timestamp_micros = if let Ok(timestamp_secs) = value_str.parse::() { + // Unix timestamp in seconds + timestamp_secs * 1_000_000 + } else if let Ok(dt) = 
chrono::DateTime::parse_from_rfc3339(value_str) { + // RFC3339 format + dt.timestamp_micros() + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Invalid timestamptz value: {}", value_str), + )); + }; + Ok(Literal::timestamptz_micros(timestamp_micros)) + }, + PrimitiveType::Decimal { precision, scale } => { + let decimal = value_str.parse::().map_err(|_| { + Error::new( + ErrorKind::DataInvalid, + format!("Invalid decimal value: {}", value_str), + ) + })?; + Ok(Literal::decimal(decimal.mantissa(), *precision as u32, *scale as u32)?) + }, + PrimitiveType::Uuid => { + let uuid = uuid::Uuid::parse_str(value_str).map_err(|_| { + Error::new( + ErrorKind::DataInvalid, + format!("Invalid UUID value: {}", value_str), + ) + })?; + Ok(Literal::uuid(uuid)) + }, + PrimitiveType::Fixed(_) | PrimitiveType::Binary => { + // For binary types, try base64 decoding, fall back to UTF-8 bytes + match base64::decode(value_str) { + Ok(bytes) => Ok(Literal::binary(bytes)), + Err(_) => Ok(Literal::binary(value_str.as_bytes().to_vec())), + } + }, + PrimitiveType::Time => { + // Parse time as microseconds since midnight + let time = chrono::NaiveTime::parse_from_str(value_str, "%H:%M:%S") + .or_else(|_| chrono::NaiveTime::parse_from_str(value_str, "%H:%M:%S%.f")) + .map_err(|_| { + Error::new( + ErrorKind::DataInvalid, + format!("Invalid time value: {} (expected HH:MM:SS or HH:MM:SS.fff)", value_str), + ) + })?; + let micros_since_midnight = time.num_seconds_from_midnight() as i64 * 1_000_000 + + time.nanosecond() as i64 / 1_000; + Ok(Literal::time_micros(micros_since_midnight)) + }, + } + } + fn thrift_to_parquet_metadata(&self, file_metadata: FileMetaData) -> Result { let mut buffer = Vec::new(); { @@ -377,12 +655,20 @@ impl ParquetWriter { })?; protocol.flush().map_err(|err| { - Error::new(ErrorKind::Unexpected, "Failed to flush protocol").with_source(err) + Error::new( + ErrorKind::Unexpected, + "Failed to flush protocol", + ) + .with_source(err) })?; } let parquet_metadata = ParquetMetaDataReader::decode_metadata(&buffer).map_err(|err| { - Error::new(ErrorKind::Unexpected, "Failed to decode parquet metadata").with_source(err) + Error::new( + ErrorKind::Unexpected, + "Failed to decode parquet metadata", + ) + .with_source(err) })?; Ok(parquet_metadata) @@ -541,8 +827,11 @@ impl FileWriter for ParquetWriter { Some(self.writer_properties.clone()), ) .map_err(|err| { - Error::new(ErrorKind::Unexpected, "Failed to build parquet writer.") - .with_source(err) + Error::new( + ErrorKind::Unexpected, + "Failed to build parquet writer.", + ) + .with_source(err) })?; self.inner_writer = Some(writer); self.inner_writer.as_mut().unwrap() @@ -566,7 +855,11 @@ impl FileWriter for ParquetWriter { }; let metadata = writer.close().await.map_err(|err| { - Error::new(ErrorKind::Unexpected, "Failed to close parquet writer.").with_source(err) + Error::new( + ErrorKind::Unexpected, + "Failed to close parquet writer.", + ) + .with_source(err) })?; let written_size = self.written_size.load(std::sync::atomic::Ordering::Relaxed); @@ -581,14 +874,13 @@ impl FileWriter for ParquetWriter { })?; Ok(vec![]) } else { - let parquet_metadata = - Arc::new(self.thrift_to_parquet_metadata(metadata).map_err(|err| { - Error::new( - ErrorKind::Unexpected, - "Failed to convert metadata from thrift to parquet.", - ) - .with_source(err) - })?); + let parquet_metadata = Arc::new(self.thrift_to_parquet_metadata(metadata).map_err(|err| { + Error::new( + ErrorKind::Unexpected, + "Failed to convert metadata from thrift to parquet.", + ) + 
.with_source(err) + })?); Ok(vec![Self::parquet_to_data_file_builder( self.schema,
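
For reference, the deletion-vector handling above combines two independent building blocks: a 64-bit roaring bitmap of deleted row positions and the size-prepended LZ4 framing now used by CompressionCodec::Lz4. The standalone sketch below is not part of the patch; it only assumes the `roaring` and `lz4_flex` crates that this change already depends on, and shows the round trip those pieces perform:

    use roaring::RoaringTreemap;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Deleted row positions in some data file; positions are u64, so files with
        // more than u32::MAX rows are representable.
        let mut positions = RoaringTreemap::new();
        for pos in [3_u64, 7, 1_000_000] {
            positions.insert(pos);
        }

        // Serialize the bitmap, then apply the same size-prepended LZ4 framing
        // that CompressionCodec::Lz4 now uses.
        let mut serialized = Vec::new();
        positions.serialize_into(&mut serialized)?;
        let compressed = lz4_flex::compress_prepend_size(&serialized);

        // Reverse the two steps, mirroring decompress() followed by parse_delete_vector_blob().
        let decompressed = lz4_flex::decompress_size_prepended(&compressed)?;
        let restored = RoaringTreemap::deserialize_from(std::io::Cursor::new(decompressed))?;

        assert!(restored.contains(1_000_000));
        assert_eq!(restored.len(), 3);
        Ok(())
    }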