Skip to content

Commit 4e66b8c

Browse files
Add pre-computed key routing for monitor commands in cluster mode
- Pre-compute keys for all monitor commands at load time (second argument = key) - FT.SEARCH and FT.AGGREGATE treated as keyless (any connection can send) - Sequential mode: peek at next command's key to route to correct slot owner - Random mode: send from current connection, rely on MOVED/ASK - Add literal_key_type for specific monitor placeholders (__monitor_line1__, etc.) - Add protocol validation: monitor input only works with Redis protocols - Remove debug print statements from tests
1 parent 7d00e16 commit 4e66b8c

File tree

6 files changed

+226
-8
lines changed

6 files changed

+226
-8
lines changed

client.cpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,13 @@ bool client::create_arbitrary_request(unsigned int command_index, struct timeval
327327
return false;
328328
}
329329

330+
// Mark the first argument as a literal key for cluster routing
331+
// Most Redis commands have the key as the first argument after the command name
332+
if (temp_cmd.command_args.size() > 0) {
333+
temp_cmd.command_args[0].type = literal_key_type;
334+
temp_cmd.keys_count = 1;
335+
}
336+
330337
// Format the command for the protocol (adds RESP headers)
331338
if (!m_connections[conn_id]->get_protocol()->format_arbitrary_command(temp_cmd)) {
332339
fprintf(stderr, "error: failed to format random monitor command at runtime: %s\n", monitor_cmd.c_str());
@@ -344,6 +351,9 @@ bool client::create_arbitrary_request(unsigned int command_index, struct timeval
344351
assert(res == available_for_conn);
345352
cmd_size +=
346353
m_connections[conn_id]->send_arbitrary_command(arg, m_obj_gen->get_key(), m_obj_gen->get_key_len());
354+
} else if (arg->type == literal_key_type) {
355+
// Send the literal key value from the monitor command
356+
cmd_size += m_connections[conn_id]->send_arbitrary_command(arg, arg->data.c_str(), arg->data.length());
347357
} else if (arg->type == data_type) {
348358
unsigned int value_len;
349359
const char *value = m_obj_gen->get_value(0, &value_len);
@@ -409,6 +419,9 @@ bool client::create_arbitrary_request(unsigned int command_index, struct timeval
409419
cmd_size +=
410420
m_connections[conn_id]->send_arbitrary_command(arg, m_obj_gen->get_key(), m_obj_gen->get_key_len());
411421
}
422+
} else if (arg->type == literal_key_type) {
423+
// Send the literal key value from the monitor command
424+
cmd_size += m_connections[conn_id]->send_arbitrary_command(arg, arg->data.c_str(), arg->data.length());
412425
} else if (arg->type == data_type) {
413426
unsigned int value_len;
414427
const char *value = m_obj_gen->get_value(0, &value_len);

cluster_client.cpp

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -349,12 +349,91 @@ bool cluster_client::create_arbitrary_request(unsigned int command_index, struct
349349
* if the generated key belongs to this connection before starting to send it */
350350
assert(m_key_index_pools[conn_id]->empty());
351351

352+
const arbitrary_command &cmd = get_arbitrary_command(command_index);
353+
354+
/* Check if this command has a literal key (from monitor input) */
355+
for (unsigned int i = 0; i < cmd.command_args.size(); i++) {
356+
if (cmd.command_args[i].type == literal_key_type) {
357+
/* For literal keys, calculate slot directly from the key value */
358+
const std::string &literal_key = cmd.command_args[i].data;
359+
unsigned int hslot = calc_hslot_crc16_cluster(literal_key.c_str(), literal_key.length());
360+
unsigned int target_conn_id = m_slot_to_shard[hslot];
361+
362+
/* If this connection owns the slot, send the request */
363+
if (target_conn_id == conn_id) {
364+
client::create_arbitrary_request(command_index, timestamp, conn_id);
365+
return true;
366+
}
367+
368+
/* If the target connection is disconnected, trigger slot refresh */
369+
if (m_connections[target_conn_id]->get_connection_state() == conn_disconnected) {
370+
m_connections[conn_id]->set_cluster_slots();
371+
return false;
372+
}
373+
374+
/* If target connection is refreshing slots, skip */
375+
if (m_connections[target_conn_id]->get_cluster_slots_state() != setup_done) {
376+
return false;
377+
}
378+
379+
/* Queue is full, skip for now */
380+
key_index_pool *key_idx_pool = m_key_index_pools[target_conn_id];
381+
if (key_idx_pool->size() >= KEY_INDEX_QUEUE_MAX_SIZE) {
382+
return false;
383+
}
384+
385+
/* Store command index for the target connection (no key_index needed for literal keys) */
386+
key_idx_pool->push(command_index);
387+
key_idx_pool->push(0); /* Dummy key_index - not used for literal keys */
388+
return true;
389+
}
390+
}
391+
392+
/* Handle monitor_random_type: use pre-computed keys for cluster routing.
393+
* For sequential mode, we peek at the index before advancing to check ownership.
394+
* For random mode, we send from any connection and rely on MOVED/ASK because
395+
* we can't predict which random command will be selected. */
396+
if (cmd.command_args.size() == 1 && cmd.command_args[0].type == monitor_random_type) {
397+
bool is_random = (m_config->monitor_pattern == 'R');
398+
399+
if (is_random) {
400+
// For random mode, just send from this connection.
401+
// MOVED/ASK will redirect if needed. We can't pre-route because
402+
// the random command selection happens inside client::create_arbitrary_request()
403+
// and would differ from any check we do here.
404+
client::create_arbitrary_request(command_index, timestamp, conn_id);
405+
return true;
406+
}
407+
408+
// For sequential mode, peek at the next index without advancing
409+
size_t monitor_index = m_config->monitor_commands->peek_next_sequential_index();
410+
411+
// Check if this command has a key (using pre-computed key)
412+
if (m_config->monitor_commands->has_key(monitor_index)) {
413+
const std::string &key = m_config->monitor_commands->get_key(monitor_index);
414+
unsigned int hslot = calc_hslot_crc16_cluster(key.c_str(), key.length());
415+
unsigned int target_conn_id = m_slot_to_shard[hslot];
416+
417+
// If this connection doesn't own the slot, skip this request
418+
// The connection that owns this slot will pick it up later
419+
if (target_conn_id != conn_id) {
420+
return false;
421+
}
422+
}
423+
// If no key (keyless command like FT.SEARCH/FT.AGGREGATE), any connection can send it
424+
425+
// This connection owns the slot (or command is keyless), proceed with sending
426+
client::create_arbitrary_request(command_index, timestamp, conn_id);
427+
return true;
428+
}
429+
352430
/* keyless command can be used by any connection */
353-
if (get_arbitrary_command(command_index).keys_count == 0) {
431+
if (cmd.keys_count == 0) {
354432
client::create_arbitrary_request(command_index, timestamp, conn_id);
355433
return true;
356434
}
357435

436+
/* Normal key placeholder handling */
358437
unsigned long long key_index;
359438
get_key_response res = get_key_for_conn(command_index, conn_id, &key_index);
360439

config_types.cpp

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -478,6 +478,83 @@ bool arbitrary_command::split_command_to_args()
478478
}
479479

480480
// Monitor command list implementation
481+
482+
// Extract key from a command string (second argument after command name)
483+
// Returns empty string for keyless commands (FT.SEARCH, FT.AGGREGATE, or commands with no args)
484+
std::string monitor_command_list::extract_key_from_command(const std::string &command)
485+
{
486+
const char *p = command.c_str();
487+
std::string first_word;
488+
std::string second_word;
489+
int word_count = 0;
490+
491+
while (*p && word_count < 2) {
492+
// Skip blanks
493+
while (*p && isspace(*p)) {
494+
p++;
495+
}
496+
if (!*p) break;
497+
498+
std::string current_word;
499+
bool in_quotes = false;
500+
bool in_single_quotes = false;
501+
502+
while (*p) {
503+
if (in_quotes) {
504+
if (*p == '"') {
505+
in_quotes = false;
506+
p++;
507+
break;
508+
}
509+
current_word += *p++;
510+
} else if (in_single_quotes) {
511+
if (*p == '\'') {
512+
in_single_quotes = false;
513+
p++;
514+
break;
515+
}
516+
current_word += *p++;
517+
} else {
518+
if (*p == '"') {
519+
in_quotes = true;
520+
p++;
521+
} else if (*p == '\'') {
522+
in_single_quotes = true;
523+
p++;
524+
} else if (isspace(*p)) {
525+
break;
526+
} else {
527+
current_word += *p++;
528+
}
529+
}
530+
}
531+
532+
if (!current_word.empty()) {
533+
if (word_count == 0) {
534+
first_word = current_word;
535+
} else {
536+
second_word = current_word;
537+
}
538+
word_count++;
539+
}
540+
}
541+
542+
// No second argument - keyless command
543+
if (second_word.empty()) {
544+
return "";
545+
}
546+
547+
// Check for FT.SEARCH and FT.AGGREGATE - these are keyless (can be sent to any connection)
548+
std::string upper_cmd = first_word;
549+
std::transform(upper_cmd.begin(), upper_cmd.end(), upper_cmd.begin(), ::toupper);
550+
if (upper_cmd == "FT.SEARCH" || upper_cmd == "FT.AGGREGATE") {
551+
return "";
552+
}
553+
554+
// Return the second argument as the key
555+
return second_word;
556+
}
557+
481558
bool monitor_command_list::load_from_file(const char *filename)
482559
{
483560
FILE *file = fopen(filename, "r");
@@ -509,6 +586,10 @@ bool monitor_command_list::load_from_file(const char *filename)
509586
}
510587

511588
commands.push_back(command_str);
589+
590+
// Extract and store the key for cluster routing
591+
std::string key = extract_key_from_command(command_str);
592+
keys.push_back(key);
512593
}
513594

514595
fclose(file);
@@ -557,3 +638,28 @@ const std::string &monitor_command_list::get_next_sequential_command(size_t *out
557638
if (out_index) *out_index = index;
558639
return commands[index];
559640
}
641+
642+
const std::string &monitor_command_list::get_key(size_t index) const
643+
{
644+
static std::string empty;
645+
if (index >= keys.size()) {
646+
return empty;
647+
}
648+
return keys[index];
649+
}
650+
651+
bool monitor_command_list::has_key(size_t index) const
652+
{
653+
if (index >= keys.size()) {
654+
return false;
655+
}
656+
return !keys[index].empty();
657+
}
658+
659+
size_t monitor_command_list::peek_next_sequential_index() const
660+
{
661+
if (commands.empty()) {
662+
return 0;
663+
}
664+
return next_index.load(std::memory_order_relaxed) % commands.size();
665+
}

config_types.h

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,8 @@ enum command_arg_type
127127
data_type = 2,
128128
monitor_type = 3,
129129
monitor_random_type = 4,
130-
undefined_type = 5
130+
literal_key_type = 5, // Key is a literal value (from monitor commands) - used for cluster routing
131+
undefined_type = 6
131132
};
132133

133134
struct command_arg
@@ -205,15 +206,27 @@ struct monitor_command_list
205206
{
206207
private:
207208
std::vector<std::string> commands;
209+
std::vector<std::string> keys; // Pre-computed key for each command (empty if keyless)
208210
std::atomic<size_t> next_index;
209211

212+
// Extract key from a command string (second argument, or empty for keyless commands)
213+
static std::string extract_key_from_command(const std::string &command);
214+
210215
public:
211216
monitor_command_list() : next_index(0) { ; }
212217

213218
bool load_from_file(const char *filename);
214219
const std::string &get_command(size_t index) const;
215220
const std::string &get_random_command(object_generator *obj_gen, size_t *out_index) const;
216221
const std::string &get_next_sequential_command(size_t *out_index);
222+
223+
// Get the pre-computed key for a command (empty string if keyless)
224+
const std::string &get_key(size_t index) const;
225+
bool has_key(size_t index) const;
226+
227+
// Peek at next sequential index without advancing (for cluster routing)
228+
size_t peek_next_sequential_index() const;
229+
217230
size_t size() const { return commands.size(); }
218231
};
219232

memtier_benchmark.cpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1985,6 +1985,12 @@ int main(int argc, char *argv[])
19851985

19861986
// Load monitor input file if specified
19871987
if (cfg.monitor_input) {
1988+
// Monitor input only works with Redis protocols
1989+
if (!is_redis_protocol(cfg.protocol)) {
1990+
fprintf(stderr, "error: --monitor-input is only supported with Redis protocols (redis, resp2, resp3).\n");
1991+
exit(1);
1992+
}
1993+
19881994
if (!cfg.monitor_commands->load_from_file(cfg.monitor_input)) {
19891995
exit(1);
19901996
}
@@ -2050,6 +2056,13 @@ int main(int argc, char *argv[])
20502056
exit(1);
20512057
}
20522058

2059+
// Mark the first argument as a literal key for cluster routing
2060+
// Most Redis commands have the key as the first argument after the command name
2061+
if (cmd.command_args.size() > 0) {
2062+
cmd.command_args[0].type = literal_key_type;
2063+
cmd.keys_count = 1;
2064+
}
2065+
20532066
// Update command name (first word of the command)
20542067
size_t pos = cmd.command.find(" ");
20552068
if (pos == std::string::npos) {

tests/test_monitor_input.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -588,16 +588,13 @@ def test_command_stats_breakdown_by_command(env):
588588
# Check stdout for aggregated output
589589
with open("{0}/mb.stdout".format(config.results_dir)) as stdout:
590590
stdout_content = stdout.read()
591-
env.debugPrint("stdout content:\n{}".format(stdout_content), True)
592591

593592
# Count occurrences of "Sets" and "Gets" in the output
594593
# With aggregation, we should see exactly one "Sets" row and one "Gets" row
595594
lines = stdout_content.split("\n")
596595
sets_count = sum(1 for line in lines if line.strip().startswith("Sets"))
597596
gets_count = sum(1 for line in lines if line.strip().startswith("Gets"))
598597

599-
env.debugPrint("Sets rows: {}, Gets rows: {}".format(sets_count, gets_count), True)
600-
601598
# Should have exactly 1 Sets row and 1 Gets row (aggregated)
602599
env.assertEqual(sets_count, 1)
603600
env.assertEqual(gets_count, 1)
@@ -648,16 +645,13 @@ def test_command_stats_breakdown_by_line(env):
648645
# Check stdout for per-command output
649646
with open("{0}/mb.stdout".format(config.results_dir)) as stdout:
650647
stdout_content = stdout.read()
651-
env.debugPrint("stdout content:\n{}".format(stdout_content), True)
652648

653649
# Count occurrences of "Sets" and "Gets" in the output
654650
# Without aggregation, we should see 2 "Sets" rows and 2 "Gets" rows
655651
lines = stdout_content.split("\n")
656652
sets_count = sum(1 for line in lines if line.strip().startswith("Sets"))
657653
gets_count = sum(1 for line in lines if line.strip().startswith("Gets"))
658654

659-
env.debugPrint("Sets rows: {}, Gets rows: {}".format(sets_count, gets_count), True)
660-
661655
# Should have 2 Sets rows and 2 Gets rows (one per command)
662656
env.assertEqual(sets_count, 2)
663657
env.assertEqual(gets_count, 2)

0 commit comments

Comments
 (0)