
Commit 6da3517

Fix sequential metadata fetching in ListingTable causing high latency
When scanning an exact list of remote Parquet files, the ListingTable was fetching file metadata (via head calls) sequentially. This was due to using `stream::iter(file_list).flatten()`, which processes each one-item stream in order. For remote blob stores, where each head call can take tens to hundreds of milliseconds, this sequential behavior significantly increased the time to create the physical plan.

This commit replaces the sequential flattening with concurrent merging using `futures::stream::select_all(file_list)`. With this change, the `head` requests are executed in parallel (up to the configured `meta_fetch_concurrency` limit), reducing latency when creating the physical plan.

Additionally, tests have been updated to ensure that metadata fetching occurs concurrently.
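
The latency argument above can be made concrete with a rough model. The sketch below is not part of the commit and uses only the standard library, not the `futures` API. It assumes every head call takes the same time and that requests finish in waves of at most `concurrency` at once. The function name `estimated_fetch_time`, the file count, the 50 ms latency, and the limit of 32 are all hypothetical values chosen for illustration.

```rust
use std::time::Duration;

/// Rough estimate of the wall-clock time needed for `n_files` metadata
/// requests when at most `concurrency` requests are in flight at once.
/// Assumes every request takes exactly `latency` (real latencies vary).
fn estimated_fetch_time(n_files: u32, latency: Duration, concurrency: u32) -> Duration {
    // Treat a limit of 0 as 1 to avoid dividing by zero.
    let concurrency = concurrency.max(1);
    // Ceiling division: requests complete in waves of up to `concurrency`.
    let waves = (n_files + concurrency - 1) / concurrency;
    latency * waves
}

fn main() {
    // Hypothetical workload, for illustration only.
    let n_files = 100;
    let latency = Duration::from_millis(50);
    let limit = 32; // assumed value standing in for `meta_fetch_concurrency`

    // One request at a time, as with the sequential flattening.
    let sequential = estimated_fetch_time(n_files, latency, 1);
    // Up to `limit` requests at a time, as with concurrent merging.
    let concurrent = estimated_fetch_time(n_files, latency, limit);

    println!("sequential estimate: {:?}", sequential);
    println!("concurrent estimate: {:?}", concurrent);
}
```

Under these assumptions, 100 files at 50 ms each take about 5 s when fetched one at a time, versus about 200 ms (four waves of up to 32 requests) when fetched concurrently. Real gains depend on the object store, network conditions, and the actual concurrency limit.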
1 parent e9284cf commit 6da3517

1 file changed, +2 -2 lines changed
datafusion/core/src/datasource/listing/table.rs

Lines changed: 2 additions & 2 deletions
@@ -55,7 +55,7 @@ use datafusion_physical_expr::{
 use async_trait::async_trait;
 use datafusion_catalog::Session;
 use datafusion_physical_expr_common::sort_expr::LexRequirement;
-use futures::{future, stream, StreamExt, TryStreamExt};
+use futures::{future, StreamExt, TryStreamExt};
 use itertools::Itertools;
 use object_store::ObjectStore;

@@ -1112,7 +1112,7 @@ impl ListingTable {
             )
         }))
         .await?;
-        let file_list = stream::iter(file_list).flatten();
+        let file_list = futures::stream::select_all(file_list);
         // collect the statistics if required by the config
         let files = file_list
             .map(|part_file| async {
