Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[submodule "tools/pika_migrate/third/blackwidow"]
path = tools/pika_migrate/third/blackwidow
url = https://github.com/Qihoo360/blackwidow.git
[submodule "tools/pika_migrate/third/slash"]
path = tools/pika_migrate/third/slash
url = https://github.com/Qihoo360/slash.git
[submodule "tools/pika_migrate/third/pink"]
path = tools/pika_migrate/third/pink
url = https://github.com/Qihoo360/pink.git
[submodule "tools/pika_migrate/third/glog"]
path = tools/pika_migrate/third/glog
url = https://github.com/Qihoo360/glog.git
[submodule "tools/pika_migrate/third/rocksdb"]
path = tools/pika_migrate/third/rocksdb
url = https://github.com/facebook/rocksdb.git
38 changes: 28 additions & 10 deletions tools/pika_migrate/src/redis_sender.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
#include <glog/logging.h>

static time_t kCheckDiff = 1;
// 增加空闲 keepalive 周期(秒),应小于服务端 keepalive_timeout(如 60s),默认 30s
static time_t kKeepaliveInterval = 30;
static std::string kpingCmd = "*1\r\n$4\r\nPING\r\n";

RedisSender::RedisSender(int id, std::string ip, int64_t port, std::string user, std::string password):
id_(id),
Expand Down Expand Up @@ -149,17 +152,28 @@ int RedisSender::SendCommand(std::string &command) {

void *RedisSender::ThreadMain() {
LOG(INFO) << "Start redis sender " << id_ << " thread...";
// sleep(15);
int ret = 0;

ConnectRedis();

while (!should_exit_) {
std::unique_lock lock(signal_mutex_);
while (commandQueueSize() == 0 && !should_exit_) {
rsignal_.wait_for(lock, std::chrono::milliseconds(100));
commands_mutex_.Lock();
while (commands_queue_.size() == 0 && !should_exit_) {
rsignal_.TimedWait(100);
time_t whileNow = ::time(NULL);
// 如果队列仍为空,定期保活(PING)
if (commands_queue_.size() == 0 && cli_ != nullptr) {
if (whileNow - last_write_time_ >= kKeepaliveInterval) {
int r = SendCommand(kpingCmd);
if (r == 0) {
LOG(INFO) << "RedisSender " << id_ << " keepalive PING sent to " << ip_ << ":" << port_;
last_write_time_ = ::time(NULL);
} else {
LOG(WARNING) << "RedisSender " << id_ << " keepalive PING failed, will try reconnect";
}
}
}
}

if (should_exit_) {
break;
}
Expand All @@ -170,11 +184,15 @@ void *RedisSender::ThreadMain() {

// get redis command
std::string command;
{
std::lock_guard l(command_queue_mutex_);
command = commands_queue_.front();
elements_++;
commands_queue_.pop();
commands_mutex_.Lock();
command = commands_queue_.front();
elements_++;
commands_queue_.pop();
wsignal_.Signal();
commands_mutex_.Unlock();
ret = SendCommand(command);
if (ret == 0) {
cnt_++;
}

wsignal_.notify_one();
Expand Down
1 change: 1 addition & 0 deletions tools/pika_migrate/third/blackwidow
Submodule blackwidow added at 7726e7
1 change: 1 addition & 0 deletions tools/pika_migrate/third/glog
Submodule glog added at ecdbd7
1 change: 1 addition & 0 deletions tools/pika_migrate/third/pink
Submodule pink added at 8c4e09
1 change: 1 addition & 0 deletions tools/pika_migrate/third/rocksdb
Submodule rocksdb added at 004237
1 change: 1 addition & 0 deletions tools/pika_migrate/third/slash
Submodule slash added at c1315c