Skip to content

Commit 528ed60

Browse files
xiedeyantu and alamb authored
Improved experience when remote object store URL does not end in / (apache#17364)
* Improved experience when remote object store URL does not end in /
* refactor
* fix code style
* fix
* fix
* Simplify implementation, add more tests
* tweak
* Add comments
* fix and test non existing

---------

Co-authored-by: Andrew Lamb <[email protected]>
1 parent 39c5983 commit 528ed60

File tree

3 files changed

+284
-19
lines changed

3 files changed

+284
-19
lines changed

datafusion-cli/tests/cli_integration.rs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,3 +360,40 @@ fn test_backtrace_output(#[case] query: &str) {
360360
stderr
361361
);
362362
}
363+
364+
/// Integration test: a table created from an S3 location that has no trailing
/// slash (`s3://data/partitioned_csv`) can still be created and queried.
///
/// Skipped (with a message on stderr) unless the `TEST_STORAGE_INTEGRATION`
/// environment variable is set. Runs the CLI against the container returned by
/// `setup_minio_container` and compares its output with the snapshot that
/// carries the `s3_url_fallback` suffix.
#[tokio::test]
365+
async fn test_s3_url_fallback() {
366+
if env::var("TEST_STORAGE_INTEGRATION").is_err() {
367+
eprintln!("Skipping external storages integration tests");
368+
return;
369+
}
370+
371+
let container = setup_minio_container().await;
372+
373+
let mut settings = make_settings();
374+
settings.set_snapshot_suffix("s3_url_fallback");
// NOTE(review): presumably a scope guard; bound to a name so the snapshot
// settings stay active until the end of the test — confirm.
375+
let _bound = settings.bind_to_scope();
376+
// Host port mapped to container port 9000; used to build AWS_ENDPOINT below.
377+
let port = container.get_host_port_ipv4(9000).await.unwrap();
378+
379+
// Create a table using a prefix path (without trailing slash)
380+
// This should trigger the fallback logic where head() fails on the prefix
381+
// and list() is used to discover the actual files
382+
let input = r#"CREATE EXTERNAL TABLE partitioned_data
383+
STORED AS CSV
384+
LOCATION 's3://data/partitioned_csv'
385+
OPTIONS (
386+
'format.has_header' 'false'
387+
);
388+
389+
SELECT * FROM partitioned_data ORDER BY column_1, column_2 LIMIT 5;
390+
"#;
391+
// Run the CLI with only the variables set explicitly below and feed it the
// SQL on stdin; the captured output is checked against the snapshot.
392+
assert_cmd_snapshot!(cli()
393+
.env_clear()
394+
.env("AWS_ACCESS_KEY_ID", "TEST-DataFusionLogin")
395+
.env("AWS_SECRET_ACCESS_KEY", "TEST-DataFusionPassword")
396+
.env("AWS_ENDPOINT", format!("http://localhost:{port}"))
397+
.env("AWS_ALLOW_HTTP", "true")
398+
.pass_stdin(input));
399+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
---
2+
source: datafusion-cli/tests/cli_integration.rs
3+
info:
4+
program: datafusion-cli
5+
args: []
6+
env:
7+
AWS_ACCESS_KEY_ID: TEST-DataFusionLogin
8+
AWS_ALLOW_HTTP: "true"
9+
AWS_ENDPOINT: "http://localhost:32771"
10+
AWS_SECRET_ACCESS_KEY: TEST-DataFusionPassword
11+
stdin: "CREATE EXTERNAL TABLE partitioned_data\nSTORED AS CSV\nLOCATION 's3://data/partitioned_csv'\nOPTIONS (\n 'format.has_header' 'false'\n);\n\nSELECT * FROM partitioned_data ORDER BY column_1, column_2 LIMIT 5;\n"
12+
---
13+
success: true
14+
exit_code: 0
15+
----- stdout -----
16+
[CLI_VERSION]
17+
0 row(s) fetched.
18+
[ELAPSED]
19+
20+
+----------+----------+----------+
21+
| column_1 | column_2 | column_3 |
22+
+----------+----------+----------+
23+
| 0 | 0 | true |
24+
| 0 | 1 | false |
25+
| 0 | 2 | true |
26+
| 0 | 3 | false |
27+
| 0 | 4 | true |
28+
+----------+----------+----------+
29+
5 row(s) fetched.
30+
[ELAPSED]
31+
32+
\q
33+
34+
----- stderr -----

datafusion/datasource/src/url.rs

Lines changed: 213 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -242,33 +242,27 @@ impl ListingTableUrl {
242242
) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
243243
let exec_options = &ctx.config_options().execution;
244244
let ignore_subdirectory = exec_options.listing_table_ignore_subdirectory;
245-
// If the prefix is a file, use a head request, otherwise list
246-
let list = match self.is_collection() {
247-
true => match ctx.runtime_env().cache_manager.get_list_files_cache() {
248-
None => store.list(Some(&self.prefix)),
249-
Some(cache) => {
250-
if let Some(res) = cache.get(&self.prefix) {
251-
debug!("Hit list all files cache");
252-
futures::stream::iter(res.as_ref().clone().into_iter().map(Ok))
253-
.boxed()
254-
} else {
255-
let list_res = store.list(Some(&self.prefix));
256-
let vec = list_res.try_collect::<Vec<ObjectMeta>>().await?;
257-
cache.put(&self.prefix, Arc::new(vec.clone()));
258-
futures::stream::iter(vec.into_iter().map(Ok)).boxed()
259-
}
260-
}
261-
},
262-
false => futures::stream::once(store.head(&self.prefix)).boxed(),
245+
246+
let list: BoxStream<'a, Result<ObjectMeta>> = if self.is_collection() {
247+
list_with_cache(ctx, store, &self.prefix).await?
248+
} else {
249+
match store.head(&self.prefix).await {
250+
Ok(meta) => futures::stream::once(async { Ok(meta) })
251+
.map_err(|e| DataFusionError::ObjectStore(Box::new(e)))
252+
.boxed(),
253+
// If the head command fails, it is likely that object doesn't exist.
254+
// Retry as though it were a prefix (aka a collection)
255+
Err(_) => list_with_cache(ctx, store, &self.prefix).await?,
256+
}
263257
};
258+
264259
Ok(list
265260
.try_filter(move |meta| {
266261
let path = &meta.location;
267262
let extension_match = path.as_ref().ends_with(file_extension);
268263
let glob_match = self.contains(path, ignore_subdirectory);
269264
futures::future::ready(extension_match && glob_match)
270265
})
271-
.map_err(|e| DataFusionError::ObjectStore(Box::new(e)))
272266
.boxed())
273267
}
274268

