Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,19 @@ thread-pool-size : 12
# When set to no, they are not separated.
slow-cmd-pool : no

# Enable thread pool borrowing mechanism, which allows threads to help each other when one pool is busy
# and another is idle. This can improve resource utilization and reduce command latency under uneven load.
# [yes | no] - The default value is yes.
threadpool-borrow-enable : yes

# The queue length threshold (as a percentage of max queue size) at which a thread pool is considered busy
# and may borrow threads from another pool. The value range is [1-99], default is 80.
threadpool-borrow-threshold-percent : 80

# The queue length threshold (as a percentage of max queue size) at which a thread pool is considered idle
# and may lend threads to another pool. The value range is [1-99], default is 20.
threadpool-idle-threshold-percent : 20

# Size of the low level thread pool, The threads within this pool
# are dedicated to handling slow user requests.
slow-cmd-thread-pool-size : 1
Expand Down
1 change: 1 addition & 0 deletions include/pika_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ class InfoCmd : public Cmd {
const static std::string kDebugSection;
const static std::string kCommandStatsSection;
const static std::string kCacheSection;
const static std::string kThreadpoolSection;

void DoInitial() override;
void Clear() override {
Expand Down
17 changes: 17 additions & 0 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,18 @@ class PikaConf : public pstd::BaseConf {
std::shared_lock l(rwlock_);
return slow_cmd_pool_;
}
bool threadpool_borrow_enable() {
std::shared_lock l(rwlock_);
return threadpool_borrow_enable_;
}
int threadpool_borrow_threshold_percent() {
std::shared_lock l(rwlock_);
return threadpool_borrow_threshold_percent_;
}
int threadpool_idle_threshold_percent() {
std::shared_lock l(rwlock_);
return threadpool_idle_threshold_percent_;
}
std::string server_id() {
std::shared_lock l(rwlock_);
return server_id_;
Expand Down Expand Up @@ -942,6 +954,11 @@ class PikaConf : public pstd::BaseConf {
std::string bgsave_prefix_;
std::string pidfile_;
std::atomic<bool> slow_cmd_pool_;

// Thread pool task borrowing configuration
bool threadpool_borrow_enable_ = true;
int threadpool_borrow_threshold_percent_ = 80;
int threadpool_idle_threshold_percent_ = 20;

std::string compression_;
std::string compression_per_level_;
Expand Down
145 changes: 145 additions & 0 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
#endif
#include <memory>
#include <set>
#include <array>
#include <chrono>
#include <unordered_map>

#include "src/cache/include/config.h"
#include "net/include/bg_thread.h"
Expand Down Expand Up @@ -78,6 +81,108 @@ struct TaskArg {
void DoBgslotscleanup(void* arg);
void DoBgslotsreload(void* arg);

// 命令分类器:基于命令执行时间自动分类快慢命令
class CommandClassifier {
public:
CommandClassifier(uint64_t slow_threshold_us = 50000, uint64_t fast_threshold_us = 10000,
size_t window_size = 1000);

// 记录命令执行时间并动态调整分类
void RecordExecutionTime(const std::string& cmd_name, uint64_t duration_us);

// 检查命令是否为慢命令
bool IsSlowCommand(const std::string& cmd_name) const;

// 将命令标记为慢命令
void MarkAsSlowCommand(const std::string& cmd_name);

// 将命令标记为快命令
void MarkAsFastCommand(const std::string& cmd_name);

// 获取所有命令的平均执行时间
std::unordered_map<std::string, double> GetCommandAvgTimes() const;

private:
struct CommandStats {
uint64_t total_time = 0;
uint64_t count = 0;
std::vector<uint64_t> recent_times; // 最近N次执行时间
size_t time_index = 0; // 环形缓冲区索引
bool initialized = false; // 是否已经收集了足够的样本

void AddTime(uint64_t time, size_t window_size);
double GetAverageTime() const;
double GetRecentAverageTime() const;
};

mutable std::mutex mutex_;
std::unordered_map<std::string, CommandStats> cmd_stats_;
std::unordered_set<std::string> slow_commands_;
uint64_t slow_threshold_us_; // 慢命令阈值(微秒)
uint64_t fast_threshold_us_; // 快命令阈值(微秒)
size_t window_size_; // 滑动窗口大小
};

// 线程池监控指标
struct ThreadPoolMetrics {
std::atomic<uint64_t> tasks_scheduled{0};
std::atomic<uint64_t> tasks_completed{0};
std::atomic<uint64_t> queue_overflows{0};
std::atomic<uint64_t> borrow_attempts{0};
std::atomic<uint64_t> successful_borrows{0};

// 延迟分布统计(1ms, 5ms, 10ms, 50ms, 100ms, 500ms, 1s, 5s, >5s)
std::array<std::atomic<uint64_t>, 9> latency_buckets{};

void RecordLatency(uint64_t latency_us);
std::string ExportMetrics(const std::string& pool_name) const;
void Reset();
};

// 基于令牌桶的流量控制
class RateLimiter {
public:
explicit RateLimiter(double rate_per_sec, double burst_size = 100.0);

// 尝试获取令牌,成功返回true
bool TryAcquire(double tokens = 1.0);

// 调整速率
void SetRate(double rate_per_sec);

private:
void Refill();

double rate_; // 每秒允许的令牌数
double max_tokens_; // 令牌桶容量
double tokens_; // 当前令牌数
std::chrono::steady_clock::time_point last_update_; // 上次更新时间
std::mutex mutex_;
};

// 一致性哈希实现,用于键亲和性
class ConsistentHash {
public:
explicit ConsistentHash(int num_replicas = 100, int num_buckets = 1024);

// 添加节点
void AddNode(const std::string& node);

// 移除节点
void RemoveNode(const std::string& node);

// 获取键对应的节点
std::string GetNode(const std::string& key) const;

// 计算键的哈希值
size_t HashKey(const std::string& key) const;

private:
int num_replicas_;
std::map<size_t, std::string> ring_;
std::hash<std::string> hasher_;
};

class PikaServer : public pstd::noncopyable {
public:
PikaServer();
Expand Down Expand Up @@ -194,6 +299,13 @@ class PikaServer : public pstd::noncopyable {
size_t ClientProcessorThreadPoolMaxQueueSize();
size_t SlowCmdThreadPoolCurQueueSize();
size_t SlowCmdThreadPoolMaxQueueSize();

/*
* Thread pool dynamic resize
*/
bool ResizeFastCmdThreadPool(size_t new_size);
bool ResizeSlowCmdThreadPool(size_t new_size);
void GetThreadPoolInfo(std::string* info);

/*
* BGSave used
Expand Down Expand Up @@ -495,6 +607,30 @@ class PikaServer : public pstd::noncopyable {
void ProcessCronTask();
double HitRatio();
void SetLogNetActivities(bool value);

/*
* 改进的快慢命令分离相关方法
*/
// 动态调整借用阈值
void AdjustBorrowThresholds();

// 自适应命令分类
bool IsDynamicSlowCommand(const std::string& cmd_name) const;
void RecordCommandExecutionTime(const std::string& cmd_name, uint64_t duration_us);

// 设置流控
void SetCommandRateLimit(double rate_per_sec);
bool CheckCommandRateLimit();

// 获取增强的监控指标
std::string GetEnhancedThreadPoolMetrics() const;

// 使用一致性哈希判断键亲和性
bool IsKeyAffinityToSlow(const std::string& key) const;

// 重置监控指标
void ResetThreadPoolMetrics();

/*
* disable compact
*/
Expand Down Expand Up @@ -649,6 +785,15 @@ class PikaServer : public pstd::noncopyable {
* lastsave used
*/
int64_t lastsave_ = 0;

/*
* 改进的快慢命令分离相关成员
*/
std::unique_ptr<CommandClassifier> cmd_classifier_;
std::unique_ptr<ThreadPoolMetrics> fast_pool_metrics_;
std::unique_ptr<ThreadPoolMetrics> slow_pool_metrics_;
std::unique_ptr<RateLimiter> cmd_rate_limiter_;
std::unique_ptr<ConsistentHash> key_hash_;

/*
* acl
Expand Down
8 changes: 8 additions & 0 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,7 @@ const std::string InfoCmd::kRocksDBSection = "rocksdb";
const std::string InfoCmd::kDebugSection = "debug";
const std::string InfoCmd::kCommandStatsSection = "commandstats";
const std::string InfoCmd::kCacheSection = "cache";
const std::string InfoCmd::kThreadpoolSection = "threadpool";


const std::string ClientCmd::KILLTYPE_NORMAL = "normal";
Expand Down Expand Up @@ -969,6 +970,8 @@ void InfoCmd::DoInitial() {
info_section_ = kInfoCommandStats;
} else if (strcasecmp(argv_[1].data(), kCacheSection.data()) == 0) {
info_section_ = kInfoCache;
} else if (strcasecmp(argv_[1].data(), kThreadpoolSection.data()) == 0) {
info_section_ = kInfoThreadpool;
} else {
info_section_ = kInfoErr;
}
Expand Down Expand Up @@ -1010,6 +1013,8 @@ void InfoCmd::Do() {
info.append("\r\n");
InfoCache(info, db_);
info.append("\r\n");
g_pika_server->GetThreadPoolInfo(&info);
info.append("\r\n");
InfoCPU(info);
info.append("\r\n");
InfoReplication(info);
Expand Down Expand Up @@ -1054,6 +1059,9 @@ void InfoCmd::Do() {
case kInfoCache:
InfoCache(info, db_);
break;
case kInfoThreadpool:
g_pika_server->GetThreadPoolInfo(&info);
break;
default:
// kInfoErr is nothing
break;
Expand Down
14 changes: 14 additions & 0 deletions src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,20 @@ int PikaConf::Load() {
GetConfStr("slow-cmd-pool", &slowcmdpool);
slow_cmd_pool_.store(slowcmdpool == "yes" ? true : false);

std::string threadpool_borrow;
GetConfStr("threadpool-borrow-enable", &threadpool_borrow);
threadpool_borrow_enable_ = threadpool_borrow == "yes" ? true : false;

GetConfInt("threadpool-borrow-threshold-percent", &threadpool_borrow_threshold_percent_);
if (threadpool_borrow_threshold_percent_ <= 0 || threadpool_borrow_threshold_percent_ >= 100) {
threadpool_borrow_threshold_percent_ = 80;
}

GetConfInt("threadpool-idle-threshold-percent", &threadpool_idle_threshold_percent_);
if (threadpool_idle_threshold_percent_ <= 0 || threadpool_idle_threshold_percent_ >= 100) {
threadpool_idle_threshold_percent_ = 20;
}

int binlog_writer_num = 1;
GetConfInt("binlog-writer-num", &binlog_writer_num);
if (binlog_writer_num <= 0 || binlog_writer_num > 24) {
Expand Down
Loading
Loading