Skip to content

Commit f3bd121

Browse files
authored
Merge pull request #1041 from Altinity/fp_antalya_25_8_export_mt_part
Antalya 25.8 - Forward port of # 1009 - Export merge tree part
2 parents 770d011 + c7e28bc commit f3bd121

File tree

65 files changed

+1229
-139
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

65 files changed

+1229
-139
lines changed

src/Access/Common/AccessType.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,7 @@ enum class AccessType : uint8_t
210210
enabled implicitly by the grant ALTER_TABLE */\
211211
M(ALTER_SETTINGS, "ALTER SETTING, ALTER MODIFY SETTING, MODIFY SETTING, RESET SETTING", TABLE, ALTER_TABLE) /* allows to execute ALTER MODIFY SETTING */\
212212
M(ALTER_MOVE_PARTITION, "ALTER MOVE PART, MOVE PARTITION, MOVE PART", TABLE, ALTER_TABLE) \
213+
M(ALTER_EXPORT_PART, "ALTER EXPORT PART, EXPORT PART", TABLE, ALTER_TABLE) \
213214
M(ALTER_FETCH_PARTITION, "ALTER FETCH PART, FETCH PARTITION", TABLE, ALTER_TABLE) \
214215
M(ALTER_FREEZE_PARTITION, "FREEZE PARTITION, UNFREEZE", TABLE, ALTER_TABLE) \
215216
M(ALTER_UNLOCK_SNAPSHOT, "UNLOCK SNAPSHOT", TABLE, ALTER_TABLE) \

src/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ add_headers_and_sources(dbms Storages/ObjectStorage/Azure)
130130
add_headers_and_sources(dbms Storages/ObjectStorage/S3)
131131
add_headers_and_sources(dbms Storages/ObjectStorage/HDFS)
132132
add_headers_and_sources(dbms Storages/ObjectStorage/Local)
133+
add_headers_and_sources(dbms Storages/ObjectStorage/MergeTree)
133134
add_headers_and_sources(dbms Storages/ObjectStorage/DataLakes)
134135
add_headers_and_sources(dbms Storages/ObjectStorage/DataLakes/Iceberg)
135136
add_headers_and_sources(dbms Storages/ObjectStorage/DataLakes/DeltaLake)

src/Common/CurrentMetrics.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
M(Merge, "Number of executing background merges") \
1111
M(MergeParts, "Number of source parts participating in current background merges") \
1212
M(Move, "Number of currently executing moves") \
13+
M(Export, "Number of currently executing exports") \
1314
M(PartMutation, "Number of mutations (ALTER DELETE/UPDATE)") \
1415
M(ReplicatedFetch, "Number of data parts being fetched from replica") \
1516
M(ReplicatedSend, "Number of data parts being sent to replicas") \

src/Common/ProfileEvents.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@
2828
M(AsyncInsertBytes, "Data size in bytes of asynchronous INSERT queries.", ValueType::Bytes) \
2929
M(AsyncInsertRows, "Number of rows inserted by asynchronous INSERT queries.", ValueType::Number) \
3030
M(AsyncInsertCacheHits, "Number of times a duplicate hash id has been found in asynchronous INSERT hash id cache.", ValueType::Number) \
31+
M(PartsExports, "Number of successful part exports.", ValueType::Number) \
32+
M(PartsExportFailures, "Number of failed part exports.", ValueType::Number) \
33+
M(PartsExportDuplicated, "Number of part exports that failed because target already exists.", ValueType::Number) \
34+
M(PartsExportTotalMilliseconds, "Total time spent on part export operations.", ValueType::Milliseconds) \
3135
M(FailedQuery, "Number of failed queries.", ValueType::Number) \
3236
M(FailedSelectQuery, "Same as FailedQuery, but only for SELECT queries.", ValueType::Number) \
3337
M(FailedInsertQuery, "Same as FailedQuery, but only for INSERT queries.", ValueType::Number) \
@@ -171,6 +175,8 @@
171175
M(MergesThrottlerSleepMicroseconds, "Total time a query was sleeping to conform 'max_merges_bandwidth_for_server' throttling.", ValueType::Microseconds) \
172176
M(MutationsThrottlerBytes, "Bytes passed through 'max_mutations_bandwidth_for_server' throttler.", ValueType::Bytes) \
173177
M(MutationsThrottlerSleepMicroseconds, "Total time a query was sleeping to conform 'max_mutations_bandwidth_for_server' throttling.", ValueType::Microseconds) \
178+
M(ExportsThrottlerBytes, "Bytes passed through 'max_exports_bandwidth_for_server' throttler.", ValueType::Bytes) \
179+
M(ExportsThrottlerSleepMicroseconds, "Total time a query was sleeping to conform 'max_exports_bandwidth_for_server' throttling.", ValueType::Microseconds) \
174180
M(QueryRemoteReadThrottlerBytes, "Bytes passed through 'max_remote_read_network_bandwidth' throttler.", ValueType::Bytes) \
175181
M(QueryRemoteReadThrottlerSleepMicroseconds, "Total time a query was sleeping to conform 'max_remote_read_network_bandwidth' throttling.", ValueType::Microseconds) \
176182
M(QueryRemoteWriteThrottlerBytes, "Bytes passed through 'max_remote_write_network_bandwidth' throttler.", ValueType::Bytes) \

