Skip to content

Commit 9628874

Browse files
committed
Better
1 parent 16cdd06 commit 9628874

File tree

4 files changed

+19
-13
lines changed

4 files changed

+19
-13
lines changed

src/Storages/HDFS/StorageHDFS.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1092,7 +1092,7 @@ void ReadFromHDFS::initializePipeline(QueryPipelineBuilder & pipeline, const Bui
10921092

10931093
SinkToStoragePtr StorageHDFS::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context_, bool /*async_insert*/)
10941094
{
1095-
String current_uri = uris.back();
1095+
String current_uri = uris.front();
10961096

10971097
bool has_wildcards = current_uri.find(PartitionedSink::PARTITION_ID_WILDCARD) != String::npos;
10981098
const auto * insert_query = dynamic_cast<const ASTInsertQuery *>(query.get());
@@ -1114,7 +1114,7 @@ SinkToStoragePtr StorageHDFS::write(const ASTPtr & query, const StorageMetadataP
11141114
if (is_path_with_globs)
11151115
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, "URI '{}' contains globs, so the table is in readonly mode", uris.back());
11161116

1117-
if (auto new_uri = checkFileExistsAndCreateNewKeyIfNeeded(context_, current_uri, uris.size()))
1117+
if (auto new_uri = checkFileExistsAndCreateNewKeyIfNeeded(context_, uris.front(), uris.size()))
11181118
{
11191119
uris.push_back(*new_uri);
11201120
current_uri = *new_uri;

src/Storages/StorageAzureBlob.cpp

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -868,8 +868,9 @@ void ReadFromAzureBlob::initializePipeline(QueryPipelineBuilder & pipeline, cons
868868

869869
SinkToStoragePtr StorageAzureBlob::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context, bool /*async_insert*/)
870870
{
871+
auto path = configuration.blobs_paths.front();
871872
auto sample_block = metadata_snapshot->getSampleBlock();
872-
auto chosen_compression_method = chooseCompressionMethod(configuration.blobs_paths.back(), configuration.compression_method);
873+
auto chosen_compression_method = chooseCompressionMethod(path, configuration.compression_method);
873874
auto insert_query = std::dynamic_pointer_cast<ASTInsertQuery>(query);
874875

875876
auto partition_by_ast = insert_query ? (insert_query->partition_by ? insert_query->partition_by : partition_by) : nullptr;
@@ -885,16 +886,19 @@ SinkToStoragePtr StorageAzureBlob::write(const ASTPtr & query, const StorageMeta
885886
format_settings,
886887
chosen_compression_method,
887888
object_storage.get(),
888-
configuration.blobs_paths.back());
889+
path);
889890
}
890891
else
891892
{
892893
if (configuration.withGlobs())
893894
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED,
894895
"AzureBlobStorage key '{}' contains globs, so the table is in readonly mode", configuration.blob_path);
895896

896-
if (auto new_path = checkFileExistsAndCreateNewKeyIfNeeded(local_context, object_storage.get(), configuration.blobs_paths.back(), configuration.blobs_paths.size()))
897+
if (auto new_path = checkFileExistsAndCreateNewKeyIfNeeded(local_context, object_storage.get(), path, configuration.blobs_paths.size()))
898+
{
897899
configuration.blobs_paths.push_back(*new_path);
900+
path = *new_path;
901+
}
898902

899903
return std::make_shared<StorageAzureBlobSink>(
900904
configuration.format,
@@ -903,7 +907,7 @@ SinkToStoragePtr StorageAzureBlob::write(const ASTPtr & query, const StorageMeta
903907
format_settings,
904908
chosen_compression_method,
905909
object_storage.get(),
906-
configuration.blobs_paths.back());
910+
path);
907911
}
908912
}
909913

src/Storages/StorageFile.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1968,22 +1968,22 @@ SinkToStoragePtr StorageFile::write(
19681968
"Table '{}' is in readonly mode because of globs in filepath",
19691969
getStorageID().getNameForLogs());
19701970

1971-
path = paths.back();
1971+
path = paths.front();
19721972
fs::create_directories(fs::path(path).parent_path());
19731973

19741974
std::error_code error_code;
19751975
if (!context->getSettingsRef().engine_file_truncate_on_insert && !is_path_with_globs
19761976
&& !FormatFactory::instance().checkIfFormatSupportAppend(format_name, context, format_settings)
1977-
&& fs::file_size(paths.back(), error_code) != 0 && !error_code)
1977+
&& fs::file_size(path, error_code) != 0 && !error_code)
19781978
{
19791979
if (context->getSettingsRef().engine_file_allow_create_multiple_files)
19801980
{
1981-
auto pos = paths[0].find_first_of('.', paths[0].find_last_of('/'));
1981+
auto pos = path.find_first_of('.', path.find_last_of('/'));
19821982
size_t index = paths.size();
19831983
String new_path;
19841984
do
19851985
{
1986-
new_path = paths[0].substr(0, pos) + "." + std::to_string(index) + (pos == std::string::npos ? "" : paths[0].substr(pos));
1986+
new_path = path.substr(0, pos) + "." + std::to_string(index) + (pos == std::string::npos ? "" : path.substr(pos));
19871987
++index;
19881988
}
19891989
while (fs::exists(new_path));

src/Storages/StorageS3.cpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1281,6 +1281,7 @@ void ReadFromStorageS3Step::initializePipeline(QueryPipelineBuilder & pipeline,
12811281
SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context, bool /*async_insert*/)
12821282
{
12831283
auto query_configuration = updateConfigurationAndGetCopy(local_context);
1284+
auto key = query_configuration.keys.front();
12841285

12851286
auto sample_block = metadata_snapshot->getSampleBlock();
12861287
auto chosen_compression_method = chooseCompressionMethod(query_configuration.keys.back(), query_configuration.compression_method);
@@ -1300,18 +1301,19 @@ SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr
13001301
chosen_compression_method,
13011302
query_configuration,
13021303
query_configuration.url.bucket,
1303-
query_configuration.keys.back());
1304+
key);
13041305
}
13051306
else
13061307
{
13071308
if (query_configuration.withGlobs())
13081309
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED,
13091310
"S3 key '{}' contains globs, so the table is in readonly mode", query_configuration.url.key);
13101311

1311-
if (auto new_key = checkFileExistsAndCreateNewKeyIfNeeded(local_context, configuration, query_configuration.keys.back(), query_configuration.keys.size()))
1312+
if (auto new_key = checkFileExistsAndCreateNewKeyIfNeeded(local_context, configuration, query_configuration.keys.front(), query_configuration.keys.size()))
13121313
{
13131314
query_configuration.keys.push_back(*new_key);
13141315
configuration.keys.push_back(*new_key);
1316+
key = *new_key;
13151317
}
13161318

13171319
return std::make_shared<StorageS3Sink>(
@@ -1322,7 +1324,7 @@ SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr
13221324
chosen_compression_method,
13231325
query_configuration,
13241326
query_configuration.url.bucket,
1325-
query_configuration.keys.back());
1327+
key);
13261328
}
13271329
}
13281330

0 commit comments

Comments
 (0)