Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,24 @@ ExternalProject_Add(rocksdb
make -j${CPU_CORE}
)

ExternalProject_Add(prometheus_cpp
URL
https://github.com/jupp0r/prometheus-cpp/releases/download/v1.2.4/prometheus-cpp-with-submodules.tar.gz
CMAKE_ARGS
-DBUILD_SHARED_LIBS=OFF
-DENABLE_PUSH=OFF
-DENABLE_COMPRESSION=OFF
-DCMAKE_INSTALL_LIBDIR=${INSTALL_LIBDIR}
-DCMAKE_INSTALL_INCLUDEDIR=${INSTALL_INCLUDEDIR}
BUILD_ALWAYS
1
BUILD_COMMAND
make -j${CPU_CORE}
)

set(PROMETHEUS_CPP_CORE_LIB ${INSTALL_LIBDIR}/libprometheus-cpp-core.a)
set(PROMETHEUS_CPP_PULL_LIB ${INSTALL_LIBDIR}/libprometheus-cpp-pull.a)

ExternalProject_Add(rediscache
URL
https://github.com/pikiwidb/rediscache/archive/refs/tags/v1.0.7.tar.gz
Expand Down Expand Up @@ -846,6 +864,8 @@ target_link_libraries(${PROJECT_NAME}
${LIB_PROTOBUF}
${LIB_GFLAGS}
${LIB_FMT}
${PROMETHEUS_CPP_PULL_LIB}
${PROMETHEUS_CPP_CORE_LIB}
libsnappy.a
libzstd.a
liblz4.a
Expand Down
6 changes: 6 additions & 0 deletions include/pika_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,8 @@ class InfoCmd : public Cmd {
kInfoAll,
kInfoDebug,
kInfoCommandStats,
kInfoSlowCommand,
kInfoCommandP99,
kInfoCache
};
InfoCmd(const std::string& name, int arity, uint32_t flag) : Cmd(name, arity, flag) {}
Expand Down Expand Up @@ -294,6 +296,8 @@ class InfoCmd : public Cmd {
const static std::string kRocksDBSection;
const static std::string kDebugSection;
const static std::string kCommandStatsSection;
const static std::string kCommandP99Section;
const static std::string kSlowCommandSection;
const static std::string kCacheSection;

void DoInitial() override;
Expand All @@ -314,6 +318,8 @@ class InfoCmd : public Cmd {
void InfoRocksDB(std::string& info);
void InfoDebug(std::string& info);
void InfoCommandStats(std::string& info);
void InfoCommandP99(std::string& info);
void InfoSlowCommand(std::string& info);
void InfoCache(std::string& info, std::shared_ptr<DB> db);

std::string CacheStatusToString(int status);
Expand Down
2 changes: 1 addition & 1 deletion include/pika_client_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ class PikaClientConn : public net::RedisConn {
std::shared_ptr<Cmd> DoCmd(const PikaCmdArgsType& argv, const std::string& opt,
const std::shared_ptr<std::string>& resp_ptr, bool cache_miss_in_rtc);

void ProcessSlowlog(const PikaCmdArgsType& argv, std::shared_ptr<Cmd> c_ptr);
void ProcessSlowlog(const PikaCmdArgsType& argv, std::shared_ptr<Cmd> c_ptr, const std::string& opt);
void ProcessMonitor(const PikaCmdArgsType& argv);

void ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr<std::string>& resp_ptr, bool cache_miss_in_rtc);
Expand Down
33 changes: 33 additions & 0 deletions include/pika_cmd_table_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,17 @@

#include <shared_mutex>
#include <thread>
#include <prometheus/exposer.h>
#include <prometheus/registry.h>
#include <prometheus/counter.h>
#include <prometheus/histogram.h>

#include "include/acl.h"
#include "include/pika_command.h"
#include "include/pika_data_distribution.h"

using namespace prometheus;

struct CommandStatistics {
CommandStatistics() = default;
CommandStatistics(const CommandStatistics& other) {
Expand All @@ -23,6 +29,23 @@ struct CommandStatistics {
std::atomic<uint64_t> cmd_time_consuming = 0;
};

struct HistogramData {
std::shared_ptr<prometheus::Registry> registry;
prometheus::Family<prometheus::Histogram>* family;
std::unordered_map<std::string, prometheus::Histogram*> histograms;

HistogramData() {
registry = std::make_shared<prometheus::Registry>();
family = &prometheus::BuildHistogram()
.Name("pika_command_duration_seconds")
.Help("Execution time of Pika commands in seconds")
.Register(*registry);
}

HistogramData(const HistogramData&) = delete;
HistogramData& operator=(const HistogramData&) = delete;
};

class PikaCmdTableManager {
friend AclSelector;

Expand All @@ -42,6 +65,11 @@ class PikaCmdTableManager {
* Info Commandstats used
*/
std::unordered_map<std::string, CommandStatistics>* GetCommandStatMap();
std::unordered_map<std::string, CommandStatistics> GetSlowCommandCount();
void UpdateSlowCommandCount(const std::string& opt);
void ResetCommandCount();
prometheus::Histogram& GetHistogram(const std::string& opt);
std::shared_ptr<HistogramData> GetHistogramsData();

private:
std::shared_ptr<Cmd> NewCommand(const std::string& opt);
Expand All @@ -60,5 +88,10 @@ class PikaCmdTableManager {
* Info Commandstats used
*/
std::unordered_map<std::string, CommandStatistics> cmdstat_map_;
std::unordered_map<std::string, CommandStatistics> slow_command_count_;
std::shared_mutex slow_command_mutex_;
std::shared_mutex histograms_mutex_;
std::mutex data_mutex_;
std::shared_ptr<HistogramData> data_;
};
#endif
1 change: 1 addition & 0 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ class PikaServer : public pstd::noncopyable {
void ResetStat();
void incr_accumulative_connections();
void ResetLastSecQuerynum();
void ResetCommandCount();
void UpdateQueryNumAndExecCountDB(const std::string& db_name, const std::string& command, bool is_write);
std::unordered_map<std::string, uint64_t> ServerExecCountDB();
std::unordered_map<std::string, QpsStatistic> ServerAllDBStat();
Expand Down
83 changes: 83 additions & 0 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -882,6 +882,8 @@ const std::string InfoCmd::kKeyspaceSection = "keyspace";
const std::string InfoCmd::kDataSection = "data";
const std::string InfoCmd::kRocksDBSection = "rocksdb";
const std::string InfoCmd::kDebugSection = "debug";
const std::string InfoCmd::kCommandP99Section = "commandp99";
const std::string InfoCmd::kSlowCommandSection = "slowcommand";
const std::string InfoCmd::kCommandStatsSection = "commandstats";
const std::string InfoCmd::kCacheSection = "cache";

Expand Down Expand Up @@ -967,6 +969,10 @@ void InfoCmd::DoInitial() {
info_section_ = kInfoDebug;
} else if (strcasecmp(argv_[1].data(), kCommandStatsSection.data()) == 0) {
info_section_ = kInfoCommandStats;
} else if (strcasecmp(argv_[1].data(), kCommandP99Section.data()) == 0) {
info_section_ = kInfoCommandP99;
} else if (strcasecmp(argv_[1].data(), kSlowCommandSection.data()) == 0) {
info_section_ = kInfoSlowCommand;
} else if (strcasecmp(argv_[1].data(), kCacheSection.data()) == 0) {
info_section_ = kInfoCache;
} else {
Expand Down Expand Up @@ -1008,6 +1014,10 @@ void InfoCmd::Do() {
info.append("\r\n");
InfoCommandStats(info);
info.append("\r\n");
InfoCommandP99(info);
info.append("\r\n");
InfoSlowCommand(info);
info.append("\r\n");
InfoCache(info, db_);
info.append("\r\n");
InfoCPU(info);
Expand Down Expand Up @@ -1051,6 +1061,12 @@ void InfoCmd::Do() {
case kInfoCommandStats:
InfoCommandStats(info);
break;
case kInfoCommandP99:
InfoCommandP99(info);
break;
case kInfoSlowCommand:
InfoSlowCommand(info);
break;
case kInfoCache:
InfoCache(info, db_);
break;
Expand Down Expand Up @@ -1499,6 +1515,73 @@ void InfoCmd::InfoCommandStats(std::string& info) {
info.append(tmp_stream.str());
}

void InfoCmd::InfoCommandP99(std::string& info) {
std::stringstream tmp_stream;
tmp_stream.precision(2);
tmp_stream.setf(std::ios::fixed);
tmp_stream << "# Commands P99" << "\r\n";
auto data = g_pika_cmd_table_manager->GetHistogramsData();
auto* histogram_family = data->family;
for (const auto& metric_family : histogram_family->Collect()) {
for (const auto& metric : metric_family.metric) {
std::string command_name;

for (const auto& label : metric.label) {
if (label.name == "command") {
command_name = label.value;
break;
}
}

double total_count = metric.histogram.sample_count;

if (command_name.empty()) {
tmp_stream << "Command: UNKNOWN\r\n";
} else {
tmp_stream << "Command: " << command_name << "\r\n";
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

对于P99理论上至少需要100个样本才有意义。建议增加一个判断,若total_count < 100就continue。

double tp99_threshold = total_count * 0.99;
double tp999_threshold = total_count * 0.999;
double tp9999_threshold = total_count * 0.9999;
double tp99 = 0, tp999 = 0, tp9999 = 0;

for (const auto& bucket : metric.histogram.bucket) {
if (bucket.cumulative_count >= tp99_threshold && tp99 == 0) {
tp99 = bucket.upper_bound;
}
if (bucket.cumulative_count >= tp999_threshold && tp999 == 0) {
tp999 = bucket.upper_bound;
}
if (bucket.cumulative_count >= tp9999_threshold && tp9999 == 0) {
tp9999 = bucket.upper_bound;
break;
}
}
tmp_stream << "TP99 ms: " << tp99 << "\r\n";
tmp_stream << "TP999 ms: " << tp999 << "\r\n";
tmp_stream << "TP9999 ms: " << tp9999 << "\r\n";
tmp_stream << "----------------------\r\n";
}
}

info.append(tmp_stream.str());
}

void InfoCmd::InfoSlowCommand(std::string& info) {
std::stringstream tmp_stream;
tmp_stream.precision(2);
tmp_stream.setf(std::ios::fixed);
auto stats = g_pika_cmd_table_manager->GetSlowCommandCount();
tmp_stream << "# SlowCommand Count" << "\r\n";
for (auto iter : stats) {
if (iter.second.cmd_count != 0) {
tmp_stream << iter.first << ":slow_count=" << iter.second.cmd_count << "\r\n";
}
}
info.append(tmp_stream.str());
}

void InfoCmd::InfoCache(std::string& info, std::shared_ptr<DB> db) {
std::stringstream tmp_stream;
tmp_stream << "# Cache" << "\r\n";
Expand Down
11 changes: 8 additions & 3 deletions src/pika_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
#include <glog/logging.h>
#include <utility>
#include <vector>

#include <prometheus/exposer.h>
#include <prometheus/registry.h>
#include <prometheus/counter.h>
#include <prometheus/histogram.h>
#include "include/pika_admin.h"
#include "include/pika_client_conn.h"
#include "include/pika_cmd_table_manager.h"
Expand Down Expand Up @@ -221,19 +224,21 @@ std::shared_ptr<Cmd> PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const st
c_ptr->Execute();

time_stat_->process_done_ts_ = pstd::NowMicros();
g_pika_cmd_table_manager->GetHistogram(opt).Observe(time_stat_->total_time() / 1000);
auto cmdstat_map = g_pika_cmd_table_manager->GetCommandStatMap();
(*cmdstat_map)[opt].cmd_count.fetch_add(1);
(*cmdstat_map)[opt].cmd_time_consuming.fetch_add(time_stat_->total_time());

if (g_pika_conf->slowlog_slower_than() >= 0) {
ProcessSlowlog(argv, c_ptr);
ProcessSlowlog(argv, c_ptr, opt);
}

return c_ptr;
}

void PikaClientConn::ProcessSlowlog(const PikaCmdArgsType& argv, std::shared_ptr<Cmd> c_ptr) {
void PikaClientConn::ProcessSlowlog(const PikaCmdArgsType& argv, std::shared_ptr<Cmd> c_ptr, const std::string& opt) {
if (time_stat_->total_time() > g_pika_conf->slowlog_slower_than()) {
g_pika_cmd_table_manager->UpdateSlowCommandCount(opt);
g_pika_server->SlowlogPushEntry(argv, time_stat_->start_ts() / 1000000, time_stat_->total_time());
if (g_pika_conf->slowlog_write_errorlog()) {
bool trim = false;
Expand Down
67 changes: 67 additions & 0 deletions src/pika_cmd_table_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,21 @@

extern std::unique_ptr<PikaConf> g_pika_conf;

void PikaCmdTableManager::ResetCommandCount() {
{
std::unique_lock<std::shared_mutex> write_lock(slow_command_mutex_);
slow_command_count_.clear();
}
{
std::lock_guard<std::mutex> lock(data_mutex_);
data_ = std::make_shared<HistogramData>();
}
}

PikaCmdTableManager::PikaCmdTableManager() {
cmds_ = std::make_unique<CmdTable>();
cmds_->reserve(300);
data_ = std::make_shared<HistogramData>();
}

void PikaCmdTableManager::InitCmdTable(void) {
Expand Down Expand Up @@ -63,6 +75,61 @@ void PikaCmdTableManager::RenameCommand(const std::string before, const std::str
}
}

prometheus::Histogram& PikaCmdTableManager::GetHistogram(const std::string& opt) {
std::shared_ptr<HistogramData> data_copy;
{
std::lock_guard<std::mutex> lock(data_mutex_);
data_copy = data_;
}

{
std::shared_lock<std::shared_mutex> read_lock(histograms_mutex_);
auto it = data_copy->histograms.find(opt);
if (it != data_copy->histograms.end()) {
return *(it->second);
}
}

std::unique_lock<std::shared_mutex> write_lock(histograms_mutex_);
auto& new_histogram = data_copy->family->Add(
{{"command", opt}},
prometheus::Histogram::BucketBoundaries{0.5, 1, 2, 3, 5, 7, 10, 15, 20, 30, 40, 50, 65, 75, 85, 100, 125, 140, 150, 160, 175, 185, 200, 300, 400, 500, 750, 1000, 2000, 5000, 10000}
);
data_copy->histograms[opt] = &new_histogram;
return new_histogram;
}

std::shared_ptr<HistogramData> PikaCmdTableManager::GetHistogramsData() {
std::shared_ptr<HistogramData> data_copy;
{
std::lock_guard<std::mutex> lock(data_mutex_);
data_copy = data_;
}
return data_copy;
}

void PikaCmdTableManager::UpdateSlowCommandCount(const std::string& opt) {
{
std::shared_lock<std::shared_mutex> read_lock(slow_command_mutex_);
if (slow_command_count_.find(opt) != slow_command_count_.end()) {
slow_command_count_[opt].cmd_count.fetch_add(1);
return;
}
}

{
std::unique_lock<std::shared_mutex> write_lock(slow_command_mutex_);
slow_command_count_[opt];
}

slow_command_count_[opt].cmd_count.fetch_add(1);
}

std::unordered_map<std::string, CommandStatistics> PikaCmdTableManager::GetSlowCommandCount() {
std::shared_lock<std::shared_mutex> lock(slow_command_mutex_);
return slow_command_count_;
}

std::unordered_map<std::string, CommandStatistics>* PikaCmdTableManager::GetCommandStatMap() {
return &cmdstat_map_;
}
Expand Down
Loading
Loading