1717
1818//! The table implementation.
1919
20- use std::collections::HashMap;
21- use std::{any::Any, str::FromStr, sync::Arc};
22-
2320use super::helpers::{expr_applicable_for_cols, pruned_partition_list};
2421use super::{ListingTableUrl, PartitionedFile};
22+ use std::collections::HashMap;
23+ use std::{any::Any, str::FromStr, sync::Arc};
2524
2625use crate::datasource::{
2726 create_ordering,
2827 file_format::{
2928 file_compression_type::FileCompressionType, FileFormat, FilePushdownSupport,
3029 },
31- get_statistics_with_limit,
3230 physical_plan::FileSinkConfig,
3331};
3432use crate::execution::context::SessionState;
@@ -55,9 +53,12 @@ use datafusion_physical_expr::{
5553
5654use async_trait::async_trait;
5755use datafusion_catalog::Session;
56+ use datafusion_common::stats::Precision;
57+ use datafusion_datasource::add_row_stats;
58+ use datafusion_datasource::compute_all_files_statistics;
5859use datafusion_datasource::file_groups::FileGroup;
5960use datafusion_physical_expr_common::sort_expr::LexRequirement;
60- use futures::{future, stream, StreamExt, TryStreamExt};
61+ use futures::{future, stream, Stream, StreamExt, TryStreamExt};
6162use itertools::Itertools;
6263use object_store::ObjectStore;
6364
@@ -1115,32 +1116,26 @@ impl ListingTable {
11151116 let files = file_list
11161117 .map(|part_file| async {
11171118 let part_file = part_file?;
1118- if self.options.collect_stat {
1119- let statistics =
1120- self.do_collect_statistics(ctx, &store, &part_file).await?;
1121- Ok((part_file, statistics))
1119+ let statistics = if self.options.collect_stat {
1120+ self.do_collect_statistics(ctx, &store, &part_file).await?
11221121 } else {
1123- Ok((
1124- part_file,
1125- Arc::new(Statistics::new_unknown(&self.file_schema)),
1126- ))
1127- }
1122+ Arc::new(Statistics::new_unknown(&self.file_schema))
1123+ };
1124+ Ok(part_file.with_statistics(statistics))
11281125 })
11291126 .boxed()
11301127 .buffer_unordered(ctx.config_options().execution.meta_fetch_concurrency);
11311128
1132- let (files, statistics) = get_statistics_with_limit(
1133- files,
1129+ let (file_group, inexact_stats) =
1130+ get_files_with_limit(files, limit, self.options.collect_stat).await?;
1131+
1132+ let file_groups = file_group.split_files(self.options.target_partitions);
1133+ compute_all_files_statistics(
1134+ file_groups,
11341135 self.schema(),
1135- limit,
11361136 self.options.collect_stat,
1137+ inexact_stats,
11371138 )
1138- .await?;
1139-
1140- Ok((
1141- files.split_files(self.options.target_partitions),
1142- statistics,
1143- ))
11441139 }
11451140
11461141 /// Collects statistics for a given partitioned file.
@@ -1182,6 +1177,86 @@ impl ListingTable {
11821177 }
11831178}
11841179
1180+ /// Processes a stream of partitioned files and returns a `FileGroup` containing the files.
1181+ ///
1182+ /// This function collects files from the provided stream until either:
1183+ /// 1. The stream is exhausted
1184+ /// 2. The accumulated number of rows exceeds the provided `limit` (if specified)
1185+ ///
1186+ /// # Arguments
1187+ /// * `files` - A stream of `Result<PartitionedFile>` items to process
1188+ /// * `limit` - An optional row count limit. If provided, the function will stop collecting files
1189+ /// once the accumulated number of rows exceeds this limit
1190+ /// * `collect_stats` - Whether to collect and accumulate statistics from the files
1191+ ///
1192+ /// # Returns
1193+ /// A `Result` containing a `FileGroup` with the collected files
1194+ /// and a boolean indicating whether the statistics are inexact.
1195+ ///
1196+ /// # Note
1197+ /// The function will continue processing files if statistics are not available or if the
1198+ /// limit is not provided. If `collect_stats` is false, statistics won't be accumulated
1199+ /// but files will still be collected.
1200+ async fn get_files_with_limit(
1201+ files: impl Stream<Item = Result<PartitionedFile>>,
1202+ limit: Option<usize>,
1203+ collect_stats: bool,
1204+ ) -> Result<(FileGroup, bool)> {
1205+ let mut file_group = FileGroup::default();
1206+ // Fusing the stream allows us to call next safely even once it is finished.
1207+ let mut all_files = Box::pin(files.fuse());
1208+ let mut num_rows = Precision::<usize>::Absent;
1209+ while let Some(first_file) = all_files.next().await {
1210+ let file = first_file?;
1211+ if let Some(file_statistic) = &file.statistics {
1212+ num_rows = file_statistic.num_rows;
1213+ }
1214+ file_group.push(file);
1215+
1216+ // If the number of rows exceeds the limit, we can stop processing
1217+ // files. This only applies when we know the number of rows. It also
1218+ // currently ignores tables that have no statistics regarding the
1219+ // number of rows.
1220+ let conservative_num_rows = match num_rows {
1221+ Precision::Exact(nr) => nr,
1222+ _ => usize::MIN,
1223+ };
1224+ if conservative_num_rows <= limit.unwrap_or(usize::MAX) {
1225+ while let Some(current) = all_files.next().await {
1226+ let file = current?;
1227+ if !collect_stats {
1228+ file_group.push(file);
1229+ continue;
1230+ }
1231+
1232+ // We accumulate the number of rows, total byte size and null
1233+ // counts across all the files in question. If any file does not
1234+ // provide any information or provides an inexact value, we demote
1235+ // the statistic precision to inexact.
1236+ if let Some(file_stats) = &file.statistics {
1237+ num_rows = add_row_stats(num_rows, file_stats.num_rows);
1238+ }
1239+ file_group.push(file);
1240+
1241+ // If the number of rows exceeds the limit, we can stop processing
1242+ // files. This only applies when we know the number of rows. It also
1243+ // currently ignores tables that have no statistics regarding the
1244+ // number of rows.
1245+ if num_rows.get_value().unwrap_or(&usize::MIN)
1246+ > &limit.unwrap_or(usize::MAX)
1247+ {
1248+ break;
1249+ }
1250+ }
1251+ }
1252+ }
1253+ // If we still have files in the stream, it means that the limit kicked
1254+ // in, and the statistic could have been different had we processed the
1255+ // files in a different order.
1256+ let inexact_stats = all_files.next().await.is_some();
1257+ Ok((file_group, inexact_stats))
1258+ }
1259+
11851260#[cfg(test)]
11861261mod tests {
11871262 use super::*;
0 commit comments