Commit 72befec

Merge branch 'main' into partition-spec-support
2 parents 2b1c28a + 76d8e2d

File tree

12 files changed: +904 -11 lines


Cargo.lock

Lines changed: 1 addition & 0 deletions
(Generated lockfile; diff not rendered.)

Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -71,6 +71,7 @@ enum-ordinalize = "4.3.0"
 env_logger = "0.11.8"
 expect-test = "1"
 faststr = "0.2.31"
+flate2 = "1.1.5"
 fnv = "1.0.7"
 fs-err = "3.1.0"
 futures = "0.3"

crates/iceberg/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -63,6 +63,7 @@ bytes = { workspace = true }
 chrono = { workspace = true }
 derive_builder = { workspace = true }
 expect-test = { workspace = true }
+flate2 = { workspace = true }
 fnv = { workspace = true }
 futures = { workspace = true }
 itertools = { workspace = true }

crates/iceberg/src/arrow/caching_delete_file_loader.rs

Lines changed: 113 additions & 0 deletions
@@ -788,4 +788,117 @@ mod tests {
         assert_eq!(data_col.value(1), "d");
         assert_eq!(data_col.value(2), "g");
     }
+
+    /// Test loading a FileScanTask with BOTH positional and equality deletes.
+    /// Verifies the fix for the inverted condition that caused "Missing predicate for equality delete file" errors.
+    #[tokio::test]
+    async fn test_load_deletes_with_mixed_types() {
+        use crate::scan::FileScanTask;
+        use crate::spec::{DataFileFormat, Schema};
+
+        let tmp_dir = TempDir::new().unwrap();
+        let table_location = tmp_dir.path();
+        let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
+            .unwrap()
+            .build()
+            .unwrap();
+
+        // Create the data file schema
+        let data_file_schema = Arc::new(
+            Schema::builder()
+                .with_fields(vec![
+                    crate::spec::NestedField::optional(
+                        2,
+                        "y",
+                        crate::spec::Type::Primitive(crate::spec::PrimitiveType::Long),
+                    )
+                    .into(),
+                    crate::spec::NestedField::optional(
+                        3,
+                        "z",
+                        crate::spec::Type::Primitive(crate::spec::PrimitiveType::Long),
+                    )
+                    .into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+
+        // Write positional delete file
+        let positional_delete_schema = crate::arrow::delete_filter::tests::create_pos_del_schema();
+        let file_path_values =
+            vec![format!("{}/data-1.parquet", table_location.to_str().unwrap()); 4];
+        let file_path_col = Arc::new(StringArray::from_iter_values(&file_path_values));
+        let pos_col = Arc::new(Int64Array::from_iter_values(vec![0i64, 1, 2, 3]));
+
+        let positional_deletes_to_write =
+            RecordBatch::try_new(positional_delete_schema.clone(), vec![
+                file_path_col,
+                pos_col,
+            ])
+            .unwrap();
+
+        let props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .build();
+
+        let pos_del_path = format!("{}/pos-del-mixed.parquet", table_location.to_str().unwrap());
+        let file = File::create(&pos_del_path).unwrap();
+        let mut writer = ArrowWriter::try_new(
+            file,
+            positional_deletes_to_write.schema(),
+            Some(props.clone()),
+        )
+        .unwrap();
+        writer.write(&positional_deletes_to_write).unwrap();
+        writer.close().unwrap();
+
+        // Write equality delete file
+        let eq_delete_path = setup_write_equality_delete_file_1(table_location.to_str().unwrap());
+
+        // Create FileScanTask with BOTH positional and equality deletes
+        let pos_del = FileScanTaskDeleteFile {
+            file_path: pos_del_path,
+            file_type: DataContentType::PositionDeletes,
+            partition_spec_id: 0,
+            equality_ids: None,
+        };
+
+        let eq_del = FileScanTaskDeleteFile {
+            file_path: eq_delete_path.clone(),
+            file_type: DataContentType::EqualityDeletes,
+            partition_spec_id: 0,
+            equality_ids: Some(vec![2, 3]), // Only use field IDs that exist in both schemas
+        };
+
+        let file_scan_task = FileScanTask {
+            start: 0,
+            length: 0,
+            record_count: None,
+            data_file_path: format!("{}/data-1.parquet", table_location.to_str().unwrap()),
+            data_file_format: DataFileFormat::Parquet,
+            schema: data_file_schema.clone(),
+            project_field_ids: vec![2, 3],
+            predicate: None,
+            deletes: vec![pos_del, eq_del],
+        };
+
+        // Load the deletes - should handle both types without error
+        let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 10);
+        let delete_filter = delete_file_loader
+            .load_deletes(&file_scan_task.deletes, file_scan_task.schema_ref())
+            .await
+            .unwrap()
+            .unwrap();
+
+        // Verify both delete types can be processed together
+        let result = delete_filter
+            .build_equality_delete_predicate(&file_scan_task)
+            .await;
+        assert!(
+            result.is_ok(),
+            "Failed to build equality delete predicate: {:?}",
+            result.err()
+        );
+    }
 }

crates/iceberg/src/arrow/delete_filter.rs

Lines changed: 3 additions & 1 deletion
@@ -68,10 +68,12 @@ impl DeleteFilter {
     pub(crate) fn try_start_eq_del_load(&self, file_path: &str) -> Option<Arc<Notify>> {
         let mut state = self.state.write().unwrap();
 
-        if !state.equality_deletes.contains_key(file_path) {
+        // Skip if already loaded/loading - another task owns it
+        if state.equality_deletes.contains_key(file_path) {
             return None;
         }
 
+        // Mark as loading to prevent duplicate work
        let notifier = Arc::new(Notify::new());
        state
            .equality_deletes
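The corrected guard is a first-caller-wins claim under a write lock: whichever task inserts the entry owns the load, and every later caller sees the existing entry and backs off (it can then await the stored Notify). A minimal standalone sketch of the pattern, assuming a simplified EqDelState enum and a string-keyed map rather than the crate's actual state types:

use std::collections::HashMap;
use std::sync::{Arc, RwLock};

use tokio::sync::Notify;

// Assumed, simplified stand-in for the crate's internal state.
enum EqDelState {
    Loading(Arc<Notify>),
}

struct EqDelCache {
    state: RwLock<HashMap<String, EqDelState>>,
}

impl EqDelCache {
    /// Returns Some(notifier) if the caller won the right to load this file,
    /// or None if another task already loaded it or is loading it.
    fn try_start_load(&self, file_path: &str) -> Option<Arc<Notify>> {
        let mut state = self.state.write().unwrap();
        // Correct polarity: bail out when an entry already exists.
        if state.contains_key(file_path) {
            return None;
        }
        // Claim the slot so concurrent callers see it and back off.
        let notifier = Arc::new(Notify::new());
        state.insert(file_path.to_string(), EqDelState::Loading(notifier.clone()));
        Some(notifier)
    }
}

With the inverted `!contains_key` check, the very first caller returned None, so no task ever registered a predicate for the file; the mixed-deletes test added above exercises exactly that path.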

crates/iceberg/src/arrow/record_batch_partition_splitter.rs

Lines changed: 0 additions & 6 deletions
@@ -38,19 +38,13 @@ pub const PROJECTED_PARTITION_VALUE_COLUMN: &str = "_partition";
 /// The splitter supports two modes for obtaining partition values:
 /// - **Computed mode** (`calculator` is `Some`): Computes partition values from source columns using transforms
 /// - **Pre-computed mode** (`calculator` is `None`): Expects a `_partition` column in the input batch
-// # TODO
-// Remove this after partition writer supported.
-#[allow(dead_code)]
 pub struct RecordBatchPartitionSplitter {
     schema: SchemaRef,
     partition_spec: PartitionSpecRef,
     calculator: Option<PartitionValueCalculator>,
     partition_type: StructType,
 }
 
-// # TODO
-// Remove this after partition writer supported.
-#[allow(dead_code)]
 impl RecordBatchPartitionSplitter {
     /// Create a new RecordBatchPartitionSplitter.
     ///
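For the pre-computed mode described in the doc comment above, the input batch must already carry partition values under the `_partition` column (PROJECTED_PARTITION_VALUE_COLUMN). A minimal arrow-rs sketch of what such a batch could look like; the single `bucket` field is an illustrative stand-in for whatever fields the table's actual partition type has:

use std::sync::Arc;

use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray, StructArray};
use arrow_schema::{DataType, Field, Fields, Schema};

fn batch_with_precomputed_partition() -> RecordBatch {
    // Hypothetical partition type with a single bucket field; the real
    // struct fields must match the table's partition spec.
    let part_fields = Fields::from(vec![Field::new("bucket", DataType::Int64, true)]);
    let partition = StructArray::new(
        part_fields.clone(),
        vec![Arc::new(Int64Array::from(vec![0i64, 1, 0]))],
        None,
    );
    let schema = Arc::new(Schema::new(vec![
        Field::new("data", DataType::Utf8, true),
        Field::new("_partition", DataType::Struct(part_fields), true),
    ]));
    let columns: Vec<ArrayRef> = vec![
        Arc::new(StringArray::from(vec!["a", "b", "c"])),
        Arc::new(partition),
    ];
    RecordBatch::try_new(schema, columns).unwrap()
}

In computed mode the splitter instead derives these values from the source columns via the partition transforms, so no `_partition` column is supplied.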

crates/iceberg/src/spec/table_metadata.rs

Lines changed: 49 additions & 1 deletion
@@ -22,10 +22,12 @@ use std::cmp::Ordering;
 use std::collections::HashMap;
 use std::fmt::{Display, Formatter};
 use std::hash::Hash;
+use std::io::Read as _;
 use std::sync::Arc;
 
 use _serde::TableMetadataEnum;
 use chrono::{DateTime, Utc};
+use flate2::read::GzDecoder;
 use serde::{Deserialize, Serialize};
 use serde_repr::{Deserialize_repr, Serialize_repr};
 use uuid::Uuid;
@@ -426,9 +428,30 @@ impl TableMetadata {
         file_io: &FileIO,
         metadata_location: impl AsRef<str>,
     ) -> Result<TableMetadata> {
+        let metadata_location = metadata_location.as_ref();
         let input_file = file_io.new_input(metadata_location)?;
         let metadata_content = input_file.read().await?;
-        let metadata = serde_json::from_slice::<TableMetadata>(&metadata_content)?;
+
+        // Check if the file is compressed by looking for the gzip "magic number".
+        let metadata = if metadata_content.len() > 2
+            && metadata_content[0] == 0x1F
+            && metadata_content[1] == 0x8B
+        {
+            let mut decoder = GzDecoder::new(metadata_content.as_ref());
+            let mut decompressed_data = Vec::new();
+            decoder.read_to_end(&mut decompressed_data).map_err(|e| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    "Trying to read compressed metadata file",
+                )
+                .with_context("file_path", metadata_location)
+                .with_source(e)
+            })?;
+            serde_json::from_slice(&decompressed_data)?
+        } else {
+            serde_json::from_slice(&metadata_content)?
+        };
+
         Ok(metadata)
     }
@@ -1516,6 +1539,7 @@ impl SnapshotLog {
 mod tests {
     use std::collections::HashMap;
     use std::fs;
+    use std::io::Write as _;
     use std::sync::Arc;
 
     use anyhow::Result;
@@ -3524,6 +3548,30 @@ mod tests {
         assert_eq!(read_metadata, original_metadata);
     }
 
+    #[tokio::test]
+    async fn test_table_metadata_read_compressed() {
+        let temp_dir = TempDir::new().unwrap();
+        let metadata_location = temp_dir.path().join("v1.gz.metadata.json");
+
+        let original_metadata: TableMetadata = get_test_table_metadata("TableMetadataV2Valid.json");
+        let json = serde_json::to_string(&original_metadata).unwrap();
+
+        let mut encoder = flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::default());
+        encoder.write_all(json.as_bytes()).unwrap();
+        std::fs::write(&metadata_location, encoder.finish().unwrap())
+            .expect("failed to write metadata");
+
+        // Read the metadata back
+        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
+        let metadata_location = metadata_location.to_str().unwrap();
+        let read_metadata = TableMetadata::read_from(&file_io, metadata_location)
+            .await
+            .unwrap();
+
+        // Verify the metadata matches
+        assert_eq!(read_metadata, original_metadata);
+    }
+
     #[tokio::test]
     async fn test_table_metadata_read_nonexistent_file() {
         // Create a FileIO instance
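The sniffing in read_from keys off the RFC 1952 gzip magic header, bytes 0x1F 0x8B. A minimal sketch of the same detect-then-decompress step in isolation, using only flate2 and std (the function name is illustrative):

use std::io::Read;

use flate2::read::GzDecoder;

/// Return the decompressed bytes if `bytes` starts with the gzip magic
/// header (0x1F 0x8B), otherwise return the input unchanged.
fn maybe_gunzip(bytes: &[u8]) -> std::io::Result<Vec<u8>> {
    if bytes.len() > 2 && bytes[0] == 0x1F && bytes[1] == 0x8B {
        let mut decoder = GzDecoder::new(bytes);
        let mut decompressed = Vec::new();
        decoder.read_to_end(&mut decompressed)?;
        Ok(decompressed)
    } else {
        Ok(bytes.to_vec())
    }
}

Sniffing the content rather than trusting the file extension means locations like v1.gz.metadata.json and plain v1.metadata.json both work through the same code path, which is what the round-trip test above verifies.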

crates/iceberg/src/table.rs

Lines changed: 1 addition & 3 deletions
@@ -297,9 +297,7 @@ impl StaticTable {
         table_ident: TableIdent,
         file_io: FileIO,
     ) -> Result<Self> {
-        let metadata_file = file_io.new_input(metadata_location)?;
-        let metadata_file_content = metadata_file.read().await?;
-        let metadata = serde_json::from_slice::<TableMetadata>(&metadata_file_content)?;
+        let metadata = TableMetadata::read_from(&file_io, metadata_location).await?;
 
         let table = Table::builder()
             .metadata(metadata)
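With the read path consolidated into TableMetadata::read_from, callers such as StaticTable get gzip handling for free. A hedged usage sketch, assuming read_from is reachable as public API and using an illustrative local path:

use iceberg::Result;
use iceberg::io::FileIOBuilder;
use iceberg::spec::TableMetadata;

async fn load_metadata() -> Result<TableMetadata> {
    let file_io = FileIOBuilder::new_fs_io().build()?;
    // Hypothetical location; compressed and uncompressed metadata JSON
    // now go through the same call.
    TableMetadata::read_from(&file_io, "/tmp/warehouse/v1.gz.metadata.json").await
}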

crates/iceberg/src/writer/partitioning/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -23,6 +23,7 @@
 
 pub mod clustered_writer;
 pub mod fanout_writer;
+pub mod unpartitioned_writer;
 
 use crate::Result;
 use crate::spec::PartitionKey;