src/Core/ServerSettings.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ namespace DB
112112
DECLARE(UInt64, max_unexpected_parts_loading_thread_pool_size, 8, R"(The number of threads to load inactive set of data parts (Unexpected ones) at startup.)", 0) \
113113
DECLARE(UInt64, max_parts_cleaning_thread_pool_size, 128, R"(The number of threads for concurrent removal of inactive data parts.)", 0) \
114114
DECLARE(UInt64, max_mutations_bandwidth_for_server, 0, R"(The maximum read speed of all mutations on server in bytes per second. Zero means unlimited.)", 0) \
115+
DECLARE(UInt64, max_exports_bandwidth_for_server, 0, R"(The maximum read speed of all exports on server in bytes per second. Zero means unlimited.)", 0) \
115116
DECLARE(UInt64, max_merges_bandwidth_for_server, 0, R"(The maximum read speed of all merges on server in bytes per second. Zero means unlimited.)", 0) \
116117
DECLARE(UInt64, max_replicated_fetches_network_bandwidth_for_server, 0, R"(The maximum speed of data exchange over the network in bytes per second for replicated fetches. Zero means unlimited.)", 0) \
117118
DECLARE(UInt64, max_replicated_sends_network_bandwidth_for_server, 0, R"(The maximum speed of data exchange over the network in bytes per second for replicated sends. Zero means unlimited.)", 0) \

