Skip to content

Commit 0a646cb

Browse files
author
wuxianrong
committed
Enhance asynchronous performance optimization
1 parent 66b19c7 commit 0a646cb

File tree

21 files changed

+410
-795
lines changed

21 files changed

+410
-795
lines changed

conf/pika.conf

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -647,12 +647,6 @@ raft-enabled : no
647647
# All nodes in the same Raft cluster should have the same group-id
648648
raft-group-id : pika_raft_group
649649

650-
# Initial Raft peers (comma separated, e.g., 127.0.0.1:12221,127.0.0.1:12222)
651-
# Format: ip:port,ip:port,ip:port
652-
# The port should be: pika_port + 3000 (e.g., if pika port is 9221, raft port is 12221)
653-
# Leave empty if initializing a single-node cluster
654-
raft-peers :
655-
656650
# Raft election timeout in milliseconds
657651
# This is the time to wait before starting a new election if no heartbeat is received
658652
# A larger value reduces the chance of unnecessary elections but increases failover time

include/pika_command.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@ const std::string kCmdNameClearCache = "clearcache";
7070
// Raft commands
7171
const std::string kCmdNameRaftCluster = "raft.cluster";
7272
const std::string kCmdNameRaftNode = "raft.node";
73-
const std::string kCmdNameRaftConfig = "raft.config";
7473

7574
// Migrate slot
7675
const std::string kCmdNameSlotsMgrtSlot = "slotsmgrtslot";

