Skip to content

Commit c458920

Browse files
authored
Merge pull request ClickHouse#62425 from Avogar/fix-partitioned-write
Respect settings truncate_on_insert/create_new_file_on_insert in s3/hdfs/azure engines during partitioned write
2 parents 5576cb7 + cae7cbf commit c458920

File tree

7 files changed

+255
-94
lines changed

7 files changed

+255
-94
lines changed

src/Storages/HDFS/StorageHDFS.cpp

Lines changed: 40 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -871,6 +871,40 @@ class HDFSSink : public SinkToStorage
871871
bool cancelled = false;
872872
};
873873

874+
namespace
875+
{
876+
std::optional<String> checkAndGetNewFileOnInsertIfNeeded(const ContextPtr & context, const String & uri, size_t sequence_number)
877+
{
878+
const auto [path_from_uri, uri_without_path] = getPathFromUriAndUriWithoutPath(uri);
879+
880+
HDFSBuilderWrapper builder = createHDFSBuilder(uri_without_path + "/", context->getGlobalContext()->getConfigRef());
881+
HDFSFSPtr fs = createHDFSFS(builder.get());
882+
883+
if (context->getSettingsRef().hdfs_truncate_on_insert || hdfsExists(fs.get(), path_from_uri.c_str()))
884+
return std::nullopt;
885+
886+
if (context->getSettingsRef().hdfs_create_new_file_on_insert)
887+
{
888+
auto pos = uri.find_first_of('.', uri.find_last_of('/'));
889+
String new_uri;
890+
do
891+
{
892+
new_uri = uri.substr(0, pos) + "." + std::to_string(sequence_number) + (pos == std::string::npos ? "" : uri.substr(pos));
893+
++sequence_number;
894+
}
895+
while (!hdfsExists(fs.get(), new_uri.c_str()));
896+
897+
return new_uri;
898+
}
899+
900+
throw Exception(
901+
ErrorCodes::BAD_ARGUMENTS,
902+
"File with path {} already exists. If you want to overwrite it, enable setting hdfs_truncate_on_insert, "
903+
"if you want to create new file on each insert, enable setting hdfs_create_new_file_on_insert",
904+
path_from_uri);
905+
}
906+
}
907+
874908
class PartitionedHDFSSink : public PartitionedSink
875909
{
876910
public:
@@ -894,6 +928,8 @@ class PartitionedHDFSSink : public PartitionedSink
894928
{
895929
auto path = PartitionedSink::replaceWildcards(uri, partition_id);
896930
PartitionedSink::validatePartitionKey(path, true);
931+
if (auto new_path = checkAndGetNewFileOnInsertIfNeeded(context, path, 1))
932+
path = *new_path;
897933
return std::make_shared<HDFSSink>(path, format, sample_block, context, compression_method);
898934
}
899935

@@ -1056,7 +1092,7 @@ void ReadFromHDFS::initializePipeline(QueryPipelineBuilder & pipeline, const Bui
10561092

10571093
SinkToStoragePtr StorageHDFS::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context_, bool /*async_insert*/)
10581094
{
1059-
String current_uri = uris.back();
1095+
String current_uri = uris.front();
10601096

10611097
bool has_wildcards = current_uri.find(PartitionedSink::PARTITION_ID_WILDCARD) != String::npos;
10621098
const auto * insert_query = dynamic_cast<const ASTInsertQuery *>(query.get());
@@ -1078,34 +1114,10 @@ SinkToStoragePtr StorageHDFS::write(const ASTPtr & query, const StorageMetadataP
10781114
if (is_path_with_globs)
10791115
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, "URI '{}' contains globs, so the table is in readonly mode", uris.back());
10801116

1081-
const auto [path_from_uri, uri_without_path] = getPathFromUriAndUriWithoutPath(current_uri);
1082-
1083-
HDFSBuilderWrapper builder = createHDFSBuilder(uri_without_path + "/", context_->getGlobalContext()->getConfigRef());
1084-
HDFSFSPtr fs = createHDFSFS(builder.get());
1085-
1086-
bool truncate_on_insert = context_->getSettingsRef().hdfs_truncate_on_insert;
1087-
if (!truncate_on_insert && !hdfsExists(fs.get(), path_from_uri.c_str()))
1117+
if (auto new_uri = checkAndGetNewFileOnInsertIfNeeded(context_, uris.front(), uris.size()))
10881118
{
1089-
if (context_->getSettingsRef().hdfs_create_new_file_on_insert)
1090-
{
1091-
auto pos = uris[0].find_first_of('.', uris[0].find_last_of('/'));
1092-
size_t index = uris.size();
1093-
String new_uri;
1094-
do
1095-
{
1096-
new_uri = uris[0].substr(0, pos) + "." + std::to_string(index) + (pos == std::string::npos ? "" : uris[0].substr(pos));
1097-
++index;
1098-
}
1099-
while (!hdfsExists(fs.get(), new_uri.c_str()));
1100-
uris.push_back(new_uri);
1101-
current_uri = new_uri;
1102-
}
1103-
else
1104-
throw Exception(
1105-
ErrorCodes::BAD_ARGUMENTS,
1106-
"File with path {} already exists. If you want to overwrite it, enable setting hdfs_truncate_on_insert, "
1107-
"if you want to create new file on each insert, enable setting hdfs_create_new_file_on_insert",
1108-
path_from_uri);
1119+
uris.push_back(*new_uri);
1120+
current_uri = *new_uri;
11091121
}
11101122

11111123
return std::make_shared<HDFSSink>(current_uri,

src/Storages/StorageAzureBlob.cpp

Lines changed: 39 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -634,6 +634,36 @@ class StorageAzureBlobSink : public SinkToStorage
634634
std::mutex cancel_mutex;
635635
};
636636

637+
namespace
638+
{
639+
std::optional<String> checkAndGetNewFileOnInsertIfNeeded(const ContextPtr & context, AzureObjectStorage * object_storage, const String & path, size_t sequence_number)
640+
{
641+
if (context->getSettingsRef().azure_truncate_on_insert || !object_storage->exists(StoredObject(path)))
642+
return std::nullopt;
643+
644+
if (context->getSettingsRef().azure_create_new_file_on_insert)
645+
{
646+
auto pos = path.find_first_of('.');
647+
String new_path;
648+
do
649+
{
650+
new_path = path.substr(0, pos) + "." + std::to_string(sequence_number) + (pos == std::string::npos ? "" : path.substr(pos));
651+
++sequence_number;
652+
}
653+
while (object_storage->exists(StoredObject(new_path)));
654+
655+
return new_path;
656+
}
657+
658+
throw Exception(
659+
ErrorCodes::BAD_ARGUMENTS,
660+
"Object with key {} already exists. "
661+
"If you want to overwrite it, enable setting azure_truncate_on_insert, if you "
662+
"want to create a new file on each insert, enable setting azure_create_new_file_on_insert",
663+
path);
664+
}
665+
}
666+
637667
class PartitionedStorageAzureBlobSink : public PartitionedSink, WithContext
638668
{
639669
public:
@@ -660,6 +690,8 @@ class PartitionedStorageAzureBlobSink : public PartitionedSink, WithContext
660690
{
661691
auto partition_key = replaceWildcards(blob, partition_id);
662692
validateKey(partition_key);
693+
if (auto new_path = checkAndGetNewFileOnInsertIfNeeded(getContext(), object_storage, partition_key, 1))
694+
partition_key = *new_path;
663695

664696
return std::make_shared<StorageAzureBlobSink>(
665697
format,
@@ -837,8 +869,9 @@ void ReadFromAzureBlob::initializePipeline(QueryPipelineBuilder & pipeline, cons
837869

838870
SinkToStoragePtr StorageAzureBlob::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context, bool /*async_insert*/)
839871
{
872+
auto path = configuration.blobs_paths.front();
840873
auto sample_block = metadata_snapshot->getSampleBlock();
841-
auto chosen_compression_method = chooseCompressionMethod(configuration.blobs_paths.back(), configuration.compression_method);
874+
auto chosen_compression_method = chooseCompressionMethod(path, configuration.compression_method);
842875
auto insert_query = std::dynamic_pointer_cast<ASTInsertQuery>(query);
843876

844877
auto partition_by_ast = insert_query ? (insert_query->partition_by ? insert_query->partition_by : partition_by) : nullptr;
@@ -854,44 +887,18 @@ SinkToStoragePtr StorageAzureBlob::write(const ASTPtr & query, const StorageMeta
854887
format_settings,
855888
chosen_compression_method,
856889
object_storage.get(),
857-
configuration.blobs_paths.back());
890+
path);
858891
}
859892
else
860893
{
861894
if (configuration.withGlobs())
862895
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED,
863896
"AzureBlobStorage key '{}' contains globs, so the table is in readonly mode", configuration.blob_path);
864897

865-
bool truncate_in_insert = local_context->getSettingsRef().azure_truncate_on_insert;
866-
867-
if (!truncate_in_insert && object_storage->exists(StoredObject(configuration.blob_path)))
898+
if (auto new_path = checkAndGetNewFileOnInsertIfNeeded(local_context, object_storage.get(), path, configuration.blobs_paths.size()))
868899
{
869-
870-
if (local_context->getSettingsRef().azure_create_new_file_on_insert)
871-
{
872-
size_t index = configuration.blobs_paths.size();
873-
const auto & first_key = configuration.blobs_paths[0];
874-
auto pos = first_key.find_first_of('.');
875-
String new_key;
876-
877-
do
878-
{
879-
new_key = first_key.substr(0, pos) + "." + std::to_string(index) + (pos == std::string::npos ? "" : first_key.substr(pos));
880-
++index;
881-
}
882-
while (object_storage->exists(StoredObject(new_key)));
883-
884-
configuration.blobs_paths.push_back(new_key);
885-
}
886-
else
887-
{
888-
throw Exception(
889-
ErrorCodes::BAD_ARGUMENTS,
890-
"Object in bucket {} with key {} already exists. "
891-
"If you want to overwrite it, enable setting azure_truncate_on_insert, if you "
892-
"want to create a new file on each insert, enable setting azure_create_new_file_on_insert",
893-
configuration.container, configuration.blobs_paths.back());
894-
}
900+
configuration.blobs_paths.push_back(*new_path);
901+
path = *new_path;
895902
}
896903

897904
return std::make_shared<StorageAzureBlobSink>(
@@ -901,7 +908,7 @@ SinkToStoragePtr StorageAzureBlob::write(const ASTPtr & query, const StorageMeta
901908
format_settings,
902909
chosen_compression_method,
903910
object_storage.get(),
904-
configuration.blobs_paths.back());
911+
path);
905912
}
906913
}
907914

src/Storages/StorageFile.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1968,22 +1968,22 @@ SinkToStoragePtr StorageFile::write(
19681968
"Table '{}' is in readonly mode because of globs in filepath",
19691969
getStorageID().getNameForLogs());
19701970

1971-
path = paths.back();
1971+
path = paths.front();
19721972
fs::create_directories(fs::path(path).parent_path());
19731973

19741974
std::error_code error_code;
19751975
if (!context->getSettingsRef().engine_file_truncate_on_insert && !is_path_with_globs
19761976
&& !FormatFactory::instance().checkIfFormatSupportAppend(format_name, context, format_settings)
1977-
&& fs::file_size(paths.back(), error_code) != 0 && !error_code)
1977+
&& fs::file_size(path, error_code) != 0 && !error_code)
19781978
{
19791979
if (context->getSettingsRef().engine_file_allow_create_multiple_files)
19801980
{
1981-
auto pos = paths[0].find_first_of('.', paths[0].find_last_of('/'));
1981+
auto pos = path.find_first_of('.', path.find_last_of('/'));
19821982
size_t index = paths.size();
19831983
String new_path;
19841984
do
19851985
{
1986-
new_path = paths[0].substr(0, pos) + "." + std::to_string(index) + (pos == std::string::npos ? "" : paths[0].substr(pos));
1986+
new_path = path.substr(0, pos) + "." + std::to_string(index) + (pos == std::string::npos ? "" : path.substr(pos));
19871987
++index;
19881988
}
19891989
while (fs::exists(new_path));

src/Storages/StorageS3.cpp

Lines changed: 40 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -955,6 +955,36 @@ class StorageS3Sink : public SinkToStorage
955955
std::mutex cancel_mutex;
956956
};
957957

958+
namespace
959+
{
960+
std::optional<String> checkAndGetNewFileOnInsertIfNeeded(const ContextPtr & context, const StorageS3::Configuration & configuration, const String & key, size_t sequence_number)
961+
{
962+
if (context->getSettingsRef().s3_truncate_on_insert || !S3::objectExists(*configuration.client, configuration.url.bucket, key, configuration.url.version_id, configuration.request_settings))
963+
return std::nullopt;
964+
965+
if (context->getSettingsRef().s3_create_new_file_on_insert)
966+
{
967+
auto pos = key.find_first_of('.');
968+
String new_key;
969+
do
970+
{
971+
new_key = key.substr(0, pos) + "." + std::to_string(sequence_number) + (pos == std::string::npos ? "" : key.substr(pos));
972+
++sequence_number;
973+
}
974+
while (S3::objectExists(*configuration.client, configuration.url.bucket, new_key, configuration.url.version_id, configuration.request_settings));
975+
976+
return new_key;
977+
}
978+
979+
throw Exception(
980+
ErrorCodes::BAD_ARGUMENTS,
981+
"Object in bucket {} with key {} already exists. "
982+
"If you want to overwrite it, enable setting s3_truncate_on_insert, if you "
983+
"want to create a new file on each insert, enable setting s3_create_new_file_on_insert",
984+
configuration.url.bucket, key);
985+
}
986+
}
987+
958988

959989
class PartitionedStorageS3Sink : public PartitionedSink, WithContext
960990
{
@@ -988,6 +1018,9 @@ class PartitionedStorageS3Sink : public PartitionedSink, WithContext
9881018
auto partition_key = replaceWildcards(key, partition_id);
9891019
validateKey(partition_key);
9901020

1021+
if (auto new_key = checkAndGetNewFileOnInsertIfNeeded(getContext(), configuration, partition_key, /* sequence_number */1))
1022+
partition_key = *new_key;
1023+
9911024
return std::make_shared<StorageS3Sink>(
9921025
format,
9931026
sample_block,
@@ -1248,6 +1281,7 @@ void ReadFromStorageS3Step::initializePipeline(QueryPipelineBuilder & pipeline,
12481281
SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context, bool /*async_insert*/)
12491282
{
12501283
auto query_configuration = updateConfigurationAndGetCopy(local_context);
1284+
auto key = query_configuration.keys.front();
12511285

12521286
auto sample_block = metadata_snapshot->getSampleBlock();
12531287
auto chosen_compression_method = chooseCompressionMethod(query_configuration.keys.back(), query_configuration.compression_method);
@@ -1267,43 +1301,19 @@ SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr
12671301
chosen_compression_method,
12681302
query_configuration,
12691303
query_configuration.url.bucket,
1270-
query_configuration.keys.back());
1304+
key);
12711305
}
12721306
else
12731307
{
12741308
if (query_configuration.withGlobs())
12751309
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED,
12761310
"S3 key '{}' contains globs, so the table is in readonly mode", query_configuration.url.key);
12771311

1278-
bool truncate_in_insert = local_context->getSettingsRef().s3_truncate_on_insert;
1279-
1280-
if (!truncate_in_insert && S3::objectExists(*query_configuration.client, query_configuration.url.bucket, query_configuration.keys.back(), query_configuration.url.version_id, query_configuration.request_settings))
1312+
if (auto new_key = checkAndGetNewFileOnInsertIfNeeded(local_context, configuration, query_configuration.keys.front(), query_configuration.keys.size()))
12811313
{
1282-
if (local_context->getSettingsRef().s3_create_new_file_on_insert)
1283-
{
1284-
size_t index = query_configuration.keys.size();
1285-
const auto & first_key = query_configuration.keys[0];
1286-
auto pos = first_key.find_first_of('.');
1287-
String new_key;
1288-
do
1289-
{
1290-
new_key = first_key.substr(0, pos) + "." + std::to_string(index) + (pos == std::string::npos ? "" : first_key.substr(pos));
1291-
++index;
1292-
}
1293-
while (S3::objectExists(*query_configuration.client, query_configuration.url.bucket, new_key, query_configuration.url.version_id, query_configuration.request_settings));
1294-
1295-
query_configuration.keys.push_back(new_key);
1296-
configuration.keys.push_back(new_key);
1297-
}
1298-
else
1299-
{
1300-
throw Exception(
1301-
ErrorCodes::BAD_ARGUMENTS,
1302-
"Object in bucket {} with key {} already exists. "
1303-
"If you want to overwrite it, enable setting s3_truncate_on_insert, if you "
1304-
"want to create a new file on each insert, enable setting s3_create_new_file_on_insert",
1305-
query_configuration.url.bucket, query_configuration.keys.back());
1306-
}
1314+
query_configuration.keys.push_back(*new_key);
1315+
configuration.keys.push_back(*new_key);
1316+
key = *new_key;
13071317
}
13081318

13091319
return std::make_shared<StorageS3Sink>(
@@ -1314,7 +1324,7 @@ SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr
13141324
chosen_compression_method,
13151325
query_configuration,
13161326
query_configuration.url.bucket,
1317-
query_configuration.keys.back());
1327+
key);
13181328
}
13191329
}
13201330

0 commit comments

Comments
 (0)