Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ default-slot-num : 1024
# The value option is [yes | no]
# enable-blob-garbage-collection : no

# the cutoff that the GC logic uses to determine which blob files should be considered old.
# the cutoff that the GC logic uses to determine which blob files should be considered "old".
# This parameter can be tuned to adjust the trade-off between write amplification and space amplification.
# blob-garbage-collection-age-cutoff : 0.25

Expand Down Expand Up @@ -527,6 +527,25 @@ max-rsync-parallel-num : 4
# The synchronization mode of Pika primary/secondary replication is determined by ReplicationID. ReplicationID in one replication_cluster are the same
# replication-id :

# The maximum number of big keys to output in 'info' and log output.
# This controls how many big key entries are shown at most for each type.
# Default: 10
BIGKEYS_SHOW_LIMIT = 10

# The threshold for member count to trigger big key detection
# Default: 10000
bigkeys_member_threshold : 10000

# The threshold for key and value length (in bytes)
# to trigger big key detection (for string type).
# Default: 1048576 (1MB)
bigkeys_key_value_length_threshold : 1048576

# The interval (in minutes) for outputting big key statistics to the log.
# Interval time for scanning expired big key threads.
# Default: 1
# When set to 0, big key logging will be disabled.
bigkeys_log_interval : 1
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里默认改成0吧,默认不打印

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

