18 changes: 9 additions & 9 deletions crates/iceberg/src/arrow/caching_delete_file_loader.rs
@@ -19,14 +19,14 @@ use std::collections::HashMap;
 
 use arrow_array::{Int64Array, StringArray};
 use futures::{StreamExt, TryStreamExt};
-use tokio::sync::oneshot::{Receiver, channel};
+use tokio::sync::oneshot::{channel, Receiver};
 
 use super::delete_filter::DeleteFilter;
 use crate::arrow::delete_file_loader::BasicDeleteFileLoader;
 use crate::delete_vector::DeleteVector;
 use crate::expr::Predicate;
 use crate::io::FileIO;
-use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile};
+use crate::scan::{ArrowRecordBatchStream, FileScanTask};
 use crate::spec::{DataContentType, SchemaRef};
 use crate::{Error, ErrorKind, Result};
@@ -129,7 +129,7 @@ impl CachingDeleteFileLoader {
     /// ```
     pub(crate) fn load_deletes(
         &self,
-        delete_file_entries: &[FileScanTaskDeleteFile],
+        delete_file_entries: &[FileScanTask],
         schema: SchemaRef,
     ) -> Receiver<Result<DeleteFilter>> {
         let (tx, rx) = channel();
@@ -195,30 +195,30 @@ impl CachingDeleteFileLoader {
     }
 
     async fn load_file_for_task(
-        task: &FileScanTaskDeleteFile,
+        task: &FileScanTask,
         basic_delete_file_loader: BasicDeleteFileLoader,
         del_filter: DeleteFilter,
         schema: SchemaRef,
     ) -> Result<DeleteFileContext> {
-        match task.file_type {
+        match task.data_file_content {
             DataContentType::PositionDeletes => Ok(DeleteFileContext::PosDels(
                 basic_delete_file_loader
-                    .parquet_to_batch_stream(&task.file_path)
+                    .parquet_to_batch_stream(task.data_file_path())
                     .await?,
             )),
 
             DataContentType::EqualityDeletes => {
-                let Some(notify) = del_filter.try_start_eq_del_load(&task.file_path) else {
+                let Some(notify) = del_filter.try_start_eq_del_load(task.data_file_path()) else {
                     return Ok(DeleteFileContext::ExistingEqDel);
                 };
 
                 let (sender, receiver) = channel();
-                del_filter.insert_equality_delete(&task.file_path, receiver);
+                del_filter.insert_equality_delete(task.data_file_path(), receiver);
 
                 Ok(DeleteFileContext::FreshEqDel {
                     batch_stream: BasicDeleteFileLoader::evolve_schema(
                         basic_delete_file_loader
-                            .parquet_to_batch_stream(&task.file_path)
+                            .parquet_to_batch_stream(task.data_file_path())
                             .await?,
                         schema,
                     )
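With this change, `CachingDeleteFileLoader::load_deletes` takes the unified `FileScanTask` type, so a data file's `deletes` vector can be passed straight through. A minimal sketch of the resulting call shape, assuming crate-internal access; the `apply_deletes` wrapper below is hypothetical, not part of this diff:

```rust
use tokio::sync::oneshot::Receiver;

use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader;
use crate::arrow::delete_filter::DeleteFilter;
use crate::scan::FileScanTask;
use crate::{Error, ErrorKind, Result};

// Hypothetical wrapper: load every delete file referenced by a data-file task.
async fn apply_deletes(
    loader: &CachingDeleteFileLoader,
    task: &FileScanTask,
) -> Result<DeleteFilter> {
    // `deletes` is now a Vec<FileScanTask>, matching load_deletes' new parameter.
    let rx: Receiver<Result<DeleteFilter>> =
        loader.load_deletes(&task.deletes, task.schema_ref());
    // The receiver resolves once all delete files have been loaded and parsed;
    // a dropped sender is surfaced as an error rather than a panic.
    rx.await
        .map_err(|_| Error::new(ErrorKind::Unexpected, "delete file loader channel closed"))?
}
```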
13 changes: 9 additions & 4 deletions crates/iceberg/src/arrow/delete_file_loader.rs
@@ -115,6 +115,7 @@ mod tests {
 
     use super::*;
     use crate::arrow::delete_filter::tests::setup;
+    use crate::spec::DataContentType;
 
    #[tokio::test]
    async fn test_basic_delete_file_loader_read_delete_file() {
@@ -129,11 +129,15 @@
 
         let file_scan_tasks = setup(table_location);
 
+        let delete_task = FileScanTaskDeleteFile {
+            file_path: file_scan_tasks[0].deletes[0].data_file_path.clone(),
+            file_type: DataContentType::PositionDeletes,
+            partition_spec_id: 0,
+            equality_ids: vec![],
+        };
+
         let result = delete_file_loader
-            .read_delete_file(
-                &file_scan_tasks[0].deletes[0],
-                file_scan_tasks[0].schema_ref(),
-            )
+            .read_delete_file(&delete_task, file_scan_tasks[0].schema_ref())
             .await
             .unwrap();
 
53 changes: 27 additions & 26 deletions crates/iceberg/src/arrow/delete_filter.rs
@@ -18,13 +18,13 @@
 use std::collections::HashMap;
 use std::sync::{Arc, Mutex, RwLock};
 
-use tokio::sync::Notify;
 use tokio::sync::oneshot::Receiver;
+use tokio::sync::Notify;
 
 use crate::delete_vector::DeleteVector;
 use crate::expr::Predicate::AlwaysTrue;
 use crate::expr::{Bind, BoundPredicate, Predicate};
-use crate::scan::{FileScanTask, FileScanTaskDeleteFile};
+use crate::scan::FileScanTask;
 use crate::spec::DataContentType;
 use crate::{Error, ErrorKind, Result};

@@ -120,14 +120,14 @@ impl DeleteFilter {
         }
 
         let Some(predicate) = self
-            .get_equality_delete_predicate_for_delete_file_path(&delete.file_path)
+            .get_equality_delete_predicate_for_delete_file_path(delete.data_file_path())
             .await
         else {
             return Err(Error::new(
                 ErrorKind::Unexpected,
                 format!(
                     "Missing predicate for equality delete file '{}'",
-                    delete.file_path
+                    delete.data_file_path()
                 ),
             ));
         };
@@ -190,8 +190,8 @@ impl DeleteFilter {
     }
 }
 
-pub(crate) fn is_equality_delete(f: &FileScanTaskDeleteFile) -> bool {
-    matches!(f.file_type, DataContentType::EqualityDeletes)
+pub(crate) fn is_equality_delete(f: &FileScanTask) -> bool {
+    matches!(f.data_file_content, DataContentType::EqualityDeletes)
 }
 
 #[cfg(test)]
@@ -307,24 +307,19 @@ pub(crate) mod tests {
             writer.close().unwrap();
         }
 
-        let pos_del_1 = FileScanTaskDeleteFile {
-            file_path: format!("{}/pos-del-1.parquet", table_location.to_str().unwrap()),
-            file_type: DataContentType::PositionDeletes,
-            partition_spec_id: 0,
-            equality_ids: vec![],
-        };
-
-        let pos_del_2 = FileScanTaskDeleteFile {
-            file_path: format!("{}/pos-del-2.parquet", table_location.to_str().unwrap()),
-            file_type: DataContentType::PositionDeletes,
-            partition_spec_id: 0,
-            equality_ids: vec![],
-        };
-
-        let pos_del_3 = FileScanTaskDeleteFile {
-            file_path: format!("{}/pos-del-3.parquet", table_location.to_str().unwrap()),
-            file_type: DataContentType::PositionDeletes,
-            partition_spec_id: 0,
+        // Helper to build a positional delete task with minimal fields
+        let make_pos_del_task = |n: u8| FileScanTask {
+            start: 0,
+            length: 0,
+            record_count: None,
+            data_file_path: format!("{}/pos-del-{}.parquet", table_location.to_str().unwrap(), n),
+            data_file_content: DataContentType::PositionDeletes,
+            data_file_format: DataFileFormat::Parquet,
+            schema: data_file_schema.clone(),
+            project_field_ids: vec![],
+            predicate: None,
+            deletes: vec![],
+            sequence_number: 0,
             equality_ids: vec![],
         };

@@ -334,22 +329,28 @@
                 length: 0,
                 record_count: None,
                 data_file_path: format!("{}/1.parquet", table_location.to_str().unwrap()),
+                data_file_content: DataContentType::Data,
                 data_file_format: DataFileFormat::Parquet,
                 schema: data_file_schema.clone(),
                 project_field_ids: vec![],
                 predicate: None,
-                deletes: vec![pos_del_1, pos_del_2.clone()],
+                deletes: vec![make_pos_del_task(1), make_pos_del_task(2)],
+                sequence_number: 0,
+                equality_ids: vec![],
             },
             FileScanTask {
                 start: 0,
                 length: 0,
                 record_count: None,
                 data_file_path: format!("{}/2.parquet", table_location.to_str().unwrap()),
+                data_file_content: DataContentType::Data,
                 data_file_format: DataFileFormat::Parquet,
                 schema: data_file_schema.clone(),
                 project_field_ids: vec![],
                 predicate: None,
-                deletes: vec![pos_del_3],
+                deletes: vec![make_pos_del_task(3)],
+                sequence_number: 0,
+                equality_ids: vec![],
             },
         ];

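Because a delete file is now just another `FileScanTask`, crate code can split a task's deletes by content type using the predicate updated above. A small illustrative sketch; the `split_deletes` helper is assumed, not in this diff:

```rust
use crate::arrow::delete_filter::is_equality_delete;
use crate::scan::FileScanTask;

// Illustrative: partition a data-file task's delete tasks into
// (equality deletes, position deletes) via the updated predicate.
fn split_deletes(task: &FileScanTask) -> (Vec<&FileScanTask>, Vec<&FileScanTask>) {
    task.deletes.iter().partition(|d| is_equality_delete(d))
}
```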
52 changes: 32 additions & 20 deletions crates/iceberg/src/arrow/reader.rs
@@ -33,12 +33,12 @@ use arrow_string::like::starts_with;
 use bytes::Bytes;
 use fnv::FnvHashSet;
 use futures::future::BoxFuture;
-use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, try_join};
+use futures::{try_join, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
 use parquet::arrow::arrow_reader::{
     ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection, RowSelector,
 };
 use parquet::arrow::async_reader::AsyncFileReader;
-use parquet::arrow::{PARQUET_FIELD_ID_META_KEY, ParquetRecordBatchStreamBuilder, ProjectionMask};
+use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY};
 use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData};
 use parquet::schema::types::{SchemaDescriptor, Type as ParquetType};

@@ -47,13 +47,13 @@ use crate::arrow::record_batch_transformer::RecordBatchTransformer;
 use crate::arrow::{arrow_schema_to_schema, get_arrow_datum};
 use crate::delete_vector::DeleteVector;
 use crate::error::Result;
-use crate::expr::visitors::bound_predicate_visitor::{BoundPredicateVisitor, visit};
+use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor};
 use crate::expr::visitors::page_index_evaluator::PageIndexEvaluator;
 use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator;
 use crate::expr::{BoundPredicate, BoundReference};
 use crate::io::{FileIO, FileMetadata, FileRead};
 use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
-use crate::spec::{Datum, NestedField, PrimitiveType, Schema, Type};
+use crate::spec::{DataContentType, Datum, NestedField, PrimitiveType, Schema, Type};
 use crate::utils::available_parallelism;
 use crate::{Error, ErrorKind};

@@ -312,13 +312,16 @@ impl ArrowReader {
 
         // Build the batch stream and send all the RecordBatches that it generates
         // to the requester.
-        let record_batch_stream =
-            record_batch_stream_builder
-                .build()?
-                .map(move |batch| match batch {
+        let record_batch_stream = record_batch_stream_builder.build()?.map(move |batch| {
+            if matches!(task.data_file_content, DataContentType::PositionDeletes) {
+                Ok(batch?)
+            } else {
+                match batch {
                     Ok(batch) => record_batch_transformer.process_record_batch(batch),
                     Err(err) => Err(err.into()),
-                });
+                }
+            }
+        });
 
         Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
     }
@@ -1443,15 +1446,15 @@ mod tests {
     use roaring::RoaringTreemap;
     use tempfile::TempDir;
 
-    use crate::ErrorKind;
     use crate::arrow::reader::{CollectFieldIdVisitor, PARQUET_FIELD_ID_META_KEY};
     use crate::arrow::{ArrowReader, ArrowReaderBuilder};
     use crate::delete_vector::DeleteVector;
     use crate::expr::visitors::bound_predicate_visitor::visit;
     use crate::expr::{Bind, Predicate, Reference};
     use crate::io::FileIO;
     use crate::scan::{FileScanTask, FileScanTaskStream};
-    use crate::spec::{DataFileFormat, Datum, NestedField, PrimitiveType, Schema, SchemaRef, Type};
+    use crate::spec::{DataContentType, DataFileFormat, Datum, NestedField, PrimitiveType, Schema, SchemaRef, Type};
+    use crate::ErrorKind;
 
     fn table_schema_simple() -> SchemaRef {
         Arc::new(
@@ -1740,11 +1743,14 @@ message schema {
             length: 0,
             record_count: None,
             data_file_path: format!("{}/1.parquet", table_location),
+            data_file_content: DataContentType::Data,
             data_file_format: DataFileFormat::Parquet,
             schema: schema.clone(),
             project_field_ids: vec![1],
             predicate: Some(predicate.bind(schema, true).unwrap()),
             deletes: vec![],
+            sequence_number: 0,
+            equality_ids: vec![],
         })]
         .into_iter(),
     )) as FileScanTaskStream;
@@ -1774,19 +1780,25 @@ message schema {
         let schema = Arc::new(
             Schema::builder()
                 .with_schema_id(1)
-                .with_fields(vec![
-                    NestedField::optional(1, "a", Type::Primitive(PrimitiveType::String)).into(),
-                ])
+                .with_fields(vec![NestedField::optional(
+                    1,
+                    "a",
+                    Type::Primitive(PrimitiveType::String),
+                )
+                .into()])
                 .build()
                 .unwrap(),
         );
 
-        let arrow_schema = Arc::new(ArrowSchema::new(vec![
-            Field::new("a", col_a_type.clone(), true).with_metadata(HashMap::from([(
-                PARQUET_FIELD_ID_META_KEY.to_string(),
-                "1".to_string(),
-            )])),
-        ]));
+        let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
+            "a",
+            col_a_type.clone(),
+            true,
+        )
+        .with_metadata(HashMap::from([(
+            PARQUET_FIELD_ID_META_KEY.to_string(),
+            "1".to_string(),
+        )]))]));
 
         let tmp_dir = TempDir::new().unwrap();
         let table_location = tmp_dir.path().to_str().unwrap().to_string();
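Behavioural note on the hunk at `@@ -312,13 +312,16 @@`: batches read from position-delete files now bypass `RecordBatchTransformer` (which evolves batches to the task's snapshot schema), since position-delete files carry their own fixed schema (`file_path`, `pos`) per the Iceberg spec. The same dispatch written out as a standalone function for clarity; this helper is a restatement, not code from the diff:

```rust
use arrow_array::RecordBatch;
use parquet::errors::ParquetError;

use crate::arrow::record_batch_transformer::RecordBatchTransformer;
use crate::spec::DataContentType;
use crate::Result;

// Restatement of the closure added in process_file_scan_task: only batches
// from regular data files go through schema evolution.
fn post_process(
    content: DataContentType,
    transformer: &mut RecordBatchTransformer,
    batch: std::result::Result<RecordBatch, ParquetError>,
) -> Result<RecordBatch> {
    if matches!(content, DataContentType::PositionDeletes) {
        // Pass position-delete batches through unchanged.
        Ok(batch?)
    } else {
        match batch {
            Ok(batch) => transformer.process_record_batch(batch),
            Err(err) => Err(err.into()),
        }
    }
}
```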
17 changes: 9 additions & 8 deletions crates/iceberg/src/delete_file_index.rs
@@ -19,12 +19,12 @@ use std::collections::HashMap;
 use std::ops::Deref;
 use std::sync::{Arc, RwLock};
 
+use futures::channel::mpsc::{channel, Sender};
 use futures::StreamExt;
-use futures::channel::mpsc::{Sender, channel};
 use tokio::sync::Notify;
 
 use crate::runtime::spawn;
-use crate::scan::{DeleteFileContext, FileScanTaskDeleteFile};
+use crate::scan::{DeleteFileContext, FileScanTask};
 use crate::spec::{DataContentType, DataFile, Struct};
 
 /// Index of delete files
@@ -85,7 +85,7 @@ impl DeleteFileIndex {
         &self,
         data_file: &DataFile,
         seq_num: Option<i64>,
-    ) -> Vec<FileScanTaskDeleteFile> {
+    ) -> Vec<FileScanTask> {
         let notifier = {
             let guard = self.state.read().unwrap();
             match *guard {
@@ -132,10 +132,11 @@ impl PopulatedDeleteFileIndex {
         // The spec states that "Equality delete files stored with an unpartitioned spec are applied as global deletes".
         if partition.fields().is_empty() {
             // TODO: confirm we're good to skip here if we encounter a pos del
-            if arc_ctx.manifest_entry.content_type() != DataContentType::PositionDeletes {
-                global_deletes.push(arc_ctx);
-                return;
-            }
+            // FIXME(Dylan): allow putting position delete to global deletes.
+            // if arc_ctx.manifest_entry.content_type() != DataContentType::PositionDeletes {
+            global_deletes.push(arc_ctx);
+            return;
+            // }
         }
 
         let destination_map = match arc_ctx.manifest_entry.content_type() {
@@ -164,7 +165,7 @@ impl PopulatedDeleteFileIndex {
         &self,
         data_file: &DataFile,
         seq_num: Option<i64>,
-    ) -> Vec<FileScanTaskDeleteFile> {
+    ) -> Vec<FileScanTask> {
         let mut results = vec![];
 
         self.global_deletes
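For context, the consumer side of the signature change: scan planning now receives ready-made `FileScanTask`s for the deletes that apply to a data file. A sketch of a call site, with names assumed rather than taken from this diff:

```rust
use crate::delete_file_index::DeleteFileIndex;
use crate::scan::FileScanTask;
use crate::spec::DataFile;

// Illustrative: fetch the delete tasks applicable to one data file. The
// DeleteFileIndex variant appears to be async because it waits (via Notify)
// for the index to be fully populated before answering.
async fn deletes_for(
    index: &DeleteFileIndex,
    data_file: &DataFile,
    seq_num: Option<i64>,
) -> Vec<FileScanTask> {
    index.get_deletes_for_data_file(data_file, seq_num).await
}
```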
3 changes: 3 additions & 0 deletions crates/iceberg/src/scan/context.rs
@@ -127,6 +127,7 @@ impl ManifestEntryContext {
             record_count: Some(self.manifest_entry.record_count()),
 
             data_file_path: self.manifest_entry.file_path().to_string(),
+            data_file_content: self.manifest_entry.data_file().content_type(),
             data_file_format: self.manifest_entry.file_format(),
 
             schema: self.snapshot_schema,
@@ -136,6 +137,8 @@ impl ManifestEntryContext {
                 .map(|x| x.as_ref().snapshot_bound_predicate.clone()),
 
             deletes,
+            sequence_number: self.manifest_entry.sequence_number().unwrap_or(0),
+            equality_ids: self.manifest_entry.data_file().equality_ids().to_vec(),
         })
     }
 }
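Taken together with the test fixtures above, these are the fields this PR threads through scan planning. An abridged sketch of `FileScanTask`'s new shape as exercised by the diff; field order, omitted fields, and doc wording are approximate, not the exact definition:

```rust
pub struct FileScanTask {
    // ... existing fields: start, length, record_count, data_file_path,
    // data_file_format, schema, project_field_ids, predicate ...

    /// New: whether the file holds Data, PositionDeletes, or EqualityDeletes.
    pub data_file_content: DataContentType,
    /// Existing field, but its elements are now FileScanTasks themselves
    /// instead of the removed FileScanTaskDeleteFile type.
    pub deletes: Vec<FileScanTask>,
    /// New: the manifest entry's sequence number (0 when unassigned).
    pub sequence_number: i64,
    /// New: field ids for equality deletes (empty for data files).
    pub equality_ids: Vec<i32>,
}
```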