Skip to content

Commit ff551f6

Browse files
committed
exports throtler
1 parent 8f2557f commit ff551f6

File tree

8 files changed

+35
-12
lines changed

8 files changed

+35
-12
lines changed

src/Core/ServerSettings.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ namespace DB
9999
DECLARE(UInt64, max_unexpected_parts_loading_thread_pool_size, 8, R"(The number of threads to load inactive set of data parts (Unexpected ones) at startup.)", 0) \
100100
DECLARE(UInt64, max_parts_cleaning_thread_pool_size, 128, R"(The number of threads for concurrent removal of inactive data parts.)", 0) \
101101
DECLARE(UInt64, max_mutations_bandwidth_for_server, 0, R"(The maximum read speed of all mutations on server in bytes per second. Zero means unlimited.)", 0) \
102+
DECLARE(UInt64, max_exports_bandwidth_for_server, 0, R"(The maximum read speed of all exports on server in bytes per second. Zero means unlimited.)", 0) \
102103
DECLARE(UInt64, max_merges_bandwidth_for_server, 0, R"(The maximum read speed of all merges on server in bytes per second. Zero means unlimited.)", 0) \
103104
DECLARE(UInt64, max_replicated_fetches_network_bandwidth_for_server, 0, R"(The maximum speed of data exchange over the network in bytes per second for replicated fetches. Zero means unlimited.)", 0) \
104105
DECLARE(UInt64, max_replicated_sends_network_bandwidth_for_server, 0, R"(The maximum speed of data exchange over the network in bytes per second for replicated sends. Zero means unlimited.)", 0) \

src/Interpreters/Context.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,7 @@ namespace ServerSetting
288288
extern const ServerSettingsUInt64 max_local_write_bandwidth_for_server;
289289
extern const ServerSettingsUInt64 max_merges_bandwidth_for_server;
290290
extern const ServerSettingsUInt64 max_mutations_bandwidth_for_server;
291+
extern const ServerSettingsUInt64 max_exports_bandwidth_for_server;
291292
extern const ServerSettingsUInt64 max_remote_read_network_bandwidth_for_server;
292293
extern const ServerSettingsUInt64 max_remote_write_network_bandwidth_for_server;
293294
extern const ServerSettingsUInt64 max_replicated_fetches_network_bandwidth_for_server;
@@ -504,6 +505,8 @@ struct ContextSharedPart : boost::noncopyable
504505
mutable ThrottlerPtr mutations_throttler; /// A server-wide throttler for mutations
505506
mutable ThrottlerPtr merges_throttler; /// A server-wide throttler for merges
506507

508+
mutable ThrottlerPtr exports_throttler; /// A server-wide throttler for exports
509+
507510
MultiVersion<Macros> macros; /// Substitutions extracted from config.
508511
std::unique_ptr<DDLWorker> ddl_worker TSA_GUARDED_BY(mutex); /// Process ddl commands from zk.
509512
LoadTaskPtr ddl_worker_startup_task; /// To postpone `ddl_worker->startup()` after all tables startup
@@ -992,6 +995,9 @@ struct ContextSharedPart : boost::noncopyable
992995

993996
if (auto bandwidth = server_settings[ServerSetting::max_merges_bandwidth_for_server])
994997
merges_throttler = std::make_shared<Throttler>(bandwidth);
998+
999+
if (auto bandwidth = server_settings[ServerSetting::max_exports_bandwidth_for_server])
1000+
exports_throttler = std::make_shared<Throttler>(bandwidth);
9951001
}
9961002
};
9971003

@@ -4041,6 +4047,11 @@ ThrottlerPtr Context::getMergesThrottler() const
40414047
return shared->merges_throttler;
40424048
}
40434049

