Skip to content

Commit ee06d47

Browse files
Fix sequential metadata fetching in ListingTable causing high latency
When scanning an exact list of remote Parquet files, the ListingTable was fetching file metadata (via head calls) sequentially. This was due to using `stream::iter(file_list).flatten()`, which processes each one-item stream in order. For remote blob stores, where each head call can take tens to hundreds of milliseconds, this sequential behavior significantly increased the time to create the physical plan. This commit replaces the sequential flattening with concurrent merging using `futures::stream::select_all(file_list)`. With this change, the `head` requests are executed in parallel (up to the configured `meta_fetch_concurrency` limit), reducing latency when creating the physical plan. Additionally, tests have been updated to ensure that metadata fetching occurs concurrently.
1 parent fc2fbb3 commit ee06d47

File tree

2 files changed

+249
-5
lines changed

2 files changed

+249
-5
lines changed

datafusion/core/src/datasource/listing/table.rs

Lines changed: 116 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ use datafusion_physical_expr::{
5656
use async_trait::async_trait;
5757
use datafusion_catalog::Session;
5858
use datafusion_physical_expr_common::sort_expr::LexRequirement;
59-
use futures::{future, stream, StreamExt, TryStreamExt};
59+
use futures::{future, StreamExt, TryStreamExt};
6060
use itertools::Itertools;
6161
use object_store::ObjectStore;
6262

@@ -1098,7 +1098,7 @@ impl ListingTable {
10981098
)
10991099
}))
11001100
.await?;
1101-
let file_list = stream::iter(file_list).flatten();
1101+
let file_list = futures::stream::select_all(file_list);
11021102
// collect the statistics if required by the config
11031103
let files = file_list
11041104
.map(|part_file| async {
@@ -1195,7 +1195,9 @@ mod tests {
11951195
use datafusion_physical_expr::PhysicalSortExpr;
11961196
use datafusion_physical_plan::ExecutionPlanProperties;
11971197

1198+
use crate::test::object_store::{ensure_head_concurrency, make_test_store_and_state};
11981199
use tempfile::TempDir;
1200+
use url::Url;
11991201

12001202
#[tokio::test]
12011203
async fn read_single_file() -> Result<()> {
@@ -1584,6 +1586,74 @@ mod tests {
15841586
Ok(())
15851587
}
15861588

1589+
#[tokio::test]
1590+
async fn test_assert_list_files_for_exact_paths() -> Result<()> {
1591+
// more expected partitions than files
1592+
assert_list_files_for_exact_paths(
1593+
&[
1594+
"bucket/key1/file0",
1595+
"bucket/key1/file1",
1596+
"bucket/key1/file2",
1597+
"bucket/key2/file3",
1598+
"bucket/key2/file4",
1599+
],
1600+
12,
1601+
5,
1602+
Some(""),
1603+
)
1604+
.await?;
1605+
1606+
// as many expected partitions as files
1607+
assert_list_files_for_exact_paths(
1608+
&[
1609+
"bucket/key1/file0",
1610+
"bucket/key1/file1",
1611+
"bucket/key1/file2",
1612+
"bucket/key2/file3",
1613+
"bucket/key2/file4",
1614+
],
1615+
5,
1616+
5,
1617+
Some(""),
1618+
)
1619+
.await?;
1620+
1621+
// more files as expected partitions
1622+
assert_list_files_for_exact_paths(
1623+
&[
1624+
"bucket/key1/file0",
1625+
"bucket/key1/file1",
1626+
"bucket/key1/file2",
1627+
"bucket/key2/file3",
1628+
"bucket/key2/file4",
1629+
],
1630+
2,
1631+
2,
1632+
Some(""),
1633+
)
1634+
.await?;
1635+
1636+
// no files => no groups
1637+
assert_list_files_for_exact_paths(&[], 2, 0, Some("")).await?;
1638+
1639+
// files that don't match the default file ext
1640+
assert_list_files_for_exact_paths(
1641+
&[
1642+
"bucket/key1/file0.avro",
1643+
"bucket/key1/file1.csv",
1644+
"bucket/key1/file2.avro",
1645+
"bucket/key2/file3.csv",
1646+
"bucket/key2/file4.avro",
1647+
"bucket/key3/file5.csv",
1648+
],
1649+
2,
1650+
2,
1651+
None,
1652+
)
1653+
.await?;
1654+
Ok(())
1655+
}
1656+
15871657
async fn load_table(
15881658
ctx: &SessionContext,
15891659
name: &str,
@@ -1670,6 +1740,50 @@ mod tests {
16701740
Ok(())
16711741
}
16721742

1743+
/// Check that the files listed by the table match the specified `output_partitioning`
1744+
/// when the object store contains `files`, and validate that file metadata is fetched
1745+
/// concurrently
1746+
async fn assert_list_files_for_exact_paths(
1747+
files: &[&str],
1748+
target_partitions: usize,
1749+
output_partitioning: usize,
1750+
file_ext: Option<&str>,
1751+
) -> Result<()> {
1752+
let ctx = SessionContext::new();
1753+
let (store, _) = make_test_store_and_state(
1754+
&files.iter().map(|f| (*f, 10)).collect::<Vec<_>>(),
1755+
);
1756+
1757+
let head_blocking_store = ensure_head_concurrency(store, files.len());
1758+
1759+
let url = Url::parse("test://").unwrap();
1760+
ctx.register_object_store(&url, head_blocking_store.clone());
1761+
1762+
let format = AvroFormat {};
1763+
1764+
let opt = ListingOptions::new(Arc::new(format))
1765+
.with_file_extension_opt(file_ext)
1766+
.with_target_partitions(target_partitions);
1767+
1768+
let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);
1769+
1770+
let table_paths = files
1771+
.iter()
1772+
.map(|t| ListingTableUrl::parse(format!("test:///{}", t)).unwrap())
1773+
.collect();
1774+
let config = ListingTableConfig::new_with_multi_paths(table_paths)
1775+
.with_listing_options(opt)
1776+
.with_schema(Arc::new(schema));
1777+
1778+
let table = ListingTable::try_new(config)?;
1779+
1780+
let (file_list, _) = table.list_files_for_scan(&ctx.state(), &[], None).await?;
1781+
1782+
assert_eq!(file_list.len(), output_partitioning);
1783+
1784+
Ok(())
1785+
}
1786+
16731787
#[tokio::test]
16741788
async fn test_insert_into_append_new_json_files() -> Result<()> {
16751789
let mut config_map: HashMap<String, String> = HashMap::new();

datafusion/core/src/test/object_store.rs

Lines changed: 133 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,27 @@
2020
use crate::execution::context::SessionState;
2121
use crate::execution::session_state::SessionStateBuilder;
2222
use crate::prelude::SessionContext;
23+
use futures::stream::BoxStream;
2324
use futures::FutureExt;
24-
use object_store::{memory::InMemory, path::Path, ObjectMeta, ObjectStore};
25+
use object_store::{
26+
memory::InMemory, path::Path, Error, GetOptions, GetResult, ListResult,
27+
MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayload,
28+
PutResult,
29+
};
30+
use std::fmt::{Debug, Display, Formatter};
2531
use std::sync::Arc;
32+
use tokio::{
33+
sync::Barrier,
34+
time::{timeout, Duration},
35+
};
2636
use url::Url;
2737

28-
/// Returns a test object store with the provided `ctx`
38+
/// Registers a test object store with the provided `ctx`
2939
pub fn register_test_store(ctx: &SessionContext, files: &[(&str, u64)]) {
3040
let url = Url::parse("test://").unwrap();
31-
ctx.register_object_store(&url, make_test_store_and_state(files).0);
41+
let (store, _) = make_test_store_and_state(files);
42+
let store = ensure_head_concurrency(store, 4);
43+
ctx.register_object_store(&url, store);
3244
}
3345

3446
/// Create a test object store with the provided files
@@ -61,3 +73,121 @@ pub fn local_unpartitioned_file(path: impl AsRef<std::path::Path>) -> ObjectMeta
6173
version: None,
6274
}
6375
}
76+
77+
/// Blocks the object_store `head` call until `concurrency` number of calls are pending.
78+
pub fn ensure_head_concurrency(
79+
object_store: Arc<dyn ObjectStore>,
80+
concurrency: usize,
81+
) -> Arc<dyn ObjectStore> {
82+
Arc::new(BlockingObjectStore::new(object_store, concurrency))
83+
}
84+
85+
/// An object store that “blocks” in its `head` call until an expected number of concurrent calls are reached.
86+
#[derive(Debug)]
87+
struct BlockingObjectStore {
88+
inner: Arc<dyn ObjectStore>,
89+
barrier: Arc<Barrier>,
90+
}
91+
92+
impl BlockingObjectStore {
93+
const NAME: &'static str = "BlockingObjectStore";
94+
fn new(inner: Arc<dyn ObjectStore>, expected_concurrency: usize) -> Self {
95+
Self {
96+
inner,
97+
barrier: Arc::new(Barrier::new(expected_concurrency)),
98+
}
99+
}
100+
}
101+
102+
impl Display for BlockingObjectStore {
103+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
104+
Display::fmt(&self.inner, f)
105+
}
106+
}
107+
108+
/// All trait methods are forwarded to the inner object store, except for
109+
/// the `head` method which waits until the expected number of concurrent calls is reached.
110+
#[async_trait::async_trait]
111+
impl ObjectStore for BlockingObjectStore {
112+
async fn put_opts(
113+
&self,
114+
location: &Path,
115+
payload: PutPayload,
116+
opts: PutOptions,
117+
) -> object_store::Result<PutResult> {
118+
self.inner.put_opts(location, payload, opts).await
119+
}
120+
async fn put_multipart_opts(
121+
&self,
122+
location: &Path,
123+
opts: PutMultipartOpts,
124+
) -> object_store::Result<Box<dyn MultipartUpload>> {
125+
self.inner.put_multipart_opts(location, opts).await
126+
}
127+
128+
async fn get_opts(
129+
&self,
130+
location: &Path,
131+
options: GetOptions,
132+
) -> object_store::Result<GetResult> {
133+
self.inner.get_opts(location, options).await
134+
}
135+
136+
async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
137+
println!(
138+
"{} received head call for {location}",
139+
BlockingObjectStore::NAME
140+
);
141+
// Wait until the expected number of concurrent calls is reached, but timeout after 1 second to avoid hanging failing tests.
142+
let wait_result = timeout(Duration::from_secs(1), self.barrier.wait()).await;
143+
match wait_result {
144+
Ok(_) => println!(
145+
"{} barrier reached for {location}",
146+
BlockingObjectStore::NAME
147+
),
148+
Err(_) => {
149+
let error_message = format!(
150+
"{} barrier wait timed out for {location}",
151+
BlockingObjectStore::NAME
152+
);
153+
log::error!("{}", error_message);
154+
return Err(Error::Generic {
155+
store: BlockingObjectStore::NAME,
156+
source: error_message.into(),
157+
});
158+
}
159+
}
160+
// Forward the call to the inner object store.
161+
self.inner.head(location).await
162+
}
163+
164+
async fn delete(&self, location: &Path) -> object_store::Result<()> {
165+
self.inner.delete(location).await
166+
}
167+
168+
fn list(
169+
&self,
170+
prefix: Option<&Path>,
171+
) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
172+
self.inner.list(prefix)
173+
}
174+
175+
async fn list_with_delimiter(
176+
&self,
177+
prefix: Option<&Path>,
178+
) -> object_store::Result<ListResult> {
179+
self.inner.list_with_delimiter(prefix).await
180+
}
181+
182+
async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
183+
self.inner.copy(from, to).await
184+
}
185+
186+
async fn copy_if_not_exists(
187+
&self,
188+
from: &Path,
189+
to: &Path,
190+
) -> object_store::Result<()> {
191+
self.inner.copy_if_not_exists(from, to).await
192+
}
193+
}

0 commit comments

Comments
 (0)