diff --git a/CMakeLists.txt b/CMakeLists.txt index f3bb015d5c..bb185ebe8b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -652,6 +652,24 @@ ExternalProject_Add(rocksdb make -j${CPU_CORE} ) +ExternalProject_Add(prometheus_cpp + URL + https://github.com/jupp0r/prometheus-cpp/releases/download/v1.2.4/prometheus-cpp-with-submodules.tar.gz + CMAKE_ARGS + -DBUILD_SHARED_LIBS=OFF + -DENABLE_PUSH=OFF + -DENABLE_COMPRESSION=OFF + -DCMAKE_INSTALL_LIBDIR=${INSTALL_LIBDIR} + -DCMAKE_INSTALL_INCLUDEDIR=${INSTALL_INCLUDEDIR} + BUILD_ALWAYS + 1 + BUILD_COMMAND + make -j${CPU_CORE} +) + +set(PROMETHEUS_CPP_CORE_LIB ${INSTALL_LIBDIR}/libprometheus-cpp-core.a) +set(PROMETHEUS_CPP_PULL_LIB ${INSTALL_LIBDIR}/libprometheus-cpp-pull.a) + ExternalProject_Add(rediscache URL https://github.com/pikiwidb/rediscache/archive/refs/tags/v1.0.7.tar.gz @@ -846,6 +864,8 @@ target_link_libraries(${PROJECT_NAME} ${LIB_PROTOBUF} ${LIB_GFLAGS} ${LIB_FMT} + ${PROMETHEUS_CPP_PULL_LIB} + ${PROMETHEUS_CPP_CORE_LIB} libsnappy.a libzstd.a liblz4.a diff --git a/include/pika_admin.h b/include/pika_admin.h index de0ddd5a0f..c88b9f2b1e 100644 --- a/include/pika_admin.h +++ b/include/pika_admin.h @@ -267,6 +267,8 @@ class InfoCmd : public Cmd { kInfoAll, kInfoDebug, kInfoCommandStats, + kInfoSlowCommand, + kInfoCommandP99, kInfoCache }; InfoCmd(const std::string& name, int arity, uint32_t flag) : Cmd(name, arity, flag) {} @@ -294,6 +296,8 @@ class InfoCmd : public Cmd { const static std::string kRocksDBSection; const static std::string kDebugSection; const static std::string kCommandStatsSection; + const static std::string kCommandP99Section; + const static std::string kSlowCommandSection; const static std::string kCacheSection; void DoInitial() override; @@ -314,6 +318,8 @@ class InfoCmd : public Cmd { void InfoRocksDB(std::string& info); void InfoDebug(std::string& info); void InfoCommandStats(std::string& info); + void InfoCommandP99(std::string& info); + void InfoSlowCommand(std::string& info); void InfoCache(std::string& info, std::shared_ptr db); std::string CacheStatusToString(int status); diff --git a/include/pika_client_conn.h b/include/pika_client_conn.h index bc4c28db6a..749288b6ca 100644 --- a/include/pika_client_conn.h +++ b/include/pika_client_conn.h @@ -130,7 +130,7 @@ class PikaClientConn : public net::RedisConn { std::shared_ptr DoCmd(const PikaCmdArgsType& argv, const std::string& opt, const std::shared_ptr& resp_ptr, bool cache_miss_in_rtc); - void ProcessSlowlog(const PikaCmdArgsType& argv, std::shared_ptr c_ptr); + void ProcessSlowlog(const PikaCmdArgsType& argv, std::shared_ptr c_ptr, const std::string& opt); void ProcessMonitor(const PikaCmdArgsType& argv); void ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr& resp_ptr, bool cache_miss_in_rtc); diff --git a/include/pika_cmd_table_manager.h b/include/pika_cmd_table_manager.h index 8177fa63b9..2a8bb9affd 100644 --- a/include/pika_cmd_table_manager.h +++ b/include/pika_cmd_table_manager.h @@ -8,11 +8,17 @@ #include #include +#include +#include +#include +#include #include "include/acl.h" #include "include/pika_command.h" #include "include/pika_data_distribution.h" +using namespace prometheus; + struct CommandStatistics { CommandStatistics() = default; CommandStatistics(const CommandStatistics& other) { @@ -23,6 +29,23 @@ struct CommandStatistics { std::atomic cmd_time_consuming = 0; }; +struct HistogramData { + std::shared_ptr registry; + prometheus::Family* family; + std::unordered_map histograms; + + HistogramData() { + registry = std::make_shared(); + family = &prometheus::BuildHistogram() + .Name("pika_command_duration_seconds") + .Help("Execution time of Pika commands in seconds") + .Register(*registry); + } + + HistogramData(const HistogramData&) = delete; + HistogramData& operator=(const HistogramData&) = delete; +}; + class PikaCmdTableManager { friend AclSelector; @@ -42,6 +65,11 @@ class PikaCmdTableManager { * Info Commandstats used */ std::unordered_map* GetCommandStatMap(); + std::unordered_map GetSlowCommandCount(); + void UpdateSlowCommandCount(const std::string& opt); + void ResetCommandCount(); + prometheus::Histogram& GetHistogram(const std::string& opt); + std::shared_ptr GetHistogramsData(); private: std::shared_ptr NewCommand(const std::string& opt); @@ -60,5 +88,10 @@ class PikaCmdTableManager { * Info Commandstats used */ std::unordered_map cmdstat_map_; + std::unordered_map slow_command_count_; + std::shared_mutex slow_command_mutex_; + std::shared_mutex histograms_mutex_; + std::mutex data_mutex_; + std::shared_ptr data_; }; #endif diff --git a/include/pika_server.h b/include/pika_server.h index 81cda87b04..16f75e8ccf 100644 --- a/include/pika_server.h +++ b/include/pika_server.h @@ -259,6 +259,7 @@ class PikaServer : public pstd::noncopyable { void ResetStat(); void incr_accumulative_connections(); void ResetLastSecQuerynum(); + void ResetCommandCount(); void UpdateQueryNumAndExecCountDB(const std::string& db_name, const std::string& command, bool is_write); std::unordered_map ServerExecCountDB(); std::unordered_map ServerAllDBStat(); diff --git a/src/pika_admin.cc b/src/pika_admin.cc index 81d47bc43f..010b30e9de 100644 --- a/src/pika_admin.cc +++ b/src/pika_admin.cc @@ -882,6 +882,8 @@ const std::string InfoCmd::kKeyspaceSection = "keyspace"; const std::string InfoCmd::kDataSection = "data"; const std::string InfoCmd::kRocksDBSection = "rocksdb"; const std::string InfoCmd::kDebugSection = "debug"; +const std::string InfoCmd::kCommandP99Section = "commandp99"; +const std::string InfoCmd::kSlowCommandSection = "slowcommand"; const std::string InfoCmd::kCommandStatsSection = "commandstats"; const std::string InfoCmd::kCacheSection = "cache"; @@ -967,6 +969,10 @@ void InfoCmd::DoInitial() { info_section_ = kInfoDebug; } else if (strcasecmp(argv_[1].data(), kCommandStatsSection.data()) == 0) { info_section_ = kInfoCommandStats; + } else if (strcasecmp(argv_[1].data(), kCommandP99Section.data()) == 0) { + info_section_ = kInfoCommandP99; + } else if (strcasecmp(argv_[1].data(), kSlowCommandSection.data()) == 0) { + info_section_ = kInfoSlowCommand; } else if (strcasecmp(argv_[1].data(), kCacheSection.data()) == 0) { info_section_ = kInfoCache; } else { @@ -1008,6 +1014,10 @@ void InfoCmd::Do() { info.append("\r\n"); InfoCommandStats(info); info.append("\r\n"); + InfoCommandP99(info); + info.append("\r\n"); + InfoSlowCommand(info); + info.append("\r\n"); InfoCache(info, db_); info.append("\r\n"); InfoCPU(info); @@ -1051,6 +1061,12 @@ void InfoCmd::Do() { case kInfoCommandStats: InfoCommandStats(info); break; + case kInfoCommandP99: + InfoCommandP99(info); + break; + case kInfoSlowCommand: + InfoSlowCommand(info); + break; case kInfoCache: InfoCache(info, db_); break; @@ -1499,6 +1515,73 @@ void InfoCmd::InfoCommandStats(std::string& info) { info.append(tmp_stream.str()); } +void InfoCmd::InfoCommandP99(std::string& info) { + std::stringstream tmp_stream; + tmp_stream.precision(2); + tmp_stream.setf(std::ios::fixed); + tmp_stream << "# Commands P99" << "\r\n"; + auto data = g_pika_cmd_table_manager->GetHistogramsData(); + auto* histogram_family = data->family; + for (const auto& metric_family : histogram_family->Collect()) { + for (const auto& metric : metric_family.metric) { + std::string command_name; + + for (const auto& label : metric.label) { + if (label.name == "command") { + command_name = label.value; + break; + } + } + + double total_count = metric.histogram.sample_count; + + if (command_name.empty()) { + tmp_stream << "Command: UNKNOWN\r\n"; + } else { + tmp_stream << "Command: " << command_name << "\r\n"; + } + + double tp99_threshold = total_count * 0.99; + double tp999_threshold = total_count * 0.999; + double tp9999_threshold = total_count * 0.9999; + double tp99 = 0, tp999 = 0, tp9999 = 0; + + for (const auto& bucket : metric.histogram.bucket) { + if (bucket.cumulative_count >= tp99_threshold && tp99 == 0) { + tp99 = bucket.upper_bound; + } + if (bucket.cumulative_count >= tp999_threshold && tp999 == 0) { + tp999 = bucket.upper_bound; + } + if (bucket.cumulative_count >= tp9999_threshold && tp9999 == 0) { + tp9999 = bucket.upper_bound; + break; + } + } + tmp_stream << "TP99 ms: " << tp99 << "\r\n"; + tmp_stream << "TP999 ms: " << tp999 << "\r\n"; + tmp_stream << "TP9999 ms: " << tp9999 << "\r\n"; + tmp_stream << "----------------------\r\n"; + } + } + + info.append(tmp_stream.str()); +} + +void InfoCmd::InfoSlowCommand(std::string& info) { + std::stringstream tmp_stream; + tmp_stream.precision(2); + tmp_stream.setf(std::ios::fixed); + auto stats = g_pika_cmd_table_manager->GetSlowCommandCount(); + tmp_stream << "# SlowCommand Count" << "\r\n"; + for (auto iter : stats) { + if (iter.second.cmd_count != 0) { + tmp_stream << iter.first << ":slow_count=" << iter.second.cmd_count << "\r\n"; + } + } + info.append(tmp_stream.str()); +} + void InfoCmd::InfoCache(std::string& info, std::shared_ptr db) { std::stringstream tmp_stream; tmp_stream << "# Cache" << "\r\n"; diff --git a/src/pika_client_conn.cc b/src/pika_client_conn.cc index d84a78de06..b0cf13a612 100644 --- a/src/pika_client_conn.cc +++ b/src/pika_client_conn.cc @@ -7,7 +7,10 @@ #include #include #include - +#include +#include +#include +#include #include "include/pika_admin.h" #include "include/pika_client_conn.h" #include "include/pika_cmd_table_manager.h" @@ -221,19 +224,21 @@ std::shared_ptr PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const st c_ptr->Execute(); time_stat_->process_done_ts_ = pstd::NowMicros(); + g_pika_cmd_table_manager->GetHistogram(opt).Observe(time_stat_->total_time() / 1000); auto cmdstat_map = g_pika_cmd_table_manager->GetCommandStatMap(); (*cmdstat_map)[opt].cmd_count.fetch_add(1); (*cmdstat_map)[opt].cmd_time_consuming.fetch_add(time_stat_->total_time()); if (g_pika_conf->slowlog_slower_than() >= 0) { - ProcessSlowlog(argv, c_ptr); + ProcessSlowlog(argv, c_ptr, opt); } return c_ptr; } -void PikaClientConn::ProcessSlowlog(const PikaCmdArgsType& argv, std::shared_ptr c_ptr) { +void PikaClientConn::ProcessSlowlog(const PikaCmdArgsType& argv, std::shared_ptr c_ptr, const std::string& opt) { if (time_stat_->total_time() > g_pika_conf->slowlog_slower_than()) { + g_pika_cmd_table_manager->UpdateSlowCommandCount(opt); g_pika_server->SlowlogPushEntry(argv, time_stat_->start_ts() / 1000000, time_stat_->total_time()); if (g_pika_conf->slowlog_write_errorlog()) { bool trim = false; diff --git a/src/pika_cmd_table_manager.cc b/src/pika_cmd_table_manager.cc index 974fceb0ee..60058f85c6 100644 --- a/src/pika_cmd_table_manager.cc +++ b/src/pika_cmd_table_manager.cc @@ -14,9 +14,21 @@ extern std::unique_ptr g_pika_conf; +void PikaCmdTableManager::ResetCommandCount() { + { + std::unique_lock write_lock(slow_command_mutex_); + slow_command_count_.clear(); + } + { + std::lock_guard lock(data_mutex_); + data_ = std::make_shared(); + } +} + PikaCmdTableManager::PikaCmdTableManager() { cmds_ = std::make_unique(); cmds_->reserve(300); + data_ = std::make_shared(); } void PikaCmdTableManager::InitCmdTable(void) { @@ -63,6 +75,61 @@ void PikaCmdTableManager::RenameCommand(const std::string before, const std::str } } +prometheus::Histogram& PikaCmdTableManager::GetHistogram(const std::string& opt) { + std::shared_ptr data_copy; + { + std::lock_guard lock(data_mutex_); + data_copy = data_; + } + + { + std::shared_lock read_lock(histograms_mutex_); + auto it = data_copy->histograms.find(opt); + if (it != data_copy->histograms.end()) { + return *(it->second); + } + } + + std::unique_lock write_lock(histograms_mutex_); + auto& new_histogram = data_copy->family->Add( + {{"command", opt}}, + prometheus::Histogram::BucketBoundaries{0.5, 1, 2, 3, 5, 7, 10, 15, 20, 30, 40, 50, 65, 75, 85, 100, 125, 140, 150, 160, 175, 185, 200, 300, 400, 500, 750, 1000, 2000, 5000, 10000} + ); + data_copy->histograms[opt] = &new_histogram; + return new_histogram; +} + +std::shared_ptr PikaCmdTableManager::GetHistogramsData() { + std::shared_ptr data_copy; + { + std::lock_guard lock(data_mutex_); + data_copy = data_; + } + return data_copy; +} + +void PikaCmdTableManager::UpdateSlowCommandCount(const std::string& opt) { + { + std::shared_lock read_lock(slow_command_mutex_); + if (slow_command_count_.find(opt) != slow_command_count_.end()) { + slow_command_count_[opt].cmd_count.fetch_add(1); + return; + } + } + + { + std::unique_lock write_lock(slow_command_mutex_); + slow_command_count_[opt]; + } + + slow_command_count_[opt].cmd_count.fetch_add(1); +} + +std::unordered_map PikaCmdTableManager::GetSlowCommandCount() { + std::shared_lock lock(slow_command_mutex_); + return slow_command_count_; +} + std::unordered_map* PikaCmdTableManager::GetCommandStatMap() { return &cmdstat_map_; } diff --git a/src/pika_server.cc b/src/pika_server.cc index bbf444191d..f4d1a47158 100644 --- a/src/pika_server.cc +++ b/src/pika_server.cc @@ -1033,6 +1033,16 @@ void PikaServer::ResetLastSecQuerynum() { statistic_.ResetDBLastSecQuerynum(); } +void PikaServer::ResetCommandCount() { + thread_local uint64_t last_reset_time = 0; + auto current_time = pstd::NowMicros(); + if (current_time - last_reset_time < 60 * 1000 * 1000) { + return; + } + last_reset_time = current_time; + g_pika_cmd_table_manager->ResetCommandCount(); +} + void PikaServer::UpdateQueryNumAndExecCountDB(const std::string& db_name, const std::string& command, bool is_write) { std::string cmd(command); statistic_.server_stat.qps.querynum++; @@ -1139,6 +1149,8 @@ void PikaServer::DoTimingTask() { ResetLastSecQuerynum(); // Auto update network instantaneous metric AutoUpdateNetworkMetric(); + // Reset command statistics + ResetCommandCount(); ProcessCronTask(); UpdateCacheInfo(); // Print the queue status periodically diff --git a/tools/pika_exporter/config/info.toml b/tools/pika_exporter/config/info.toml index 5597752f93..91cb111d31 100644 --- a/tools/pika_exporter/config/info.toml +++ b/tools/pika_exporter/config/info.toml @@ -6,6 +6,8 @@ cpu = true replication = true keyspace = true cache = true +commandp99 = true +slowcommand = true execcount = false commandstats = false diff --git a/tools/pika_exporter/exporter/client.go b/tools/pika_exporter/exporter/client.go index 5410ffe9a4..b1266ce223 100644 --- a/tools/pika_exporter/exporter/client.go +++ b/tools/pika_exporter/exporter/client.go @@ -127,6 +127,8 @@ func (c *client) InfoNoneCommandList() (string, error) { "COMMAND_EXEC_COUNT": InfoConf.Execcount, "COMMANDSTATS": InfoConf.Commandstats, "ROCKSDB": InfoConf.Rocksdb, + "CommandP99": InfoConf.CommandP99, + "SlowCommand": InfoConf.SlowCommand, } for section, flag := range sectionsMap { if flag { @@ -155,6 +157,8 @@ func (c *client) InfoAllCommandList() (string, error) { "COMMAND_EXEC_COUNT": InfoConf.Execcount, "COMMANDSTATS": InfoConf.Commandstats, "ROCKSDB": InfoConf.Rocksdb, + "CommandP99": InfoConf.CommandP99, + "SlowCommand": InfoConf.SlowCommand, } for section, flag := range sectionsMap { if flag { diff --git a/tools/pika_exporter/exporter/conf.go b/tools/pika_exporter/exporter/conf.go index 5e2bd2460c..f88a36f975 100644 --- a/tools/pika_exporter/exporter/conf.go +++ b/tools/pika_exporter/exporter/conf.go @@ -21,6 +21,8 @@ type InfoConfig struct { Keyspace bool `toml:"keyspace"` Execcount bool `toml:"execcount"` Commandstats bool `toml:"commandstats"` + CommandP99 bool `toml:"commandp99"` + SlowCommand bool `toml:"slowcommand"` Rocksdb bool `toml:"rocksdb"` Cache bool `toml:"cache"` diff --git a/tools/pika_exporter/exporter/metrics/commandp99.go b/tools/pika_exporter/exporter/metrics/commandp99.go new file mode 100644 index 0000000000..4671c38420 --- /dev/null +++ b/tools/pika_exporter/exporter/metrics/commandp99.go @@ -0,0 +1,42 @@ +package metrics + +import ( + "regexp" +) + +func RegisterCommandP99() { + Register(collectCommandP99Metrics) +} + +var collectCommandP99Metrics = map[string]MetricConfig{ + "command_p99_info": { + Parser: ®exParser{ + name: "command_p99_info", + reg: regexp.MustCompile(`Command:\s*(?P\S+)\s*\r?\n(?:.*\r?\n)*?TP99 ms:\s*(?P[\d.]+)\s*\r?\n.*?TP999 ms:\s*(?P[\d.]+)\s*\r?\n.*?TP9999 ms:\s*(?P[\d.]+)`), + Parser: &normalParser{}, + }, + MetricMeta: MetaDatas{ + { + Name: "command_p99_latency", + Help: "99th percentile latency (ms) for each Pika command", + Type: metricTypeGauge, + Labels: []string{LabelNameAddr, LabelNameAlias, "cmd"}, + ValueName: "tp99", + }, + { + Name: "command_p999_latency", + Help: "99.9th percentile latency (ms) for each Pika command", + Type: metricTypeGauge, + Labels: []string{LabelNameAddr, LabelNameAlias, "cmd"}, + ValueName: "tp999", + }, + { + Name: "command_p9999_latency", + Help: "99.99th percentile latency (ms) for each Pika command", + Type: metricTypeGauge, + Labels: []string{LabelNameAddr, LabelNameAlias, "cmd"}, + ValueName: "tp9999", + }, + }, + }, +} \ No newline at end of file diff --git a/tools/pika_exporter/exporter/metrics/slowcommand.go b/tools/pika_exporter/exporter/metrics/slowcommand.go new file mode 100644 index 0000000000..2ae623f991 --- /dev/null +++ b/tools/pika_exporter/exporter/metrics/slowcommand.go @@ -0,0 +1,28 @@ +package metrics + +import ( + "regexp" +) + +func RegisterSlowCommand() { + Register(collectSlowCommandMetrics) +} + +var collectSlowCommandMetrics = map[string]MetricConfig{ + "slow_command_info": { + Parser: ®exParser{ + name: "slow_command_info", + reg: regexp.MustCompile(`(?P\S+):slow_count=(?P\d+)`), + Parser: &normalParser{}, + }, + MetricMeta: MetaDatas{ + { + Name: "command_slow_count", + Help: "Number of times each Pika command was slow", + Type: metricTypeGauge, + Labels: []string{LabelNameAddr, LabelNameAlias, "cmd"}, + ValueName: "slow_count", + }, + }, + }, +} diff --git a/tools/pika_exporter/exporter/pika.go b/tools/pika_exporter/exporter/pika.go index 0e74da6356..4a1a666cbf 100644 --- a/tools/pika_exporter/exporter/pika.go +++ b/tools/pika_exporter/exporter/pika.go @@ -346,6 +346,12 @@ func (e *exporter) registerMetrics() { if config.Replication { metrics.RegisterReplication() } + if config.CommandP99 { + metrics.RegisterCommandP99() + } + if config.SlowCommand { + metrics.RegisterSlowCommand() + } if config.Keyspace { metrics.RegisterKeyspace() }