Skip to content

Commit bada40f

Browse files
committed
put the background threads, kill operation and system table behind server experimental setting
1 parent 147d21c commit bada40f

File tree

10 files changed

+105
-25
lines changed

10 files changed

+105
-25
lines changed

src/Core/ServerSettings.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1144,6 +1144,7 @@ The policy on how to perform a scheduling of CPU slots specified by `concurrent_
11441144
DECLARE(UInt64, object_storage_list_objects_cache_max_entries, 1000, "Maximum size of ObjectStorage list objects cache in entries. Zero means disabled.", 0) \
11451145
DECLARE(UInt64, object_storage_list_objects_cache_ttl, 3600, "Time to live of records in ObjectStorage list objects cache in seconds. Zero means unlimited", 0) \
11461146
DECLARE(UInt64, input_format_parquet_metadata_cache_max_size, 500000000, "Maximum size of parquet file metadata cache", 0) \
1147+
DECLARE(Bool, enable_experimental_export_merge_tree_partition_feature, false, "Enable export replicated merge tree partition feature. It is experimental and not yet ready for production use.", 0) \
11471148
// clang-format on
11481149

11491150
/// If you add a setting which can be updated at runtime, please update 'changeable_settings' map in dumpToSystemServerSettingsColumns below

src/Interpreters/InterpreterKillQueryQuery.cpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include <DataTypes/DataTypeString.h>
2020
#include <Columns/ColumnsNumber.h>
2121
#include <DataTypes/DataTypesNumber.h>
22+
#include <Core/ServerSettings.h>
2223
#include <Processors/Sources/SourceFromSingleChunk.h>
2324
#include <Processors/ISource.h>
2425
#include <Processors/Executors/PullingPipelineExecutor.h>
@@ -37,11 +38,17 @@ namespace Setting
3738
extern const SettingsUInt64 max_parser_depth;
3839
}
3940

41+
namespace ServerSetting
42+
{
43+
extern const ServerSettingsBool enable_experimental_export_merge_tree_partition_feature;
44+
}
45+
4046
namespace ErrorCodes
4147
{
4248
extern const int LOGICAL_ERROR;
4349
extern const int ACCESS_DENIED;
4450
extern const int NOT_IMPLEMENTED;
51+
extern const int SUPPORT_IS_DISABLED;
4552
}
4653

4754

@@ -252,6 +259,12 @@ BlockIO InterpreterKillQueryQuery::execute()
252259
}
253260
case ASTKillQueryQuery::Type::ExportPartition:
254261
{
262+
if (!getContext()->getServerSettings()[ServerSetting::enable_experimental_export_merge_tree_partition_feature])
263+
{
264+
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED,
265+
"Exporting merge tree partition is experimental. Set the server setting `enable_experimental_export_merge_tree_partition_feature` to enable it");
266+
}
267+
255268
Block exports_block = getSelectResult(
256269
"source_database, source_table, transaction_id, destination_database, destination_table, partition_id",
257270
"system.replicated_partition_exports");

src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#include <Core/BackgroundSchedulePool.h>
1212
#include <Core/ServerUUID.h>
1313
#include <boost/algorithm/string/replace.hpp>
14+
#include <Core/ServerSettings.h>
1415

1516

1617
namespace CurrentMetrics
@@ -27,6 +28,11 @@ namespace MergeTreeSetting
2728
extern const MergeTreeSettingsSeconds zookeeper_session_expiration_check_period;
2829
}
2930

31+
namespace ServerSetting
32+
{
33+
extern const ServerSettingsBool enable_experimental_export_merge_tree_partition_feature;
34+
}
35+
3036
namespace ErrorCodes
3137
{
3238
extern const int REPLICA_IS_ALREADY_ACTIVE;
@@ -171,12 +177,18 @@ bool ReplicatedMergeTreeRestartingThread::runImpl()
171177
storage.mutations_updating_task->activateAndSchedule();
172178
storage.mutations_finalizing_task->activateAndSchedule();
173179
storage.merge_selecting_task->activateAndSchedule();
180+
181+
if (storage.getContext()->getServerSettings()[ServerSetting::enable_experimental_export_merge_tree_partition_feature])
182+
{
183+
storage.export_merge_tree_partition_updating_task->activateAndSchedule();
184+
storage.export_merge_tree_partition_select_task->activateAndSchedule();
185+
storage.export_merge_tree_partition_status_handling_task->activateAndSchedule();
186+
}
187+
174188
storage.cleanup_thread.start();
175189
storage.async_block_ids_cache.start();
176190
storage.part_check_thread.start();
177-
storage.export_merge_tree_partition_updating_task->activateAndSchedule();
178-
storage.export_merge_tree_partition_select_task->activateAndSchedule();
179-
storage.export_merge_tree_partition_status_handling_task->activateAndSchedule();
191+
180192

181193
LOG_DEBUG(log, "Table started successfully");
182194
return true;

src/Storages/StorageReplicatedMergeTree.cpp

Lines changed: 31 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,11 @@ namespace ErrorCodes
304304
extern const int INVALID_SETTING_VALUE;
305305
}
306306

307+
namespace ServerSetting
308+
{
309+
extern const ServerSettingsBool enable_experimental_export_merge_tree_partition_feature;
310+
}
311+
307312
namespace ActionLocks
308313
{
309314
extern const StorageActionBlockType PartsMerge;
@@ -480,26 +485,30 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
480485
/// Will be activated by restarting thread.
481486
mutations_finalizing_task->deactivate();
482487

483-
export_merge_tree_partition_manifest_updater = std::make_shared<ExportPartitionManifestUpdatingTask>(*this);
488+
if (getContext()->getServerSettings()[ServerSetting::enable_experimental_export_merge_tree_partition_feature])
489+
{
490+
export_merge_tree_partition_manifest_updater = std::make_shared<ExportPartitionManifestUpdatingTask>(*this);
491+
492+
export_merge_tree_partition_task_scheduler = std::make_shared<ExportPartitionTaskScheduler>(*this);
484493

485-
export_merge_tree_partition_task_scheduler = std::make_shared<ExportPartitionTaskScheduler>(*this);
494+
export_merge_tree_partition_updating_task = getContext()->getSchedulePool().createTask(
495+
getStorageID().getFullTableName() + " (StorageReplicatedMergeTree::export_merge_tree_partition_updating_task)", [this] { exportMergeTreePartitionUpdatingTask(); });
486496

487-
export_merge_tree_partition_updating_task = getContext()->getSchedulePool().createTask(
488-
getStorageID().getFullTableName() + " (StorageReplicatedMergeTree::export_merge_tree_partition_updating_task)", [this] { exportMergeTreePartitionUpdatingTask(); });
497+
export_merge_tree_partition_updating_task->deactivate();
489498

490-
export_merge_tree_partition_updating_task->deactivate();
499+
export_merge_tree_partition_status_handling_task = getContext()->getSchedulePool().createTask(
500+
getStorageID().getFullTableName() + " (StorageReplicatedMergeTree::export_merge_tree_partition_status_handling_task)", [this] { exportMergeTreePartitionStatusHandlingTask(); });
491501

492-
export_merge_tree_partition_status_handling_task = getContext()->getSchedulePool().createTask(
493-
getStorageID().getFullTableName() + " (StorageReplicatedMergeTree::export_merge_tree_partition_status_handling_task)", [this] { exportMergeTreePartitionStatusHandlingTask(); });
502+
export_merge_tree_partition_status_handling_task->deactivate();
494503

495-
export_merge_tree_partition_status_handling_task->deactivate();
504+
export_merge_tree_partition_watch_callback = std::make_shared<Coordination::WatchCallback>(export_merge_tree_partition_updating_task->getWatchCallback());
496505

497-
export_merge_tree_partition_watch_callback = std::make_shared<Coordination::WatchCallback>(export_merge_tree_partition_updating_task->getWatchCallback());
498-
499-
export_merge_tree_partition_select_task = getContext()->getSchedulePool().createTask(
500-
getStorageID().getFullTableName() + " (StorageReplicatedMergeTree::export_merge_tree_partition_select_task)", [this] { selectPartsToExport(); });
506+
export_merge_tree_partition_select_task = getContext()->getSchedulePool().createTask(
507+
getStorageID().getFullTableName() + " (StorageReplicatedMergeTree::export_merge_tree_partition_select_task)", [this] { selectPartsToExport(); });
508+
509+
export_merge_tree_partition_select_task->deactivate();
510+
}
501511

502-
export_merge_tree_partition_select_task->deactivate();
503512

504513
bool has_zookeeper = getContext()->hasZooKeeper() || getContext()->hasAuxiliaryZooKeeper(zookeeper_info.zookeeper_name);
505514
if (has_zookeeper)
@@ -5923,9 +5932,13 @@ void StorageReplicatedMergeTree::partialShutdown()
59235932
queue_updating_task->deactivate();
59245933
mutations_updating_task->deactivate();
59255934
mutations_finalizing_task->deactivate();
5926-
export_merge_tree_partition_updating_task->deactivate();
5927-
export_merge_tree_partition_select_task->deactivate();
5928-
export_merge_tree_partition_status_handling_task->deactivate();
5935+
5936+
if (getContext()->getServerSettings()[ServerSetting::enable_experimental_export_merge_tree_partition_feature])
5937+
{
5938+
export_merge_tree_partition_updating_task->deactivate();
5939+
export_merge_tree_partition_select_task->deactivate();
5940+
export_merge_tree_partition_status_handling_task->deactivate();
5941+
}
59295942

59305943
cleanup_thread.stop();
59315944
async_block_ids_cache.stop();
@@ -8073,10 +8086,10 @@ void StorageReplicatedMergeTree::fetchPartition(
80738086

80748087
void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand & command, ContextPtr query_context)
80758088
{
8076-
if (!query_context->getSettingsRef()[Setting::allow_experimental_export_merge_tree_part])
8089+
if (!query_context->getServerSettings()[ServerSetting::enable_experimental_export_merge_tree_partition_feature])
80778090
{
80788091
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED,
8079-
"Exporting merge tree part is experimental. Set `allow_experimental_export_merge_tree_part` to enable it");
8092+
"Exporting merge tree partition is experimental. Set the server setting `enable_experimental_export_merge_tree_partition_feature` to enable it");
80808093
}
80818094

80828095
const auto dest_database = query_context->resolveDatabase(command.to_database);

src/Storages/System/attachSystemTables.cpp

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
#include <Databases/IDatabase.h>
66
#include <Storages/System/attachSystemTables.h>
77
#include <Storages/System/attachSystemTablesImpl.h>
8-
8+
#include <Core/ServerSettings.h>
99
#include <Storages/System/StorageSystemAggregateFunctionCombinators.h>
1010
#include <Storages/System/StorageSystemAsynchronousMetrics.h>
1111
#include <Storages/System/StorageSystemAsyncLoader.h>
@@ -130,6 +130,11 @@
130130
namespace DB
131131
{
132132

133+
namespace ServerSetting
134+
{
135+
extern const ServerSettingsBool enable_experimental_export_merge_tree_partition_feature;
136+
}
137+
133138
void attachSystemTablesServer(ContextPtr context, IDatabase & system_database, bool has_zookeeper)
134139
{
135140
attachNoDescription<StorageSystemOne>(context, system_database, "one", "This table contains a single row with a single dummy UInt8 column containing the value 0. Used when the table is not specified explicitly, for example in queries like `SELECT 1`.");
@@ -212,7 +217,10 @@ void attachSystemTablesServer(ContextPtr context, IDatabase & system_database, b
212217
attach<StorageSystemMerges>(context, system_database, "merges", "Contains a list of merges currently executing merges of MergeTree tables and their progress. Each merge operation is represented by a single row.");
213218
attach<StorageSystemMoves>(context, system_database, "moves", "Contains information about in-progress data part moves of MergeTree tables. Each data part movement is represented by a single row.");
214219
attach<StorageSystemExports>(context, system_database, "exports", "Contains a list of exports currently executing exports of MergeTree tables and their progress. Each export operation is represented by a single row.");
215-
attach<StorageSystemReplicatedPartitionExports>(context, system_database, "replicated_partition_exports", "Contains a list of partition exports of ReplicatedMergeTree tables and their progress. Each export operation is represented by a single row.");
220+
if (context->getServerSettings()[ServerSetting::enable_experimental_export_merge_tree_partition_feature])
221+
{
222+
attach<StorageSystemReplicatedPartitionExports>(context, system_database, "replicated_partition_exports", "Contains a list of partition exports of ReplicatedMergeTree tables and their progress. Each export operation is represented by a single row.");
223+
}
216224
attach<StorageSystemMutations>(context, system_database, "mutations", "Contains a list of mutations and their progress. Each mutation command is represented by a single row.");
217225
attachNoDescription<StorageSystemReplicas>(context, system_database, "replicas", "Contains information and status of all table replicas on current server. Each replica is represented by a single row.");
218226
attach<StorageSystemReplicationQueue>(context, system_database, "replication_queue", "Contains information about tasks from replication queues stored in ClickHouse Keeper, or ZooKeeper, for each table replica.");
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
<clickhouse>
2+
<enable_export_merge_tree_partition_feature>0</enable_export_merge_tree_partition_feature>
3+
</clickhouse>

tests/config/install.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ ln -sf $SRC_PATH/config.d/blob_storage_log.xml $DEST_SERVER_PATH/config.d/
8585
ln -sf $SRC_PATH/config.d/custom_settings_prefixes.xml $DEST_SERVER_PATH/config.d/
8686
ln -sf $SRC_PATH/config.d/database_catalog_drop_table_concurrency.xml $DEST_SERVER_PATH/config.d/
8787
ln -sf $SRC_PATH/config.d/enable_access_control_improvements.xml $DEST_SERVER_PATH/config.d/
88+
ln -sf $SRC_PATH/config.d/enable_experimental_export_merge_tree_partition.xml $DEST_SERVER_PATH/config.d/
8889
ln -sf $SRC_PATH/config.d/macros.xml $DEST_SERVER_PATH/config.d/
8990
ln -sf $SRC_PATH/config.d/secure_ports.xml $DEST_SERVER_PATH/config.d/
9091
ln -sf $SRC_PATH/config.d/clusters.xml $DEST_SERVER_PATH/config.d/
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
<clickhouse>
2+
<enable_experimental_export_merge_tree_partition_feature>1</enable_experimental_export_merge_tree_partition_feature>
3+
</clickhouse>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
<clickhouse>
2+
<enable_experimental_export_merge_tree_partition_feature>0</enable_experimental_export_merge_tree_partition_feature>
3+
</clickhouse>

tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,15 @@ def cluster():
1515
cluster = ClickHouseCluster(__file__)
1616
cluster.add_instance(
1717
"replica1",
18-
main_configs=["configs/named_collections.xml"],
18+
main_configs=["configs/named_collections.xml", "configs/allow_experimental_export_partition.xml"],
1919
user_configs=["configs/users.d/profile.xml"],
2020
with_minio=True,
2121
stay_alive=True,
2222
with_zookeeper=True,
2323
)
2424
cluster.add_instance(
2525
"replica2",
26-
main_configs=["configs/named_collections.xml"],
26+
main_configs=["configs/named_collections.xml", "configs/allow_experimental_export_partition.xml"],
2727
user_configs=["configs/users.d/profile.xml"],
2828
with_minio=True,
2929
stay_alive=True,
@@ -36,6 +36,14 @@ def cluster():
3636
user_configs=[],
3737
with_minio=True,
3838
)
39+
cluster.add_instance(
40+
"replica_with_export_disabled",
41+
main_configs=["configs/named_collections.xml", "configs/disable_experimental_export_partition.xml"],
42+
user_configs=["configs/users.d/profile.xml"],
43+
with_minio=True,
44+
stay_alive=True,
45+
with_zookeeper=True,
46+
)
3947
logging.info("Starting cluster...")
4048
cluster.start()
4149
yield cluster
@@ -538,6 +546,21 @@ def test_export_partition_file_already_exists_policy(cluster):
538546
) == "FAILED\n", "Export should be marked as FAILED"
539547

540548

549+
def test_export_partition_feature_is_disabled(cluster):
550+
replica_with_export_disabled = cluster.instances["replica_with_export_disabled"]
551+
552+
mt_table = "export_partition_feature_is_disabled_mt_table"
553+
s3_table = "export_partition_feature_is_disabled_s3_table"
554+
555+
create_tables_and_insert_data(replica_with_export_disabled, mt_table, s3_table, "replica1")
556+
557+
error = replica_with_export_disabled.query_and_get_error(f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table} SETTINGS allow_experimental_export_merge_tree_part=1;")
558+
assert "experimental" in error, "Expected error about disabled feature"
559+
560+
# make sure kill operation also throws
561+
error = replica_with_export_disabled.query_and_get_error(f"KILL EXPORT PARTITION WHERE partition_id = '2020' and source_table = '{mt_table}' and destination_table = '{s3_table}'")
562+
assert "experimental" in error, "Expected error about disabled feature"
563+
541564
# def test_source_mutations_during_export_snapshot(cluster):
542565
# node = cluster.instances["replica1"]
543566

0 commit comments

Comments
 (0)