diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index cc7d534776d7e..b12dbb6ce9cb7 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -1029,7 +1029,7 @@ impl DefaultPhysicalPlanner { LogicalPlan::Subquery(_) => todo!(), LogicalPlan::SubqueryAlias(_) => children.one()?, LogicalPlan::Limit(limit) => { - let input = children.one()?; + let mut input = children.one()?; let SkipType::Literal(skip) = limit.get_skip_type()? else { return not_impl_err!( "Unsupported OFFSET expression: {:?}", diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 1f7c37315c47a..a29ff23bfdf16 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -149,6 +149,8 @@ pub struct FileScanConfig { pub file_groups: Vec, /// Table constraints pub constraints: Constraints, + /// The number of rows to skip (OFFSET) before producing output rows. + pub skip: usize, /// The maximum number of records to read from this plan. If `None`, /// all records after filtering are returned. pub limit: Option, @@ -239,6 +241,7 @@ pub struct FileScanConfig { pub struct FileScanConfigBuilder { object_store_url: ObjectStoreUrl, file_source: Arc, + skip: usize, limit: Option, constraints: Option, file_groups: Vec, @@ -268,6 +271,7 @@ impl FileScanConfigBuilder { statistics: None, output_ordering: vec![], file_compression_type: None, + skip: 0, limit: None, constraints: None, batch_size: None, @@ -283,6 +287,12 @@ impl FileScanConfigBuilder { self } + /// Set the number of rows to skip (OFFSET) before producing output rows. + pub fn with_skip(mut self, skip: usize) -> Self { + self.skip = skip; + self + } + /// Set the file source for scanning files. /// /// This method allows you to change the file source implementation (e.g. ParquetSource, CsvSource, etc.) @@ -449,6 +459,7 @@ impl FileScanConfigBuilder { let Self { object_store_url, file_source, + skip, limit, constraints, file_groups, @@ -470,6 +481,7 @@ impl FileScanConfigBuilder { FileScanConfig { object_store_url, file_source, + skip, limit, constraints, file_groups, @@ -492,6 +504,7 @@ impl From for FileScanConfigBuilder { statistics: Some(config.statistics), output_ordering: config.output_ordering, file_compression_type: Some(config.file_compression_type), + skip: config.skip, limit: config.limit, constraints: Some(config.constraints), batch_size: config.batch_size, @@ -566,6 +579,9 @@ impl DataSource for FileScanConfig { if let Some(limit) = self.limit { write!(f, ", limit={limit}")?; } + if self.skip > 0 { + write!(f, ", skip={}", self.skip)?; + } display_orderings(f, &orderings)?; @@ -737,16 +753,49 @@ impl DataSource for FileScanConfig { } fn with_fetch(&self, limit: Option) -> Option> { - let source = FileScanConfigBuilder::from(self.clone()) - .with_limit(limit) - .build(); - Some(Arc::new(source)) + self.with_limit(0, limit) + } + + fn with_limit( + &self, + skip: usize, + fetch: Option, + ) -> Option> { + // Do not push offset past scan-time filters; offset must apply after filtering. + if skip > 0 && self.file_source.filter().is_some() { + return None; + } + // Preserve partitioning and avoid over-skipping: if multiple file groups + // exist, do not push skip down. 
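+        // Each file group is executed as its own partition, and a pushed-down
+        // skip would be applied independently by every partition's stream, so a
+        // single global OFFSET cannot be divided across multiple groups without
+        // over-skipping or dropping the wrong rows.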
+ if skip > 0 && self.file_groups.len() > 1 { + return None; + } + let mut builder = FileScanConfigBuilder::from(self.clone()); + let combined_skip = self.skip.saturating_add(skip); + builder = builder.with_skip(combined_skip); + + let requested_limit = + fetch.map(|requested| requested.saturating_add(combined_skip)); + + let limit = match (self.limit, requested_limit) { + (Some(existing), Some(requested)) => Some(existing.min(requested)), + (Some(existing), None) => Some(existing), + (None, Some(requested)) => Some(requested), + (None, None) => None, + }; + + builder = builder.with_limit(limit); + Some(Arc::new(builder.build())) } fn fetch(&self) -> Option { self.limit } + fn skip(&self) -> usize { + self.skip + } + fn metrics(&self) -> ExecutionPlanMetricsSet { self.file_source.metrics().clone() } diff --git a/datafusion/datasource/src/file_stream.rs b/datafusion/datasource/src/file_stream.rs index c8090382094ef..f8ec68dc5483c 100644 --- a/datafusion/datasource/src/file_stream.rs +++ b/datafusion/datasource/src/file_stream.rs @@ -31,6 +31,7 @@ use crate::PartitionedFile; use crate::file_scan_config::FileScanConfig; use arrow::datatypes::SchemaRef; use datafusion_common::error::Result; +use datafusion_common::stats::Precision; use datafusion_execution::RecordBatchStream; use datafusion_physical_plan::metrics::{ BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, Time, @@ -50,6 +51,8 @@ pub struct FileStream { /// The stream schema (file schema including partition columns and after /// projection). projected_schema: SchemaRef, + /// Number of rows to skip before producing output rows. + skip: usize, /// The remaining number of records to parse, None if no limit remain: Option, /// A dynamic [`FileOpener`]. Calling `open()` returns a [`FileOpenFuture`], @@ -76,11 +79,39 @@ impl FileStream { let projected_schema = config.projected_schema()?; let file_group = config.file_groups[partition].clone(); + let mut skip_remaining = config.skip; + let mut file_iter: VecDeque = VecDeque::new(); + + let mut files = file_group.into_inner().into_iter(); + while let Some(file) = files.next() { + if skip_remaining == 0 { + file_iter.push_back(file); + continue; + } + + if let Some(statistics) = file.statistics.as_deref() + && let Precision::Exact(num_rows) = statistics.num_rows + { + if num_rows <= skip_remaining { + skip_remaining -= num_rows; + continue; + } + } + + file_iter.push_back(file); + file_iter.extend(files); + break; + } + + let remain = config + .limit + .map(|limit| limit.saturating_sub(config.skip)); Ok(Self { - file_iter: file_group.into_inner().into_iter().collect(), + file_iter, projected_schema, - remain: config.limit, + skip: skip_remaining, + remain, file_opener, state: FileStreamState::Idle, file_stream_metrics: FileStreamMetrics::new(metrics, partition), @@ -172,6 +203,20 @@ impl FileStream { Some(Ok(batch)) => { self.file_stream_metrics.time_scanning_until_data.stop(); self.file_stream_metrics.time_scanning_total.stop(); + let mut batch = batch; + if self.skip > 0 { + if batch.num_rows() <= self.skip { + self.skip -= batch.num_rows(); + continue; + } else { + let to_skip = self.skip; + self.skip = 0; + batch = batch.slice( + to_skip, + batch.num_rows().saturating_sub(to_skip), + ); + } + } let batch = match &mut self.remain { Some(remain) => { if *remain > batch.num_rows() { @@ -500,6 +545,8 @@ mod tests { num_files: usize, /// Global limit of records emitted by the stream limit: Option, + /// Number of rows to skip before emitting records + skip: usize, /// 
Error-handling behavior of the stream on_error: OnError, /// Mock `FileOpener` @@ -523,6 +570,12 @@ mod tests { self } + /// Specify the skip (offset) + pub fn with_skip(mut self, skip: usize) -> Self { + self.skip = skip; + self + } + /// Specify the index of files in the stream which should /// throw an error when opening pub fn with_open_errors(mut self, idx: Vec) -> Self { @@ -582,6 +635,7 @@ mod tests { Arc::new(MockSource::new(table_schema)), ) .with_file_group(file_group) + .with_skip(self.skip) .with_limit(self.limit) .build(); let metrics_set = ExecutionPlanMetricsSet::new(); @@ -884,6 +938,30 @@ mod tests { Ok(()) } + #[tokio::test] + async fn with_skip_and_limit() -> Result<()> { + // Skip the first 6 rows across two files and return only the next 2 rows + let batches = FileStreamTest::new() + .with_records(vec![make_partition(3), make_partition(2)]) + .with_num_files(2) + .with_skip(6) + .with_limit(Some(8)) + .result() + .await?; + + #[rustfmt::skip] + assert_batches_eq!(&[ + "+---+", + "| i |", + "+---+", + "| 1 |", + "| 2 |", + "+---+", + ], &batches); + + Ok(()) + } + #[tokio::test] async fn with_limit_at_middle_of_batch() -> Result<()> { let batches = create_and_collect(Some(6)).await; diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index a3892dfac9778..3e796c802bfce 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -171,6 +171,23 @@ pub trait DataSource: Send + Sync + Debug { /// Return a copy of this DataSource with a new fetch limit fn with_fetch(&self, _limit: Option) -> Option>; fn fetch(&self) -> Option; + /// Return a copy of this DataSource with offset and fetch limits, if supported. + /// Default implementation supports only fetch pushdown when skip is zero. + fn with_limit( + &self, + skip: usize, + fetch: Option, + ) -> Option> { + if skip == 0 { + self.with_fetch(fetch) + } else { + None + } + } + /// Returns offset (skip) for this DataSource. Defaults to 0. + fn skip(&self) -> usize { + 0 + } fn metrics(&self) -> ExecutionPlanMetricsSet { ExecutionPlanMetricsSet::new() } @@ -321,10 +338,31 @@ impl ExecutionPlan for DataSourceExec { self.data_source.partition_statistics(partition) } + fn supports_limit_pushdown(&self) -> bool { + // Memory-based sources (e.g. VALUES) do not currently support offset + // pushdown, so keep the explicit LimitExec for correctness. 
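+        // (The downcast target below is presumably `MemorySourceConfig`, the
+        // `DataSource` implementation behind in-memory tables and VALUES.)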
+ if self + .data_source + .as_any() + .downcast_ref::() + .is_some() + { + return false; + } + true + } + fn with_fetch(&self, limit: Option) -> Option> { - let data_source = self.data_source.with_fetch(limit)?; - let cache = self.cache.clone(); + self.with_limit(self.data_source.skip(), limit) + } + fn with_limit( + &self, + skip: usize, + fetch: Option, + ) -> Option> { + let data_source = self.data_source.with_limit(skip, fetch)?; + let cache = Self::compute_properties(&data_source); Some(Arc::new(Self { data_source, cache })) } @@ -332,6 +370,10 @@ impl ExecutionPlan for DataSourceExec { self.data_source.fetch() } + fn skip(&self) -> usize { + self.data_source.skip() + } + fn try_swapping_with_projection( &self, projection: &ProjectionExec, diff --git a/datafusion/physical-optimizer/src/limit_pushdown.rs b/datafusion/physical-optimizer/src/limit_pushdown.rs index 4cb3abe30bae2..d3773d5965e3f 100644 --- a/datafusion/physical-optimizer/src/limit_pushdown.rs +++ b/datafusion/physical-optimizer/src/limit_pushdown.rs @@ -192,14 +192,28 @@ pub fn pushdown_limit_helper( }; }; - let skip_and_fetch = Some(global_fetch + global_state.skip); - if pushdown_plan.supports_limit_pushdown() { if !combines_input_partitions(&pushdown_plan) { - // We have information in the global state and the plan pushes down, - // continue: - Ok((Transformed::no(pushdown_plan), global_state)) - } else if let Some(plan_with_fetch) = pushdown_plan.with_fetch(skip_and_fetch) { + if let Some(plan_with_limit) = + pushdown_plan.with_limit(global_state.skip, global_state.fetch) + { + let mut new_state = global_state; + new_state.satisfied = true; + new_state.skip = 0; + new_state.fetch = plan_with_limit.fetch().or(new_state.fetch); + Ok((Transformed::yes(plan_with_limit), new_state)) + } else { + // We have information in the global state and the plan pushes down, + // continue: + Ok((Transformed::no(pushdown_plan), global_state)) + } + } else if let Some(plan_with_fetch) = + pushdown_plan.with_limit(global_state.skip, Some(global_fetch + global_state.skip)) + .or_else(|| { + let skip_and_fetch = Some(global_fetch + global_state.skip); + pushdown_plan.with_fetch(skip_and_fetch) + }) + { // This plan is combining input partitions, so we need to add the // fetch info to plan if possible. If not, we must add a `LimitExec` // with the information from the global state. @@ -210,7 +224,7 @@ pub fn pushdown_limit_helper( new_plan = add_global_limit(new_plan, global_state.skip, global_state.fetch); } - global_state.fetch = skip_and_fetch; + global_state.fetch = Some(global_fetch + global_state.skip); global_state.skip = 0; global_state.satisfied = true; Ok((Transformed::yes(new_plan), global_state)) @@ -238,6 +252,7 @@ pub fn pushdown_limit_helper( global_state.fetch = None; global_state.skip = 0; + let skip_and_fetch = Some(global_fetch + global_skip); let maybe_fetchable = pushdown_plan.with_fetch(skip_and_fetch); if global_state.satisfied { if let Some(plan_with_fetch) = maybe_fetchable { diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 06da0b8933c18..67f22841a4928 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -516,11 +516,31 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { None } + /// Returns a variant of this `ExecutionPlan` node that applies `skip` and + /// `fetch` limits, if supported. Defaults to `with_fetch` when `skip` is + /// zero and returns `None` otherwise. 
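+    ///
+    /// Operators that can apply the offset themselves (for example `DataSourceExec`
+    /// over file scans) override this together with `skip()`, which lets the limit
+    /// pushdown optimizer remove the enclosing `GlobalLimitExec`.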
+ fn with_limit( + &self, + skip: usize, + fetch: Option, + ) -> Option> { + if skip == 0 { + self.with_fetch(fetch) + } else { + None + } + } + /// Gets the fetch count for the operator, `None` means there is no fetch. fn fetch(&self) -> Option { None } + /// Gets the offset (skip) for the operator, defaults to zero. + fn skip(&self) -> usize { + 0 + } + /// Gets the effect on cardinality, if known fn cardinality_effect(&self) -> CardinalityEffect { CardinalityEffect::Unknown diff --git a/datafusion/sqllogictest/test_files/explain_tree.slt b/datafusion/sqllogictest/test_files/explain_tree.slt index 9215ce87e3bef..e15993e6f42d8 100644 --- a/datafusion/sqllogictest/test_files/explain_tree.slt +++ b/datafusion/sqllogictest/test_files/explain_tree.slt @@ -244,17 +244,11 @@ explain SELECT int_col FROM table1 LIMIT 3,2; ---- physical_plan 01)┌───────────────────────────┐ -02)│ GlobalLimitExec │ +02)│ DataSourceExec │ 03)│ -------------------- │ -04)│ limit: 2 │ -05)│ skip: 3 │ -06)└─────────────┬─────────────┘ -07)┌─────────────┴─────────────┐ -08)│ DataSourceExec │ -09)│ -------------------- │ -10)│ files: 1 │ -11)│ format: csv │ -12)└───────────────────────────┘ +04)│ files: 1 │ +05)│ format: csv │ +06)└───────────────────────────┘ query TT explain SELECT * FROM limit_table LIMIT 10; diff --git a/datafusion/sqllogictest/test_files/offset.slt b/datafusion/sqllogictest/test_files/offset.slt new file mode 100644 index 0000000000000..531c7112ad672 --- /dev/null +++ b/datafusion/sqllogictest/test_files/offset.slt @@ -0,0 +1,543 @@ +# Offset pushdown scaffolding and initial test + +statement ok +set datafusion.explain.logical_plan_only = false; + +# Use a single partition for deterministic file ordering in these tests +statement ok +set datafusion.execution.target_partitions = 1; + +# Base data used to materialize parquet files for offset tests +statement ok +CREATE TABLE offset_src ( + id INT, + part_key INT, + value INT +) AS VALUES + (1, 1, 10), + (2, 1, 11), + (3, 1, 12), + (4, 1, 13), + (5, 1, 14), + (6, 2, 15), + (7, 2, 16), + (8, 2, 17), + (9, 3, 18), + (10, 3, 19), + (11, 3, 20), + (12, 3, 21); + +# ---------------------------------------------------------------------- +# Step 1: materialize parquet files needed for the upcoming offset tests +# ---------------------------------------------------------------------- + +# Single-file dataset (exact row count, no predicate) +query I +COPY (SELECT * FROM offset_src ORDER BY id) +TO 'test_files/scratch/offset/single/part-0.parquet' +STORED AS PARQUET; +---- +12 + +statement ok +CREATE EXTERNAL TABLE offset_single +( + id INT, + part_key INT, + value INT +) +STORED AS PARQUET +LOCATION 'test_files/scratch/offset/single/'; + +# Multi-file dataset (exact row counts, split by part_key) +# File 1 (part_key = 1) +query I +COPY (SELECT * FROM offset_src WHERE part_key = 1 ORDER BY id) +TO 'test_files/scratch/offset/multi_exact/part-0.parquet' +STORED AS PARQUET; +---- +5 + +# File 2 (part_key = 2) +query I +COPY (SELECT * FROM offset_src WHERE part_key = 2 ORDER BY id) +TO 'test_files/scratch/offset/multi_exact/part-1.parquet' +STORED AS PARQUET; +---- +3 + +# File 3 (part_key = 3) +query I +COPY (SELECT * FROM offset_src WHERE part_key = 3 ORDER BY id) +TO 'test_files/scratch/offset/multi_exact/part-2.parquet' +STORED AS PARQUET; +---- +4 + +statement ok +CREATE EXTERNAL TABLE offset_multi_exact +( + id INT, + part_key INT, + value INT +) +STORED AS PARQUET +LOCATION 'test_files/scratch/offset/multi_exact/'; + +# 
---------------------------------------------------------------------- +# Step 2: Test 1 — Single file, single partition, exact rows, no predicate +# Expect to skip first 3 rows and return the next 2 in order. +# ---------------------------------------------------------------------- +query II +SELECT id, value +FROM offset_single +ORDER BY id +LIMIT 2 OFFSET 3; +---- +4 13 +5 14 + +# ---------------------------------------------------------------------- +# Test 2 — Single file, single partition, exact rows, predicate present +# Predicate filters to part_key = 1 (rows id 1-5); offset and limit apply after filter. +# Expect same rows as test 1. +# ---------------------------------------------------------------------- +query II +SELECT id, value +FROM offset_single +WHERE part_key = 1 +ORDER BY id +LIMIT 2 OFFSET 3; +---- +4 13 +5 14 + +# ---------------------------------------------------------------------- +# Test 3 — Multiple files, single partition, exact rows, no predicate +# Dataset spans three files; verify cross-file offset consumption. +# Skip first 6 rows (id 1-6) and take next 3. +# ---------------------------------------------------------------------- +query II +SELECT id, value +FROM offset_multi_exact +ORDER BY id +LIMIT 3 OFFSET 6; +---- +7 16 +8 17 +9 18 + +# ---------------------------------------------------------------------- +# Test 4 — Multiple files, single partition, exact rows, predicate present +# Predicate restricts to part_key = 3 (ids 9-12); apply offset within that subset. +# Skip first 1 row and take next 2. +# ---------------------------------------------------------------------- +query II +SELECT id, value +FROM offset_multi_exact +WHERE part_key = 3 +ORDER BY id +LIMIT 2 OFFSET 1; +---- +10 19 +11 20 + +# ---------------------------------------------------------------------- +# Additional datasets for inexact-row-count scenarios (CSV, stats absent) +# ---------------------------------------------------------------------- + +# Single-file CSV (inexact stats) +query I +COPY (SELECT * FROM offset_src ORDER BY id) +TO 'test_files/scratch/offset/single_csv/part-0.csv' +STORED AS CSV; +---- +12 + +statement ok +CREATE EXTERNAL TABLE offset_single_inexact +( + id INT, + part_key INT, + value INT +) +STORED AS CSV +LOCATION 'test_files/scratch/offset/single_csv/'; + +# Multi-file CSV (inexact stats), split by part_key +query I +COPY (SELECT * FROM offset_src WHERE part_key = 1 ORDER BY id) +TO 'test_files/scratch/offset/multi_inexact/part-0.csv' +STORED AS CSV; +---- +5 + +query I +COPY (SELECT * FROM offset_src WHERE part_key = 2 ORDER BY id) +TO 'test_files/scratch/offset/multi_inexact/part-1.csv' +STORED AS CSV; +---- +3 + +query I +COPY (SELECT * FROM offset_src WHERE part_key = 3 ORDER BY id) +TO 'test_files/scratch/offset/multi_inexact/part-2.csv' +STORED AS CSV; +---- +4 + +statement ok +CREATE EXTERNAL TABLE offset_multi_inexact +( + id INT, + part_key INT, + value INT +) +STORED AS CSV +LOCATION 'test_files/scratch/offset/multi_inexact/'; + +# ---------------------------------------------------------------------- +# Test 5 — Single file, single partition, inexact rows, no predicate +# ---------------------------------------------------------------------- +query II +SELECT id, value +FROM offset_single_inexact +ORDER BY id +LIMIT 2 OFFSET 3; +---- +4 13 +5 14 + +# ---------------------------------------------------------------------- +# Test 6 — Single file, single partition, inexact rows, predicate present +# 
---------------------------------------------------------------------- +query II +SELECT id, value +FROM offset_single_inexact +WHERE part_key = 1 +ORDER BY id +LIMIT 2 OFFSET 3; +---- +4 13 +5 14 + +# ---------------------------------------------------------------------- +# Test 7 — Multiple files, single partition, inexact rows, no predicate +# ---------------------------------------------------------------------- +query II +SELECT id, value +FROM offset_multi_inexact +ORDER BY id +LIMIT 3 OFFSET 6; +---- +7 16 +8 17 +9 18 + +# ---------------------------------------------------------------------- +# Test 8 — Multiple files, single partition, inexact rows, predicate present +# ---------------------------------------------------------------------- +query II +SELECT id, value +FROM offset_multi_inexact +WHERE part_key = 3 +ORDER BY id +LIMIT 2 OFFSET 1; +---- +10 19 +11 20 + +# ---------------------------------------------------------------------- +# Multi-partition scenarios: set target_partitions = 3 +# ---------------------------------------------------------------------- +statement ok +set datafusion.execution.target_partitions = 3; + +# ---------------------------------------------------------------------- +# Test 9 — Single file, multiple partitions (config), exact rows, no predicate +# ---------------------------------------------------------------------- +query II +SELECT id, value +FROM offset_single +ORDER BY id +LIMIT 2 OFFSET 3; +---- +4 13 +5 14 + +# ---------------------------------------------------------------------- +# Test 10 — Single file, multiple partitions (config), exact rows, predicate present +# ---------------------------------------------------------------------- +query II +SELECT id, value +FROM offset_single +WHERE part_key = 1 +ORDER BY id +LIMIT 2 OFFSET 3; +---- +4 13 +5 14 + +# ---------------------------------------------------------------------- +# Test 11 — Single file, multiple partitions (config), inexact rows, no predicate +# ---------------------------------------------------------------------- +query II +SELECT id, value +FROM offset_single_inexact +ORDER BY id +LIMIT 2 OFFSET 3; +---- +4 13 +5 14 + +# ---------------------------------------------------------------------- +# Test 12 — Single file, multiple partitions (config), inexact rows, predicate present +# ---------------------------------------------------------------------- +query II +SELECT id, value +FROM offset_single_inexact +WHERE part_key = 1 +ORDER BY id +LIMIT 2 OFFSET 3; +---- +4 13 +5 14 + +# ---------------------------------------------------------------------- +# Test 13 — Multiple files, multiple partitions, exact rows, no predicate +# ---------------------------------------------------------------------- +query II +SELECT id, value +FROM offset_multi_exact +ORDER BY id +LIMIT 3 OFFSET 6; +---- +7 16 +8 17 +9 18 + +# ---------------------------------------------------------------------- +# Test 14 — Multiple files, multiple partitions, exact rows, predicate present +# ---------------------------------------------------------------------- +query II +SELECT id, value +FROM offset_multi_exact +WHERE part_key = 3 +ORDER BY id +LIMIT 2 OFFSET 1; +---- +10 19 +11 20 + +# ---------------------------------------------------------------------- +# Test 15 — Multiple files, multiple partitions, inexact rows, no predicate +# ---------------------------------------------------------------------- +query II +SELECT id, value +FROM offset_multi_inexact +ORDER BY id +LIMIT 3 OFFSET 6; +---- +7 16 
+8 17 +9 18 + +# ---------------------------------------------------------------------- +# Test 16 — Multiple files, multiple partitions, inexact rows, predicate present +# ---------------------------------------------------------------------- +query II +SELECT id, value +FROM offset_multi_inexact +WHERE part_key = 3 +ORDER BY id +LIMIT 2 OFFSET 1; +---- +10 19 +11 20 + +# ---------------------------------------------------------------------- +# Additional multi-file exact scenarios: whole-file skips and empty files +# ---------------------------------------------------------------------- +# Additional multi-file exact scenarios: whole-file skips and empty files +# Files with an empty first file, followed by the same splits as offset_multi_exact +query I +COPY (SELECT * FROM offset_src WHERE 1 = 0) +TO 'test_files/scratch/offset/multi_exact_empty/part-0.parquet' +STORED AS PARQUET; +---- +0 + +query I +COPY (SELECT * FROM offset_src WHERE part_key = 1 ORDER BY id) +TO 'test_files/scratch/offset/multi_exact_empty/part-1.parquet' +STORED AS PARQUET; +---- +5 + +query I +COPY (SELECT * FROM offset_src WHERE part_key = 2 ORDER BY id) +TO 'test_files/scratch/offset/multi_exact_empty/part-2.parquet' +STORED AS PARQUET; +---- +3 + +query I +COPY (SELECT * FROM offset_src WHERE part_key = 3 ORDER BY id) +TO 'test_files/scratch/offset/multi_exact_empty/part-3.parquet' +STORED AS PARQUET; +---- +4 + +statement ok +CREATE EXTERNAL TABLE offset_multi_exact_empty +( + id INT, + part_key INT, + value INT +) +STORED AS PARQUET +LOCATION 'test_files/scratch/offset/multi_exact_empty/'; + +# ---------------------------------------------------------------------- +# Test 17 — Multiple files, single partition, exact rows, no predicate +# Offset spans exactly one whole file (skip 5 rows), then take 2. +# ---------------------------------------------------------------------- +query II +SELECT id, value +FROM offset_multi_exact +ORDER BY id +LIMIT 2 OFFSET 5; +---- +6 15 +7 16 + +# ---------------------------------------------------------------------- +# Test 18 — Multiple files, single partition, exact rows, no predicate +# Offset spans two whole files (skip 8 rows), then take 2. +# ---------------------------------------------------------------------- +query II +SELECT id, value +FROM offset_multi_exact +ORDER BY id +LIMIT 2 OFFSET 8; +---- +9 18 +10 19 + +# ---------------------------------------------------------------------- +# Test 19 — Multiple files, single partition, exact rows, empty leading file +# Offset across empty leading file (skip 2, take 2). 
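+# (This also exercises the whole-file skip path in FileStream: a file whose exact
+# row count fits entirely within the remaining offset is dropped before opening.)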
+# ---------------------------------------------------------------------- +query II +SELECT id, value +FROM offset_multi_exact_empty +ORDER BY id +LIMIT 2 OFFSET 2; +---- +3 12 +4 13 + +# ---------------------------------------------------------------------- +# Multi-row-group single-file scenarios (Parquet with small row groups) +# ---------------------------------------------------------------------- + +# Force small row groups while writing +statement ok +set datafusion.execution.parquet.max_row_group_size = 2; + +# Create a Parquet file with multiple row groups in a single file +query I +COPY (SELECT * FROM offset_src ORDER BY id) +TO 'test_files/scratch/offset/single_multi_rg/part-0.parquet' +STORED AS PARQUET; +---- +12 + +statement ok +CREATE EXTERNAL TABLE offset_single_multi_rg +( + id INT, + part_key INT, + value INT +) +STORED AS PARQUET +LOCATION 'test_files/scratch/offset/single_multi_rg/'; + +# Reset row group size to default for subsequent operations +statement ok +set datafusion.execution.parquet.max_row_group_size = 1048576; + +# Test 20 — Single file, multiple row groups, offset spans row-group boundary +# Skip first 3 rows (crossing RG boundary when RG size is 2) and take next 3. +query II +SELECT id, value +FROM offset_single_multi_rg +ORDER BY id +LIMIT 3 OFFSET 3; +---- +4 13 +5 14 +6 15 + +# ---------------------------------------------------------------------- +# Logical plan checks (no predicate) +# ---------------------------------------------------------------------- + +statement ok +set datafusion.explain.logical_plan_only = false; + +# Logical plan: single file, exact rows, no predicate +query TT +EXPLAIN select id, value from offset_single order by id limit 2 offset 3; +---- +logical_plan +01)Limit: skip=3, fetch=2 +02)--Sort: offset_single.id ASC NULLS LAST, fetch=5 +03)----TableScan: offset_single projection=[id, value] +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/offset/single/part-0.parquet]]}, projection=[id, value], limit=5, skip=3, output_ordering=[id@0 ASC NULLS LAST], file_type=parquet + +# Logical plan: multi file, exact rows, no predicate +query TT +EXPLAIN select id, value from offset_multi_exact order by id limit 3 offset 6; +---- +logical_plan +01)Limit: skip=6, fetch=3 +02)--Sort: offset_multi_exact.id ASC NULLS LAST, fetch=9 +03)----TableScan: offset_multi_exact projection=[id, value] +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/offset/multi_exact/part-0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/offset/multi_exact/part-1.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/offset/multi_exact/part-2.parquet]]}, projection=[id, value], limit=9, skip=6, output_ordering=[id@0 ASC NULLS LAST], file_type=parquet + +# Logical/physical plan: large offset on single Parquet file (table scan) +query TT +EXPLAIN select * from offset_single LIMIT 1 OFFSET 50000000; +---- +logical_plan +01)Limit: skip=50000000, fetch=1 +02)--TableScan: offset_single projection=[id, part_key, value], fetch=50000001 +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/offset/single/part-0.parquet]]}, projection=[id, part_key, value], limit=50000001, skip=50000000, output_ordering=[id@0 ASC NULLS LAST], file_type=parquet + +# ---------------------------------------------------------------------- +# Teardown: reset configs and drop created tables 
+# ---------------------------------------------------------------------- +statement ok +set datafusion.execution.target_partitions = 1; + +statement ok +DROP TABLE IF EXISTS offset_single_multi_rg; + +statement ok +DROP TABLE IF EXISTS offset_multi_exact_empty; + +statement ok +DROP TABLE IF EXISTS offset_multi_inexact; + +statement ok +DROP TABLE IF EXISTS offset_multi_exact; + +statement ok +DROP TABLE IF EXISTS offset_single_inexact; + +statement ok +DROP TABLE IF EXISTS offset_single; + +statement ok +DROP TABLE IF EXISTS offset_src; \ No newline at end of file