Skip to content

Commit 447a945

Browse files
Fix sequential metadata fetching in ListingTable causing high latency
When scanning an exact list of remote Parquet files, the ListingTable was fetching file metadata (via head calls) sequentially. This was due to using `stream::iter(file_list).flatten()`, which processes each one-item stream in order. For remote blob stores, where each head call can take tens to hundreds of milliseconds, this sequential behavior significantly increased the time to create the physical plan. This commit replaces the sequential flattening with concurrent merging using `futures::stream::select_all(file_list)`. With this change, the `head` requests are executed in parallel (up to the configured `meta_fetch_concurrency` limit), reducing latency when creating the physical plan. Additionally, tests have been updated to ensure that metadata fetching occurs concurrently.
1 parent fc2fbb3 commit 447a945

File tree

2 files changed

+211
-10
lines changed

2 files changed

+211
-10
lines changed

datafusion/core/src/datasource/listing/table.rs

Lines changed: 120 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ use datafusion_physical_expr::{
5656
use async_trait::async_trait;
5757
use datafusion_catalog::Session;
5858
use datafusion_physical_expr_common::sort_expr::LexRequirement;
59-
use futures::{future, stream, StreamExt, TryStreamExt};
59+
use futures::{future, StreamExt, TryStreamExt};
6060
use itertools::Itertools;
6161
use object_store::ObjectStore;
6262

@@ -1098,7 +1098,7 @@ impl ListingTable {
10981098
)
10991099
}))
11001100
.await?;
1101-
let file_list = stream::iter(file_list).flatten();
1101+
let file_list = futures::stream::select_all(file_list);
11021102
// collect the statistics if required by the config
11031103
let files = file_list
11041104
.map(|part_file| async {
@@ -1196,6 +1196,8 @@ mod tests {
11961196
use datafusion_physical_plan::ExecutionPlanProperties;
11971197

11981198
use tempfile::TempDir;
1199+
use url::Url;
1200+
use crate::test::object_store::{ensure_head_concurrency, make_test_store_and_state};
11991201

12001202
#[tokio::test]
12011203
async fn read_single_file() -> Result<()> {
@@ -1508,7 +1510,7 @@ mod tests {
15081510
5,
15091511
Some(""),
15101512
)
1511-
.await?;
1513+
.await?;
15121514

15131515
// as many expected partitions as files
15141516
assert_list_files_for_multi_paths(
@@ -1525,7 +1527,7 @@ mod tests {
15251527
5,
15261528
Some(""),
15271529
)
1528-
.await?;
1530+
.await?;
15291531

15301532
// more files than expected partitions
15311533
assert_list_files_for_multi_paths(
@@ -1542,7 +1544,7 @@ mod tests {
15421544
2,
15431545
Some(""),
15441546
)
1545-
.await?;
1547+
.await?;
15461548

15471549
// no files => no groups
15481550
assert_list_files_for_multi_paths(&[], &["test:///bucket/key1/"], 2, 0, Some(""))
@@ -1563,7 +1565,7 @@ mod tests {
15631565
1,
15641566
Some(""),
15651567
)
1566-
.await?;
1568+
.await?;
15671569

15681570
// files that don't match the prefix or the default file ext
15691571
assert_list_files_for_multi_paths(
@@ -1580,7 +1582,76 @@ mod tests {
15801582
2,
15811583
None,
15821584
)
1583-
.await?;
1585+
.await?;
1586+
Ok(())
1587+
}
1588+
1589+
#[tokio::test]
1590+
async fn test_assert_list_files_for_exact_paths() -> Result<()> {
1591+
// more expected partitions than files
1592+
assert_list_files_for_exact_paths(
1593+
&[
1594+
"bucket/key1/file0",
1595+
"bucket/key1/file1",
1596+
"bucket/key1/file2",
1597+
"bucket/key2/file3",
1598+
"bucket/key2/file4",
1599+
],
1600+
12,
1601+
5,
1602+
Some(""),
1603+
)
1604+
.await?;
1605+
1606+
// as many expected partitions as files
1607+
assert_list_files_for_exact_paths(
1608+
&[
1609+
"bucket/key1/file0",
1610+
"bucket/key1/file1",
1611+
"bucket/key1/file2",
1612+
"bucket/key2/file3",
1613+
"bucket/key2/file4",
1614+
],
1615+
5,
1616+
5,
1617+
Some(""),
1618+
)
1619+
.await?;
1620+
1621+
// more files than expected partitions
1622+
assert_list_files_for_exact_paths(
1623+
&[
1624+
"bucket/key1/file0",
1625+
"bucket/key1/file1",
1626+
"bucket/key1/file2",
1627+
"bucket/key2/file3",
1628+
"bucket/key2/file4",
1629+
],
1630+
2,
1631+
2,
1632+
Some(""),
1633+
)
1634+
.await?;
1635+
1636+
// no files => no groups
1637+
assert_list_files_for_exact_paths(&[], 2, 0, Some(""))
1638+
.await?;
1639+
1640+
// files that don't match the default file ext
1641+
assert_list_files_for_exact_paths(
1642+
&[
1643+
"bucket/key1/file0.avro",
1644+
"bucket/key1/file1.csv",
1645+
"bucket/key1/file2.avro",
1646+
"bucket/key2/file3.csv",
1647+
"bucket/key2/file4.avro",
1648+
"bucket/key3/file5.csv",
1649+
],
1650+
2,
1651+
2,
1652+
None,
1653+
)
1654+
.await?;
15841655
Ok(())
15851656
}
15861657

@@ -1670,6 +1741,48 @@ mod tests {
16701741
Ok(())
16711742
}
16721743

1744+
/// Check that the files listed by the table match the specified `output_partitioning`
1745+
/// when the object store contains `files`, and validate that file metadata is fetched
1746+
/// concurrently
1747+
async fn assert_list_files_for_exact_paths(
1748+
files: &[&str],
1749+
target_partitions: usize,
1750+
output_partitioning: usize,
1751+
file_ext: Option<&str>,
1752+
) -> Result<()> {
1753+
let ctx = SessionContext::new();
1754+
let (store, _) = make_test_store_and_state(&files.iter().map(|f| (*f, 10)).collect::<Vec<_>>());
1755+
1756+
let head_blocking_store = ensure_head_concurrency(store, files.len());
1757+
1758+
let url = Url::parse("test://").unwrap();
1759+
ctx.register_object_store(&url, head_blocking_store.clone());
1760+
1761+
let format = AvroFormat {};
1762+
1763+
let opt = ListingOptions::new(Arc::new(format))
1764+
.with_file_extension_opt(file_ext)
1765+
.with_target_partitions(target_partitions);
1766+
1767+
let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);
1768+
1769+
let table_paths = files
1770+
.iter()
1771+
.map(|t| ListingTableUrl::parse(format!("test:///{}", t)).unwrap())
1772+
.collect();
1773+
let config = ListingTableConfig::new_with_multi_paths(table_paths)
1774+
.with_listing_options(opt)
1775+
.with_schema(Arc::new(schema));
1776+
1777+
let table = ListingTable::try_new(config)?;
1778+
1779+
let (file_list, _) = table.list_files_for_scan(&ctx.state(), &[], None).await?;
1780+
1781+
assert_eq!(file_list.len(), output_partitioning);
1782+
1783+
Ok(())
1784+
}
1785+
16731786
#[tokio::test]
16741787
async fn test_insert_into_append_new_json_files() -> Result<()> {
16751788
let mut config_map: HashMap<String, String> = HashMap::new();

datafusion/core/src/test/object_store.rs

Lines changed: 91 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,23 @@
1717

1818
//! Object store implementation used for testing
1919
20+
use std::fmt::{Debug, Display, Formatter};
2021
use crate::execution::context::SessionState;
2122
use crate::execution::session_state::SessionStateBuilder;
2223
use crate::prelude::SessionContext;
2324
use futures::FutureExt;
24-
use object_store::{memory::InMemory, path::Path, ObjectMeta, ObjectStore};
25+
use object_store::{memory::InMemory, path::Path, Error, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult};
2526
use std::sync::Arc;
27+
use futures::stream::BoxStream;
28+
use tokio::{sync::Barrier, time::{timeout, Duration}};
2629
use url::Url;
2730

28-
/// Returns a test object store with the provided `ctx`
31+
/// Registers a test object store with the provided `ctx`
2932
pub fn register_test_store(ctx: &SessionContext, files: &[(&str, u64)]) {
3033
let url = Url::parse("test://").unwrap();
31-
ctx.register_object_store(&url, make_test_store_and_state(files).0);
34+
let (store, _) = make_test_store_and_state(files);
35+
let store = ensure_head_concurrency(store, 4);
36+
ctx.register_object_store(&url, store);
3237
}
3338

3439
/// Create a test object store with the provided files
@@ -61,3 +66,86 @@ pub fn local_unpartitioned_file(path: impl AsRef<std::path::Path>) -> ObjectMeta
6166
version: None,
6267
}
6368
}
69+
70+
/// Blocks the object_store `head` call until `concurrency` number of calls are pending.
71+
pub fn ensure_head_concurrency(object_store: Arc<dyn ObjectStore>, concurrency: usize) -> Arc<dyn ObjectStore> {
72+
Arc::new(BlockingObjectStore::new(object_store, concurrency))
73+
}
74+
75+
/// An object store that “blocks” in its `head` call until an expected number of concurrent calls are reached.
76+
#[derive(Debug)]
77+
struct BlockingObjectStore {
78+
inner: Arc<dyn ObjectStore>,
79+
barrier: Arc<Barrier>,
80+
}
81+
82+
impl BlockingObjectStore {
83+
const NAME: &'static str = "BlockingObjectStore";
84+
fn new(inner: Arc<dyn ObjectStore>, expected_concurrency: usize) -> Self {
85+
Self {
86+
inner,
87+
barrier: Arc::new(Barrier::new(expected_concurrency)),
88+
}
89+
}
90+
}
91+
92+
impl Display for BlockingObjectStore {
93+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
94+
Display::fmt(&self.inner, f)
95+
}
96+
}
97+
98+
/// All trait methods are forwarded to the inner object store, except for
99+
/// the `head` method which waits until the expected number of concurrent calls is reached.
100+
#[async_trait::async_trait]
101+
impl ObjectStore for BlockingObjectStore {
102+
async fn put_opts(&self, location: &Path, payload: PutPayload, opts: PutOptions) -> object_store::Result<PutResult> {
103+
self.inner.put_opts(location, payload, opts).await
104+
}
105+
async fn put_multipart_opts(&self, location: &Path, opts: PutMultipartOpts) -> object_store::Result<Box<dyn MultipartUpload>> {
106+
self.inner.put_multipart_opts(location, opts).await
107+
}
108+
109+
async fn get_opts(&self, location: &Path, options: GetOptions) -> object_store::Result<GetResult> {
110+
self.inner.get_opts(location, options).await
111+
}
112+
113+
async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
114+
println!("{} received head call for {location}", BlockingObjectStore::NAME);
115+
// Wait until the expected number of concurrent calls is reached, but time out after 1 second so that failing tests don't hang.
116+
let wait_result = timeout(Duration::from_secs(1), self.barrier.wait()).await;
117+
match wait_result {
118+
Ok(_) => println!("{} barrier reached for {location}", BlockingObjectStore::NAME),
119+
Err(_) => {
120+
let error_message = format!("{} barrier wait timed out for {location}", BlockingObjectStore::NAME);
121+
log::error!("{}", error_message);
122+
return Err(Error::Generic {
123+
store: BlockingObjectStore::NAME,
124+
source: error_message.into(),
125+
});
126+
}
127+
}
128+
// Forward the call to the inner object store.
129+
self.inner.head(location).await
130+
}
131+
132+
async fn delete(&self, location: &Path) -> object_store::Result<()> {
133+
self.inner.delete(location).await
134+
}
135+
136+
fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
137+
self.inner.list(prefix)
138+
}
139+
140+
async fn list_with_delimiter(&self, prefix: Option<&Path>) -> object_store::Result<ListResult> {
141+
self.inner.list_with_delimiter(prefix).await
142+
}
143+
144+
async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
145+
self.inner.copy(from, to).await
146+
}
147+
148+
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
149+
self.inner.copy_if_not_exists(from, to).await
150+
}
151+
}

0 commit comments

Comments
 (0)