Skip to content

Commit def88bd

Browse files
author
wuxianrong
committed
Enhance asynchronous performance optimization
1 parent 66b19c7 commit def88bd

File tree

13 files changed

+386
-202
lines changed

13 files changed

+386
-202
lines changed

src/pika_kv.cc

Lines changed: 57 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,11 @@
1212
#include "include/pika_cache.h"
1313
#include "include/pika_conf.h"
1414
#include "include/pika_slot_command.h"
15+
#include "include/pika_server.h"
16+
#include "praft/praft.h"
1517

1618
extern std::unique_ptr<PikaConf> g_pika_conf;
19+
extern PikaServer* g_pika_server;
1720
/* SET key value [NX] [XX] [EX <seconds>] [PX <milliseconds>] */
1821
void SetCmd::DoInitial() {
1922
if (!CheckArg(argv_.size())) {
@@ -67,24 +70,74 @@ void SetCmd::DoInitial() {
6770
void SetCmd::Do() {
6871
int32_t res = 1;
6972
STAGE_TIMER_GUARD(storage_duration_ms, true);
73+
74+
bool is_raft_leader = false;
75+
if (g_pika_server && g_pika_server->GetRaftManager()) {
76+
auto node = g_pika_server->GetRaftManager()->GetRaftNode(db_->GetDBName());
77+
is_raft_leader = (node && node->IsLeader());
78+
}
79+
80+
storage::CommitCallback callback = nullptr;
81+
82+
if (is_raft_leader && db_->storage() && db_->storage()->IsRaftEnabled()) {
83+
auto self = std::static_pointer_cast<SetCmd>(shared_from_this());
84+
auto resp_ptr = std::make_shared<std::string>();
85+
86+
auto pika_conn = std::dynamic_pointer_cast<PikaClientConn>(GetConn());
87+
if (!pika_conn) {
88+
res_.SetRes(CmdRes::kErrOther, "Invalid connection");
89+
return;
90+
}
91+
92+
callback = [self, resp_ptr, pika_conn](rocksdb::Status status) {
93+
int32_t result = (status.ok() || status.IsNotFound()) ? 1 : 0;
94+
95+
if (status.ok() || status.IsNotFound()) {
96+
if (self->condition_ == SetCmd::kVX) {
97+
self->res_.AppendInteger(self->success_);
98+
} else {
99+
if (result == 1) {
100+
self->res_.SetRes(CmdRes::kOk);
101+
AddSlotKey("k", self->key_, self->db_);
102+
} else {
103+
self->res_.AppendStringLen(-1);
104+
}
105+
}
106+
} else {
107+
self->res_.SetRes(CmdRes::kErrOther, status.ToString());
108+
}
109+
110+
*resp_ptr = std::move(self->res_.message());
111+
pika_conn->WriteResp(*resp_ptr);
112+
pika_conn->NotifyEpoll(true);
113+
};
114+
}
115+
116+
// Call storage layer with optional callback
70117
switch (condition_) {
71118
case SetCmd::kXX:
72-
s_ = db_->storage()->Setxx(key_, value_, &res, static_cast<int32_t>(sec_));
119+
s_ = db_->storage()->Setxx(key_, value_, &res, static_cast<int32_t>(sec_), callback);
73120
break;
74121
case SetCmd::kNX:
75-
s_ = db_->storage()->Setnx(key_, value_, &res, static_cast<int32_t>(sec_));
122+
s_ = db_->storage()->Setnx(key_, value_, &res, static_cast<int32_t>(sec_), callback);
76123
break;
77124
case SetCmd::kVX:
78125
s_ = db_->storage()->Setvx(key_, target_, value_, &success_, static_cast<int32_t>(sec_));
79126
break;
80127
case SetCmd::kEXORPX:
81-
s_ = db_->storage()->Setex(key_, value_, static_cast<int32_t>(sec_));
128+
s_ = db_->storage()->Setex(key_, value_, static_cast<int32_t>(sec_), callback);
82129
break;
83130
default:
84-
s_ = db_->storage()->Set(key_, value_);
131+
s_ = db_->storage()->Set(key_, value_, callback);
85132
break;
86133
}
87134

135+
// For async mode (Leader), response is handled by callback
136+
if (is_raft_leader && db_->storage() && db_->storage()->IsRaftEnabled()) {
137+
return;
138+
}
139+
140+
// For sync mode (non-Leader or non-Raft), set response immediately
88141
if (s_.ok() || s_.IsNotFound()) {
89142
if (condition_ == SetCmd::kVX) {
90143
res_.AppendInteger(success_);

src/pika_raft.cc

Lines changed: 6 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -204,21 +204,9 @@ void RaftNodeCmd::Do() {
204204
<< ", peer: " << peer_addr_;
205205
status = raft_mgr->AddNode(db_name_, peer_addr_);
206206
if (status.ok()) {
207-
// Update raft-peers in config file
208-
std::string current_peers = g_pika_conf->raft_peers();
209-
if (current_peers.empty()) {
210-
current_peers = peer_addr_;
211-
} else if (current_peers.find(peer_addr_) == std::string::npos) {
212-
current_peers += "," + peer_addr_;
213-
}
214-
215-
if (g_pika_conf->SetConfStr("raft-peers", current_peers)) {
216-
if (g_pika_conf->WriteBack()) {
217-
LOG(INFO) << "Updated raft-peers in config file: " << current_peers;
218-
} else {
219-
LOG(WARNING) << "Failed to write raft-peers to config file";
220-
}
221-
}
207+
// Don't modify config file - braft manages cluster membership in raft_meta
208+
// The raft-peers in config file is only used for initial bootstrap
209+
LOG(INFO) << "Node added successfully to Raft cluster (managed by braft raft_meta)";
222210

223211
res_.AppendStringRaw("+OK\r\n");
224212
} else {
@@ -232,38 +220,9 @@ void RaftNodeCmd::Do() {
232220
<< ", peer: " << peer_addr_;
233221
status = raft_mgr->RemoveNode(db_name_, peer_addr_);
234222
if (status.ok()) {
235-
// Update raft-peers in config file
236-
std::string current_peers = g_pika_conf->raft_peers();
237-
if (!current_peers.empty()) {
238-
// Parse and rebuild peer list without the removed peer
239-
std::vector<std::string> peer_list;
240-
std::stringstream ss(current_peers);
241-
std::string peer;
242-
while (std::getline(ss, peer, ',')) {
243-
peer.erase(0, peer.find_first_not_of(" \t"));
244-
peer.erase(peer.find_last_not_of(" \t") + 1);
245-
if (!peer.empty() && peer != peer_addr_) {
246-
peer_list.push_back(peer);
247-
}
248-
}
249-
250-
// Rebuild peers string
251-
std::string new_peers;
252-
for (size_t i = 0; i < peer_list.size(); i++) {
253-
new_peers += peer_list[i];
254-
if (i < peer_list.size() - 1) {
255-
new_peers += ",";
256-
}
257-
}
258-
259-
if (g_pika_conf->SetConfStr("raft-peers", new_peers)) {
260-
if (g_pika_conf->WriteBack()) {
261-
LOG(INFO) << "Updated raft-peers in config file: " << new_peers;
262-
} else {
263-
LOG(WARNING) << "Failed to write raft-peers to config file";
264-
}
265-
}
266-
}
223+
// Don't modify config file - braft manages cluster membership in raft_meta
224+
// The raft-peers in config file is only used for initial bootstrap
225+
LOG(INFO) << "Node removed successfully from Raft cluster (managed by braft raft_meta)";
267226

268227
res_.AppendStringRaw("+OK\r\n");
269228
} else {

src/pika_server.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,10 +117,11 @@ PikaServer::PikaServer()
117117

118118
std::lock_guard rwl(storage_options_rw_);
119119
storage_options_.append_log_function =
120-
[this](const ::pikiwidb::Binlog& binlog, std::promise<rocksdb::Status>&& promise) {
120+
[this](const ::pikiwidb::Binlog& binlog, std::promise<rocksdb::Status>&& promise,
121+
storage::CommitCallback callback) {
121122
std::string db_name = "db0";
122123

123-
raft_manager_->AppendLog(db_name, binlog, std::move(promise));
124+
raft_manager_->AppendLog(db_name, binlog, std::move(promise), callback);
124125
};
125126
LOG(INFO) << "Raft append_log_function registered in storage_options";
126127
}

src/praft/include/praft/praft.h

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@
2121
#include "pstd/include/pstd_mutex.h"
2222
#include "pstd/include/pstd_status.h"
2323
#include "rocksdb/status.h"
24+
#include "storage/batch.h"
2425

2526
class PikaServer;
27+
class Cmd;
2628

2729
// Forward declarations
2830
namespace storage {
@@ -33,6 +35,10 @@ namespace pikiwidb {
3335
class Binlog;
3436
}
3537

38+
namespace net {
39+
class NetConn;
40+
}
41+
3642
namespace pika_raft {
3743

3844
// Raft log entry data structure
@@ -44,13 +50,22 @@ class WriteDoneClosure : public braft::Closure {
4450

4551
void Run() override;
4652

47-
// Set promise for synchronous Raft apply
53+
// Set promise for synchronous Raft apply (used in Follower on_apply)
4854
void SetPromise(std::shared_ptr<std::promise<rocksdb::Status>> p) {
4955
promise_ = p;
5056
}
57+
58+
// Set callback for async response (used in Leader)
59+
void SetCallback(storage::CommitCallback callback) {
60+
callback_ = callback;
61+
}
5162

5263
private:
64+
// For synchronous mode (Follower)
5365
std::shared_ptr<std::promise<rocksdb::Status>> promise_;
66+
67+
// For asynchronous mode (Leader)
68+
storage::CommitCallback callback_;
5469
};
5570

5671
// Pika state machine implementation
@@ -162,10 +177,12 @@ class RaftManager {
162177
// Get cluster information
163178
pstd::Status GetClusterInfo(const std::string& db_name, std::string* info);
164179

165-
// Append binlog
180+
// Append binlog (supports both sync and async modes)
181+
// Sync mode: pass promise, async mode: pass callback
166182
void AppendLog(const std::string& db_name,
167183
const ::pikiwidb::Binlog& log,
168-
std::promise<rocksdb::Status>&& promise);
184+
std::promise<rocksdb::Status>&& promise,
185+
storage::CommitCallback callback = nullptr);
169186

170187
// Get Raft node for a specific DB
171188
std::shared_ptr<PikaRaftNode> GetRaftNode(const std::string& db_name);

src/praft/src/binlog.proto

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,24 @@ enum OperateType {
1414
kDelete = 2;
1515
}
1616

17+
// 数据类型(与 storage::DataType 对应)
18+
enum DataType {
19+
kAll = 0;
20+
kStrings = 1;
21+
kHashes = 2;
22+
kLists = 3;
23+
kZSets = 4;
24+
kSets = 5;
25+
kStreams = 6;
26+
}
27+
1728
// Binlog 条目(对应单个 RocksDB 操作)
1829
message BinlogEntry {
19-
uint32 cf_idx = 1; // 列族索引 (column family index)
20-
OperateType op_type = 2; // 操作类型
21-
bytes key = 3; // 已编码的 key
22-
optional bytes value = 4; // 已编码的 value(包含 TTL、version 等)
30+
DataType data_type = 1; // 数据类型(用于定位对应的 RocksDB 实例)
31+
uint32 cf_idx = 2; // 列族索引 (column family index, 0=meta/default, 1=data)
32+
OperateType op_type = 3; // 操作类型
33+
bytes key = 4; // 已编码的 key
34+
optional bytes value = 5; // 已编码的 value(包含 TTL、version 等)
2335
}
2436

2537
// Binlog(对应一次 Raft 日志提交)

0 commit comments

Comments
 (0)