Skip to content

Commit 3649dc8

Browse files
authored
datafusion-cli: Use correct S3 region if it is not specified (#16502)
* datafusion-cli: Use correct S3 region if it is not specified * Simplify the retry logic
1 parent b405380 commit 3649dc8

File tree

5 files changed

+183
-23
lines changed

5 files changed

+183
-23
lines changed

datafusion-cli/src/catalog.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,7 @@ impl SchemaProvider for DynamicObjectStoreSchemaProvider {
200200
table_url.scheme(),
201201
url,
202202
&state.default_table_options(),
203+
false,
203204
)
204205
.await?;
205206
state.runtime_env().register_object_store(url, store);
@@ -229,7 +230,6 @@ pub fn substitute_tilde(cur: String) -> String {
229230
}
230231
#[cfg(test)]
231232
mod tests {
232-
233233
use super::*;
234234

235235
use datafusion::catalog::SchemaProvider;

datafusion-cli/src/exec.rs

Lines changed: 38 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -26,28 +26,28 @@ use crate::{
2626
object_storage::get_object_store,
2727
print_options::{MaxRows, PrintOptions},
2828
};
29-
use futures::StreamExt;
30-
use std::collections::HashMap;
31-
use std::fs::File;
32-
use std::io::prelude::*;
33-
use std::io::BufReader;
34-
3529
use datafusion::common::instant::Instant;
3630
use datafusion::common::{plan_datafusion_err, plan_err};
3731
use datafusion::config::ConfigFileType;
3832
use datafusion::datasource::listing::ListingTableUrl;
3933
use datafusion::error::{DataFusionError, Result};
34+
use datafusion::execution::memory_pool::MemoryConsumer;
4035
use datafusion::logical_expr::{DdlStatement, LogicalPlan};
4136
use datafusion::physical_plan::execution_plan::EmissionType;
37+
use datafusion::physical_plan::spill::get_record_batch_memory_size;
4238
use datafusion::physical_plan::{execute_stream, ExecutionPlanProperties};
4339
use datafusion::sql::parser::{DFParser, Statement};
44-
use datafusion::sql::sqlparser::dialect::dialect_from_str;
45-
46-
use datafusion::execution::memory_pool::MemoryConsumer;
47-
use datafusion::physical_plan::spill::get_record_batch_memory_size;
4840
use datafusion::sql::sqlparser;
41+
use datafusion::sql::sqlparser::dialect::dialect_from_str;
42+
use futures::StreamExt;
43+
use log::warn;
44+
use object_store::Error::Generic;
4945
use rustyline::error::ReadlineError;
5046
use rustyline::Editor;
47+
use std::collections::HashMap;
48+
use std::fs::File;
49+
use std::io::prelude::*;
50+
use std::io::BufReader;
5151
use tokio::signal;
5252

5353
/// run and execute SQL statements and commands, against a context with the given print options
@@ -231,10 +231,21 @@ pub(super) async fn exec_and_print(
231231
let adjusted =
232232
AdjustedPrintOptions::new(print_options.clone()).with_statement(&statement);
233233

234-
let plan = create_plan(ctx, statement).await?;
234+
let plan = create_plan(ctx, statement.clone(), false).await?;
235235
let adjusted = adjusted.with_plan(&plan);
236236

237-
let df = ctx.execute_logical_plan(plan).await?;
237+
let df = match ctx.execute_logical_plan(plan).await {
238+
Ok(df) => df,
239+
Err(DataFusionError::ObjectStore(Generic { store, source: _ }))
240+
if "S3".eq_ignore_ascii_case(store)
241+
&& matches!(&statement, Statement::CreateExternalTable(_)) =>
242+
{
243+
warn!("S3 region is incorrect, auto-detecting the correct region (this may be slow). Consider updating your region configuration.");
244+
let plan = create_plan(ctx, statement, true).await?;
245+
ctx.execute_logical_plan(plan).await?
246+
}
247+
Err(e) => return Err(e),
248+
};
238249
let physical_plan = df.create_physical_plan().await?;
239250

240251
// Track memory usage for the query result if it's bounded
@@ -348,6 +359,7 @@ fn config_file_type_from_str(ext: &str) -> Option<ConfigFileType> {
348359
async fn create_plan(
349360
ctx: &dyn CliSessionContext,
350361
statement: Statement,
362+
resolve_region: bool,
351363
) -> Result<LogicalPlan, DataFusionError> {
352364
let mut plan = ctx.session_state().statement_to_plan(statement).await?;
353365

@@ -362,6 +374,7 @@ async fn create_plan(
362374
&cmd.location,
363375
&cmd.options,
364376
format,
377+
resolve_region,
365378
)
366379
.await?;
367380
}
@@ -374,6 +387,7 @@ async fn create_plan(
374387
&copy_to.output_url,
375388
&copy_to.options,
376389
format,
390+
false,
377391
)
378392
.await?;
379393
}
@@ -412,6 +426,7 @@ pub(crate) async fn register_object_store_and_config_extensions(
412426
location: &String,
413427
options: &HashMap<String, String>,
414428
format: Option<ConfigFileType>,
429+
resolve_region: bool,
415430
) -> Result<()> {
416431
// Parse the location URL to extract the scheme and other components
417432
let table_path = ListingTableUrl::parse(location)?;
@@ -433,8 +448,14 @@ pub(crate) async fn register_object_store_and_config_extensions(
433448
table_options.alter_with_string_hash_map(options)?;
434449

435450
// Retrieve the appropriate object store based on the scheme, URL, and modified table options
436-
let store =
437-
get_object_store(&ctx.session_state(), scheme, url, &table_options).await?;
451+
let store = get_object_store(
452+
&ctx.session_state(),
453+
scheme,
454+
url,
455+
&table_options,
456+
resolve_region,
457+
)
458+
.await?;
438459

439460
// Register the retrieved object store in the session context's runtime environment
440461
ctx.register_object_store(url, store);
@@ -462,6 +483,7 @@ mod tests {
462483
&cmd.location,
463484
&cmd.options,
464485
format,
486+
false,
465487
)
466488
.await?;
467489
} else {
@@ -488,6 +510,7 @@ mod tests {
488510
&cmd.output_url,
489511
&cmd.options,
490512
format,
513+
false,
491514
)
492515
.await?;
493516
} else {
@@ -534,7 +557,7 @@ mod tests {
534557
let statements = DFParser::parse_sql_with_dialect(&sql, dialect.as_ref())?;
535558
for statement in statements {
536559
//Should not fail
537-
let mut plan = create_plan(&ctx, statement).await?;
560+
let mut plan = create_plan(&ctx, statement, false).await?;
538561
if let LogicalPlan::Copy(copy_to) = &mut plan {
539562
assert_eq!(copy_to.output_url, location);
540563
assert_eq!(copy_to.file_type.get_ext(), "parquet".to_string());

datafusion-cli/src/object_storage.rs

Lines changed: 82 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,15 +32,28 @@ use aws_config::BehaviorVersion;
3232
use aws_credential_types::provider::error::CredentialsError;
3333
use aws_credential_types::provider::{ProvideCredentials, SharedCredentialsProvider};
3434
use log::debug;
35-
use object_store::aws::{AmazonS3Builder, AwsCredential};
35+
use object_store::aws::{AmazonS3Builder, AmazonS3ConfigKey, AwsCredential};
3636
use object_store::gcp::GoogleCloudStorageBuilder;
3737
use object_store::http::HttpBuilder;
3838
use object_store::{ClientOptions, CredentialProvider, ObjectStore};
3939
use url::Url;
4040

41+
#[cfg(not(test))]
42+
use object_store::aws::resolve_bucket_region;
43+
44+
// Provide a local mock when running tests so we don't make network calls
45+
#[cfg(test)]
46+
async fn resolve_bucket_region(
47+
_bucket: &str,
48+
_client_options: &ClientOptions,
49+
) -> object_store::Result<String> {
50+
Ok("eu-central-1".to_string())
51+
}
52+
4153
pub async fn get_s3_object_store_builder(
4254
url: &Url,
4355
aws_options: &AwsOptions,
56+
resolve_region: bool,
4457
) -> Result<AmazonS3Builder> {
4558
let AwsOptions {
4659
access_key_id,
@@ -88,6 +101,16 @@ pub async fn get_s3_object_store_builder(
88101
builder = builder.with_region(region);
89102
}
90103

104+
// If the region is not set or auto_detect_region is true, resolve the region.
105+
if builder
106+
.get_config_value(&AmazonS3ConfigKey::Region)
107+
.is_none()
108+
|| resolve_region
109+
{
110+
let region = resolve_bucket_region(bucket_name, &ClientOptions::new()).await?;
111+
builder = builder.with_region(region);
112+
}
113+
91114
if let Some(endpoint) = endpoint {
92115
// Make a nicer error if the user hasn't allowed http and the endpoint
93116
// is http as the default message is "URL scheme is not allowed"
@@ -470,6 +493,7 @@ pub(crate) async fn get_object_store(
470493
scheme: &str,
471494
url: &Url,
472495
table_options: &TableOptions,
496+
resolve_region: bool,
473497
) -> Result<Arc<dyn ObjectStore>, DataFusionError> {
474498
let store: Arc<dyn ObjectStore> = match scheme {
475499
"s3" => {
@@ -478,7 +502,8 @@ pub(crate) async fn get_object_store(
478502
"Given table options incompatible with the 's3' scheme"
479503
);
480504
};
481-
let builder = get_s3_object_store_builder(url, options).await?;
505+
let builder =
506+
get_s3_object_store_builder(url, options, resolve_region).await?;
482507
Arc::new(builder.build()?)
483508
}
484509
"oss" => {
@@ -557,12 +582,14 @@ mod tests {
557582
let table_options = get_table_options(&ctx, &sql).await;
558583
let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
559584
let builder =
560-
get_s3_object_store_builder(table_url.as_ref(), aws_options).await?;
585+
get_s3_object_store_builder(table_url.as_ref(), aws_options, false).await?;
561586

562587
// If the environment variables are set (as they are in CI) use them
563588
let expected_access_key_id = std::env::var("AWS_ACCESS_KEY_ID").ok();
564589
let expected_secret_access_key = std::env::var("AWS_SECRET_ACCESS_KEY").ok();
565-
let expected_region = std::env::var("AWS_REGION").ok();
590+
let expected_region = Some(
591+
std::env::var("AWS_REGION").unwrap_or_else(|_| "eu-central-1".to_string()),
592+
);
566593
let expected_endpoint = std::env::var("AWS_ENDPOINT").ok();
567594

568595
// get the actual configuration information, then assert_eq!
@@ -624,7 +651,7 @@ mod tests {
624651
let table_options = get_table_options(&ctx, &sql).await;
625652
let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
626653
let builder =
627-
get_s3_object_store_builder(table_url.as_ref(), aws_options).await?;
654+
get_s3_object_store_builder(table_url.as_ref(), aws_options, false).await?;
628655
// get the actual configuration information, then assert_eq!
629656
let config = [
630657
(AmazonS3ConfigKey::AccessKeyId, access_key_id),
@@ -667,7 +694,7 @@ mod tests {
667694

668695
let table_options = get_table_options(&ctx, &sql).await;
669696
let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
670-
let err = get_s3_object_store_builder(table_url.as_ref(), aws_options)
697+
let err = get_s3_object_store_builder(table_url.as_ref(), aws_options, false)
671698
.await
672699
.unwrap_err();
673700

@@ -686,7 +713,55 @@ mod tests {
686713

687714
let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
688715
// ensure this isn't an error
689-
get_s3_object_store_builder(table_url.as_ref(), aws_options).await?;
716+
get_s3_object_store_builder(table_url.as_ref(), aws_options, false).await?;
717+
718+
Ok(())
719+
}
720+
721+
#[tokio::test]
722+
async fn s3_object_store_builder_resolves_region_when_none_provided() -> Result<()> {
723+
let expected_region = "eu-central-1";
724+
let location = "s3://test-bucket/path/file.parquet";
725+
726+
let table_url = ListingTableUrl::parse(location)?;
727+
let aws_options = AwsOptions {
728+
region: None, // No region specified - should auto-detect
729+
..Default::default()
730+
};
731+
732+
let builder =
733+
get_s3_object_store_builder(table_url.as_ref(), &aws_options, false).await?;
734+
735+
// Verify that the region was auto-detected in test environment
736+
assert_eq!(
737+
builder.get_config_value(&AmazonS3ConfigKey::Region),
738+
Some(expected_region.to_string())
739+
);
740+
741+
Ok(())
742+
}
743+
744+
#[tokio::test]
745+
async fn s3_object_store_builder_overrides_region_when_resolve_region_enabled(
746+
) -> Result<()> {
747+
let original_region = "us-east-1";
748+
let expected_region = "eu-central-1"; // This should be the auto-detected region
749+
let location = "s3://test-bucket/path/file.parquet";
750+
751+
let table_url = ListingTableUrl::parse(location)?;
752+
let aws_options = AwsOptions {
753+
region: Some(original_region.to_string()), // Explicit region provided
754+
..Default::default()
755+
};
756+
757+
let builder =
758+
get_s3_object_store_builder(table_url.as_ref(), &aws_options, true).await?;
759+
760+
// Verify that the region was overridden by auto-detection
761+
assert_eq!(
762+
builder.get_config_value(&AmazonS3ConfigKey::Region),
763+
Some(expected_region.to_string())
764+
);
690765

691766
Ok(())
692767
}

datafusion-cli/tests/cli_integration.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,3 +209,36 @@ SELECT * FROM CARS limit 1;
209209

210210
assert_cmd_snapshot!(cli().env_clear().pass_stdin(input));
211211
}
212+
213+
#[tokio::test]
214+
async fn test_aws_region_auto_resolution() {
215+
if env::var("TEST_STORAGE_INTEGRATION").is_err() {
216+
eprintln!("Skipping external storages integration tests");
217+
return;
218+
}
219+
220+
let mut settings = make_settings();
221+
settings.add_filter(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z", "[TIME]");
222+
let _bound = settings.bind_to_scope();
223+
224+
let bucket = "s3://clickhouse-public-datasets/hits_compatible/athena_partitioned/hits_1.parquet";
225+
let region = "us-east-1";
226+
227+
let input = format!(
228+
r#"CREATE EXTERNAL TABLE hits
229+
STORED AS PARQUET
230+
LOCATION '{bucket}'
231+
OPTIONS(
232+
'aws.region' '{region}',
233+
'aws.skip_signature' true
234+
);
235+
236+
SELECT COUNT(*) FROM hits;
237+
"#
238+
);
239+
240+
assert_cmd_snapshot!(cli()
241+
.env("RUST_LOG", "warn")
242+
.env_remove("AWS_ENDPOINT")
243+
.pass_stdin(input));
244+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
---
2+
source: datafusion-cli/tests/cli_integration.rs
3+
info:
4+
program: datafusion-cli
5+
args: []
6+
env:
7+
AWS_ENDPOINT: ""
8+
RUST_LOG: warn
9+
stdin: "CREATE EXTERNAL TABLE hits\nSTORED AS PARQUET\nLOCATION 's3://clickhouse-public-datasets/hits_compatible/athena_partitioned/hits_1.parquet'\nOPTIONS(\n 'aws.region' 'us-east-1',\n 'aws.skip_signature' true\n);\n\nSELECT COUNT(*) FROM hits;\n"
10+
---
11+
success: true
12+
exit_code: 0
13+
----- stdout -----
14+
[CLI_VERSION]
15+
0 row(s) fetched.
16+
[ELAPSED]
17+
18+
+----------+
19+
| count(*) |
20+
+----------+
21+
| 1000000 |
22+
+----------+
23+
1 row(s) fetched.
24+
[ELAPSED]
25+
26+
\q
27+
28+
----- stderr -----
29+
[[TIME] WARN datafusion_cli::exec] S3 region is incorrect, auto-detecting the correct region (this may be slow). Consider updating your region configuration.

0 commit comments

Comments
 (0)