diff --git a/conf/pika.conf b/conf/pika.conf index 5317fcf45..7d512b08a 100644 --- a/conf/pika.conf +++ b/conf/pika.conf @@ -33,6 +33,19 @@ thread-pool-size : 12 # When set to no, they are not separated. slow-cmd-pool : no +# Enable thread pool borrowing mechanism, which allows threads to help each other when one pool is busy +# and another is idle. This can improve resource utilization and reduce command latency under uneven load. +# [yes | no] - The default value is yes. +threadpool-borrow-enable : yes + +# The queue length threshold (as a percentage of max queue size) at which a thread pool is considered busy +# and may borrow threads from another pool. The value range is [1-99], default is 80. +threadpool-borrow-threshold-percent : 80 + +# The queue length threshold (as a percentage of max queue size) at which a thread pool is considered idle +# and may lend threads to another pool. The value range is [1-99], default is 20. +threadpool-idle-threshold-percent : 20 + # Size of the low level thread pool, The threads within this pool # are dedicated to handling slow user requests. 
slow-cmd-thread-pool-size : 1 diff --git a/include/pika_admin.h b/include/pika_admin.h index de0ddd5a0..e7b39e2d7 100644 --- a/include/pika_admin.h +++ b/include/pika_admin.h @@ -295,6 +295,7 @@ class InfoCmd : public Cmd { const static std::string kDebugSection; const static std::string kCommandStatsSection; const static std::string kCacheSection; + const static std::string kThreadpoolSection; void DoInitial() override; void Clear() override { diff --git a/include/pika_conf.h b/include/pika_conf.h index 80d5abe8f..b13b15fab 100644 --- a/include/pika_conf.h +++ b/include/pika_conf.h @@ -168,6 +168,18 @@ class PikaConf : public pstd::BaseConf { std::shared_lock l(rwlock_); return slow_cmd_pool_; } + bool threadpool_borrow_enable() { + std::shared_lock l(rwlock_); + return threadpool_borrow_enable_; + } + int threadpool_borrow_threshold_percent() { + std::shared_lock l(rwlock_); + return threadpool_borrow_threshold_percent_; + } + int threadpool_idle_threshold_percent() { + std::shared_lock l(rwlock_); + return threadpool_idle_threshold_percent_; + } std::string server_id() { std::shared_lock l(rwlock_); return server_id_; @@ -942,6 +954,11 @@ class PikaConf : public pstd::BaseConf { std::string bgsave_prefix_; std::string pidfile_; std::atomic slow_cmd_pool_; + + // Thread pool task borrowing configuration + bool threadpool_borrow_enable_ = true; + int threadpool_borrow_threshold_percent_ = 80; + int threadpool_idle_threshold_percent_ = 20; std::string compression_; std::string compression_per_level_; diff --git a/include/pika_server.h b/include/pika_server.h index 81cda87b0..971ffc8b7 100644 --- a/include/pika_server.h +++ b/include/pika_server.h @@ -15,6 +15,9 @@ #endif #include #include +#include +#include +#include #include "src/cache/include/config.h" #include "net/include/bg_thread.h" @@ -78,6 +81,108 @@ struct TaskArg { void DoBgslotscleanup(void* arg); void DoBgslotsreload(void* arg); +// 命令分类器:基于命令执行时间自动分类快慢命令 +class CommandClassifier { +public: + 
CommandClassifier(uint64_t slow_threshold_us = 50000, uint64_t fast_threshold_us = 10000, + size_t window_size = 1000); + + // 记录命令执行时间并动态调整分类 + void RecordExecutionTime(const std::string& cmd_name, uint64_t duration_us); + + // 检查命令是否为慢命令 + bool IsSlowCommand(const std::string& cmd_name) const; + + // 将命令标记为慢命令 + void MarkAsSlowCommand(const std::string& cmd_name); + + // 将命令标记为快命令 + void MarkAsFastCommand(const std::string& cmd_name); + + // 获取所有命令的平均执行时间 + std::unordered_map GetCommandAvgTimes() const; + +private: + struct CommandStats { + uint64_t total_time = 0; + uint64_t count = 0; + std::vector recent_times; // 最近N次执行时间 + size_t time_index = 0; // 环形缓冲区索引 + bool initialized = false; // 是否已经收集了足够的样本 + + void AddTime(uint64_t time, size_t window_size); + double GetAverageTime() const; + double GetRecentAverageTime() const; + }; + + mutable std::mutex mutex_; + std::unordered_map cmd_stats_; + std::unordered_set slow_commands_; + uint64_t slow_threshold_us_; // 慢命令阈值(微秒) + uint64_t fast_threshold_us_; // 快命令阈值(微秒) + size_t window_size_; // 滑动窗口大小 +}; + +// 线程池监控指标 +struct ThreadPoolMetrics { + std::atomic tasks_scheduled{0}; + std::atomic tasks_completed{0}; + std::atomic queue_overflows{0}; + std::atomic borrow_attempts{0}; + std::atomic successful_borrows{0}; + + // 延迟分布统计(1ms, 5ms, 10ms, 50ms, 100ms, 500ms, 1s, 5s, >5s) + std::array, 9> latency_buckets{}; + + void RecordLatency(uint64_t latency_us); + std::string ExportMetrics(const std::string& pool_name) const; + void Reset(); +}; + +// 基于令牌桶的流量控制 +class RateLimiter { +public: + explicit RateLimiter(double rate_per_sec, double burst_size = 100.0); + + // 尝试获取令牌,成功返回true + bool TryAcquire(double tokens = 1.0); + + // 调整速率 + void SetRate(double rate_per_sec); + +private: + void Refill(); + + double rate_; // 每秒允许的令牌数 + double max_tokens_; // 令牌桶容量 + double tokens_; // 当前令牌数 + std::chrono::steady_clock::time_point last_update_; // 上次更新时间 + std::mutex mutex_; +}; + +// 一致性哈希实现,用于键亲和性 +class ConsistentHash { 
+public: + explicit ConsistentHash(int num_replicas = 100, int num_buckets = 1024); + + // 添加节点 + void AddNode(const std::string& node); + + // 移除节点 + void RemoveNode(const std::string& node); + + // 获取键对应的节点 + std::string GetNode(const std::string& key) const; + + // 计算键的哈希值 + size_t HashKey(const std::string& key) const; + +private: + int num_replicas_; + std::map ring_; + std::hash hasher_; +}; + class PikaServer : public pstd::noncopyable { public: PikaServer(); @@ -194,6 +299,13 @@ class PikaServer : public pstd::noncopyable { size_t ClientProcessorThreadPoolMaxQueueSize(); size_t SlowCmdThreadPoolCurQueueSize(); size_t SlowCmdThreadPoolMaxQueueSize(); + + /* + * Thread pool dynamic resize + */ + bool ResizeFastCmdThreadPool(size_t new_size); + bool ResizeSlowCmdThreadPool(size_t new_size); + void GetThreadPoolInfo(std::string* info); /* * BGSave used @@ -495,6 +607,30 @@ class PikaServer : public pstd::noncopyable { void ProcessCronTask(); double HitRatio(); void SetLogNetActivities(bool value); + + /* + * 改进的快慢命令分离相关方法 + */ + // 动态调整借用阈值 + void AdjustBorrowThresholds(); + + // 自适应命令分类 + bool IsDynamicSlowCommand(const std::string& cmd_name) const; + void RecordCommandExecutionTime(const std::string& cmd_name, uint64_t duration_us); + + // 设置流控 + void SetCommandRateLimit(double rate_per_sec); + bool CheckCommandRateLimit(); + + // 获取增强的监控指标 + std::string GetEnhancedThreadPoolMetrics() const; + + // 使用一致性哈希判断键亲和性 + bool IsKeyAffinityToSlow(const std::string& key) const; + + // 重置监控指标 + void ResetThreadPoolMetrics(); + /* * disable compact */ @@ -649,6 +785,15 @@ class PikaServer : public pstd::noncopyable { * lastsave used */ int64_t lastsave_ = 0; + + /* + * 改进的快慢命令分离相关成员 + */ + std::unique_ptr cmd_classifier_; + std::unique_ptr fast_pool_metrics_; + std::unique_ptr slow_pool_metrics_; + std::unique_ptr cmd_rate_limiter_; + std::unique_ptr key_hash_; /* * acl diff --git a/src/pika_admin.cc b/src/pika_admin.cc index 81d47bc43..3201a5b3a 100644 --- 
a/src/pika_admin.cc +++ b/src/pika_admin.cc @@ -884,6 +884,7 @@ const std::string InfoCmd::kRocksDBSection = "rocksdb"; const std::string InfoCmd::kDebugSection = "debug"; const std::string InfoCmd::kCommandStatsSection = "commandstats"; const std::string InfoCmd::kCacheSection = "cache"; +const std::string InfoCmd::kThreadpoolSection = "threadpool"; const std::string ClientCmd::KILLTYPE_NORMAL = "normal"; @@ -969,6 +970,8 @@ void InfoCmd::DoInitial() { info_section_ = kInfoCommandStats; } else if (strcasecmp(argv_[1].data(), kCacheSection.data()) == 0) { info_section_ = kInfoCache; + } else if (strcasecmp(argv_[1].data(), kThreadpoolSection.data()) == 0) { + info_section_ = kInfoThreadpool; } else { info_section_ = kInfoErr; } @@ -1010,6 +1013,8 @@ void InfoCmd::Do() { info.append("\r\n"); InfoCache(info, db_); info.append("\r\n"); + g_pika_server->GetThreadPoolInfo(&info); + info.append("\r\n"); InfoCPU(info); info.append("\r\n"); InfoReplication(info); @@ -1054,6 +1059,9 @@ case kInfoCache: InfoCache(info, db_); break; + case kInfoThreadpool: + g_pika_server->GetThreadPoolInfo(&info); + break; default: // kInfoErr is nothing break; diff --git a/src/pika_conf.cc b/src/pika_conf.cc index 80116aa84..f336e3e1f 100644 --- a/src/pika_conf.cc +++ b/src/pika_conf.cc @@ -76,6 +76,20 @@ int PikaConf::Load() { GetConfStr("slow-cmd-pool", &slowcmdpool); slow_cmd_pool_.store(slowcmdpool == "yes" ? true : false); + // Default to "yes" so a missing config key keeps the documented default (enabled); + // GetConfStr leaves the string untouched when the key is absent, and an empty + // string would otherwise silently disable borrowing. + std::string threadpool_borrow = "yes"; + GetConfStr("threadpool-borrow-enable", &threadpool_borrow); + threadpool_borrow_enable_ = threadpool_borrow == "yes" ?
true : false; + + GetConfInt("threadpool-borrow-threshold-percent", &threadpool_borrow_threshold_percent_); + if (threadpool_borrow_threshold_percent_ <= 0 || threadpool_borrow_threshold_percent_ >= 100) { + threadpool_borrow_threshold_percent_ = 80; + } + + GetConfInt("threadpool-idle-threshold-percent", &threadpool_idle_threshold_percent_); + if (threadpool_idle_threshold_percent_ <= 0 || threadpool_idle_threshold_percent_ >= 100) { + threadpool_idle_threshold_percent_ = 20; + } + int binlog_writer_num = 1; GetConfInt("binlog-writer-num", &binlog_writer_num); if (binlog_writer_num <= 0 || binlog_writer_num > 24) { diff --git a/src/pika_server.cc b/src/pika_server.cc index bbf444191..ccb2e3c4c 100644 --- a/src/pika_server.cc +++ b/src/pika_server.cc @@ -33,6 +33,248 @@ extern std::unique_ptr g_network_statistic; // QUEUE_SIZE_THRESHOLD_PERCENTAGE is used to represent a percentage value and should be within the range of 0 to 100. const size_t QUEUE_SIZE_THRESHOLD_PERCENTAGE = 75; +// CommandClassifier 实现 +void CommandClassifier::CommandStats::AddTime(uint64_t time, size_t window_size) { + total_time += time; + count++; + + if (recent_times.size() < window_size) { + recent_times.push_back(time); + } else { + if (!initialized) { + initialized = true; + } + // 替换最旧的记录 + total_time -= recent_times[time_index]; + recent_times[time_index] = time; + time_index = (time_index + 1) % window_size; + } +} + +double CommandClassifier::CommandStats::GetAverageTime() const { + return count > 0 ? static_cast(total_time) / count : 0.0; +} + +double CommandClassifier::CommandStats::GetRecentAverageTime() const { + if (!initialized && recent_times.empty()) { + return 0.0; + } + + size_t sample_count = initialized ? 
recent_times.size() : time_index; + if (sample_count == 0) { + return 0.0; + } + + uint64_t recent_total = 0; + for (size_t i = 0; i < sample_count; i++) { + recent_total += recent_times[i]; + } + + return static_cast(recent_total) / sample_count; +} + +CommandClassifier::CommandClassifier(uint64_t slow_threshold_us, uint64_t fast_threshold_us, size_t window_size) + : slow_threshold_us_(slow_threshold_us), + fast_threshold_us_(fast_threshold_us), + window_size_(window_size) {} + +void CommandClassifier::RecordExecutionTime(const std::string& cmd_name, uint64_t duration_us) { + std::lock_guard lock(mutex_); + + // 更新命令执行时间统计 + auto& stats = cmd_stats_[cmd_name]; + stats.AddTime(duration_us, window_size_); + + // 根据最近平均执行时间分类命令 + if (stats.initialized || stats.recent_times.size() >= window_size_ / 2) { + double recent_avg = stats.GetRecentAverageTime(); + + if (recent_avg > slow_threshold_us_) { + // 如果平均时间超过慢命令阈值,标记为慢命令 + if (slow_commands_.find(cmd_name) == slow_commands_.end()) { + LOG(INFO) << "Command '" << cmd_name << "' reclassified as SLOW (avg time: " + << recent_avg / 1000.0 << "ms)"; + slow_commands_.insert(cmd_name); + } + } else if (recent_avg < fast_threshold_us_ && stats.initialized) { + // 如果平均时间低于快命令阈值且有足够样本,可能标记为快命令 + if (slow_commands_.find(cmd_name) != slow_commands_.end()) { + LOG(INFO) << "Command '" << cmd_name << "' reclassified as FAST (avg time: " + << recent_avg / 1000.0 << "ms)"; + slow_commands_.erase(cmd_name); + } + } + } +} + +bool CommandClassifier::IsSlowCommand(const std::string& cmd_name) const { + std::lock_guard lock(mutex_); + return slow_commands_.find(cmd_name) != slow_commands_.end(); +} + +void CommandClassifier::MarkAsSlowCommand(const std::string& cmd_name) { + std::lock_guard lock(mutex_); + slow_commands_.insert(cmd_name); +} + +void CommandClassifier::MarkAsFastCommand(const std::string& cmd_name) { + std::lock_guard lock(mutex_); + slow_commands_.erase(cmd_name); +} + +std::unordered_map 
CommandClassifier::GetCommandAvgTimes() const { + std::lock_guard lock(mutex_); + std::unordered_map result; + + for (const auto& [cmd_name, stats] : cmd_stats_) { + result[cmd_name] = stats.GetAverageTime(); + } + + return result; +} + +// ThreadPoolMetrics 实现 +void ThreadPoolMetrics::RecordLatency(uint64_t latency_us) { + if (latency_us < 1000) { + latency_buckets[0]++; + } else if (latency_us < 5000) { + latency_buckets[1]++; + } else if (latency_us < 10000) { + latency_buckets[2]++; + } else if (latency_us < 50000) { + latency_buckets[3]++; + } else if (latency_us < 100000) { + latency_buckets[4]++; + } else if (latency_us < 500000) { + latency_buckets[5]++; + } else if (latency_us < 1000000) { + latency_buckets[6]++; + } else if (latency_us < 5000000) { + latency_buckets[7]++; + } else { + latency_buckets[8]++; + } +} + +std::string ThreadPoolMetrics::ExportMetrics(const std::string& pool_name) const { + std::stringstream ss; + + ss << "# TYPE pika_threadpool_tasks_scheduled counter\n" + << "pika_threadpool_tasks_scheduled{pool=\"" << pool_name << "\"} " << tasks_scheduled.load() << "\n" + << "# TYPE pika_threadpool_tasks_completed counter\n" + << "pika_threadpool_tasks_completed{pool=\"" << pool_name << "\"} " << tasks_completed.load() << "\n" + << "# TYPE pika_threadpool_queue_overflows counter\n" + << "pika_threadpool_queue_overflows{pool=\"" << pool_name << "\"} " << queue_overflows.load() << "\n" + << "# TYPE pika_threadpool_borrow_attempts counter\n" + << "pika_threadpool_borrow_attempts{pool=\"" << pool_name << "\"} " << borrow_attempts.load() << "\n" + << "# TYPE pika_threadpool_successful_borrows counter\n" + << "pika_threadpool_successful_borrows{pool=\"" << pool_name << "\"} " << successful_borrows.load() << "\n"; + + // 延迟分布 + ss << "# TYPE pika_threadpool_latency_buckets counter\n"; + const char* bucket_labels[] = { + "0_1ms", "1_5ms", "5_10ms", "10_50ms", "50_100ms", + "100_500ms", "500_1000ms", "1_5s", "over_5s" + }; + + for (size_t i = 0; i < 
latency_buckets.size(); i++) { + ss << "pika_threadpool_latency_bucket{pool=\"" << pool_name + << "\",bucket=\"" << bucket_labels[i] << "\"} " + << latency_buckets[i].load() << "\n"; + } + + return ss.str(); +} + +void ThreadPoolMetrics::Reset() { + tasks_scheduled.store(0); + tasks_completed.store(0); + queue_overflows.store(0); + borrow_attempts.store(0); + successful_borrows.store(0); + + for (auto& bucket : latency_buckets) { + bucket.store(0); + } +} + +// RateLimiter 实现 +RateLimiter::RateLimiter(double rate_per_sec, double burst_size) + : rate_(rate_per_sec), + max_tokens_(burst_size), + tokens_(burst_size), + last_update_(std::chrono::steady_clock::now()) {} + +void RateLimiter::Refill() { + auto now = std::chrono::steady_clock::now(); + double elapsed = std::chrono::duration(now - last_update_).count(); + double new_tokens = elapsed * rate_; + tokens_ = std::min(tokens_ + new_tokens, max_tokens_); + last_update_ = now; +} + +bool RateLimiter::TryAcquire(double tokens) { + std::lock_guard lock(mutex_); + Refill(); + + if (tokens_ >= tokens) { + tokens_ -= tokens; + return true; + } + return false; +} + +void RateLimiter::SetRate(double rate_per_sec) { + std::lock_guard lock(mutex_); + Refill(); // 先根据旧速率更新令牌 + rate_ = rate_per_sec; +} + +// ConsistentHash 实现 +ConsistentHash::ConsistentHash(int num_replicas, int num_buckets) + : num_replicas_(num_replicas) { + // 添加"fast"和"slow"两个节点 + AddNode("fast"); + AddNode("slow"); +} + +void ConsistentHash::AddNode(const std::string& node) { + for (int i = 0; i < num_replicas_; i++) { + std::string key = node + ":" + std::to_string(i); + size_t hash = hasher_(key); + ring_[hash] = node; + } +} + +void ConsistentHash::RemoveNode(const std::string& node) { + for (int i = 0; i < num_replicas_; i++) { + std::string key = node + ":" + std::to_string(i); + size_t hash = hasher_(key); + ring_.erase(hash); + } +} + +std::string ConsistentHash::GetNode(const std::string& key) const { + if (ring_.empty()) { + return "fast"; // 
默认返回fast + } + + size_t hash = HashKey(key); + + // 找到第一个大于等于hash的节点 + auto it = ring_.lower_bound(hash); + if (it == ring_.end()) { + // 如果没有找到,则环绕到第一个节点 + return ring_.begin()->second; + } else { + return it->second; + } +} + +size_t ConsistentHash::HashKey(const std::string& key) const { + return hasher_(key); +} + void DoPurgeDir(void* arg) { std::unique_ptr path(static_cast(arg)); LOG(INFO) << "Delete dir: " << *path << " start"; @@ -47,7 +289,12 @@ PikaServer::PikaServer() last_check_compact_time_({0, 0}), last_check_resume_time_({0, 0}), repl_state_(PIKA_REPL_NO_CONNECT), - role_(PIKA_ROLE_SINGLE) { + role_(PIKA_ROLE_SINGLE), + cmd_classifier_(new CommandClassifier(50000, 10000, 1000)), + fast_pool_metrics_(new ThreadPoolMetrics()), + slow_pool_metrics_(new ThreadPoolMetrics()), + cmd_rate_limiter_(new RateLimiter(g_pika_conf->maxclients() * 2)), // 默认速率:最大客户端连接数的2倍 + key_hash_(new ConsistentHash(100, 1024)) { // Init server ip host if (!ServerInit()) { LOG(FATAL) << "ServerInit iotcl error"; @@ -770,15 +1017,153 @@ void PikaServer::SetFirstMetaSync(bool v) { } void PikaServer::ScheduleClientPool(net::TaskFunc func, void* arg, bool is_slow_cmd, bool is_admin_cmd) { - if (is_slow_cmd && g_pika_conf->slow_cmd_pool()) { - pika_slow_cmd_thread_pool_->Schedule(func, arg); + // Apply rate limiting if enabled (only for non-admin commands) + if (!is_admin_cmd && !CheckCommandRateLimit()) { + // Rate limit exceeded, handle gracefully + auto bg_arg = static_cast(arg); + if (bg_arg) { + // Set error response for rate limiting + bg_arg->resp = std::make_shared("-ERR Rate limit exceeded, try again later\r\n"); + bg_arg->conn->WriteResp(*(bg_arg->resp)); + delete bg_arg; + } return; } + + // Admin commands always go to admin thread pool if (is_admin_cmd) { - pika_admin_cmd_thread_pool_->Schedule(func, arg); + if (pika_admin_cmd_thread_pool_) { + pika_admin_cmd_thread_pool_->Schedule(func, arg); + } + return; + } + + // Extract command info for dynamic classification and 
key affinity + auto bg_arg = static_cast(arg); + std::string cmd_name; + std::string key; + bool has_key = false; + bool key_affinity_to_slow = false; + + if (bg_arg && !bg_arg->redis_cmds.empty() && !bg_arg->redis_cmds[0].empty()) { + const auto& cmd_args = bg_arg->redis_cmds[0]; + if (!cmd_args.empty()) { + cmd_name = cmd_args[0]; + pstd::StringToLower(cmd_name); + + // Extract key for affinity routing + if (cmd_args.size() > 1) { + key = cmd_args[1]; + has_key = true; + // Use improved consistent hashing for key affinity + key_affinity_to_slow = IsKeyAffinityToSlow(key); + } + } + } + + // Check if command is dynamically classified as slow + // Override static classification if dynamic classification exists + if (!cmd_name.empty() && cmd_classifier_) { + bool dynamic_is_slow = IsDynamicSlowCommand(cmd_name); + if (dynamic_is_slow) { + is_slow_cmd = true; + } + } + + // Check if thread pool borrowing is enabled + bool borrow_enable = g_pika_conf->threadpool_borrow_enable(); + bool slow_cmd_pool_enable = g_pika_conf->slow_cmd_pool(); + + // Update metrics before scheduling + if (is_slow_cmd && slow_pool_metrics_) { + slow_pool_metrics_->tasks_scheduled++; + } else if (fast_pool_metrics_) { + fast_pool_metrics_->tasks_scheduled++; + } + + // If borrowing or slow pool is disabled, use simple routing + if (!borrow_enable || !slow_cmd_pool_enable) { + if (is_slow_cmd && slow_cmd_pool_enable) { + pika_slow_cmd_thread_pool_->Schedule(func, arg); + } else { + pika_client_processor_->SchedulePool(func, arg); + } return; } - pika_client_processor_->SchedulePool(func, arg); + + // Borrowing is enabled, check queue status + size_t slow_queue_size = SlowCmdThreadPoolCurQueueSize(); + size_t slow_max_queue = SlowCmdThreadPoolMaxQueueSize(); + size_t fast_queue_size = ClientProcessorThreadPoolCurQueueSize(); + size_t fast_max_queue = ClientProcessorThreadPoolMaxQueueSize(); + + int borrow_threshold_percent = g_pika_conf->threadpool_borrow_threshold_percent(); + int 
idle_threshold_percent = g_pika_conf->threadpool_idle_threshold_percent(); + + // Calculate thresholds + size_t slow_borrow_threshold = slow_max_queue * borrow_threshold_percent / 100; + size_t fast_borrow_threshold = fast_max_queue * borrow_threshold_percent / 100; + size_t slow_idle_threshold = slow_max_queue * idle_threshold_percent / 100; + size_t fast_idle_threshold = fast_max_queue * idle_threshold_percent / 100; + + // Enhanced decision logic considering dynamic classification and improved key affinity + if (is_slow_cmd) { + // This is a slow command (either statically or dynamically classified) + if (has_key && !key_affinity_to_slow) { + // Key affinity is to fast pool, must use fast pool to maintain order + pika_client_processor_->SchedulePool(func, arg); + } else if (slow_queue_size >= slow_borrow_threshold && fast_queue_size <= fast_idle_threshold) { + // Slow pool is busy and fast pool is idle + if (has_key && key_affinity_to_slow) { + // Key belongs to slow pool, cannot borrow + pika_slow_cmd_thread_pool_->Schedule(func, arg); + } else { + // Can borrow from fast pool + if (slow_pool_metrics_) { + slow_pool_metrics_->borrow_attempts++; + slow_pool_metrics_->successful_borrows++; + } + pika_client_processor_->SchedulePool(func, arg); + LOG(INFO) << "Slow cmd " << cmd_name << " borrows fast pool (slow_queue: " << slow_queue_size + << "/" << slow_max_queue << ", fast_queue: " << fast_queue_size + << "/" << fast_max_queue << ")"; + } + } else { + // Normal case: use slow pool + pika_slow_cmd_thread_pool_->Schedule(func, arg); + } + } else { + // This is a fast command + if (has_key && key_affinity_to_slow) { + // Key affinity is to slow pool, must use slow pool to maintain order + pika_slow_cmd_thread_pool_->Schedule(func, arg); + } else if (fast_queue_size >= fast_borrow_threshold && slow_queue_size <= slow_idle_threshold) { + // Fast pool is busy and slow pool is idle + if (has_key && !key_affinity_to_slow) { + // Key belongs to fast pool, cannot borrow 
+ pika_client_processor_->SchedulePool(func, arg); + } else { + // Can borrow from slow pool + if (fast_pool_metrics_) { + fast_pool_metrics_->borrow_attempts++; + fast_pool_metrics_->successful_borrows++; + } + pika_slow_cmd_thread_pool_->Schedule(func, arg); + LOG(INFO) << "Fast cmd " << cmd_name << " borrows slow pool (fast_queue: " << fast_queue_size + << "/" << fast_max_queue << ", slow_queue: " << slow_queue_size + << "/" << slow_max_queue << ")"; + } + } else { + // Normal case: use fast pool + pika_client_processor_->SchedulePool(func, arg); + } + } + + // Periodically adjust borrow thresholds based on load (every 100 commands) + static int command_counter = 0; + if (++command_counter % 100 == 0) { + AdjustBorrowThresholds(); + } } size_t PikaServer::ClientProcessorThreadPoolCurQueueSize() { @@ -811,6 +1196,115 @@ size_t PikaServer::SlowCmdThreadPoolMaxQueueSize() { return pika_slow_cmd_thread_pool_->max_queue_size(); } +bool PikaServer::ResizeFastCmdThreadPool(size_t new_size) { + if (new_size == 0 || new_size > 1024) { + LOG(WARNING) << "Invalid fast cmd thread pool size: " << new_size << ", must be between 1 and 1024"; + return false; + } + + size_t old_size = g_pika_conf->thread_pool_size(); + if (new_size == old_size) { + LOG(INFO) << "Fast cmd thread pool size unchanged: " << new_size; + return true; + } + + LOG(INFO) << "Resizing fast cmd thread pool from " << old_size << " to " << new_size; + + // Update config + g_pika_conf->SetThreadPoolSize(static_cast(new_size)); + + // Stop old thread pool gracefully + LOG(INFO) << "Waiting for old fast cmd thread pool tasks to complete..."; + pika_client_processor_->Stop(); + + // Create and start new thread pool + pika_client_processor_ = std::make_unique(new_size, 100000); + int ret = pika_client_processor_->Start(); + if (ret != net::kSuccess) { + LOG(ERROR) << "Failed to start new fast cmd thread pool: " << ret; + return false; + } + + LOG(INFO) << "Successfully resized fast cmd thread pool to " << 
new_size; + return true; +} + +bool PikaServer::ResizeSlowCmdThreadPool(size_t new_size) { + if (new_size == 0 || new_size > 1024) { + LOG(WARNING) << "Invalid slow cmd thread pool size: " << new_size << ", must be between 1 and 1024"; + return false; + } + + size_t old_size = g_pika_conf->slow_cmd_thread_pool_size(); + if (new_size == old_size) { + LOG(INFO) << "Slow cmd thread pool size unchanged: " << new_size; + return true; + } + + LOG(INFO) << "Resizing slow cmd thread pool from " << old_size << " to " << new_size; + + // Update config + g_pika_conf->SetLowLevelThreadPoolSize(static_cast(new_size)); + + // Stop old thread pool gracefully + LOG(INFO) << "Waiting for old slow cmd thread pool tasks to complete..."; + while (SlowCmdThreadPoolCurQueueSize() != 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + pika_slow_cmd_thread_pool_->stop_thread_pool(); + + // Create and start new thread pool + pika_slow_cmd_thread_pool_ = std::make_unique(new_size, 100000); + int ret = pika_slow_cmd_thread_pool_->start_thread_pool(); + if (ret != net::kSuccess) { + LOG(ERROR) << "Failed to start new slow cmd thread pool: " << ret; + return false; + } + + LOG(INFO) << "Successfully resized slow cmd thread pool to " << new_size; + return true; +} + +void PikaServer::GetThreadPoolInfo(std::string* info) { + std::stringstream tmp_stream; + + // Fast cmd thread pool info + size_t fast_pool_size = g_pika_conf->thread_pool_size(); + size_t fast_queue_size = ClientProcessorThreadPoolCurQueueSize(); + size_t fast_max_queue = ClientProcessorThreadPoolMaxQueueSize(); + double fast_usage = fast_max_queue > 0 ? 
(fast_queue_size * 100.0 / fast_max_queue) : 0.0; + + tmp_stream << "# Threadpool\r\n"; + tmp_stream << "fast_cmd_pool_size:" << fast_pool_size << "\r\n"; + tmp_stream << "fast_cmd_pool_queue_size:" << fast_queue_size << "\r\n"; + tmp_stream << "fast_cmd_pool_max_queue_size:" << fast_max_queue << "\r\n"; + tmp_stream << "fast_cmd_pool_usage:" << std::fixed << std::setprecision(2) << fast_usage << "%\r\n"; + + // Slow cmd thread pool info + if (g_pika_conf->slow_cmd_pool()) { + size_t slow_pool_size = g_pika_conf->slow_cmd_thread_pool_size(); + size_t slow_queue_size = SlowCmdThreadPoolCurQueueSize(); + size_t slow_max_queue = SlowCmdThreadPoolMaxQueueSize(); + double slow_usage = slow_max_queue > 0 ? (slow_queue_size * 100.0 / slow_max_queue) : 0.0; + + tmp_stream << "slow_cmd_pool_size:" << slow_pool_size << "\r\n"; + tmp_stream << "slow_cmd_pool_queue_size:" << slow_queue_size << "\r\n"; + tmp_stream << "slow_cmd_pool_max_queue_size:" << slow_max_queue << "\r\n"; + tmp_stream << "slow_cmd_pool_usage:" << std::fixed << std::setprecision(2) << slow_usage << "%\r\n"; + } else { + tmp_stream << "slow_cmd_pool_size:0\r\n"; + tmp_stream << "slow_cmd_pool_queue_size:0\r\n"; + tmp_stream << "slow_cmd_pool_max_queue_size:0\r\n"; + tmp_stream << "slow_cmd_pool_usage:0.00%\r\n"; + } + + // Admin cmd thread pool info + size_t admin_pool_size = g_pika_conf->admin_thread_pool_size(); + tmp_stream << "admin_cmd_pool_size:" << admin_pool_size << "\r\n"; + + info->append(tmp_stream.str()); +} + void PikaServer::BGSaveTaskSchedule(net::TaskFunc func, void* arg) { bgsave_thread_.StartThread(); bgsave_thread_.Schedule(func, arg); @@ -1901,3 +2395,135 @@ void PikaServer::CacheConfigInit(cache::CacheConfig& cache_cfg) { cache_cfg.lfu_decay_time = g_pika_conf->cache_lfu_decay_time(); } void PikaServer::SetLogNetActivities(bool value) { pika_dispatch_thread_->SetLogNetActivities(value); } + +// 改进的快慢命令分离相关方法实现 + +void PikaServer::AdjustBorrowThresholds() { + // 这个方法可以根据系统负载动态调整借用阈值 + // 
当前使用简单的队列饱和度来调整 + + size_t fast_queue_size = ClientProcessorThreadPoolCurQueueSize(); + size_t fast_max_queue = ClientProcessorThreadPoolMaxQueueSize(); + size_t slow_queue_size = SlowCmdThreadPoolCurQueueSize(); + size_t slow_max_queue = SlowCmdThreadPoolMaxQueueSize(); + + if (fast_max_queue == 0 || slow_max_queue == 0) { + return; + } + + // 计算队列饱和度 + double fast_saturation = static_cast(fast_queue_size) / fast_max_queue; + double slow_saturation = static_cast(slow_queue_size) / slow_max_queue; + + int current_threshold = g_pika_conf->threadpool_borrow_threshold_percent(); + int new_threshold = current_threshold; + + // 如果两个线程池都很繁忙(饱和度>0.8),提高借用阈值(更难借用) + if (fast_saturation > 0.8 && slow_saturation > 0.8) { + new_threshold = std::min(90, current_threshold + 5); + } + // 如果两个线程池都很空闲(饱和度<0.3),降低借用阈值(更容易借用) + else if (fast_saturation < 0.3 && slow_saturation < 0.3) { + new_threshold = std::max(30, current_threshold - 5); + } + + if (new_threshold != current_threshold) { + g_pika_conf->SetThreadpoolBorrowThresholdPercent(new_threshold); + LOG(INFO) << "Adjusted borrow threshold from " << current_threshold + << "% to " << new_threshold << "% (fast_sat: " + << std::fixed << std::setprecision(2) << fast_saturation * 100 + << "%, slow_sat: " << slow_saturation * 100 << "%)"; + } +} + +bool PikaServer::IsDynamicSlowCommand(const std::string& cmd_name) const { + if (!cmd_classifier_) { + return false; + } + return cmd_classifier_->IsSlowCommand(cmd_name); +} + +void PikaServer::RecordCommandExecutionTime(const std::string& cmd_name, uint64_t duration_us) { + if (cmd_classifier_) { + cmd_classifier_->RecordExecutionTime(cmd_name, duration_us); + } + + // 同时记录到线程池指标中 + // 注意:这里需要知道命令是在哪个线程池执行的,简化起见我们根据是否是慢命令来判断 + if (IsDynamicSlowCommand(cmd_name) && slow_pool_metrics_) { + slow_pool_metrics_->RecordLatency(duration_us); + slow_pool_metrics_->tasks_completed++; + } else if (fast_pool_metrics_) { + fast_pool_metrics_->RecordLatency(duration_us); + 
fast_pool_metrics_->tasks_completed++; + } +} + +void PikaServer::SetCommandRateLimit(double rate_per_sec) { + if (cmd_rate_limiter_) { + cmd_rate_limiter_->SetRate(rate_per_sec); + LOG(INFO) << "Command rate limit set to " << rate_per_sec << " commands/sec"; + } +} + +bool PikaServer::CheckCommandRateLimit() { + if (!cmd_rate_limiter_) { + return true; // 如果没有限流器,总是允许 + } + return cmd_rate_limiter_->TryAcquire(1.0); +} + +std::string PikaServer::GetEnhancedThreadPoolMetrics() const { + std::stringstream ss; + + // 基本线程池信息 + // NOTE(review): &ss.str() took the address of a temporary and discarded the result + // (ill-formed); GetThreadPoolInfo is non-const, so bridge with a named string and + // a const_cast until the method's const-ness is sorted out. + std::string base_info; + const_cast<PikaServer*>(this)->GetThreadPoolInfo(&base_info); + ss << base_info; + + // 增强的指标(Prometheus格式) + ss << "\n# Enhanced Thread Pool Metrics\n"; + + if (fast_pool_metrics_) { + ss << fast_pool_metrics_->ExportMetrics("fast"); + } + + if (slow_pool_metrics_ && g_pika_conf->slow_cmd_pool()) { + ss << slow_pool_metrics_->ExportMetrics("slow"); + } + + // 命令分类统计 + if (cmd_classifier_) { + ss << "\n# Command Classification\n"; + auto cmd_avg_times = cmd_classifier_->GetCommandAvgTimes(); + for (const auto& [cmd_name, avg_time] : cmd_avg_times) { + bool is_slow = cmd_classifier_->IsSlowCommand(cmd_name); + ss << "command_avg_time{cmd=\"" << cmd_name + << "\",type=\"" << (is_slow ?
"slow" : "fast") << "\"} " + << avg_time / 1000.0 << " # milliseconds\n"; + } + } + + return ss.str(); +} + +bool PikaServer::IsKeyAffinityToSlow(const std::string& key) const { + if (!key_hash_) { + // 如果没有一致性哈希,使用简单的哈希 + std::hash hasher; + return (hasher(key) % 2) == 1; + } + + std::string node = key_hash_->GetNode(key); + return node == "slow"; +} + +void PikaServer::ResetThreadPoolMetrics() { + if (fast_pool_metrics_) { + fast_pool_metrics_->Reset(); + } + + if (slow_pool_metrics_) { + slow_pool_metrics_->Reset(); + } + + LOG(INFO) << "Thread pool metrics reset"; +} diff --git a/tools/pika_migrate/conf/pika.conf b/tools/pika_migrate/conf/pika.conf index 7ca990005..a680e7aeb 100644 --- a/tools/pika_migrate/conf/pika.conf +++ b/tools/pika_migrate/conf/pika.conf @@ -132,7 +132,7 @@ masterauth : # The [password of user], which is empty by default. # [NOTICE] If this user password is the same as admin password (including both being empty), # the value of this parameter will be ignored and all users are considered as administrators, -# in this scenario, users are not subject to the restrictions imposed by the userblacklist. +# in this scenario, users are not subject to the restrictions imposed by the userblacklist. # PS: "admin password" refers to value of the parameter above: requirepass. # userpass : @@ -147,6 +147,7 @@ masterauth : # If set to 'classic', Pika will create multiple DBs whose number is the value of configure item "databases". instance-mode : classic + # The number of databases when Pika runs in classic mode. # The default database id is DB 0. You can select a different one on # a per-connection by using SELECT. The db id range is [0, 'databases' value -1].