src/Core/Settings.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6870,6 +6870,9 @@ Possible values:
68706870
)", 0) \
68716871
DECLARE(Bool, use_roaring_bitmap_iceberg_positional_deletes, false, R"(
68726872
Use roaring bitmap for iceberg positional deletes.
6873+
)", 0) \
6874+
DECLARE(Bool, export_merge_tree_part_overwrite_file_if_exists, false, R"(
6875+
Overwrite file if it already exists when exporting a merge tree part
68736876
)", 0) \
68746877
\
68756878
/* ####################################################### */ \
@@ -7080,6 +7083,9 @@ Execute request to object storage as remote on one of object_storage_cluster nod
70807083
DECLARE_WITH_ALIAS(Bool, allow_experimental_time_series_aggregate_functions, false, R"(
70817084
Experimental timeSeries* aggregate functions for Prometheus-like timeseries resampling, rate, delta calculation.
70827085
)", EXPERIMENTAL, allow_experimental_ts_to_grid_aggregate_function) \
7086+
DECLARE_WITH_ALIAS(Bool, allow_experimental_export_merge_tree_part, false, R"(
7087+
Experimental export merge tree part.
7088+
)", EXPERIMENTAL, allow_experimental_export_merge_tree_part) \
70837089
\
70847090
DECLARE(String, promql_database, "", R"(
70857091
Specifies the database name used by the 'promql' dialect. Empty string means the current database.

src/Core/SettingsChangesHistory.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,8 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
159159
{"output_format_parquet_enum_as_byte_array", true, true, "Enable writing Enum as byte array in Parquet by default"},
160160
{"object_storage_cluster", "", "", "New setting"},
161161
{"object_storage_max_nodes", 0, 0, "New setting"},
162+
{"allow_experimental_export_merge_tree_part", false, false, "New setting."},
163+
{"export_merge_tree_part_overwrite_file_if_exists", false, false, "New setting."},
162164
});
163165
addSettingsChanges(settings_changes_history, "25.6",
164166
{

src/Databases/DatabaseReplicated.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2245,7 +2245,8 @@ bool DatabaseReplicated::shouldReplicateQuery(const ContextPtr & query_context,
22452245
if (const auto * alter = query_ptr->as<const ASTAlterQuery>())
22462246
{
22472247
if (alter->isAttachAlter() || alter->isFetchAlter() || alter->isDropPartitionAlter()
2248-
|| is_keeper_map_table(query_ptr) || alter->isFreezeAlter() || alter->isUnlockSnapshot())
2248+
|| is_keeper_map_table(query_ptr) || alter->isFreezeAlter() || alter->isUnlockSnapshot()
2249+
|| alter->isExportPartAlter())
22492250
return false;
22502251

22512252
if (has_many_shards() || !is_replicated_table(query_ptr))

src/Disks/ObjectStorages/IObjectStorage.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,8 @@ struct RelativePathWithMetadata
162162
virtual ~RelativePathWithMetadata() = default;
163163

164164
virtual std::string getFileName() const { return std::filesystem::path(relative_path).filename(); }
165+
virtual std::string getFileNameWithoutExtension() const { return std::filesystem::path(relative_path).stem(); }
166+
165167
virtual std::string getPath() const { return relative_path; }
166168
virtual bool isArchive() const { return false; }
167169
virtual std::string getPathToArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); }

src/Interpreters/Context.cpp

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
#include <Storages/MarkCache.h>
3636
#include <Storages/MergeTree/MergeList.h>
3737
#include <Storages/MergeTree/MovesList.h>
38+
#include <Storages/MergeTree/ExportList.h>
3839
#include <Storages/MergeTree/ReplicatedFetchList.h>
3940
#include <Storages/MergeTree/MergeTreeData.h>
4041
#include <Storages/MergeTree/MergeTreeSettings.h>
@@ -157,6 +158,8 @@ namespace ProfileEvents
157158
extern const Event BackupThrottlerSleepMicroseconds;
158159
extern const Event MergesThrottlerBytes;
159160
extern const Event MergesThrottlerSleepMicroseconds;
161+
extern const Event ExportsThrottlerBytes;
162+
extern const Event ExportsThrottlerSleepMicroseconds;
160163
extern const Event MutationsThrottlerBytes;
161164
extern const Event MutationsThrottlerSleepMicroseconds;
162165
extern const Event QueryLocalReadThrottlerBytes;
@@ -325,6 +328,7 @@ namespace ServerSetting
325328
extern const ServerSettingsUInt64 max_local_write_bandwidth_for_server;
326329
extern const ServerSettingsUInt64 max_merges_bandwidth_for_server;
327330
extern const ServerSettingsUInt64 max_mutations_bandwidth_for_server;
331+
extern const ServerSettingsUInt64 max_exports_bandwidth_for_server;
328332
extern const ServerSettingsUInt64 max_remote_read_network_bandwidth_for_server;
329333
extern const ServerSettingsUInt64 max_remote_write_network_bandwidth_for_server;
330334
extern const ServerSettingsUInt64 max_replicated_fetches_network_bandwidth_for_server;
@@ -504,6 +508,7 @@ struct ContextSharedPart : boost::noncopyable
504508
GlobalOvercommitTracker global_overcommit_tracker;
505509
MergeList merge_list; /// The list of executable merge (for (Replicated)?MergeTree)
506510
MovesList moves_list; /// The list of executing moves (for (Replicated)?MergeTree)
511+
ExportsList exports_list; /// The list of executing exports (for (Replicated)?MergeTree)
507512
ReplicatedFetchList replicated_fetch_list;
508513
RefreshSet refresh_set; /// The list of active refreshes (for MaterializedView)
509514
ConfigurationPtr users_config TSA_GUARDED_BY(mutex); /// Config with the users, profiles and quotas sections.
@@ -545,6 +550,8 @@ struct ContextSharedPart : boost::noncopyable
545550
mutable ThrottlerPtr mutations_throttler; /// A server-wide throttler for mutations
546551
mutable ThrottlerPtr merges_throttler; /// A server-wide throttler for merges
547552

553+
mutable ThrottlerPtr exports_throttler; /// A server-wide throttler for exports
554+
548555
MultiVersion<Macros> macros; /// Substitutions extracted from config.
549556
std::unique_ptr<DDLWorker> ddl_worker TSA_GUARDED_BY(mutex); /// Process ddl commands from zk.
550557
LoadTaskPtr ddl_worker_startup_task; /// To postpone `ddl_worker->startup()` after all tables startup
@@ -1055,6 +1062,9 @@ struct ContextSharedPart : boost::noncopyable
10551062

10561063
if (auto bandwidth = server_settings[ServerSetting::max_merges_bandwidth_for_server])
10571064
merges_throttler = std::make_shared<Throttler>(bandwidth, ProfileEvents::MergesThrottlerBytes, ProfileEvents::MergesThrottlerSleepMicroseconds);
1065+
1066+
if (auto bandwidth = server_settings[ServerSetting::max_exports_bandwidth_for_server])
1067+
exports_throttler = std::make_shared<Throttler>(bandwidth, ProfileEvents::ExportsThrottlerBytes, ProfileEvents::ExportsThrottlerSleepMicroseconds);
10581068
}
10591069
};
10601070

@@ -1212,6 +1222,8 @@ MergeList & Context::getMergeList() { return shared->merge_list; }
12121222
const MergeList & Context::getMergeList() const { return shared->merge_list; }
12131223
MovesList & Context::getMovesList() { return shared->moves_list; }
12141224
const MovesList & Context::getMovesList() const { return shared->moves_list; }
1225+
ExportsList & Context::getExportsList() { return shared->exports_list; }
1226+
const ExportsList & Context::getExportsList() const { return shared->exports_list; }
12151227
ReplicatedFetchList & Context::getReplicatedFetchList() { return shared->replicated_fetch_list; }
12161228
const ReplicatedFetchList & Context::getReplicatedFetchList() const { return shared->replicated_fetch_list; }
12171229
RefreshSet & Context::getRefreshSet() { return shared->refresh_set; }
@@ -4155,6 +4167,11 @@ ThrottlerPtr Context::getMergesThrottler() const
41554167
return shared->merges_throttler;
41564168
}
41574169

4170+
ThrottlerPtr Context::getExportsThrottler() const
4171+
{
4172+
return shared->exports_throttler;
4173+
}
4174+
41584175
void Context::reloadRemoteThrottlerConfig(size_t read_bandwidth, size_t write_bandwidth) const
41594176
{
41604177
if (read_bandwidth)

0 commit comments

Comments
 (0)