Skip to content

Commit 7ad42a2

Browse files
author
wuxianrong
committed
braft Performance optimization
1 parent 720c150 commit 7ad42a2

File tree

10 files changed

+577
-86
lines changed

10 files changed

+577
-86
lines changed

conf/pika.conf

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -656,11 +656,35 @@ raft-peers :
656656
# Raft election timeout in milliseconds
657657
# This is the time to wait before starting a new election if no heartbeat is received
658658
# A larger value reduces the chance of unnecessary elections but increases failover time
659-
# Default value is 1000ms (1 second)
660-
raft-election-timeout-ms : 1000
659+
# Optimized default: 300ms (for faster failover, was 1000ms)
660+
# Range: 100-5000ms, Recommended: 300-500ms for LAN
661+
raft-election-timeout-ms : 300
661662

662663
# Raft snapshot interval in seconds
663664
# This determines how often Raft takes snapshots of the state machine
664665
# Snapshots are used to compact the log and speed up node recovery
665666
# Default value is 3600 seconds (1 hour)
666667
raft-snapshot-interval-s : 3600
668+
669+
## Raft Performance Tuning (request batching)
670+
671+
# Enable request batching for better throughput
672+
# When enabled, multiple client requests are batched into a single Raft log entry
673+
# This significantly reduces Raft consensus overhead
674+
# Default: yes (enabled for optimal performance)
675+
raft-enable-batching : yes
676+
677+
# Batch timeout in milliseconds
678+
# How long to wait before flushing a batch (if max-batch-size not reached)
679+
# Lower values = lower latency but less batching benefit
680+
# Higher values = more batching but higher P99 latency
681+
# Range: 1-10ms, Recommended: 2-5ms
682+
# Default: 2ms
683+
raft-batch-timeout-ms : 2
684+
685+
# Maximum number of requests in a single batch
686+
# When this limit is reached, the batch is flushed immediately
687+
# Higher values = more batching but higher memory usage
688+
# Range: 100-10000, Recommended: 500-2000
689+
# Default: 1000
690+
raft-max-batch-size : 1000

include/pika_conf.h

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -906,6 +906,20 @@ class PikaConf : public pstd::BaseConf {
906906
return raft_snapshot_interval_s_;
907907
}
908908

909+
// Raft performance tuning configuration
910+
bool raft_enable_batching() {
911+
std::shared_lock l(rwlock_);
912+
return raft_enable_batching_;
913+
}
914+
int raft_batch_timeout_ms() {
915+
std::shared_lock l(rwlock_);
916+
return raft_batch_timeout_ms_;
917+
}
918+
int raft_max_batch_size() {
919+
std::shared_lock l(rwlock_);
920+
return raft_max_batch_size_;
921+
}
922+
909923
int Load();
910924
int ConfigRewrite();
911925
int ConfigRewriteSlaveOf();
@@ -1093,8 +1107,13 @@ class PikaConf : public pstd::BaseConf {
10931107
bool raft_enabled_ = false;
10941108
std::string raft_group_id_;
10951109
std::string raft_peers_;
1096-
int raft_election_timeout_ms_ = 1000;
1110+
int raft_election_timeout_ms_ = 300; // 优化后的默认值:300ms
10971111
int raft_snapshot_interval_s_ = 3600;
1112+
1113+
// Raft 性能优化配置
1114+
bool raft_enable_batching_ = true; // 启用批处理(默认开启)
1115+
int raft_batch_timeout_ms_ = 2; // 批处理超时:2ms
1116+
int raft_max_batch_size_ = 1000; // 最大批量大小:1000
10981117
};
10991118

11001119
#endif

