2 changes: 1 addition & 1 deletion datafusion/core/src/physical_planner.rs
@@ -1029,7 +1029,7 @@ impl DefaultPhysicalPlanner {
LogicalPlan::Subquery(_) => todo!(),
LogicalPlan::SubqueryAlias(_) => children.one()?,
LogicalPlan::Limit(limit) => {
let input = children.one()?;
let mut input = children.one()?;
let SkipType::Literal(skip) = limit.get_skip_type()? else {
return not_impl_err!(
"Unsupported OFFSET expression: {:?}",
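The planner change above only makes `input` mutable; the surrounding context shows the guard that extracts a literal OFFSET before anything can be pushed down. A self-contained sketch of that guard, with a stand-in enum and error type (the real `SkipType` and `not_impl_err!` come from DataFusion and are not reproduced here):

```rust
// Stand-in for DataFusion's SkipType; only the let-else extraction is shown.
#[derive(Debug)]
enum SkipType {
    Literal(usize),
    Unsupported(String),
}

// Mirrors the planner guard: bail out unless OFFSET is a plain literal.
fn literal_skip(skip_type: SkipType) -> Result<usize, String> {
    let SkipType::Literal(skip) = skip_type else {
        return Err("Unsupported OFFSET expression".to_string());
    };
    Ok(skip)
}

fn main() {
    assert_eq!(literal_skip(SkipType::Literal(5)), Ok(5));
    assert!(literal_skip(SkipType::Unsupported("random()".into())).is_err());
}
```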
57 changes: 53 additions & 4 deletions datafusion/datasource/src/file_scan_config.rs
@@ -149,6 +149,8 @@ pub struct FileScanConfig {
pub file_groups: Vec<FileGroup>,
/// Table constraints
pub constraints: Constraints,
/// The number of rows to skip (OFFSET) before producing output rows.
pub skip: usize,
/// The maximum number of records to read from this plan. If `None`,
/// all records after filtering are returned.
pub limit: Option<usize>,
@@ -239,6 +241,7 @@ pub struct FileScanConfig {
pub struct FileScanConfigBuilder {
object_store_url: ObjectStoreUrl,
file_source: Arc<dyn FileSource>,
skip: usize,
limit: Option<usize>,
constraints: Option<Constraints>,
file_groups: Vec<FileGroup>,
@@ -268,6 +271,7 @@ impl FileScanConfigBuilder {
statistics: None,
output_ordering: vec![],
file_compression_type: None,
skip: 0,
limit: None,
constraints: None,
batch_size: None,
@@ -283,6 +287,12 @@ impl FileScanConfigBuilder {
self
}

/// Set the number of rows to skip (OFFSET) before producing output rows.
pub fn with_skip(mut self, skip: usize) -> Self {
self.skip = skip;
self
}

/// Set the file source for scanning files.
///
/// This method allows you to change the file source implementation (e.g. ParquetSource, CsvSource, etc.)
@@ -449,6 +459,7 @@ impl FileScanConfigBuilder {
let Self {
object_store_url,
file_source,
skip,
limit,
constraints,
file_groups,
@@ -470,6 +481,7 @@
FileScanConfig {
object_store_url,
file_source,
skip,
limit,
constraints,
file_groups,
@@ -492,6 +504,7 @@ impl From<FileScanConfig> for FileScanConfigBuilder {
statistics: Some(config.statistics),
output_ordering: config.output_ordering,
file_compression_type: Some(config.file_compression_type),
skip: config.skip,
limit: config.limit,
constraints: Some(config.constraints),
batch_size: config.batch_size,
@@ -566,6 +579,9 @@ impl DataSource for FileScanConfig {
if let Some(limit) = self.limit {
write!(f, ", limit={limit}")?;
}
if self.skip > 0 {
write!(f, ", skip={}", self.skip)?;
}

display_orderings(f, &orderings)?;

@@ -737,16 +753,49 @@ impl DataSource for FileScanConfig {
}

fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
let source = FileScanConfigBuilder::from(self.clone())
.with_limit(limit)
.build();
Some(Arc::new(source))
self.with_limit(0, limit)
}

fn with_limit(
&self,
skip: usize,
fetch: Option<usize>,
) -> Option<Arc<dyn DataSource>> {
// Do not push offset past scan-time filters; offset must apply after filtering.
if skip > 0 && self.file_source.filter().is_some() {
return None;
}
// Preserve partitioning and avoid over-skipping: if multiple file groups
// exist, do not push skip down.
if skip > 0 && self.file_groups.len() > 1 {
return None;
}
let mut builder = FileScanConfigBuilder::from(self.clone());
let combined_skip = self.skip.saturating_add(skip);
builder = builder.with_skip(combined_skip);

let requested_limit =
fetch.map(|requested| requested.saturating_add(combined_skip));

let limit = match (self.limit, requested_limit) {
(Some(existing), Some(requested)) => Some(existing.min(requested)),
(Some(existing), None) => Some(existing),
(None, Some(requested)) => Some(requested),
(None, None) => None,
};

builder = builder.with_limit(limit);
Some(Arc::new(builder.build()))
}

fn fetch(&self) -> Option<usize> {
self.limit
}

fn skip(&self) -> usize {
self.skip
}

fn metrics(&self) -> ExecutionPlanMetricsSet {
self.file_source.metrics().clone()
}
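A note on the arithmetic in `with_limit` above: the scan-level `limit` counts rows before the offset is applied, so a pushed-down fetch is first inflated by the combined skip and then capped by any pre-existing limit. A minimal sketch of just that computation (the free function and argument names here are invented for illustration; the real logic lives in `FileScanConfig::with_limit`):

```rust
/// Combine an existing (skip, limit) on a scan with a newly pushed-down
/// (skip, fetch). The returned limit counts rows *before* the offset is
/// applied, which is why the requested fetch is inflated by the combined skip.
fn combine(
    existing_skip: usize,
    existing_limit: Option<usize>,
    skip: usize,
    fetch: Option<usize>,
) -> (usize, Option<usize>) {
    let combined_skip = existing_skip.saturating_add(skip);
    let requested_limit = fetch.map(|f| f.saturating_add(combined_skip));
    let limit = match (existing_limit, requested_limit) {
        (Some(existing), Some(requested)) => Some(existing.min(requested)),
        (Some(existing), None) => Some(existing),
        (None, Some(requested)) => Some(requested),
        (None, None) => None,
    };
    (combined_skip, limit)
}

fn main() {
    // OFFSET 6 LIMIT 2 pushed into a fresh scan: read 8 rows, drop the first 6.
    assert_eq!(combine(0, None, 6, Some(2)), (6, Some(8)));
    // A pre-existing limit of 5 caps the requested 8.
    assert_eq!(combine(0, Some(5), 6, Some(2)), (6, Some(5)));
}
```

With the builder additions above, an existing config can be rebuilt along the lines of `FileScanConfigBuilder::from(config).with_skip(6).with_limit(Some(8)).build()`.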
82 changes: 80 additions & 2 deletions datafusion/datasource/src/file_stream.rs
@@ -31,6 +31,7 @@ use crate::PartitionedFile;
use crate::file_scan_config::FileScanConfig;
use arrow::datatypes::SchemaRef;
use datafusion_common::error::Result;
use datafusion_common::stats::Precision;
use datafusion_execution::RecordBatchStream;
use datafusion_physical_plan::metrics::{
BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, Time,
@@ -50,6 +51,8 @@ pub struct FileStream {
/// The stream schema (file schema including partition columns and after
/// projection).
projected_schema: SchemaRef,
/// Number of rows to skip before producing output rows.
skip: usize,
/// The remaining number of records to parse, None if no limit
remain: Option<usize>,
/// A dynamic [`FileOpener`]. Calling `open()` returns a [`FileOpenFuture`],
@@ -76,11 +79,39 @@ impl FileStream {
let projected_schema = config.projected_schema()?;

let file_group = config.file_groups[partition].clone();
let mut skip_remaining = config.skip;
let mut file_iter: VecDeque<PartitionedFile> = VecDeque::new();

let mut files = file_group.into_inner().into_iter();
while let Some(file) = files.next() {
if skip_remaining == 0 {
file_iter.push_back(file);
continue;
}

if let Some(statistics) = file.statistics.as_deref()
&& let Precision::Exact(num_rows) = statistics.num_rows
{
if num_rows <= skip_remaining {
skip_remaining -= num_rows;
continue;
}
}

file_iter.push_back(file);
file_iter.extend(files);
break;
}

let remain = config
.limit
.map(|limit| limit.saturating_sub(config.skip));

Ok(Self {
file_iter: file_group.into_inner().into_iter().collect(),
file_iter,
projected_schema,
remain: config.limit,
skip: skip_remaining,
remain,
file_opener,
state: FileStreamState::Idle,
file_stream_metrics: FileStreamMetrics::new(metrics, partition),
@@ -172,6 +203,20 @@ impl FileStream {
Some(Ok(batch)) => {
self.file_stream_metrics.time_scanning_until_data.stop();
self.file_stream_metrics.time_scanning_total.stop();
let mut batch = batch;
if self.skip > 0 {
if batch.num_rows() <= self.skip {
self.skip -= batch.num_rows();
continue;
} else {
let to_skip = self.skip;
self.skip = 0;
batch = batch.slice(
to_skip,
batch.num_rows().saturating_sub(to_skip),
);
}
}
let batch = match &mut self.remain {
Some(remain) => {
if *remain > batch.num_rows() {
@@ -500,6 +545,8 @@ mod tests {
num_files: usize,
/// Global limit of records emitted by the stream
limit: Option<usize>,
/// Number of rows to skip before emitting records
skip: usize,
/// Error-handling behavior of the stream
on_error: OnError,
/// Mock `FileOpener`
@@ -523,6 +570,12 @@
self
}

/// Specify the skip (offset)
pub fn with_skip(mut self, skip: usize) -> Self {
self.skip = skip;
self
}

/// Specify the index of files in the stream which should
/// throw an error when opening
pub fn with_open_errors(mut self, idx: Vec<usize>) -> Self {
@@ -582,6 +635,7 @@
Arc::new(MockSource::new(table_schema)),
)
.with_file_group(file_group)
.with_skip(self.skip)
.with_limit(self.limit)
.build();
let metrics_set = ExecutionPlanMetricsSet::new();
@@ -884,6 +938,30 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn with_skip_and_limit() -> Result<()> {
// Skip the first 6 rows across two files and return only the next 2 rows
let batches = FileStreamTest::new()
.with_records(vec![make_partition(3), make_partition(2)])
.with_num_files(2)
.with_skip(6)
.with_limit(Some(8))
.result()
.await?;

#[rustfmt::skip]
assert_batches_eq!(&[
"+---+",
"| i |",
"+---+",
"| 1 |",
"| 2 |",
"+---+",
], &batches);

Ok(())
}

#[tokio::test]
async fn with_limit_at_middle_of_batch() -> Result<()> {
let batches = create_and_collect(Some(6)).await;
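The OFFSET handling added to `FileStream` above works at two levels: whole files are dropped up front when their exact row-count statistics fit inside the remaining skip, and whatever skip is left over is sliced off the first batches at poll time via `batch.slice(to_skip, num_rows - to_skip)`. A simplified sketch of the file-level step, using plain row counts instead of `PartitionedFile` (the helper name and return shape are invented for illustration):

```rust
// File-level OFFSET planning: skip whole files only when their row count is
// known exactly, mirroring the Precision::Exact check in FileStream::new.
fn plan_skip(file_row_counts: &[Option<usize>], mut skip: usize) -> (Vec<usize>, usize) {
    let mut kept = Vec::new();
    for (idx, rows) in file_row_counts.iter().enumerate() {
        if skip == 0 {
            kept.push(idx);
            continue;
        }
        // Skip the whole file only if its exact row count fits in the offset.
        if let Some(n) = rows {
            if *n <= skip {
                skip -= *n;
                continue;
            }
        }
        // First file that cannot be skipped entirely: keep it and all the rest.
        kept.extend(idx..file_row_counts.len());
        break;
    }
    (kept, skip) // leftover skip is applied batch by batch during polling
}

fn main() {
    // Two files of 5 rows each, OFFSET 6: drop file 0, keep file 1,
    // and leave 1 row to be sliced off its first batch.
    assert_eq!(plan_skip(&[Some(5), Some(5)], 6), (vec![1], 1));
    // Unknown statistics: the file cannot be skipped wholesale.
    assert_eq!(plan_skip(&[None, Some(5)], 6), (vec![0, 1], 6));
}
```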
46 changes: 44 additions & 2 deletions datafusion/datasource/src/source.rs
@@ -171,6 +171,23 @@ pub trait DataSource: Send + Sync + Debug {
/// Return a copy of this DataSource with a new fetch limit
fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn DataSource>>;
fn fetch(&self) -> Option<usize>;
/// Return a copy of this DataSource with the given offset (skip) and fetch limits applied, if supported.
/// The default implementation delegates to `with_fetch` when `skip` is zero and returns `None` otherwise.
fn with_limit(
&self,
skip: usize,
fetch: Option<usize>,
) -> Option<Arc<dyn DataSource>> {
if skip == 0 {
self.with_fetch(fetch)
} else {
None
}
}
/// Returns the offset (skip) for this DataSource. Defaults to 0.
fn skip(&self) -> usize {
0
}
fn metrics(&self) -> ExecutionPlanMetricsSet {
ExecutionPlanMetricsSet::new()
}
@@ -321,17 +338,42 @@ impl ExecutionPlan for DataSourceExec {
self.data_source.partition_statistics(partition)
}

fn supports_limit_pushdown(&self) -> bool {
// Memory-based sources (e.g. VALUES) do not currently support offset
// pushdown, so keep the explicit LimitExec for correctness.
if self
.data_source
.as_any()
.downcast_ref::<crate::memory::MemorySourceConfig>()
.is_some()
{
return false;
}
true
}

fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
let data_source = self.data_source.with_fetch(limit)?;
let cache = self.cache.clone();
self.with_limit(self.data_source.skip(), limit)
}

fn with_limit(
&self,
skip: usize,
fetch: Option<usize>,
) -> Option<Arc<dyn ExecutionPlan>> {
let data_source = self.data_source.with_limit(skip, fetch)?;
let cache = Self::compute_properties(&data_source);
Some(Arc::new(Self { data_source, cache }))
}

fn fetch(&self) -> Option<usize> {
self.data_source.fetch()
}

fn skip(&self) -> usize {
self.data_source.skip()
}

fn try_swapping_with_projection(
&self,
projection: &ProjectionExec,
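The `DataSource::with_limit` default above makes offset pushdown strictly opt-in: a source that only implements `with_fetch` keeps its existing behaviour, and a non-zero skip falls back to `None` so the optimizer leaves an explicit limit operator in place. A toy model of that contract (the trait and type names below are stand-ins, not DataFusion APIs):

```rust
// Offset pushdown is opt-in: the default with_limit only forwards to
// with_fetch when there is no offset to absorb.
trait Source {
    fn with_fetch(&self, fetch: Option<usize>) -> Option<Box<dyn Source>>;
    fn with_limit(&self, skip: usize, fetch: Option<usize>) -> Option<Box<dyn Source>> {
        if skip == 0 {
            self.with_fetch(fetch)
        } else {
            None
        }
    }
    fn fetch(&self) -> Option<usize>;
}

// A source that only understands fetch, like most existing implementations.
struct FetchOnly {
    fetch: Option<usize>,
}

impl Source for FetchOnly {
    fn with_fetch(&self, fetch: Option<usize>) -> Option<Box<dyn Source>> {
        Some(Box::new(FetchOnly { fetch }))
    }
    fn fetch(&self) -> Option<usize> {
        self.fetch
    }
}

fn main() {
    let source = FetchOnly { fetch: None };
    // Plain fetch pushdown still works through the default method…
    assert!(source.with_limit(0, Some(10)).is_some());
    // …but a non-zero skip is refused, so an explicit limit stays above the source.
    assert!(source.with_limit(3, Some(10)).is_none());
}
```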
29 changes: 22 additions & 7 deletions datafusion/physical-optimizer/src/limit_pushdown.rs
@@ -192,14 +192,28 @@ pub fn pushdown_limit_helper(
};
};

let skip_and_fetch = Some(global_fetch + global_state.skip);

if pushdown_plan.supports_limit_pushdown() {
if !combines_input_partitions(&pushdown_plan) {
// We have information in the global state and the plan pushes down,
// continue:
Ok((Transformed::no(pushdown_plan), global_state))
} else if let Some(plan_with_fetch) = pushdown_plan.with_fetch(skip_and_fetch) {
if let Some(plan_with_limit) =
pushdown_plan.with_limit(global_state.skip, global_state.fetch)
{
let mut new_state = global_state;
new_state.satisfied = true;
new_state.skip = 0;
new_state.fetch = plan_with_limit.fetch().or(new_state.fetch);
Ok((Transformed::yes(plan_with_limit), new_state))
} else {
// We have information in the global state and the plan pushes down,
// continue:
Ok((Transformed::no(pushdown_plan), global_state))
}
} else if let Some(plan_with_fetch) =
pushdown_plan.with_limit(global_state.skip, Some(global_fetch + global_state.skip))
.or_else(|| {
let skip_and_fetch = Some(global_fetch + global_state.skip);
pushdown_plan.with_fetch(skip_and_fetch)
})
{
// This plan is combining input partitions, so we need to add the
// fetch info to plan if possible. If not, we must add a `LimitExec`
// with the information from the global state.
@@ -210,7 +224,7 @@
new_plan =
add_global_limit(new_plan, global_state.skip, global_state.fetch);
}
global_state.fetch = skip_and_fetch;
global_state.fetch = Some(global_fetch + global_state.skip);
global_state.skip = 0;
global_state.satisfied = true;
Ok((Transformed::yes(new_plan), global_state))
@@ -238,6 +252,7 @@ pub fn pushdown_limit_helper(
global_state.fetch = None;
global_state.skip = 0;

let skip_and_fetch = Some(global_fetch + global_skip);
let maybe_fetchable = pushdown_plan.with_fetch(skip_and_fetch);
if global_state.satisfied {
if let Some(plan_with_fetch) = maybe_fetchable {
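Finally, the change to `pushdown_limit_helper` distinguishes two outcomes when pushing a global skip/fetch into a node: either the node absorbs both (the global state is marked satisfied and its skip reset to zero), or the node only understands a fetch, in which case `skip + fetch` rows are pushed down and an explicit global limit above still drops the prefix. A rough model of those state transitions (struct and helper names are invented here; the real state is the optimizer's global skip/fetch/satisfied requirements):

```rust
// Toy model of the global-state transitions in pushdown_limit_helper.
#[derive(Debug, PartialEq)]
struct GlobalState {
    skip: usize,
    fetch: Option<usize>,
    satisfied: bool,
}

// The node accepted with_limit(skip, fetch): nothing is left to enforce above.
fn absorbed_skip(state: GlobalState, node_fetch: Option<usize>) -> GlobalState {
    GlobalState {
        skip: 0,
        fetch: node_fetch.or(state.fetch),
        satisfied: true,
    }
}

// The node only accepted with_fetch(skip + fetch): push the inflated fetch and
// rely on a limit added above (when not yet satisfied) to drop the prefix.
fn fetch_only(state: GlobalState) -> GlobalState {
    let fetch = state.fetch.map(|f| f + state.skip);
    GlobalState { skip: 0, fetch, satisfied: true }
}

fn main() {
    // OFFSET 6 LIMIT 2 pushed into a node that can absorb the offset:
    let state = GlobalState { skip: 6, fetch: Some(2), satisfied: false };
    assert_eq!(
        absorbed_skip(state, Some(8)),
        GlobalState { skip: 0, fetch: Some(8), satisfied: true }
    );
    // The same requirement pushed into a fetch-only node:
    let state = GlobalState { skip: 6, fetch: Some(2), satisfied: false };
    assert_eq!(
        fetch_only(state),
        GlobalState { skip: 0, fetch: Some(8), satisfied: true }
    );
}
```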