From 2c2c720b072ea875261284d132d752e6b5794359 Mon Sep 17 00:00:00 2001 From: chenbt Date: Tue, 25 Nov 2025 16:59:57 +0800 Subject: [PATCH 1/3] Save submodule versions at tag v3.5.6 --- .gitmodules | 15 +++++++++++++++ tools/pika_migrate/third/blackwidow | 1 + tools/pika_migrate/third/glog | 1 + tools/pika_migrate/third/pink | 1 + tools/pika_migrate/third/rocksdb | 1 + tools/pika_migrate/third/slash | 1 + 6 files changed, 20 insertions(+) create mode 100644 .gitmodules create mode 160000 tools/pika_migrate/third/blackwidow create mode 160000 tools/pika_migrate/third/glog create mode 160000 tools/pika_migrate/third/pink create mode 160000 tools/pika_migrate/third/rocksdb create mode 160000 tools/pika_migrate/third/slash diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000000..464c3eb908 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,15 @@ +[submodule "tools/pika_migrate/third/blackwidow"] + path = tools/pika_migrate/third/blackwidow + url = https://github.com/Qihoo360/blackwidow.git +[submodule "tools/pika_migrate/third/slash"] + path = tools/pika_migrate/third/slash + url = https://github.com/Qihoo360/slash.git +[submodule "tools/pika_migrate/third/pink"] + path = tools/pika_migrate/third/pink + url = https://github.com/Qihoo360/pink.git +[submodule "tools/pika_migrate/third/glog"] + path = tools/pika_migrate/third/glog + url = https://github.com/Qihoo360/glog.git +[submodule "tools/pika_migrate/third/rocksdb"] + path = tools/pika_migrate/third/rocksdb + url = https://github.com/facebook/rocksdb.git diff --git a/tools/pika_migrate/third/blackwidow b/tools/pika_migrate/third/blackwidow new file mode 160000 index 0000000000..7726e79de1 --- /dev/null +++ b/tools/pika_migrate/third/blackwidow @@ -0,0 +1 @@ +Subproject commit 7726e79de1bcdb4275a8444d77407e5d2999d834 diff --git a/tools/pika_migrate/third/glog b/tools/pika_migrate/third/glog new file mode 160000 index 0000000000..ecdbd7cda6 --- /dev/null +++ b/tools/pika_migrate/third/glog @@ -0,0 +1 @@ +Subproject commit ecdbd7cda69e1ff304ac02f7f277715a162e1474 diff --git a/tools/pika_migrate/third/pink b/tools/pika_migrate/third/pink new file mode 160000 index 0000000000..8c4e09c23f --- /dev/null +++ b/tools/pika_migrate/third/pink @@ -0,0 +1 @@ +Subproject commit 8c4e09c23f2e820a1a9341fff14eee5e102ab2bd diff --git a/tools/pika_migrate/third/rocksdb b/tools/pika_migrate/third/rocksdb new file mode 160000 index 0000000000..004237e627 --- /dev/null +++ b/tools/pika_migrate/third/rocksdb @@ -0,0 +1 @@ +Subproject commit 004237e62790320d8e630456cbeb6f4a1f3579c2 diff --git a/tools/pika_migrate/third/slash b/tools/pika_migrate/third/slash new file mode 160000 index 0000000000..c1315c0d5e --- /dev/null +++ b/tools/pika_migrate/third/slash @@ -0,0 +1 @@ +Subproject commit c1315c0d5ed2a60b21cc1db4b9f29ee833e7d2a3 From 66bd23403f66891d0b47423cf3912d10f822fde2 Mon Sep 17 00:00:00 2001 From: chenbt Date: Fri, 28 Nov 2025 14:35:26 +0800 Subject: [PATCH 2/3] =?UTF-8?q?fix:=20=E5=90=8C=E6=AD=A5=E6=97=B6=E5=AE=B9?= =?UTF-8?q?=E6=98=93broken=20pipe=E5=AF=BC=E8=87=B4=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E4=B8=A2=E5=A4=B1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tools/pika_migrate/src/redis_sender.cc | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/tools/pika_migrate/src/redis_sender.cc b/tools/pika_migrate/src/redis_sender.cc index 74c41eabbd..ff2abf40f1 100644 --- a/tools/pika_migrate/src/redis_sender.cc +++ b/tools/pika_migrate/src/redis_sender.cc @@ -14,6 +14,9 @@ #include "slash/include/xdebug.h" static time_t kCheckDiff = 1; +// 增加空闲 keepalive 周期(秒),应小于服务端 keepalive_timeout(如 60s),默认 30s +static time_t kKeepaliveInterval = 30; +static std::string kpingCmd = "*1\r\n$4\r\nPING\r\n"; RedisSender::RedisSender(int id, std::string ip, int64_t port, std::string password): id_(id), @@ -121,14 +124,14 @@ void RedisSender::Stop() { void RedisSender::SendRedisCommand(const std::string &command) { commands_mutex_.Lock(); - if (commands_queue_.size() < 100000) { + if (commands_queue_.size() < 100) { commands_queue_.push(command); rsignal_.Signal(); commands_mutex_.Unlock(); return; } - while (commands_queue_.size() > 100000) { + while (commands_queue_.size() > 100) { wsignal_.Wait(); } commands_queue_.push(command); @@ -167,7 +170,6 @@ int RedisSender::SendCommand(std::string &command) { void *RedisSender::ThreadMain() { LOG(INFO) << "Start redis sender " << id_ << " thread..."; - // sleep(15); int ret = 0; ConnectRedis(); @@ -176,9 +178,20 @@ void *RedisSender::ThreadMain() { commands_mutex_.Lock(); while (commands_queue_.size() == 0 && !should_exit_) { rsignal_.TimedWait(100); - // rsignal_.Wait(); + time_t whileNow = ::time(NULL); + // 如果队列仍为空,定期保活(PING) + if (commands_queue_.size() == 0 && cli_ != nullptr) { + if (whileNow - last_write_time_ >= kKeepaliveInterval) { + int r = SendCommand(kpingCmd); + if (r == 0) { + LOG(INFO) << "RedisSender " << id_ << " keepalive PING sent to " << ip_ << ":" << port_; + last_write_time_ = ::time(NULL); + } else { + LOG(WARNING) << "RedisSender " << id_ << " keepalive PING failed, will try reconnect"; + } + } + } } - // if (commands_queue_.size() == 0 && should_exit_) { if (should_exit_) { commands_mutex_.Unlock(); break; @@ -194,7 +207,6 @@ void *RedisSender::ThreadMain() { std::string command; commands_mutex_.Lock(); command = commands_queue_.front(); - // printf("%d, command %s\n", id_, command.c_str()); elements_++; commands_queue_.pop(); wsignal_.Signal(); From ea822f94cb88399767ca0e5851cc41e75804a245 Mon Sep 17 00:00:00 2001 From: chenbt Date: Fri, 28 Nov 2025 14:54:43 +0800 Subject: [PATCH 3/3] =?UTF-8?q?fix:=20=E6=B5=8B=E8=AF=95commands=5Fqueue?= =?UTF-8?q?=5F.size=E5=9B=9E=E6=BB=9A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tools/pika_migrate/src/redis_sender.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tools/pika_migrate/src/redis_sender.cc b/tools/pika_migrate/src/redis_sender.cc index ff2abf40f1..899140efe4 100644 --- a/tools/pika_migrate/src/redis_sender.cc +++ b/tools/pika_migrate/src/redis_sender.cc @@ -124,14 +124,14 @@ void RedisSender::Stop() { void RedisSender::SendRedisCommand(const std::string &command) { commands_mutex_.Lock(); - if (commands_queue_.size() < 100) { + if (commands_queue_.size() < 100000) { commands_queue_.push(command); rsignal_.Signal(); commands_mutex_.Unlock(); return; } - while (commands_queue_.size() > 100) { + while (commands_queue_.size() > 100000) { wsignal_.Wait(); } commands_queue_.push(command);