diff --git a/datafusion-cli/src/functions.rs b/datafusion-cli/src/functions.rs
index aa83fec1118ed..e64617c691f02 100644
--- a/datafusion-cli/src/functions.rs
+++ b/datafusion-cli/src/functions.rs
@@ -247,6 +247,7 @@ impl TableProvider for ParquetMetadataTable {
         projection: Option<&Vec<usize>>,
         _filters: &[Expr],
         _limit: Option<usize>,
+        _offset: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         Ok(MemorySourceConfig::try_new_exec(
             &[vec![self.batch.clone()]],
@@ -496,6 +497,7 @@ impl TableProvider for MetadataCacheTable {
         projection: Option<&Vec<usize>>,
         _filters: &[Expr],
         _limit: Option<usize>,
+        _offset: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         Ok(MemorySourceConfig::try_new_exec(
             &[vec![self.batch.clone()]],
@@ -614,6 +616,7 @@ impl TableProvider for StatisticsCacheTable {
         projection: Option<&Vec<usize>>,
         _filters: &[Expr],
         _limit: Option<usize>,
+        _offset: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         Ok(MemorySourceConfig::try_new_exec(
             &[vec![self.batch.clone()]],
@@ -729,6 +732,7 @@ impl TableProvider for ListFilesCacheTable {
         projection: Option<&Vec<usize>>,
         _filters: &[Expr],
         _limit: Option<usize>,
+        _offset: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         Ok(MemorySourceConfig::try_new_exec(
             &[vec![self.batch.clone()]],
diff --git a/datafusion-examples/examples/custom_data_source/custom_datasource.rs b/datafusion-examples/examples/custom_data_source/custom_datasource.rs
index b276ae32cf247..bf3fff4247871 100644
--- a/datafusion-examples/examples/custom_data_source/custom_datasource.rs
+++ b/datafusion-examples/examples/custom_data_source/custom_datasource.rs
@@ -183,6 +183,7 @@ impl TableProvider for CustomDataSource {
         // filters and limit can be used here to inject some push-down operations if needed
         _filters: &[Expr],
         _limit: Option<usize>,
+        _offset: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         return self.create_physical_plan(projection, self.schema()).await;
     }
diff --git a/datafusion-examples/examples/custom_data_source/default_column_values.rs b/datafusion-examples/examples/custom_data_source/default_column_values.rs
index 81d74cfbecabd..9f4d61f13b0fc 100644
--- a/datafusion-examples/examples/custom_data_source/default_column_values.rs
+++ b/datafusion-examples/examples/custom_data_source/default_column_values.rs
@@ -226,6 +226,7 @@ impl TableProvider for DefaultValueTableProvider {
         projection: Option<&Vec<usize>>,
         filters: &[Expr],
         limit: Option<usize>,
+        offset: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let schema = Arc::clone(&self.schema);
         let df_schema = DFSchema::try_from(schema.clone())?;
@@ -260,6 +261,7 @@ impl TableProvider for DefaultValueTableProvider {
         )
         .with_projection_indices(projection.cloned())?
         .with_limit(limit)
+        .with_offset(offset)
         .with_file_group(file_group)
         .with_expr_adapter(Some(Arc::new(DefaultValuePhysicalExprAdapterFactory) as _));
diff --git a/datafusion-examples/examples/data_io/parquet_advanced_index.rs b/datafusion-examples/examples/data_io/parquet_advanced_index.rs
index 3f4ebe7a92055..882e7934a79de 100644
--- a/datafusion-examples/examples/data_io/parquet_advanced_index.rs
+++ b/datafusion-examples/examples/data_io/parquet_advanced_index.rs
@@ -469,6 +469,7 @@ impl TableProvider for IndexTableProvider {
         projection: Option<&Vec<usize>>,
         filters: &[Expr],
         limit: Option<usize>,
+        offset: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let indexed_file = &self.indexed_file;
         let predicate = self.filters_to_predicate(state, filters)?;
@@ -502,6 +503,7 @@ impl TableProvider for IndexTableProvider {
         );
         let file_scan_config = FileScanConfigBuilder::new(object_store_url, file_source)
             .with_limit(limit)
+            .with_offset(offset)
             .with_projection_indices(projection.cloned())?
            .with_file(partitioned_file)
            .build();
diff --git a/datafusion-examples/examples/data_io/parquet_embedded_index.rs b/datafusion-examples/examples/data_io/parquet_embedded_index.rs
index bcaca2ed5c85b..a9445f0a4fdea 100644
--- a/datafusion-examples/examples/data_io/parquet_embedded_index.rs
+++ b/datafusion-examples/examples/data_io/parquet_embedded_index.rs
@@ -411,6 +411,7 @@ impl TableProvider for DistinctIndexTable {
         _proj: Option<&Vec<usize>>,
         filters: &[Expr],
         _limit: Option<usize>,
+        _offset: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         // This example only handles filters of the form
         // `category = 'X'` where X is a string literal
diff --git a/datafusion-examples/examples/data_io/parquet_index.rs b/datafusion-examples/examples/data_io/parquet_index.rs
index e11a303f442a4..b12eb63ee389b 100644
--- a/datafusion-examples/examples/data_io/parquet_index.rs
+++ b/datafusion-examples/examples/data_io/parquet_index.rs
@@ -226,6 +226,7 @@ impl TableProvider for IndexTableProvider {
         projection: Option<&Vec<usize>>,
         filters: &[Expr],
         limit: Option<usize>,
+        offset: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let df_schema = DFSchema::try_from(self.schema())?;
         // convert filters like [`a = 1`, `b = 2`] to a single filter like `a = 1 AND b = 2`
@@ -248,7 +249,8 @@ impl TableProvider for IndexTableProvider {
         let mut file_scan_config_builder =
             FileScanConfigBuilder::new(object_store_url, source)
                 .with_projection_indices(projection.cloned())?
-                .with_limit(limit);
+                .with_limit(limit)
+                .with_offset(offset);

         // Transform to the format needed to pass to DataSourceExec
         // Create one file group per file (default to scanning them all in parallel)
diff --git a/datafusion-examples/examples/data_io/remote_catalog.rs b/datafusion-examples/examples/data_io/remote_catalog.rs
index 10ec26b1d5c05..4d9ec51fa64c7 100644
--- a/datafusion-examples/examples/data_io/remote_catalog.rs
+++ b/datafusion-examples/examples/data_io/remote_catalog.rs
@@ -242,6 +242,7 @@ impl TableProvider for RemoteTable {
         projection: Option<&Vec<usize>>,
         _filters: &[Expr],
         _limit: Option<usize>,
+        _offset: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         // Note that `scan` is called once the plan begin execution, and thus is
         // async. When interacting with remote data sources, this is the place
diff --git a/datafusion-examples/examples/udf/simple_udtf.rs b/datafusion-examples/examples/udf/simple_udtf.rs
index 087b8ba73af5c..299496dac712d 100644
--- a/datafusion-examples/examples/udf/simple_udtf.rs
+++ b/datafusion-examples/examples/udf/simple_udtf.rs
@@ -101,6 +101,7 @@ impl TableProvider for LocalCsvTable {
         projection: Option<&Vec<usize>>,
         _filters: &[Expr],
         _limit: Option<usize>,
+        _offset: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let batches = if let Some(max_return_lines) = self.limit {
             // get max return rows from self.batches
diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs
index 9fb2dd2dce29c..c88c329f88241 100644
--- a/datafusion/catalog-listing/src/table.rs
+++ b/datafusion/catalog-listing/src/table.rs
@@ -59,6 +59,8 @@ pub struct ListFilesResult {
     pub statistics: Statistics,
     /// Whether files are grouped by partition values (enables Hash partitioning).
     pub grouped_by_partition: bool,
+    /// Offset remaining after whole files have been pruned, i.e. the offset
+    /// that still has to be applied within the first file that is read.
+    pub remaining_offset: Option<usize>,
 }

 /// Built in [`TableProvider`] that reads data from one or more files as a single table.
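The interplay between pruned files and the leftover offset is easiest to see with concrete numbers. Below is a minimal, self-contained illustration (the helper name is invented; this is not code from the patch) of the bookkeeping that `remaining_offset` captures:

```rust
/// Hypothetical illustration of the arithmetic behind `remaining_offset`:
/// skip whole files using exact row counts, and report the offset left over
/// for the first file that must actually be read.
fn prune_files_by_offset(file_row_counts: &[usize], offset: usize) -> (usize, Option<usize>) {
    let mut remaining = offset;
    let mut first_file_to_read = 0;
    for (i, rows) in file_row_counts.iter().enumerate() {
        if *rows <= remaining {
            remaining -= rows; // the whole file is skipped
            first_file_to_read = i + 1;
        } else {
            break;
        }
    }
    (first_file_to_read, (remaining > 0).then_some(remaining))
}

fn main() {
    // Files with 100, 50 and 200 rows; OFFSET 120 prunes the first file
    // entirely and leaves 20 rows to skip inside the second one.
    assert_eq!(prune_files_by_offset(&[100, 50, 200], 120), (1, Some(20)));
    // OFFSET 150 prunes the first two files with nothing left to skip.
    assert_eq!(prune_files_by_offset(&[100, 50, 200], 150), (2, None));
}
```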
@@ -383,11 +385,13 @@ impl TableProvider for ListingTable {
         projection: Option<&Vec<usize>>,
         filters: &[Expr],
         limit: Option<usize>,
+        offset: Option<usize>,
     ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
         let options = ScanArgs::default()
             .with_projection(projection.map(|p| p.as_slice()))
             .with_filters(Some(filters))
-            .with_limit(limit);
+            .with_limit(limit)
+            .with_offset(offset);
         Ok(self.scan_with_args(state, options).await?.into_inner())
     }

@@ -399,6 +403,7 @@
         let projection = args.projection().map(|p| p.to_vec());
         let filters = args.filters().map(|f| f.to_vec()).unwrap_or_default();
         let limit = args.limit();
+        let offset = args.offset();

         // extract types of partition columns
         let table_partition_cols = self
@@ -420,18 +425,32 @@
             can_be_evaluated_for_partition_pruning(&table_partition_col_names, filter)
         });

-        // We should not limit the number of partitioned files to scan if there are filters and limit
-        // at the same time. This is because the limit should be applied after the filters are applied.
+        // We should not limit/offset the number of partitioned files to scan
+        // when there are filters and a limit/offset at the same time, because
+        // the limit/offset must be applied after the filters.
         let statistic_file_limit = if filters.is_empty() { limit } else { None };
+        let statistic_file_offset = if filters.is_empty() { offset } else { None };

         let ListFilesResult {
             file_groups: mut partitioned_file_lists,
             statistics,
             grouped_by_partition: partitioned_by_file_group,
+            remaining_offset,
         } = self
-            .list_files_for_scan(state, &partition_filters, statistic_file_limit)
+            .list_files_for_scan(
+                state,
+                &partition_filters,
+                statistic_file_limit,
+                statistic_file_offset,
+            )
             .await?;

+        // If the offset could not be pushed into the file listing (because
+        // filters are present), the full offset still has to be applied by
+        // the scan itself.
+        let final_remaining_offset = if filters.is_empty() {
+            remaining_offset
+        } else {
+            offset
+        };
+
         // if no files need to be read, return an `EmptyExec`
         if partitioned_file_lists.is_empty() {
             let projected_schema = project_schema(&self.schema(), projection.as_ref())?;
@@ -490,6 +509,7 @@
             .with_statistics(statistics)
             .with_projection_indices(projection)?
             .with_limit(limit)
+            .with_offset(final_remaining_offset)
             .with_output_ordering(output_ordering)
             .with_expr_adapter(self.expr_adapter_factory.clone())
             .with_partitioned_by_file_group(partitioned_by_file_group)
@@ -605,6 +625,7 @@ impl ListingTable {
         ctx: &'a dyn Session,
         filters: &'a [Expr],
         limit: Option<usize>,
+        offset: Option<usize>,
     ) -> datafusion_common::Result<ListFilesResult> {
         let store = if let Some(url) = self.table_paths.first() {
             ctx.runtime_env().object_store(url)?
@@ -613,6 +634,7 @@
                 file_groups: vec![],
                 statistics: Statistics::new_unknown(&self.file_schema),
                 grouped_by_partition: false,
+                remaining_offset: offset,
             });
         };
         // list files (with partitions)
@@ -644,8 +666,14 @@
             .boxed()
             .buffer_unordered(ctx.config_options().execution.meta_fetch_concurrency);

-        let (file_group, inexact_stats) =
-            get_files_with_limit(files, limit, self.options.collect_stat).await?;
+        let (file_group, inexact_stats, remaining_offset) =
+            get_files_with_limit_and_offset(
+                files,
+                limit,
+                offset,
+                self.options.collect_stat,
+            )
+            .await?;

         // Threshold: 0 = disabled, N > 0 = enabled when distinct_keys >= N
         //
@@ -691,6 +719,7 @@
             file_groups,
             statistics: stats,
             grouped_by_partition,
+            remaining_offset,
         })
     }

@@ -735,38 +764,50 @@ impl ListingTable {
 /// Processes a stream of partitioned files and returns a `FileGroup` containing the files.
 ///
-/// This function collects files from the provided stream until either:
-/// 1. The stream is exhausted
-/// 2. The accumulated number of rows exceeds the provided `limit` (if specified)
+/// This function first skips whole files while the accumulated number of rows
+/// stays within the provided `offset` (if specified), and then collects files
+/// until either:
+/// 1. The stream is exhausted
+/// 2. The accumulated number of rows exceeds the provided `limit` (if specified)
 ///
 /// # Arguments
 /// * `files` - A stream of `Result<PartitionedFile>` items to process
 /// * `limit` - An optional row count limit. If provided, the function will stop collecting files
 ///   once the accumulated number of rows exceeds this limit
+/// * `offset` - An optional row count offset. If provided, the function will not collect files
+///   until the accumulated number of rows exceeds the given offset
 /// * `collect_stats` - Whether to collect and accumulate statistics from the files
 ///
 /// # Returns
-/// A `Result` containing a `FileGroup` with the collected files
-/// and a boolean indicating whether the statistics are inexact.
+/// A `Result` containing:
+/// - a `FileGroup` with the collected files
+/// - a boolean indicating whether the statistics are inexact
+/// - an `Option<usize>` with the offset that remains to be skipped within the
+///   first collected file
 ///
 /// # Note
 /// The function will continue processing files if statistics are not available or if the
 /// limit is not provided. If `collect_stats` is false, statistics won't be accumulated
 /// but files will still be collected.
-async fn get_files_with_limit(
+async fn get_files_with_limit_and_offset(
     files: impl Stream<Item = Result<PartitionedFile>>,
     limit: Option<usize>,
+    offset: Option<usize>,
     collect_stats: bool,
-) -> datafusion_common::Result<(FileGroup, bool)> {
+) -> datafusion_common::Result<(FileGroup, bool, Option<usize>)> {
     let mut file_group = FileGroup::default();
     // Fusing the stream allows us to call next safely even once it is finished.
     let mut all_files = Box::pin(files.fuse());

+    #[derive(Debug)]
     enum ProcessingState {
+        SkippingUsingOffset(usize),
         ReadingFiles,
         ReachedLimit,
     }

-    let mut state = ProcessingState::ReadingFiles;
+    let (mut final_remaining_offset, mut state) = if let Some(o) = offset {
+        (Some(o), ProcessingState::SkippingUsingOffset(o))
+    } else {
+        (None, ProcessingState::ReadingFiles)
+    };
     let mut num_rows = Precision::Absent;

     while let Some(file_result) = all_files.next().await {
@@ -788,7 +829,32 @@
             };
         }

-        // Always add the file to our group
+        // Try to skip opening the entire file by applying the offset
+        if let ProcessingState::SkippingUsingOffset(remaining_offset) = state {
+            if let Precision::Exact(row_count) = num_rows {
+                if row_count <= remaining_offset {
+                    state = ProcessingState::SkippingUsingOffset(
+                        remaining_offset - row_count,
+                    );
+                    continue; // skip reading this file
+                } else {
+                    // the offset is exhausted within this file; start
+                    // collecting files and applying the limit
+                    state = ProcessingState::ReadingFiles;
+                    final_remaining_offset = if remaining_offset == 0 {
+                        None
+                    } else {
+                        Some(remaining_offset)
+                    };
+                }
+            } else {
+                // TODO(feniljain): What should happen when we get
+                // inexact row counts?
+            }
+        }
+
+        // Add the file to our group once the offset (if present) has
+        // been exhausted
         file_group.push(file);

         // Check if we've hit the limit (if one was specified)
@@ -803,5 +869,5 @@
     // in, and the statistic could have been different had we processed the
     // files in a different order.
     let inexact_stats = all_files.next().await.is_some();
-    Ok((file_group, inexact_stats))
+    Ok((file_group, inexact_stats, final_remaining_offset))
 }
diff --git a/datafusion/catalog/src/async.rs b/datafusion/catalog/src/async.rs
index 1b8039d828fdb..b91bbee0d07cb 100644
--- a/datafusion/catalog/src/async.rs
+++ b/datafusion/catalog/src/async.rs
@@ -466,6 +466,7 @@ mod tests {
             _projection: Option<&Vec<usize>>,
             _filters: &[Expr],
             _limit: Option<usize>,
+            _skip: Option<usize>,
         ) -> Result<Arc<dyn ExecutionPlan>> {
             unimplemented!()
         }
diff --git a/datafusion/catalog/src/cte_worktable.rs b/datafusion/catalog/src/cte_worktable.rs
index 9565dcc60141e..eeb9bcb30ae2c 100644
--- a/datafusion/catalog/src/cte_worktable.rs
+++ b/datafusion/catalog/src/cte_worktable.rs
@@ -87,11 +87,13 @@ impl TableProvider for CteWorkTable {
         projection: Option<&Vec<usize>>,
         filters: &[Expr],
         limit: Option<usize>,
+        offset: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let options = ScanArgs::default()
             .with_projection(projection.map(|p| p.as_slice()))
             .with_filters(Some(filters))
-            .with_limit(limit);
+            .with_limit(limit)
+            .with_offset(offset);
         Ok(self.scan_with_args(state, options).await?.into_inner())
     }
diff --git a/datafusion/catalog/src/default_table_source.rs b/datafusion/catalog/src/default_table_source.rs
index fb6531ba0b2ee..b8c374ca9ed89 100644
--- a/datafusion/catalog/src/default_table_source.rs
+++ b/datafusion/catalog/src/default_table_source.rs
@@ -135,6 +135,7 @@ fn preserves_table_type() {
         _: Option<&Vec<usize>>,
         _: &[Expr],
         _: Option<usize>,
+        _: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
         unimplemented!()
diff --git a/datafusion/catalog/src/memory/table.rs b/datafusion/catalog/src/memory/table.rs
index 7865eb016bee1..e20766733a727 100644
--- a/datafusion/catalog/src/memory/table.rs
+++ b/datafusion/catalog/src/memory/table.rs
@@ -147,7 +147,7 @@ impl MemTable {
     ) -> Result<Self> {
         let schema = t.schema();
         let constraints = t.constraints();
-        let exec = t.scan(state, None, &[], None).await?;
+        let exec = t.scan(state, None, &[], None, None).await?;
         let partition_count = exec.output_partitioning().partition_count();

         let mut join_set = JoinSet::new();
@@ -235,6 +235,7 @@ impl TableProvider for MemTable {
         projection: Option<&Vec<usize>>,
         _filters: &[Expr],
         _limit: Option<usize>,
+        _offset: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let mut partitions = vec![];
         for arc_inner_vec in self.batches.iter() {
diff --git a/datafusion/catalog/src/stream.rs b/datafusion/catalog/src/stream.rs
index bdd72a1b1d70b..35d532ebb3306 100644
--- a/datafusion/catalog/src/stream.rs
+++ b/datafusion/catalog/src/stream.rs
@@ -325,6 +325,7 @@ impl TableProvider for StreamTable {
         projection: Option<&Vec<usize>>,
         _filters: &[Expr],
         limit: Option<usize>,
+        _offset: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let projected_schema = match projection {
             Some(p) => {
diff --git a/datafusion/catalog/src/streaming.rs b/datafusion/catalog/src/streaming.rs
index 31669171b291a..ddc3cfee5d1bf 100644
--- a/datafusion/catalog/src/streaming.rs
+++ b/datafusion/catalog/src/streaming.rs
@@ -100,6 +100,7 @@ impl TableProvider for StreamingTable {
         projection: Option<&Vec<usize>>,
         _filters: &[Expr],
         limit: Option<usize>,
+        _offset: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let physical_sort = if !self.sort_order.is_empty() {
             let df_schema = DFSchema::try_from(Arc::clone(&self.schema))?;
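For third-party `TableProvider` implementations the new argument is purely additive. A minimal sketch (hypothetical provider; import paths are assumptions) of adapting to the five-argument `scan`. A source that cannot skip rows at the source may ignore the hint, since the enclosing `Limit` node is still retained by the optimizer:

```rust
use std::{any::Any, sync::Arc};

use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use datafusion::catalog::{Session, TableProvider};
use datafusion::common::Result;
use datafusion::logical_expr::{Expr, TableType};
use datafusion::physical_plan::{empty::EmptyExec, ExecutionPlan};

/// Hypothetical provider used only to illustrate the new signature.
#[derive(Debug)]
struct ExampleTable {
    schema: SchemaRef,
}

#[async_trait]
impl TableProvider for ExampleTable {
    fn as_any(&self) -> &dyn Any {
        self
    }
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
    fn table_type(&self) -> TableType {
        TableType::Base
    }

    async fn scan(
        &self,
        _state: &dyn Session,
        _projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        _limit: Option<usize>,
        // New in this patch: a pushdown *hint*. A source that cannot skip
        // rows can ignore it; the `Limit` above the scan still applies it.
        _offset: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(EmptyExec::new(Arc::clone(&self.schema))))
    }
}
```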
diff --git a/datafusion/catalog/src/table.rs b/datafusion/catalog/src/table.rs
index 1f223852c2b9d..ddaa0b8580123 100644
--- a/datafusion/catalog/src/table.rs
+++ b/datafusion/catalog/src/table.rs
@@ -169,6 +169,7 @@ pub trait TableProvider: Debug + Sync + Send {
         projection: Option<&Vec<usize>>,
         filters: &[Expr],
         limit: Option<usize>,
+        offset: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>>;

     /// Create an [`ExecutionPlan`] for scanning the table using structured arguments.
@@ -196,8 +197,9 @@
         let filters = args.filters().unwrap_or(&[]);
         let projection = args.projection().map(|p| p.to_vec());
         let limit = args.limit();
+        let offset = args.offset();
         let plan = self
-            .scan(state, projection.as_ref(), filters, limit)
+            .scan(state, projection.as_ref(), filters, limit, offset)
             .await?;
         Ok(plan.into())
     }
@@ -361,6 +363,7 @@ pub struct ScanArgs<'a> {
     filters: Option<&'a [Expr]>,
     projection: Option<&'a [usize]>,
     limit: Option<usize>,
+    offset: Option<usize>,
 }

 impl<'a> ScanArgs<'a> {
@@ -422,6 +425,25 @@ impl<'a> ScanArgs<'a> {
     pub fn limit(&self) -> Option<usize> {
         self.limit
     }
+
+    /// Set the number of rows to skip before the limit is applied to the scan.
+    ///
+    /// If specified, the scan skips the first N rows before it starts reading.
+    /// This is used to optimize queries with `OFFSET` clauses.
+    ///
+    /// # Arguments
+    /// * `offset` - Optional number of rows to skip
+    pub fn with_offset(mut self, offset: Option<usize>) -> Self {
+        self.offset = offset;
+        self
+    }
+
+    /// Get the number of rows to skip before the limit is applied to the scan.
+    ///
+    /// Returns the offset count, or `None` if no offset was specified.
+    pub fn offset(&self) -> Option<usize> {
+        self.offset
+    }
 }

 /// Result of a table scan operation from [`TableProvider::scan_with_args`].
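A hypothetical call site (not part of the patch; `provider`, `state`, and `filters` are assumed to be in scope) showing how the new setter composes with the existing `ScanArgs` builder:

```rust
// Build scan arguments for the equivalent of `... LIMIT 100 OFFSET 20`.
let args = ScanArgs::default()
    .with_projection(Some(&[0, 2]))
    .with_filters(Some(&filters))
    .with_limit(Some(100))  // LIMIT 100
    .with_offset(Some(20)); // OFFSET 20
let plan = provider.scan_with_args(state, args).await?.into_inner();
```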
diff --git a/datafusion/catalog/src/view.rs b/datafusion/catalog/src/view.rs
index 54c54431a5913..04128b7ee19e7 100644
--- a/datafusion/catalog/src/view.rs
+++ b/datafusion/catalog/src/view.rs
@@ -116,6 +116,7 @@ impl TableProvider for ViewTable {
         projection: Option<&Vec<usize>>,
         filters: &[Expr],
         limit: Option<usize>,
+        offset: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new));
         let plan = self.logical_plan().clone();
@@ -147,7 +148,7 @@
         };

         if let Some(limit) = limit {
-            plan = plan.limit(0, Some(limit))?;
+            plan = plan.limit(offset.unwrap_or(0), Some(limit))?;
         }

         state.create_physical_plan(&plan.build()?).await
diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs
index fe760760eef3f..fa40a6378ff27 100644
--- a/datafusion/core/src/dataframe/mod.rs
+++ b/datafusion/core/src/dataframe/mod.rs
@@ -2592,6 +2592,7 @@ impl TableProvider for DataFrameTableProvider {
         projection: Option<&Vec<usize>>,
         filters: &[Expr],
         limit: Option<usize>,
+        offset: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let mut expr = LogicalPlanBuilder::from(self.plan.clone());
         // Add filter when given
@@ -2606,7 +2607,7 @@
         // add a limit if given
         if let Some(l) = limit {
-            expr = expr.limit(0, Some(l))?
+            expr = expr.limit(offset.unwrap_or(0), Some(l))?
         }
         let plan = expr.build()?;
         state.create_physical_plan(&plan).await
diff --git a/datafusion/core/src/datasource/empty.rs b/datafusion/core/src/datasource/empty.rs
index 5aeca92b1626d..f01485e66dd28 100644
--- a/datafusion/core/src/datasource/empty.rs
+++ b/datafusion/core/src/datasource/empty.rs
@@ -75,6 +75,7 @@ impl TableProvider for EmptyTable {
         projection: Option<&Vec<usize>>,
         _filters: &[Expr],
         _limit: Option<usize>,
+        _offset: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         // even though there is no data, projections apply
         let projected_schema = project_schema(&self.schema, projection)?;
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 93d77e10ba23c..df78041876dae 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -236,7 +236,7 @@ mod tests {
         let table = load_table(&ctx, "alltypes_plain.parquet").await?;
         let projection = None;
         let exec = table
-            .scan(&ctx.state(), projection, &[], None)
+            .scan(&ctx.state(), projection, &[], None, None)
             .await
             .expect("Scan table");
@@ -399,7 +399,7 @@
         let filter = Expr::not_eq(col("p1"), lit("v1"));
         let scan = table
-            .scan(&ctx.state(), None, &[filter], None)
+            .scan(&ctx.state(), None, &[filter], None, None)
             .await
             .expect("Empty execution plan");
@@ -452,7 +452,9 @@
         let table = ListingTable::try_new(config)?;

-        let result = table.list_files_for_scan(&ctx.state(), &[], None).await?;
+        let result = table
+            .list_files_for_scan(&ctx.state(), &[], None, None)
+            .await?;

         assert_eq!(result.file_groups.len(), output_partitioning);
@@ -487,7 +489,9 @@
         let table = ListingTable::try_new(config)?;

-        let result = table.list_files_for_scan(&ctx.state(), &[], None).await?;
+        let result = table
+            .list_files_for_scan(&ctx.state(), &[], None, None)
+            .await?;

         assert_eq!(result.file_groups.len(), output_partitioning);
@@ -537,7 +541,9 @@
         let table = ListingTable::try_new(config)?;

-        let result = table.list_files_for_scan(&ctx.state(), &[], None).await?;
+        let result = table
+            .list_files_for_scan(&ctx.state(), &[], None, None)
+            .await?;

         assert_eq!(result.file_groups.len(), output_partitioning);
@@ -1309,7 +1315,9 @@
         let table = ListingTable::try_new(config)?;

-        let result = table.list_files_for_scan(&ctx.state(), &[], None).await?;
+        let result = table
+            .list_files_for_scan(&ctx.state(), &[], None, None)
+            .await?;

         assert_eq!(result.file_groups.len(), 1);
         let files = result.file_groups[0].clone();
@@ -1351,7 +1359,7 @@
         let table_default = ListingTable::try_new(config_default)?;

-        let exec_default = table_default.scan(&state, None, &[], None).await?;
+        let exec_default = table_default.scan(&state, None, &[], None, None).await?;
         assert_eq!(
             exec_default.partition_statistics(None)?.num_rows,
             Precision::Absent
@@ -1372,7 +1380,7 @@
             .with_schema(schema_disabled);
         let table_disabled = ListingTable::try_new(config_disabled)?;

-        let exec_disabled = table_disabled.scan(&state, None, &[], None).await?;
+        let exec_disabled = table_disabled.scan(&state, None, &[], None, None).await?;
         assert_eq!(
             exec_disabled.partition_statistics(None)?.num_rows,
             Precision::Absent
@@ -1391,7 +1399,7 @@
             .with_schema(schema_enabled);
         let table_enabled = ListingTable::try_new(config_enabled)?;

-        let exec_enabled = table_enabled.scan(&state, None, &[], None).await?;
+        let exec_enabled = table_enabled.scan(&state, None, &[], None, None).await?;
         assert_eq!(
             exec_enabled.partition_statistics(None)?.num_rows,
             Precision::Exact(8)
@@ -1472,11 +1480,13 @@
         let table = ListingTable::try_new(config)?;

         // The scan should work correctly
-        let scan_result = table.scan(&ctx.state(), None, &[], None).await;
+        let scan_result = table.scan(&ctx.state(), None, &[], None, None).await;
         assert!(scan_result.is_ok(), "Scan should succeed");

         // Verify file listing works
-        let result = table.list_files_for_scan(&ctx.state(), &[], None).await?;
+        let result = table
+            .list_files_for_scan(&ctx.state(), &[], None, None)
+            .await?;
         assert!(
             !result.file_groups.is_empty(),
             "Should list files successfully"
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs
index 3ca388af0c4c1..6c4eac9152940 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -195,7 +195,11 @@ impl TableProviderFactory for ListingTableFactory {
         if session_state.config().collect_statistics() {
             let filters = &[];
             let limit = None;
-            if let Err(e) = table.list_files_for_scan(state, filters, limit).await {
+            let offset = None;
+            if let Err(e) = table
+                .list_files_for_scan(state, filters, limit, offset)
+                .await
+            {
                 log::warn!("Failed to pre-warm statistics cache: {e}");
             }
         }
diff --git a/datafusion/core/src/datasource/memory_test.rs b/datafusion/core/src/datasource/memory_test.rs
index c7721cafb02ea..ae8498854416a 100644
--- a/datafusion/core/src/datasource/memory_test.rs
+++ b/datafusion/core/src/datasource/memory_test.rs
@@ -60,7 +60,7 @@ mod tests {
         // scan with projection
         let exec = provider
-            .scan(&session_ctx.state(), Some(&vec![2, 1]), &[], None)
+            .scan(&session_ctx.state(), Some(&vec![2, 1]), &[], None, None)
             .await?;

         let mut it = exec.execute(0, task_ctx)?;
@@ -94,7 +94,7 @@
         let provider = MemTable::try_new(schema, vec![vec![batch]])?;

-        let exec = provider.scan(&session_ctx.state(), None, &[], None).await?;
+        let exec = provider.scan(&session_ctx.state(), None, &[], None, None).await?;
         let mut it = exec.execute(0, task_ctx)?;
         let batch1 = it.next().await.unwrap()?;
         assert_eq!(3, batch1.schema().fields().len());
@@ -127,7 +127,7 @@
         let projection: Vec<usize> = vec![0, 4];
         match provider
-            .scan(&session_ctx.state(), Some(&projection), &[], None)
+            .scan(&session_ctx.state(), Some(&projection), &[], None, None)
             .await
         {
             Err(DataFusionError::ArrowError(err, _)) => match err.as_ref() {
@@ -254,7 +254,7 @@
         let provider =
             MemTable::try_new(Arc::new(merged_schema), vec![vec![batch1, batch2]])?;

-        let exec = provider.scan(&session_ctx.state(), None, &[], None).await?;
+        let exec = provider.scan(&session_ctx.state(), None, &[], None, None).await?;
         let mut it = exec.execute(0, task_ctx)?;
         let batch1 = it.next().await.unwrap()?;
         assert_eq!(3, batch1.schema().fields().len());
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index cc7d534776d7e..627f7cbd7ba56 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -460,6 +460,7 @@ impl DefaultPhysicalPlanner {
                 projection,
                 filters,
                 fetch,
+                skip,
                 ..
             }) => {
                 let source = source_as_provider(source)?;
@@ -471,7 +472,8 @@
                 let opts = ScanArgs::default()
                     .with_projection(projection.as_deref())
                     .with_filters(Some(&filters_vec))
-                    .with_limit(*fetch);
+                    .with_limit(*fetch)
+                    .with_offset(*skip);
                 let res = source.scan_with_args(session_state, opts).await?;
                 Arc::clone(res.plan())
             }
@@ -1011,6 +1013,7 @@
             LogicalPlan::Sort(Sort {
                 expr, input, fetch, ..
             }) => {
+                // TODO(feniljain): pass skip here as well
                 let physical_input = children.one()?;
                 let input_dfschema = input.as_ref().schema();
                 let sort_exprs = create_physical_sort_exprs(
@@ -4166,6 +4169,7 @@ digraph {
         _projection: Option<&Vec<usize>>,
         _filters: &[Expr],
         _limit: Option<usize>,
+        _offset: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         Ok(Arc::new(NoOpExecutionPlan::new(Arc::clone(
             &self.physical_schema,
diff --git a/datafusion/core/src/test_util/mod.rs b/datafusion/core/src/test_util/mod.rs
index 466ee38a426fd..bc79ee3939b7e 100644
--- a/datafusion/core/src/test_util/mod.rs
+++ b/datafusion/core/src/test_util/mod.rs
@@ -226,6 +226,7 @@ impl TableProvider for TestTableProvider {
         _projection: Option<&Vec<usize>>,
         _filters: &[Expr],
         _limit: Option<usize>,
+        _offset: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         unimplemented!("TestTableProvider is a stub for testing.")
     }
diff --git a/datafusion/core/tests/custom_sources_cases/dml_planning.rs b/datafusion/core/tests/custom_sources_cases/dml_planning.rs
index 84cf97710a902..b8ce54c81c1e0 100644
--- a/datafusion/core/tests/custom_sources_cases/dml_planning.rs
+++ b/datafusion/core/tests/custom_sources_cases/dml_planning.rs
@@ -77,6 +77,7 @@ impl TableProvider for CaptureDeleteProvider {
         _projection: Option<&Vec<usize>>,
         _filters: &[Expr],
         _limit: Option<usize>,
+        _offset: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         Ok(Arc::new(EmptyExec::new(Arc::clone(&self.schema))))
     }
@@ -147,6 +148,7 @@ impl TableProvider for CaptureUpdateProvider {
         _projection: Option<&Vec<usize>>,
         _filters: &[Expr],
         _limit: Option<usize>,
+        _offset: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         Ok(Arc::new(EmptyExec::new(Arc::clone(&self.schema))))
     }
diff --git a/datafusion/core/tests/custom_sources_cases/mod.rs b/datafusion/core/tests/custom_sources_cases/mod.rs
index 8453615c2886b..efa93bb1ad19f 100644
--- a/datafusion/core/tests/custom_sources_cases/mod.rs
+++ b/datafusion/core/tests/custom_sources_cases/mod.rs
@@ -232,6 +232,7 @@ impl TableProvider for CustomTableProvider {
         projection: Option<&Vec<usize>>,
         _filters: &[Expr],
         _limit: Option<usize>,
+        _offset: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         Ok(Arc::new(CustomExecutionPlan::new(projection.cloned())))
     }
diff --git a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs
index ca1eaa1f958ea..3cdad818aa467 100644
--- a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs
+++ b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs
@@ -182,7 +182,8 @@ impl TableProvider for CustomProvider {
         _state: &dyn Session,
         projection: Option<&Vec<usize>>,
         filters: &[Expr],
-        _: Option<usize>,
+        _limit: Option<usize>,
+        _offset: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let empty = Vec::new();
         let projection = projection.unwrap_or(&empty);
diff --git a/datafusion/core/tests/custom_sources_cases/statistics.rs b/datafusion/core/tests/custom_sources_cases/statistics.rs
index 820c2a470b376..f5a50350f5b30 100644
--- a/datafusion/core/tests/custom_sources_cases/statistics.rs
+++ b/datafusion/core/tests/custom_sources_cases/statistics.rs
@@ -95,6 +95,7 @@ impl TableProvider for StatisticsValidation {
         filters: &[Expr],
         // limit is ignored because it is not mandatory for a `TableProvider` to honor it
         _limit: Option<usize>,
+        _offset: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         // Filters should not be pushed down as they are marked as unsupported by default.
         assert_eq!(
diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs
index c28d23ba0602b..c4875ec744608 100644
--- a/datafusion/core/tests/memory_limit/mod.rs
+++ b/datafusion/core/tests/memory_limit/mod.rs
@@ -1156,6 +1156,7 @@ impl TableProvider for SortedTableProvider {
         projection: Option<&Vec<usize>>,
         _filters: &[Expr],
         _limit: Option<usize>,
+        _offset: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let mem_conf = MemorySourceConfig::try_new(
             &self.batches,
diff --git a/datafusion/core/tests/parquet/file_statistics.rs b/datafusion/core/tests/parquet/file_statistics.rs
index fdefdafa00aa4..32970cdd98bb1 100644
--- a/datafusion/core/tests/parquet/file_statistics.rs
+++ b/datafusion/core/tests/parquet/file_statistics.rs
@@ -59,7 +59,7 @@ async fn check_stats_precision_with_filter_pushdown() {
     options.execution.parquet.pushdown_filters = true;

     // Scan without filter, stats are exact
-    let exec = table.scan(&state, None, &[], None).await.unwrap();
+    let exec = table.scan(&state, None, &[], None, None).await.unwrap();
     assert_eq!(
         exec.partition_statistics(None).unwrap().num_rows,
         Precision::Exact(8),
@@ -71,7 +71,7 @@
     // source operator after the appropriate optimizer pass.
     let filter_expr = Expr::gt(col("id"), lit(1));
     let exec_with_filter = table
-        .scan(&state, None, std::slice::from_ref(&filter_expr), None)
+        .scan(&state, None, std::slice::from_ref(&filter_expr), None, None)
         .await
         .unwrap();
@@ -118,7 +118,7 @@ async fn load_table_stats_with_session_level_cache() {
     //Session 1 first time list files
     assert_eq!(get_static_cache_size(&state1), 0);
-    let exec1 = table1.scan(&state1, None, &[], None).await.unwrap();
+    let exec1 = table1.scan(&state1, None, &[], None, None).await.unwrap();

     assert_eq!(
         exec1.partition_statistics(None).unwrap().num_rows,
@@ -135,7 +135,7 @@
     //Session 2 first time list files
     //check session 1 cache result not show in session 2
     assert_eq!(get_static_cache_size(&state2), 0);
-    let exec2 = table2.scan(&state2, None, &[], None).await.unwrap();
+    let exec2 = table2.scan(&state2, None, &[], None, None).await.unwrap();
     assert_eq!(
         exec2.partition_statistics(None).unwrap().num_rows,
         Precision::Exact(8)
@@ -150,7 +150,7 @@
     //Session 1 second time list files
     //check session 1 cache result not show in session 2
     assert_eq!(get_static_cache_size(&state1), 1);
-    let exec3 = table1.scan(&state1, None, &[], None).await.unwrap();
+    let exec3 = table1.scan(&state1, None, &[], None, None).await.unwrap();
     assert_eq!(
         exec3.partition_statistics(None).unwrap().num_rows,
         Precision::Exact(8)
@@ -195,7 +195,7 @@ async fn list_files_with_session_level_cache() {
     //Session 1 first time list files
     assert_eq!(get_list_file_cache_size(&state1), 0);
-    let exec1 = table1.scan(&state1, None, &[], None).await.unwrap();
+    let exec1 = table1.scan(&state1, None, &[], None, None).await.unwrap();
     let data_source_exec = exec1.as_any().downcast_ref::<DataSourceExec>().unwrap();
     let data_source = data_source_exec.data_source();
     let parquet1 = data_source
@@ -211,7 +211,7 @@
     //Session 2 first time list files
     //check session 1 cache result not show in session 2
     assert_eq!(get_list_file_cache_size(&state2), 0);
-    let exec2 = table2.scan(&state2, None, &[], None).await.unwrap();
+    let exec2 = table2.scan(&state2, None, &[], None, None).await.unwrap();
     let data_source_exec = exec2.as_any().downcast_ref::<DataSourceExec>().unwrap();
     let data_source = data_source_exec.data_source();
     let parquet2 = data_source
@@ -227,7 +227,7 @@
     //Session 1 second time list files
     //check session 1 cache result not show in session 2
     assert_eq!(get_list_file_cache_size(&state1), 1);
-    let exec3 = table1.scan(&state1, None, &[], None).await.unwrap();
+    let exec3 = table1.scan(&state1, None, &[], None, None).await.unwrap();
     let data_source_exec = exec3.as_any().downcast_ref::<DataSourceExec>().unwrap();
     let data_source = data_source_exec.data_source();
     let parquet3 = data_source
diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs
index b33305c23ede2..d4c4c842d9c48 100644
--- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs
+++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs
@@ -108,7 +108,7 @@ mod test {
             .unwrap()
             .clone();
         listing_table
-            .scan(&ctx.state(), None, &[], None)
+            .scan(&ctx.state(), None, &[], None, None)
             .await
             .unwrap()
     }
diff --git a/datafusion/core/tests/user_defined/insert_operation.rs b/datafusion/core/tests/user_defined/insert_operation.rs
index 7ad00dece1b24..ce17afaf7cb46 100644
--- a/datafusion/core/tests/user_defined/insert_operation.rs
+++ b/datafusion/core/tests/user_defined/insert_operation.rs
@@ -105,6 +105,7 @@ impl TableProvider for TestInsertTableProvider {
         _projection: Option<&Vec<usize>>,
         _filters: &[Expr],
         _limit: Option<usize>,
+        _offset: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         unimplemented!("TestInsertTableProvider is a stub for testing.")
     }
diff --git a/datafusion/core/tests/user_defined/user_defined_table_functions.rs b/datafusion/core/tests/user_defined/user_defined_table_functions.rs
index 8be8609c62480..3b5df8ea273cd 100644
--- a/datafusion/core/tests/user_defined/user_defined_table_functions.rs
+++ b/datafusion/core/tests/user_defined/user_defined_table_functions.rs
@@ -136,6 +136,7 @@ impl TableProvider for SimpleCsvTable {
         projection: Option<&Vec<usize>>,
         _filters: &[Expr],
         _limit: Option<usize>,
+        _offset: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let batches = if !self.exprs.is_empty() {
             let max_return_lines = self.interpreter_expr(state).await?;
diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index 83bdf79c8fcc0..4c66d0074ff43 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -348,6 +348,13 @@ impl FileOpener for ParquetOpener {
             ArrowReaderMetadata::load_async(&mut async_file_reader, options.clone())
                 .await?;

+        // TODO(feniljain):
+        // - use the remaining_offset mechanism to tell arrow-rs to skip
+        //   decoding rows
+        // if reader_metadata.metadata().file_metadata().num_rows() < self.offset {
+        //     // skip reading the file
+        // }
+
         // Note about schemas: we are actually dealing with **3 different schemas** here:
         // - The table schema as defined by the TableProvider.
         //   This is what the user sees, what they get when they `SELECT * FROM table`, etc.
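One plausible shape for the TODO above — an assumption, not this patch's implementation — is to forward a surviving offset to the `parquet` crate's reader builder, which already supports skipping decoded rows via `with_offset`:

```rust
// Sketch only: if a remaining offset survives file pruning, ask arrow-rs to
// skip those rows before decoding. `file` and `offset` are assumed inputs.
use futures::TryStreamExt;
use parquet::arrow::ParquetRecordBatchStreamBuilder;

async fn read_with_offset(
    file: tokio::fs::File,
    offset: Option<usize>,
) -> parquet::errors::Result<Vec<arrow::record_batch::RecordBatch>> {
    let mut builder = ParquetRecordBatchStreamBuilder::new(file).await?;
    if let Some(offset) = offset {
        // Rows are skipped (after row-group/page pruning) before decoding.
        builder = builder.with_offset(offset);
    }
    builder.build()?.try_collect().await
}
```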
diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs
index 1f7c37315c47a..df56c6ab2c3da 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -152,6 +152,9 @@ pub struct FileScanConfig {
     /// The maximum number of records to read from this plan. If `None`,
     /// all records after filtering are returned.
     pub limit: Option<usize>,
+    /// Number of records to skip before starting to read `limit` rows. If
+    /// `None`, records are read from the start.
+    pub offset: Option<usize>,
     /// All equivalent lexicographical orderings that describe the schema.
     pub output_ordering: Vec<LexOrdering>,
     /// File compression type
@@ -240,6 +243,7 @@ pub struct FileScanConfigBuilder {
     object_store_url: ObjectStoreUrl,
     file_source: Arc<dyn FileSource>,
     limit: Option<usize>,
+    offset: Option<usize>,
     constraints: Option<Constraints>,
     file_groups: Vec<FileGroup>,
     statistics: Option<Statistics>,
@@ -268,6 +272,7 @@ impl FileScanConfigBuilder {
             statistics: None,
             output_ordering: vec![],
             file_compression_type: None,
+            offset: None,
             limit: None,
             constraints: None,
             batch_size: None,
@@ -283,6 +288,13 @@
         self
     }

+    /// Number of records to skip before starting to read `limit` rows. If
+    /// `None`, records are read from the start.
+    pub fn with_offset(mut self, offset: Option<usize>) -> Self {
+        self.offset = offset;
+        self
+    }
+
     /// Set the file source for scanning files.
     ///
     /// This method allows you to change the file source implementation (e.g. ParquetSource, CsvSource, etc.)
@@ -449,6 +461,7 @@
         let Self {
             object_store_url,
             file_source,
+            offset,
             limit,
             constraints,
             file_groups,
@@ -470,6 +483,7 @@
         FileScanConfig {
             object_store_url,
             file_source,
+            offset,
             limit,
             constraints,
             file_groups,
@@ -493,6 +507,7 @@ impl From<FileScanConfig> for FileScanConfigBuilder {
             output_ordering: config.output_ordering,
             file_compression_type: Some(config.file_compression_type),
             limit: config.limit,
+            offset: config.offset,
             constraints: Some(config.constraints),
             batch_size: config.batch_size,
             expr_adapter_factory: config.expr_adapter_factory,
@@ -567,6 +582,10 @@ impl DataSource for FileScanConfig {
             write!(f, ", limit={limit}")?;
         }

+        if let Some(offset) = self.offset {
+            write!(f, ", offset={offset}")?;
+        }
+
         display_orderings(f, &orderings)?;

         if !self.constraints.is_empty() {
@@ -747,6 +766,17 @@
         self.limit
     }

+    fn with_offset(&self, offset: Option<usize>) -> Option<Arc<dyn DataSource>> {
+        let source = FileScanConfigBuilder::from(self.clone())
+            .with_offset(offset)
+            .build();
+        Some(Arc::new(source))
+    }
+
+    fn offset(&self) -> Option<usize> {
+        self.offset
+    }
+
     fn metrics(&self) -> ExecutionPlanMetricsSet {
         self.file_source.metrics().clone()
     }
@@ -1177,6 +1207,10 @@ impl DisplayAs for FileScanConfig {
             write!(f, ", limit={limit}")?;
         }

+        if let Some(offset) = self.offset {
+            write!(f, ", offset={offset}")?;
+        }
+
         display_orderings(f, &orderings)?;

         if !self.constraints.is_empty() {
@@ -1746,6 +1780,7 @@ mod tests {
         // Build with various configurations
         let config = builder
             .with_limit(Some(1000))
+            .with_offset(Some(2000))
             .with_projection_indices(Some(vec![0, 1]))
             .unwrap()
             .with_statistics(Statistics::new_unknown(&file_schema))
@@ -1766,6 +1801,7 @@
         assert_eq!(config.object_store_url, object_store_url);
         assert_eq!(*config.file_schema(), file_schema);
         assert_eq!(config.limit, Some(1000));
+        assert_eq!(config.offset, Some(2000));
         assert_eq!(
             config
                 .file_source
@@ -1925,6 +1961,7 @@
             .with_projection_indices(Some(vec![0, 2]))
             .unwrap()
             .with_limit(Some(10))
+            .with_offset(Some(20))
             .with_file(file.clone())
             .with_constraints(Constraints::default())
             .build();
@@ -1948,6 +1985,7 @@
             Some(vec![0, 2])
         );
         assert_eq!(new_config.limit, Some(10));
+        assert_eq!(new_config.offset, Some(20));
         assert_eq!(*new_config.table_partition_cols(), partition_cols);
         assert_eq!(new_config.file_groups.len(), 1);
         assert_eq!(new_config.file_groups[0].len(), 1);
diff --git a/datafusion/datasource/src/file_stream.rs b/datafusion/datasource/src/file_stream.rs
index c8090382094ef..444c839aa0b02 100644
--- a/datafusion/datasource/src/file_stream.rs
+++ b/datafusion/datasource/src/file_stream.rs
@@ -104,6 +104,10 @@ impl FileStream {
     /// bunch of sequential IO), it can be parallelized with decoding.
     fn start_next_file(&mut self) -> Option<Result<FileOpenFuture>> {
         let part_file = self.file_iter.pop_front()?;
+        log::debug!(
+            "start_next_file: opening file {}",
+            part_file.object_meta.location
+        );
         Some(self.file_opener.open(part_file))
     }
diff --git a/datafusion/datasource/src/memory.rs b/datafusion/datasource/src/memory.rs
index 1d12bb3200309..c59d03a0a8fdb 100644
--- a/datafusion/datasource/src/memory.rs
+++ b/datafusion/datasource/src/memory.rs
@@ -73,6 +73,9 @@ pub struct MemorySourceConfig {
     /// The maximum number of records to read from this plan. If `None`,
     /// all records after filtering are returned.
     fetch: Option<usize>,
+    /// Number of records to skip before starting to read `fetch` rows. If
+    /// `None`, records are read from the start.
+    offset: Option<usize>,
 }

 impl DataSource for MemorySourceConfig {
@@ -228,6 +231,15 @@
         self.fetch
     }

+    fn with_offset(&self, offset: Option<usize>) -> Option<Arc<dyn DataSource>> {
+        let source = self.clone();
+        Some(Arc::new(source.with_offset(offset)))
+    }
+
+    fn offset(&self) -> Option<usize> {
+        self.offset
+    }
+
     fn try_swapping_with_projection(
         &self,
         projection: &ProjectionExprs,
@@ -271,6 +283,7 @@ impl MemorySourceConfig {
             sort_information: vec![],
             show_sizes: true,
             fetch: None,
+            offset: None,
         })
     }
@@ -373,6 +386,7 @@
             sort_information: vec![],
             show_sizes: true,
             fetch: None,
+            offset: None,
         };
         Ok(DataSourceExec::from_data_source(source))
     }
@@ -383,6 +397,12 @@
         self
     }

+    /// Set the number of rows to skip before reading starts
+    pub fn with_offset(mut self, offset: Option<usize>) -> Self {
+        self.offset = offset;
+        self
+    }
+
     /// Set `show_sizes` to determine whether to display partition sizes
     pub fn with_show_sizes(mut self, show_sizes: bool) -> Self {
         self.show_sizes = show_sizes;
diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs
index a3892dfac9778..7af6dbe66e8cd 100644
--- a/datafusion/datasource/src/source.rs
+++ b/datafusion/datasource/src/source.rs
@@ -170,7 +170,10 @@ pub trait DataSource: Send + Sync + Debug {
     /// Return a copy of this DataSource with a new fetch limit
     fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn DataSource>>;
+    /// Return a copy of this DataSource with a new offset
+    fn with_offset(&self, _offset: Option<usize>) -> Option<Arc<dyn DataSource>>;
     fn fetch(&self) -> Option<usize>;
+    fn offset(&self) -> Option<usize>;
     fn metrics(&self) -> ExecutionPlanMetricsSet {
         ExecutionPlanMetricsSet::new()
     }
@@ -332,6 +335,17 @@ impl ExecutionPlan for DataSourceExec {
         self.data_source.fetch()
     }

+    fn with_offset(&self, offset: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
+        let data_source = self.data_source.with_offset(offset)?;
+        let cache = self.cache.clone();
+
+        Some(Arc::new(Self { data_source, cache }))
+    }
+
+    fn offset(&self) -> Option<usize> {
+        self.data_source.offset()
+    }
+
     fn try_swapping_with_projection(
         &self,
         projection: &ProjectionExec,
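A hedged usage sketch of the builder additions above (`object_store_url`, `file_source`, and `file_group` are assumed to exist in scope):

```rust
// Thread the new offset through FileScanConfigBuilder next to the limit.
let config = FileScanConfigBuilder::new(object_store_url, file_source)
    .with_projection_indices(Some(vec![0, 1]))?
    .with_limit(Some(100))  // read at most 100 rows...
    .with_offset(Some(20))  // ...after skipping the first 20
    .with_file_group(file_group)
    .build();
let exec = DataSourceExec::from_data_source(config);
// Per the DisplayAs changes above, the plan now renders as
// `..., limit=100, offset=20`.
```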
diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index 6f654428e41a1..c1fbf84f2004f 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -484,7 +484,14 @@ impl LogicalPlanBuilder {
         projection: Option<Vec<usize>>,
         filters: Vec<Expr>,
     ) -> Result<Self> {
-        Self::scan_with_filters_inner(table_name, table_source, projection, filters, None)
+        Self::scan_with_filters_inner(
+            table_name,
+            table_source,
+            projection,
+            filters,
+            None,
+            None,
+        )
     }

     /// Convert a table provider into a builder with a TableScan with filter and fetch
@@ -494,6 +501,7 @@
         projection: Option<Vec<usize>>,
         filters: Vec<Expr>,
         fetch: Option<usize>,
+        skip: Option<usize>,
     ) -> Result<Self> {
         Self::scan_with_filters_inner(
             table_name,
             table_source,
             projection,
             filters,
             fetch,
+            skip,
         )
     }

@@ -510,9 +519,16 @@
         projection: Option<Vec<usize>>,
         filters: Vec<Expr>,
         fetch: Option<usize>,
+        skip: Option<usize>,
     ) -> Result<Self> {
-        let table_scan =
-            TableScan::try_new(table_name, table_source, projection, filters, fetch)?;
+        let table_scan = TableScan::try_new(
+            table_name,
+            table_source,
+            projection,
+            filters,
+            fetch,
+            skip,
+        )?;

         // Inline TableScan
         if table_scan.filters.is_empty()
@@ -792,7 +808,7 @@
         self,
         sorts: impl IntoIterator<Item = impl Into<SortExpr>> + Clone,
     ) -> Result<Self> {
-        self.sort_with_limit(sorts, None)
+        self.sort_with_limit(sorts, None, None)
     }

     /// Apply a sort
@@ -800,6 +816,7 @@
     pub fn sort_with_limit(
         self,
         sorts: impl IntoIterator<Item = impl Into<SortExpr>> + Clone,
         fetch: Option<usize>,
+        skip: Option<usize>,
     ) -> Result<Self> {
         let sorts = rewrite_sort_cols_by_aggs(sorts, &self.plan)?;
@@ -825,6 +842,7 @@
                 expr: normalize_sorts(sorts, &self.plan)?,
                 input: self.plan,
                 fetch,
+                skip,
             })));
         }

@@ -842,6 +860,7 @@
             expr: normalize_sorts(sorts, &plan)?,
             input: Arc::new(plan),
             fetch,
+            skip,
         });

         Projection::try_new(new_expr, Arc::new(sort_plan))
@@ -2031,6 +2050,7 @@ pub fn table_scan_with_filter_and_fetch(
     projection: Option<Vec<usize>>,
     filters: Vec<Expr>,
     fetch: Option<usize>,
+    skip: Option<usize>,
 ) -> Result<LogicalPlanBuilder> {
     let table_source = table_source(table_schema);
     let name = name
@@ -2042,6 +2062,7 @@
         projection,
         filters,
         fetch,
+        skip,
     )
 }
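A short, hypothetical use of the extended builder API (table and column names invented), showing fetch and skip traveling together on the `Sort` node:

```rust
// Roughly equivalent SQL: SELECT * FROM t ORDER BY a LIMIT 10 OFFSET 5
let plan = LogicalPlanBuilder::scan("t", table_source, None)?
    .sort_with_limit(vec![col("a").sort(true, false)], Some(10), Some(5))?
    .build()?;
```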
diff --git a/datafusion/expr/src/logical_plan/display.rs b/datafusion/expr/src/logical_plan/display.rs
index 480974b055d11..136ce80f14e63 100644
--- a/datafusion/expr/src/logical_plan/display.rs
+++ b/datafusion/expr/src/logical_plan/display.rs
@@ -348,6 +348,7 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> {
                 table_name,
                 filters,
                 fetch,
+                skip,
                 ..
             }) => {
                 let mut object = json!({
@@ -401,6 +402,10 @@
                     object["Fetch"] = serde_json::Value::Number((*f).into());
                 }

+                if let Some(s) = skip {
+                    object["Skip"] = serde_json::Value::Number((*s).into());
+                }
+
                 object
             }
             LogicalPlan::Projection(Projection { expr, .. }) => {
@@ -467,7 +472,9 @@
                     "Aggregates": expr_vec_fmt!(aggr_expr)
                 })
             }
-            LogicalPlan::Sort(Sort { expr, fetch, .. }) => {
+            LogicalPlan::Sort(Sort {
+                expr, fetch, skip, ..
+            }) => {
                 let mut object = json!({
                     "Node Type": "Sort",
                     "Sort Key": expr_vec_fmt!(expr),
@@ -477,6 +484,10 @@
                     object["Fetch"] = serde_json::Value::Number((*fetch).into());
                 }

+                if let Some(skip) = skip {
+                    object["Skip"] = serde_json::Value::Number((*skip).into());
+                }
+
                 object
             }
             LogicalPlan::Join(Join {
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index 4219c24bfc9c9..269ed17453c03 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -883,6 +883,7 @@ impl LogicalPlan {
             LogicalPlan::Sort(Sort {
                 expr: sort_expr,
                 fetch,
+                skip,
                 ..
             }) => {
                 let input = self.only_input(inputs)?;
@@ -894,6 +895,7 @@
                         .collect(),
                     input: Arc::new(input),
                     fetch: *fetch,
+                    skip: *skip,
                 }))
             }
             LogicalPlan::Join(Join {
@@ -1808,6 +1810,7 @@
                         projection,
                         filters,
                         fetch,
+                        skip,
                         ..
                     }) => {
                         let projected_fields = match projection {
@@ -1871,6 +1874,10 @@
                             }
                         }

+                        if let Some(n) = skip {
+                            write!(f, ", skip={n}")?;
+                        }
+
                         if let Some(n) = fetch {
                             write!(f, ", fetch={n}")?;
                         }
@@ -1932,7 +1939,9 @@
                     expr_vec_fmt!(group_expr),
                     expr_vec_fmt!(aggr_expr)
                 ),
-                LogicalPlan::Sort(Sort { expr, fetch, .. }) => {
+                LogicalPlan::Sort(Sort {
+                    expr, fetch, skip, ..
+                }) => {
                     write!(f, "Sort: ")?;
                     for (i, expr_item) in expr.iter().enumerate() {
                         if i > 0 {
@@ -1940,10 +1949,15 @@
                         }
                         write!(f, "{expr_item}")?;
                     }
+
                     if let Some(a) = fetch {
                         write!(f, ", fetch={a}")?;
                     }

+                    if let Some(s) = skip {
+                        write!(f, ", skip={s}")?;
+                    }
+
                     Ok(())
                 }
                 LogicalPlan::Join(Join {
@@ -2681,6 +2695,8 @@ pub struct TableScan {
     pub projected_schema: DFSchemaRef,
     /// Optional expressions to be used as filters by the table provider
     pub filters: Vec<Expr>,
+    /// Optional number of rows to skip before starting to read
+    pub skip: Option<usize>,
     /// Optional number of rows to read
     pub fetch: Option<usize>,
 }
@@ -2694,6 +2710,7 @@ impl Debug for TableScan {
             .field("projected_schema", &self.projected_schema)
             .field("filters", &self.filters)
             .field("fetch", &self.fetch)
+            .field("skip", &self.skip)
             .finish_non_exhaustive()
     }
 }
@@ -2705,6 +2722,7 @@ impl PartialEq for TableScan {
             && self.projected_schema == other.projected_schema
             && self.filters == other.filters
             && self.fetch == other.fetch
+            && self.skip == other.skip
     }
 }
@@ -2722,20 +2740,24 @@ impl PartialOrd for TableScan {
             pub projection: &'a Option<Vec<usize>>,
             /// Optional expressions to be used as filters by the table provider
             pub filters: &'a Vec<Expr>,
             /// Optional number of rows to read
             pub fetch: &'a Option<usize>,
+            /// Optional number of rows to skip before starting to read
+            pub skip: &'a Option<usize>,
         }
         let comparable_self = ComparableTableScan {
             table_name: &self.table_name,
             projection: &self.projection,
             filters: &self.filters,
             fetch: &self.fetch,
+            skip: &self.skip,
         };
         let comparable_other = ComparableTableScan {
             table_name: &other.table_name,
             projection: &other.projection,
             filters: &other.filters,
             fetch: &other.fetch,
+            skip: &other.skip,
         };
         comparable_self
             .partial_cmp(&comparable_other)
@@ -2751,6 +2773,7 @@ impl Hash for TableScan {
         self.projected_schema.hash(state);
         self.filters.hash(state);
         self.fetch.hash(state);
+        self.skip.hash(state);
     }
 }
@@ -2763,6 +2786,7 @@ impl TableScan {
         projection: Option<Vec<usize>>,
         filters: Vec<Expr>,
         fetch: Option<usize>,
+        skip: Option<usize>,
     ) -> Result<Self> {
         let table_name = table_name.into();
@@ -2804,6 +2828,7 @@
             projected_schema,
             filters,
             fetch,
+            skip,
         })
     }
 }
@@ -3758,6 +3783,8 @@ pub struct Sort {
     pub expr: Vec<SortExpr>,
     /// The incoming logical plan
     pub input: Arc<LogicalPlan>,
+    /// Optional number of rows to skip before `fetch` is applied
+    pub skip: Option<usize>,
     /// Optional fetch limit
     pub fetch: Option<usize>,
 }
@@ -4968,6 +4995,7 @@ mod tests {
             projected_schema: Arc::clone(&schema),
             filters: vec![],
             fetch: None,
+            skip: None,
         }));

         let col = schema.field_names()[0].clone();
@@ -4998,6 +5026,7 @@
             projected_schema: Arc::clone(&unique_schema),
             filters: vec![],
             fetch: None,
+            skip: None,
         }));

         let col = schema.field_names()[0].clone();
diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs
index 62a27b0a025ad..c25441c6047cb 100644
--- a/datafusion/expr/src/logical_plan/tree_node.rs
+++ b/datafusion/expr/src/logical_plan/tree_node.rs
@@ -121,9 +121,19 @@ impl TreeNode for LogicalPlan {
                     schema,
                 })
             }),
-            LogicalPlan::Sort(Sort { expr, input, fetch }) => input
-                .map_elements(f)?
-                .update_data(|input| LogicalPlan::Sort(Sort { expr, input, fetch })),
+            LogicalPlan::Sort(Sort {
+                expr,
+                input,
+                fetch,
+                skip,
+            }) => input.map_elements(f)?.update_data(|input| {
+                LogicalPlan::Sort(Sort {
+                    expr,
+                    input,
+                    fetch,
+                    skip,
+                })
+            }),
             LogicalPlan::Join(Join {
                 left,
                 right,
@@ -576,9 +586,19 @@ impl LogicalPlan {
                     null_equality,
                 })
             }),
-            LogicalPlan::Sort(Sort { expr, input, fetch }) => expr
-                .map_elements(f)?
-                .update_data(|expr| LogicalPlan::Sort(Sort { expr, input, fetch })),
+            LogicalPlan::Sort(Sort {
+                expr,
+                input,
+                fetch,
+                skip,
+            }) => expr.map_elements(f)?.update_data(|expr| {
+                LogicalPlan::Sort(Sort {
+                    expr,
+                    input,
+                    fetch,
+                    skip,
+                })
+            }),
             LogicalPlan::Extension(Extension { node }) => {
                 // would be nice to avoid this copy -- maybe can
                 // update extension to just observer Exprs
@@ -599,6 +619,7 @@
                 projected_schema,
                 filters,
                 fetch,
+                skip,
             }) => filters.map_elements(f)?.update_data(|filters| {
                 LogicalPlan::TableScan(TableScan {
                     table_name,
@@ -607,6 +628,7 @@
                     projected_schema,
                     filters,
                     fetch,
+                    skip,
                 })
             }),
             LogicalPlan::Distinct(Distinct::On(DistinctOn {
diff --git a/datafusion/ffi/src/table_provider.rs b/datafusion/ffi/src/table_provider.rs
index df8b648026d3e..049d417a464ad 100644
--- a/datafusion/ffi/src/table_provider.rs
+++ b/datafusion/ffi/src/table_provider.rs
@@ -111,6 +111,7 @@ pub struct FFI_TableProvider {
         projections: RVec<usize>,
         filters_serialized: RVec<u8>,
         limit: ROption<usize>,
+        // TODO(feniljain): add offset here?
     ) -> FfiFuture<RResult<FFI_ExecutionPlan, RString>>,

     /// Return the type of table. See [`TableType`] for options.
@@ -273,7 +274,7 @@ unsafe extern "C" fn scan_fn_wrapper(
         let plan = rresult_return!(
             internal_provider
-                .scan(session, Some(&projections), &filters, limit.into())
+                .scan(session, Some(&projections), &filters, limit.into(), None)
                 .await
         );
@@ -458,6 +459,7 @@ impl TableProvider for ForeignTableProvider {
         projection: Option<&Vec<usize>>,
         filters: &[Expr],
         limit: Option<usize>,
+        _offset: Option<usize>, // TODO(feniljain): use this
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let session = FFI_SessionRef::new(session, None, self.0.logical_codec.clone());
diff --git a/datafusion/functions-table/src/generate_series.rs b/datafusion/functions-table/src/generate_series.rs
index b806798bcecc0..ab10db6cb7c9f 100644
--- a/datafusion/functions-table/src/generate_series.rs
+++ b/datafusion/functions-table/src/generate_series.rs
@@ -480,6 +480,7 @@ impl TableProvider for GenerateSeriesTable {
         projection: Option<&Vec<usize>>,
         _filters: &[Expr],
         _limit: Option<usize>,
+        _offset: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let batch_size = state.config_options().execution.batch_size;
         let generator = self.as_generator(batch_size)?;
diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs
index 02395c76bdd92..568a39ac88a5e 100644
--- a/datafusion/optimizer/src/analyzer/type_coercion.rs
+++ b/datafusion/optimizer/src/analyzer/type_coercion.rs
@@ -1489,6 +1489,7 @@ mod test {
             expr: vec![sort_expr],
             input: Arc::new(plan),
             fetch: None,
+            skip: None,
         });

         // Plan C: no coerce
@@ -1612,6 +1613,7 @@
             expr: vec![sort_expr],
            input: Arc::new(plan),
            fetch: None,
+           skip: None,
        });

        // Plan C: no coerce
diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs
index d9273a8f60fb2..509e2859953c4 100644
--- a/datafusion/optimizer/src/common_subexpr_eliminate.rs
+++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs
@@ -96,7 +96,12 @@ impl CommonSubexprEliminate {
         sort: Sort,
         config: &dyn OptimizerConfig,
     ) -> Result<Transformed<LogicalPlan>> {
-        let Sort { expr, input, fetch } = sort;
+        let Sort {
+            expr,
+            input,
+            fetch,
+            skip,
+        } = sort;
         let input = Arc::unwrap_or_clone(input);
         let (sort_expressions, sort_params): (Vec<_>, Vec<(_, _)>) = expr
             .into_iter()
@@ -117,6 +122,7 @@
                     .collect(),
                 input: Arc::new(new_input),
                 fetch,
+                skip,
             })
         });
         Ok(new_sort)
diff --git a/datafusion/optimizer/src/eliminate_duplicated_expr.rs b/datafusion/optimizer/src/eliminate_duplicated_expr.rs
index 113c92c2c8e99..7b4c73353cfd7 100644
--- a/datafusion/optimizer/src/eliminate_duplicated_expr.rs
+++ b/datafusion/optimizer/src/eliminate_duplicated_expr.rs
@@ -86,6 +86,7 @@ impl OptimizerRule for EliminateDuplicatedExpr {
                     expr: unique_exprs,
                     input: sort.input,
                     fetch: sort.fetch,
+                    skip: sort.skip,
                 })))
             }
             LogicalPlan::Aggregate(agg) => {
diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs
index 548eadffa242e..09b2db3f73f6e 100644
--- a/datafusion/optimizer/src/optimize_projections/mod.rs
+++ b/datafusion/optimizer/src/optimize_projections/mod.rs
@@ -260,6 +260,7 @@ fn optimize_projections(
                 filters,
                 fetch,
                 projected_schema: _,
+                skip,
             } = table_scan;

             // Get indices referred to in the original (schema with all fields)
@@ -274,6 +275,7 @@
                 Some(projection),
                 filters,
                 fetch,
+                skip,
             )
             .map(LogicalPlan::TableScan)
            .map(Transformed::yes);
diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs
index 755ffdbafc869..b33c63556af80 100644
diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs
index 755ffdbafc869..b33c63556af80 100644
--- a/datafusion/optimizer/src/push_down_filter.rs
+++ b/datafusion/optimizer/src/push_down_filter.rs
@@ -3119,6 +3119,7 @@ mod tests {
             projection,
             source: Arc::new(test_provider),
             fetch: None,
+            skip: None,
         });

         Ok(LogicalPlanBuilder::from(table_scan))
diff --git a/datafusion/optimizer/src/push_down_limit.rs b/datafusion/optimizer/src/push_down_limit.rs
index 7b302adf22acc..d0a233e821d6f 100644
--- a/datafusion/optimizer/src/push_down_limit.rs
+++ b/datafusion/optimizer/src/push_down_limit.rs
@@ -17,7 +17,7 @@
 //! [`PushDownLimit`] pushes `LIMIT` earlier in the query plan

-use std::cmp::min;
+use std::cmp::{max, min};
 use std::sync::Arc;

 use crate::optimizer::ApplyOrder;
@@ -90,21 +90,28 @@ impl OptimizerRule for PushDownLimit {
             return Ok(Transformed::no(LogicalPlan::Limit(limit)));
         };

+        // TODO(feniljain): Change all the other LogicalPlan nodes
+        // to accept the skip field and add test cases for all of them
+        //
+        // impl/test:
+        // X/X tablescan
+        // X/- union
+        // X/- join
+        // X/- sort
+        // X/- projection
+        // X/- subquery
+        // X/- extension
         match Arc::unwrap_or_clone(limit.input) {
             LogicalPlan::TableScan(mut scan) => {
-                let rows_needed = if fetch != 0 { fetch + skip } else { 0 };
-                let new_fetch = scan
-                    .fetch
-                    .map(|x| min(x, rows_needed))
-                    .or(Some(rows_needed));
-                if new_fetch == scan.fetch {
+                let new_fetch = scan.fetch.map(|x| min(x, fetch)).or(Some(fetch));
+                let new_skip = scan.skip.map(|x| max(x, skip)).or(Some(skip));
+
+                // push limit and offset into the table scan itself
+                if new_fetch == scan.fetch && new_skip == scan.skip {
                     original_limit(skip, fetch, LogicalPlan::TableScan(scan))
                 } else {
-                    // push limit into the table scan itself
-                    scan.fetch = scan
-                        .fetch
-                        .map(|x| min(x, rows_needed))
-                        .or(Some(rows_needed));
+                    scan.fetch = new_fetch;
+                    scan.skip = new_skip;
                     transformed_limit(skip, fetch, LogicalPlan::TableScan(scan))
                 }
             }
@@ -113,7 +120,7 @@ impl OptimizerRule for PushDownLimit {
                 union.inputs = union
                     .inputs
                     .into_iter()
-                    .map(|input| make_arc_limit(0, fetch + skip, input))
+                    .map(|input| make_arc_limit(skip, fetch, input))
                     .collect();
                 transformed_limit(skip, fetch, LogicalPlan::Union(union))
             }
@@ -124,20 +131,16 @@ impl OptimizerRule for PushDownLimit {
             })),

             LogicalPlan::Sort(mut sort) => {
-                let new_fetch = {
-                    let sort_fetch = skip + fetch;
-                    Some(sort.fetch.map(|f| f.min(sort_fetch)).unwrap_or(sort_fetch))
-                };
-                if new_fetch == sort.fetch {
-                    if skip > 0 {
-                        original_limit(skip, fetch, LogicalPlan::Sort(sort))
-                    } else {
-                        Ok(Transformed::yes(LogicalPlan::Sort(sort)))
-                    }
+                let new_fetch = sort.fetch.map(|x| min(x, fetch)).or(Some(fetch));
+                let new_skip = sort.skip.map(|x| max(x, skip)).or(Some(skip));
+
+                // push limit and offset into the sort itself
+                if new_fetch == sort.fetch && new_skip == sort.skip {
+                    original_limit(skip, fetch, LogicalPlan::Sort(sort))
                 } else {
                     sort.fetch = new_fetch;
-                    limit.input = Arc::new(LogicalPlan::Sort(sort));
-                    Ok(Transformed::yes(LogicalPlan::Limit(limit)))
+                    sort.skip = new_skip;
+                    transformed_limit(skip, fetch, LogicalPlan::Sort(sort))
                 }
             }
             LogicalPlan::Projection(mut proj) => {
@@ -163,8 +166,8 @@ impl OptimizerRule for PushDownLimit {
                     .into_iter()
                     .map(|child| {
                         LogicalPlan::Limit(Limit {
-                            skip: None,
-                            fetch: Some(Box::new(lit((fetch + skip) as i64))),
+                            skip: Some(Box::new(lit(skip as i64))),
+                            fetch: Some(Box::new(lit(fetch as i64))),
                             input: Arc::new(child.clone()),
                         })
                     })
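// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the patch): the rewritten `PushDownLimit`
// rule above merges an incoming LIMIT/OFFSET pair with any fetch/skip already
// present on a scan or sort by keeping the smaller fetch and the larger skip.
// A tiny standalone model of that arithmetic:

fn merge_pushdown(
    existing_fetch: Option<usize>,
    existing_skip: Option<usize>,
    fetch: usize,
    skip: usize,
) -> (Option<usize>, Option<usize>) {
    // Mirrors `scan.fetch.map(|x| min(x, fetch)).or(Some(fetch))` and
    // `scan.skip.map(|x| max(x, skip)).or(Some(skip))` from the hunk above.
    let new_fetch = existing_fetch.map(|x| x.min(fetch)).or(Some(fetch));
    let new_skip = existing_skip.map(|x| x.max(skip)).or(Some(skip));
    (new_fetch, new_skip)
}

fn main() {
    // Nothing pushed down yet: both values are installed as-is, matching the
    // `limit_and_skip_push_down_to_table_scan` test added below.
    assert_eq!(merge_pushdown(None, None, 1000, 5), (Some(1000), Some(5)));
    // An existing, tighter fetch survives; the larger skip wins.
    assert_eq!(merge_pushdown(Some(10), Some(2), 1000, 5), (Some(10), Some(5)));
    // If neither value changes, the rule keeps the original Limit node.
    assert_eq!(merge_pushdown(Some(10), Some(5), 1000, 5), (Some(10), Some(5)));
}
// ---------------------------------------------------------------------------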
@@ -463,6 +466,23 @@ mod test {
         )
     }

+    #[test]
+    fn limit_and_skip_push_down_to_table_scan() -> Result<()> {
+        let table_scan = test_table_scan()?;
+
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .limit(5, Some(1000))?
+            .build()?;
+
+        assert_optimized_plan_equal!(
+            plan,
+            @r"
+        Limit: skip=5, fetch=1000
+          TableScan: test, skip=5, fetch=1000
+        "
+        )
+    }
+
     #[test]
     fn limit_pushdown_multiple_limits() -> Result<()> {
         let table_scan = test_table_scan()?;
diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs
index 06da0b8933c18..4140bac36807e 100644
--- a/datafusion/physical-plan/src/execution_plan.rs
+++ b/datafusion/physical-plan/src/execution_plan.rs
@@ -521,6 +521,23 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
         None
     }

+    // TODO(feniljain):
+    // - Explore usage of `with_offset` in implementors of `with_fetch`
+    // - Maybe we don't need these methods at all, as `ListingTable` and
+    //   other table providers set offset directly on `FileScanConfigBuilder`,
+    //   i.e. the `DataSource` impl.

+    /// Returns an offset variant of this `ExecutionPlan` node, if it supports
+    /// offset. Returns `None` otherwise.
+    fn with_offset(&self, _offset: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
+        None
+    }
+
+    /// Gets the offset count for the operator; `None` means there is no offset.
+    fn offset(&self) -> Option<usize> {
+        None
+    }
+
     /// Gets the effect on cardinality, if known
     fn cardinality_effect(&self) -> CardinalityEffect {
         CardinalityEffect::Unknown
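// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the patch): the two new `ExecutionPlan`
// methods above follow the same opt-in pattern as the existing
// `fetch`/`with_fetch` pair: the defaults report "unsupported", and an
// operator that can apply an offset overrides both. A self-contained toy
// model of that pattern (trait and type names here are illustrative, not
// DataFusion's):

use std::sync::Arc;

trait Plan {
    /// Offset currently applied by this node; `None` means no offset.
    fn offset(&self) -> Option<usize> {
        None
    }
    /// Rebuild this node with an offset, or `None` if unsupported.
    fn with_offset(&self, _offset: Option<usize>) -> Option<Arc<dyn Plan>> {
        None
    }
}

struct ScanLike {
    offset: Option<usize>,
}

impl Plan for ScanLike {
    fn offset(&self) -> Option<usize> {
        self.offset
    }
    fn with_offset(&self, offset: Option<usize>) -> Option<Arc<dyn Plan>> {
        Some(Arc::new(ScanLike { offset }))
    }
}

fn main() {
    let scan = ScanLike { offset: None };
    // An optimizer probes for support and swaps in the rebuilt node.
    if let Some(with_skip) = scan.with_offset(Some(5)) {
        assert_eq!(with_skip.offset(), Some(5));
    }
}
// ---------------------------------------------------------------------------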
diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs
index 218c2e4e47d04..9760e9f5cc8fb 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -267,6 +267,7 @@ fn from_table_source(
         projected_schema,
         filters: vec![],
         fetch: None,
+        skip: None,
     });

     LogicalPlanNode::try_from_logical_plan(&r, extension_codec)
@@ -539,7 +540,9 @@ impl AsLogicalPlan for LogicalPlanNode {
                     from_proto::parse_sorts(&sort.expr, ctx, extension_codec)?;
                 let fetch: Option<usize> = sort.fetch.try_into().ok();
                 LogicalPlanBuilder::from(input)
+                    // TODO(feniljain): change this to the skip taken from
+                    // `SortNode` once the proto is updated
-                    .sort_with_limit(sort_expr, fetch)?
+                    .sort_with_limit(sort_expr, fetch, None)?
                     .build()
             }
             LogicalPlanType::Repartition(repartition) => {
@@ -1409,7 +1412,12 @@ impl AsLogicalPlan for LogicalPlanNode {
                 ))),
             })
         }
-        LogicalPlan::Sort(Sort { input, expr, fetch }) => {
+        LogicalPlan::Sort(Sort {
+            input,
+            expr,
+            fetch,
+            skip: _,
+        }) => {
             let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
                 input.as_ref(),
                 extension_codec,
@@ -1422,6 +1430,8 @@ impl AsLogicalPlan for LogicalPlanNode {
                     input: Some(Box::new(input)),
                     expr: sort_expr,
                     fetch: fetch.map(|f| f as i64).unwrap_or(-1i64),
+                    // TODO(feniljain): add skip to proto
+                    // skip: skip.map(|s| s as i64).unwrap_or(-1i64),
                 },
             ))),
         })
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs
index 9558effb8a2a6..18719fd413e52 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -587,6 +587,7 @@ pub fn serialize_file_scan_config(
         })
         .transpose()?;

+    // TODO(feniljain): Add offset to proto
     Ok(protobuf::FileScanExecConf {
         file_groups,
         statistics: Some((&conf.statistics()).into()),
diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs
index 56bf887dbde43..229986f66f6db 100644
--- a/datafusion/sql/src/unparser/plan.rs
+++ b/datafusion/sql/src/unparser/plan.rs
@@ -1046,7 +1046,10 @@ impl Unparser<'_> {
     }

     fn is_scan_with_pushdown(scan: &TableScan) -> bool {
-        scan.projection.is_some() || !scan.filters.is_empty() || scan.fetch.is_some()
+        scan.projection.is_some()
+            || !scan.filters.is_empty()
+            || scan.fetch.is_some()
+            || scan.skip.is_some()
     }

     /// Try to unparse a table scan with pushdown operations into a new subquery plan.
@@ -1135,7 +1138,7 @@ impl Unparser<'_> {
         }

         if let Some(fetch) = table_scan.fetch {
-            builder = builder.limit(0, Some(fetch))?;
+            builder = builder.limit(table_scan.skip.unwrap_or(0), Some(fetch))?;
         }

         // If the table scan has an alias but no projection or filters, it means no column references are rebased.
diff --git a/datafusion/sql/src/unparser/rewrite.rs b/datafusion/sql/src/unparser/rewrite.rs
index ec1b17cd28a91..dbf62ebe5ae4d 100644
--- a/datafusion/sql/src/unparser/rewrite.rs
+++ b/datafusion/sql/src/unparser/rewrite.rs
@@ -75,6 +75,7 @@ pub(super) fn normalize_union_schema(plan: &LogicalPlan) -> Result<LogicalPlan>
                 expr: rewrite_sort_expr_for_union(sort.expr)?,
                 input: sort.input,
                 fetch: sort.fetch,
+                skip: sort.skip,
             })))
         }
         _ => Ok(Transformed::no(plan)),
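// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the patch): with the unparser changes
// above, a table scan carrying both a pushed-down fetch and skip should
// round-trip back to SQL as `LIMIT <fetch> OFFSET <skip>`, while the
// `skip.unwrap_or(0)` default keeps plain `LIMIT` output unchanged. Note that,
// as written, a skip without a fetch still emits nothing. The mapping,
// modeled as a plain function (this is not the Unparser API):

fn limit_clause(fetch: Option<usize>, skip: Option<usize>) -> Option<String> {
    // Mirrors `builder.limit(table_scan.skip.unwrap_or(0), Some(fetch))`:
    // nothing is emitted unless a fetch was pushed down.
    fetch.map(|f| match skip.unwrap_or(0) {
        0 => format!("LIMIT {f}"),
        s => format!("LIMIT {f} OFFSET {s}"),
    })
}

fn main() {
    assert_eq!(
        limit_clause(Some(10), Some(5)).as_deref(),
        Some("LIMIT 10 OFFSET 5")
    );
    // No skip: output is identical to the behavior before this patch.
    assert_eq!(limit_clause(Some(10), None).as_deref(), Some("LIMIT 10"));
}
// ---------------------------------------------------------------------------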
diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs
index 46a42ae534af0..384112b39804a 100644
--- a/datafusion/sql/tests/cases/plan_to_sql.rs
+++ b/datafusion/sql/tests/cases/plan_to_sql.rs
@@ -1278,6 +1278,7 @@ fn test_table_scan_with_empty_projection_and_filter_postgres() {
         Some(vec![]),
         vec![col("id").gt(lit(10))],
         None,
+        None,
     )
     .unwrap()
     .build()
@@ -1300,6 +1301,7 @@ fn test_table_scan_with_empty_projection_and_filter_default_dialect() {
         Some(vec![]),
         vec![col("id").gt(lit(10))],
         None,
+        None,
     )
     .unwrap()
     .build()
@@ -1483,11 +1485,17 @@ fn test_table_scan_alias() -> Result<()> {
         @"SELECT a.id FROM t1 AS a WHERE ((a.id > 1) AND (a.age < 2))"
     );

-    let table_scan_with_fetch =
-        table_scan_with_filter_and_fetch(Some("t1"), &schema, None, vec![], Some(10))?
-            .project(vec![col("id")])?
-            .alias("a")?
-            .build()?;
+    let table_scan_with_fetch = table_scan_with_filter_and_fetch(
+        Some("t1"),
+        &schema,
+        None,
+        vec![],
+        Some(10),
+        None,
+    )?
+    .project(vec![col("id")])?
+    .alias("a")?
+    .build()?;
     let table_scan_with_fetch = plan_to_sql(&table_scan_with_fetch)?;
     assert_snapshot!(
         table_scan_with_fetch,
@@ -1500,6 +1508,7 @@ fn test_table_scan_alias() -> Result<()> {
         Some(vec![0, 1]),
         vec![col("id").gt(lit(1))],
         Some(10),
+        None,
     )?
     .project(vec![col("id")])?
     .alias("a")?
@@ -1665,9 +1674,15 @@ fn test_table_scan_pushdown() -> Result<()> {
         @"SELECT t1.age FROM t1 WHERE (t1.id > t1.age)"
     );

-    let table_scan_with_inline_fetch =
-        table_scan_with_filter_and_fetch(Some("t1"), &schema, None, vec![], Some(10))?
-            .build()?;
+    let table_scan_with_inline_fetch = table_scan_with_filter_and_fetch(
+        Some("t1"),
+        &schema,
+        None,
+        vec![],
+        Some(10),
+        None,
+    )?
+    .build()?;
     let table_scan_with_inline_fetch = plan_to_sql(&table_scan_with_inline_fetch)?;
     assert_snapshot!(
         table_scan_with_inline_fetch,
@@ -1680,6 +1695,7 @@ fn test_table_scan_pushdown() -> Result<()> {
         Some(vec![0, 1]),
         vec![],
         Some(10),
+        None,
     )?
     .build()?;
     let table_scan_with_projection_and_inline_fetch =
@@ -1695,6 +1711,7 @@ fn test_table_scan_pushdown() -> Result<()> {
         Some(vec![0, 1]),
         vec![col("id").gt(col("age"))],
         Some(10),
+        None,
     )?
     .build()?;
     let table_scan_with_all = plan_to_sql(&table_scan_with_all)?;
@@ -1717,6 +1734,8 @@ fn test_table_scan_pushdown() -> Result<()> {
         @"SELECT * FROM t1 WHERE (t1.id = 5) AND (t1.id > t1.age)"
     );

+    // TODO(feniljain): add OFFSET test cases here
+
     Ok(())
 }

@@ -1729,7 +1748,7 @@ fn test_sort_with_push_down_fetch() -> Result<()> {

     let plan = table_scan(Some("t1"), &schema, None)?
         .project(vec![col("id"), col("age")])?
-        .sort_with_limit(vec![col("age").sort(true, true)], Some(10))?
+        .sort_with_limit(vec![col("age").sort(true, true)], Some(10), None)?
         .build()?;

     let sql = plan_to_sql(&plan)?;
diff --git a/datafusion/sqllogictest/src/test_context.rs b/datafusion/sqllogictest/src/test_context.rs
index 9ec085b41eec0..d367a8a4029f8 100644
--- a/datafusion/sqllogictest/src/test_context.rs
+++ b/datafusion/sqllogictest/src/test_context.rs
@@ -263,6 +263,7 @@ pub async fn register_temp_table(ctx: &SessionContext) {
             _: Option<&Vec<usize>>,
             _: &[Expr],
             _: Option<usize>,
+            _: Option<usize>,
         ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
             unimplemented!()
         }
diff --git a/datafusion/substrait/src/logical_plan/producer/rel/sort_rel.rs b/datafusion/substrait/src/logical_plan/producer/rel/sort_rel.rs
index d4520a4c37b14..3f265768fb68b 100644
--- a/datafusion/substrait/src/logical_plan/producer/rel/sort_rel.rs
+++ b/datafusion/substrait/src/logical_plan/producer/rel/sort_rel.rs
@@ -27,7 +27,12 @@ pub fn from_sort(
     producer: &mut impl SubstraitProducer,
     sort: &Sort,
 ) -> datafusion::common::Result<Box<Rel>> {
-    let Sort { expr, input, fetch } = sort;
+    let Sort {
+        expr,
+        input,
+        fetch,
+        skip: _, // TODO(feniljain)
+    } = sort;
     let sort_fields = expr
         .iter()
         .map(|e| substrait_sort_field(producer, e, input.schema()))