@@ -306,6 +300,33 @@ impl ListingTableUrl {
306300
}
307301
}
308302

303+
/// Lists the objects stored under `prefix` in `store`, going through the
/// list-files cache of the session's runtime environment when one is set.
///
/// * No cache configured: the store's listing is returned as a stream, with
///   each object-store error wrapped in `DataFusionError::ObjectStore`.
/// * Cache hit: a clone of the cached entries is returned as a stream.
/// * Cache miss: the whole listing is collected first, stored in the cache
///   under `prefix`, and then returned as a stream.
///
/// # Errors
///
/// On a cache miss, an error raised while collecting the listing is returned
/// from this function (via `?`) and nothing is written to the cache. Without a
/// cache, listing errors surface as items of the returned stream instead.
async fn list_with_cache<'b>(
304+
ctx: &'b dyn Session,
305+
store: &'b dyn ObjectStore,
306+
prefix: &'b Path,
307+
) -> Result<BoxStream<'b, Result<ObjectMeta>>> {
308+
match ctx.runtime_env().cache_manager.get_list_files_cache() {
// No cache: hand back the store's listing stream, only converting the error type.
309+
None => Ok(store
310+
.list(Some(prefix))
311+
.map(|res| res.map_err(|e| DataFusionError::ObjectStore(Box::new(e))))
312+
.boxed()),
313+
Some(cache) => {
314+
let vec = if let Some(res) = cache.get(prefix) {
315+
debug!("Hit list all files cache");
// The cached value is shared, so copy the entries out for this caller.
316+
res.as_ref().clone()
317+
} else {
// Miss: the listing must be fully collected before it can be cached.
318+
let vec = store
319+
.list(Some(prefix))
320+
.try_collect::<Vec<ObjectMeta>>()
321+
.await?;
322+
cache.put(prefix, Arc::new(vec.clone()));
323+
vec
324+
};
// Every entry is already known to be valid, so each item is wrapped in `Ok`.
325+
Ok(futures::stream::iter(vec.into_iter().map(Ok)).boxed())
326+
}
327+
}
328+
}
329+
309330
/// Creates a file URL from a potentially relative filesystem path
310331
#[cfg(not(target_arch = "wasm32"))]
311332
fn url_from_filesystem_path(s: &str) -> Option<Url> {
@@ -384,6 +405,18 @@ fn split_glob_expression(path: &str) -> Option<(&str, &str)> {
384405
#[cfg(test)]
385406
mod tests {
386407
use super::*;
408+
use datafusion_common::config::TableOptions;
409+
use datafusion_common::DFSchema;
410+
use datafusion_execution::config::SessionConfig;
411+
use datafusion_execution::runtime_env::RuntimeEnv;
412+
use datafusion_execution::TaskContext;
413+
use datafusion_expr::execution_props::ExecutionProps;
414+
use datafusion_expr::{AggregateUDF, Expr, LogicalPlan, ScalarUDF, WindowUDF};
415+
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
416+
use datafusion_physical_plan::ExecutionPlan;
417+
use object_store::PutPayload;
418+
use std::any::Any;
419+
use std::collections::HashMap;
387420
use tempfile::tempdir;
388421

389422
#[test]
@@ -597,4 +630,165 @@ mod tests {
597630
"file path ends with .ext - extension is ext",
598631
);
599632
}
633+
634+
/// Exercises `ListingTableUrl::list_all_files` (through the `list_all_files`
/// test helper) against an in-memory object store: the root, a prefix given
/// with and without a trailing slash, and a prefix that does not exist.
#[tokio::test]
635+
async fn test_list_files() {
636+
let store = object_store::memory::InMemory::new();
637+
// Create some files:
638+
create_file(&store, "a.parquet").await;
639+
create_file(&store, "/t/b.parquet").await;
640+
create_file(&store, "/t/c.csv").await;
641+
create_file(&store, "/t/d.csv").await;
642+
// Listing the root returns only the top-level parquet file.
// NOTE(review): `t/b.parquet` is presumably excluded because subdirectories
// are ignored by default (`listing_table_ignore_subdirectory`) — confirm.
643+
assert_eq!(
644+
list_all_files("/", &store, "parquet").await,
645+
vec!["a.parquet"],
646+
);
647+
648+
// parquet files under `t`: test with and without trailing slash
649+
assert_eq!(
650+
list_all_files("/t/", &store, "parquet").await,
651+
vec!["t/b.parquet"],
652+
);
653+
assert_eq!(
654+
list_all_files("/t", &store, "parquet").await,
655+
vec!["t/b.parquet"],
656+
);
657+
658+
// csv files under `t`: test with and without trailing slash
659+
assert_eq!(
660+
list_all_files("/t", &store, "csv").await,
661+
vec!["t/c.csv", "t/d.csv"],
662+
);
663+
assert_eq!(
664+
list_all_files("/t/", &store, "csv").await,
665+
vec!["t/c.csv", "t/d.csv"],
666+
);
667+
668+
// Test a non existing prefix: expected to yield an empty listing, not an error
669+
assert_eq!(
670+
list_all_files("/NonExisting", &store, "csv").await,
671+
vec![] as Vec<String>
672+
);
673+
assert_eq!(
674+
list_all_files("/NonExisting/", &store, "csv").await,
675+
vec![] as Vec<String>
676+
);
677+
}
678+
679+
/// Creates a file with "hello world" content at the specified path
///
/// Panics if the object store rejects the write.
680+
async fn create_file(object_store: &dyn ObjectStore, path: &str) {
681+
object_store
682+
.put(&Path::from(path), PutPayload::from_static(b"hello world"))
683+
.await
684+
.expect("failed to create test file");
685+
}
686+
687+
/// Runs `ListingTableUrl::list_all_files` for `url` and returns the paths of
/// the listed files
688+
///
689+
/// Panics on error; see `try_list_all_files` for the variant that returns it.
690+
async fn list_all_files(
691+
url: &str,
692+
store: &dyn ObjectStore,
693+
file_extension: &str,
694+
) -> Vec<String> {
695+
try_list_all_files(url, store, file_extension)
696+
.await
697+
.unwrap()
698+
}
699+
700+
/// Runs `ListingTableUrl::list_all_files` for `url` with a fresh `MockSession`
/// and returns the location of every listed object as a string.
///
/// Returns an error if `url` cannot be parsed, if the listing cannot be
/// started, or if any item of the listing stream is an error.
701+
async fn try_list_all_files(
702+
url: &str,
703+
store: &dyn ObjectStore,
704+
file_extension: &str,
705+
) -> Result<Vec<String>> {
706+
let session = MockSession::new();
707+
let url = ListingTableUrl::parse(url)?;
708+
let files = url
709+
.list_all_files(&session, store, file_extension)
// First `?`: failure to produce the stream at all.
710+
.await?
711+
.try_collect::<Vec<_>>()
// Second `?`: the first error yielded by the stream itself.
712+
.await?
713+
.into_iter()
714+
.map(|meta| meta.location.as_ref().to_string())
715+
.collect();
716+
Ok(files)
717+
}
718+
719+
/// Minimal `Session` stand-in for the listing tests in this module.
///
/// It only carries the two pieces of state that its `Session` impl actually
/// serves: a session configuration and a runtime environment.
struct MockSession {
720+
// Returned by `Session::config`.
config: SessionConfig,
721+
// Returned by `Session::runtime_env`.
runtime_env: Arc<RuntimeEnv>,
722+
}
723+
724+
impl MockSession {
725+
/// Builds a mock session from `SessionConfig::new()` and
/// `RuntimeEnv::default()`.
fn new() -> Self {
726+
Self {
727+
config: SessionConfig::new(),
728+
runtime_env: Arc::new(RuntimeEnv::default()),
729+
}
730+
}
731+
}
732+
733+
/// `Session` implementation for `MockSession`.
///
/// Only `config` and `runtime_env` return real values; every other method
/// panics via `unimplemented!()`, so this mock is usable only by code paths
/// that touch nothing beyond the configuration and the runtime environment.
#[async_trait::async_trait]
734+
impl Session for MockSession {
735+
fn session_id(&self) -> &str {
736+
unimplemented!()
737+
}
738+
739+
// NOTE(review): `list_all_files` reads `ctx.config_options()`, which is
// presumably a provided trait method built on `config()` — confirm.
fn config(&self) -> &SessionConfig {
740+
&self.config
741+
}
742+
743+
async fn create_physical_plan(
744+
&self,
745+
_logical_plan: &LogicalPlan,
746+
) -> Result<Arc<dyn ExecutionPlan>> {
747+
unimplemented!()
748+
}
749+
750+
fn create_physical_expr(
751+
&self,
752+
_expr: Expr,
753+
_df_schema: &DFSchema,
754+
) -> Result<Arc<dyn PhysicalExpr>> {
755+
unimplemented!()
756+
}
757+
758+
fn scalar_functions(&self) -> &HashMap<String, Arc<ScalarUDF>> {
759+
unimplemented!()
760+
}
761+
762+
fn aggregate_functions(&self) -> &HashMap<String, Arc<AggregateUDF>> {
763+
unimplemented!()
764+
}
765+
766+
fn window_functions(&self) -> &HashMap<String, Arc<WindowUDF>> {
767+
unimplemented!()
768+
}
769+
770+
// Used by the listing code to reach `cache_manager`.
fn runtime_env(&self) -> &Arc<RuntimeEnv> {
771+
&self.runtime_env
772+
}
773+
774+
fn execution_props(&self) -> &ExecutionProps {
775+
unimplemented!()
776+
}
777+
778+
fn as_any(&self) -> &dyn Any {
779+
unimplemented!()
780+
}
781+
782+
fn table_options(&self) -> &TableOptions {
783+
unimplemented!()
784+
}
785+
786+
fn table_options_mut(&mut self) -> &mut TableOptions {
787+
unimplemented!()
788+
}
789+
790+
fn task_ctx(&self) -> Arc<TaskContext> {
791+
unimplemented!()
792+
}
793+
}
600794
}

0 commit comments

Comments
 (0)