Skip to content

Commit 484aaad

Browse files
committed
fix some bugs
1 parent aea793b commit 484aaad

File tree

9 files changed

+71
-121
lines changed

9 files changed

+71
-121
lines changed

include/pika_client_conn.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ class PikaClientConn : public net::RedisConn {
113113
std::vector<std::shared_ptr<std::string>> resp_array;
114114

115115
std::shared_ptr<TimeStat> time_stat_;
116-
116+
void TryWriteResp();
117117
private:
118118
net::ServerThread* const server_thread_;
119119
std::string current_db_;
@@ -134,7 +134,7 @@ class PikaClientConn : public net::RedisConn {
134134
void ProcessMonitor(const PikaCmdArgsType& argv);
135135

136136
void ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr<std::string>& resp_ptr, bool cache_miss_in_rtc);
137-
void TryWriteResp();
137+
// void TryWriteResp();
138138
};
139139

140140
struct ClientInfo {

include/pika_command_collector.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ class PikaCommandCollector {
6262
* @param callback callback function after processing is completed
6363
* @return whether the addition was successful
6464
*/
65-
bool AddCommand(const std::shared_ptr<Cmd>& cmd_ptr, CommandCallback callback);
65+
bool AddCommand(std::shared_ptr<Cmd> cmd_ptr, CommandCallback callback);
6666

6767
/**
6868
* @brief Set the batch max wait time

include/pika_kv.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ class SetCmd : public Cmd {
2424
res.push_back(key_);
2525
return res;
2626
}
27+
~SetCmd();
2728
void Do() override;
2829
void DoUpdateCache() override;
2930
void DoThroughDB() override;

src/pika_client_conn.cc

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -591,8 +591,13 @@ void PikaClientConn::ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr<s
591591
}
592592

593593
std::shared_ptr<Cmd> cmd_ptr = DoCmd(argv, opt, resp_ptr, cache_miss_in_rtc);
594-
*resp_ptr = std::move(cmd_ptr->res().message());
595-
resp_num--;
594+
// *resp_ptr = std::move(cmd_ptr->res().message());
595+
// resp_num--;
596+
if (opt == kCmdNameSet) {
597+
} else {
598+
*resp_ptr = std::move(cmd_ptr->res().message());
599+
resp_num--;
600+
}
596601
}
597602

598603
std::queue<std::shared_ptr<Cmd>> PikaClientConn::GetTxnCmdQue() { return txn_cmd_que_; }

src/pika_command_collector.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ PikaCommandCollector::~PikaCommandCollector() {
3030
<< " commands, " << total_batches_.load() << " batches";
3131
}
3232

33-
bool PikaCommandCollector::AddCommand(const std::shared_ptr<Cmd>& cmd_ptr, CommandCallback callback) {
33+
bool PikaCommandCollector::AddCommand(std::shared_ptr<Cmd> cmd_ptr, CommandCallback callback) {
3434
if (!cmd_ptr || !cmd_ptr->is_write()) {
3535
LOG(WARNING) << "Attempt to add non-write command to CommandCollector";
3636
return false;

src/pika_consensus.cc

Lines changed: 14 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -868,7 +868,7 @@ Status ConsensusCoordinator::AppendEntries(const std::shared_ptr<Cmd>& cmd_ptr,
868868
return s;
869869
}
870870

871-
g_pika_server->SignalAuxiliary();
871+
// g_pika_server->SignalAuxiliary();
872872
return Status::OK();
873873
}
874874
Status ConsensusCoordinator::AppendSlaveEntries(const std::shared_ptr<Cmd>& cmd_ptr, const BinlogItem& attribute) {
@@ -914,7 +914,8 @@ Status ConsensusCoordinator::UpdateCommittedID() {
914914
LogOffset slave_prepared_id = LogOffset();
915915

916916
for (const auto& slave : slaves) {
917-
if (slave.second->slave_state == kSlaveBinlogSync) {
917+
// Consider both kSlaveBinlogSync and KCandidate slaves for CommittedID calculation
918+
if (slave.second->slave_state == kSlaveBinlogSync || slave.second->slave_state == KCandidate) {
918919
if (slave_prepared_id == LogOffset()) {
919920
slave_prepared_id = slave.second->acked_offset;
920921
} else if (slave.second->acked_offset < slave_prepared_id) {
@@ -990,86 +991,28 @@ void ConsensusCoordinator::BatchInternalApplyFollower(const std::vector<std::sha
990991

991992
Status ConsensusCoordinator::SendBinlog(std::shared_ptr<SlaveNode> slave_ptr, std::string db_name) {
992993
std::vector<WriteTask> tasks;
993-
const int MAX_BATCH_SIZE = 100; // Maximum number of logs to send in a single batch
994-
995-
// Get current committed_id to ensure it's sent to the slave
996-
LogOffset current_committed_id = GetCommittedId();
997-
LOG(INFO) << "SendBinlog: [Thread " << std::this_thread::get_id() << "] Current committed_id: " << current_committed_id.ToString()
998-
<< ", sending to slave " << slave_ptr->Ip() << ":" << slave_ptr->Port()
999-
<< ", logs_ addr: " << logs_.get() << ", db_name: " << db_name_;
1000994

1001995
// Check if there are new log entries that need to be sent to the slave
1002-
LOG(INFO) << "SendBinlog: logs_->LastOffset()=" << logs_->LastOffset().ToString()
1003-
<< ", slave_ptr->acked_offset=" << slave_ptr->acked_offset.ToString()
1004-
<< ", logs_->Size()=" << logs_->Size();
1005-
1006-
if (logs_->Size() > 0 && logs_->LastOffset() >= slave_ptr->acked_offset) {
996+
if (logs_->LastOffset() >= slave_ptr->acked_offset) {
1007997
// Find the index of the log entry corresponding to the slave's acknowledged offset
1008998
int index = logs_->FindOffset(slave_ptr->acked_offset);
1009-
int entries_to_send = logs_->Size() - index;
1010-
LOG(INFO) << "SendBinlog: Found " << entries_to_send << " new log entries to send, "
1011-
<< "starting from index " << index << " of " << logs_->Size();
1012-
1013999
if (index < logs_->Size()) {
1014-
// Send log entries in optimized batches
1015-
RmNode rm_node(slave_ptr->Ip(), slave_ptr->Port(), db_name, slave_ptr->SessionId());
1016-
1017-
// For large batches, use specialized batch handling
1018-
if (entries_to_send > MAX_BATCH_SIZE) {
1019-
LOG(INFO) << "SendBinlog: Using optimized batch sending for " << entries_to_send << " entries";
1020-
1021-
// Process in chunks of MAX_BATCH_SIZE
1022-
for (int batch_start = index; batch_start < logs_->Size(); batch_start += MAX_BATCH_SIZE) {
1023-
int batch_end = std::min(batch_start + MAX_BATCH_SIZE, logs_->Size());
1024-
std::vector<WriteTask> batch_tasks;
1025-
1026-
for (int i = batch_start; i < batch_end; ++i) {
1027-
Log::LogItem item = logs_->At(i);
1028-
WriteTask task(rm_node, BinlogChip(item.offset, item.binlog_), item.offset, current_committed_id);
1029-
batch_tasks.push_back(task);
1030-
}
1031-
1032-
g_pika_rm->ProduceWriteQueue(slave_ptr->Ip(), slave_ptr->Port(), db_name, batch_tasks);
1033-
LOG(INFO) << "SendBinlog: Sent batch " << (batch_start - index) / MAX_BATCH_SIZE + 1
1034-
<< " with " << (batch_end - batch_start) << " entries";
1035-
}
1036-
} else {
1037-
// Send all entries in a single batch
1038-
for (int i = index; i < logs_->Size(); ++i) {
1039-
Log::LogItem item = logs_->At(i);
1040-
WriteTask task(rm_node, BinlogChip(item.offset, item.binlog_), item.offset, current_committed_id);
1041-
tasks.push_back(task);
1042-
}
1000+
for (int i = index; i < logs_->Size(); ++i) {
1001+
const Log::LogItem& item = logs_->At(i);
1002+
1003+
slave_ptr->SetLastSendTime(pstd::NowMicros());
1004+
1005+
RmNode rm_node(slave_ptr->Ip(), slave_ptr->Port(), slave_ptr->DBName(), slave_ptr->SessionId());
1006+
WriteTask task(rm_node, BinlogChip(item.offset, item.binlog_), slave_ptr->sent_offset, GetCommittedId());
1007+
tasks.emplace_back(std::move(task));
1008+
1009+
slave_ptr->sent_offset = item.offset;
10431010
}
1044-
} else {
1045-
LOG(INFO) << "SendBinlog: No new log entries to send, index " << index << " is out of range (logs size: " << logs_->Size() << ")";
1046-
}
1047-
} else {
1048-
if (logs_->Size() == 0) {
1049-
LOG(INFO) << "SendBinlog: No logs available yet (logs_->Size()=0), will send empty binlog to maintain connection";
1050-
} else {
1051-
LOG(INFO) << "SendBinlog: Slave is already up to date, last offset: " << logs_->LastOffset().ToString()
1052-
<< ", slave acked offset: " << slave_ptr->acked_offset.ToString();
10531011
}
10541012
}
10551013

1056-
// Only send empty binlog if there are no actual log entries to send
1057-
// This prevents the deadlock where master waits for slave ACK and slave waits for master data
1058-
if (tasks.empty() && logs_->Size() == 0) {
1059-
// LOG(INFO) << "SendBinlog: Sending empty binlog with current committed_id: " << current_committed_id.ToString();
1060-
RmNode rm_node(slave_ptr->Ip(), slave_ptr->Port(), db_name, slave_ptr->SessionId());
1061-
// Create an empty WriteTask that includes the current committed_id
1062-
WriteTask empty_task(rm_node, BinlogChip(LogOffset(), ""), LogOffset(), current_committed_id);
1063-
tasks.push_back(empty_task);
1064-
}
1065-
1066-
// Send the tasks to the slave
10671014
if (!tasks.empty()) {
1068-
LOG(INFO) << "SendBinlog: Sending " << tasks.size() << " tasks to slave " << slave_ptr->Ip() << ":" << slave_ptr->Port();
1069-
extern std::unique_ptr<PikaReplicaManager> g_pika_rm;
10701015
g_pika_rm->ProduceWriteQueue(slave_ptr->Ip(), slave_ptr->Port(), db_name, tasks);
1071-
} else {
1072-
LOG(INFO) << "SendBinlog: No tasks to send to slave " << slave_ptr->Ip() << ":" << slave_ptr->Port();
10731016
}
10741017
return Status::OK();
10751018
}

src/pika_kv.cc

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,22 @@
1515

1616
extern std::unique_ptr<PikaConf> g_pika_conf;
1717
/* SET key value [NX] [XX] [EX <seconds>] [PX <milliseconds>] */
18+
SetCmd::~SetCmd() {
19+
auto tmp_conn = GetConn();
20+
if (!tmp_conn) {
21+
return;
22+
}
23+
24+
auto pc = dynamic_cast<PikaClientConn*>(tmp_conn.get());
25+
std::shared_ptr<std::string> resp_ptr = std::make_shared<std::string>();
26+
*resp_ptr = std::move(res().message());
27+
pc->resp_num--;
28+
pc->resp_array.push_back(resp_ptr);
29+
pc->TryWriteResp();
30+
LOG(INFO) << "SetCmd::~SetCmd() is completed";
31+
}
1832
void SetCmd::DoInitial() {
33+
LOG(INFO) << "SetCmd::DoInitial() is started";
1934
if (!CheckArg(argv_.size())) {
2035
res_.SetRes(CmdRes::kWrongNum, kCmdNameSet);
2136
return;
@@ -65,6 +80,7 @@ void SetCmd::DoInitial() {
6580
}
6681

6782
void SetCmd::Do() {
83+
LOG(INFO) << "SetCmd::Do() is started";
6884
int32_t res = 1;
6985
STAGE_TIMER_GUARD(storage_duration_ms, true);
7086
switch (condition_) {
@@ -102,10 +118,12 @@ void SetCmd::Do() {
102118
}
103119

104120
void SetCmd::DoThroughDB() {
121+
LOG(INFO) << "SetCmd::DoThroughDB() is started";
105122
Do();
106123
}
107124

108125
void SetCmd::DoUpdateCache() {
126+
LOG(INFO) << "SetCmd::DoUpdateCache() is started";
109127
if (SetCmd::kNX == condition_ || IsTooLargeKey(g_pika_conf->max_key_size_in_cache())) {
110128
return;
111129
}
@@ -121,6 +139,7 @@ void SetCmd::DoUpdateCache() {
121139
}
122140

123141
std::string SetCmd::ToRedisProtocol() {
142+
LOG(INFO) << "SetCmd::ToRedisProtocol() is started";
124143
if (condition_ == SetCmd::kEXORPX) {
125144
std::string content;
126145
content.reserve(RAW_ARGS_LEN);

src/pika_rm.cc

Lines changed: 16 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1448,43 +1448,23 @@ void PikaReplicaManager::ProcessCommandBatches(const std::vector<std::shared_ptr
14481448
const auto& cmd_ptr = all_commands[cmd_idx];
14491449
LogOffset cmd_offset;
14501450

1451-
if (sync_db->GetISConsistency()) {
1452-
// For consistency mode, use ConsensusProposeLog directly
1453-
pstd::Status s = sync_db->ConsensusProposeLog(cmd_ptr);
1454-
if (!s.ok()) {
1455-
LOG(ERROR) << "Failed to append command " << cmd_ptr->name() << ": " << s.ToString();
1456-
// Call callbacks with error for remaining commands
1457-
for (size_t error_idx = cmd_idx; error_idx < all_commands.size(); ++error_idx) {
1458-
auto& batch_info = command_to_batch_map[error_idx];
1459-
auto batch = batch_info.first;
1460-
size_t batch_cmd_idx = batch_info.second;
1461-
if (batch_cmd_idx < batch->callbacks.size() && batch->callbacks[batch_cmd_idx]) {
1462-
batch->callbacks[batch_cmd_idx](LogOffset(), s);
1463-
}
1464-
}
1465-
return;
1466-
}
1467-
// Get the prepared ID as the offset
1468-
cmd_offset = sync_db->GetPreparedId();
1469-
} else {
1470-
// For non-consistency mode, use ConsensusProposeLog as well
1471-
pstd::Status s = sync_db->ConsensusProposeLog(cmd_ptr);
1472-
if (!s.ok()) {
1473-
LOG(ERROR) << "Failed to propose command " << cmd_ptr->name() << ": " << s.ToString();
1474-
// Call callbacks with error for remaining commands
1475-
for (size_t error_idx = cmd_idx; error_idx < all_commands.size(); ++error_idx) {
1476-
auto& batch_info = command_to_batch_map[error_idx];
1477-
auto batch = batch_info.first;
1478-
size_t batch_cmd_idx = batch_info.second;
1479-
if (batch_cmd_idx < batch->callbacks.size() && batch->callbacks[batch_cmd_idx]) {
1480-
batch->callbacks[batch_cmd_idx](LogOffset(), s);
1481-
}
1451+
pstd::Status s = sync_db->ConsensusProposeLog(cmd_ptr);
1452+
if (!s.ok()) {
1453+
LOG(ERROR) << "Failed to " << (sync_db->GetISConsistency() ? "append" : "propose")
1454+
<< " command " << cmd_ptr->name() << ": " << s.ToString();
1455+
// Call callbacks with error for remaining commands
1456+
for (size_t error_idx = cmd_idx; error_idx < all_commands.size(); ++error_idx) {
1457+
auto& batch_info = command_to_batch_map[error_idx];
1458+
auto batch = batch_info.first;
1459+
size_t batch_cmd_idx = batch_info.second;
1460+
if (batch_cmd_idx < batch->callbacks.size() && batch->callbacks[batch_cmd_idx]) {
1461+
batch->callbacks[batch_cmd_idx](LogOffset(), s);
14821462
}
1483-
return;
14841463
}
1485-
// Get the prepared ID as the offset
1486-
cmd_offset = sync_db->GetPreparedId();
1464+
return;
14871465
}
1466+
// Get the prepared ID as the offset
1467+
cmd_offset = sync_db->GetPreparedId();
14881468

14891469
all_offsets[cmd_idx] = cmd_offset;
14901470

@@ -1597,6 +1577,7 @@ void PikaReplicaManager::RocksDBThreadLoop() {
15971577
}
15981578

15991579
size_t PikaReplicaManager::ProcessCommittedBatchGroups(const LogOffset& committed_id) {
1580+
LOG(INFO) << "ProcessCommittedBatchGroups started";
16001581
std::queue<std::shared_ptr<BatchGroup>> groups_to_process;
16011582
// Get pending BatchGroups and separate committed from uncommitted
16021583
{
@@ -1662,6 +1643,7 @@ size_t PikaReplicaManager::ProcessCommittedBatchGroups(const LogOffset& committe
16621643
}
16631644
}
16641645
}
1646+
16651647
return groups_count;
16661648
}
16671649

tests/integration/clean_start.sh

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -38,17 +38,17 @@ echo "Replication setup successful."
3838

3939
echo "Running benchmark..."
4040

41-
redis-benchmark -p 9301 -t set -r 100000 -n 100000 -c 10000 --threads 4
42-
echo "Benchmark finished."
41+
#redis-benchmark -p 9301 -t set -r 100000 -n 100000 -c 10000 --threads 4
42+
# echo "Benchmark finished."
4343

44-
echo -e "\n==== 主节点 INFO 日志 ===="
45-
tail -n 150 ./pacifica_test/master/log/pika.INFO
44+
# echo -e "\n==== 主节点 INFO 日志 ===="
45+
# tail -n 150 ./pacifica_test/master/log/pika.INFO
4646

47-
echo -e "\n==== 主节点 WARNING 日志 ===="
48-
tail -n 150 ./pacifica_test/master/log/pika.WARNING
47+
# echo -e "\n==== 主节点 WARNING 日志 ===="
48+
# tail -n 150 ./pacifica_test/master/log/pika.WARNING
4949

50-
echo -e "\n==== 从节点 INFO 日志 ===="
51-
tail -n 150 ./pacifica_test/slave1/log/pika.INFO
50+
# echo -e "\n==== 从节点 INFO 日志 ===="
51+
# tail -n 150 ./pacifica_test/slave1/log/pika.INFO
5252

53-
echo -e "\n==== 从节点 WARNING 日志 ===="
54-
tail -n 150 ./pacifica_test/slave1/log/pika.WARNING
53+
# echo -e "\n==== 从节点 WARNING 日志 ===="
54+
# tail -n 150 ./pacifica_test/slave1/log/pika.WARNING

0 commit comments

Comments
 (0)