4 changes: 2 additions & 2 deletions crates/iceberg/src/io/file_io.rs
@@ -145,13 +145,13 @@ impl FileIO {
        let (op, relative_path) = self.inner.create_operator(&path)?;
        let path = path.as_ref().to_string();
        let relative_path_pos = path.len() - relative_path.len();

        // ADLS requires append mode for writes
        #[cfg(feature = "storage-azdls")]
        let append_file = matches!(self.inner.as_ref(), Storage::Azdls { .. });
        #[cfg(not(feature = "storage-azdls"))]
        let append_file = false;

        Ok(OutputFile {
            op,
            path,
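The cfg-gated flag above exists because ADLS commits writes through append-and-flush operations rather than one-shot uploads. Below is a minimal sketch, not this crate's actual code, of how such a flag might translate into an OpenDAL writer call; the `open_writer` helper is hypothetical, and it assumes an OpenDAL release where `writer_with(...).append(...)` is available:

```rust
use opendal::{Operator, Result, Writer};

/// Hypothetical helper: choose the write mode based on whether the
/// backing store requires append-style writes (as ADLS does).
async fn open_writer(op: &Operator, path: &str, append_file: bool) -> Result<Writer> {
    if append_file {
        // ADLS-backed operators open the writer in append mode.
        op.writer_with(path).append(true).await
    } else {
        // Other stores use the default one-shot write mode.
        op.writer(path).await
    }
}
```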
6 changes: 3 additions & 3 deletions crates/iceberg/src/io/storage_azdls.rs
@@ -19,11 +19,11 @@ use std::collections::HashMap;
use std::fmt::Display;
use std::str::FromStr;

-use opendal::Configurator;
use opendal::services::AzdlsConfig;
+use opendal::Configurator;
use url::Url;

-use crate::{Error, ErrorKind, Result, ensure_data_valid};
+use crate::{ensure_data_valid, Error, ErrorKind, Result};

/// A connection string.
///
@@ -341,7 +341,7 @@ mod tests {

    use opendal::services::AzdlsConfig;

-    use super::{AzureStoragePath, AzureStorageScheme, azdls_create_operator};
+    use super::{azdls_create_operator, AzureStoragePath, AzureStorageScheme};
    use crate::io::azdls_config_parse;

    #[test]
121 changes: 81 additions & 40 deletions crates/iceberg/src/scan/mod.rs
@@ -383,8 +383,7 @@ pub struct TableScan {
}

impl TableScan {
-    /// Returns a stream of [`FileScanTask`]s.
-    pub async fn plan_files(&self) -> Result<FileScanTaskStream> {
+    async fn plan_files_inner(&self, all_files: bool) -> Result<FileScanTaskStream> {
        let concurrency_limit_manifest_files = self.concurrency_limit_manifest_files;
        let concurrency_limit_manifest_entries = self.concurrency_limit_manifest_entries;

@@ -398,7 +397,7 @@ impl TableScan {
        let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries);

        let delete_file_idx_and_tx: Option<(DeleteFileIndex, Sender<DeleteFileContext>)> =
-            if self.delete_file_processing_enabled {
+            if self.delete_file_processing_enabled && !all_files {
                Some(DeleteFileIndex::new())
            } else {
                None
@@ -468,7 +467,11 @@ impl TableScan {
                concurrency_limit_manifest_entries,
                |(manifest_entry_context, tx)| async move {
                    spawn(async move {
-                        Self::process_data_manifest_entry(manifest_entry_context, tx).await
+                        if all_files {
+                            Self::process_all_manifest_entry(manifest_entry_context, tx).await
+                        } else {
+                            Self::process_data_manifest_entry(manifest_entry_context, tx).await
+                        }
                    })
                    .await
                },
@@ -483,6 +486,17 @@ impl TableScan {
        return Ok(file_scan_task_rx.boxed());
    }

+    /// Returns a stream of [`FileScanTask`]s for all files (both data and delete files).
+    /// This method does not distinguish between data and delete files.
+    pub async fn plan_files_all(&self) -> Result<FileScanTaskStream> {
+        self.plan_files_inner(true).await
+    }
+
+    /// Returns a stream of [`FileScanTask`]s.
+    pub async fn plan_files(&self) -> Result<FileScanTaskStream> {
+        self.plan_files_inner(false).await
+    }
+
    /// Returns an [`ArrowRecordBatchStream`].
    pub async fn to_arrow(&self) -> Result<ArrowRecordBatchStream> {
        let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone())
@@ -510,6 +524,39 @@ impl TableScan {
        &self.plan_context.snapshot
    }

+    /// Helper method to evaluate filters on a manifest entry.
+    /// Returns true if the manifest entry passes both partition and metrics filters.
+    /// For data files, both partition and metrics filters are checked.
+    /// For delete files, only the partition filter is checked.
+    fn eval_filters(
+        manifest_entry_context: &ManifestEntryContext,
+        bound_predicates: &BoundPredicates,
+    ) -> Result<bool> {
+        // Check partition filter
+        let expression_evaluator_cache = manifest_entry_context.expression_evaluator_cache.as_ref();
+        let expression_evaluator = expression_evaluator_cache.get(
+            manifest_entry_context.partition_spec_id,
+            &bound_predicates.partition_bound_predicate,
+        )?;
+
+        if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
+            return Ok(false);
+        }
+
+        // Check metrics filter only for data files
+        if manifest_entry_context.manifest_entry.content_type() == DataContentType::Data {
+            if !InclusiveMetricsEvaluator::eval(
+                &bound_predicates.snapshot_bound_predicate,
+                manifest_entry_context.manifest_entry.data_file(),
+                false,
+            )? {
+                return Ok(false);
+            }
+        }
+
+        Ok(true)
+    }
+
    async fn process_data_manifest_entry(
        manifest_entry_context: ManifestEntryContext,
        mut file_scan_task_tx: Sender<Result<FileScanTask>>,
@@ -528,31 +575,8 @@ impl TableScan {
        }

        if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates {
-            let BoundPredicates {
-                snapshot_bound_predicate,
-                partition_bound_predicate,
-            } = bound_predicates.as_ref();
-
-            let expression_evaluator_cache =
-                manifest_entry_context.expression_evaluator_cache.as_ref();
-
-            let expression_evaluator = expression_evaluator_cache.get(
-                manifest_entry_context.partition_spec_id,
-                partition_bound_predicate,
-            )?;
-
-            // skip any data file whose partition data indicates that it can't contain
-            // any data that matches this scan's filter
-            if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
-                return Ok(());
-            }
-
-            // skip any data file whose metrics don't match this scan's filter
-            if !InclusiveMetricsEvaluator::eval(
-                snapshot_bound_predicate,
-                manifest_entry_context.manifest_entry.data_file(),
-                false,
-            )? {
+            // skip any data file that doesn't pass partition and metrics filters
+            if !Self::eval_filters(&manifest_entry_context, bound_predicates)? {
                return Ok(());
            }
        }
@@ -567,6 +591,31 @@ impl TableScan {
        Ok(())
    }

+    async fn process_all_manifest_entry(
+        manifest_entry_context: ManifestEntryContext,
+        mut file_scan_task_tx: Sender<Result<FileScanTask>>,
+    ) -> Result<()> {
+        // skip processing this manifest entry if it has been marked as deleted
+        if !manifest_entry_context.manifest_entry.is_alive() {
+            return Ok(());
+        }
+
+        if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates {
+            // For data files, check both partition and metrics filters
+            // For delete files, only check the partition filter
+            if !Self::eval_filters(&manifest_entry_context, bound_predicates)? {
+                return Ok(());
+            }
+        }
+
+        // Create a corresponding FileScanTask and push it to the result stream
+        file_scan_task_tx
+            .send(Ok(manifest_entry_context.into_file_scan_task().await?))
+            .await?;
+
+        Ok(())
+    }
+
    async fn process_delete_manifest_entry(
        manifest_entry_context: ManifestEntryContext,
        mut delete_file_ctx_tx: Sender<DeleteFileContext>,
@@ -585,17 +634,9 @@ impl TableScan {
        }

        if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates {
-            let expression_evaluator_cache =
-                manifest_entry_context.expression_evaluator_cache.as_ref();
-
-            let expression_evaluator = expression_evaluator_cache.get(
-                manifest_entry_context.partition_spec_id,
-                &bound_predicates.partition_bound_predicate,
-            )?;
-
-            // skip any data file whose partition data indicates that it can't contain
-            // any data that matches this scan's filter
-            if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
+            // skip any delete file whose partition data indicates that it can't contain
+            // any data that matches this scan's filter (no metrics check for delete files)
+            if !Self::eval_filters(&manifest_entry_context, bound_predicates)? {
                return Ok(());
            }
        }
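The new `plan_files_all` entry point suits maintenance jobs (snapshot expiry, orphan-file cleanup, auditing) that need delete files surfaced as ordinary `FileScanTask`s rather than folded into a delete-file index. A rough usage sketch, assuming a `Table` already loaded from a catalog and the builder/accessor names used elsewhere in this crate (`scan()`, `build()`, `data_file_path()`):

```rust
use futures::TryStreamExt;
use iceberg::table::Table;
use iceberg::Result;

/// Collect the paths of every live file (data and delete files alike)
/// referenced by the table's current snapshot.
async fn list_all_file_paths(table: &Table) -> Result<Vec<String>> {
    let scan = table.scan().build()?;
    // plan_files_all yields both data and delete files as FileScanTasks.
    let tasks: Vec<_> = scan.plan_files_all().await?.try_collect().await?;
    Ok(tasks
        .into_iter()
        .map(|task| task.data_file_path().to_string())
        .collect())
}
```

Because the scan skips building the `DeleteFileIndex` in this mode (`delete_file_processing_enabled && !all_files`), delete files arrive as standalone tasks and no data-to-delete matching is performed.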