diff --git a/conf/pika.conf b/conf/pika.conf
index 97d171d419..38f94bcaaf 100644
--- a/conf/pika.conf
+++ b/conf/pika.conf
@@ -494,7 +494,7 @@ default-slot-num : 1024

 # The value option is [yes | no]
 # enable-blob-garbage-collection : no

-# the cutoff that the GC logic uses to determine which blob files should be considered “old“.
+# the cutoff that the GC logic uses to determine which blob files should be considered "old".
 # This parameter can be tuned to adjust the trade-off between write amplification and space amplification.
 # blob-garbage-collection-age-cutoff : 0.25
@@ -527,6 +527,25 @@ max-rsync-parallel-num : 4
 # The synchronization mode of Pika primary/secondary replication is determined by ReplicationID. ReplicationID in one replication_cluster are the same
 # replication-id :
+# The maximum number of big keys to show in INFO output and in the log.
+# This controls how many big key entries are listed at most for each data type.
+# Default: 10
+bigkeys_show_limit : 10
+
+# The member count threshold that triggers big key detection (for non-string types).
+# Default: 10000
+bigkeys_member_threshold : 10000
+
+# The key and value length threshold (in bytes) that triggers
+# big key detection (for the string type).
+# Default: 1048576 (1MB)
+bigkeys_key_value_length_threshold : 1048576
+
+# The interval (in minutes) at which big key statistics are written to the log.
+# The same interval is used to clean up expired keys from the big key statistics.
+# Default: 1
+# When set to 0, big key logging is disabled.
+bigkeys_log_interval : 1

 ###################
 ## Cache Settings
 ###################
diff --git a/include/pika_admin.h b/include/pika_admin.h
index de0ddd5a0f..c6862ac14f 100644
--- a/include/pika_admin.h
+++ b/include/pika_admin.h
@@ -267,7 +267,8 @@ class InfoCmd : public Cmd {
     kInfoAll,
     kInfoDebug,
     kInfoCommandStats,
-    kInfoCache
+    kInfoCache,
+    kInfoBigKeys
   };
   InfoCmd(const std::string& name, int arity, uint32_t flag) : Cmd(name, arity, flag) {}
   void Do() override;
@@ -277,6 +278,7 @@ class InfoCmd : public Cmd {
   void Execute() override;

 private:
+  int bigkeys_limit_ = 0;
   InfoSection info_section_;
   bool rescan_ = false;  // whether to rescan the keyspace
   bool off_ = false;
@@ -295,15 +297,19 @@ class InfoCmd : public Cmd {
   const static std::string kDebugSection;
   const static std::string kCommandStatsSection;
   const static std::string kCacheSection;
+  const static std::string kBigKeysSection;

   void DoInitial() override;
   void Clear() override {
     rescan_ = false;
     off_ = false;
     keyspace_scan_dbs_.clear();
+    info_section_ = kInfoErr;
+    bigkeys_limit_ = 0;
   }

   void InfoServer(std::string& info);
+  void InfoBigKeys(std::string& info);
   void InfoClients(std::string& info);
   void InfoStats(std::string& info);
   void InfoExecCount(std::string& info);
diff --git a/include/pika_conf.h b/include/pika_conf.h
index 80d5abe8f0..f466146c20 100644
--- a/include/pika_conf.h
+++ b/include/pika_conf.h
@@ -160,6 +160,23 @@ class PikaConf : public pstd::BaseConf {
     std::shared_lock l(rwlock_);
     return binlog_writer_num_;
   }
+  // Big keys
+  int bigkeys_show_limit() {
+    std::shared_lock l(rwlock_);
+    return bigkeys_show_limit_;
+  }
+  int bigkeys_member_threshold() {
+    std::shared_lock l(rwlock_);
+    return bigkeys_member_threshold_;
+  }
+  int bigkeys_key_value_length_threshold() {
+    std::shared_lock l(rwlock_);
+    return bigkeys_key_value_length_threshold_;
+  }
+  int bigkeys_log_interval() {
+    std::shared_lock l(rwlock_);
+    return bigkeys_log_interval_;
+  }
   bool slotmigrate() {
     std::shared_lock l(rwlock_);
     return slotmigrate_;
@@ -729,7 +746,23 @@ class PikaConf : public
pstd::BaseConf { log_net_activities_.store(false); } } - + //big keys + void SetBigkeysShowLimit(const int value) { + std::lock_guard l(rwlock_); + bigkeys_show_limit_ = value; + } + void SetBigkeysKeyValueLengthThreshold(const int value) { + std::lock_guard l(rwlock_); + bigkeys_key_value_length_threshold_ = value; + } + void SetBigkeysMemberCountThreshold(const int value) { + std::lock_guard l(rwlock_); + bigkeys_member_threshold_ = value; + } + void SetBigkeysLogInterval(const int value) { + std::lock_guard l(rwlock_); + bigkeys_log_interval_ = value; + } // Rsync Rate limiting configuration void SetThrottleBytesPerSecond(const int value) { std::lock_guard l(rwlock_); @@ -895,6 +928,11 @@ class PikaConf : public pstd::BaseConf { int thread_pool_size_ = 0; int slow_cmd_thread_pool_size_ = 0; int admin_thread_pool_size_ = 0; + //big keys + int bigkeys_show_limit_ = 10; + int bigkeys_member_threshold_ = 10000; + int bigkeys_key_value_length_threshold_ = 1048576; + int bigkeys_log_interval_ = 60; std::unordered_set slow_cmd_set_; // Because the exporter of Pika_exporter implements Auth authentication // with the Exporter of Pika, and the Exporter authenticates the Auth when diff --git a/include/pika_db.h b/include/pika_db.h index bcaf3f8b16..af993ba3e7 100644 --- a/include/pika_db.h +++ b/include/pika_db.h @@ -128,6 +128,9 @@ class DB : public std::enable_shared_from_this, public pstd::noncopyable { void SetCompactRangeOptions(const bool is_canceled); + // Update big keys configuration + void UpdateStorageBigKeysConfig(uint32_t log_interval, uint64_t member_threshold, uint64_t key_value_length_threshold, size_t show_limit); + std::shared_ptr LockMgr(); /* * Cache used diff --git a/include/pika_server.h b/include/pika_server.h index 81cda87b04..bc0e1f9f1e 100644 --- a/include/pika_server.h +++ b/include/pika_server.h @@ -145,6 +145,7 @@ class PikaServer : public pstd::noncopyable { void DBSetMaxCacheStatisticKeys(uint32_t max_cache_statistic_keys); void DBSetSmallCompactionThreshold(uint32_t small_compaction_threshold); void DBSetSmallCompactionDurationThreshold(uint32_t small_compaction_duration_threshold); + void UpdateDBBigKeysConfig(); bool GetDBBinlogOffset(const std::string& db_name, BinlogOffset* boffset); pstd::Status DoSameThingEveryDB(const TaskType& type); @@ -527,6 +528,8 @@ class PikaServer : public pstd::noncopyable { void AutoDeleteExpiredDump(); void AutoUpdateNetworkMetric(); void PrintThreadPoolQueueStatus(); + void LogBigKeysInfo(); + void CleanExpiredBigKeys(); void StatDiskUsage(); int64_t GetLastSaveTime(const std::string& dump_dir); diff --git a/src/pika.cc b/src/pika.cc index 9ff7c3008c..2dda6f9c43 100644 --- a/src/pika.cc +++ b/src/pika.cc @@ -211,6 +211,7 @@ int main(int argc, char* argv[]) { g_pika_rm = std::make_unique(); g_network_statistic = std::make_unique(); g_pika_server->InitDBStruct(); + g_pika_server->UpdateDBBigKeysConfig(); //the cmd table of g_pika_cmd_table_manager must be inited before calling PikaServer::InitStatistic(CmdTable* ) g_pika_server->InitStatistic(g_pika_cmd_table_manager->GetCmdTable()); auto status = g_pika_server->InitAcl(); diff --git a/src/pika_admin.cc b/src/pika_admin.cc index 81d47bc43f..8da3260bcc 100644 --- a/src/pika_admin.cc +++ b/src/pika_admin.cc @@ -884,7 +884,7 @@ const std::string InfoCmd::kRocksDBSection = "rocksdb"; const std::string InfoCmd::kDebugSection = "debug"; const std::string InfoCmd::kCommandStatsSection = "commandstats"; const std::string InfoCmd::kCacheSection = "cache"; - +const std::string 
InfoCmd::kBigKeysSection = "bigkeys";

 const std::string ClientCmd::KILLTYPE_NORMAL = "normal";
 const std::string ClientCmd::KILLTYPE_PUBSUB = "pubsub";
@@ -910,6 +910,8 @@ void InfoCmd::DoInitial() {
     keyspace_scan_dbs_ = g_pika_server->GetAllDBName();
   } else if (strcasecmp(argv_[1].data(), kServerSection.data()) == 0) {
     info_section_ = kInfoServer;
+  } else if (strcasecmp(argv_[1].data(), kBigKeysSection.data()) == 0) {
+    info_section_ = kInfoBigKeys;
   } else if (strcasecmp(argv_[1].data(), kClientsSection.data()) == 0) {
     info_section_ = kInfoClients;
   } else if (strcasecmp(argv_[1].data(), kStatsSection.data()) == 0) {
@@ -994,6 +996,9 @@ void InfoCmd::Do() {
       InfoReplication(info);
       info.append("\r\n");
       InfoKeyspace(info);
+      info.append("\r\n");
+      InfoBigKeys(info);
+      info.append("\r\n");
       break;
     case kInfoAll:
       InfoServer(info);
@@ -1017,6 +1022,9 @@ void InfoCmd::Do() {
       InfoKeyspace(info);
       info.append("\r\n");
       InfoRocksDB(info);
+      info.append("\r\n");
+      InfoBigKeys(info);
+      info.append("\r\n");
       break;
     case kInfoServer:
       InfoServer(info);
@@ -1054,6 +1062,9 @@ void InfoCmd::Do() {
     case kInfoCache:
       InfoCache(info, db_);
       break;
+    case kInfoBigKeys:
+      InfoBigKeys(info);
+      break;
     default:
       // kInfoErr is nothing
       break;
@@ -1094,7 +1105,21 @@ void InfoCmd::InfoServer(std::string& info) {
   info.append(tmp_stream.str());
 }
-
+void InfoCmd::InfoBigKeys(std::string& info) {
+  std::stringstream tmp_stream;
+  std::shared_lock db_rwl(g_pika_server->dbs_rw_);
+  for (const auto& db_item : g_pika_server->dbs_) {
+    if (!db_item.second) {
+      continue;
+    }
+    std::vector<storage::BigKeyInfo> bigkeys;
+    db_item.second->storage()->GetBigKeyStatistics(&bigkeys);
+    std::string bigkey_info;
+    storage::FormatBigKeyStatistics(bigkeys, &bigkey_info, g_pika_conf->bigkeys_show_limit());
+    tmp_stream << bigkey_info;
+  }
+  info.append(tmp_stream.str());
+}
 void InfoCmd::InfoClients(std::string& info) {
   std::stringstream tmp_stream;
   tmp_stream << "# Clients"
@@ -1833,7 +1858,27 @@ void ConfigCmd::ConfigGet(std::string& ret) {
     EncodeString(&config_body, "max-cache-statistic-keys");
     EncodeNumber(&config_body, g_pika_conf->max_cache_statistic_keys());
   }
-
+  // Big keys
+  if (pstd::stringmatch(pattern.data(), "bigkeys_show_limit", 1) != 0) {
+    elements += 2;
+    EncodeString(&config_body, "bigkeys_show_limit");
+    EncodeNumber(&config_body, g_pika_conf->bigkeys_show_limit());
+  }
+  if (pstd::stringmatch(pattern.data(), "bigkeys_member_threshold", 1) != 0) {
+    elements += 2;
+    EncodeString(&config_body, "bigkeys_member_threshold");
+    EncodeNumber(&config_body, g_pika_conf->bigkeys_member_threshold());
+  }
+  if (pstd::stringmatch(pattern.data(), "bigkeys_key_value_length_threshold", 1) != 0) {
+    elements += 2;
+    EncodeString(&config_body, "bigkeys_key_value_length_threshold");
+    EncodeNumber(&config_body, g_pika_conf->bigkeys_key_value_length_threshold());
+  }
+  if (pstd::stringmatch(pattern.data(), "bigkeys_log_interval", 1) != 0) {
+    elements += 2;
+    EncodeString(&config_body, "bigkeys_log_interval");
+    EncodeNumber(&config_body, g_pika_conf->bigkeys_log_interval());
+  }
   if (pstd::stringmatch(pattern.data(), "small-compaction-threshold", 1) != 0) {
     elements += 2;
     EncodeString(&config_body, "small-compaction-threshold");
@@ -2963,6 +3008,35 @@ void ConfigCmd::ConfigSet(std::shared_ptr<DB> db) {
     }
     g_pika_conf->SetMaxConnRbufSize(static_cast<int>(ival));
     res_.AppendStringRaw("+OK\r\n");
+  // Big keys
+  } else if (set_item == "bigkeys_show_limit") {
+    if (!pstd::string2int(value.data(), value.size(), &ival) || ival < 0) {
+      res_.AppendStringRaw("-ERR Invalid argument '" + value + "' for CONFIG SET 'bigkeys_show_limit'\r\n");
+      return;
+    }
+    g_pika_conf->SetBigkeysShowLimit(ival);
+    res_.AppendStringRaw("+OK\r\n");
+  } else if (set_item == "bigkeys_member_threshold") {
+    if (!pstd::string2int(value.data(), value.size(), &ival) || ival < 0) {
+      res_.AppendStringRaw("-ERR Invalid argument '" + value + "' for CONFIG SET 'bigkeys_member_threshold'\r\n");
+      return;
+    }
+    g_pika_conf->SetBigkeysMemberCountThreshold(ival);
+    res_.AppendStringRaw("+OK\r\n");
+  } else if (set_item == "bigkeys_key_value_length_threshold") {
+    if (!pstd::string2int(value.data(), value.size(), &ival) || ival < 0) {
+      res_.AppendStringRaw("-ERR Invalid argument '" + value + "' for CONFIG SET 'bigkeys_key_value_length_threshold'\r\n");
+      return;
+    }
+    g_pika_conf->SetBigkeysKeyValueLengthThreshold(ival);
+    res_.AppendStringRaw("+OK\r\n");
+  } else if (set_item == "bigkeys_log_interval") {
+    if (!pstd::string2int(value.data(), value.size(), &ival) || ival < 0) {
+      res_.AppendStringRaw("-ERR Invalid argument '" + value + "' for CONFIG SET 'bigkeys_log_interval'\r\n");
+      return;
+    }
+    g_pika_conf->SetBigkeysLogInterval(ival);
+    res_.AppendStringRaw("+OK\r\n");
   } else {
     res_.AppendStringRaw("-ERR Unsupported CONFIG parameter: " + set_item + "\r\n");
   }
diff --git a/src/pika_conf.cc b/src/pika_conf.cc
index 94071eac7f..14561c2184 100644
--- a/src/pika_conf.cc
+++ b/src/pika_conf.cc
@@ -61,7 +61,22 @@ int PikaConf::Load() {
   if (root_connection_num_ < 0) {
     root_connection_num_ = 2;
   }
-
+  GetConfInt("bigkeys_show_limit", &bigkeys_show_limit_);
+  if (bigkeys_show_limit_ <= 0) {
+    bigkeys_show_limit_ = 10;
+  }
+  GetConfInt("bigkeys_key_value_length_threshold", &bigkeys_key_value_length_threshold_);
+  if (bigkeys_key_value_length_threshold_ <= 0) {
+    bigkeys_key_value_length_threshold_ = 1048576;
+  }
+  GetConfInt("bigkeys_member_threshold", &bigkeys_member_threshold_);
+  if (bigkeys_member_threshold_ <= 0) {
+    bigkeys_member_threshold_ = 10000;
+  }
+  GetConfInt("bigkeys_log_interval", &bigkeys_log_interval_);
+  if (bigkeys_log_interval_ < 0) {
+    bigkeys_log_interval_ = 1;
+  }
   std::string swe;
   GetConfStr("slowlog-write-errorlog", &swe);
   slowlog_write_errorlog_.store(swe == "yes" ? true : false);
@@ -805,6 +820,11 @@ int PikaConf::ConfigRewrite() {
   SetConfInt64("thread-migrate-keys-num", thread_migrate_keys_num_);
   // slaveof config item is special
   SetConfStr("slaveof", slaveof_);
+  // Big keys
+  SetConfInt("bigkeys_show_limit", bigkeys_show_limit_);
+  SetConfInt("bigkeys_key_value_length_threshold", bigkeys_key_value_length_threshold_);
+  SetConfInt("bigkeys_member_threshold", bigkeys_member_threshold_);
+  SetConfInt("bigkeys_log_interval", bigkeys_log_interval_);
   // cache config
   SetConfStr("cache-index-and-filter-blocks", cache_index_and_filter_blocks_ ? "yes" : "no");
   SetConfInt("cache-model", cache_mode_);
diff --git a/src/pika_db.cc b/src/pika_db.cc
index 58c4f3bf77..8b66c491ea 100644
--- a/src/pika_db.cc
+++ b/src/pika_db.cc
@@ -194,6 +194,13 @@ void DB::SetCompactRangeOptions(const bool is_canceled) {
   storage_->SetCompactRangeOptions(is_canceled);
 }

+void DB::UpdateStorageBigKeysConfig(uint32_t log_interval, uint64_t member_threshold, uint64_t key_value_length_threshold, size_t show_limit) {
+  if (!opened_) {
+    return;
+  }
+  storage_->UpdateBigKeysConfig(log_interval, member_threshold, key_value_length_threshold, show_limit);
+}
+
 DisplayCacheInfo DB::GetCacheInfo() {
   std::lock_guard l(cache_info_rwlock_);
   return cache_info_;
diff --git a/src/pika_server.cc b/src/pika_server.cc
index bbf444191d..9ce2ac0057 100644
--- a/src/pika_server.cc
+++ b/src/pika_server.cc
@@ -506,6 +506,20 @@ void PikaServer::DBSetSmallCompactionDurationThreshold(uint32_t small_compaction
   }
 }

+void PikaServer::UpdateDBBigKeysConfig() {
+  std::lock_guard l(dbs_rw_);
+  for (const auto& db_item : dbs_) {
+    db_item.second->DBLock();
+    db_item.second->UpdateStorageBigKeysConfig(
+        g_pika_conf->bigkeys_log_interval(),
+        g_pika_conf->bigkeys_member_threshold(),
+        g_pika_conf->bigkeys_key_value_length_threshold(),
+        g_pika_conf->bigkeys_show_limit()
+    );
+    db_item.second->DBUnlock();
+  }
+}
+
 bool PikaServer::GetDBBinlogOffset(const std::string& db_name, BinlogOffset* const boffset) {
   std::shared_ptr<SyncMasterDB> db = g_pika_rm->GetSyncMasterDBByName(DBInfo(db_name));
   if (!db) {
@@ -1143,6 +1157,8 @@ void PikaServer::DoTimingTask() {
   UpdateCacheInfo();
   // Print the queue status periodically
   PrintThreadPoolQueueStatus();
+  LogBigKeysInfo();
+  CleanExpiredBigKeys();
   StatDiskUsage();
 }

@@ -1901,3 +1917,139 @@ void PikaServer::CacheConfigInit(cache::CacheConfig& cache_cfg) {
   cache_cfg.lfu_decay_time = g_pika_conf->cache_lfu_decay_time();
 }
 void PikaServer::SetLogNetActivities(bool value) { pika_dispatch_thread_->SetLogNetActivities(value); }
+
+void PikaServer::LogBigKeysInfo() {
+  uint64_t interval_minutes = g_pika_conf->bigkeys_log_interval();
+  if (interval_minutes == 0) {
+    return;
+  }
+
+  thread_local uint64_t last_output_time = 0;
+  uint64_t current_time = pstd::NowMicros();
+
+  uint64_t interval_us = interval_minutes * 60 * 1000000;
+
+  if (current_time - last_output_time < interval_us) {
+    return;
+  }
+
+  last_output_time = current_time;
+
+  std::shared_lock l(dbs_rw_);
+  for (const auto& db_item : dbs_) {
+    if (!db_item.second) {
+      continue;
+    }
+
+    std::vector<storage::BigKeyInfo> bigkeys;
+    db_item.second->DBLockShared();
+
+    db_item.second->storage()->GetBigKeyStatistics(&bigkeys);
+
+    if (!bigkeys.empty()) {
+      std::map<storage::DataType, std::vector<const storage::BigKeyInfo*>> type_map;
+      for (const auto& bk : bigkeys) {
+        type_map[bk.type].push_back(&bk);
+      }
+      for (auto& [type, vec] : type_map) {
+        std::vector<const storage::BigKeyInfo*> sorted_vec = vec;
+        size_t bigkeys_limit = g_pika_conf->bigkeys_show_limit();
+        if (type == storage::DataType::kStrings) {
+          std::stable_sort(sorted_vec.begin(), sorted_vec.end(),
+                           [](const storage::BigKeyInfo* a, const storage::BigKeyInfo* b) -> bool {
+                             if (a->value_length != b->value_length) {
+                               return a->value_length > b->value_length;
+                             }
+                             return a->key_length > b->key_length;
+                           });
+        } else {
+          std::stable_sort(sorted_vec.begin(), sorted_vec.end(),
+                           [](const storage::BigKeyInfo* a, const storage::BigKeyInfo* b) -> bool {
+                             if (a->member_size != b->member_size) {
+                               return a->member_size > b->member_size;
+                             }
+                             return a->key_length > b->key_length;
+                           });
+        }
+        size_t show_num = std::min(sorted_vec.size(), bigkeys_limit);
+        LOG(WARNING) << "[BigKey] Found " << sorted_vec.size() << " big keys, showing top " << show_num;
+        for (size_t i = 0; i < show_num; ++i) {
+          const auto& bk = *sorted_vec[i];
+          if (bk.type == storage::DataType::kStrings) {
+            LOG(WARNING) << "[BigKey] Type: string, key: " << bk.key
+                         << ", key_length: " << bk.key_length
+                         << ", value_length: " << bk.value_length;
+          } else {
+            std::string type_name;
+            switch (bk.type) {
+              case storage::DataType::kHashes: type_name = "hash"; break;
+              case storage::DataType::kLists: type_name = "list"; break;
+              case storage::DataType::kZSets: type_name = "zset"; break;
+              case storage::DataType::kSets: type_name = "set"; break;
+              case storage::DataType::kStreams: type_name = "stream"; break;
+              default: type_name = "unknown"; break;
+            }
+            LOG(WARNING) << "[BigKey] Type: " << type_name << ", key: " << bk.key
+                         << ", key_length: " << bk.key_length
+                         << ", member_size: " << bk.member_size;
+          }
+        }
+      }
+    }
+
+    db_item.second->DBUnlockShared();
+  }
+}
+
+void PikaServer::CleanExpiredBigKeys() {
+  thread_local uint64_t last_check_time = 0;
+  uint64_t now = pstd::NowMicros();
+  uint64_t interval_us = static_cast<uint64_t>(g_pika_conf->bigkeys_log_interval()) * 60 * 1000000;
+  if (now - last_check_time < interval_us) {
+    return;
+  }
+  last_check_time = now;
+
+  std::shared_lock l(dbs_rw_);
+  for (const auto& db_item : dbs_) {
+    if (!db_item.second) continue;
+
+    std::vector<storage::BigKeyInfo> bigkeys;
+    db_item.second->DBLockShared();
+
+    db_item.second->storage()->GetBigKeyStatistics(&bigkeys);
+
+    for (const auto& bk : bigkeys) {
+      std::map<storage::DataType, int64_t> ttls;
+      std::map<storage::DataType, rocksdb::Status> type_status;
+
+      ttls = db_item.second->storage()->TTL(storage::Slice(bk.key), &type_status);
+
+      bool all_expired = true;
+      for (const auto& ts : type_status) {
+        if (ts.second.ok()) {  // A key with this name and type exists.
+          auto ttl_it = ttls.find(ts.first);
+          if (ttl_it != ttls.end()) {
+            // We found a TTL for it. Check if it's persistent or has time left.
+            const int64_t ttl = ttl_it->second;
+            if (ttl > 0 || ttl == -1) {
+              all_expired = false;
+              break;
+            }
+          } else {
+            // This is unexpected. The key exists but TTL was not returned.
+            // To be safe, we assume it is persistent and do not clean it.
+            all_expired = false;
+            break;
+          }
+        }
+      }
+
+      if (all_expired) {
+        db_item.second->storage()->CheckAndRecordBigKeys(bk.key, bk.type, 0, 0, 0, true);
+      }
+    }
+
+    db_item.second->DBUnlockShared();
+  }
+}
diff --git a/src/storage/include/storage/storage.h b/src/storage/include/storage/storage.h
index 8c2e53a7b3..61c55336e1 100644
--- a/src/storage/include/storage/storage.h
+++ b/src/storage/include/storage/storage.h
@@ -127,7 +127,15 @@ struct ScoreMember {
 enum BeforeOrAfter { Before, After };

 enum DataType { kAll, kStrings, kHashes, kLists, kZSets, kSets, kStreams };
-
+// Big key info struct
+struct BigKeyInfo {
+  std::string key;
+  DataType type;
+  uint64_t member_size;  // Number of members, for complex data types
+  uint64_t key_length;
+  uint64_t value_length;
+};
+void FormatBigKeyStatistics(const std::vector<BigKeyInfo>& bigkeys, std::string* out, size_t bigkeys_limit);
 const char DataTypeTag[] = {'a', 'k', 'h', 'l', 'z', 's', 'x'};

 enum class OptionType {
@@ -1100,6 +1108,16 @@ class Storage {
   Status EnableAutoCompaction(const OptionType& option_type, const std::string& db_type,
                               const std::unordered_map<std::string, std::string>& options);
   void GetRocksDBInfo(std::string& info);
+  void GetBigKeyStatistics(std::vector<BigKeyInfo>* bigkeys);
+  // Update big keys related configuration
+  void UpdateBigKeysConfig(uint32_t log_interval, uint64_t member_threshold, uint64_t key_value_length_threshold, size_t show_limit);
+  // Add or remove a big key from the statistics
+  void CheckAndRecordBigKeys(const std::string& key, DataType type, uint64_t member_size, uint64_t key_length = 0, uint64_t value_length = 0, bool is_delete = false);
+  // Big keys configuration (defaults match pika.conf; overwritten by UpdateBigKeysConfig)
+  uint32_t bigkeys_log_interval_ = 1;
+  uint64_t bigkeys_member_threshold_ = 10000;
+  uint64_t bigkeys_key_value_length_threshold_ = 1048576;
+  size_t bigkeys_limit_ = 10;

 private:
   std::unique_ptr<RedisStrings> strings_db_;
diff --git a/src/storage/src/redis.cc b/src/storage/src/redis.cc
index 3066a62759..3085dc838b 100644
--- a/src/storage/src/redis.cc
+++ b/src/storage/src/redis.cc
@@ -5,7 +5,8 @@
 #include "src/redis.h"

 #include
-
+#include
+#include
 namespace storage {

 Redis::Redis(Storage* const s, const DataType& type)
@@ -13,7 +14,9 @@ Redis::Redis(Storage* const s, const DataType& type)
       type_(type),
       lock_mgr_(std::make_shared<LockMgr>(1000, 0, std::make_shared<MutexFactoryImpl>())),
       small_compaction_threshold_(5000),
-      small_compaction_duration_threshold_(10000) {
+      small_compaction_duration_threshold_(10000),
+      db_(nullptr),
+      big_keys_info_map_() {
   statistics_store_ = std::make_unique<LRUCache<std::string, size_t>>();
   scan_cursors_store_ = std::make_unique<LRUCache<std::string, std::string>>();
   scan_cursors_store_->SetCapacity(5000);
@@ -109,7 +112,17 @@ Status Redis::SetOptions(const OptionType& option_type, const std::unordered_map
   }
   return s;
 }
-
+inline const char* DataTypeName(DataType type) {
+  switch (type) {
+    case kStrings: return "string";
+    case kHashes: return "hash";
+    case kLists: return "list";
+    case kSets: return "set";
+    case kZSets: return "zset";
+    case kStreams: return "stream";
+    default: return "unknown";
+  }
+}
 void Redis::GetRocksDBInfo(std::string &info, const char *prefix) {
   std::ostringstream string_stream;
   string_stream << "#" << prefix << "RocksDB" << "\r\n";
@@ -191,6 +204,18 @@ void Redis::SetWriteWalOptions(const bool is_wal_disable) {
   default_write_options_.disableWAL = is_wal_disable;
 }

+void Redis::GetBigKeyStatistics(std::vector<BigKeyInfo>* bigkeys) {
+  if (!bigkeys) {
+    return;
+  }
+
+  std::shared_lock lock(big_keys_mutex_);
+  for (const auto& kv : big_keys_info_map_) {
+    BigKeyInfo info = kv.second;
+    info.key = kv.first;
+    bigkeys->push_back(info);
+  }
+}
 void Redis::SetCompactRangeOptions(const bool is_canceled) {
   if (!default_compact_range_options_.canceled) {
     default_compact_range_options_.canceled = new std::atomic<bool>(is_canceled);
@@ -198,5 +223,47 @@ void Redis::SetCompactRangeOptions(const bool is_canceled) {
     default_compact_range_options_.canceled->store(is_canceled);
   }
 }
-
+// Big keys
+void Redis::CheckAndRecordBigKeys(
+    const std::string& key,
+    DataType type,
+    uint64_t member_size,
+    uint64_t key_length,
+    uint64_t value_length,
+    bool is_delete) {
+  if (!storage_) {
+    LOG(WARNING) << "Storage is null in CheckAndRecordBigKeys";
+    return;
+  }
+  std::unique_lock lock(big_keys_mutex_);
+  if (is_delete) {
+    auto it = big_keys_info_map_.find(key);
+    if (it != big_keys_info_map_.end()) {
+      big_keys_info_map_.erase(it);
+    }
+    return;
+  }
+  bool is_bigkey = false;
+  if (member_size > storage_->bigkeys_member_threshold_) {
+    is_bigkey = true;
+  }
+  if (type == kStrings && ((key_length > storage_->bigkeys_key_value_length_threshold_) ||
+                           (value_length > storage_->bigkeys_key_value_length_threshold_))) {
+    is_bigkey = true;
+  }
+  if (is_bigkey) {
+    BigKeyInfo info;
+    info.type = type;
+    info.member_size = member_size;
+    info.key_length = key_length;
+    info.value_length = value_length;
+    info.key = key;
+    big_keys_info_map_[key] = info;
+  } else {
+    auto it = big_keys_info_map_.find(key);
+    if (it != big_keys_info_map_.end()) {
+      big_keys_info_map_.erase(it);
+    }
+  }
+}
 }  // namespace storage
diff --git a/src/storage/src/redis.h b/src/storage/src/redis.h
index 21eaa2aa94..6644f52624 100644
--- a/src/storage/src/redis.h
+++ b/src/storage/src/redis.h
@@ -9,6 +9,7 @@
 #include
 #include
 #include
+#include

 #include "rocksdb/db.h"
 #include "rocksdb/slice.h"
@@ -20,11 +21,15 @@
 #include "src/lru_cache.h"
 #include "src/mutex_impl.h"
 #include "storage/storage.h"
-
+#include
+#include
+#include
+#include
 namespace storage {
 using Status = rocksdb::Status;
 using Slice = rocksdb::Slice;
-
+// Big key info struct (defined in storage/storage.h)
+struct BigKeyInfo;
 class Redis {
  public:
   Redis(Storage* storage, const DataType& type);
@@ -115,7 +120,8 @@ class Redis {
   Status SetSmallCompactionDurationThreshold(uint64_t small_compaction_duration_threshold);
   std::vector<rocksdb::ColumnFamilyHandle*> GetHandles() { return handles_; };
   void GetRocksDBInfo(std::string &info, const char *prefix);
-
+  virtual void GetBigKeyStatistics(std::vector<BigKeyInfo>* bigkeys);
+  void CheckAndRecordBigKeys(const std::string& key, DataType type, uint64_t member_size, uint64_t key_length = 0, uint64_t value_length = 0, bool is_delete = false);
 protected:
   Storage* const storage_;
   DataType type_;
@@ -126,6 +132,9 @@ class Redis {
   rocksdb::WriteOptions default_write_options_;
   rocksdb::ReadOptions default_read_options_;
   rocksdb::CompactRangeOptions default_compact_range_options_;
+  // Big keys
+  std::shared_mutex big_keys_mutex_;
+  std::unordered_map<std::string, BigKeyInfo> big_keys_info_map_;

   // For Scan
   std::unique_ptr<LRUCache<std::string, std::string>> scan_cursors_store_;
diff --git a/src/storage/src/redis_hashes.cc b/src/storage/src/redis_hashes.cc
index b885a487dd..df69873ac9 100644
--- a/src/storage/src/redis_hashes.cc
+++ b/src/storage/src/redis_hashes.cc
@@ -421,6 +421,15 @@ Status RedisHashes::HIncrby(const Slice& key, const Slice& field, int64_t value,
   }
   s = db_->Write(default_write_options_, &batch);
   UpdateSpecificKeyStatistics(key.ToString(), statistic);
+  if (s.ok()) {
+    int32_t current_count = 1;
+
+    if (meta_value.size() > 0) {
+      ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
+      current_count = parsed_hashes_meta_value.count();
+    }
+    CheckAndRecordBigKeys(key.ToString(), kHashes, current_count, key.ToString().size(), 0);
+
} return s; } @@ -495,6 +504,14 @@ Status RedisHashes::HIncrbyfloat(const Slice& key, const Slice& field, const Sli } s = db_->Write(default_write_options_, &batch); UpdateSpecificKeyStatistics(key.ToString(), statistic); + if (s.ok()) { + int32_t current_count = 1; + if (meta_value.size() > 0) { + ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value); + current_count = parsed_hashes_meta_value.count(); + } + CheckAndRecordBigKeys(key.ToString(), kHashes, current_count, key.ToString().size(), 0); + } return s; } @@ -659,6 +676,14 @@ Status RedisHashes::HMSet(const Slice& key, const std::vector& fvs) } s = db_->Write(default_write_options_, &batch); UpdateSpecificKeyStatistics(key.ToString(), statistic); + if (s.ok()) { + int32_t current_count = 1; + if (meta_value.size() > 0) { + ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value); + current_count = parsed_hashes_meta_value.count(); + } + CheckAndRecordBigKeys(key.ToString(), kHashes, current_count, key.ToString().size(), 0); + } return s; } @@ -718,6 +743,8 @@ Status RedisHashes::HSet(const Slice& key, const Slice& field, const Slice& valu } s = db_->Write(default_write_options_, &batch); UpdateSpecificKeyStatistics(key.ToString(), statistic); + int32_t count = *res; + CheckAndRecordBigKeys(key.ToString(), kHashes, count, key.ToString().size(), 0); return s; } @@ -768,7 +795,18 @@ Status RedisHashes::HSetnx(const Slice& key, const Slice& field, const Slice& va } else { return s; } - return db_->Write(default_write_options_, &batch); + s = db_->Write(default_write_options_, &batch); + if (s.ok()) { + int32_t current_count = 1; + if (*ret == 0) { + if (meta_value.size() > 0) { + ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value); + current_count = parsed_hashes_meta_value.count(); + } + } + CheckAndRecordBigKeys(key.ToString(), kHashes, current_count, field.size(), 0); + } + return s; } Status RedisHashes::HVals(const Slice& key, std::vector* values) { @@ -1195,6 +1233,7 @@ Status RedisHashes::Del(const Slice& key) { parsed_hashes_meta_value.InitialMetaValue(); s = db_->Put(default_write_options_, handles_[0], key, meta_value); UpdateSpecificKeyStatistics(key.ToString(), statistic); + CheckAndRecordBigKeys(key.ToString(), kHashes, 0, 0, 0, true); } } return s; diff --git a/src/storage/src/redis_hashes.h b/src/storage/src/redis_hashes.h index cc6c7c6529..bd2e730c07 100644 --- a/src/storage/src/redis_hashes.h +++ b/src/storage/src/redis_hashes.h @@ -18,7 +18,6 @@ class RedisHashes : public Redis { public: RedisHashes(Storage* s, const DataType& type); ~RedisHashes() override = default; - // Common Commands Status Open(const StorageOptions& storage_options, const std::string& db_path) override; Status CompactRange(const rocksdb::Slice* begin, const rocksdb::Slice* end, diff --git a/src/storage/src/redis_lists.cc b/src/storage/src/redis_lists.cc index 09a045a4d4..2d7702c679 100644 --- a/src/storage/src/redis_lists.cc +++ b/src/storage/src/redis_lists.cc @@ -25,6 +25,8 @@ const rocksdb::Comparator* ListsDataKeyComparator() { RedisLists::RedisLists(Storage* const s, const DataType& type) : Redis(s, type) {} + + Status RedisLists::Open(const StorageOptions& storage_options, const std::string& db_path) { statistics_store_->SetCapacity(storage_options.statistics_max_size); small_compaction_threshold_ = storage_options.small_compaction_threshold; @@ -74,6 +76,7 @@ Status RedisLists::Open(const StorageOptions& storage_options, const std::string return rocksdb::DB::Open(db_ops, db_path, column_families, &handles_, &db_); } + 
Status RedisLists::CompactRange(const rocksdb::Slice* begin, const rocksdb::Slice* end, const ColumnFamilyType& type) { if (type == kMeta || type == kMetaAndData) { db_->CompactRange(default_compact_range_options_, handles_[0], begin, end); @@ -316,8 +319,12 @@ Status RedisLists::LInsert(const Slice& key, const BeforeOrAfter& before_or_afte ListsDataKey lists_target_key(key, version, target_index); batch.Put(handles_[1], lists_target_key.Encode(), value); *ret = static_cast(parsed_lists_meta_value.count()); - return db_->Write(default_write_options_, &batch); - } + Status write_status = db_->Write(default_write_options_, &batch); + if (write_status.ok()) { + CheckAndRecordBigKeys(key.ToString(), kLists, *ret, key.ToString().size(), 0); + } + return write_status; + } } } else if (s.IsNotFound()) { *ret = 0; @@ -429,7 +436,11 @@ Status RedisLists::LPush(const Slice& key, const std::vector& value } else { return s; } - return db_->Write(default_write_options_, &batch); + s = db_->Write(default_write_options_, &batch); + if (s.ok()) { + CheckAndRecordBigKeys(key.ToString(), kLists, *ret, key.ToString().size(), 0); + } + return s; } Status RedisLists::LPushx(const Slice& key, const std::vector& values, uint64_t* len) { @@ -456,7 +467,11 @@ Status RedisLists::LPushx(const Slice& key, const std::vector& valu } batch.Put(handles_[0], key, meta_value); *len = parsed_lists_meta_value.count(); - return db_->Write(default_write_options_, &batch); + Status write_status = db_->Write(default_write_options_, &batch); + if (write_status.ok()) { + CheckAndRecordBigKeys(key.ToString(), kLists, *len, key.ToString().size(), 0); + } + return write_status; } } return s; @@ -978,7 +993,11 @@ Status RedisLists::RPush(const Slice& key, const std::vector& value } else { return s; } - return db_->Write(default_write_options_, &batch); + s = db_->Write(default_write_options_, &batch); + if (s.ok()) { + CheckAndRecordBigKeys(key.ToString(), kLists, *ret, key.ToString().size(), 0); + } + return s; } Status RedisLists::RPushx(const Slice& key, const std::vector& values, uint64_t* len) { @@ -1005,7 +1024,11 @@ Status RedisLists::RPushx(const Slice& key, const std::vector& valu } batch.Put(handles_[0], key, meta_value); *len = parsed_lists_meta_value.count(); - return db_->Write(default_write_options_, &batch); + Status write_status = db_->Write(default_write_options_, &batch); + if (write_status.ok()) { + CheckAndRecordBigKeys(key.ToString(), kLists, *len, key.ToString().size(), 0); + } + return write_status; } } return s; @@ -1155,6 +1178,7 @@ Status RedisLists::Del(const Slice& key) { parsed_lists_meta_value.InitialMetaValue(); s = db_->Put(default_write_options_, handles_[0], key, meta_value); UpdateSpecificKeyStatistics(key.ToString(), statistic); + CheckAndRecordBigKeys(key.ToString(), kLists, 0, 0, 0, true); } } return s; diff --git a/src/storage/src/redis_lists.h b/src/storage/src/redis_lists.h index d56c5e47e7..cb84cb2dfa 100644 --- a/src/storage/src/redis_lists.h +++ b/src/storage/src/redis_lists.h @@ -19,7 +19,7 @@ class RedisLists : public Redis { public: RedisLists(Storage* s, const DataType& type); ~RedisLists() override = default; - + // Common commands Status Open(const StorageOptions& storage_options, const std::string& db_path) override; Status CompactRange(const rocksdb::Slice* begin, const rocksdb::Slice* end, diff --git a/src/storage/src/redis_sets.cc b/src/storage/src/redis_sets.cc index 5707e032d4..29464fc96d 100644 --- a/src/storage/src/redis_sets.cc +++ b/src/storage/src/redis_sets.cc @@ -31,7 
+31,6 @@ rocksdb::Status RedisSets::Open(const StorageOptions& storage_options, const std statistics_store_->SetCapacity(storage_options.statistics_max_size); small_compaction_threshold_ = storage_options.small_compaction_threshold; small_compaction_duration_threshold_ = storage_options.small_compaction_duration_threshold; - rocksdb::Options ops(storage_options.options); rocksdb::Status s = rocksdb::DB::Open(ops, db_path, &db_); if (s.ok()) { @@ -265,7 +264,11 @@ rocksdb::Status RedisSets::SAdd(const Slice& key, const std::vector } else { return s; } - return db_->Write(default_write_options_, &batch); + s = db_->Write(default_write_options_, &batch); + if (s.ok()) { + CheckAndRecordBigKeys(key.ToString(), kSets, *ret, key.ToString().size(), 0); + } + return s; } rocksdb::Status RedisSets::SCard(const Slice& key, int32_t* ret) { @@ -1380,6 +1383,7 @@ rocksdb::Status RedisSets::Del(const Slice& key) { parsed_sets_meta_value.InitialMetaValue(); s = db_->Put(default_write_options_, handles_[0], key, meta_value); UpdateSpecificKeyStatistics(key.ToString(), statistic); + CheckAndRecordBigKeys(key.ToString(), kSets, 0, 0, 0, true); } } return s; diff --git a/src/storage/src/redis_sets.h b/src/storage/src/redis_sets.h index 139412da59..e563c2241c 100644 --- a/src/storage/src/redis_sets.h +++ b/src/storage/src/redis_sets.h @@ -9,7 +9,7 @@ #include #include #include - +#include "src/redis.h" #include "src/custom_comparator.h" #include "src/lru_cache.h" #include "src/redis.h" diff --git a/src/storage/src/redis_streams.cc b/src/storage/src/redis_streams.cc index 48578ae5b5..d5d9b89296 100644 --- a/src/storage/src/redis_streams.cc +++ b/src/storage/src/redis_streams.cc @@ -95,7 +95,7 @@ Status RedisStreams::XAdd(const Slice& key, const std::string& serialized_messag if (!s.ok()) { return s; } - + CheckAndRecordBigKeys(key.ToString(), kStreams, stream_meta.length(), key.ToString().size(), 0); return Status::OK(); } @@ -369,7 +369,6 @@ Status RedisStreams::Open(const StorageOptions& storage_options, const std::stri column_families.emplace_back("data_cf", data_cf_ops); return rocksdb::DB::Open(db_ops, db_path, column_families, &handles_, &db_); } - Status RedisStreams::CompactRange(const rocksdb::Slice* begin, const rocksdb::Slice* end, const ColumnFamilyType& type) { if (type == kMeta || type == kMetaAndData) { @@ -607,6 +606,7 @@ Status RedisStreams::Del(const Slice& key) { stream_meta_value.InitMetaValue(); s = db_->Put(default_write_options_, handles_[0], key, stream_meta_value.value()); UpdateSpecificKeyStatistics(key.ToString(), statistic); + CheckAndRecordBigKeys(key.ToString(), kStreams, 0, 0, 0, true); } } return s; diff --git a/src/storage/src/redis_strings.cc b/src/storage/src/redis_strings.cc index 2a39beff6b..249957de42 100644 --- a/src/storage/src/redis_strings.cc +++ b/src/storage/src/redis_strings.cc @@ -13,7 +13,7 @@ #include #include #include - +#include "src/redis.h" #include "src/scope_record_lock.h" #include "src/scope_snapshot.h" #include "src/strings_filter.h" @@ -36,7 +36,6 @@ Status RedisStrings::Open(const StorageOptions& storage_options, const std::stri } table_ops.filter_policy.reset(rocksdb::NewBloomFilterPolicy(10, true)); ops.table_factory.reset(rocksdb::NewBlockBasedTableFactory(table_ops)); - return rocksdb::DB::Open(ops, db_path, &db_); } @@ -176,7 +175,11 @@ Status RedisStrings::Append(const Slice& key, const Slice& value, int32_t* ret, StringsValue strings_value(new_value); strings_value.set_timestamp(timestamp); *ret = static_cast(new_value.size()); - return 
db_->Put(default_write_options_, key, strings_value.Encode()); + s = db_->Put(default_write_options_, key, strings_value.Encode()); + if (s.ok()) { + CheckAndRecordBigKeys(key.ToString(), kStrings, 1, key.ToString().size(), value.size()); + } + return s; *expired_timestamp_sec = timestamp; } } else if (s.IsNotFound()) { @@ -540,7 +543,11 @@ Status RedisStrings::GetSet(const Slice& key, const Slice& value, std::string* o return s; } StringsValue strings_value(value); - return db_->Put(default_write_options_, key, strings_value.Encode()); + s = db_->Put(default_write_options_, key, strings_value.Encode()); + if (s.ok()) { + CheckAndRecordBigKeys(key.ToString(), kStrings, 1, key.ToString().size(), value.size()); + } + return s; } Status RedisStrings::Incrby(const Slice& key, int64_t value, int64_t* ret, int32_t* expired_timestamp_sec) { @@ -708,6 +715,7 @@ Status RedisStrings::MSet(const std::vector& kvs) { for (const auto& kv : kvs) { StringsValue strings_value(kv.value); batch.Put(kv.key, strings_value.Encode()); + CheckAndRecordBigKeys(kv.key, kStrings, 1, kv.key.size(), kv.value.size()); } return db_->Write(default_write_options_, &batch); } @@ -725,6 +733,7 @@ Status RedisStrings::MSetnx(const std::vector& kvs, int32_t* ret) { exists = true; break; } + CheckAndRecordBigKeys(kv.key, kStrings, 1, kv.key.size(), kv.value.size()); } } if (!exists) { @@ -739,7 +748,11 @@ Status RedisStrings::MSetnx(const std::vector& kvs, int32_t* ret) { Status RedisStrings::Set(const Slice& key, const Slice& value) { StringsValue strings_value(value); ScopeRecordLock l(lock_mgr_, key); - return db_->Put(default_write_options_, key, strings_value.Encode()); + Status s = db_->Put(default_write_options_, key, strings_value.Encode()); + if (s.ok()) { + CheckAndRecordBigKeys(key.ToString(), kStrings, 1, key.ToString().size(), value.size()); + } + return s; } Status RedisStrings::Setxx(const Slice& key, const Slice& value, int32_t* ret, const int32_t ttl) { @@ -765,7 +778,11 @@ Status RedisStrings::Setxx(const Slice& key, const Slice& value, int32_t* ret, c if (ttl > 0) { strings_value.SetRelativeTimestamp(ttl); } - return db_->Put(default_write_options_, key, strings_value.Encode()); + s = db_->Put(default_write_options_, key, strings_value.Encode()); + if (s.ok()) { + CheckAndRecordBigKeys(key.ToString(), kStrings, 1, key.ToString().size(), value.size()); + } + return s; } } @@ -827,7 +844,11 @@ Status RedisStrings::Setex(const Slice& key, const Slice& value, int32_t ttl) { return s; } ScopeRecordLock l(lock_mgr_, key); - return db_->Put(default_write_options_, key, strings_value.Encode()); + s = db_->Put(default_write_options_, key, strings_value.Encode()); + if (s.ok()) { + CheckAndRecordBigKeys(key.ToString(), kStrings, 1, key.ToString().size(), value.size()); + } + return s; } Status RedisStrings::Setnx(const Slice& key, const Slice& value, int32_t* ret, const int32_t ttl) { @@ -845,6 +866,7 @@ Status RedisStrings::Setnx(const Slice& key, const Slice& value, int32_t* ret, c s = db_->Put(default_write_options_, key, strings_value.Encode()); if (s.ok()) { *ret = 1; + CheckAndRecordBigKeys(key.ToString(), kStrings, 1, key.ToString().size(), value.size()); } } } else if (s.IsNotFound()) { @@ -855,6 +877,7 @@ Status RedisStrings::Setnx(const Slice& key, const Slice& value, int32_t* ret, c s = db_->Put(default_write_options_, key, strings_value.Encode()); if (s.ok()) { *ret = 1; + CheckAndRecordBigKeys(key.ToString(), kStrings, 1, key.ToString().size(), value.size()); } } return s; @@ -951,13 +974,21 @@ Status 
RedisStrings::Setrange(const Slice& key, int64_t start_offset, const Slic *ret = static_cast(new_value.length()); StringsValue strings_value(new_value); strings_value.set_timestamp(timestamp); - return db_->Put(default_write_options_, key, strings_value.Encode()); + Status s = db_->Put(default_write_options_, key, strings_value.Encode()); + if (s.ok()) { + CheckAndRecordBigKeys(key.ToString(), kStrings, 1, key.ToString().size(), value.size()); + } + return s; } else if (s.IsNotFound()) { std::string tmp(start_offset, '\0'); new_value = tmp.append(value.data()); *ret = static_cast(new_value.length()); StringsValue strings_value(new_value); - return db_->Put(default_write_options_, key, strings_value.Encode()); + Status s = db_->Put(default_write_options_, key, strings_value.Encode()); + if (s.ok()) { + CheckAndRecordBigKeys(key.ToString(), kStrings, 1, key.ToString().size(), value.size()); + } + return s; } return s; } @@ -1306,6 +1337,7 @@ Status RedisStrings::Del(const Slice& key) { if (parsed_strings_value.IsStale()) { return Status::NotFound("Stale"); } + CheckAndRecordBigKeys(key.ToString(), kStrings, 1, key.ToString().size(), 0, true); return db_->Delete(default_write_options_, key); } return s; diff --git a/src/storage/src/redis_strings.h b/src/storage/src/redis_strings.h index d0365cc6ae..e3b02d51dd 100644 --- a/src/storage/src/redis_strings.h +++ b/src/storage/src/redis_strings.h @@ -17,7 +17,6 @@ class RedisStrings : public Redis { public: RedisStrings(Storage* s, const DataType& type); ~RedisStrings() override = default; - // Common Commands Status Open(const StorageOptions& storage_options, const std::string& db_path) override; Status CompactRange(const rocksdb::Slice* begin, const rocksdb::Slice* end, diff --git a/src/storage/src/redis_zsets.cc b/src/storage/src/redis_zsets.cc index a4153c9d9a..5795a3d897 100644 --- a/src/storage/src/redis_zsets.cc +++ b/src/storage/src/redis_zsets.cc @@ -399,6 +399,9 @@ Status RedisZSets::ZAdd(const Slice& key, const std::vector& score_ } s = db_->Write(default_write_options_, &batch); UpdateSpecificKeyStatistics(key.ToString(), statistic); + if (s.ok()) { + CheckAndRecordBigKeys(key.ToString(), kZSets, *ret, key.ToString().size(), 0); + } return s; } @@ -538,6 +541,15 @@ Status RedisZSets::ZIncrby(const Slice& key, const Slice& member, double increme *ret = score; s = db_->Write(default_write_options_, &batch); UpdateSpecificKeyStatistics(key.ToString(), statistic); + if (s.ok()) { + int32_t count = 0; + s = db_->Get(default_read_options_, handles_[0], key, &meta_value); + if (s.ok()) { + ParsedZSetsMetaValue parsed_zsets_meta_value(&meta_value); + count = parsed_zsets_meta_value.count(); + } + CheckAndRecordBigKeys(key.ToString(), kZSets, count, key.ToString().size(), 0); + } return s; } @@ -1520,6 +1532,7 @@ Status RedisZSets::Del(const Slice& key) { parsed_zsets_meta_value.InitialMetaValue(); s = db_->Put(default_write_options_, handles_[0], key, meta_value); UpdateSpecificKeyStatistics(key.ToString(), statistic); + CheckAndRecordBigKeys(key.ToString(), kZSets, 0, 0, 0, true); } } return s; diff --git a/src/storage/src/storage.cc b/src/storage/src/storage.cc index 3126717859..85351314f6 100644 --- a/src/storage/src/storage.cc +++ b/src/storage/src/storage.cc @@ -1808,7 +1808,17 @@ std::string Storage::GetCurrentTaskType() { return "No"; } } - +inline const char* DataTypeName(DataType type) { + switch (type) { + case kStrings: return "string"; + case kHashes: return "hash"; + case kLists: return "list"; + case kSets: return "set"; + case 
kZSets: return "zset"; + case kStreams: return "stream"; + default: return "unknown"; + } +} Status Storage::GetUsage(const std::string& property, uint64_t* const result) { *result = GetProperty(ALL_DB, property); return Status::OK(); @@ -1854,7 +1864,104 @@ uint64_t Storage::GetProperty(const std::string& db_type, const std::string& pro } return result; } +void Storage::GetBigKeyStatistics(std::vector* bigkeys) { + hashes_db_->GetBigKeyStatistics(bigkeys); + lists_db_->GetBigKeyStatistics(bigkeys); + zsets_db_->GetBigKeyStatistics(bigkeys); + sets_db_->GetBigKeyStatistics(bigkeys); + streams_db_->GetBigKeyStatistics(bigkeys); + strings_db_->GetBigKeyStatistics(bigkeys); +} + +void Storage::UpdateBigKeysConfig(uint32_t bigkeys_log_interval, + uint64_t bigkeys_member_threshold, + uint64_t bigkeys_key_value_length_threshold, + size_t bigkeys_show_limit) { + bigkeys_log_interval_ = bigkeys_log_interval; + bigkeys_member_threshold_ = bigkeys_member_threshold; + bigkeys_key_value_length_threshold_ = bigkeys_key_value_length_threshold; + bigkeys_limit_ = bigkeys_show_limit; +} + +void Storage::CheckAndRecordBigKeys(const std::string& key, DataType type, uint64_t member_size, uint64_t key_length, uint64_t value_length, bool is_delete) { + switch (type) { + case kStrings: + strings_db_->CheckAndRecordBigKeys(key, type, member_size, key_length, value_length, is_delete); + break; + case kHashes: + hashes_db_->CheckAndRecordBigKeys(key, type, member_size, key_length, value_length, is_delete); + break; + case kLists: + lists_db_->CheckAndRecordBigKeys(key, type, member_size, key_length, value_length, is_delete); + break; + case kZSets: + zsets_db_->CheckAndRecordBigKeys(key, type, member_size, key_length, value_length, is_delete); + break; + case kSets: + sets_db_->CheckAndRecordBigKeys(key, type, member_size, key_length, value_length, is_delete); + break; + case kStreams: + streams_db_->CheckAndRecordBigKeys(key, type, member_size, key_length, value_length, is_delete); + break; + default: + break; + } +} +void FormatBigKeyStatistics(const std::vector& bigkeys, std::string* out, size_t bigkeys_limit) { + std::map> type_map; + for (const auto& bk : bigkeys) { + type_map[bk.type].push_back(&bk); + } + std::ostringstream oss; + oss << "# BigKeys statistics" << std::endl; + oss << "# Show only the first " << bigkeys_limit << " big keys of each type" << std::endl; + oss << "Total number of big keys: " << bigkeys.size() << std::endl; + for (const auto& [type, vec] : type_map) { + std::string type_name = DataTypeName(type); + if (!type_name.empty()) { + type_name[0] = toupper(type_name[0]); + } + oss << "# " << type_name << std::endl; + oss << "Big Keys number: " << vec.size() << std::endl; + std::vector sorted_vec = vec; + if (type == DataType::kStrings) { + // String type: First sort by value_length in descending order, then by key_length in descending order when the same + std::stable_sort(sorted_vec.begin(), sorted_vec.end(), + [](const BigKeyInfo* a, const BigKeyInfo* b) -> bool { + if (a->value_length != b->value_length) { + return a->value_length > b->value_length; + } + return a->key_length > b->key_length; + }); + } else { + // Non-String type: First sort by member_size in descending order, then by key_length in descending order when the same + std::stable_sort(sorted_vec.begin(), sorted_vec.end(), + [](const BigKeyInfo* a, const BigKeyInfo* b) -> bool { + if (a->member_size != b->member_size) { + return a->member_size > b->member_size; + } + return a->key_length > b->key_length; + }); + } + size_t 
show_num = std::min(sorted_vec.size(), bigkeys_limit); + for (size_t i = 0; i < show_num; ++i) { + const auto* bk = sorted_vec[i]; + if (type == DataType::kStrings) { + oss << "Type: string, key: " << bk->key + << ", key_length: " << bk->key_length + << ", value_length: " << bk->value_length; + } else { + oss << "Type: " << DataTypeName(type) << ", key: " << bk->key + << ", key_length: " << bk->key_length + << ", member_size: " << bk->member_size; + } + oss << std::endl; + } + oss << std::endl; + } + *out = oss.str(); +} Status Storage::GetKeyNum(std::vector* key_infos) { KeyInfo key_info; // NOTE: keep the db order with string, hash, list, zset, set diff --git a/tools/pika_exporter/config/info.toml b/tools/pika_exporter/config/info.toml index 5597752f93..1dcf70175b 100644 --- a/tools/pika_exporter/config/info.toml +++ b/tools/pika_exporter/config/info.toml @@ -6,7 +6,7 @@ cpu = true replication = true keyspace = true cache = true - +bigkeys = true execcount = false commandstats = false rocksdb = false diff --git a/tools/pika_exporter/exporter/client.go b/tools/pika_exporter/exporter/client.go index 5410ffe9a4..b302228148 100644 --- a/tools/pika_exporter/exporter/client.go +++ b/tools/pika_exporter/exporter/client.go @@ -127,6 +127,7 @@ func (c *client) InfoNoneCommandList() (string, error) { "COMMAND_EXEC_COUNT": InfoConf.Execcount, "COMMANDSTATS": InfoConf.Commandstats, "ROCKSDB": InfoConf.Rocksdb, + "BIGKEYS": InfoConf.BigKeys, } for section, flag := range sectionsMap { if flag { @@ -155,6 +156,7 @@ func (c *client) InfoAllCommandList() (string, error) { "COMMAND_EXEC_COUNT": InfoConf.Execcount, "COMMANDSTATS": InfoConf.Commandstats, "ROCKSDB": InfoConf.Rocksdb, + "BIGKEYS": InfoConf.BigKeys, } for section, flag := range sectionsMap { if flag { diff --git a/tools/pika_exporter/exporter/conf.go b/tools/pika_exporter/exporter/conf.go index 5e2bd2460c..6e9d65a7ae 100644 --- a/tools/pika_exporter/exporter/conf.go +++ b/tools/pika_exporter/exporter/conf.go @@ -23,7 +23,7 @@ type InfoConfig struct { Commandstats bool `toml:"commandstats"` Rocksdb bool `toml:"rocksdb"` Cache bool `toml:"cache"` - + BigKeys bool `toml:"bigkeys"` Info bool InfoAll bool } diff --git a/tools/pika_exporter/exporter/metrics/bigkeys.go b/tools/pika_exporter/exporter/metrics/bigkeys.go new file mode 100644 index 0000000000..baf232181c --- /dev/null +++ b/tools/pika_exporter/exporter/metrics/bigkeys.go @@ -0,0 +1,116 @@ +package metrics + +import ( + "regexp" + "strconv" +) + +type multiMatchRegexParser struct { + name string + source string + reg *regexp.Regexp +} + +func (p *multiMatchRegexParser) Parse(m MetricMeta, c Collector, opt ParseOption) { + s := opt.Info + if p.source != "" { + s, _ = opt.Extracts[p.source] + } + + matches := p.reg.FindAllStringSubmatch(s, -1) + if matches == nil || len(matches) == 0 { + return + } + + processedKeys := make(map[string]bool) + + for _, match := range matches { + extracts := make(map[string]string) + for k, v := range opt.Extracts { + extracts[k] = v + } + + for i, name := range p.reg.SubexpNames() { + if i > 0 && i < len(match) { + if name != "" { + extracts[name] = trimSpace(match[i]) + } else { + extracts[strconv.Itoa(i)] = trimSpace(match[i]) + } + } + } + + key := "" + if val, ok := extracts["key"]; ok { + key = val + } + + if key != "" && processedKeys[key] { + continue + } + + if key != "" { + processedKeys[key] = true + } + + opt.Extracts = extracts + (&normalParser{}).Parse(m, c, opt) + } +} + +func RegisterBigKeys() { + Register(collectBigKeysMetrics) +} + +var 
collectBigKeysMetrics = map[string]MetricConfig{
+	"bigkeys_string": {
+		Parser: &multiMatchRegexParser{
+			name:   "bigkeys_string",
+			source: "bigkeys_output",
+			reg: regexp.MustCompile(
+				`Type: string, key: (?P<key>[^,]+), key_length: (?P<key_length>\d+), value_length: (?P<value_length>\d+)`,
+			),
+		},
+		MetricMeta: MetaDatas{
+			{
+				Name:      "bigkeys_string_key_length",
+				Help:      "Big key length for string type",
+				Type:      metricTypeGauge,
+				Labels:    []string{LabelNameAddr, LabelNameAlias, "type", "key"},
+				ValueName: "key_length",
+			},
+			{
+				Name:      "bigkeys_string_value_length",
+				Help:      "Big key value length for string type",
+				Type:      metricTypeGauge,
+				Labels:    []string{LabelNameAddr, LabelNameAlias, "type", "key"},
+				ValueName: "value_length",
+			},
+		},
+	},
+	"bigkeys_complex": {
+		Parser: &multiMatchRegexParser{
+			name:   "bigkeys_complex",
+			source: "bigkeys_output",
+			reg: regexp.MustCompile(
+				`Type: (?P<type>hash|list|set|zset|stream), key: (?P<key>[^,]+), key_length: (?P<key_length>\d+), member_size: (?P<member_size>\d+)`,
+			),
+		},
+		MetricMeta: MetaDatas{
+			{
+				Name:      "bigkeys_member_size",
+				Help:      "Big key member size by type and key",
+				Type:      metricTypeGauge,
+				Labels:    []string{LabelNameAddr, LabelNameAlias, "type", "key"},
+				ValueName: "member_size",
+			},
+			{
+				Name:      "bigkeys_complex_key_length",
+				Help:      "Big key length for complex type",
+				Type:      metricTypeGauge,
+				Labels:    []string{LabelNameAddr, LabelNameAlias, "type", "key"},
+				ValueName: "key_length",
+			},
+		},
+	},
+}
\ No newline at end of file
diff --git a/tools/pika_exporter/exporter/parser.go b/tools/pika_exporter/exporter/parser.go
index b6543dfb46..ffa5efe6c2 100644
--- a/tools/pika_exporter/exporter/parser.go
+++ b/tools/pika_exporter/exporter/parser.go
@@ -30,7 +30,13 @@ func parseInfo(info string) (*semver.Version, map[string]string, error) {

 func extractInfo(s string) (map[string]string, error) {
 	m := make(map[string]string)
-	scanner := bufio.NewScanner(strings.NewReader(s))
+
+	kvPart, bigKeysOutput := extractBigKeys(s)
+	if bigKeysOutput != "" {
+		m["bigkeys_output"] = bigKeysOutput
+	}
+
+	scanner := bufio.NewScanner(strings.NewReader(kvPart))

 	for scanner.Scan() {
 		line := scanner.Text()
@@ -49,6 +55,15 @@ func extractInfo(s string) (map[string]string, error) {
 	return m, scanner.Err()
 }

+func extractBigKeys(s string) (kvPart, bigKeysOutput string) {
+	if startIdx := strings.Index(s, "# BigKeys statistics"); startIdx != -1 {
+		// Ensure we split at a line boundary
+		lineStartIdx := strings.LastIndex(s[:startIdx], "\n") + 1
+		return s[:lineStartIdx], s[lineStartIdx:]
+	}
+	return s, ""
+}
+
 func trimSpace(s string) string {
 	s = strings.TrimLeft(s, " ")
 	s = strings.TrimRight(s, " ")
diff --git a/tools/pika_exporter/exporter/pika.go b/tools/pika_exporter/exporter/pika.go
index 0e74da6356..77099880ca 100644
--- a/tools/pika_exporter/exporter/pika.go
+++ b/tools/pika_exporter/exporter/pika.go
@@ -361,6 +361,9 @@ func (e *exporter) registerMetrics() {
 	if config.Cache {
 		metrics.RegisterCache()
 	}
+	if config.BigKeys {
+		metrics.RegisterBigKeys()
+	}
 	metrics.RegisterBinlog()
 }
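
For reference, a sketch of the INFO bigkeys section that FormatBigKeyStatistics above produces and that the bigkeys.go regexes are written against; the key names and sizes below are invented for illustration only:

# BigKeys statistics
# Show only the first 10 big keys of each type
Total number of big keys: 2
# String
Big Keys number: 1
Type: string, key: page:cache:home, key_length: 15, value_length: 2097152

# Hash
Big Keys number: 1
Type: hash, key: user:profile:1001, key_length: 17, member_size: 25000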