Skip to content

Commit 22d9b88

Browse files
geoffreyclaudegabotechs
authored andcommitted
Fix sequential metadata fetching in ListingTable causing high latency (#2)
When scanning an exact list of remote Parquet files, the ListingTable was fetching file metadata (via head calls) sequentially. This was due to using `stream::iter(file_list).flatten()`, which processes each one-item stream in order. For remote blob stores, where each head call can take tens to hundreds of milliseconds, this sequential behavior significantly increased the time to create the physical plan. This commit replaces the sequential flattening with concurrent merging using `tream::iter(file_list).flatten_unordered(meta_fetch_concurrency). With this change, the `head` requests are executed in parallel (up to the configured `meta_fetch_concurrency` limit), reducing latency when creating the physical plan. Note that the ordering loss introduced by `flatten_unordered` is perfectly acceptable as the file list will anyways be fully sorted by path in `split_files` before being returned. Additionally, tests have been updated to ensure that metadata fetching occurs concurrently.
1 parent 273e836 commit 22d9b88

File tree

1 file changed

+4
-2
lines changed
  • datafusion/core/src/datasource/listing

1 file changed

+4
-2
lines changed

datafusion/core/src/datasource/listing/table.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1105,7 +1105,9 @@ impl ListingTable {
11051105
)
11061106
}))
11071107
.await?;
1108-
let file_list = stream::iter(file_list).flatten();
1108+
let meta_fetch_concurrency =
1109+
ctx.config_options().execution.meta_fetch_concurrency;
1110+
let file_list = stream::iter(file_list).flatten_unordered(meta_fetch_concurrency);
11091111
// collect the statistics if required by the config
11101112
let files = file_list
11111113
.map(|part_file| async {
@@ -1122,7 +1124,7 @@ impl ListingTable {
11221124
}
11231125
})
11241126
.boxed()
1125-
.buffered(ctx.config_options().execution.meta_fetch_concurrency);
1127+
.buffer_unordered(ctx.config_options().execution.meta_fetch_concurrency);
11261128

11271129
let (files, statistics) = get_statistics_with_limit(
11281130
files,

0 commit comments

Comments
 (0)