
Commit cf67332

Merge branch 'main' of github.com:apache/iceberg-rust into fd-deserialize-bytes

2 parents: aba664e + 76d8e2d

28 files changed: +3180 −137 lines

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 1 addition & 0 deletions
```diff
@@ -71,6 +71,7 @@ enum-ordinalize = "4.3.0"
 env_logger = "0.11.8"
 expect-test = "1"
 faststr = "0.2.31"
+flate2 = "1.1.5"
 fnv = "1.0.7"
 fs-err = "3.1.0"
 futures = "0.3"
```

crates/iceberg/Cargo.toml

Lines changed: 1 addition & 0 deletions
```diff
@@ -63,6 +63,7 @@ bytes = { workspace = true }
 chrono = { workspace = true }
 derive_builder = { workspace = true }
 expect-test = { workspace = true }
+flate2 = { workspace = true }
 fnv = { workspace = true }
 futures = { workspace = true }
 itertools = { workspace = true }
```

crates/iceberg/src/arrow/caching_delete_file_loader.rs

Lines changed: 113 additions & 0 deletions
```diff
@@ -788,4 +788,117 @@ mod tests {
         assert_eq!(data_col.value(1), "d");
         assert_eq!(data_col.value(2), "g");
     }
+
+    /// Test loading a FileScanTask with BOTH positional and equality deletes.
+    /// Verifies the fix for the inverted condition that caused "Missing predicate for equality delete file" errors.
+    #[tokio::test]
+    async fn test_load_deletes_with_mixed_types() {
+        use crate::scan::FileScanTask;
+        use crate::spec::{DataFileFormat, Schema};
+
+        let tmp_dir = TempDir::new().unwrap();
+        let table_location = tmp_dir.path();
+        let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
+            .unwrap()
+            .build()
+            .unwrap();
+
+        // Create the data file schema
+        let data_file_schema = Arc::new(
+            Schema::builder()
+                .with_fields(vec![
+                    crate::spec::NestedField::optional(
+                        2,
+                        "y",
+                        crate::spec::Type::Primitive(crate::spec::PrimitiveType::Long),
+                    )
+                    .into(),
+                    crate::spec::NestedField::optional(
+                        3,
+                        "z",
+                        crate::spec::Type::Primitive(crate::spec::PrimitiveType::Long),
+                    )
+                    .into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+
+        // Write positional delete file
+        let positional_delete_schema = crate::arrow::delete_filter::tests::create_pos_del_schema();
+        let file_path_values =
+            vec![format!("{}/data-1.parquet", table_location.to_str().unwrap()); 4];
+        let file_path_col = Arc::new(StringArray::from_iter_values(&file_path_values));
+        let pos_col = Arc::new(Int64Array::from_iter_values(vec![0i64, 1, 2, 3]));
+
+        let positional_deletes_to_write =
+            RecordBatch::try_new(positional_delete_schema.clone(), vec![
+                file_path_col,
+                pos_col,
+            ])
+            .unwrap();
+
+        let props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .build();
+
+        let pos_del_path = format!("{}/pos-del-mixed.parquet", table_location.to_str().unwrap());
+        let file = File::create(&pos_del_path).unwrap();
+        let mut writer = ArrowWriter::try_new(
+            file,
+            positional_deletes_to_write.schema(),
+            Some(props.clone()),
+        )
+        .unwrap();
+        writer.write(&positional_deletes_to_write).unwrap();
+        writer.close().unwrap();
+
+        // Write equality delete file
+        let eq_delete_path = setup_write_equality_delete_file_1(table_location.to_str().unwrap());
+
+        // Create FileScanTask with BOTH positional and equality deletes
+        let pos_del = FileScanTaskDeleteFile {
+            file_path: pos_del_path,
+            file_type: DataContentType::PositionDeletes,
+            partition_spec_id: 0,
+            equality_ids: None,
+        };
+
+        let eq_del = FileScanTaskDeleteFile {
+            file_path: eq_delete_path.clone(),
+            file_type: DataContentType::EqualityDeletes,
+            partition_spec_id: 0,
+            equality_ids: Some(vec![2, 3]), // Only use field IDs that exist in both schemas
+        };
+
+        let file_scan_task = FileScanTask {
+            start: 0,
+            length: 0,
+            record_count: None,
+            data_file_path: format!("{}/data-1.parquet", table_location.to_str().unwrap()),
+            data_file_format: DataFileFormat::Parquet,
+            schema: data_file_schema.clone(),
+            project_field_ids: vec![2, 3],
+            predicate: None,
+            deletes: vec![pos_del, eq_del],
+        };
+
+        // Load the deletes - should handle both types without error
+        let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 10);
+        let delete_filter = delete_file_loader
+            .load_deletes(&file_scan_task.deletes, file_scan_task.schema_ref())
+            .await
+            .unwrap()
+            .unwrap();
+
+        // Verify both delete types can be processed together
+        let result = delete_filter
+            .build_equality_delete_predicate(&file_scan_task)
+            .await;
+        assert!(
+            result.is_ok(),
+            "Failed to build equality delete predicate: {:?}",
+            result.err()
+        );
+    }
 }
```

crates/iceberg/src/arrow/delete_filter.rs

Lines changed: 3 additions & 1 deletion
```diff
@@ -68,10 +68,12 @@ impl DeleteFilter {
     pub(crate) fn try_start_eq_del_load(&self, file_path: &str) -> Option<Arc<Notify>> {
         let mut state = self.state.write().unwrap();
 
-        if !state.equality_deletes.contains_key(file_path) {
+        // Skip if already loaded/loading - another task owns it
+        if state.equality_deletes.contains_key(file_path) {
             return None;
         }
 
+        // Mark as loading to prevent duplicate work
         let notifier = Arc::new(Notify::new());
         state
             .equality_deletes
```
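
The one-line fix above inverts a guard that had the logic backwards: the old code returned `None` (skip loading) when the file was *not* yet tracked, so no task ever owned the equality-delete load and downstream lookups failed with "Missing predicate for equality delete file". A self-contained sketch of the corrected first-caller-loads pattern, using simplified stand-in types rather than the crate's actual `DeleteFilter` state:

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

use tokio::sync::Notify;

#[derive(Default)]
struct LoadState {
    // file path -> notifier that fires when the load completes
    loading: HashMap<String, Arc<Notify>>,
}

/// Returns Some(notifier) if the caller should perform the load itself,
/// or None if another task already owns it (loaded or loading).
fn try_start_load(state: &RwLock<LoadState>, file_path: &str) -> Option<Arc<Notify>> {
    let mut state = state.write().unwrap();

    // Skip if already loaded/loading - another task owns it.
    if state.loading.contains_key(file_path) {
        return None;
    }

    // Mark as loading to prevent duplicate work; the owning task calls
    // notifier.notify_waiters() once the load finishes.
    let notifier = Arc::new(Notify::new());
    state.loading.insert(file_path.to_string(), notifier.clone());
    Some(notifier)
}
```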

crates/iceberg/src/arrow/record_batch_partition_splitter.rs

Lines changed: 0 additions & 6 deletions
```diff
@@ -38,19 +38,13 @@ pub const PROJECTED_PARTITION_VALUE_COLUMN: &str = "_partition";
 /// The splitter supports two modes for obtaining partition values:
 /// - **Computed mode** (`calculator` is `Some`): Computes partition values from source columns using transforms
 /// - **Pre-computed mode** (`calculator` is `None`): Expects a `_partition` column in the input batch
-// # TODO
-// Remove this after partition writer supported.
-#[allow(dead_code)]
 pub struct RecordBatchPartitionSplitter {
     schema: SchemaRef,
     partition_spec: PartitionSpecRef,
     calculator: Option<PartitionValueCalculator>,
     partition_type: StructType,
 }
 
-// # TODO
-// Remove this after partition writer supported.
-#[allow(dead_code)]
 impl RecordBatchPartitionSplitter {
     /// Create a new RecordBatchPartitionSplitter.
     ///
```
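
The doc comment above describes a two-mode dispatch that hinges on whether `calculator` is `Some`. An illustrative stand-alone sketch of that shape, with hypothetical toy types (the real splitter operates on Arrow `RecordBatch`es, not strings):

```rust
// Hypothetical simplified types, for illustration only.
struct PartitionValueCalculator;

impl PartitionValueCalculator {
    fn compute(&self, batch: &str) -> String {
        format!("partition values computed from source columns of {batch}")
    }
}

struct Splitter {
    calculator: Option<PartitionValueCalculator>,
}

impl Splitter {
    fn partition_values(&self, batch: &str) -> String {
        match &self.calculator {
            // Computed mode: derive partition values via transforms.
            Some(calc) => calc.compute(batch),
            // Pre-computed mode: expect a `_partition` column in the input.
            None => format!("partition values read from `_partition` column of {batch}"),
        }
    }
}

fn main() {
    let computed = Splitter { calculator: Some(PartitionValueCalculator) };
    let precomputed = Splitter { calculator: None };
    println!("{}", computed.partition_values("batch-1"));
    println!("{}", precomputed.partition_values("batch-1"));
}
```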

crates/iceberg/src/catalog/memory/catalog.rs

Lines changed: 2 additions & 2 deletions
```diff
@@ -377,7 +377,7 @@ impl Catalog for MemoryCatalog {
 }
 
 #[cfg(test)]
-mod tests {
+pub(crate) mod tests {
     use std::collections::HashSet;
     use std::hash::Hash;
     use std::iter::FromIterator;
@@ -396,7 +396,7 @@ mod tests {
         temp_dir.path().to_str().unwrap().to_string()
     }
 
-    async fn new_memory_catalog() -> impl Catalog {
+    pub(crate) async fn new_memory_catalog() -> impl Catalog {
         let warehouse_location = temp_path();
         MemoryCatalogBuilder::default()
             .load(
```
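
Widening `mod tests` and `new_memory_catalog` to `pub(crate)` lets other test modules in the crate reuse this fixture instead of duplicating catalog setup. A hedged sketch of a consumer; the import path is assumed from the file location, not shown in this commit:

```rust
#[cfg(test)]
mod my_tests {
    // Assumed path; the helper is now reachable crate-wide under #[cfg(test)].
    use crate::catalog::memory::catalog::tests::new_memory_catalog;

    #[tokio::test]
    async fn reuses_shared_catalog_fixture() {
        // Build the shared in-memory catalog fixture instead of re-implementing setup.
        let catalog = new_memory_catalog().await;
        // ... exercise `catalog` here.
        let _ = catalog;
    }
}
```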