###################
## Cache Settings
###################
Expand Down
8 changes: 7 additions & 1 deletion include/pika_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,8 @@ class InfoCmd : public Cmd {
kInfoAll,
kInfoDebug,
kInfoCommandStats,
kInfoCache
kInfoCache,
kInfoBigKeys
};
InfoCmd(const std::string& name, int arity, uint32_t flag) : Cmd(name, arity, flag) {}
void Do() override;
Expand All @@ -277,6 +278,7 @@ class InfoCmd : public Cmd {
void Execute() override;

private:
int bigkeys_limit_ = 0;
InfoSection info_section_;
bool rescan_ = false; // whether to rescan the keyspace
bool off_ = false;
Expand All @@ -295,15 +297,19 @@ class InfoCmd : public Cmd {
const static std::string kDebugSection;
const static std::string kCommandStatsSection;
const static std::string kCacheSection;
const static std::string kBigKeysSection;

void DoInitial() override;
void Clear() override {
rescan_ = false;
off_ = false;
keyspace_scan_dbs_.clear();
info_section_ = kInfoErr;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里在 Clear() 情况下为什么要置为 kInfoErr 状态,什么场景下这个命令会调用 Clear()

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我用 kInfoErr 状态代表一个错误或者未初始化的状态。确保命令对象在被重用之前,其内部状态恢复到初始状态,防止数据污染。当一个命令执行完成后,Clear() 方法会被调用,清理内部状态,释放资源,为下次复用做准备。

bigkeys_limit_ = 0;
}

void InfoServer(std::string& info);
void InfoBigKeys(std::string& info);
void InfoClients(std::string& info);
void InfoStats(std::string& info);
void InfoExecCount(std::string& info);
Expand Down
40 changes: 39 additions & 1 deletion include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,23 @@ class PikaConf : public pstd::BaseConf {
std::shared_lock l(rwlock_);
return binlog_writer_num_;
}
//big keys
int bigkeys_show_limit() {
std::shared_lock l(rwlock_);
return bigkeys_show_limit_;
}
int bigkeys_member_threshold() {
std::shared_lock l(rwlock_);
return bigkeys_member_threshold_;
}
int bigkeys_key_value_length_threshold() {
std::shared_lock l(rwlock_);
return bigkeys_key_value_length_threshold_;
}
int bigkeys_log_interval() {
std::shared_lock l(rwlock_);
return bigkeys_log_interval_;
}
bool slotmigrate() {
std::shared_lock l(rwlock_);
return slotmigrate_;
Expand Down Expand Up @@ -729,7 +746,23 @@ class PikaConf : public pstd::BaseConf {
log_net_activities_.store(false);
}
}

//big keys
void SetBigkeysShowLimit(const int value) {
std::lock_guard l(rwlock_);
bigkeys_show_limit_ = value;
}
void SetBigkeysKeyValueLengthThreshold(const int value) {
std::lock_guard l(rwlock_);
bigkeys_key_value_length_threshold_ = value;
}
void SetBigkeysMemberCountThreshold(const int value) {
std::lock_guard l(rwlock_);
bigkeys_member_threshold_ = value;
}
void SetBigkeysLogInterval(const int value) {
std::lock_guard l(rwlock_);
bigkeys_log_interval_ = value;
}
// Rsync Rate limiting configuration
void SetThrottleBytesPerSecond(const int value) {
std::lock_guard l(rwlock_);
Expand Down Expand Up @@ -895,6 +928,11 @@ class PikaConf : public pstd::BaseConf {
int thread_pool_size_ = 0;
int slow_cmd_thread_pool_size_ = 0;
int admin_thread_pool_size_ = 0;
//big keys
int bigkeys_show_limit_ = 10;
int bigkeys_member_threshold_ = 10000;
int bigkeys_key_value_length_threshold_ = 1048576;
int bigkeys_log_interval_ = 60;
std::unordered_set<std::string> slow_cmd_set_;
// Because the exporter of Pika_exporter implements Auth authentication
// with the Exporter of Pika, and the Exporter authenticates the Auth when
Expand Down
3 changes: 3 additions & 0 deletions include/pika_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ class DB : public std::enable_shared_from_this<DB>, public pstd::noncopyable {

void SetCompactRangeOptions(const bool is_canceled);

// Update big keys configuration
void UpdateStorageBigKeysConfig(uint32_t log_interval, uint64_t member_threshold, uint64_t key_value_length_threshold, size_t show_limit);

std::shared_ptr<pstd::lock::LockMgr> LockMgr();
/*
* Cache used
Expand Down
3 changes: 3 additions & 0 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ class PikaServer : public pstd::noncopyable {
void DBSetMaxCacheStatisticKeys(uint32_t max_cache_statistic_keys);
void DBSetSmallCompactionThreshold(uint32_t small_compaction_threshold);
void DBSetSmallCompactionDurationThreshold(uint32_t small_compaction_duration_threshold);
void UpdateDBBigKeysConfig();
bool GetDBBinlogOffset(const std::string& db_name, BinlogOffset* boffset);
pstd::Status DoSameThingEveryDB(const TaskType& type);

Expand Down Expand Up @@ -527,6 +528,8 @@ class PikaServer : public pstd::noncopyable {
void AutoDeleteExpiredDump();
void AutoUpdateNetworkMetric();
void PrintThreadPoolQueueStatus();
void LogBigKeysInfo();
void CleanExpiredBigKeys();
void StatDiskUsage();
int64_t GetLastSaveTime(const std::string& dump_dir);

Expand Down
1 change: 1 addition & 0 deletions src/pika.cc
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ int main(int argc, char* argv[]) {
g_pika_rm = std::make_unique<PikaReplicaManager>();
g_network_statistic = std::make_unique<net::NetworkStatistic>();
g_pika_server->InitDBStruct();
g_pika_server->UpdateDBBigKeysConfig();
//the cmd table of g_pika_cmd_table_manager must be inited before calling PikaServer::InitStatistic(CmdTable* )
g_pika_server->InitStatistic(g_pika_cmd_table_manager->GetCmdTable());
auto status = g_pika_server->InitAcl();
Expand Down
80 changes: 77 additions & 3 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -884,7 +884,7 @@ const std::string InfoCmd::kRocksDBSection = "rocksdb";
const std::string InfoCmd::kDebugSection = "debug";
const std::string InfoCmd::kCommandStatsSection = "commandstats";
const std::string InfoCmd::kCacheSection = "cache";

const std::string InfoCmd::kBigKeysSection = "bigkeys";

const std::string ClientCmd::KILLTYPE_NORMAL = "normal";
const std::string ClientCmd::KILLTYPE_PUBSUB = "pubsub";
Expand All @@ -910,6 +910,8 @@ void InfoCmd::DoInitial() {
keyspace_scan_dbs_ = g_pika_server->GetAllDBName();
} else if (strcasecmp(argv_[1].data(), kServerSection.data()) == 0) {
info_section_ = kInfoServer;
} else if (strcasecmp(argv_[1].data(), "bigkeys") == 0) {
info_section_ = kInfoBigKeys;
} else if (strcasecmp(argv_[1].data(), kClientsSection.data()) == 0) {
info_section_ = kInfoClients;
} else if (strcasecmp(argv_[1].data(), kStatsSection.data()) == 0) {
Expand Down Expand Up @@ -994,6 +996,9 @@ void InfoCmd::Do() {
InfoReplication(info);
info.append("\r\n");
InfoKeyspace(info);
info.append("\r\n");
InfoBigKeys(info);
info.append("\r\n");
break;
case kInfoAll:
InfoServer(info);
Expand All @@ -1017,6 +1022,9 @@ void InfoCmd::Do() {
InfoKeyspace(info);
info.append("\r\n");
InfoRocksDB(info);
info.append("\r\n");
InfoBigKeys(info);
info.append("\r\n");
break;
case kInfoServer:
InfoServer(info);
Expand Down Expand Up @@ -1054,6 +1062,9 @@ void InfoCmd::Do() {
case kInfoCache:
InfoCache(info, db_);
break;
case kInfoBigKeys:
InfoBigKeys(info);
break;
default:
// kInfoErr is nothing
break;
Expand Down Expand Up @@ -1094,7 +1105,21 @@ void InfoCmd::InfoServer(std::string& info) {

info.append(tmp_stream.str());
}

void InfoCmd::InfoBigKeys(std::string& info) {
std::stringstream tmp_stream;
std::shared_lock db_rwl(g_pika_server->dbs_rw_);
for (const auto& db_item : g_pika_server->dbs_) {
if (!db_item.second) {
continue;
}
std::vector<storage::BigKeyInfo> bigkeys;
db_item.second->storage()->GetBigKeyStatistics(&bigkeys);
std::string bigkey_info;
storage::FormatBigKeyStatistics(bigkeys, &bigkey_info, g_pika_conf->bigkeys_show_limit());
tmp_stream << bigkey_info;
}
info.append(tmp_stream.str());
}
void InfoCmd::InfoClients(std::string& info) {
std::stringstream tmp_stream;
tmp_stream << "# Clients"
Expand Down Expand Up @@ -1833,7 +1858,27 @@ void ConfigCmd::ConfigGet(std::string& ret) {
EncodeString(&config_body, "max-cache-statistic-keys");
EncodeNumber(&config_body, g_pika_conf->max_cache_statistic_keys());
}

//big keys
if (pstd::stringmatch(pattern.data(), "BIGKEYS_SHOW_LIMIT", 1)) {
elements += 2;
EncodeString(&config_body, "BIGKEYS_SHOW_LIMIT");
EncodeNumber(&config_body, g_pika_conf->bigkeys_show_limit());
}
if (pstd::stringmatch(pattern.data(), "bigkeys_member_threshold", 1)) {
elements += 2;
EncodeString(&config_body, "bigkeys_member_threshold");
EncodeNumber(&config_body, g_pika_conf->bigkeys_member_threshold());
}
if (pstd::stringmatch(pattern.data(), "bigkeys_key_value_length_threshold", 1)) {
elements += 2;
EncodeString(&config_body, "bigkeys_key_value_length_threshold");
EncodeNumber(&config_body, g_pika_conf->bigkeys_key_value_length_threshold());
}
if (pstd::stringmatch(pattern.data(), "bigkeys_log_interval", 1)) {
elements += 2;
EncodeString(&config_body, "bigkeys_log_interval");
EncodeNumber(&config_body, g_pika_conf->bigkeys_log_interval());
}
if (pstd::stringmatch(pattern.data(), "small-compaction-threshold", 1) != 0) {
elements += 2;
EncodeString(&config_body, "small-compaction-threshold");
Expand Down Expand Up @@ -2963,6 +3008,35 @@ void ConfigCmd::ConfigSet(std::shared_ptr<DB> db) {
}
g_pika_conf->SetMaxConnRbufSize(static_cast<int>(ival));
res_.AppendStringRaw("+OK\r\n");
//big keys
} else if (set_item == "BIGKEYS_SHOW_LIMIT") {
if (!pstd::string2int(value.data(), value.size(), &ival) || ival < 0) {
res_.AppendStringRaw("-ERR Invalid argument '" + value + "' for CONFIG SET 'BIGKEYS_SHOW_LIMIT'\r\n");
return;
}
g_pika_conf->SetBigkeysShowLimit(ival);
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "bigkeys_member_threshold") {
if (!pstd::string2int(value.data(), value.size(), &ival) || ival < 0) {
res_.AppendStringRaw("-ERR Invalid argument '" + value + "' for CONFIG SET 'bigkeys_member_threshold'\r\n");
return;
}
g_pika_conf->SetBigkeysMemberCountThreshold(ival);
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "bigkeys_key_value_length_threshold") {
if (!pstd::string2int(value.data(), value.size(), &ival) || ival < 0) {
res_.AppendStringRaw("-ERR Invalid argument '" + value + "' for CONFIG SET 'bigkeys_key_value_length_threshold'\r\n");
return;
}
g_pika_conf->SetBigkeysKeyValueLengthThreshold(ival);
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "bigkeys_log_interval") {
if (!pstd::string2int(value.data(), value.size(), &ival) || ival < 0) {
res_.AppendStringRaw("-ERR Invalid argument '" + value + "' for CONFIG SET 'bigkeys_log_interval'\r\n");
return;
}
g_pika_conf->SetBigkeysLogInterval(ival);
res_.AppendStringRaw("+OK\r\n");
} else {
res_.AppendStringRaw("-ERR Unsupported CONFIG parameter: " + set_item + "\r\n");
}
Expand Down
22 changes: 21 additions & 1 deletion src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,22 @@ int PikaConf::Load() {
if (root_connection_num_ < 0) {
root_connection_num_ = 2;
}

GetConfInt("bigkeys_show_limit", &bigkeys_show_limit_);
if (bigkeys_show_limit_ <= 0) {
bigkeys_show_limit_ = 10;
}
GetConfInt("bigkeys_key_value_length_threshold", &bigkeys_key_value_length_threshold_);
if (bigkeys_key_value_length_threshold_ <= 0) {
bigkeys_key_value_length_threshold_ = 1048576;
}
GetConfInt("bigkeys_member_threshold", &bigkeys_member_threshold_);
if (bigkeys_member_threshold_ <= 0) {
bigkeys_member_threshold_ = 10000;
}
GetConfInt("bigkeys_log_interval", &bigkeys_log_interval_);
if (bigkeys_log_interval_ < 0) {
bigkeys_log_interval_ = 1;
}
std::string swe;
GetConfStr("slowlog-write-errorlog", &swe);
slowlog_write_errorlog_.store(swe == "yes" ? true : false);
Expand Down Expand Up @@ -805,6 +820,11 @@ int PikaConf::ConfigRewrite() {
SetConfInt64("thread-migrate-keys-num", thread_migrate_keys_num_);
// slaveof config item is special
SetConfStr("slaveof", slaveof_);
//big keys
SetConfInt("bigkeys_show_limit", bigkeys_show_limit_);
SetConfInt("bigkeys_key_value_length_threshold", bigkeys_key_value_length_threshold_);
SetConfInt("bigkeys_member_threshold", bigkeys_member_threshold_);
SetConfInt("bigkeys_log_interval", bigkeys_log_interval_);
// cache config
SetConfStr("cache-index-and-filter-blocks", cache_index_and_filter_blocks_ ? "yes" : "no");
SetConfInt("cache-model", cache_mode_);
Expand Down
7 changes: 7 additions & 0 deletions src/pika_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,13 @@ void DB::SetCompactRangeOptions(const bool is_canceled) {
storage_->SetCompactRangeOptions(is_canceled);
}

void DB::UpdateStorageBigKeysConfig(uint32_t log_interval, uint64_t member_threshold, uint64_t key_value_length_threshold, size_t show_limit) {
if (!opened_) {
return;
}
storage_->UpdateBigKeysConfig(log_interval, member_threshold, key_value_length_threshold, show_limit);
}

DisplayCacheInfo DB::GetCacheInfo() {
std::lock_guard l(cache_info_rwlock_);
return cache_info_;
Expand Down
Loading
Loading