Skip to content

Commit 51e73b1

Browse files
authored
Merge pull request #1028 from Altinity/backports/25.6.5/85134_allow_key_value_arguments_in_s3_s3_cluster
Antalya 25.6.5 - Backport of ClickHouse#85134 - Allow key-value arguments in s3/s3cluster engine
2 parents 51862f0 + cee48d1 commit 51e73b1

File tree

5 files changed

+305
-19
lines changed

5 files changed

+305
-19
lines changed

src/Parsers/FunctionSecretArgumentsFinder.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ class FunctionSecretArgumentsFinder
241241
break;
242242
if (f->name() == "headers")
243243
result.nested_maps.push_back(f->name());
244-
else if (f->name() != "extra_credentials")
244+
else if (f->name() != "extra_credentials" && f->name() != "equals")
245245
break;
246246
count -= 1;
247247
}
@@ -266,6 +266,8 @@ class FunctionSecretArgumentsFinder
266266
return;
267267
}
268268

269+
findSecretNamedArgument("secret_access_key", url_arg_idx);
270+
269271
/// We should check other arguments first because we don't need to do any replacement in case of
270272
/// s3('url', NOSIGN, 'format' [, 'compression'] [, extra_credentials(..)] [, headers(..)])
271273
/// s3('url', 'format', 'structure' [, 'compression'] [, extra_credentials(..)] [, headers(..)])
@@ -625,6 +627,8 @@ class FunctionSecretArgumentsFinder
625627
return;
626628
}
627629

630+
findSecretNamedArgument("secret_access_key", 0);
631+
628632
/// We should check other arguments first because we don't need to do any replacement in case of
629633
/// S3('url', NOSIGN, 'format' [, 'compression'] [, extra_credentials(..)] [, headers(..)])
630634
/// S3('url', 'format', 'compression' [, extra_credentials(..)] [, headers(..)])

src/Storages/ObjectStorage/S3/Configuration.cpp

Lines changed: 207 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -294,18 +294,103 @@ bool StorageS3Configuration::collectCredentials(ASTPtr maybe_credentials, S3::S3
294294
return true;
295295
}
296296

297+
template <typename T>
298+
static std::optional<T> getFromPositionOrKeyValue(
299+
const std::string & key,
300+
const ASTs & args,
301+
const std::unordered_map<std::string_view, size_t> & engine_args_to_idx,
302+
const std::unordered_map<std::string, Field> & key_value_args)
303+
{
304+
if (auto arg_it = engine_args_to_idx.find(key); arg_it != engine_args_to_idx.end())
305+
return checkAndGetLiteralArgument<T>(args[arg_it->second], key);
306+
307+
if (auto arg_it = key_value_args.find(key); arg_it != key_value_args.end())
308+
return arg_it->second.safeGet<T>();
309+
310+
return std::nullopt;
311+
};
312+
313+
static std::unordered_map<std::string, Field> parseKeyValueArguments(const ASTs & function_args, ContextPtr context)
314+
{
315+
std::unordered_map<std::string, Field> key_value_args;
316+
for (const auto & arg : function_args)
317+
{
318+
const auto * function_ast = arg->as<ASTFunction>();
319+
if (!function_ast || function_ast->name != "equals")
320+
continue;
321+
322+
auto * args_expr = assert_cast<ASTExpressionList *>(function_ast->arguments.get());
323+
auto & children = args_expr->children;
324+
if (children.size() != 2)
325+
{
326+
throw Exception(
327+
ErrorCodes::BAD_ARGUMENTS,
328+
"Key value argument is incorrect: expected 2 arguments, got {}",
329+
children.size());
330+
}
331+
332+
auto key_literal = evaluateConstantExpressionOrIdentifierAsLiteral(children[0], context);
333+
auto value_literal = evaluateConstantExpressionOrIdentifierAsLiteral(children[1], context);
334+
335+
auto arg_name_value = key_literal->as<ASTLiteral>()->value;
336+
if (arg_name_value.getType() != Field::Types::Which::String)
337+
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected string as credential name");
338+
339+
auto arg_name = arg_name_value.safeGet<String>();
340+
auto arg_value = value_literal->as<ASTLiteral>()->value;
341+
342+
auto inserted = key_value_args.emplace(arg_name, arg_value).second;
343+
if (!inserted)
344+
throw Exception(ErrorCodes::LOGICAL_ERROR, "Duplicate key value argument: {}", arg_name);
345+
}
346+
return key_value_args;
347+
}
348+
349+
static ASTs::iterator getFirstKeyValueArgument(ASTs & args)
350+
{
351+
ASTs::iterator first_key_value_arg_it = args.end();
352+
for (auto * it = args.begin(); it != args.end(); ++it)
353+
{
354+
const auto * function_ast = (*it)->as<ASTFunction>();
355+
if (function_ast && function_ast->name == "equals")
356+
{
357+
if (first_key_value_arg_it == args.end())
358+
first_key_value_arg_it = it;
359+
}
360+
else if (first_key_value_arg_it != args.end())
361+
{
362+
throw Exception(
363+
ErrorCodes::BAD_ARGUMENTS,
364+
"Expected positional arguments to go before key-value arguments");
365+
}
366+
}
367+
return first_key_value_arg_it;
368+
}
369+
297370
void StorageS3Configuration::fromAST(ASTs & args, ContextPtr context, bool with_structure)
298371
{
299372
auto extra_credentials = extractExtraCredentials(args);
300373

301374
size_t count = StorageURL::evalArgsAndCollectHeaders(args, headers_from_ast, context);
302375

376+
ASTs key_value_asts;
377+
if (auto * first_key_value_arg_it = getFirstKeyValueArgument(args);
378+
first_key_value_arg_it != args.end())
379+
{
380+
key_value_asts = ASTs(first_key_value_arg_it, args.end());
381+
count -= key_value_asts.size();
382+
}
383+
303384
if (count == 0 || count > getMaxNumberOfArguments(with_structure))
304385
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
305386
"Storage S3 requires 1 to {} arguments. All supported signatures:\n{}",
306387
getMaxNumberOfArguments(with_structure),
307388
getSignatures(with_structure));
308389

