Skip to content

Commit 88548eb

Browse files
committed
Fix exception message during writing to partitioned s3 path woth globs
1 parent ce4ab06 commit 88548eb

File tree

4 files changed

+21
-9
lines changed

4 files changed

+21
-9
lines changed

src/Storages/StorageS3.cpp

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ class StorageS3Source::DisclosedGlobIterator::Impl : WithContext
207207
, list_objects_scheduler(threadPoolCallbackRunner<ListObjectsOutcome>(list_objects_pool, "ListObjects"))
208208
, file_progress_callback(file_progress_callback_)
209209
{
210-
if (globbed_uri.bucket.find_first_of("*?{") != std::string::npos)
210+
if (globbed_uri.bucket.find_first_of("*?{") != globbed_uri.bucket.npos)
211211
throw Exception(ErrorCodes::UNEXPECTED_EXPRESSION, "Expression can not have wildcards inside bucket name");
212212

213213
const String key_prefix = globbed_uri.key.substr(0, globbed_uri.key.find_first_of("*?{"));
@@ -1194,7 +1194,7 @@ void ReadFromStorageS3Step::createIterator(const ActionsDAG::Node * predicate)
11941194

11951195
void ReadFromStorageS3Step::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
11961196
{
1197-
if (storage.partition_by && query_configuration.withWildcard())
1197+
if (storage.partition_by && query_configuration.withPartitionWildcard())
11981198
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Reading from a partitioned S3 storage is not implemented yet");
11991199

12001200
createIterator(nullptr);
@@ -1249,12 +1249,16 @@ SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr
12491249
{
12501250
auto query_configuration = updateConfigurationAndGetCopy(local_context);
12511251

1252+
if (query_configuration.withGlobsIgnorePartitionWildcard())
1253+
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED,
1254+
"S3 key '{}' contains globs, so the table is in readonly mode", query_configuration.url.key);
1255+
12521256
auto sample_block = metadata_snapshot->getSampleBlock();
12531257
auto chosen_compression_method = chooseCompressionMethod(query_configuration.keys.back(), query_configuration.compression_method);
12541258
auto insert_query = std::dynamic_pointer_cast<ASTInsertQuery>(query);
12551259

12561260
auto partition_by_ast = insert_query ? (insert_query->partition_by ? insert_query->partition_by : partition_by) : nullptr;
1257-
bool is_partitioned_implementation = partition_by_ast && query_configuration.withWildcard();
1261+
bool is_partitioned_implementation = partition_by_ast && query_configuration.withPartitionWildcard();
12581262

12591263
if (is_partitioned_implementation)
12601264
{
@@ -1271,10 +1275,6 @@ SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr
12711275
}
12721276
else
12731277
{
1274-
if (query_configuration.withGlobs())
1275-
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED,
1276-
"S3 key '{}' contains globs, so the table is in readonly mode", query_configuration.url.key);
1277-
12781278
bool truncate_in_insert = local_context->getSettingsRef().s3_truncate_on_insert;
12791279

12801280
if (!truncate_in_insert && S3::objectExists(*query_configuration.client, query_configuration.url.bucket, query_configuration.keys.back(), query_configuration.url.version_id, query_configuration.request_settings))
@@ -1460,6 +1460,14 @@ void StorageS3::Configuration::connect(const ContextPtr & context)
14601460
credentials.GetSessionToken());
14611461
}
14621462

1463+
bool StorageS3::Configuration::withGlobsIgnorePartitionWildcard() const
1464+
{
1465+
if (!withPartitionWildcard())
1466+
return withGlobs();
1467+
1468+
return PartitionedSink::replaceWildcards(getPath(), "").find_first_of("*?{") != std::string::npos;
1469+
}
1470+
14631471
void StorageS3::processNamedCollectionResult(StorageS3::Configuration & configuration, const NamedCollection & collection)
14641472
{
14651473
validateNamedCollection(collection, required_configuration_keys, optional_configuration_keys);

src/Storages/StorageS3.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -274,21 +274,23 @@ class StorageS3 : public IStorage
274274
{
275275
Configuration() = default;
276276

277-
String getPath() const { return url.key; }
277+
const String & getPath() const { return url.key; }
278278

279279
bool update(const ContextPtr & context);
280280

281281
void connect(const ContextPtr & context);
282282

283283
bool withGlobs() const { return url.key.find_first_of("*?{") != std::string::npos; }
284284

285-
bool withWildcard() const
285+
bool withPartitionWildcard() const
286286
{
287287
static const String PARTITION_ID_WILDCARD = "{_partition_id}";
288288
return url.bucket.find(PARTITION_ID_WILDCARD) != String::npos
289289
|| keys.back().find(PARTITION_ID_WILDCARD) != String::npos;
290290
}
291291

292+
bool withGlobsIgnorePartitionWildcard() const;
293+
292294
S3::URI url;
293295
S3::AuthSettings auth_settings;
294296
S3Settings::RequestSettings request_settings;

tests/queries/0_stateless/03037_s3_write_to_globbed_partitioned_path.reference

Whitespace-only changes.
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
insert into function s3('http://localhost:11111/test/data_*_{_partition_id}.csv') partition by number % 3 select * from numbers(10); -- {serverError DATABASE_ACCESS_DENIED}
2+

0 commit comments

Comments
 (0)