src/pika_conf.cc

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -717,13 +717,26 @@ int PikaConf::Load() {
717717

718718
GetConfInt("raft-election-timeout-ms", &raft_election_timeout_ms_);
719719
if (raft_election_timeout_ms_ <= 0) {
720-
raft_election_timeout_ms_ = 1000;
720+
raft_election_timeout_ms_ = 300; // 优化后的默认值
721721
}
722722

723723
GetConfInt("raft-snapshot-interval-s", &raft_snapshot_interval_s_);
724724
if (raft_snapshot_interval_s_ <= 0) {
725725
raft_snapshot_interval_s_ = 3600;
726726
}
727+
728+
// Raft 性能优化配置
729+
GetConfBool("raft-enable-batching", &raft_enable_batching_);
730+
731+
GetConfInt("raft-batch-timeout-ms", &raft_batch_timeout_ms_);
732+
if (raft_batch_timeout_ms_ <= 0) {
733+
raft_batch_timeout_ms_ = 2; // 默认 2ms
734+
}
735+
736+
GetConfInt("raft-max-batch-size", &raft_max_batch_size_);
737+
if (raft_max_batch_size_ <= 0) {
738+
raft_max_batch_size_ = 1000; // 默认 1000
739+
}
727740

728741
return ret;
729742
}

src/praft/CMakeLists.txt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,11 @@ add_dependencies(binlog_pb protobuf)
2121
# Collect all source files (excluding binlog.proto which is in binlog_pb)
2222
aux_source_directory(./src DIR_SRCS)
2323

24+
# Add batch_manager source files explicitly
25+
set(DIR_SRCS ${DIR_SRCS}
26+
${CMAKE_CURRENT_SOURCE_DIR}/batch_manager.cc
27+
)
28+
2429
# Create static library (WITHOUT binlog protobuf sources, link binlog_pb instead)
2530
add_library(praft STATIC ${DIR_SRCS})
2631

