Skip to content

Commit 6b9870a

Browse files
committed
Merge branch 'antalya-25.8' into export_replicated_mt_partition_v2
2 parents 59cd727 + 8fd1ba5 commit 6b9870a

File tree

8 files changed

+17
-22
lines changed

8 files changed

+17
-22
lines changed

src/Storages/IStorage.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -468,6 +468,7 @@ It is currently only implemented in StorageObjectStorage.
468468
Block & /* block_with_partition_values */,
469469
std::string & /* destination_file_path */,
470470
bool /* overwrite_if_exists */,
471+
const std::optional<FormatSettings> & /* format_settings */,
471472
ContextPtr /* context */)
472473
{
473474
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Import is not implemented for storage {}", getName());

src/Storages/MergeTree/ExportPartTask.cpp

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -86,17 +86,13 @@ bool ExportPartTask::executeStep()
8686

8787
try
8888
{
89-
auto context_copy = Context::createCopy(local_context);
90-
context_copy->setSetting("output_format_parallel_formatting", manifest.parallel_formatting);
91-
context_copy->setSetting("output_format_parquet_parallel_encoding", manifest.parquet_parallel_encoding);
92-
context_copy->setSetting("max_threads", manifest.max_threads);
93-
9489
sink = destination_storage->import(
9590
manifest.data_part->name + "_" + manifest.data_part->checksums.getTotalChecksumHex(),
9691
block_with_partition_values,
9792
(*exports_list_entry)->destination_file_path,
9893
manifest.file_already_exists_policy == MergeTreePartExportManifest::FileAlreadyExistsPolicy::overwrite,
99-
context_copy);
94+
manifest.format_settings,
95+
local_context);
10096
}
10197
catch (const Exception & e)
10298
{

src/Storages/MergeTree/MergeTreeData.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
#include <Disks/SingleDiskVolume.h>
4949
#include <Disks/TemporaryFileOnDisk.h>
5050
#include <Disks/createVolume.h>
51+
#include <Formats/FormatFactory.h>
5152
#include <IO/Operators.h>
5253
#include <IO/S3Common.h>
5354
#include <IO/SharedThreadPools.h>
@@ -6254,14 +6255,13 @@ void MergeTreeData::exportPartToTable(
62546255
part_name, getStorageID().getFullTableName());
62556256

62566257
{
6258+
const auto format_settings = getFormatSettings(query_context);
62576259
MergeTreePartExportManifest manifest(
62586260
dest_storage->getStorageID(),
62596261
part,
62606262
transaction_id,
62616263
query_context->getSettingsRef()[Setting::export_merge_tree_part_file_already_exists_policy].value,
6262-
query_context->getSettingsRef()[Setting::output_format_parallel_formatting],
6263-
query_context->getSettingsRef()[Setting::output_format_parquet_parallel_encoding],
6264-
query_context->getSettingsRef()[Setting::max_threads],
6264+
format_settings,
62656265
completion_callback);
62666266

62676267
std::lock_guard lock(export_manifests_mutex);

src/Storages/MergeTree/MergeTreePartExportManifest.h

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -45,17 +45,13 @@ struct MergeTreePartExportManifest
4545
const DataPartPtr & data_part_,
4646
const String & transaction_id_,
4747
FileAlreadyExistsPolicy file_already_exists_policy_,
48-
bool parallel_formatting_,
49-
bool parquet_parallel_encoding_,
50-
std::size_t max_threads_,
48+
const FormatSettings & format_settings_,
5149
std::function<void(CompletionCallbackResult)> completion_callback_ = {})
5250
: destination_storage_id(destination_storage_id_),
5351
data_part(data_part_),
5452
transaction_id(transaction_id_),
5553
file_already_exists_policy(file_already_exists_policy_),
56-
parallel_formatting(parallel_formatting_),
57-
parquet_parallel_encoding(parquet_parallel_encoding_),
58-
max_threads(max_threads_),
54+
format_settings(format_settings_),
5955
completion_callback(completion_callback_),
6056
create_time(time(nullptr)) {}
6157

@@ -64,10 +60,7 @@ struct MergeTreePartExportManifest
6460
/// Used for killing the export.
6561
String transaction_id;
6662
FileAlreadyExistsPolicy file_already_exists_policy;
67-
bool parallel_formatting;
68-
/// parquet has a different setting for parallel formatting
69-
bool parquet_parallel_encoding;
70-
std::size_t max_threads;
63+
FormatSettings format_settings;
7164

7265
std::function<void(CompletionCallbackResult)> completion_callback;
7366

src/Storages/ObjectStorage/StorageObjectStorage.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -484,6 +484,7 @@ SinkToStoragePtr StorageObjectStorage::import(
484484
Block & block_with_partition_values,
485485
std::string & destination_file_path,
486486
bool overwrite_if_exists,
487+
const std::optional<FormatSettings> & format_settings_,
487488
ContextPtr local_context)
488489
{
489490
std::string partition_key;
@@ -509,7 +510,7 @@ SinkToStoragePtr StorageObjectStorage::import(
509510
destination_file_path,
510511
object_storage,
511512
configuration,
512-
std::nullopt, /// passing nullopt to force rebuild for format_settings based on query context
513+
format_settings_ ? format_settings_ : format_settings,
513514
std::make_shared<const Block>(getInMemoryMetadataPtr()->getSampleBlock()),
514515
local_context);
515516
}

src/Storages/ObjectStorage/StorageObjectStorage.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ class StorageObjectStorage : public IStorage
8686
Block & /* block_with_partition_values */,
8787
std::string & /* destination_file_path */,
8888
bool /* overwrite_if_exists */,
89+
const std::optional<FormatSettings> & /* format_settings_ */,
8990
ContextPtr /* context */) override;
9091

9192
void commitExportPartitionTransaction(

src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
#include <optional>
12
#include <Storages/ObjectStorage/StorageObjectStorageCluster.h>
23

34
#include <Common/Exception.h>
@@ -575,12 +576,13 @@ SinkToStoragePtr StorageObjectStorageCluster::import(
575576
Block & block_with_partition_values,
576577
std::string & destination_file_path,
577578
bool overwrite_if_exists,
579+
const std::optional<FormatSettings> & format_settings_,
578580
ContextPtr context)
579581
{
580582
if (pure_storage)
581-
return pure_storage->import(file_name, block_with_partition_values, destination_file_path, overwrite_if_exists, context);
583+
return pure_storage->import(file_name, block_with_partition_values, destination_file_path, overwrite_if_exists, format_settings_, context);
582584

583-
return IStorageCluster::import(file_name, block_with_partition_values, destination_file_path, overwrite_if_exists, context);
585+
return IStorageCluster::import(file_name, block_with_partition_values, destination_file_path, overwrite_if_exists, format_settings_, context);
584586
}
585587

586588
void StorageObjectStorageCluster::readFallBackToPure(

src/Storages/ObjectStorage/StorageObjectStorageCluster.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ class StorageObjectStorageCluster : public IStorageCluster
130130
Block & /* block_with_partition_values */,
131131
std::string & /* destination_file_path */,
132132
bool /* overwrite_if_exists */,
133+
const std::optional<FormatSettings> & /* format_settings_ */,
133134
ContextPtr /* context */) override;
134135
bool prefersLargeBlocks() const override;
135136

0 commit comments

Comments
 (0)