Skip to content

Commit c0d78eb

Browse files
committed
fix bugs
1 parent d6a50a4 commit c0d78eb

File tree

14 files changed

+70
-25
lines changed

14 files changed

+70
-25
lines changed

include/pika_consensus.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,7 @@ class ConsensusCoordinator {
275275
void SetCommittedId(const LogOffset& offset) {
276276
std::lock_guard l(committed_id_rwlock_);
277277
committed_id_ = offset;
278+
LOG(INFO) << "update commitid to : " << offset.ToString();
278279
context_->UpdateAppliedIndex(committed_id_);
279280
}
280281

src/net/src/client_thread.cc

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,9 @@ Status ClientThread::Write(const std::string& ip, const int port, const std::str
8282
}
8383
to_send_[ip_port].push_back(msg);
8484
}
85+
//LOG(INFO) << "before client notifywrite, msg size: " << msg.size() << "to_send queue size: " << to_send_[ip_port].size();
8586
NotifyWrite(ip_port);
87+
//LOG(INFO) << "after client notifywrite";
8688
return Status::OK();
8789
}
8890

@@ -341,9 +343,11 @@ void ClientThread::ProcessNotifyEvents(const NetFiredEvent* pfe) {
341343
}
342344
msgs.swap(iter->second);
343345
}
346+
//LOG(INFO) << "to send msgs num: " << msgs.size();
344347
// get msg from to_send_
345348
std::vector<std::string> send_failed_msgs;
346349
for (auto& msg : msgs) {
350+
//LOG(INFO) << "to send msg size: " << msg.size();
347351
if (ipport_conns_[ip_port]->WriteResp(msg)) {
348352
send_failed_msgs.push_back(msg);
349353
}
@@ -356,7 +360,7 @@ void ClientThread::ProcessNotifyEvents(const NetFiredEvent* pfe) {
356360
NotifyWrite(ip_port);
357361
}
358362
} else if (ti.notify_type() == kNotiClose) {
359-
LOG(INFO) << "received kNotiClose";
363+
//LOG(INFO) << "received kNotiClose";
360364
net_multiplexer_->NetDelEvent(fd, 0);
361365
CloseFd(fd, ip_port);
362366
fd_conns_.erase(fd);
@@ -435,6 +439,7 @@ void* ClientThread::ThreadMain() {
435439

436440
if ((should_close == 0) && (pfe->mask & kWritable) && conn->is_reply()) {
437441
WriteStatus write_status = conn->SendReply();
442+
//LOG(INFO) << "net client send request, writeStatus: " << write_status;
438443
conn->set_last_interaction(now);
439444
if (write_status == kWriteAll) {
440445
net_multiplexer_->NetModEvent(pfe->fd, 0, kReadable);

src/net/src/holy_thread.cc

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,22 +117,27 @@ void HolyThread::HandleConnEvent(NetFiredEvent* pfe) {
117117
}
118118
}
119119

120+
//LOG(INFO) << "HandleConnEvent ------";
121+
120122
if (async_) {
121123
if (pfe->mask & kReadable) {
122124
ReadStatus read_status = in_conn->GetRequest();
123125
struct timeval now;
124126
gettimeofday(&now, nullptr);
125127
in_conn->set_last_interaction(now);
126128
if (read_status == kReadAll) {
129+
//LOG(INFO) << "async ,readble event readALL";
127130
// do nothing still watch EPOLLIN
128131
} else if (read_status == kReadHalf) {
132+
//LOG(INFO) << "async ,readble event readHalf";
129133
return;
130134
} else {
131135
// kReadError kReadClose kFullError kParseError kDealError
132136
should_close = 1;
133137
}
134138
}
135139
if ((pfe->mask & kWritable) && in_conn->is_reply()) {
140+
//LOG(INFO) << "async, writable event";
136141
WriteStatus write_status = in_conn->SendReply();
137142
if (write_status == kWriteAll) {
138143
in_conn->set_is_reply(false);
@@ -145,6 +150,7 @@ void HolyThread::HandleConnEvent(NetFiredEvent* pfe) {
145150
}
146151
} else {
147152
if (pfe->mask & kReadable) {
153+
//LOG(INFO) << "sync, readble event";
148154
ReadStatus getRes = in_conn->GetRequest();
149155
struct timeval now;
150156
gettimeofday(&now, nullptr);
@@ -159,6 +165,7 @@ void HolyThread::HandleConnEvent(NetFiredEvent* pfe) {
159165
}
160166
}
161167
if (pfe->mask & kWritable) {
168+
//LOG(INFO) << "sync, writable event";
162169
WriteStatus write_status = in_conn->SendReply();
163170
if (write_status == kWriteAll) {
164171
in_conn->set_is_reply(false);
@@ -290,7 +297,7 @@ void HolyThread::ProcessNotifyEvents(const net::NetFiredEvent* pfe) {
290297
if (ti.notify_type() == net::kNotiWrite) {
291298
net_multiplexer_->NetModEvent(ti.fd(), 0, kReadable | kWritable);
292299
} else if (ti.notify_type() == net::kNotiClose) {
293-
LOG(INFO) << "receive noti close";
300+
//LOG(INFO) << "receive noti close";
294301
std::shared_ptr<net::NetConn> conn = get_conn(fd);
295302
if (!conn) {
296303
continue;

src/net/src/pb_conn.cc

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@
33
// LICENSE file in the root directory of this source tree. An additional grant
44
// of patent rights can be found in the PATENTS file in the same directory.
55

6+
#include <sys/socket.h>
7+
#include <netinet/tcp.h>
8+
#include <netinet/in.h>
9+
610
#include "net/include/pb_conn.h"
711

812
#include <arpa/inet.h>
@@ -36,9 +40,13 @@ ReadStatus PbConn::GetRequest() {
3640
while (true) {
3741
switch (connStatus_) {
3842
case kHeader: {
43+
int quickack = 1;
3944
ssize_t nread = read(fd(), rbuf_ + cur_pos_, COMMAND_HEADER_LENGTH - cur_pos_);
45+
setsockopt(fd(), IPPROTO_TCP, TCP_QUICKACK, &quickack, sizeof(quickack));
46+
//LOG(INFO) << "kHeader nread size: " << nread;
4047
if (nread == -1) {
4148
if (errno == EAGAIN) {
49+
//LOG(INFO) << "kHeader errno == EAGAIN readhalf";
4250
return kReadHalf;
4351
} else {
4452
return kReadError;
@@ -56,6 +64,7 @@ ReadStatus PbConn::GetRequest() {
5664
connStatus_ = kPacket;
5765
continue;
5866
}
67+
//LOG(INFO) << "kHeader readhalf";
5968
return kReadHalf;
6069
}
6170
}
@@ -75,8 +84,12 @@ ReadStatus PbConn::GetRequest() {
7584
}
7685
// read msg body
7786
ssize_t nread = read(fd(), rbuf_ + cur_pos_, remain_packet_len_);
87+
int quickack = 1;
88+
setsockopt(fd(), IPPROTO_TCP, TCP_QUICKACK, &quickack, sizeof(quickack));
89+
//LOG(INFO) << "kPacket nread size: " << nread;
7890
if (nread == -1) {
7991
if (errno == EAGAIN) {
92+
//LOG(INFO) << "kPacket errno == EAGAIN readhalf";
8093
return kReadHalf;
8194
} else {
8295
return kReadError;
@@ -88,12 +101,15 @@ ReadStatus PbConn::GetRequest() {
88101
cur_pos_ += static_cast<uint32_t>(nread);
89102
remain_packet_len_ -= static_cast<int32_t>(nread);
90103
if (remain_packet_len_ == 0) {
104+
//LOG(INFO) << "connStatus_ turns to kComplete";
91105
connStatus_ = kComplete;
92106
continue;
93107
}
108+
//LOG(INFO) << "kPacket readhalf";
94109
return kReadHalf;
95110
}
96111
case kComplete: {
112+
//LOG(INFO) << "kComplete";
97113
if (DealMessage() != 0) {
98114
return kDealError;
99115
}
@@ -117,12 +133,15 @@ WriteStatus PbConn::SendReply() {
117133
ssize_t nwritten = 0;
118134
size_t item_len;
119135
std::lock_guard l(resp_mu_);
136+
//LOG(INFO) << "queue size to write to fd " << write_buf_.queue_.size();
120137
while (!write_buf_.queue_.empty()) {
121138
std::string item = write_buf_.queue_.front();
122139
item_len = item.size();
140+
//LOG(INFO) << "queue item size: " << item_len;
123141
while (item_len - write_buf_.item_pos_ > 0) {
124142
nwritten = write(fd(), item.data() + write_buf_.item_pos_, item_len - write_buf_.item_pos_);
125143
if (nwritten <= 0) {
144+
LOG(ERROR) << "nwritten less than 0";
126145
break;
127146
}
128147
g_network_statistic->IncrReplOutputBytes(nwritten);
@@ -144,6 +163,7 @@ WriteStatus PbConn::SendReply() {
144163
if (item_len - write_buf_.item_pos_ != 0) {
145164
return kWriteHalf;
146165
}
166+
LOG(ERROR) << "write item success";
147167
}
148168
return kWriteAll;
149169
}

src/net/src/server_thread.cc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@ void* ServerThread::ThreadMain() {
178178
char ip_addr[INET_ADDRSTRLEN] = "";
179179

180180
while (!should_stop()) {
181+
//LOG(INFO) << "serverthread loop begin";
181182
if (cron_interval_ > 0) {
182183
gettimeofday(&now, nullptr);
183184
if (when.tv_sec > now.tv_sec || (when.tv_sec == now.tv_sec && when.tv_usec > now.tv_usec)) {
@@ -193,7 +194,9 @@ void* ServerThread::ThreadMain() {
193194
}
194195
}
195196

197+
//LOG(INFO) << "before server netpoll";
196198
nfds = net_multiplexer_->NetPoll(timeout);
199+
//LOG(INFO) << "server netpoll timeout: " << timeout << " nfds: " << nfds;
197200
for (int i = 0; i < nfds; i++) {
198201
pfe = (net_multiplexer_->FiredEvents()) + i;
199202
fd = pfe->fd;

src/pika_binlog_transverter.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ bool PikaBinlogTransverter::BinlogDecode(BinlogType type, const std::string& bin
9797
<< " left length:" << binlog_str.size();
9898
return false;
9999
}
100+
//LOG(INFO) << "content: " << binlog_item->content_ << " filnum: " << binlog_item->filenum_ << " offset: " << binlog_item->offset_;
100101
return true;
101102
}
102103

src/pika_command.cc

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -878,14 +878,18 @@ void Cmd::InternalProcessCommand(const HintKeys& hint_keys) {
878878
if(res().ok()){
879879
DoCommand(hint_keys);
880880
}
881+
/*
881882
if (g_pika_conf->slowlog_slower_than() >= 0) {
882883
do_duration_ += pstd::NowMicros() - start_us;
883884
}
885+
*/
884886
}else{
885887
DoCommand(hint_keys);
888+
/*
886889
if (g_pika_conf->slowlog_slower_than() >= 0) {
887890
do_duration_ += pstd::NowMicros() - start_us;
888891
}
892+
*/
889893

890894
DoBinlog();
891895
}
@@ -898,7 +902,7 @@ void Cmd::InternalProcessCommand(const HintKeys& hint_keys) {
898902
}
899903

900904
uint64_t end_us = pstd::NowMicros();
901-
this->binlog_duration_ms = (end_us - before_do_binlog_us) / 1000;
905+
// this->binlog_duration_ms = (end_us - before_do_binlog_us) / 1000;
902906
}
903907

904908
void Cmd::DoCommand(const HintKeys& hint_keys) {
@@ -951,6 +955,7 @@ bool Cmd::DoReadCommandInCache() {
951955

952956

953957
void Cmd::DoBinlog() {
958+
//LOG(INFO) << "cmd::DoBinlog";
954959
if (res().ok() && is_write() && g_pika_conf->write_binlog()) {
955960
std::shared_ptr<net::NetConn> conn_ptr = GetConn();
956961
std::shared_ptr<std::string> resp_ptr = GetResp();
@@ -978,9 +983,11 @@ void Cmd::DoBinlog() {
978983
res().SetRes(CmdRes::kErrOther, s.ToString());
979984
}
980985

986+
//LOG(INFO) << "cmd::DoBinlog done, failed";
981987
return;
982988
}
983989
}
990+
//LOG(INFO) << "cmd::DoBinlog done";
984991
}
985992

986993
#define PIKA_STAGE_DURATION_OUTPUT(duration) \

src/pika_consensus.cc

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,9 @@ Status Context::Init() {
6565
void Context::UpdateAppliedIndex(const LogOffset& offset) {
6666
std::lock_guard l(rwlock_);
6767
LogOffset cur_offset;
68-
applied_win_.Update(SyncWinItem(offset), SyncWinItem(offset), &cur_offset);
68+
//LOG(INFO) << "UpdateAppliedIndex: " << offset.ToString();
69+
//applied_win_.Update(SyncWinItem(offset), SyncWinItem(offset), &cur_offset);
70+
//LOG(INFO) << "UpdateAppliedIndex: " << offset.ToString() << " done";
6971
if (cur_offset > applied_index_) {
7072
applied_index_ = cur_offset;
7173
StableSave();
@@ -381,8 +383,6 @@ Status ConsensusCoordinator::UpdateSlave(const std::string& ip, int port, const
381383
std::lock_guard l(slave_ptr->slave_mu);
382384
slave_ptr->acked_offset = end;
383385
sync_pros_.AddMatchIndex(ip, port, slave_ptr->acked_offset);
384-
LOG(INFO) << "PacificA slave ip: " << ip << ", port :" << port << ",slave acked_offset "
385-
<< slave_ptr->acked_offset.ToString();
386386
if (slave_ptr->slave_state != kSlaveBinlogSync && slave_ptr->acked_offset >= slave_ptr->target_offset) {
387387
slave_ptr->slave_state = kSlaveBinlogSync;
388388
LOG(INFO) << "PacificA change slave_state kSlaveBinlogSync acked_offset: " << slave_ptr->acked_offset.ToString()
@@ -596,8 +596,6 @@ Status ConsensusCoordinator::FindBinlogFileNum(const std::map<uint32_t, std::str
596596

597597
Status ConsensusCoordinator::FindLogicOffsetBySearchingBinlog(const BinlogOffset& hint_offset, uint64_t target_index,
598598
LogOffset* found_offset) {
599-
LOG(INFO) << DBInfo(db_name_).ToString() << "FindLogicOffsetBySearchingBinlog hint offset "
600-
<< hint_offset.ToString() << " target_index " << target_index;
601599
BinlogOffset start_offset;
602600
std::map<uint32_t, std::string> binlogs;
603601
if (!stable_logger_->GetBinlogFiles(&binlogs)) {
@@ -811,6 +809,7 @@ bool ConsensusCoordinator::GetISConsistency() {
811809
}
812810

813811
bool ConsensusCoordinator::checkFinished(const LogOffset& offset) {
812+
std::lock_guard l(committed_id_rwlock_);
814813
if (offset <= committed_id_) {
815814
return true;
816815
}
@@ -948,7 +947,7 @@ Status ConsensusCoordinator::ApplyBinlog(const std::shared_ptr<Cmd>& cmd_ptr) {
948947
} else {
949948
int32_t wait_ms = 250;
950949
while (g_pika_rm->GetUnfinishedAsyncWriteDBTaskCount(db_name_) > 0) {
951-
std::this_thread::sleep_for(std::chrono::milliseconds(wait_ms));
950+
//std::this_thread::sleep_for(std::chrono::milliseconds(wait_ms));
952951
wait_ms *= 2;
953952
wait_ms = wait_ms < 3000 ? wait_ms : 3000;
954953
}

src/pika_repl_server_conn.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,6 @@ void PikaReplServerConn::HandleMetaSyncRequest(void* arg) {
3636
response.set_code(InnerMessage::kError);
3737
response.set_reply("Auth with master error, Invalid masterauth");
3838
} else {
39-
LOG(INFO) << "Receive MetaSync, Slave ip: " << node.ip() << ", Slave port:" << node.port()
40-
<< ", is_consistency: " << is_consistency;
4139
std::vector<DBStruct> db_structs = g_pika_conf->db_structs();
4240
int slave_size = g_pika_server->slave_size();
4341
bool success = g_pika_server->TryAddSlave(node.ip(), node.port(), conn->fd(), g_pika_conf->db_structs());
@@ -425,13 +423,15 @@ void PikaReplServerConn::HandleBinlogSyncRequest(void* arg) {
425423
return;
426424
}
427425
s = g_pika_rm->UpdateSyncBinlogStatus(slave_node, range_start, range_end);
426+
LOG(INFO) << "update master binlog offset: " << range_start.ToString() << " to " << range_end.ToString() << " status: " << s.ToString();
428427
if (!s.ok()) {
429428
LOG(WARNING) << "Update binlog ack failed " << db_name << " " << s.ToString();
430429
conn->NotifyClose();
431430
return;
432431
}
433432

434433
g_pika_server->SignalAuxiliary();
434+
LOG(INFO) << "after signal auxiliary";
435435
}
436436

437437
void PikaReplServerConn::HandleRemoveSlaveNodeRequest(void* arg) {

src/pika_rm.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -491,7 +491,7 @@ Status SyncMasterDB::ConsensusProposeLog(const std::shared_ptr<Cmd>& cmd_ptr) {
491491
if (checkFinished(offset)) {
492492
return Status::OK();
493493
}
494-
std::this_thread::sleep_for(std::chrono::milliseconds(50));
494+
//std::this_thread::sleep_for(std::chrono::milliseconds(50));
495495
}
496496

497497
return Status::Timeout("No consistency achieved within 10 seconds");

0 commit comments

Comments
 (0)