include/pika_conf.h

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -893,10 +893,6 @@ class PikaConf : public pstd::BaseConf {
893893
std::shared_lock l(rwlock_);
894894
return raft_group_id_;
895895
}
896-
std::string raft_peers() {
897-
std::shared_lock l(rwlock_);
898-
return raft_peers_;
899-
}
900896
int raft_election_timeout_ms() {
901897
std::shared_lock l(rwlock_);
902898
return raft_election_timeout_ms_;
@@ -1092,7 +1088,6 @@ class PikaConf : public pstd::BaseConf {
10921088
// Raft configuration
10931089
bool raft_enabled_ = false;
10941090
std::string raft_group_id_;
1095-
std::string raft_peers_;
10961091
int raft_election_timeout_ms_ = 1000;
10971092
int raft_snapshot_interval_s_ = 3600;
10981093
};

include/pika_raft.h

Lines changed: 4 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,9 @@
2121
* Raft Commands
2222
*/
2323

24-
// RAFT.CLUSTER [INIT|JOIN|INFO] [args...]
25-
// INIT: RAFT.CLUSTER INIT peer1,peer2,peer3
26-
// JOIN: RAFT.CLUSTER JOIN leader_address
27-
// INFO: RAFT.CLUSTER INFO
24+
// RAFT.CLUSTER [INIT|INFO] [args...]
25+
// INIT: RAFT.CLUSTER INIT [peer1,peer2,peer3] (peers optional, no peers = prepare for cluster expansion)
26+
// INFO: RAFT.CLUSTER INFO [db_name]
2827
class RaftClusterCmd : public Cmd {
2928
public:
3029
RaftClusterCmd(const std::string& name, int arity, uint32_t flag)
@@ -36,7 +35,7 @@ class RaftClusterCmd : public Cmd {
3635
Cmd* Clone() override { return new RaftClusterCmd(*this); }
3736

3837
private:
39-
enum class Operation { INIT, JOIN, INFO, UNKNOWN };
38+
enum class Operation { INIT, INFO, UNKNOWN };
4039

4140
void DoInitial() override;
4241
void Clear() override {
@@ -76,33 +75,5 @@ class RaftNodeCmd : public Cmd {
7675
std::string db_name_;
7776
};
7877

79-
// RAFT.CONFIG GET|SET key [value] [db_name]
80-
class RaftConfigCmd : public Cmd {
81-
public:
82-
RaftConfigCmd(const std::string& name, int arity, uint32_t flag)
83-
: Cmd(name, arity, flag, static_cast<uint32_t>(AclCategory::ADMIN)) {}
84-
85-
void Do() override;
86-
void Split(const HintKeys& hint_keys) override {};
87-
void Merge() override {};
88-
Cmd* Clone() override { return new RaftConfigCmd(*this); }
89-
90-
private:
91-
enum class Operation { GET, SET, UNKNOWN };
92-
93-
void DoInitial() override;
94-
void Clear() override {
95-
operation_ = Operation::UNKNOWN;
96-
config_key_.clear();
97-
config_value_.clear();
98-
db_name_.clear();
99-
}
100-
101-
Operation operation_;
102-
std::string config_key_;
103-
std::string config_value_;
104-
std::string db_name_;
105-
};
106-
10778
#endif // PIKA_RAFT_H_
10879

src/pika_command.cc

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -168,10 +168,6 @@ void InitCmdTable(CmdTable* cmd_table) {
168168
std::make_unique<RaftNodeCmd>(kCmdNameRaftNode, -3, kCmdFlagsWrite | kCmdFlagsAdmin | kCmdFlagsSlow);
169169
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameRaftNode, std::move(raftnodeptr)));
170170

171-
std::unique_ptr<Cmd> raftconfigptr =
172-
std::make_unique<RaftConfigCmd>(kCmdNameRaftConfig, -2, kCmdFlagsRead | kCmdFlagsAdmin | kCmdFlagsSlow);
173-
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameRaftConfig, std::move(raftconfigptr)));
174-
175171
#ifdef WITH_COMMAND_DOCS
176172
std::unique_ptr<Cmd> commandptr =
177173
std::make_unique<CommandCmd>(kCmdNameCommand, -1, kCmdFlagsRead | kCmdFlagsAdmin | kCmdFlagsSlow);

src/pika_conf.cc

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -713,8 +713,6 @@ int PikaConf::Load() {
713713
raft_group_id_ = "pika_raft_group";
714714
}
715715

716-
GetConfStr("raft-peers", &raft_peers_);
717-
718716
GetConfInt("raft-election-timeout-ms", &raft_election_timeout_ms_);
719717
if (raft_election_timeout_ms_ <= 0) {
720718
raft_election_timeout_ms_ = 1000;

src/pika_kv.cc

Lines changed: 57 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,11 @@
1212
#include "include/pika_cache.h"
1313
#include "include/pika_conf.h"
1414
#include "include/pika_slot_command.h"
15+
#include "include/pika_server.h"
16+
#include "praft/praft.h"
1517

1618
extern std::unique_ptr<PikaConf> g_pika_conf;
19+
extern PikaServer* g_pika_server;
1720
/* SET key value [NX] [XX] [EX <seconds>] [PX <milliseconds>] */
1821
void SetCmd::DoInitial() {
1922
if (!CheckArg(argv_.size())) {
@@ -67,24 +70,74 @@ void SetCmd::DoInitial() {
6770
void SetCmd::Do() {
6871
int32_t res = 1;
6972
STAGE_TIMER_GUARD(storage_duration_ms, true);
73+
74+
bool is_raft_leader = false;
75+
if (g_pika_server && g_pika_server->GetRaftManager()) {
76+
auto node = g_pika_server->GetRaftManager()->GetRaftNode(db_->GetDBName());
77+
is_raft_leader = (node && node->IsLeader());
78+
}
79+
80+
storage::CommitCallback callback = nullptr;
81+
82+
if (is_raft_leader && db_->storage() && db_->storage()->IsRaftEnabled()) {
83+
auto self = std::static_pointer_cast<SetCmd>(shared_from_this());
84+
auto resp_ptr = std::make_shared<std::string>();
85+
86+
auto pika_conn = std::dynamic_pointer_cast<PikaClientConn>(GetConn());
87+
if (!pika_conn) {
88+
res_.SetRes(CmdRes::kErrOther, "Invalid connection");
89+
return;
90+
}
91+
92+
callback = [self, resp_ptr, pika_conn](rocksdb::Status status) {
93+
int32_t result = (status.ok() || status.IsNotFound()) ? 1 : 0;
94+
95+
if (status.ok() || status.IsNotFound()) {
96+
if (self->condition_ == SetCmd::kVX) {
97+
self->res_.AppendInteger(self->success_);
98+
} else {
99+
if (result == 1) {
100+
self->res_.SetRes(CmdRes::kOk);
101+
AddSlotKey("k", self->key_, self->db_);
102+
} else {
103+
self->res_.AppendStringLen(-1);
104+
}
105+
}
106+
} else {
107+
self->res_.SetRes(CmdRes::kErrOther, status.ToString());
108+
}
109+
110+
*resp_ptr = std::move(self->res_.message());
111+
pika_conn->WriteResp(*resp_ptr);
112+
pika_conn->NotifyEpoll(true);
113+
};
114+
}
115+
116+
// Call storage layer with optional callback
70117
switch (condition_) {
71118
case SetCmd::kXX:
72-
s_ = db_->storage()->Setxx(key_, value_, &res, static_cast<int32_t>(sec_));
119+
s_ = db_->storage()->Setxx(key_, value_, &res, static_cast<int32_t>(sec_), callback);
73120
break;
74121
case SetCmd::kNX:
75-
s_ = db_->storage()->Setnx(key_, value_, &res, static_cast<int32_t>(sec_));
122+
s_ = db_->storage()->Setnx(key_, value_, &res, static_cast<int32_t>(sec_), callback);
76123
break;
77124
case SetCmd::kVX:
78125
s_ = db_->storage()->Setvx(key_, target_, value_, &success_, static_cast<int32_t>(sec_));
79126
break;
80127
case SetCmd::kEXORPX:
81-
s_ = db_->storage()->Setex(key_, value_, static_cast<int32_t>(sec_));
128+
s_ = db_->storage()->Setex(key_, value_, static_cast<int32_t>(sec_), callback);
82129
break;
83130
default:
84-
s_ = db_->storage()->Set(key_, value_);
131+
s_ = db_->storage()->Set(key_, value_, callback);
85132
break;
86133
}
87134

135+
// For async mode (Leader), response is handled by callback
136+
if (is_raft_leader && db_->storage() && db_->storage()->IsRaftEnabled()) {
137+
return;
138+
}
139+
140+
// For sync mode (non-Leader or non-Raft), set response immediately
88141
if (s_.ok() || s_.IsNotFound()) {
89142
if (condition_ == SetCmd::kVX) {
90143
res_.AppendInteger(success_);

0 commit comments

Comments
 (0)