-
Notifications
You must be signed in to change notification settings - Fork 238
Expand file tree
/
Copy pathrun_stats.h
More file actions
268 lines (228 loc) · 11.5 KB
/
run_stats.h
File metadata and controls
268 lines (228 loc) · 11.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
/*
* Copyright (C) 2011-2026 Redis Labs Ltd.
*
* This file is part of memtier_benchmark.
*
* memtier_benchmark is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, version 2.
*
* memtier_benchmark is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with memtier_benchmark. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef MEMTIER_BENCHMARK_RUN_STATS_H
#define MEMTIER_BENCHMARK_RUN_STATS_H
#include <stdlib.h>
#include <stdio.h>
#include <atomic>
#include <map>
#include <vector>
#include <string>
#include <pthread.h>
// Mutex wrapper with correct copy/move semantics: copies always initialize a
// fresh mutex rather than duplicating mutex state (which is undefined behavior).
// This allows run_stats to remain copyable without a hand-written copy constructor.
struct reinit_mutex_t
{
mutable pthread_mutex_t mtx;
reinit_mutex_t() { pthread_mutex_init(&mtx, NULL); }
~reinit_mutex_t() { pthread_mutex_destroy(&mtx); }
reinit_mutex_t(const reinit_mutex_t &) { pthread_mutex_init(&mtx, NULL); }
reinit_mutex_t &operator=(const reinit_mutex_t &) { return *this; }
};
#include "memtier_benchmark.h"
#include "run_stats_types.h"
#include "JSON_handler.h"
#include "deps/hdr_histogram/hdr_histogram.h"
#include "deps/hdr_histogram/hdr_histogram_log.h"
inline long long int ts_diff(struct timeval a, struct timeval b)
{
unsigned long long aval = a.tv_sec * 1000000 + a.tv_usec;
unsigned long long bval = b.tv_sec * 1000000 + b.tv_usec;
return bval - aval;
}
enum tabel_el_type
{
string_el,
double_el
};
struct table_el
{
tabel_el_type type;
std::string format;
std::string str_value;
double double_value;
table_el *init_str(std::string fmt, std::string val)
{
type = string_el;
format = fmt;
str_value = val;
return this;
}
table_el *init_double(std::string fmt, double val)
{
type = double_el;
format = fmt;
double_value = val;
return this;
}
};
struct table_column
{
table_column() {}
table_column(unsigned int col_size) : column_size(col_size) {}
unsigned int column_size;
std::vector<table_el> elements;
};
class output_table
{
private:
std::vector<table_column> columns;
public:
void print_header(FILE *out, const char *header);
void add_column(table_column &col);
void print(FILE *out, const char *header);
};
// Structure to hold aggregated stats by command type
struct aggregated_command_type_stats
{
std::string command_type; // e.g., "SET", "GET"
totals_cmd stats; // aggregated totals
safe_hdr_histogram latency_hist; // merged histogram
std::vector<size_t> command_indices; // indices of commands with this type
std::vector<one_sec_cmd_stats> per_second_stats; // aggregated per-second stats for JSON time series
};
class run_stats
{
protected:
friend bool one_second_stats_predicate(const one_second_stats &a, const one_second_stats &b);
benchmark_config *m_config;
struct timeval m_start_time;
struct timeval m_end_time;
// Atomic flag set after m_start_time is fully written; guards cross-thread reads.
// std::atomic is not copyable, but run_stats needs to be (stored in std::vector).
// Copies occur only after threads join, so relaxed copy semantics are safe.
struct copyable_atomic_bool
{
std::atomic<bool> flag;
copyable_atomic_bool(bool v = false) : flag(v) {}
copyable_atomic_bool(const copyable_atomic_bool &o) : flag(o.flag.load(std::memory_order_relaxed)) {}
copyable_atomic_bool &operator=(const copyable_atomic_bool &o)
{
flag.store(o.flag.load(std::memory_order_relaxed), std::memory_order_relaxed);
return *this;
}
} m_started;
bool m_interrupted;
totals m_totals;
std::list<one_second_stats> m_stats;
std::vector<double> quantiles_list;
// current second stats ( appended to m_stats and reset every second )
one_second_stats m_cur_stats;
safe_hdr_histogram m_get_latency_histogram;
safe_hdr_histogram m_set_latency_histogram;
safe_hdr_histogram m_wait_latency_histogram;
std::vector<safe_hdr_histogram> m_ar_commands_latency_histograms;
safe_hdr_histogram m_totals_latency_histogram;
// instantaneous command stats ( used in the per second latencies )
safe_hdr_histogram inst_m_get_latency_histogram;
safe_hdr_histogram inst_m_set_latency_histogram;
safe_hdr_histogram inst_m_wait_latency_histogram;
std::vector<safe_hdr_histogram> inst_m_ar_commands_latency_histograms;
safe_hdr_histogram inst_m_totals_latency_histogram;
// Protects inst_m_totals_latency_histogram against concurrent reset/aggregation reads.
// Worker threads use hdr_record_value_capped_atomic() for lock-free writes;
// this mutex serializes hdr_reset and hdr_add from the main thread.
reinit_mutex_t m_inst_histogram_mutex;
void roll_cur_stats(struct timeval *ts);
public:
run_stats(benchmark_config *config);
void setup_arbitrary_commands(size_t n_arbitrary_commands);
void set_start_time(struct timeval *start_time);
void set_end_time(struct timeval *end_time);
void set_interrupted(bool interrupted) { m_interrupted = interrupted; }
bool get_interrupted() const { return m_interrupted; }
void update_get_op(struct timeval *ts, unsigned int bytes_rx, unsigned int bytes_tx, unsigned int latency,
unsigned int hits, unsigned int misses);
void update_set_op(struct timeval *ts, unsigned int bytes_rx, unsigned int bytes_tx, unsigned int latency);
void update_connection_error(struct timeval *ts);
void update_moved_get_op(struct timeval *ts, unsigned int bytes_rx, unsigned int bytes_tx, unsigned int latency);
void update_moved_set_op(struct timeval *ts, unsigned int bytes_rx, unsigned int bytes_tx, unsigned int latency);
void update_moved_arbitrary_op(struct timeval *ts, unsigned int bytes_rx, unsigned int bytes_tx,
unsigned int latency, size_t arbitrary_index);
void update_ask_get_op(struct timeval *ts, unsigned int bytes_rx, unsigned int bytes_tx, unsigned int latency);
void update_ask_set_op(struct timeval *ts, unsigned int bytes_rx, unsigned int bytes_tx, unsigned int latency);
void update_ask_arbitrary_op(struct timeval *ts, unsigned int bytes_rx, unsigned int bytes_tx, unsigned int latency,
size_t arbitrary_index);
void update_wait_op(struct timeval *ts, unsigned int latency);
void update_arbitrary_op(struct timeval *ts, unsigned int bytes_rx, unsigned int bytes_tx, unsigned int latency,
size_t arbitrary_index);
void aggregate_average(const std::vector<run_stats> &all_stats);
void summarize(totals &result) const;
void summarize_current_second();
void merge(const run_stats &other, int iteration);
std::vector<one_sec_cmd_stats> get_one_sec_cmd_stats_get();
std::vector<one_sec_cmd_stats> get_one_sec_cmd_stats_set();
std::vector<one_sec_cmd_stats> get_one_sec_cmd_stats_wait();
std::vector<one_sec_cmd_stats> get_one_sec_cmd_stats_totals();
std::vector<one_sec_cmd_stats> get_one_sec_cmd_stats_arbitrary_command(unsigned int pos);
std::vector<unsigned int> get_one_sec_cmd_stats_timestamp();
void save_csv_one_sec(FILE *f, unsigned long int &total_get_ops, unsigned long int &total_set_ops,
unsigned long int &total_wait_ops);
// Safely copy instantaneous total latency histogram into target under mutex.
// Use this instead of a raw pointer getter to avoid data races with worker threads.
void copy_inst_histogram(hdr_histogram *target) const;
void save_csv_one_sec_cluster(FILE *f);
void save_csv_set_get_commands(FILE *f, bool cluster_mode);
void save_csv_arbitrary_commands_one_sec(FILE *f, arbitrary_command_list &command_list,
std::vector<unsigned long int> &total_arbitrary_commands_ops);
void save_csv_arbitrary_commands(FILE *f, arbitrary_command_list &command_list);
bool save_hdr_percentiles_print_format(struct hdr_histogram *hdr, char *filename);
bool save_hdr_log_format(struct hdr_histogram *hdr, char *filename, char *header);
bool save_hdr_full_run(benchmark_config *config, int run_number);
bool save_hdr_set_command(benchmark_config *config, int run_number);
bool save_hdr_get_command(benchmark_config *config, int run_number);
bool save_hdr_arbitrary_commands(benchmark_config *config, int run_number);
bool save_csv(const char *filename, benchmark_config *config);
void debug_dump(void);
// function to handle the results output
bool print_arbitrary_commands_results();
// Build aggregated stats by command type (e.g., SET, GET) from per-command stats
std::vector<aggregated_command_type_stats> build_aggregated_command_stats(arbitrary_command_list &command_list);
void print_type_column(output_table &table, arbitrary_command_list &command_list,
const std::vector<aggregated_command_type_stats> *aggregated = nullptr);
void print_ops_sec_column(output_table &table,
const std::vector<aggregated_command_type_stats> *aggregated = nullptr);
void print_hits_sec_column(output_table &table);
void print_missess_sec_column(output_table &table);
void print_moved_sec_column(output_table &table,
const std::vector<aggregated_command_type_stats> *aggregated = nullptr);
void print_ask_sec_column(output_table &table,
const std::vector<aggregated_command_type_stats> *aggregated = nullptr);
void print_avg_latency_column(output_table &table,
const std::vector<aggregated_command_type_stats> *aggregated = nullptr);
void print_quantile_latency_column(output_table &table, double quantile, char *label,
const std::vector<aggregated_command_type_stats> *aggregated = nullptr);
void print_kb_sec_column(output_table &table,
const std::vector<aggregated_command_type_stats> *aggregated = nullptr);
void print_json(json_handler *jsonhandler, arbitrary_command_list &command_list, bool cluster_mode,
const std::vector<aggregated_command_type_stats> *aggregated = nullptr);
void print_histogram(FILE *out, json_handler *jsonhandler, arbitrary_command_list &command_list,
const std::vector<aggregated_command_type_stats> *aggregated = nullptr);
void print(FILE *file, benchmark_config *config, const char *header = NULL, json_handler *jsonhandler = NULL);
unsigned int get_duration(void);
unsigned long int get_duration_usec(void);
unsigned long int get_total_bytes(void);
unsigned long int get_total_ops(void);
unsigned long int get_total_latency(void);
unsigned long int get_total_connection_errors(void);
// Returns true if set_start_time() was called, indicating the client
// produced (or was ready to produce) meaningful stats data.
bool has_started(void) const { return m_started.flag.load(std::memory_order_acquire); }
};
#endif // MEMTIER_BENCHMARK_RUN_STATS_H