390+
auto key_value_args = parseKeyValueArguments(key_value_asts, context);
391+
if (key_value_args.contains("structure"))
392+
with_structure = false;
393+
309394
const auto & config = context->getConfigRef();
310395
s3_capabilities = std::make_unique<S3Capabilities>(getCapabilitiesFromConfig(config, "s3"));
311396

@@ -532,50 +617,77 @@ void StorageS3Configuration::fromAST(ASTs & args, ContextPtr context, bool with_
532617
s3_settings->request_settings.updateIfChanged(endpoint_settings->request_settings);
533618
}
534619

535-
if (engine_args_to_idx.contains("format"))
620+
if (auto format_value = getFromPositionOrKeyValue<String>("format", args, engine_args_to_idx, key_value_args);
621+
format_value.has_value())
536622
{
537-
auto format_ = checkAndGetLiteralArgument<String>(args[engine_args_to_idx["format"]], "format");
623+
auto format_ = format_value.value();
538624
/// Set format to configuration only of it's not 'auto',
539625
/// because we can have default format set in configuration.
540626
if (format_ != "auto")
541627
setFormat(format_);
542628
}
543629

544-
if (engine_args_to_idx.contains("structure"))
545-
setStructure(checkAndGetLiteralArgument<String>(args[engine_args_to_idx["structure"]], "structure"));
630+
if (auto structure_value = getFromPositionOrKeyValue<String>("structure", args, engine_args_to_idx, key_value_args);
631+
structure_value.has_value())
632+
{
633+
setStructure(structure_value.value());
634+
}
546635

547-
if (engine_args_to_idx.contains("compression_method"))
548-
setCompressionMethod(checkAndGetLiteralArgument<String>(args[engine_args_to_idx["compression_method"]], "compression_method"));
636+
if (auto compression_method_value = getFromPositionOrKeyValue<String>("compression_method", args, engine_args_to_idx, key_value_args);
637+
compression_method_value.has_value())
638+
{
639+
setCompressionMethod(compression_method_value.value());
640+
}
549641

550-
if (engine_args_to_idx.contains("partition_strategy"))
642+
if (auto partition_strategy_value = getFromPositionOrKeyValue<String>("partition_strategy", args, engine_args_to_idx, key_value_args);
643+
partition_strategy_value.has_value())
551644
{
552-
const auto partition_strategy_name = checkAndGetLiteralArgument<String>(args[engine_args_to_idx["partition_strategy"]], "partition_strategy");
645+
const auto & partition_strategy_name = partition_strategy_value.value();
553646
const auto partition_strategy_type_opt = magic_enum::enum_cast<PartitionStrategyFactory::StrategyType>(partition_strategy_name, magic_enum::case_insensitive);
554647

555-
if (!partition_strategy_type_opt)
648+
if (!partition_strategy_type_opt.has_value())
556649
{
557650
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Partition strategy {} is not supported", partition_strategy_name);
558651
}
559652

560653
setPartitionStrategyType(partition_strategy_type_opt.value());
561654
}
562655

