Skip to content

Commit edec48b

Browse files
author
wuxianrong
committed
Optimize the log submission using the proto method
1 parent d93216f commit edec48b

File tree

14 files changed

+247
-740
lines changed

14 files changed

+247
-740
lines changed

include/pika_raft.h

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -104,10 +104,5 @@ class RaftConfigCmd : public Cmd {
104104
std::string db_name_;
105105
};
106106

107-
// Thread-local flag to detect if we're in on_apply context
108-
namespace pika_raft {
109-
extern thread_local bool g_in_raft_apply;
110-
}
111-
112107
#endif // PIKA_RAFT_H_
113108

src/pika_command.cc

Lines changed: 5 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@
66
#include <memory>
77
#include <sstream>
88
#include <utility>
9-
#include <future>
10-
#include <chrono>
119

1210
#include <glog/logging.h>
1311
#include "include/pika_acl.h"
@@ -897,17 +895,16 @@ void Cmd::InternalProcessCommand(const HintKeys& hint_keys) {
897895

898896
uint64_t before_do_binlog_us = pstd::NowMicros();
899897
this->command_duration_ms = (before_do_binlog_us - before_do_command_us) / 1000;
900-
901-
// Release locks BEFORE calling DoBinlog() to avoid deadlock in Raft mode
902-
// In Raft mode, DoBinlog() will wait for on_apply which needs the same lock
898+
DoBinlog();
899+
900+
903901
if (!IsSuspend()) {
904902
db_->DBUnlockShared();
905903
}
906904
if (is_write()) {
907905
record_lock.Unlock(current_key());
908906
}
909907

910-
DoBinlog();
911908

912909
uint64_t end_us = pstd::NowMicros();
913910
this->binlog_duration_ms = (end_us - before_do_binlog_us) / 1000;
@@ -963,52 +960,8 @@ bool Cmd::DoReadCommandInCache() {
963960

964961

965962
void Cmd::DoBinlog() {
966-
// Skip binlog entirely if we're in on_apply context
967-
if (pika_raft::g_in_raft_apply) {
968-
return;
969-
}
970-
971-
// Check if Raft is enabled
972-
auto raft_manager = g_pika_server->GetRaftManager();
973-
if (raft_manager && is_write() && res().ok()) {
974-
// Plan A: Submit command to Raft using Redis protocol
975-
// Skip if we're already in on_apply context (avoid recursion)
976-
std::string redis_proto = ToRedisProtocol();
977-
978-
// Prepend db_name to Redis protocol for extraction in on_apply
979-
std::string log_data = db_name_ + "|" + redis_proto;
980-
981-
982-
// Create promise/future for synchronous waiting
983-
std::promise<rocksdb::Status> promise;
984-
auto future = promise.get_future();
985-
986-
// Submit to Raft
987-
auto status = raft_manager->SubmitCommandWithPromise(
988-
db_name_, log_data, std::move(promise));
989-
990-
if (!status.ok()) {
991-
LOG(ERROR) << "Failed to submit command to Raft: " << status.ToString();
992-
res_.SetRes(CmdRes::kErrOther, "Raft submit failed: " + status.ToString());
993-
return;
994-
}
995-
996-
// Wait for Raft to apply (with 10 second timeout)
997-
auto wait_status = future.wait_for(std::chrono::seconds(10));
998-
if (wait_status == std::future_status::timeout) {
999-
LOG(ERROR) << "Raft apply timeout for command: " << name_;
1000-
res_.SetRes(CmdRes::kErrOther, "Raft apply timeout");
1001-
return;
1002-
}
1003-
1004-
// Get the result
1005-
rocksdb::Status raft_result = future.get();
1006-
if (!raft_result.ok()) {
1007-
LOG(ERROR) << "Raft apply failed: " << raft_result.ToString();
1008-
res_.SetRes(CmdRes::kErrOther, "Raft apply failed: " + raft_result.ToString());
1009-
return;
1010-
}
1011-
963+
// In Raft mode, skip writing the binlog (the Protobuf binlog is used instead)
964+
if (g_pika_server->GetRaftManager()) {
1012965
return;
1013966
}
1014967

src/pika_kv.cc

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,9 @@
1111

1212
#include "include/pika_cache.h"
1313
#include "include/pika_conf.h"
14-
#include "include/pika_raft.h"
15-
#include "include/pika_server.h"
1614
#include "include/pika_slot_command.h"
1715

1816
extern std::unique_ptr<PikaConf> g_pika_conf;
19-
extern PikaServer* g_pika_server;
2017
/* SET key value [NX] [XX] [EX <seconds>] [PX <milliseconds>] */
2118
void SetCmd::DoInitial() {
2219
if (!CheckArg(argv_.size())) {
@@ -68,17 +65,6 @@ void SetCmd::DoInitial() {
6865
}
6966

7067
void SetCmd::Do() {
71-
// Plan A: If Raft is enabled, skip actual write on first call
72-
// The write will happen when on_apply executes this command
73-
// Use thread-local flag to detect if we're in on_apply context
74-
if (g_pika_server->GetRaftManager() && !pika_raft::g_in_raft_apply) {
75-
// Raft mode: First call from client, skip write
76-
// Set OK response, actual write will happen in on_apply
77-
res_.SetRes(CmdRes::kOk);
78-
return;
79-
}
80-
81-
// Normal path: Either non-Raft mode or called from on_apply
8268
int32_t res = 1;
8369
STAGE_TIMER_GUARD(storage_duration_ms, true);
8470
switch (condition_) {

src/pika_server.cc

Lines changed: 12 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,15 @@ PikaServer::PikaServer()
114114
LOG(FATAL) << "Failed to initialize Raft manager: " << status.ToString();
115115
}
116116
LOG(INFO) << "Raft manager initialized successfully";
117+
118+
std::lock_guard rwl(storage_options_rw_);
119+
storage_options_.append_log_function =
120+
[this](const ::pikiwidb::Binlog& binlog, std::promise<rocksdb::Status>&& promise) {
121+
std::string db_name = "db0";
122+
123+
raft_manager_->AppendLog(db_name, binlog, std::move(promise));
124+
};
125+
LOG(INFO) << "Raft append_log_function registered in storage_options";
117126
}
118127

119128
bgsave_thread_.set_thread_name("PikaServer::bgsave_thread_");
@@ -237,58 +246,17 @@ void PikaServer::Start() {
237246
} else {
238247
LOG(INFO) << "Raft manager started successfully";
239248

240-
// 为每个数据库注册 binlog 回调
241249
for (const auto& db_item : dbs_) {
242250
std::string db_name = db_item.first;
243-
auto storage = db_item.second->storage(); // Returns std::shared_ptr<storage::Storage>
251+
auto storage = db_item.second->storage();
244252

245253
if (!storage) {
246-
LOG(WARNING) << "数据库 " << db_name << " 的 storage 为空,跳过回调注册";
254+
LOG(WARNING) << "Storage is null for DB: " << db_name;
247255
continue;
248256
}
249257

250-
// 设置存储引擎引用 (使用原始指针)
258+
storage->DisableWal(true);
251259
raft_manager_->SetStorage(storage.get());
252-
253-
// 注册 binlog 回调(使用 promise/future 同步)
254-
storage->SetBinlogWriteCallback(
255-
[this, db_name](const pikiwidb::Binlog& binlog, std::promise<rocksdb::Status>&& promise) {
256-
// 序列化 binlog
257-
std::string binlog_data;
258-
if (!binlog.SerializeToString(&binlog_data)) {
259-
LOG(ERROR) << "Failed to serialize binlog";
260-
promise.set_value(rocksdb::Status::Corruption("Binlog serialization failed"));
261-
return;
262-
}
263-
264-
LOG(INFO) << "收到 binlog 回调,数据库: " << db_name
265-
<< ", binlog 大小: " << binlog_data.size() << " 字节"
266-
<< ", entries: " << binlog.entries_size();
267-
268-
// 创建异步回调闭包
269-
auto* closure = new pika_raft::WriteDoneClosure(nullptr, nullptr);
270-
closure->SetBinlogData(binlog_data);
271-
272-
// TODO: 这里需要将 promise 传递给 closure,让 Raft apply 完成后设置结果
273-
// 目前先立即返回 OK,实际应该等待 Raft 应用完成
274-
275-
// 提交到 Raft
276-
auto apply_status = raft_manager_->ApplyBinlog(db_name, binlog_data, closure);
277-
if (!apply_status.ok()) {
278-
LOG(ERROR) << "提交 binlog 到 Raft 失败: " << apply_status.ToString();
279-
// 设置错误状态并调用 Run,让 closure 自己清理
280-
closure->status().set_error(-1, "%s", apply_status.ToString().c_str());
281-
closure->Run();
282-
promise.set_value(rocksdb::Status::IOError(apply_status.ToString()));
283-
} else {
284-
// TODO: 应该在 Raft apply 完成后才 set_value
285-
// 目前先简单地立即返回 OK
286-
promise.set_value(rocksdb::Status::OK());
287-
}
288-
}
289-
);
290-
291-
LOG(INFO) << "已为数据库 " << db_name << " 注册 binlog 回调";
292260
}
293261
}
294262
}

src/praft/include/praft/praft.h

Lines changed: 12 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ namespace storage {
2929
class Storage;
3030
}
3131

32+
namespace pikiwidb {
33+
class Binlog;
34+
}
35+
3236
namespace net {
3337
class RedisConn;
3438
}
@@ -38,21 +42,6 @@ class Cmd;
3842
namespace pika_raft {
3943

4044
// Raft log entry data structure
41-
struct RaftLogEntry {
42-
std::string cmd_name;
43-
std::vector<std::string> args;
44-
std::string db_name;
45-
int64_t timestamp;
46-
47-
RaftLogEntry() : timestamp(0) {}
48-
49-
// Serialize to string
50-
std::string SerializeAsString() const;
51-
52-
// Deserialize from string
53-
bool ParseFromString(const std::string& data);
54-
};
55-
5645
// Write done closure for asynchronous Raft callback
5746
class WriteDoneClosure : public braft::Closure {
5847
public:
@@ -137,9 +126,6 @@ class PikaRaftNode {
137126
// Remove peer from the cluster
138127
pstd::Status RemovePeer(const braft::PeerId& peer);
139128

140-
// Apply a command to Raft
141-
pstd::Status Apply(const RaftLogEntry& entry);
142-
143129
// Get cluster status information
144130
void GetStatus(std::string* status_str);
145131

@@ -192,23 +178,17 @@ class RaftManager {
192178
// Check if Raft is enabled for a specific DB
193179
bool IsRaftEnabled(const std::string& db_name) const;
194180

195-
// Apply a command through Raft
196-
pstd::Status ApplyCommand(const std::string& db_name, const RaftLogEntry& entry);
197-
198181
// Apply binlog to Raft (called by storage callback)
199182
pstd::Status ApplyBinlog(const std::string& db_name,
200-
const std::string& binlog_data,
201-
WriteDoneClosure* done);
183+
const std::string& binlog_data,
184+
WriteDoneClosure* done);
202185

203-
// Submit command to Raft with promise (for synchronous waiting)
204-
pstd::Status SubmitCommandWithPromise(const std::string& db_name,
205-
const std::string& log_data,
206-
std::promise<rocksdb::Status>&& promise);
186+
// Append binlog (pikiwidb_raft style: takes the Binlog and a promise directly)
187+
void AppendLog(const std::string& db_name,
188+
const ::pikiwidb::Binlog& log,
189+
std::promise<rocksdb::Status>&& promise);
207190

208-
// Apply command from Redis protocol (called in on_apply)
209-
rocksdb::Status ApplyCommandFromRedisProtocol(const std::string& redis_proto_data,
210-
const std::string& db_name);
211-
191+
// Submit command to Raft with promise (for synchronous waiting)
212192
// Get Raft node for a specific DB
213193
std::shared_ptr<PikaRaftNode> GetRaftNode(const std::string& db_name);
214194

@@ -222,7 +202,7 @@ class RaftManager {
222202
bool IsInitialized() const { return initialized_.load(); }
223203

224204
// Apply binlog entry to storage (public for PikaStateMachine to call)
225-
void ApplyBinlogEntry(const std::string& binlog_data);
205+
rocksdb::Status ApplyBinlogEntry(const std::string& binlog_data);
226206

227207
private:
228208
std::atomic<bool> initialized_;

src/praft/src/binlog.proto

Lines changed: 4 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,6 @@ package pikiwidb;
77

88
option optimize_for = LITE_RUNTIME;
99

10-
// 数据类型枚举
11-
enum DataType {
12-
kStrings = 0;
13-
kHashes = 1;
14-
kLists = 2;
15-
kSets = 3;
16-
kZSets = 4;
17-
kStreams = 5;
18-
}
19-
2010
// 操作类型
2111
enum OperateType {
2212
kNoOperate = 0;
@@ -26,28 +16,15 @@ enum OperateType {
2616

2717
// Binlog 条目(对应单个 RocksDB 操作)
2818
message BinlogEntry {
29-
DataType data_type = 1; // 数据类型(对应哪个 RocksDB)
19+
uint32 cf_idx = 1; // 列族索引 (column family index)
3020
OperateType op_type = 2; // 操作类型
3121
bytes key = 3; // 已编码的 key
3222
optional bytes value = 4; // 已编码的 value(包含 TTL、version 等)
33-
34-
// 用于日志恢复的元信息
35-
uint64 timestamp = 5; // 操作时间戳
36-
optional uint64 ttl = 6; // TTL(秒),0 表示永久
3723
}
3824

3925
// Binlog(对应一次 Raft 日志提交)
4026
message Binlog {
41-
uint32 db_id = 1; // 数据库 ID
42-
uint32 slot_idx = 2; // 槽位索引(预留)
43-
44-
// Raft 日志索引(对应 LogicOffset)
45-
uint32 term = 3; // Raft term
46-
uint64 log_index = 4; // Raft log index
47-
48-
// Binlog 文件位置(对应 BinlogOffset)
49-
uint32 filenum = 5; // Binlog 文件编号
50-
uint64 offset = 6; // Binlog 文件内偏移量
51-
52-
repeated BinlogEntry entries = 7; // 批量操作条目
27+
uint32 db_id = 1; // 数据库 ID
28+
uint32 slot_idx = 2; // 槽位索引(预留)
29+
repeated BinlogEntry entries = 3; // 批量操作条目
5330
}

0 commit comments

Comments
 (0)