4050+
ThrottlerPtr Context::getExportsThrottler() const
4051+
{
4052+
return shared->exports_throttler;
4053+
}
4054+
40444055
void Context::reloadRemoteThrottlerConfig(size_t read_bandwidth, size_t write_bandwidth) const
40454056
{
40464057
if (read_bandwidth)

src/Interpreters/Context.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1635,6 +1635,7 @@ class Context: public ContextData, public std::enable_shared_from_this<Context>
16351635

16361636
ThrottlerPtr getMutationsThrottler() const;
16371637
ThrottlerPtr getMergesThrottler() const;
1638+
ThrottlerPtr getExportsThrottler() const;
16381639

16391640
void reloadRemoteThrottlerConfig(size_t read_bandwidth, size_t write_bandwidth) const;
16401641
void reloadLocalThrottlerConfig(size_t read_bandwidth, size_t write_bandwidth) const;

src/Storages/MergeTree/MergeTreeSequentialSource.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,9 @@ MergeTreeSequentialSource::MergeTreeSequentialSource(
140140
case Merge:
141141
read_settings.local_throttler = context->getMergesThrottler();
142142
break;
143+
case Export:
144+
read_settings.local_throttler = context->getExportsThrottler();
145+
break;
143146
}
144147

145148
read_settings.remote_throttler = read_settings.local_throttler;

src/Storages/MergeTree/MergeTreeSequentialSource.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ enum MergeTreeSequentialSourceType
1515
{
1616
Mutation,
1717
Merge,
18+
Export,
1819
};
1920

2021
/// Create stream for reading single part from MergeTree.

src/Storages/ObjectStorage/MergeTree/ExportPartPlainMergeTreeTask.cpp

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,6 @@
1010
namespace DB
1111
{
1212

13-
namespace Setting
14-
{
15-
extern const SettingsBool output_format_parallel_formatting;
16-
}
17-
1813
ExportPartPlainMergeTreeTask::ExportPartPlainMergeTreeTask(
1914
StorageMergeTree & storage_,
2015
const DataPartPtr & part_to_export_,

src/Storages/ObjectStorage/MergeTree/ExportPartPlainMergeTreeTask.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ class ExportPartPlainMergeTreeTask : public IExecutableTask
2323
ContextPtr context_,
2424
std::shared_ptr<MergeTreeExportManifest> manifest_,
2525
IExecutableTask::TaskResultCallback & task_result_callback_,
26-
size_t max_retries_ = 3);
26+
size_t max_retries_);
2727

2828
void onCompleted() override;
2929
bool executeStep() override;

src/Storages/ObjectStorage/StorageObjectStorage.cpp

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -565,20 +565,31 @@ void StorageObjectStorage::importMergeTreePart(
565565

566566
QueryPlan plan;
567567

568-
/// using the mutations type for now
569-
MergeTreeSequentialSourceType read_type = MergeTreeSequentialSourceType::Mutation;
568+
/// using the mutations type for now. This impacts in the throttling strategy
569+
MergeTreeSequentialSourceType read_type = MergeTreeSequentialSourceType::Export;
570570

571+
/// todo implement these settings
571572
bool apply_deleted_mask = true;
572573
bool read_with_direct_io = false;
573574
bool prefetch = false;
574575

575-
const auto partition_columns = configuration->partition_strategy->getPartitionColumns();
576+
std::string partition_key;
576577

577-
auto block_with_partition_values = data_part->partition.getBlockWithPartitionValues(partition_columns);
578+
if (configuration->partition_strategy)
579+
{
580+
const auto partition_columns = configuration->partition_strategy->getPartitionColumns();
581+
582+
auto block_with_partition_values = data_part->partition.getBlockWithPartitionValues(partition_columns);
583+
584+
const auto column_with_partition_key = configuration->partition_strategy->computePartitionKey(block_with_partition_values);
578585

579-
const auto column_with_partition_key = configuration->partition_strategy->computePartitionKey(block_with_partition_values);
586+
if (!column_with_partition_key->empty())
587+
{
588+
partition_key = column_with_partition_key->getDataAt(0).toString();
589+
}
590+
}
580591

581-
const auto file_path = configuration->file_path_generator->getWritingPath(column_with_partition_key->getDataAt(0).toString());
592+
const auto file_path = configuration->file_path_generator->getWritingPath(partition_key);
582593

583594
MergeTreeData::IMutationsSnapshot::Params params
584595
{

0 commit comments

Comments
 (0)