563-
if (engine_args_to_idx.contains("partition_columns_in_data_file"))
564-
setPartitionColumnsInDataFile(checkAndGetLiteralArgument<bool>(args[engine_args_to_idx["partition_columns_in_data_file"]], "partition_columns_in_data_file"));
656+
if (auto partition_columns_in_data_file_value = getFromPositionOrKeyValue<bool>("partition_columns_in_data_file", args, engine_args_to_idx, key_value_args);
657+
partition_columns_in_data_file_value.has_value())
658+
{
659+
setPartitionColumnsInDataFile(partition_columns_in_data_file_value.value());
660+
}
565661
else
566662
setPartitionColumnsInDataFile(getPartitionStrategyType() != PartitionStrategyFactory::StrategyType::HIVE);
567663

568-
if (engine_args_to_idx.contains("access_key_id"))
569-
s3_settings->auth_settings[S3AuthSetting::access_key_id] = checkAndGetLiteralArgument<String>(args[engine_args_to_idx["access_key_id"]], "access_key_id");
664+
if (auto access_key_id_value = getFromPositionOrKeyValue<String>("access_key_id", args, engine_args_to_idx, key_value_args);
665+
access_key_id_value.has_value())
666+
{
667+
s3_settings->auth_settings[S3AuthSetting::access_key_id] = access_key_id_value.value();
668+
}
570669

571-
if (engine_args_to_idx.contains("secret_access_key"))
572-
s3_settings->auth_settings[S3AuthSetting::secret_access_key] = checkAndGetLiteralArgument<String>(args[engine_args_to_idx["secret_access_key"]], "secret_access_key");
670+
if (auto secret_access_key_value = getFromPositionOrKeyValue<String>("secret_access_key", args, engine_args_to_idx, key_value_args);
671+
secret_access_key_value.has_value())
672+
{
673+
s3_settings->auth_settings[S3AuthSetting::secret_access_key] = secret_access_key_value.value();
674+
}
573675

574-
if (engine_args_to_idx.contains("session_token"))
575-
s3_settings->auth_settings[S3AuthSetting::session_token] = checkAndGetLiteralArgument<String>(args[engine_args_to_idx["session_token"]], "session_token");
676+
if (auto session_token_value = getFromPositionOrKeyValue<String>("session_token", args, engine_args_to_idx, key_value_args);
677+
session_token_value.has_value())
678+
{
679+
s3_settings->auth_settings[S3AuthSetting::session_token] = session_token_value.value();
680+
}
576681

577682
if (no_sign_request)
683+
{
578684
s3_settings->auth_settings[S3AuthSetting::no_sign_request] = no_sign_request;
685+
}
686+
else if (auto no_sign_value = getFromPositionOrKeyValue<bool>("no_sign", args, {}, key_value_args);
687+
no_sign_value.has_value())
688+
{
689+
s3_settings->auth_settings[S3AuthSetting::no_sign_request] = no_sign_value.value();
690+
}
579691

580692
static_configuration = !s3_settings->auth_settings[S3AuthSetting::access_key_id].value.empty() || s3_settings->auth_settings[S3AuthSetting::no_sign_request].changed;
581693

