diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000000..464c3eb908 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,15 @@ +[submodule "tools/pika_migrate/third/blackwidow"] + path = tools/pika_migrate/third/blackwidow + url = https://github.com/Qihoo360/blackwidow.git +[submodule "tools/pika_migrate/third/slash"] + path = tools/pika_migrate/third/slash + url = https://github.com/Qihoo360/slash.git +[submodule "tools/pika_migrate/third/pink"] + path = tools/pika_migrate/third/pink + url = https://github.com/Qihoo360/pink.git +[submodule "tools/pika_migrate/third/glog"] + path = tools/pika_migrate/third/glog + url = https://github.com/Qihoo360/glog.git +[submodule "tools/pika_migrate/third/rocksdb"] + path = tools/pika_migrate/third/rocksdb + url = https://github.com/facebook/rocksdb.git diff --git a/tools/pika_migrate/src/redis_sender.cc b/tools/pika_migrate/src/redis_sender.cc index 29ff66233d..2408d9758f 100644 --- a/tools/pika_migrate/src/redis_sender.cc +++ b/tools/pika_migrate/src/redis_sender.cc @@ -12,6 +12,9 @@ #include static time_t kCheckDiff = 1; +// 增加空闲 keepalive 周期(秒),应小于服务端 keepalive_timeout(如 60s),默认 30s +static time_t kKeepaliveInterval = 30; +static std::string kpingCmd = "*1\r\n$4\r\nPING\r\n"; RedisSender::RedisSender(int id, std::string ip, int64_t port, std::string user, std::string password): id_(id), @@ -149,17 +152,28 @@ int RedisSender::SendCommand(std::string &command) { void *RedisSender::ThreadMain() { LOG(INFO) << "Start redis sender " << id_ << " thread..."; - // sleep(15); int ret = 0; ConnectRedis(); while (!should_exit_) { - std::unique_lock lock(signal_mutex_); - while (commandQueueSize() == 0 && !should_exit_) { - rsignal_.wait_for(lock, std::chrono::milliseconds(100)); + commands_mutex_.Lock(); + while (commands_queue_.size() == 0 && !should_exit_) { + rsignal_.TimedWait(100); + time_t whileNow = ::time(NULL); + // 如果队列仍为空,定期保活(PING) + if (commands_queue_.size() == 0 && cli_ != nullptr) { + if (whileNow - last_write_time_ >= kKeepaliveInterval) { + int r = SendCommand(kpingCmd); + if (r == 0) { + LOG(INFO) << "RedisSender " << id_ << " keepalive PING sent to " << ip_ << ":" << port_; + last_write_time_ = ::time(NULL); + } else { + LOG(WARNING) << "RedisSender " << id_ << " keepalive PING failed, will try reconnect"; + } + } + } } - if (should_exit_) { break; } @@ -170,11 +184,15 @@ void *RedisSender::ThreadMain() { // get redis command std::string command; - { - std::lock_guard l(command_queue_mutex_); - command = commands_queue_.front(); - elements_++; - commands_queue_.pop(); + commands_mutex_.Lock(); + command = commands_queue_.front(); + elements_++; + commands_queue_.pop(); + wsignal_.Signal(); + commands_mutex_.Unlock(); + ret = SendCommand(command); + if (ret == 0) { + cnt_++; } wsignal_.notify_one(); diff --git a/tools/pika_migrate/third/blackwidow b/tools/pika_migrate/third/blackwidow new file mode 160000 index 0000000000..7726e79de1 --- /dev/null +++ b/tools/pika_migrate/third/blackwidow @@ -0,0 +1 @@ +Subproject commit 7726e79de1bcdb4275a8444d77407e5d2999d834 diff --git a/tools/pika_migrate/third/glog b/tools/pika_migrate/third/glog new file mode 160000 index 0000000000..ecdbd7cda6 --- /dev/null +++ b/tools/pika_migrate/third/glog @@ -0,0 +1 @@ +Subproject commit ecdbd7cda69e1ff304ac02f7f277715a162e1474 diff --git a/tools/pika_migrate/third/pink b/tools/pika_migrate/third/pink new file mode 160000 index 0000000000..8c4e09c23f --- /dev/null +++ b/tools/pika_migrate/third/pink @@ -0,0 +1 @@ +Subproject commit 8c4e09c23f2e820a1a9341fff14eee5e102ab2bd diff --git a/tools/pika_migrate/third/rocksdb b/tools/pika_migrate/third/rocksdb new file mode 160000 index 0000000000..004237e627 --- /dev/null +++ b/tools/pika_migrate/third/rocksdb @@ -0,0 +1 @@ +Subproject commit 004237e62790320d8e630456cbeb6f4a1f3579c2 diff --git a/tools/pika_migrate/third/slash b/tools/pika_migrate/third/slash new file mode 160000 index 0000000000..c1315c0d5e --- /dev/null +++ b/tools/pika_migrate/third/slash @@ -0,0 +1 @@ +Subproject commit c1315c0d5ed2a60b21cc1db4b9f29ee833e7d2a3