4 changes: 4 additions & 0 deletions datafusion-cli/src/functions.rs
@@ -247,6 +247,7 @@ impl TableProvider for ParquetMetadataTable {
projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
_offset: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(MemorySourceConfig::try_new_exec(
&[vec![self.batch.clone()]],
@@ -496,6 +497,7 @@ impl TableProvider for MetadataCacheTable {
projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
_offset: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(MemorySourceConfig::try_new_exec(
&[vec![self.batch.clone()]],
@@ -614,6 +616,7 @@ impl TableProvider for StatisticsCacheTable {
projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
_offset: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(MemorySourceConfig::try_new_exec(
&[vec![self.batch.clone()]],
@@ -729,6 +732,7 @@ impl TableProvider for ListFilesCacheTable {
projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
_offset: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(MemorySourceConfig::try_new_exec(
&[vec![self.batch.clone()]],
@@ -183,6 +183,7 @@ impl TableProvider for CustomDataSource {
// filters and limit can be used here to inject some push-down operations if needed
_filters: &[Expr],
_limit: Option<usize>,
_offset: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
return self.create_physical_plan(projection, self.schema()).await;
}
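A minimal sketch (not part of this PR) of how a provider like this one could honor the new offset argument instead of ignoring it: wrap the plan it already builds in a limit node that skips rows. This assumes GlobalLimitExec's (input, skip, fetch) constructor from datafusion-physical-plan; the helper name is purely illustrative.

// Sketch only: a hypothetical helper a TableProvider could call at the end of `scan`.
use std::sync::Arc;
use datafusion_physical_plan::{limit::GlobalLimitExec, ExecutionPlan};

/// Wrap an already-built plan so that `offset` rows are skipped and at most
/// `limit` rows are returned, when the provider cannot push these down itself.
fn apply_offset_and_limit(
    plan: Arc<dyn ExecutionPlan>,
    offset: Option<usize>,
    limit: Option<usize>,
) -> Arc<dyn ExecutionPlan> {
    match (offset, limit) {
        // nothing to do: return the plan unchanged
        (None, None) => plan,
        // skip `offset` rows (0 if absent) and optionally cap the output at `limit`
        (offset, fetch) => Arc::new(GlobalLimitExec::new(plan, offset.unwrap_or(0), fetch)),
    }
}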
@@ -226,6 +226,7 @@ impl TableProvider for DefaultValueTableProvider {
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
offset: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let schema = Arc::clone(&self.schema);
let df_schema = DFSchema::try_from(schema.clone())?;
@@ -260,6 +261,7 @@ impl TableProvider for DefaultValueTableProvider {
)
.with_projection_indices(projection.cloned())?
.with_limit(limit)
.with_offset(offset)
.with_file_group(file_group)
.with_expr_adapter(Some(Arc::new(DefaultValuePhysicalExprAdapterFactory) as _));

@@ -469,6 +469,7 @@ impl TableProvider for IndexTableProvider {
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
offset: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let indexed_file = &self.indexed_file;
let predicate = self.filters_to_predicate(state, filters)?;
@@ -502,6 +503,7 @@ impl TableProvider for IndexTableProvider {
);
let file_scan_config = FileScanConfigBuilder::new(object_store_url, file_source)
.with_limit(limit)
.with_offset(offset)
.with_projection_indices(projection.cloned())?
.with_file(partitioned_file)
.build();
@@ -411,6 +411,7 @@ impl TableProvider for DistinctIndexTable {
_proj: Option<&Vec<usize>>,
filters: &[Expr],
_limit: Option<usize>,
_offset: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
// This example only handles filters of the form
// `category = 'X'` where X is a string literal
4 changes: 3 additions & 1 deletion datafusion-examples/examples/data_io/parquet_index.rs
@@ -226,6 +226,7 @@ impl TableProvider for IndexTableProvider {
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
offset: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let df_schema = DFSchema::try_from(self.schema())?;
// convert filters like [`a = 1`, `b = 2`] to a single filter like `a = 1 AND b = 2`
@@ -248,7 +249,8 @@ impl TableProvider for IndexTableProvider {
let mut file_scan_config_builder =
FileScanConfigBuilder::new(object_store_url, source)
.with_projection_indices(projection.cloned())?
.with_limit(limit);
.with_limit(limit)
.with_offset(offset);

// Transform to the format needed to pass to DataSourceExec
// Create one file group per file (default to scanning them all in parallel)
1 change: 1 addition & 0 deletions datafusion-examples/examples/data_io/remote_catalog.rs
@@ -242,6 +242,7 @@ impl TableProvider for RemoteTable {
projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
_offset: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
// Note that `scan` is called once the plan begins execution, and thus is
// async. When interacting with remote data sources, this is the place
1 change: 1 addition & 0 deletions datafusion-examples/examples/udf/simple_udtf.rs
@@ -101,6 +101,7 @@ impl TableProvider for LocalCsvTable {
projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
_offset: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let batches = if let Some(max_return_lines) = self.limit {
// get max return rows from self.batches
98 changes: 82 additions & 16 deletions datafusion/catalog-listing/src/table.rs
@@ -59,6 +59,8 @@ pub struct ListFilesResult {
pub statistics: Statistics,
/// Whether files are grouped by partition values (enables Hash partitioning).
pub grouped_by_partition: bool,
/// Offset remaining after whole files have been pruned, i.e. the offset still to be skipped within a file
pub remaining_offset: Option<usize>,
}

/// Built in [`TableProvider`] that reads data from one or more files as a single table.
@@ -383,11 +385,13 @@ impl TableProvider for ListingTable {
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
offset: Option<usize>,
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
let options = ScanArgs::default()
.with_projection(projection.map(|p| p.as_slice()))
.with_filters(Some(filters))
.with_limit(limit);
.with_limit(limit)
.with_offset(offset);
Ok(self.scan_with_args(state, options).await?.into_inner())
}

@@ -399,6 +403,7 @@ impl TableProvider for ListingTable {
let projection = args.projection().map(|p| p.to_vec());
let filters = args.filters().map(|f| f.to_vec()).unwrap_or_default();
let limit = args.limit();
let offset = args.offset();

// extract types of partition columns
let table_partition_cols = self
@@ -420,18 +425,32 @@ impl TableProvider for ListingTable {
can_be_evaluated_for_partition_pruning(&table_partition_col_names, filter)
});

// We should not limit the number of partitioned files to scan if there are filters and limit
// at the same time. This is because the limit should be applied after the filters are applied.
// We should not use limit/offset to prune the set of partitioned files to scan when
// filters are also present, because the limit/offset must be applied after the
// filters have been evaluated.
let statistic_file_limit = if filters.is_empty() { limit } else { None };
let statistic_file_offset = if filters.is_empty() { offset } else { None };

let ListFilesResult {
file_groups: mut partitioned_file_lists,
statistics,
grouped_by_partition: partitioned_by_file_group,
remaining_offset,
} = self
.list_files_for_scan(state, &partition_filters, statistic_file_limit)
.list_files_for_scan(
state,
&partition_filters,
statistic_file_limit,
statistic_file_offset,
)
.await?;

let final_remaining_offset = if filters.is_empty() {
remaining_offset
} else {
offset
};

// if no files need to be read, return an `EmptyExec`
if partitioned_file_lists.is_empty() {
let projected_schema = project_schema(&self.schema(), projection.as_ref())?;
@@ -490,6 +509,7 @@ impl TableProvider for ListingTable {
.with_statistics(statistics)
.with_projection_indices(projection)?
.with_limit(limit)
.with_offset(final_remaining_offset)
.with_output_ordering(output_ordering)
.with_expr_adapter(self.expr_adapter_factory.clone())
.with_partitioned_by_file_group(partitioned_by_file_group)
@@ -605,6 +625,7 @@ impl ListingTable {
ctx: &'a dyn Session,
filters: &'a [Expr],
limit: Option<usize>,
offset: Option<usize>,
) -> datafusion_common::Result<ListFilesResult> {
let store = if let Some(url) = self.table_paths.first() {
ctx.runtime_env().object_store(url)?
@@ -613,6 +634,7 @@ impl ListingTable {
file_groups: vec![],
statistics: Statistics::new_unknown(&self.file_schema),
grouped_by_partition: false,
remaining_offset: offset,
});
};
// list files (with partitions)
@@ -644,8 +666,14 @@ impl ListingTable {
.boxed()
.buffer_unordered(ctx.config_options().execution.meta_fetch_concurrency);

let (file_group, inexact_stats) =
get_files_with_limit(files, limit, self.options.collect_stat).await?;
let (file_group, inexact_stats, remaining_offset) =
get_files_with_limit_and_offset(
files,
limit,
offset,
self.options.collect_stat,
)
.await?;

// Threshold: 0 = disabled, N > 0 = enabled when distinct_keys >= N
//
@@ -691,6 +719,7 @@ impl ListingTable {
file_groups,
statistics: stats,
grouped_by_partition,
remaining_offset,
})
}

@@ -735,38 +764,50 @@ impl ListingTable {

/// Processes a stream of partitioned files and returns a `FileGroup` containing the files.
///
/// This function collects files from the provided stream until either:
/// 1. The stream is exhausted
/// 2. The accumulated number of rows exceeds the provided `limit` (if specified)
/// This function first skips files until the accumulated number of rows exceeds the
/// provided `offset` (if specified), and then collects files until either:
/// 1. The stream is exhausted
/// 2. The accumulated number of rows exceeds the provided `limit` (if specified)
///
/// # Arguments
/// * `files` - A stream of `Result<PartitionedFile>` items to process
/// * `limit` - An optional row count limit. If provided, the function will stop collecting files
/// once the accumulated number of rows exceeds this limit
/// * `offset` - An optional row count offset. If provided, the function will skip files
/// until the accumulated number of rows exceeds the given offset
/// * `collect_stats` - Whether to collect and accumulate statistics from the files
///
/// # Returns
/// A `Result` containing a `FileGroup` with the collected files
/// and a boolean indicating whether the statistics are inexact.
/// A `Result` containing:
/// - a `FileGroup` with the collected files
/// - a boolean indicating whether the statistics are inexact
/// - an `Option<usize>` with the offset remaining to be skipped after whole files have been pruned
///
/// # Note
/// The function will continue processing files if statistics are not available or if the
/// limit is not provided. If `collect_stats` is false, statistics won't be accumulated
/// but files will still be collected.
async fn get_files_with_limit(
async fn get_files_with_limit_and_offset(
files: impl Stream<Item = datafusion_common::Result<PartitionedFile>>,
limit: Option<usize>,
offset: Option<usize>,
collect_stats: bool,
) -> datafusion_common::Result<(FileGroup, bool)> {
) -> datafusion_common::Result<(FileGroup, bool, Option<usize>)> {
let mut file_group = FileGroup::default();
// Fusing the stream allows us to call next safely even once it is finished.
let mut all_files = Box::pin(files.fuse());
#[derive(Debug)]
enum ProcessingState {
SkippingUsingOffset(usize),
ReadingFiles,
ReachedLimit,
}

let mut state = ProcessingState::ReadingFiles;
let (mut final_remaining_offset, mut state) = if let Some(o) = offset {
(Some(o), ProcessingState::SkippingUsingOffset(o))
} else {
(None, ProcessingState::ReadingFiles)
};
let mut num_rows = Precision::Absent;

while let Some(file_result) = all_files.next().await {
@@ -788,7 +829,32 @@ async fn get_files_with_limit(
};
}

// Always add the file to our group
// Try to skip this file entirely by consuming its row count from the remaining offset
if let ProcessingState::SkippingUsingOffset(remaining_offset) = state {
if let Precision::Exact(row_count) = num_rows {
if row_count <= remaining_offset {
state = ProcessingState::SkippingUsingOffset(
remaining_offset - row_count,
);
continue; // skip reading this file
} else {
// the offset has been exhausted; start
// collecting files now and applying the limit
state = ProcessingState::ReadingFiles;
final_remaining_offset = if remaining_offset == 0 {
None
} else {
Some(remaining_offset)
};
}
} else {
// TODO(feniljain): What should happen when
// we get inexact rows?
}
}

// Add the file to the group once the offset (if present) has been exhausted
file_group.push(file);

// Check if we've hit the limit (if one was specified)
Expand All @@ -803,5 +869,5 @@ async fn get_files_with_limit(
// in, and the statistic could have been different had we processed the
// files in a different order.
let inexact_stats = all_files.next().await.is_some();
Ok((file_group, inexact_stats))
Ok((file_group, inexact_stats, final_remaining_offset))
}
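A small illustration (not part of the diff) of the whole-file pruning arithmetic implemented above: with exact per-file row counts, files are skipped while their rows fit inside the offset, and whatever is left over becomes the remaining offset later pushed into the scan config via `with_offset`. The file names, row counts, and helper below are made up, and the sketch ignores the inexact-statistics case.

// Illustration only: mirrors the SkippingUsingOffset logic with made-up numbers.
// Returns (index of first file that must actually be read, remaining offset within it).
fn prune_by_offset(row_counts: &[usize], offset: usize) -> (usize, Option<usize>) {
    let mut remaining = offset;
    for (i, rows) in row_counts.iter().enumerate() {
        if *rows <= remaining {
            // the whole file fits inside the offset: prune it
            remaining -= rows;
        } else {
            // first file we must read; leftover rows are skipped inside it
            return (i, (remaining > 0).then_some(remaining));
        }
    }
    (row_counts.len(), (remaining > 0).then_some(remaining))
}

fn main() {
    // a.parquet = 100 rows, b.parquet = 50, c.parquet = 200, OFFSET 120:
    // a.parquet is pruned (offset left: 20), b.parquet and c.parquet are kept,
    // and the remaining 20 rows are skipped within b.parquet.
    assert_eq!(prune_by_offset(&[100, 50, 200], 120), (1, Some(20)));
}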
1 change: 1 addition & 0 deletions datafusion/catalog/src/async.rs
@@ -466,6 +466,7 @@ mod tests {
_projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
_offset: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
unimplemented!()
}
4 changes: 3 additions & 1 deletion datafusion/catalog/src/cte_worktable.rs
@@ -87,11 +87,13 @@ impl TableProvider for CteWorkTable {
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
offset: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let options = ScanArgs::default()
.with_projection(projection.map(|p| p.as_slice()))
.with_filters(Some(filters))
.with_limit(limit);
.with_limit(limit)
.with_offset(offset);
Ok(self.scan_with_args(state, options).await?.into_inner())
}

1 change: 1 addition & 0 deletions datafusion/catalog/src/default_table_source.rs
@@ -135,6 +135,7 @@ fn preserves_table_type() {
_: Option<&Vec<usize>>,
_: &[Expr],
_: Option<usize>,
_: Option<usize>,
) -> Result<Arc<dyn datafusion_physical_plan::ExecutionPlan>, DataFusionError>
{
unimplemented!()
3 changes: 2 additions & 1 deletion datafusion/catalog/src/memory/table.rs
@@ -147,7 +147,7 @@ impl MemTable {
) -> Result<Self> {
let schema = t.schema();
let constraints = t.constraints();
let exec = t.scan(state, None, &[], None).await?;
let exec = t.scan(state, None, &[], None, None).await?;
let partition_count = exec.output_partitioning().partition_count();

let mut join_set = JoinSet::new();
@@ -235,6 +235,7 @@ impl TableProvider for MemTable {
projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
_offset: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let mut partitions = vec![];
for arc_inner_vec in self.batches.iter() {
1 change: 1 addition & 0 deletions datafusion/catalog/src/stream.rs
@@ -325,6 +325,7 @@ impl TableProvider for StreamTable {
projection: Option<&Vec<usize>>,
_filters: &[Expr],
limit: Option<usize>,
_offset: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let projected_schema = match projection {
Some(p) => {
1 change: 1 addition & 0 deletions datafusion/catalog/src/streaming.rs
@@ -100,6 +100,7 @@ impl TableProvider for StreamingTable {
projection: Option<&Vec<usize>>,
_filters: &[Expr],
limit: Option<usize>,
_offset: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let physical_sort = if !self.sort_order.is_empty() {
let df_schema = DFSchema::try_from(Arc::clone(&self.schema))?;