@@ -610,14 +722,87 @@ void StorageS3Configuration::addStructureAndFormatToArgsIfNeeded(
610722
auto extra_credentials = extractExtraCredentials(args);
611723

612724
HTTPHeaderEntries tmp_headers;
725+
613726
size_t count = StorageURL::evalArgsAndCollectHeaders(args, tmp_headers, context);
614727

728+
ASTs key_value_asts;
729+
auto * first_key_value_arg_it = getFirstKeyValueArgument(args);
730+
if (first_key_value_arg_it != args.end())
731+
{
732+
key_value_asts = ASTs(first_key_value_arg_it, args.end());
733+
count -= key_value_asts.size();
734+
}
735+
615736
if (count == 0 || count > getMaxNumberOfArguments())
616-
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected 1 to {} arguments in table function s3, got {}", getMaxNumberOfArguments(), count);
737+
{
738+
throw Exception(
739+
ErrorCodes::LOGICAL_ERROR,
740+
"Expected 1 to {} arguments in table function s3, got {}",
741+
getMaxNumberOfArguments(), count);
742+
}
617743

618744
auto format_literal = std::make_shared<ASTLiteral>(format_);
619745
auto structure_literal = std::make_shared<ASTLiteral>(structure_);
620746

747+
bool format_in_key_value = false;
748+
bool structure_in_key_value = false;
749+
for (auto * it = first_key_value_arg_it; it != args.end(); ++it)
750+
{
751+
const auto & arg = *it;
752+
const auto * function_ast = arg->as<ASTFunction>();
753+
if (!function_ast || function_ast->name != "equals")
754+
continue;
755+
756+
auto * args_expr = assert_cast<ASTExpressionList *>(function_ast->arguments.get());
757+
auto & children = args_expr->children;
758+
if (children.size() != 2)
759+
{
760+
throw Exception(
761+
ErrorCodes::BAD_ARGUMENTS,
762+
"Key value argument is incorrect: expected 2 arguments, got {}",
763+
children.size());
764+
}
765+
766+
auto literal = evaluateConstantExpressionOrIdentifierAsLiteral(children[0], context);
767+
768+
auto arg_name_value = literal->as<ASTLiteral>()->value;
769+
if (arg_name_value.getType() != Field::Types::Which::String)
770+
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected string as credential name");
771+
auto arg_name = arg_name_value.safeGet<String>();
772+
773+
if (arg_name == "format")
774+
{
775+
children[1] = format_literal;
776+
format_in_key_value = true;
777+
}
778+
else if (arg_name == "structure")
779+
{
780+
children[1] = structure_literal;
781+
structure_in_key_value = true;
782+
}
783+
}
784+
785+
if (format_in_key_value && structure_in_key_value)
786+
{
787+
/// Add extracted extra credentials to the end of the args.
788+
if (extra_credentials)
789+
args.push_back(extra_credentials);
790+
return;
791+
}
792+
else if (format_in_key_value && with_structure)
793+
{
794+
/// Structure goes right after format, so if format is in key-value,
795+
/// then structure is required to be key-value.
796+
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected positional arguments to go before key-value arguments");
797+
}
798+
else if (structure_in_key_value)
799+
{
800+
with_structure = false;
801+
}
802+
803+
/// We will return it back at the end.
804+
args.erase(first_key_value_arg_it, args.end());
805+
621806
/// s3(s3_url)
622807
if (count == 1)
623808
{
@@ -776,6 +961,9 @@ void StorageS3Configuration::addStructureAndFormatToArgsIfNeeded(
776961
args[5] = structure_literal;
777962
}
778963

964+
if (!key_value_asts.empty())
965+
args.insert(args.end(), std::make_move_iterator(key_value_asts.begin()), std::make_move_iterator(key_value_asts.end()));
966+
779967
/// Add extracted extra credentials to the end of the args.
780968
if (extra_credentials)
781969
args.push_back(extra_credentials);
@@ -809,3 +997,4 @@ ASTPtr StorageS3Configuration::createArgsWithAccessData() const
809997
}
810998

811999
#endif
1000+

tests/integration/test_mask_sensitive_info/test.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,7 @@ def test_create_table():
329329

330330
f"Kafka() SETTINGS kafka_broker_list = '127.0.0.1', kafka_topic_list = 'topic', kafka_group_name = 'group', kafka_format = 'JSONEachRow', kafka_security_protocol = 'sasl_ssl', kafka_sasl_mechanism = 'PLAIN', kafka_sasl_username = 'user', kafka_sasl_password = '{password}', format_avro_schema_registry_url = 'http://schema_user:{password}@'",
331331
f"Kafka() SETTINGS kafka_broker_list = '127.0.0.1', kafka_topic_list = 'topic', kafka_group_name = 'group', kafka_format = 'JSONEachRow', kafka_security_protocol = 'sasl_ssl', kafka_sasl_mechanism = 'PLAIN', kafka_sasl_username = 'user', kafka_sasl_password = '{password}', format_avro_schema_registry_url = 'http://schema_user:{password}@domain.com'",
332+
f"S3('http://minio1:9001/root/data/test5.csv.gz', 'CSV', access_key_id = 'minio', secret_access_key = '{password}', compression_method = 'gzip')",
332333
]
333334

334335
def make_test_case(i):
@@ -419,6 +420,7 @@ def make_test_case(i):
419420

420421
"CREATE TABLE table44 (`x` int) ENGINE = Kafka SETTINGS kafka_broker_list = '127.0.0.1', kafka_topic_list = 'topic', kafka_group_name = 'group', kafka_format = 'JSONEachRow', kafka_security_protocol = 'sasl_ssl', kafka_sasl_mechanism = 'PLAIN', kafka_sasl_username = 'user', kafka_sasl_password = '[HIDDEN]', format_avro_schema_registry_url = 'http://schema_user:[HIDDEN]@'",
421422
"CREATE TABLE table45 (`x` int) ENGINE = Kafka SETTINGS kafka_broker_list = '127.0.0.1', kafka_topic_list = 'topic', kafka_group_name = 'group', kafka_format = 'JSONEachRow', kafka_security_protocol = 'sasl_ssl', kafka_sasl_mechanism = 'PLAIN', kafka_sasl_username = 'user', kafka_sasl_password = '[HIDDEN]', format_avro_schema_registry_url = 'http://schema_user:[HIDDEN]@domain.com'",
423+
"CREATE TABLE table46 (`x` int) ENGINE = S3('http://minio1:9001/root/data/test5.csv.gz', 'CSV', access_key_id = 'minio', secret_access_key = '[HIDDEN]', compression_method = 'gzip')",
422424
],
423425
must_not_contain=[password],
424426
)

0 commit comments

Comments
 (0)