Skip to content

Commit 9ddfcff

Browse files
Add rate limiting to control the number of requests per second (#237)
* Add rate limiting to control the number of requests per second A new 'rate-limiting' option was added to control the number of request per second. The rate limiting is based on the 'Token Bucket' algorithm, and according to the configured rate, on each interval, a "new" amount of requests allowed to be sent to the server. The rate-limiting is at the connection level. Therefore, in cluster mode, the limitation is for each shard, so if, for example, the cluster has three shards and the user configured one request per second. On every second, memtier-benchmark will send three requests, one for each shard. * Added tests to cover --rate-limiting option. Ensured the help message explains the per connection rate-limit * Fixed per cluster rps test limits --------- Co-authored-by: YaacovHazan <[email protected]> Co-authored-by: filipecosta90 <[email protected]>
1 parent 0325e00 commit 9ddfcff

File tree

6 files changed

+118
-9
lines changed

6 files changed

+118
-9
lines changed

memtier_benchmark.cpp

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ static void config_print(FILE *file, struct benchmark_config *cfg)
127127
"run_count = %u\n"
128128
"debug = %u\n"
129129
"requests = %llu\n"
130+
"rate_limit = %u\n"
130131
"clients = %u\n"
131132
"threads = %u\n"
132133
"test_time = %u\n"
@@ -176,6 +177,7 @@ static void config_print(FILE *file, struct benchmark_config *cfg)
176177
cfg->run_count,
177178
cfg->debug,
178179
cfg->requests,
180+
cfg->request_rate,
179181
cfg->clients,
180182
cfg->threads,
181183
cfg->test_time,
@@ -233,6 +235,7 @@ static void config_print_to_json(json_handler * jsonhandler, struct benchmark_co
233235
jsonhandler->write_obj("run_count" ,"%u", cfg->run_count);
234236
jsonhandler->write_obj("debug" ,"%u", cfg->debug);
235237
jsonhandler->write_obj("requests" ,"%llu", cfg->requests);
238+
jsonhandler->write_obj("rate_limit" ,"%u", cfg->request_rate);
236239
jsonhandler->write_obj("clients" ,"%u", cfg->clients);
237240
jsonhandler->write_obj("threads" ,"%u", cfg->threads);
238241
jsonhandler->write_obj("test_time" ,"%u", cfg->test_time);
@@ -421,6 +424,7 @@ static int config_parse_args(int argc, char *argv[], struct benchmark_config *cf
421424
o_tls_sni,
422425
o_tls_protocols,
423426
o_hdr_file_prefix,
427+
o_rate_limiting,
424428
o_help
425429
};
426430

@@ -489,6 +493,7 @@ static int config_parse_args(int argc, char *argv[], struct benchmark_config *cf
489493
{ "command", 1, 0, o_command },
490494
{ "command-key-pattern", 1, 0, o_command_key_pattern },
491495
{ "command-ratio", 1, 0, o_command_ratio },
496+
{ "rate-limiting", 1, 0, o_rate_limiting },
492497
{ NULL, 0, 0, 0 }
493498
};
494499

@@ -861,6 +866,15 @@ static int config_parse_args(int argc, char *argv[], struct benchmark_config *cf
861866
}
862867
break;
863868
}
869+
case o_rate_limiting: {
870+
endptr = NULL;
871+
cfg->request_rate = (unsigned int) strtoul(optarg, &endptr, 10);
872+
if (!cfg->request_rate || !endptr || *endptr != '\0') {
873+
fprintf(stderr, "error: rate must be greater than zero.\n");
874+
return -1;
875+
}
876+
break;
877+
}
864878
#ifdef USE_TLS
865879
case o_tls:
866880
cfg->tls = true;
@@ -948,7 +962,7 @@ void usage() {
948962
" --key=FILE Use specified private key for TLS\n"
949963
" --cacert=FILE Use specified CA certs bundle for TLS\n"
950964
" --tls-skip-verify Skip verification of server certificate\n"
951-
" --tls-protocols Specify the tls protocol version to use, comma delemited. Use a combination of 'TLSv1', 'TLSv1.1', 'TLSv1.2' and 'TLSv1.3'"
965+
" --tls-protocols Specify the tls protocol version to use, comma delemited. Use a combination of 'TLSv1', 'TLSv1.1', 'TLSv1.2' and 'TLSv1.3'.\n"
952966
" --sni=STRING Add an SNI header\n"
953967
#endif
954968
" -x, --run-count=NUMBER Number of full-test iterations to perform\n"
@@ -967,6 +981,8 @@ void usage() {
967981
"Test Options:\n"
968982
" -n, --requests=NUMBER Number of total requests per client (default: 10000)\n"
969983
" use 'allkeys' to run on the entire key-range\n"
984+
" --rate-limiting=NUMBER The max number of requests to make per second from an individual connection (default is unlimited rate).\n"
985+
" If you use --rate-limiting and a very large rate is entered which cannot be met, memtier will do as many requests as possible per second.\n"
970986
" -c, --clients=NUMBER Number of clients per thread (default: 50)\n"
971987
" -t, --threads=NUMBER Number of threads (default: 4)\n"
972988
" --test-time=SECS Number of seconds to run the test\n"
@@ -1348,6 +1364,16 @@ int main(int argc, char *argv[])
13481364
delete tmp_protocol;
13491365
}
13501366

1367+
// if user configured rate limiting, do some calculations
1368+
if (cfg.request_rate) {
1369+
/* Our event resolution is (at least) 50 events per second (event every >= 20 ml).
1370+
* When we calculate the number of request per interval, we are taking
1371+
* the upper bound and adjust the interval accordingly to get more accuracy */
1372+
cfg.request_per_interval = (cfg.request_rate + 50 - 1) / 50;
1373+
unsigned int events_per_second = cfg.request_rate / cfg.request_per_interval;
1374+
cfg.request_interval_microsecond = 1000000 / events_per_second;
1375+
benchmark_debug_log("Rate limiting configured to send %u requests per %u millisecond\n", cfg.request_per_interval, cfg.request_interval_microsecond / 1000);
1376+
}
13511377

13521378
#ifdef USE_TLS
13531379
// Initialize OpenSSL only if we're really going to use it.

memtier_benchmark.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,9 @@ struct benchmark_config {
104104
bool cluster_mode;
105105
struct arbitrary_command_list* arbitrary_commands;
106106
const char *hdr_prefix;
107+
unsigned int request_rate;
108+
unsigned int request_per_interval;
109+
unsigned int request_interval_microsecond;
107110
#ifdef USE_TLS
108111
bool tls;
109112
const char *tls_cert;

shard_connection.cpp

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,13 @@
5656
#include "event2/bufferevent_ssl.h"
5757
#endif
5858

59+
void cluster_client_timer_handler(evutil_socket_t fd, short what, void *ctx)
60+
{
61+
shard_connection *sc = (shard_connection *) ctx;
62+
assert(sc != NULL);
63+
sc->handle_timer_event();
64+
}
65+
5966
void cluster_client_read_handler(bufferevent *bev, void *ctx)
6067
{
6168
shard_connection *sc = (shard_connection *) ctx;
@@ -66,7 +73,6 @@ void cluster_client_read_handler(bufferevent *bev, void *ctx)
6673
void cluster_client_event_handler(bufferevent *bev, short events, void *ctx)
6774
{
6875
shard_connection *sc = (shard_connection *) ctx;
69-
7076
assert(sc != NULL);
7177
sc->handle_event(events);
7278
}
@@ -123,7 +129,7 @@ verify_request::~verify_request(void)
123129
shard_connection::shard_connection(unsigned int id, connections_manager* conns_man, benchmark_config* config,
124130
struct event_base* event_base, abstract_protocol* abs_protocol) :
125131
m_address(NULL), m_port(NULL), m_unix_sockaddr(NULL),
126-
m_bev(NULL), m_pending_resp(0), m_connection_state(conn_disconnected),
132+
m_bev(NULL), m_request_per_cur_interval(0), m_pending_resp(0), m_connection_state(conn_disconnected),
127133
m_hello(setup_done), m_authentication(setup_done), m_db_selection(setup_done), m_cluster_slots(setup_done) {
128134
m_id = id;
129135
m_conns_manager = conns_man;
@@ -341,6 +347,10 @@ request* shard_connection::pop_req() {
341347
void shard_connection::push_req(request* req) {
342348
m_pipeline->push(req);
343349
m_pending_resp++;
350+
if (m_config->request_rate) {
351+
assert(m_request_per_cur_interval > 0);
352+
m_request_per_cur_interval--;
353+
}
344354
}
345355

346356
bool shard_connection::is_conn_setup_done() {
@@ -486,21 +496,28 @@ void shard_connection::process_first_request() {
486496
fill_pipeline();
487497
}
488498

489-
490499
void shard_connection::fill_pipeline(void)
491500
{
492501
struct timeval now;
493502
gettimeofday(&now, NULL);
503+
494504
while (!m_conns_manager->finished() && m_pipeline->size() < m_config->pipeline) {
495505
if (!is_conn_setup_done()) {
496506
send_conn_setup_commands(now);
497507
return;
498508
}
509+
499510
// don't exceed requests
500511
if (m_conns_manager->hold_pipeline(m_id)) {
501512
break;
502513
}
503514

515+
// that's enough, we reached the rate limit
516+
if (m_config->request_rate && m_request_per_cur_interval == 0) {
517+
// return and skip on update events
518+
return;
519+
}
520+
504521
// client manage requests logic
505522
m_conns_manager->create_request(now, m_id);
506523
}
@@ -511,6 +528,9 @@ void shard_connection::fill_pipeline(void)
511528
if ((m_pending_resp == 0) && (evbuffer_get_length(bufferevent_get_output(m_bev)) == 0)) {
512529
benchmark_debug_log("%s Done, no requests to send no response to wait for\n", get_readable_id());
513530
bufferevent_disable(m_bev, EV_WRITE|EV_READ);
531+
if (m_config->request_rate) {
532+
event_del(m_event_timer);
533+
}
514534
}
515535
}
516536
}
@@ -526,6 +546,14 @@ void shard_connection::handle_event(short events)
526546
bufferevent_enable(m_bev, EV_READ|EV_WRITE);
527547

528548
if (!m_conns_manager->get_reqs_processed()) {
549+
/* Set timer for request rate */
550+
if (m_config->request_rate) {
551+
struct timeval interval = { 0, (long int)m_config->request_interval_microsecond };
552+
m_request_per_cur_interval = m_config->request_per_interval;
553+
m_event_timer = event_new(m_event_base, -1, EV_PERSIST, cluster_client_timer_handler, (void *)this);
554+
event_add(m_event_timer, &interval);
555+
}
556+
529557
process_first_request();
530558
} else {
531559
benchmark_debug_log("reconnection complete, proceeding with test\n");
@@ -561,6 +589,11 @@ void shard_connection::handle_event(short events)
561589
}
562590
}
563591

592+
void shard_connection::handle_timer_event() {
593+
m_request_per_cur_interval = m_config->request_per_interval;
594+
fill_pipeline();
595+
}
596+
564597
void shard_connection::send_wait_command(struct timeval* sent_time,
565598
unsigned int num_slaves, unsigned int timeout) {
566599
int cmd_size = 0;

shard_connection.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ struct verify_request : public request {
7676
};
7777

7878
class shard_connection {
79+
friend void cluster_client_timer_handler(evutil_socket_t fd, short what, void *ctx);
7980
friend void cluster_client_read_handler(bufferevent *bev, void *ctx);
8081
friend void cluster_client_event_handler(bufferevent *bev, short events, void *ctx);
8182

@@ -148,6 +149,7 @@ class shard_connection {
148149
void fill_pipeline(void);
149150

150151
void handle_event(short evtype);
152+
void handle_timer_event();
151153

152154
unsigned int m_id;
153155
connections_manager* m_conns_manager;
@@ -160,9 +162,11 @@ class shard_connection {
160162
struct sockaddr_un* m_unix_sockaddr;
161163
struct bufferevent *m_bev;
162164
struct event_base* m_event_base;
165+
struct event* m_event_timer;
163166

164167
abstract_protocol* m_protocol;
165168
std::queue<request *>* m_pipeline;
169+
unsigned int m_request_per_cur_interval; // number requests to send during the current interval
166170

167171
int m_pending_resp;
168172

tests/include.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ def ensure_tls_protocols(master_nodes_connections):
1616

1717

1818
def assert_minimum_memtier_outcomes(config, env, memtier_ok, overall_expected_request_count,
19-
overall_request_count):
19+
overall_request_count, overall_request_delta=None):
2020
failed_asserts = env.getNumberOfFailedAssertion()
2121
try:
2222
# assert correct exit code
@@ -25,8 +25,11 @@ def assert_minimum_memtier_outcomes(config, env, memtier_ok, overall_expected_re
2525
env.assertTrue(os.path.isfile('{0}/mb.stdout'.format(config.results_dir)))
2626
env.assertTrue(os.path.isfile('{0}/mb.stderr'.format(config.results_dir)))
2727
env.assertTrue(os.path.isfile('{0}/mb.json'.format(config.results_dir)))
28-
# assert we have the expected request count
29-
env.assertEqual(overall_expected_request_count, overall_request_count)
28+
if overall_request_delta is None:
29+
# assert we have the expected request count
30+
env.assertEqual(overall_expected_request_count, overall_request_count)
31+
else:
32+
env.assertAlmostEqual(overall_expected_request_count, overall_request_count,overall_request_delta)
3033
finally:
3134
if env.getNumberOfFailedAssertion() > failed_asserts:
3235
debugPrintMemtierOnError(config, env)
@@ -108,13 +111,14 @@ def addTLSArgs(benchmark_specs, env):
108111

109112

110113

111-
def get_default_memtier_config(threads=10, clients=5, requests=1000):
114+
def get_default_memtier_config(threads=10, clients=5, requests=1000, test_time=None):
112115
config = {
113116
"memtier_benchmark": {
114117
"binary": MEMTIER_BINARY,
115118
"threads": threads,
116119
"clients": clients,
117-
"requests": requests
120+
"requests": requests,
121+
"test_time": test_time
118122
},
119123
}
120124
return config

tests/tests_oss_simple_flow.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -378,3 +378,42 @@ def test_default_arbitrary_command_hset_multi_data_placeholders(env):
378378
overall_request_count = agg_info_commandstats(master_nodes_connections, merged_command_stats)
379379
assert_minimum_memtier_outcomes(config, env, memtier_ok, overall_expected_request_count,
380380
overall_request_count)
381+
382+
def test_default_set_get_rate_limited(env):
383+
master_nodes_list = env.getMasterNodesList()
384+
for client_count in [1,2,4]:
385+
for thread_count in [1,2]:
386+
rps_per_client = 100
387+
test_time_secs = 5
388+
overall_expected_rps = rps_per_client * client_count * thread_count * len(master_nodes_list)
389+
overall_expected_request_count = test_time_secs * overall_expected_rps
390+
# we give a 1 sec margin
391+
request_delta = overall_expected_rps
392+
# we will specify rate limit and the test time, which should help us get an approximate request count
393+
benchmark_specs = {"name": env.testName, "args": ['--rate-limiting={}'.format(rps_per_client)]}
394+
addTLSArgs(benchmark_specs, env)
395+
config = get_default_memtier_config(thread_count,client_count,None,test_time_secs)
396+
397+
master_nodes_connections = env.getOSSMasterNodesConnectionList()
398+
399+
# reset the commandstats
400+
for master_connection in master_nodes_connections:
401+
master_connection.execute_command("CONFIG", "RESETSTAT")
402+
403+
add_required_env_arguments(benchmark_specs, config, env, master_nodes_list)
404+
405+
# Create a temporary directory
406+
test_dir = tempfile.mkdtemp()
407+
408+
config = RunConfig(test_dir, env.testName, config, {})
409+
ensure_clean_benchmark_folder(config.results_dir)
410+
411+
benchmark = Benchmark.from_json(config, benchmark_specs)
412+
413+
# benchmark.run() returns True if the return code of memtier_benchmark was 0
414+
memtier_ok = benchmark.run()
415+
416+
master_nodes_connections = env.getOSSMasterNodesConnectionList()
417+
merged_command_stats = {'cmdstat_set': {'calls': 0}, 'cmdstat_get': {'calls': 0}}
418+
overall_request_count = agg_info_commandstats(master_nodes_connections, merged_command_stats)
419+
assert_minimum_memtier_outcomes(config, env, memtier_ok, overall_expected_request_count, overall_request_count, request_delta)

0 commit comments

Comments
 (0)