src/praft/batch_manager.cc

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
// Copyright (c) 2024-present, Qihoo, Inc. All rights reserved.
2+
// This source code is licensed under the BSD-style license found in the
3+
// LICENSE file in the root directory of this source tree.
4+
5+
#include "batch_manager.h"
6+
#include <glog/logging.h>
7+
#include "pstd/include/pstd_defer.h"
8+
9+
namespace pika_raft {
10+
11+
// BatchClosure implementation
12+
void BatchClosure::Run() {
13+
std::unique_ptr<BatchClosure> self_guard(this);
14+
15+
// 根据 Raft 结果通知所有 promise
16+
if (status().ok()) {
17+
for (auto& promise : promises_) {
18+
if (promise) {
19+
promise->set_value(rocksdb::Status::OK());
20+
}
21+
}
22+
} else {
23+
rocksdb::Status error_status = rocksdb::Status::IOError(status().error_str());
24+
for (auto& promise : promises_) {
25+
if (promise) {
26+
promise->set_value(error_status);
27+
}
28+
}
29+
}
30+
}
31+
32+
// BatchManager implementation
33+
// Stores `raft_node` and `config` in the corresponding members and reserves
// room in the pending queue for config_.max_batch_size items.
// The worker thread is not started here; that happens in Start().
BatchManager::BatchManager(braft::Node* raft_node, const BatchConfig& config)
    : raft_node_(raft_node),
      config_(config) {
  const auto expected_items = config_.max_batch_size;
  pending_items_.reserve(expected_items);
}
37+
38+
// Destruction simply delegates to Stop(), which returns immediately when the
// manager is not running.
BatchManager::~BatchManager() { Stop(); }
41+
42+
void BatchManager::Submit(const std::string& log_data,
43+
std::shared_ptr<std::promise<rocksdb::Status>> promise) {
44+
if (!config_.enable_batching) {
45+
// 批处理未启用,直接提交
46+
auto* closure = new BatchClosure({promise});
47+
braft::Task task;
48+
butil::IOBuf buf;
49+
buf.append(log_data);
50+
task.data = &buf;
51+
task.done = closure;
52+
raft_node_->apply(task);
53+
return;
54+
}
55+
56+
std::lock_guard<std::mutex> lock(mutex_);
57+
58+
BatchItem item;
59+
item.log_data = log_data;
60+
item.promise = promise;
61+
item.submit_time_us = butil::gettimeofday_us();
62+
63+
pending_items_.push_back(std::move(item));
64+
total_requests_.fetch_add(1, std::memory_order_relaxed);
65+
66+
// 如果队列满了,立即通知工作线程
67+
if (pending_items_.size() >= config_.max_batch_size) {
68+
cv_.notify_one();
69+
}
70+
}
71+
72+
void BatchManager::Start() {
73+
if (running_.exchange(true)) {
74+
return; // 已经在运行
75+
}
76+
77+
batch_thread_ = std::thread([this]() { BatchWorker(); });
78+
LOG(INFO) << "BatchManager started with config: timeout=" << config_.batch_timeout_ms
79+
<< "ms, max_batch_size=" << config_.max_batch_size;
80+
}
81+
82+
void BatchManager::Stop() {
83+
if (!running_.exchange(false)) {
84+
return; // 已经停止
85+
}
86+
87+
cv_.notify_one();
88+
if (batch_thread_.joinable()) {
89+
batch_thread_.join();
90+
}
91+
92+
LOG(INFO) << "BatchManager stopped. Total requests: " << total_requests_.load()
93+
<< ", Total batches: " << total_batches_.load();
94+
}
95+
96+
void BatchManager::BatchWorker() {
97+
LOG(INFO) << "BatchManager worker thread started";
98+
99+
while (running_.load(std::memory_order_relaxed)) {
100+
std::unique_lock<std::mutex> lock(mutex_);
101+
102+
// 等待有数据或超时
103+
cv_.wait_for(lock, std::chrono::milliseconds(config_.batch_timeout_ms),
104+
[this]() {
105+
return !pending_items_.empty() ||
106+
!running_.load(std::memory_order_relaxed);
107+
});
108+
109+
if (!running_.load(std::memory_order_relaxed)) {
110+
// 处理剩余请求
111+
if (!pending_items_.empty()) {
112+
FlushBatch();
113+
}
114+
break;
115+
}
116+
117+
if (!pending_items_.empty()) {
118+
FlushBatch();
119+
}
120+
}
121+
122+
LOG(INFO) << "BatchManager worker thread stopped";
123+
}
124+
125+
void BatchManager::FlushBatch() {
126+
// 假设调用者已持有锁
127+
128+
if (pending_items_.empty()) {
129+
return;
130+
}
131+
132+
size_t batch_size = pending_items_.size();
133+
134+
// 合并所有日志数据
135+
// 格式:<count>\n<log1_len>\n<log1_data><log2_len>\n<log2_data>...
136+
std::string batch_data;
137+
batch_data.reserve(batch_size * 100); // 预估大小
138+
139+
// 写入批次大小
140+
batch_data.append(std::to_string(batch_size));
141+
batch_data.append("\n");
142+
143+
// 收集所有 promise
144+
std::vector<std::shared_ptr<std::promise<rocksdb::Status>>> promises;
145+
promises.reserve(batch_size);
146+
147+
for (auto& item : pending_items_) {
148+
// 写入单个日志长度和数据
149+
batch_data.append(std::to_string(item.log_data.size()));
150+
batch_data.append("\n");
151+
batch_data.append(item.log_data);
152+
153+
promises.push_back(std::move(item.promise));
154+
}
155+
156+
pending_items_.clear();
157+
158+
// 创建批量 closure
159+
auto* closure = new BatchClosure(std::move(promises));
160+
161+
// 提交到 Raft
162+
braft::Task task;
163+
butil::IOBuf buf;
164+
buf.append(batch_data);
165+
task.data = &buf;
166+
task.done = closure;
167+
168+
raft_node_->apply(task);
169+
170+
total_batches_.fetch_add(1, std::memory_order_relaxed);
171+
172+
VLOG(1) << "Flushed batch of " << batch_size << " requests to Raft";
173+
}
174+
175+
// Returns a snapshot of the manager's counters.
//
// total_requests and total_batches are read from the atomics with relaxed
// ordering; avg_batch_size is only assigned when at least one batch has been
// flushed; pending_count is read while holding mutex_.
BatchManager::Stats BatchManager::GetStats() const {
  Stats snapshot;
  snapshot.total_requests = total_requests_.load(std::memory_order_relaxed);
  snapshot.total_batches = total_batches_.load(std::memory_order_relaxed);

  const bool has_batches = snapshot.total_batches > 0;
  if (has_batches) {
    snapshot.avg_batch_size =
        static_cast<double>(snapshot.total_requests) / snapshot.total_batches;
  }

  {
    // The const_cast lets this const method lock mutex_.
    std::lock_guard<std::mutex> guard(const_cast<std::mutex&>(mutex_));
    snapshot.pending_count = pending_items_.size();
  }

  return snapshot;
}
189+
190+
} // namespace pika_raft
191+

0 commit comments

Comments
 (0)