
Commit 6247f6a

Merge pull request ClickHouse#62423 from Avogar/better-exception-s3-globs-partitions
Fix exception message during writing to partitioned s3/hdfs/azure path with globs
2 parents a76058a + a406871 commit 6247f6a

File tree

9 files changed: +66 lines, -15 lines
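The recurring idea in this change: a partitioned write path may legitimately contain the {_partition_id} wildcard, so the readonly check must first substitute that wildcard away and only then look for real glob characters (*, ?, {). Below is a minimal, self-contained sketch of that check. replacePartitionWildcard is a hypothetical stand-in for PartitionedSink::replaceWildcards as it is used in the diffs that follow, not the actual ClickHouse implementation.

#include <iostream>
#include <string>

// Hypothetical stand-in for PartitionedSink::replaceWildcards: substitute
// every occurrence of "{_partition_id}" in the path with the given value.
std::string replacePartitionWildcard(std::string path, const std::string & value)
{
    static const std::string wildcard = "{_partition_id}";
    for (size_t pos = path.find(wildcard); pos != std::string::npos; pos = path.find(wildcard, pos + value.size()))
        path.replace(pos, wildcard.size(), value);
    return path;
}

// Mirrors the withGlobsIgnorePartitionWildcard() logic added by this PR:
// once the partition wildcard is substituted away, any remaining
// '*', '?' or '{' must belong to a real glob pattern.
bool hasGlobsIgnoringPartitionWildcard(const std::string & path)
{
    return replacePartitionWildcard(path, "").find_first_of("*?{") != std::string::npos;
}

int main()
{
    std::cout << hasGlobsIgnoringPartitionWildcard("data_{_partition_id}.csv") << '\n';   // 0: writable
    std::cout << hasGlobsIgnoringPartitionWildcard("data_*_{_partition_id}.csv") << '\n'; // 1: readonly
}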

src/Storages/HDFS/StorageHDFS.cpp

Lines changed: 4 additions & 0 deletions
@@ -1101,6 +1101,10 @@ SinkToStoragePtr StorageHDFS::write(const ASTPtr & query, const StorageMetadataP
 
     if (is_partitioned_implementation)
     {
+        String path = current_uri.substr(current_uri.find('/', current_uri.find("//") + 2));
+        if (PartitionedSink::replaceWildcards(path, "").find_first_of("*?{") != std::string::npos)
+            throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, "URI '{}' contains globs, so the table is in readonly mode", uris.back());
+
         return std::make_shared<PartitionedHDFSSink>(
             partition_by_ast,
             current_uri,
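The substr/find expression above extracts the path component of the HDFS URI (everything from the first '/' after the scheme and authority), so the glob check is not confused by characters in hdfs://host:port. A standalone illustration of the assumed behavior:

#include <cassert>
#include <string>

// Returns everything from the first '/' after "scheme://authority".
std::string pathFromUri(const std::string & uri)
{
    return uri.substr(uri.find('/', uri.find("//") + 2));
}

int main()
{
    assert(pathFromUri("hdfs://hdfs1:9000/test_data_*_{_partition_id}.csv")
           == "/test_data_*_{_partition_id}.csv");
}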

src/Storages/StorageAzureBlob.cpp

Lines changed: 13 additions & 6 deletions
@@ -491,6 +491,13 @@ Poco::URI StorageAzureBlob::Configuration::getConnectionURL() const
     return Poco::URI(parsed_connection_string.BlobServiceUrl.GetAbsoluteUrl());
 }
 
+bool StorageAzureBlob::Configuration::withGlobsIgnorePartitionWildcard() const
+{
+    if (!withPartitionWildcard())
+        return withGlobs();
+
+    return PartitionedSink::replaceWildcards(getPath(), "").find_first_of("*?{") != std::string::npos;
+}
 
 StorageAzureBlob::StorageAzureBlob(
     const Configuration & configuration_,
@@ -810,7 +817,7 @@ void StorageAzureBlob::read(
     size_t max_block_size,
     size_t num_streams)
 {
-    if (partition_by && configuration.withWildcard())
+    if (partition_by && configuration.withPartitionWildcard())
         throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Reading from a partitioned Azure storage is not implemented yet");
 
     auto this_ptr = std::static_pointer_cast<StorageAzureBlob>(shared_from_this());
@@ -897,13 +904,17 @@ void ReadFromAzureBlob::initializePipeline(QueryPipelineBuilder & pipeline, cons
 
 SinkToStoragePtr StorageAzureBlob::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context, bool /*async_insert*/)
 {
+    if (configuration.withGlobsIgnorePartitionWildcard())
+        throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED,
+                        "AzureBlobStorage key '{}' contains globs, so the table is in readonly mode", configuration.blob_path);
+
     auto path = configuration.blobs_paths.front();
     auto sample_block = metadata_snapshot->getSampleBlock();
     auto chosen_compression_method = chooseCompressionMethod(path, configuration.compression_method);
     auto insert_query = std::dynamic_pointer_cast<ASTInsertQuery>(query);
 
     auto partition_by_ast = insert_query ? (insert_query->partition_by ? insert_query->partition_by : partition_by) : nullptr;
-    bool is_partitioned_implementation = partition_by_ast && configuration.withWildcard();
+    bool is_partitioned_implementation = partition_by_ast && configuration.withPartitionWildcard();
 
     if (is_partitioned_implementation)
     {
@@ -919,10 +930,6 @@ SinkToStoragePtr StorageAzureBlob::write(const ASTPtr & query, const StorageMeta
     }
     else
     {
-        if (configuration.withGlobs())
-            throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED,
-                "AzureBlobStorage key '{}' contains globs, so the table is in readonly mode", configuration.blob_path);
-
         if (auto new_path = checkAndGetNewFileOnInsertIfNeeded(local_context, object_storage.get(), path, configuration.blobs_paths.size()))
         {
             configuration.blobs_paths.push_back(*new_path);
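Design note: the DATABASE_ACCESS_DENIED check moves from the non-partitioned else-branch to the top of write() and switches from withGlobs() to withGlobsIgnorePartitionWildcard(). As a result, both partitioned and non-partitioned inserts are covered, a path whose only special token is {_partition_id} stays writable, and a path with real globs now fails up front with the intended message rather than a confusing downstream exception.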

src/Storages/StorageAzureBlob.h

Lines changed: 3 additions & 1 deletion
@@ -35,12 +35,14 @@ class StorageAzureBlob : public IStorage
 
     bool withGlobs() const { return blob_path.find_first_of("*?{") != std::string::npos; }
 
-    bool withWildcard() const
+    bool withPartitionWildcard() const
     {
         static const String PARTITION_ID_WILDCARD = "{_partition_id}";
         return blobs_paths.back().find(PARTITION_ID_WILDCARD) != String::npos;
     }
 
+    bool withGlobsIgnorePartitionWildcard() const;
+
     Poco::URI getConnectionURL() const;
 
     std::string connection_url;
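The rename from withWildcard() to withPartitionWildcard() makes the intent explicit: {_partition_id} is a substitution point for partitioned writes, not a glob, and the two kinds of wildcard must not be conflated by the readonly check.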

src/Storages/StorageS3.cpp

Lines changed: 14 additions & 6 deletions
@@ -1283,7 +1283,7 @@ void ReadFromStorageS3Step::createIterator(const ActionsDAG::Node * predicate)
 
 void ReadFromStorageS3Step::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
 {
-    if (storage.partition_by && query_configuration.withWildcard())
+    if (storage.partition_by && query_configuration.withPartitionWildcard())
         throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Reading from a partitioned S3 storage is not implemented yet");
 
     createIterator(nullptr);
@@ -1341,12 +1341,16 @@ SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr
     auto query_configuration = updateConfigurationAndGetCopy(local_context);
     auto key = query_configuration.keys.front();
 
+    if (query_configuration.withGlobsIgnorePartitionWildcard())
+        throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED,
+                        "S3 key '{}' contains globs, so the table is in readonly mode", query_configuration.url.key);
+
     auto sample_block = metadata_snapshot->getSampleBlock();
     auto chosen_compression_method = chooseCompressionMethod(query_configuration.keys.back(), query_configuration.compression_method);
     auto insert_query = std::dynamic_pointer_cast<ASTInsertQuery>(query);
 
     auto partition_by_ast = insert_query ? (insert_query->partition_by ? insert_query->partition_by : partition_by) : nullptr;
-    bool is_partitioned_implementation = partition_by_ast && query_configuration.withWildcard();
+    bool is_partitioned_implementation = partition_by_ast && query_configuration.withPartitionWildcard();
 
     if (is_partitioned_implementation)
     {
@@ -1363,10 +1367,6 @@ SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr
     }
     else
     {
-        if (query_configuration.withGlobs())
-            throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED,
-                "S3 key '{}' contains globs, so the table is in readonly mode", query_configuration.url.key);
-
        if (auto new_key = checkAndGetNewFileOnInsertIfNeeded(local_context, configuration, query_configuration.keys.front(), query_configuration.keys.size()))
        {
            query_configuration.keys.push_back(*new_key);
@@ -1530,6 +1530,14 @@ void StorageS3::Configuration::connect(const ContextPtr & context)
         credentials.GetSessionToken());
 }
 
+bool StorageS3::Configuration::withGlobsIgnorePartitionWildcard() const
+{
+    if (!withPartitionWildcard())
+        return withGlobs();
+
+    return PartitionedSink::replaceWildcards(getPath(), "").find_first_of("*?{") != std::string::npos;
+}
+
 void StorageS3::processNamedCollectionResult(StorageS3::Configuration & configuration, const NamedCollection & collection)
 {
     validateNamedCollection(collection, required_configuration_keys, optional_configuration_keys);
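StorageS3 gets the same treatment as the Azure storage: the readonly check is hoisted to the top of write() and made partition-aware, and withGlobsIgnorePartitionWildcard() is implemented identically on the S3 configuration.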

src/Storages/StorageS3.h

Lines changed: 4 additions & 2 deletions
@@ -274,21 +274,23 @@ class StorageS3 : public IStorage
     {
         Configuration() = default;
 
-        String getPath() const { return url.key; }
+        const String & getPath() const { return url.key; }
 
         bool update(const ContextPtr & context);
 
         void connect(const ContextPtr & context);
 
         bool withGlobs() const { return url.key.find_first_of("*?{") != std::string::npos; }
 
-        bool withWildcard() const
+        bool withPartitionWildcard() const
         {
             static const String PARTITION_ID_WILDCARD = "{_partition_id}";
             return url.bucket.find(PARTITION_ID_WILDCARD) != String::npos
                 || keys.back().find(PARTITION_ID_WILDCARD) != String::npos;
         }
 
+        bool withGlobsIgnorePartitionWildcard() const;
+
         S3::URI url;
         S3::AuthSettings auth_settings;
         S3Settings::RequestSettings request_settings;
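Note that the S3 variant of withPartitionWildcard() also inspects url.bucket, since {_partition_id} may appear in the bucket name as well as in the key. The getPath() signature change to a const reference simply avoids copying the key each time the new check runs.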

tests/integration/test_storage_azure_blob_storage/test.py

Lines changed: 14 additions & 0 deletions
@@ -1323,6 +1323,20 @@ def test_format_detection(cluster):
     assert result == expected_result
 
 
+def test_write_to_globbed_partitioned_path(cluster):
+    node = cluster.instances["node"]
+    storage_account_url = cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]
+    account_name = "devstoreaccount1"
+    account_key = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
+    error = azure_query(
+        node,
+        f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_data_*_{{_partition_id}}', '{account_name}', '{account_key}', 'CSV', 'auto', 'x UInt64') partition by 42 select 42",
+        expect_error="true",
+    )
+
+    assert "DATABASE_ACCESS_DENIED" in error
+
+
 def test_parallel_read(cluster):
     node = cluster.instances["node"]
     connection_string = cluster.env_variables["AZURITE_CONNECTION_STRING"]
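The blob path test_data_*_{_partition_id} deliberately mixes a real glob with the partition wildcard, which is exactly the case this PR targets: the {_partition_id} placeholder is ignored, the remaining * marks the table readonly, and the INSERT is expected to fail with DATABASE_ACCESS_DENIED.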

tests/integration/test_storage_hdfs/test.py

Lines changed: 10 additions & 0 deletions
@@ -1116,6 +1116,16 @@ def test_format_detection(started_cluster):
     assert expected_result == result
 
 
+def test_write_to_globbed_partitioned_path(started_cluster):
+    node = started_cluster.instances["node1"]
+
+    error = node.query_and_get_error(
+        "insert into function hdfs('hdfs://hdfs1:9000/test_data_*_{_partition_id}.csv') partition by 42 select 42"
+    )
+
+    assert "DATABASE_ACCESS_DENIED" in error
+
+
 def test_respect_object_existence_on_partitioned_write(started_cluster):
     node = started_cluster.instances["node1"]

tests/queries/0_stateless/03037_s3_write_to_globbed_partitioned_path.reference

Whitespace-only changes.
tests/queries/0_stateless/03037_s3_write_to_globbed_partitioned_path.sql

Lines changed: 4 additions & 0 deletions
@@ -0,0 +1,4 @@
+-- Tags: no-fasttest
+
+insert into function s3('http://localhost:11111/test/data_*_{_partition_id}.csv') partition by number % 3 select * from numbers(10); -- {serverError DATABASE_ACCESS_DENIED}
+
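The trailing -- {serverError DATABASE_ACCESS_DENIED} comment is the stateless-test convention: the runner expects the query to fail with exactly that server error code.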
