Skip to content

Commit 8fd1ba5

Browse files
authored
Merge pull request #1144 from Altinity/preserve_format_settings_export_part
Preserve the entire format settings object in export part manifest
2 parents: 44b5a93 + f941909; commit: 8fd1ba5

File tree

7 files changed

+17
-22
lines changed

7 files changed

+17
-22
lines changed

src/Storages/IStorage.h

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -468,6 +468,7 @@ It is currently only implemented in StorageObjectStorage.
468468
Block & /* block_with_partition_values */,
469469
std::string & /* destination_file_path */,
470470
bool /* overwrite_if_exists */,
471+
const std::optional<FormatSettings> & /* format_settings */,
471472
ContextPtr /* context */)
472473
{
473474
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Import is not implemented for storage {}", getName());

src/Storages/MergeTree/MergeTreeData.cpp

Lines changed: 5 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -47,6 +47,7 @@
4747
#include <Disks/SingleDiskVolume.h>
4848
#include <Disks/TemporaryFileOnDisk.h>
4949
#include <Disks/createVolume.h>
50+
#include <Formats/FormatFactory.h>
5051
#include <IO/Operators.h>
5152
#include <IO/S3Common.h>
5253
#include <IO/SharedThreadPools.h>
@@ -6241,13 +6242,12 @@ void MergeTreeData::exportPartToTable(const PartitionCommand & command, ContextP
62416242
part_name, getStorageID().getFullTableName());
62426243

62436244
{
6245+
const auto format_settings = getFormatSettings(query_context);
62446246
MergeTreeExportManifest manifest(
62456247
dest_storage->getStorageID(),
62466248
part,
62476249
query_context->getSettingsRef()[Setting::export_merge_tree_part_overwrite_file_if_exists],
6248-
query_context->getSettingsRef()[Setting::output_format_parallel_formatting],
6249-
query_context->getSettingsRef()[Setting::output_format_parquet_parallel_encoding],
6250-
query_context->getSettingsRef()[Setting::max_threads]);
6250+
format_settings);
62516251

62526252
std::lock_guard lock(export_manifests_mutex);
62536253

@@ -6293,17 +6293,13 @@ void MergeTreeData::exportPartToTableImpl(
62936293

62946294
try
62956295
{
6296-
auto context_copy = Context::createCopy(local_context);
6297-
context_copy->setSetting("output_format_parallel_formatting", manifest.parallel_formatting);
6298-
context_copy->setSetting("output_format_parquet_parallel_encoding", manifest.parquet_parallel_encoding);
6299-
context_copy->setSetting("max_threads", manifest.max_threads);
6300-
63016296
sink = destination_storage->import(
63026297
manifest.data_part->name + "_" + manifest.data_part->checksums.getTotalChecksumHex(),
63036298
block_with_partition_values,
63046299
destination_file_path,
63056300
manifest.overwrite_file_if_exists,
6306-
context_copy);
6301+
manifest.format_settings,
6302+
local_context);
63076303
}
63086304
catch (const Exception & e)
63096305
{

src/Storages/MergeTree/MergeTreeExportManifest.h

Lines changed: 3 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -13,24 +13,17 @@ struct MergeTreeExportManifest
1313
const StorageID & destination_storage_id_,
1414
const DataPartPtr & data_part_,
1515
bool overwrite_file_if_exists_,
16-
bool parallel_formatting_,
17-
bool parallel_formatting_parquet_,
18-
std::size_t max_threads_)
16+
const FormatSettings & format_settings_)
1917
: destination_storage_id(destination_storage_id_),
2018
data_part(data_part_),
2119
overwrite_file_if_exists(overwrite_file_if_exists_),
22-
parallel_formatting(parallel_formatting_),
23-
parquet_parallel_encoding(parallel_formatting_parquet_),
24-
max_threads(max_threads_),
20+
format_settings(format_settings_),
2521
create_time(time(nullptr)) {}
2622

2723
StorageID destination_storage_id;
2824
DataPartPtr data_part;
2925
bool overwrite_file_if_exists;
30-
bool parallel_formatting;
31-
/// parquet has a different setting for parallel formatting
32-
bool parquet_parallel_encoding;
33-
std::size_t max_threads;
26+
FormatSettings format_settings;
3427

3528
time_t create_time;
3629
mutable bool in_progress = false;

src/Storages/ObjectStorage/StorageObjectStorage.cpp

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -483,6 +483,7 @@ SinkToStoragePtr StorageObjectStorage::import(
483483
Block & block_with_partition_values,
484484
std::string & destination_file_path,
485485
bool overwrite_if_exists,
486+
const std::optional<FormatSettings> & format_settings_,
486487
ContextPtr local_context)
487488
{
488489
std::string partition_key;
@@ -508,7 +509,7 @@ SinkToStoragePtr StorageObjectStorage::import(
508509
destination_file_path,
509510
object_storage,
510511
configuration,
511-
std::nullopt, /// passing nullopt to force rebuild for format_settings based on query context
512+
format_settings_ ? format_settings_ : format_settings,
512513
std::make_shared<const Block>(getInMemoryMetadataPtr()->getSampleBlock()),
513514
local_context);
514515
}

src/Storages/ObjectStorage/StorageObjectStorage.h

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -86,6 +86,7 @@ class StorageObjectStorage : public IStorage
8686
Block & /* block_with_partition_values */,
8787
std::string & /* destination_file_path */,
8888
bool /* overwrite_if_exists */,
89+
const std::optional<FormatSettings> & /* format_settings_ */,
8990
ContextPtr /* context */) override;
9091

9192
void truncate(

src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1,3 +1,4 @@
1+
#include <optional>
12
#include <Storages/ObjectStorage/StorageObjectStorageCluster.h>
23

34
#include <Common/Exception.h>
@@ -575,12 +576,13 @@ SinkToStoragePtr StorageObjectStorageCluster::import(
575576
Block & block_with_partition_values,
576577
std::string & destination_file_path,
577578
bool overwrite_if_exists,
579+
const std::optional<FormatSettings> & format_settings_,
578580
ContextPtr context)
579581
{
580582
if (pure_storage)
581-
return pure_storage->import(file_name, block_with_partition_values, destination_file_path, overwrite_if_exists, context);
583+
return pure_storage->import(file_name, block_with_partition_values, destination_file_path, overwrite_if_exists, format_settings_, context);
582584

583-
return IStorageCluster::import(file_name, block_with_partition_values, destination_file_path, overwrite_if_exists, context);
585+
return IStorageCluster::import(file_name, block_with_partition_values, destination_file_path, overwrite_if_exists, format_settings_, context);
584586
}
585587

586588
void StorageObjectStorageCluster::readFallBackToPure(

src/Storages/ObjectStorage/StorageObjectStorageCluster.h

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -130,6 +130,7 @@ class StorageObjectStorageCluster : public IStorageCluster
130130
Block & /* block_with_partition_values */,
131131
std::string & /* destination_file_path */,
132132
bool /* overwrite_if_exists */,
133+
const std::optional<FormatSettings> & /* format_settings_ */,
133134
ContextPtr /* context */) override;
134135
bool prefersLargeBlocks() const override;
135136

0 commit comments

Comments
 (0)