Skip to content

Commit 8c722c8

Browse files
visualYJDrock-git
authored andcommitted
[feat][store] Impl auto merge.
1 parent aa22e73 commit 8c722c8

18 files changed

+857
-9
lines changed

CMakeLists.txt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -515,6 +515,7 @@ file(GLOB_RECURSE LIBEXPR_SRCS ${PROJECT_SOURCE_DIR}/src/libexpr/src/*.cc)
515515
file(GLOB COPROCESSOR_SRCS ${PROJECT_SOURCE_DIR}/src/coprocessor/*.cc)
516516
file(GLOB CLIENT_SRCS ${PROJECT_SOURCE_DIR}/src/client/*.cc)
517517
file(GLOB CLIENT_V2_SRCS ${PROJECT_SOURCE_DIR}/src/client_v2/*.cc)
518+
file(GLOB MERGE_SRCS ${PROJECT_SOURCE_DIR}/src/merge/*.cc)
518519
if(WITH_DISKANN)
519520
file(GLOB DISKANN_SRCS ${PROJECT_SOURCE_DIR}/src/diskann/*.cc)
520521
endif()
@@ -605,7 +606,8 @@ add_library(
605606
${SERIAL_SRCS}
606607
${COPROCESSOR_SRCS}
607608
${LIBEXPR_SRCS}
608-
${DISKANN_SRCS})
609+
${DISKANN_SRCS}
610+
${MERGE_SRCS})
609611

610612
if(NOT WITH_DISKANN)
611613
list(REMOVE_ITEM DINGODB_OBJS ${DISKANN_SRCS})

conf/document.template.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,13 @@ region:
1717
region_max_size: 536870912 # 512MB
1818
enable_auto_split: true
1919
split_check_interval_s: 120
20+
enable_auto_merge: true
21+
merge_check_interval_s: 120
22+
max_merge_region_size: 1048576 # 1MB
23+
max_merge_region_keys: 10000
24+
merge_size_ratio: 0.2
25+
merge_keys_ratio: 0.2
26+
merge_check_concurrency: 3
2027
raft:
2128
listen_host: $RAFT_LISTEN_HOST$
2229
host: $RAFT_HOST$

conf/index.template.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,13 @@ region:
1717
region_max_size: 536870912 # 512MB
1818
enable_auto_split: true
1919
split_check_interval_s: 120
20+
enable_auto_merge: true
21+
merge_check_interval_s: 120
22+
max_merge_region_size: 1048576 # 1MB
23+
max_merge_region_keys: 10000
24+
merge_size_ratio: 0.2
25+
merge_keys_ratio: 0.2
26+
merge_check_concurrency: 3
2027
raft:
2128
listen_host: $RAFT_LISTEN_HOST$
2229
host: $RAFT_HOST$

conf/store.template.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,14 @@ region:
1616
region_max_size: 268435456 # 256MB
1717
enable_auto_split: true
1818
split_check_interval_s: 120
19+
enable_auto_merge: true
20+
merge_check_interval_s: 120
21+
max_merge_region_size: 1048576 # 1MB
22+
max_merge_region_keys: 10000
23+
split_merge_interval: 3600 # 1h
24+
merge_size_ratio: 0.2
25+
merge_keys_ratio: 0.2
26+
merge_check_concurrency: 3
1927
raft:
2028
listen_host: $RAFT_LISTEN_HOST$
2129
host: $RAFT_HOST$

src/common/constant.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,15 @@ class Constant {
193193
static constexpr uint32_t kSplitKeysNumberDefaultValue = 100000;
194194
static constexpr float kSplitKeysRatioDefaultValue = 0.5;
195195

196+
// merge region
197+
static constexpr uint32_t kAutoMergeRegionMaxSizeDefaultValue = 1048576; // 1M
198+
static constexpr uint32_t kAutoMergeRegionMaxKeysCountDefaultValue = 10000;
199+
static constexpr uint32_t kSplitMergeIntervalDefaultValue = 3600; // 1h
200+
static constexpr uint32_t kDefaultMergeCheckConcurrency = 3;
201+
static constexpr int64_t kRegionMetricsUpdateSecondDefaultValue = 60; // 60s
202+
static constexpr float kMergeRatioDefaultValue = 0.2;
203+
static constexpr float kMergeKeysRatioDefaultValue = 0.2;
204+
196205
static const int32_t kRaftLogFallBehindThreshold = 1000;
197206
static const int32_t kTransferLeaderRaftLogFallBehindThreshold = 16;
198207

src/common/helper.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1658,7 +1658,7 @@ bool Helper::ParallelRunTask(TaskFunctor task, void* arg, int concurrency) {
16581658
return true;
16591659
}
16601660

1661-
butil::Status Helper::ValidateRaftStatusForSplit(std::shared_ptr<pb::common::BRaftStatus> raft_status) { // NOLINT
1661+
butil::Status Helper::ValidateRaftStatusForSplitMerge(std::shared_ptr<pb::common::BRaftStatus> raft_status) { // NOLINT
16621662
if (raft_status == nullptr) {
16631663
return butil::Status(pb::error::EINTERNAL, "Get raft status failed.");
16641664
}

src/common/helper.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -341,8 +341,8 @@ class Helper {
341341
using TaskFunctor = void* (*)(void*);
342342
static bool ParallelRunTask(TaskFunctor task, void* arg, int concurrency);
343343

344-
// Validate raft status whether suitable or not region split.
345-
static butil::Status ValidateRaftStatusForSplit(std::shared_ptr<pb::common::BRaftStatus> raft_status);
344+
// Validate whether the raft status is suitable for a region split/merge.
345+
static butil::Status ValidateRaftStatusForSplitMerge(std::shared_ptr<pb::common::BRaftStatus> raft_status);
346346

347347
static butil::Status ParseRaftSnapshotRegionMeta(const std::string& snapshot_path,
348348
pb::store_internal::RaftSnapshotRegionMeta& meta);

src/config/config_helper.cc

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,76 @@ int64_t ConfigHelper::GetSplitCheckApproximateSize() {
5555
return static_cast<int64_t>(static_cast<double>(Constant::kDefaultSplitCheckApproximateSizeRatio) * region_max_size);
5656
}
5757

58+
int64_t ConfigHelper::GetMergeCheckSize() {
59+
auto config = ConfigManager::GetInstance().GetRoleConfig();
60+
if (config == nullptr) {
61+
return Constant::kAutoMergeRegionMaxSizeDefaultValue;
62+
}
63+
int64_t region_max_size = config->GetInt64("region.max_merge_region_size");
64+
if (region_max_size < Constant::kAutoMergeRegionMaxSizeDefaultValue) {
65+
region_max_size = Constant::kAutoMergeRegionMaxSizeDefaultValue;
66+
DINGO_LOG(WARNING) << fmt::format("[config] max_merge_region_size is too small, set default value({})",
67+
Constant::kAutoMergeRegionMaxSizeDefaultValue);
68+
}
69+
return region_max_size;
70+
}
71+
72+
int64_t ConfigHelper::GetSplitMergeInterval() {
73+
auto config = ConfigManager::GetInstance().GetRoleConfig();
74+
if (config == nullptr) {
75+
return Constant::kSplitMergeIntervalDefaultValue;
76+
}
77+
int64_t split_merge_interval = config->GetInt64("region.split_merge_interval");
78+
if (split_merge_interval < Constant::kSplitMergeIntervalDefaultValue) {
79+
split_merge_interval = Constant::kSplitMergeIntervalDefaultValue;
80+
DINGO_LOG(WARNING) << fmt::format("[config] split_merge_interval is too small, set default value({})",
81+
Constant::kSplitMergeIntervalDefaultValue);
82+
}
83+
return split_merge_interval;
84+
}
85+
86+
int64_t ConfigHelper::GetMergeCheckKeysCount() {
87+
auto config = ConfigManager::GetInstance().GetRoleConfig();
88+
if (config == nullptr) {
89+
return Constant::kAutoMergeRegionMaxKeysCountDefaultValue;
90+
}
91+
int64_t region_max_keys_count = config->GetInt64("region.max_merge_region_keys");
92+
if (region_max_keys_count < Constant::kAutoMergeRegionMaxKeysCountDefaultValue) {
93+
region_max_keys_count = Constant::kAutoMergeRegionMaxKeysCountDefaultValue;
94+
DINGO_LOG(WARNING) << fmt::format("[config] max_merge_region_keys is too small, set default value({})",
95+
Constant::kAutoMergeRegionMaxKeysCountDefaultValue);
96+
}
97+
return region_max_keys_count;
98+
}
99+
100+
float ConfigHelper::GetMergeSizeRatio() {
101+
auto config = ConfigManager::GetInstance().GetRoleConfig();
102+
if (config == nullptr) {
103+
return Constant::kMergeRatioDefaultValue;
104+
}
105+
float merge_ratio = static_cast<float>(config->GetDouble("region.merge_size_ratio"));
106+
if (merge_ratio < 0.1 || merge_ratio > 0.9) {
107+
merge_ratio = Constant::kMergeRatioDefaultValue;
108+
DINGO_LOG(WARNING) << fmt::format("[config] merge_size_ratio out of range, set default value({})",
109+
Constant::kMergeRatioDefaultValue);
110+
}
111+
return merge_ratio;
112+
}
113+
114+
float ConfigHelper::GetMergeKeysRatio() {
115+
auto config = ConfigManager::GetInstance().GetRoleConfig();
116+
if (config == nullptr) {
117+
return Constant::kMergeKeysRatioDefaultValue;
118+
}
119+
float merge_ratio = static_cast<float>(config->GetDouble("region.merge_keys_ratio"));
120+
if (merge_ratio < 0.1 || merge_ratio > 0.9) {
121+
merge_ratio = Constant::kMergeKeysRatioDefaultValue;
122+
DINGO_LOG(WARNING) << fmt::format("[config] merge_keys_ratio out of range, set default value({})",
123+
Constant::kMergeKeysRatioDefaultValue);
124+
}
125+
return merge_ratio;
126+
}
127+
58128
std::string ConfigHelper::GetSplitPolicy() {
59129
auto config = ConfigManager::GetInstance().GetRoleConfig();
60130
if (config == nullptr) {

src/config/config_helper.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,11 @@ class ConfigHelper {
4949
static double GetWorkerThreadRatio();
5050
static int32_t GetRaftWorkerThreadNum();
5151
static double GetRaftWorkerThreadRatio();
52+
static int64_t GetMergeCheckSize();
53+
static int64_t GetMergeCheckKeysCount();
54+
static int64_t GetSplitMergeInterval();
55+
static float GetMergeSizeRatio();
56+
static float GetMergeKeysRatio();
5257
};
5358

5459
} // namespace dingodb

src/handler/raft_apply_handler.cc

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,10 @@ bool HandlePreCreateRegionSplit(const pb::raft::SplitRequest &request, store::Re
298298
store_region_meta->UpdateState(to_region, pb::common::StoreRegionState::SPLITTING);
299299
store_region_meta->UpdateState(from_region, pb::common::StoreRegionState::SPLITTING);
300300

301+
// set last split time
302+
to_region->UpdateLastSplitTimestamp();
303+
from_region->UpdateLastSplitTimestamp();
304+
301305
// set child region version/range/state
302306
store_region_meta->UpdateEpochVersionAndRange(to_region, to_region->Epoch().version() + 1, to_range, "split child");
303307

@@ -541,6 +545,10 @@ bool HandlePostCreateRegionSplit(const pb::raft::SplitRequest &request, store::R
541545
// store_region_meta->UpdateEpochVersionAndRange(child_region, child_region->Epoch().version() + 1, child_range,
542546
// "child parent");
543547

548+
// set last split time
549+
parent_region->UpdateLastSplitTimestamp();
550+
child_region->UpdateLastSplitTimestamp();
551+
544552
DINGO_LOG(INFO) << fmt::format(
545553
"[split.spliting][job_id({}).region({}->{})] splited, child region({}/{}) parent region({}/{})", request.job_id(),
546554
parent_region->Id(), child_region->Id(), child_region->EpochToString(), child_region->RangeToString(),
@@ -1494,8 +1502,7 @@ int DocumentDeleteHandler::Handle(std::shared_ptr<Context> ctx, store::RegionPtr
14941502
auto status = document_index_wrapper->Delete(Helper::PbRepeatedToVector(request.ids()));
14951503
if (tracker) tracker->SetDocumentIndexWriteTime(Helper::TimestampNs() - start_time);
14961504
DINGO_LOG(DEBUG) << fmt::format("[raft.apply][region({})] delete document, count: {} cost: {}ns",
1497-
document_index_id, request.ids().size(),
1498-
Helper::TimestampNs() - start_time);
1505+
document_index_id, request.ids().size(), Helper::TimestampNs() - start_time);
14991506
if (status.ok()) {
15001507
if (region->GetStoreEngineType() == pb::common::STORE_ENG_RAFT_STORE && log_id != INT64_MAX) {
15011508
document_index_wrapper->SetApplyLogId(log_id);

0 commit comments

Comments
 (0)