diff --git a/src/valkey-benchmark.c b/src/valkey-benchmark.c
index a29e5d4fc0..e4793e67b9 100644
--- a/src/valkey-benchmark.c
+++ b/src/valkey-benchmark.c
@@ -81,6 +81,26 @@ static const char *PLACEHOLDERS[PLACEHOLDER_COUNT] = {
     "__rand_int__", "__rand_1st__", "__rand_2nd__", "__rand_3rd__", "__rand_4th__",
     "__rand_5th__", "__rand_6th__", "__rand_7th__", "__rand_8th__", "__rand_9th__"};
 
+/* Dataset support structures */
+#define MAX_DATASET_FIELDS 64
+#define FIELD_PREFIX "__field:"
+#define FIELD_PREFIX_LEN 8
+#define FIELD_SUFFIX "__"
+#define FIELD_SUFFIX_LEN 2
+
+typedef struct datasetRecord {
+    sds *fields;
+} datasetRecord;
+
+typedef struct dataset {
+    char format;             /* Format: 'c'=csv, 't'=tsv, 'x'=xml */
+    char delimiter;          /* Field delimiter for CSV/TSV */
+    sds *field_names;        /* Field name lookup table */
+    int field_count;         /* Number of fields */
+    datasetRecord *records;  /* Structured field data */
+    size_t record_count;     /* Number of records */
+} dataset;
+
 struct benchmarkThread;
 struct clusterNode;
 struct serverConfig;
@@ -150,6 +170,15 @@ static struct config {
     atomic_uint_fast64_t last_time_ns;
     uint64_t time_per_token;
     uint64_t time_per_burst;
+    /* Dataset support */
+    sds dataset_file;
+    int max_documents;        /* Maximum documents to load from dataset */
+    sds xml_root_element;     /* XML root element name */
+    dataset *current_dataset; /* Current loaded dataset */
+    /* Command template for dataset mode */
+    int template_argc;
+    sds *template_argv;
+    int has_field_placeholders;
 } config;
 
 /* Locations of the placeholders __rand_int__, __rand_1st__,
@@ -226,6 +255,13 @@ static void freeServerConfig(serverConfig *cfg);
 static int fetchClusterSlotsConfiguration(client c);
 static void updateClusterSlotsConfiguration(void);
 static long long showThroughput(struct aeEventLoop *eventLoop, long long id, void *clientData);
+static sds getFieldValue(const char *row, int column_index, char delimiter);
+static sds getXmlFieldValue(const char *xml_doc, const char *field_name);
+static sds generateCompleteCommand(int record_index);
+static sds formatBytes(size_t bytes);
+static dataset *initDataset(void);
+static void freeDataset(dataset *ds);
+static void reportDatasetMemory(dataset *ds);
 
 /* Dict callbacks */
 static uint64_t dictSdsHash(const void *key);
@@ -840,8 +876,25 @@ static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
         return;
     }
 
-    /* Really initialize: replace keys and set start time. */
-    if (config.replace_placeholders) replacePlaceholders(c->obuf + c->prefixlen, config.pipeline);
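+    /* Dataset mode rebuilds the whole pipeline from the stored command
+     * template instead of patching placeholders in place, so every
+     * __field:name__ in one command resolves against the same dataset record.
+     * Illustrative template (hypothetical): with
+     *   HSET doc:__rand_int__ title "__field:title__" author "__field:author__"
+     * title and author always come from the same row. */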
+    /* Dataset field access mode - completely independent command generation */
+    if (config.has_field_placeholders && config.current_dataset && config.current_dataset->record_count > 0) {
+        static _Atomic uint64_t record_counter = 0;
+
+        /* Generate complete pipeline commands for dataset placeholders */
+        sdssetlen(c->obuf, c->prefixlen);
+        for (int p = 0; p < config.pipeline; p++) {
+            uint64_t record_index = atomic_fetch_add_explicit(&record_counter, 1, memory_order_relaxed) % config.current_dataset->record_count;
+            sds complete_cmd = generateCompleteCommand(record_index);
+            c->obuf = sdscatlen(c->obuf, complete_cmd, sdslen(complete_cmd));
+            sdsfree(complete_cmd);
+        }
+    } else {
+        /* Standard mode */
+        if (config.replace_placeholders) {
+            replacePlaceholders(c->obuf + c->prefixlen, config.pipeline);
+        }
+    }
+
     if (config.cluster_mode && c->staglen > 0) setClusterKeyHashTag(c);
     c->slots_last_update = atomic_load_explicit(&config.slots_last_update, memory_order_relaxed);
     c->start = ustime();
@@ -1587,6 +1640,617 @@ static void updateClusterSlotsConfiguration(void) {
     pthread_mutex_unlock(&config.is_updating_slots_mutex);
 }
 
+/* Validate field placeholders in command arguments */
+static void validateFieldPlaceholders(sds *template_argv, int template_argc) {
+    for (int arg_idx = 0; arg_idx < template_argc; arg_idx++) {
+        const char *arg = template_argv[arg_idx];
+        const char *field_pos = strstr(arg, FIELD_PREFIX);
+        while (field_pos) {
+            const char *field_start = field_pos + FIELD_PREFIX_LEN;
+            const char *field_end = strstr(field_start, FIELD_SUFFIX);
+            if (!field_end) break;
+
+            /* Extract and validate field name */
+            size_t field_name_len = field_end - field_start;
+            sds field_name = sdsnewlen(field_start, field_name_len);
+
+            int field_found = 0;
+            for (int k = 0; k < config.current_dataset->field_count; k++) {
+                if (!strcmp(field_name, config.current_dataset->field_names[k])) {
+                    field_found = 1;
+                    break;
+                }
+            }
+
+            if (!field_found) {
+                fprintf(stderr, "Error: Field placeholder '__field:%s__' not found in dataset fields\n", field_name);
+                fprintf(stderr, "Available fields: ");
+                for (int j = 0; j < config.current_dataset->field_count; j++) {
+                    fprintf(stderr, "%s%s", config.current_dataset->field_names[j],
+                            (j < config.current_dataset->field_count - 1) ? ", " : "\n");
+                }
+                sdsfree(field_name);
+                exit(1);
+            }
+
+            sdsfree(field_name);
+            field_pos = strstr(field_end + FIELD_SUFFIX_LEN, FIELD_PREFIX);
+        }
+    }
+}
+
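+/* Example (illustrative): with dataset fields {title, content}, a template
+ * argument containing "__field:body__" aborts the run with:
+ *   Error: Field placeholder '__field:body__' not found in dataset fields
+ *   Available fields: title, content */
+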
", " : "\n"); + } + sdsfree(field_name); + exit(1); + } + + sdsfree(field_name); + field_pos = strstr(field_end + FIELD_SUFFIX_LEN, FIELD_PREFIX); + } + } +} + +/* Format bytes into human-readable string */ +static sds formatBytes(size_t bytes) { + if (bytes < 1024) { + return sdscatprintf(sdsempty(), "%zu bytes", bytes); + } else if (bytes < 1024 * 1024) { + return sdscatprintf(sdsempty(), "%.2f KB", bytes / 1024.0); + } else if (bytes < 1024 * 1024 * 1024) { + return sdscatprintf(sdsempty(), "%.2f MB", bytes / (1024.0 * 1024.0)); + } else { + return sdscatprintf(sdsempty(), "%.2f GB", bytes / (1024.0 * 1024.0 * 1024.0)); + } +} + +/* CSV field discovery - parse header line */ +static int csvDiscoverFields(dataset *ds) { + FILE *fp = fopen(config.dataset_file, "r"); + if (!fp) { + fprintf(stderr, "Cannot open dataset file: %s\n", config.dataset_file); + return 0; + } + + char *line = NULL; + size_t len = 0; + if (getline(&line, &len, fp) == -1) { + fprintf(stderr, "Cannot read header from dataset file\n"); + free(line); + fclose(fp); + return 0; + } + + /* Remove trailing newlines */ + len = strlen(line); + if (len > 0 && line[len - 1] == '\n') line[len - 1] = '\0'; + if (len > 1 && line[len - 2] == '\r') line[len - 2] = '\0'; + + /* Parse header using delimiter */ + int count; + char delim_str[2] = {ds->delimiter, '\0'}; + ds->field_names = sdssplitlen(line, strlen(line), delim_str, 1, &count); + ds->field_count = count; + + free(line); + fclose(fp); + return 1; +} + +/* Generic document limit check */ +static int shouldStopLoading(size_t current_count) { + if (config.max_documents > 0 && (int)current_count >= config.max_documents) { + return 1; + } + return 0; +} + + +static int scanXmlFields(const char *doc_start, const char *doc_end, dataset *ds, const char *xml_root_element) { + char field_names[MAX_DATASET_FIELDS][64]; + int field_count = 0; + + char start_root_tag[64], end_root_tag[64]; + snprintf(start_root_tag, sizeof(start_root_tag), "<%s>", xml_root_element); + snprintf(end_root_tag, sizeof(end_root_tag), "", xml_root_element); + int root_start_tag_len = strlen(start_root_tag); + int root_end_tag_len = strlen(end_root_tag); + + const char *current_pos = doc_start; + while ((current_pos = strchr(current_pos, '<')) != NULL && current_pos < doc_end) { + if (current_pos[1] == '/' || current_pos[1] == '!' 
+static int loadXmlDataset(dataset *ds) {
+    FILE *fp = fopen(config.dataset_file, "r");
+    if (!fp) return 0;
+
+    if (!config.xml_root_element) {
+        fprintf(stderr, "Error: XML dataset requires --xml-root-element parameter\n");
+        fprintf(stderr, "Example: --xml-root-element doc\n");
+        fclose(fp);
+        return 0;
+    }
+
+    char start_tag[64], end_tag[64];
+    snprintf(start_tag, sizeof(start_tag), "<%s>", config.xml_root_element);
+    snprintf(end_tag, sizeof(end_tag), "</%s>", config.xml_root_element);
+
+    char buffer[1024];
+    sds current_doc = sdsempty();
+    int fields_discovered = 0;
+    size_t capacity = 1000;
+
+    ds->records = zmalloc(sizeof(datasetRecord) * capacity);
+
+    if (!config.quiet) {
+        printf("Loading XML dataset from %s...\n", config.dataset_file);
+    }
+
+    while (fgets(buffer, sizeof(buffer), fp) && !shouldStopLoading(ds->record_count)) {
+        current_doc = sdscat(current_doc, buffer);
+
+        const char *doc_start = strstr(current_doc, start_tag);
+        if (!doc_start) continue;
+
+        const char *doc_end = strstr(doc_start, end_tag);
+        if (!doc_end) continue;
+
+        doc_end += strlen(end_tag);
+
+        if (!fields_discovered) {
+            if (!scanXmlFields(doc_start, doc_end, ds, config.xml_root_element)) {
+                fprintf(stderr, "No XML fields discovered\n");
+                sdsfree(current_doc);
+                fclose(fp);
+                return 0;
+            }
+            fields_discovered = 1;
+
+            if (!config.quiet) {
+                printf("Discovered %d fields: ", ds->field_count);
+                for (int i = 0; i < ds->field_count; i++) {
+                    printf("%s%s", ds->field_names[i], (i < ds->field_count - 1) ? ", " : "\n");
+                }
+            }
+        }
+
+        if (ds->record_count >= capacity) {
+            capacity *= 2;
+            ds->records = zrealloc(ds->records, sizeof(datasetRecord) * capacity);
+        }
+
+        datasetRecord *record = &ds->records[ds->record_count];
+        record->fields = zmalloc(sizeof(sds) * ds->field_count);
+
+        sds doc_str = sdsnewlen(doc_start, doc_end - doc_start);
+        for (int i = 0; i < ds->field_count; i++) {
+            record->fields[i] = getXmlFieldValue(doc_str, ds->field_names[i]);
+        }
+        sdsfree(doc_str);
+
+        ds->record_count++;
+
+        if (!config.quiet && ds->record_count % 10000 == 0) {
+            printf("\rLoaded %zu documents...", ds->record_count);
+            fflush(stdout);
+        }
+
+        sdsclear(current_doc);
+    }
+
+    sdsfree(current_doc);
+    fclose(fp);
+    return 1;
+}
+
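+/* Note: a document may span several fgets() reads; input accumulates in
+ * current_doc until a complete <root>...</root> pair is present, so
+ * (illustratively) a 3 KB record split across three 1 KB reads still
+ * parses as one record. */
+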
", " : "\n"); + } + } + } + + if (ds->record_count >= capacity) { + capacity *= 2; + ds->records = zrealloc(ds->records, sizeof(datasetRecord) * capacity); + } + + datasetRecord *record = &ds->records[ds->record_count]; + record->fields = zmalloc(sizeof(sds) * ds->field_count); + + sds doc_str = sdsnewlen(doc_start, doc_end - doc_start); + for (int i = 0; i < ds->field_count; i++) { + record->fields[i] = getXmlFieldValue(doc_str, ds->field_names[i]); + } + sdsfree(doc_str); + + ds->record_count++; + + if (!config.quiet && ds->record_count % 10000 == 0) { + printf("\rLoaded %zu documents...", ds->record_count); + fflush(stdout); + } + + sdsclear(current_doc); + } + + sdsfree(current_doc); + fclose(fp); + return 1; +} + +/* CSV/TSV structured record loader */ +static int csvLoadDocuments(dataset *ds) { + FILE *fp = fopen(config.dataset_file, "r"); + if (!fp) return 0; + + /* Skip header */ + char *line = NULL; + size_t len = 0; + if (getline(&line, &len, fp) == -1) { + fprintf(stderr, "Cannot read header from dataset file\n"); + free(line); + fclose(fp); + return 0; + } + + size_t capacity = 1000; + ds->records = zmalloc(sizeof(datasetRecord) * capacity); + + const char *format_name = (ds->format == 'c') ? "csv" : (ds->format == 't') ? "tsv" + : "xml"; + if (!config.quiet) { + printf("Loading %s dataset from %s...\n", format_name, config.dataset_file); + } + + while (getline(&line, &len, fp) != -1 && !shouldStopLoading(ds->record_count)) { + if (line[0] == '\0' || line[0] == '\n') continue; + + /* Clean line endings */ + size_t line_len = strlen(line); + if (line_len > 0 && line[line_len - 1] == '\n') line[line_len - 1] = '\0'; + if (line_len > 1 && line[line_len - 2] == '\r') line[line_len - 2] = '\0'; + + if (ds->record_count >= capacity) { + capacity *= 2; + ds->records = zrealloc(ds->records, sizeof(datasetRecord) * capacity); + } + + /* Extract field values into structured record */ + datasetRecord *record = &ds->records[ds->record_count]; + record->fields = zmalloc(sizeof(sds) * ds->field_count); + + for (int i = 0; i < ds->field_count; i++) { + record->fields[i] = getFieldValue(line, i, ds->delimiter); + } + + ds->record_count++; + } + + free(line); + fclose(fp); + return 1; +} + +/* Initialize dataset from file */ +static dataset *initDataset(void) { + dataset *ds = zcalloc(sizeof(dataset)); + if (!ds) return NULL; + + /* Detect file format from extension, handle tmpfile suffixes */ + const char *filename = config.dataset_file; + if (strstr(filename, ".csv")) { + ds->format = 'c'; + ds->delimiter = ','; + } else if (strstr(filename, ".tsv")) { + ds->format = 't'; + ds->delimiter = '\t'; + } else if (strstr(filename, ".xml")) { + ds->format = 'x'; + ds->delimiter = 0; + } else { + ds->format = 'c'; + ds->delimiter = ','; + } + + /* Load documents and discover fields */ + if (ds->format == 'x') { + if (!loadXmlDataset(ds)) goto error; + } else { + /* CSV/TSV: discover fields first, then load documents */ + if (!csvDiscoverFields(ds)) goto error; + if (!csvLoadDocuments(ds)) goto error; + } + + return ds; + +error: + freeDataset(ds); + return NULL; +} + +/* Free dataset */ +static void freeDataset(dataset *ds) { + if (!ds) return; + + /* Free field names */ + if (ds->field_names) { + sdsfreesplitres(ds->field_names, ds->field_count); + } + + /* Free structured records */ + if (ds->records) { + for (size_t i = 0; i < ds->record_count; i++) { + if (ds->records[i].fields) { + for (int j = 0; j < ds->field_count; j++) { + sdsfree(ds->records[i].fields[j]); + } + 
zfree(ds->records[i].fields); + } + } + zfree(ds->records); + } + + zfree(ds); +} + +/* Extract field value from CSV/TSV row */ +static sds getFieldValue(const char *row, int column_index, char delimiter) { + int current_col = 0; + const char *start = row; + const char *p = row; + int in_quotes = 0; + + while (*p) { + if (*p == '"') { + in_quotes = !in_quotes; + } else if (*p == delimiter && !in_quotes) { + if (current_col == column_index) { + /* Found our column */ + size_t len = p - start; + /* Remove surrounding quotes if present */ + if (len > 0 && start[0] == '"' && p[-1] == '"') { + start++; + len -= 2; + } + return sdsnewlen(start, len); + } + current_col++; + start = p + 1; + } + p++; + } + + /* Last column or only column */ + if (current_col == column_index) { + size_t len = p - start; + if (len > 0 && start[0] == '"' && p[-1] == '"') { + start++; + len -= 2; + } + return sdsnewlen(start, len); + } + + return sdsempty(); +} + +/* Extract field value from XML document */ +static sds getXmlFieldValue(const char *xml_doc, const char *field_name) { + char start_tag[128], end_tag[128]; + snprintf(start_tag, sizeof(start_tag), "<%s>", field_name); + snprintf(end_tag, sizeof(end_tag), "", field_name); + + const char *tag_start = strstr(xml_doc, start_tag); + if (!tag_start) return sdsempty(); + + const char *content_start = tag_start + strlen(start_tag); + const char *tag_end = strstr(content_start, end_tag); + if (!tag_end) return sdsempty(); + + size_t content_len = tag_end - content_start; + return sdsnewlen(content_start, content_len); +} + +/* Calculate total memory required for dataset benchmarking */ +static void reportDatasetMemory(dataset *ds) { + if (!config.quiet) { + /* Calculate total memory from structured records */ + size_t total_memory = 0; + for (size_t i = 0; i < ds->record_count; i++) { + for (int j = 0; j < ds->field_count; j++) { + total_memory += sdslen(ds->records[i].fields[j]); + } + } + sds size_str = formatBytes(total_memory); + printf("Dataset: %zu documents (%s)\n", ds->record_count, size_str); + sdsfree(size_str); + } +} + + +/* Find field index in dataset by name */ +static int findFieldIndex(const char *field_name, size_t field_name_len) { + for (int k = 0; k < config.current_dataset->field_count; k++) { + if (strlen(config.current_dataset->field_names[k]) == field_name_len && + !memcmp(config.current_dataset->field_names[k], field_name, field_name_len)) { + return k; + } + } + return -1; +} + +/* Extract field value from dataset record */ +static const char *extractDatasetFieldValue(int field_idx, int record_index) { + return config.current_dataset->records[record_index].fields[field_idx]; +} + +static sds replaceOccurrence(sds processed_arg, const char *pos, const char *replacement) { + size_t offset = pos - processed_arg; + size_t replacement_len = strlen(replacement); + size_t total_len = offset + replacement_len + (sdslen(processed_arg) - offset - PLACEHOLDER_LEN); + + /* Single allocation for final result */ + sds result = sdsnewlen(NULL, total_len); + char *p = result; + + memcpy(p, processed_arg, offset); + p += offset; + + memcpy(p, replacement, replacement_len); + p += replacement_len; + + const char *after_start = pos + PLACEHOLDER_LEN; + size_t after_len = sdslen(processed_arg) - offset - PLACEHOLDER_LEN; + memcpy(p, after_start, after_len); + + sdsfree(processed_arg); + return result; +} + +/* Process field placeholders in a single argument */ +static sds processFieldsInArg(sds arg, int record_index) { + if (!strstr(arg, FIELD_PREFIX)) return 
+static sds processRandPlaceholdersForDataSet(sds cmd, _Atomic uint64_t *seq_key) {
+    if (!config.replace_placeholders || config.keyspacelen == 0) return cmd;
+
+    for (int ph = 0; ph < PLACEHOLDER_COUNT; ph++) {
+        if (!strstr(cmd, PLACEHOLDERS[ph])) continue;
+
+        uint64_t shared_key = 0;
+        int generate_shared_key = (ph != 0);
+
+        if (generate_shared_key) {
+            /* Generate shared key for __rand_1st__ - __rand_9th__ */
+            if (config.sequential_replacement) {
+                shared_key = atomic_fetch_add_explicit(&seq_key[ph], 1, memory_order_relaxed);
+            } else {
+                shared_key = random();
+            }
+            shared_key %= config.keyspacelen;
+        }
+
+        /* Process all occurrences */
+        size_t search_offset = 0;
+        char *pos;
+        while ((pos = strstr(cmd + search_offset, PLACEHOLDERS[ph])) != NULL) {
+            uint64_t key = generate_shared_key ? shared_key : 0;
+
+            if (!generate_shared_key) {
+                /* __rand_int__: Generate different key per occurrence */
+                if (config.sequential_replacement) {
+                    key = atomic_fetch_add_explicit(&seq_key[ph], 1, memory_order_relaxed);
+                } else {
+                    key = random();
+                }
+                key %= config.keyspacelen;
+            }
+
+            char key_str[24];
+            snprintf(key_str, sizeof(key_str), "%012llu", (unsigned long long)key);
+
+            size_t offset = pos - cmd;
+            cmd = replaceOccurrence(cmd, pos, key_str);
+            search_offset = offset + PLACEHOLDER_LEN;
+        }
+    }
+
+    return cmd;
+}
+
+/* Generate complete command with field placeholders replaced before RESP encoding */
+static sds generateCompleteCommand(int record_index) {
+    static _Atomic uint64_t seq_key[PLACEHOLDER_COUNT] = {0};
+
+    sds *processed_argv = zmalloc(config.template_argc * sizeof(sds));
+    for (int i = 0; i < config.template_argc; i++) {
+        processed_argv[i] = processFieldsInArg(sdsdup(config.template_argv[i]), record_index);
+    }
+
+    char *cmd;
+    int len = valkeyFormatCommandArgv(&cmd, config.template_argc, (const char **)processed_argv, NULL);
+    sds result = sdsnewlen(cmd, len);
+    free(cmd);
+
+    result = processRandPlaceholdersForDataSet(result, seq_key);
+
+    /* Clean up processed arguments */
+    for (int i = 0; i < config.template_argc; i++) {
+        sdsfree(processed_argv[i]);
+    }
+    zfree(processed_argv);
+
+    return result;
+}
+
+/* Free dataset memory */
+static void cleanupDataset(void) {
+    if (config.current_dataset) {
+        freeDataset(config.current_dataset);
+        config.current_dataset = NULL;
+    }
+}
+
+/* Add RESP command to sequence with repeat count */
+static void addRespCommandToSequence(sds *sds_args, size_t *argvlen, int start, int end, int repeat, sds *cmd_seq, int *seq_len) {
+    char *cmd = NULL;
+    int len = valkeyFormatCommandArgv(&cmd, end - start, (const char **)sds_args + start, argvlen + start);
+    for (int j = 0; j < repeat; j++) {
+        *cmd_seq = sdscatlen(*cmd_seq, cmd, len);
+    }
+    *seq_len += repeat;
+    free(cmd);
+}
+
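+/* valkeyFormatCommandArgv() RESP-encodes the argument vector; illustratively,
+ * {"SET", "k", "v"} becomes "*3\r\n$3\r\nSET\r\n$1\r\nk\r\n$1\r\nv\r\n".
+ * Patching __rand_*__ placeholders after encoding (as generateCompleteCommand
+ * does) stays valid because the 12-digit replacement has exactly the same
+ * length as the placeholder, so bulk-string headers are unaffected. */
+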
+/* Process command sequence for dataset or standard mode */
+static void processCommandSequence(sds *sds_args, size_t *argvlen, int start, int end, int repeat, sds *cmd_seq, int *seq_len) {
+    if (config.current_dataset) {
+        /* Check if this command has field placeholders */
+        config.has_field_placeholders = 0;
+        for (int check_idx = start; check_idx < end; check_idx++) {
+            if (strstr(sds_args[check_idx], FIELD_PREFIX)) {
+                config.has_field_placeholders = 1;
+                break;
+            }
+        }
+
+        if (config.has_field_placeholders) {
+            /* Store command template */
+            config.template_argc = end - start;
+            config.template_argv = zmalloc(config.template_argc * sizeof(sds));
+            for (int tmpl_idx = 0; tmpl_idx < config.template_argc; tmpl_idx++) {
+                config.template_argv[tmpl_idx] = sdsdup(sds_args[start + tmpl_idx]);
+            }
+
+            validateFieldPlaceholders(config.template_argv, config.template_argc);
+
+            /* Pipelining is now supported with structured field cache */
+        }
+    }
+
+    /* Add command to sequence */
+    addRespCommandToSequence(sds_args, argvlen, start, end, repeat, cmd_seq, seq_len);
+}
+
 /* Generate random data for the benchmark. See #7196. */
 static void genBenchmarkRandomData(char *data, int count) {
     static uint32_t state = 1234;
@@ -1745,6 +2409,16 @@ int parseOptions(int argc, char **argv) {
             config.num_functions = atoi(argv[++i]);
         } else if (!strcmp(argv[i], "--num-keys-in-fcall")) {
             config.num_keys_in_fcall = atoi(argv[++i]);
+        } else if (!strcmp(argv[i], "--dataset")) {
+            if (lastarg) goto invalid;
+            config.dataset_file = sdsnew(argv[++i]);
+        } else if (!strcmp(argv[i], "--maxdocs")) {
+            if (lastarg) goto invalid;
+            config.max_documents = atoi(argv[++i]);
+            if (config.max_documents <= 0) config.max_documents = -1;
+        } else if (!strcmp(argv[i], "--xml-root-element")) {
+            if (lastarg) goto invalid;
+            config.xml_root_element = sdsnew(argv[++i]);
         } else if (!strcmp(argv[i], "--help")) {
            exit_status = 0;
            goto usage;
@@ -1851,6 +2525,8 @@
         "__rand_1st__        Like __rand_int__ but multiple occurrences will have the same\n"
         "                    value. __rand_2nd__ through __rand_9th__ are also available.\n"
         " __data__            Replaced with data of the size specified by the -d option.\n"
+        " __field:name__      Replaced with data from the specified field/column in the\n"
+        "                     dataset. Requires the --dataset option.\n"
         " {tag}               Replaced with a tag that routes the command to each node in\n"
         "                     a cluster. Include this in key names when running in cluster\n"
         "                     mode.\n"
@@ -1928,7 +2604,9 @@
         "                     loaded when running the 'function_load' test. (default 10).\n"
         " --num-keys-in-fcall <num>\n"
         "                     Sets the number of keys passed to FCALL command when running\n"
-        "                     the 'fcall' test. (default 1)\n",
+        "                     the 'fcall' test. (default 1)\n"
+        " --dataset <file>    Path to a CSV/TSV/XML dataset file for field placeholder\n"
+        "                     replacement. Fields are auto-detected and values keep their\n"
+        "                     natural content lengths.\n"
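+        " --maxdocs <count>   Maximum number of documents to load from the dataset.\n"
+        "                     (default unlimited)\n"
+        " --xml-root-element <name>\n"
+        "                     Per-document root element of an XML dataset. Required for\n"
+        "                     XML files. (e.g. --xml-root-element doc)\n",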
         tls_usage,
         rdma_usage,
         " --mptcp             Enable an MPTCP connection.\n"
@@ -2105,12 +2783,36 @@ int main(int argc, char **argv) {
     config.num_functions = 10;
     config.num_keys_in_fcall = 1;
     config.resp3 = 0;
+    config.dataset_file = NULL;
+    config.max_documents = -1; /* -1 = unlimited */
+    config.xml_root_element = NULL;
+    config.current_dataset = NULL;
+    config.template_argc = 0;
+    config.template_argv = NULL;
+    config.has_field_placeholders = 0;
     resetPlaceholders();
 
     i = parseOptions(argc, argv);
     argc -= i;
     argv += i;
 
+    /* Setup dataset if specified */
+    if (config.dataset_file) {
+        if (argc == 0) {
+            fprintf(stderr, "Error: Dataset mode requires a command with field placeholders\n");
+            fprintf(stderr, "Example: SET doc:__rand_int__ \"__field:content__\"\n");
+            exit(1);
+        }
+
+        config.current_dataset = initDataset();
+        if (!config.current_dataset) {
+            fprintf(stderr, "Failed to initialize dataset\n");
+            exit(1);
+        }
+
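+        /* Illustrative console output for a 50k-document CSV (sizes hypothetical):
+         *   Loading csv dataset from abstracts.csv...
+         *   Dataset: 50000 documents (12.34 MB) */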
+        reportDatasetMemory(config.current_dataset);
+    }
+
     tag = "";
 
 #ifdef USE_OPENSSL
@@ -2252,18 +2954,15 @@
         } else if (i == argc || strcmp(";", sds_args[i]) == 0) {
             cmd = NULL;
             if (i == start) continue;
-            /* End of command. RESP-encode and append to sequence. */
-            len = valkeyFormatCommandArgv(&cmd, i - start,
-                                          (const char **)sds_args + start,
-                                          argvlen + start);
-            for (int j = 0; j < repeat; j++) {
-                cmd_seq = sdscatlen(cmd_seq, cmd, len);
-            }
-            seq_len += repeat;
-            free(cmd);
+
+            processCommandSequence(sds_args, argvlen, start, i, repeat, &cmd_seq, &seq_len);
             start = i + 1;
             repeat = 1;
         } else if (strstr(sds_args[i], "__data__")) {
+            if (config.current_dataset) {
+                fprintf(stderr, "Error: __data__ placeholders cannot be used with --dataset option\n");
+                exit(1);
+            }
             /* Replace data placeholders with data of length given by -d. */
             int num_parts;
             sds *parts = sdssplitlen(sds_args[i], sdslen(sds_args[i]),
@@ -2282,6 +2981,7 @@
             sds_args[i] = newarg;
             argvlen[i] = sdslen(sds_args[i]);
         }
+        /* NOTE: Field placeholder processing is handled above in the command-level loop to ensure row consistency */
     }
     len = sdslen(cmd_seq);
     /* adjust the datasize to the parsed command */
@@ -2502,6 +3202,15 @@ int main(int argc, char **argv) {
     freeCliConnInfo(config.conn_info);
     if (config.server_config != NULL) freeServerConfig(config.server_config);
     resetPlaceholders();
+    cleanupDataset();
+
+    /* Clean up command template */
+    if (config.template_argv) {
+        for (int i = 0; i < config.template_argc; i++) {
+            sdsfree(config.template_argv[i]);
+        }
+        zfree(config.template_argv);
+    }
 
     return 0;
 }
diff --git a/tests/integration/valkey-benchmark.tcl b/tests/integration/valkey-benchmark.tcl
index 8b3a30741e..13d2e464c3 100644
--- a/tests/integration/valkey-benchmark.tcl
+++ b/tests/integration/valkey-benchmark.tcl
@@ -198,6 +198,224 @@ tags {"benchmark network external:skip logreqres:skip"} {
         assert {$different_count > 0}
     }
 
+    test {benchmark: dataset CSV with field placeholders} {
+        # Create test CSV dataset
+        set csv_data "title,content,author\nTest Title 1,Test Content 1,Author 1\nTest Title 2,Test Content 2,Author 2"
+        set csv_file [tmpfile "dataset.csv"]
+        set fd [open $csv_file w]
+        puts $fd $csv_data
+        close $fd
+
+        set cmd [valkeybenchmark $master_host $master_port "--dataset $csv_file -n 4 -r 10 -- HSET doc:__rand_int__ title \"__field:title__\" content \"__field:content__\""]
+        common_bench_setup $cmd
+        assert_match {*calls=4,*} [cmdstat hset]
+
+        # Verify field data was inserted correctly
+        set keys [r keys "doc:*"]
+        assert {[llength $keys] > 0}
+        set sample_key [lindex $keys 0]
+        set title [r hget $sample_key title]
+        set content [r hget $sample_key content]
+        assert {$title eq "Test Title 1" || $title eq "Test Title 2"}
+        assert {$content eq "Test Content 1" || $content eq "Test Content 2"}
+
+        file delete $csv_file
+    }
+
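+    # The XML loader treats each <doc>...</doc> block as one record and
+    # auto-discovers the child elements (title, abstract, url, ...) as fields.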
+    test {benchmark: dataset XML with field placeholders} {
+        # Create test XML dataset matching Wikipedia structure
+        set xml_data "<doc><title>XML Title 1</title><abstract>XML Abstract 1</abstract><url>http://example1.com</url><anchor>test1</anchor><link>http://test1.com</link></doc>\n<doc><title>XML Title 2</title><abstract>XML Abstract 2</abstract><url>http://example2.com</url><anchor>test2</anchor><link>http://test2.com</link></doc>"
+        set xml_file [tmpfile "dataset.xml"]
+        set fd [open $xml_file w]
+        puts $fd $xml_data
+        close $fd
+
+        set cmd [valkeybenchmark $master_host $master_port "--dataset $xml_file --xml-root-element doc -n 4 -r 10 -- HSET xml_doc:__rand_int__ title \"__field:title__\" abstract \"__field:abstract__\""]
+        common_bench_setup $cmd
+        assert_match {*calls=4,*} [cmdstat hset]
+
+        # Verify XML field data was inserted correctly
+        set keys [r keys "xml_doc:*"]
+        assert {[llength $keys] > 0}
+        set sample_key [lindex $keys 0]
+        set title [r hget $sample_key title]
+        set abstract [r hget $sample_key abstract]
+        assert {$title eq "XML Title 1" || $title eq "XML Title 2"}
+        assert {$abstract eq "XML Abstract 1" || $abstract eq "XML Abstract 2"}
+
+        file delete $xml_file
+    }
+
+    test {benchmark: dataset with maxdocs limit} {
+        # Create test dataset with multiple rows
+        set csv_data "name,value\nitem1,value1\nitem2,value2\nitem3,value3\nitem4,value4"
+        set csv_file [tmpfile "dataset.csv"]
+        set fd [open $csv_file w]
+        puts $fd $csv_data
+        close $fd
+
+        set cmd [valkeybenchmark $master_host $master_port "--dataset $csv_file --maxdocs 2 -n 4 -r 10 -- SET item:__rand_int__ \"__field:value__\""]
+        common_bench_setup $cmd
+        assert_match {*calls=4,*} [cmdstat set]
+
+        # Should only use the first 2 documents due to the maxdocs limit
+        set keys [r keys "item:*"]
+        assert {[llength $keys] > 0}
+
+        # Verify ALL keys only contain values from the first 2 documents
+        set unique_values {}
+        foreach key $keys {
+            set value [r get $key]
+            assert {$value eq "value1" || $value eq "value2"}
+            if {[lsearch $unique_values $value] == -1} {
+                lappend unique_values $value
+            }
+        }
+
+        file delete $csv_file
+    }
+
+    test {benchmark: dataset error handling - invalid field} {
+        set csv_data "name,value\nitem1,value1"
+        set csv_file [tmpfile "dataset.csv"]
+        set fd [open $csv_file w]
+        puts $fd $csv_data
+        close $fd
+
+        set cmd [valkeybenchmark $master_host $master_port "--dataset $csv_file -n 1 -- SET item:__rand_int__ \"__field:invalid_field__\""]
+
+        # Should fail with an invalid-field error
+        if {[catch { exec {*}$cmd } error]} {
+            assert_match "*not found in dataset fields*" $error
+        } else {
+            fail "Expected error for invalid field placeholder"
+        }
+
+        file delete $csv_file
+    }
+
+    test {benchmark: dataset TSV with field placeholders} {
+        # Create test TSV dataset (tab-separated values)
+        set tsv_data "name\tvalue\tcount\nitem1\tvalue1\t100\nitem2\tvalue2\t200"
+        set tsv_file [tmpfile "dataset.tsv"]
+        set fd [open $tsv_file w]
+        puts $fd $tsv_data
+        close $fd
+
+        set cmd [valkeybenchmark $master_host $master_port "--dataset $tsv_file -n 4 -r 10 -- HSET tsv_doc:__rand_int__ name \"__field:name__\" value \"__field:value__\" count __field:count__"]
+        common_bench_setup $cmd
+        assert_match {*calls=4,*} [cmdstat hset]
+
+        # Verify TSV field data was inserted correctly
+        set keys [r keys "tsv_doc:*"]
+        assert {[llength $keys] > 0}
+        set sample_key [lindex $keys 0]
+        set name [r hget $sample_key name]
+        set value [r hget $sample_key value]
+        set count [r hget $sample_key count]
+        assert {$name eq "item1" || $name eq "item2"}
+        assert {$value eq "value1" || $value eq "value2"}
+        assert {$count eq "100" || $count eq "200"}
+
+        file delete $tsv_file
+    }
+
+    test {benchmark: XML dataset missing root element error} {
+        # Create test XML dataset
+        set xml_data "<doc><title>XML Title 1</title><abstract>XML Abstract 1</abstract></doc>"
+        set xml_file [tmpfile "dataset.xml"]
+        set fd [open $xml_file w]
+        puts $fd $xml_data
+        close $fd
+
+        # Should fail without the --xml-root-element parameter
+        set cmd [valkeybenchmark $master_host $master_port "--dataset $xml_file -n 1 -- SET xml:__rand_int__ \"__field:title__\""]
+
+        if {[catch { exec {*}$cmd } error]} {
+            assert_match "*XML dataset requires --xml-root-element parameter*" $error
+        } else {
+            fail "Expected error for XML dataset without --xml-root-element"
+        }
+
+        file delete $xml_file
+    }
+
+    test {benchmark: dataset with maxdocs larger than available documents} {
+        # Create test dataset with only 2 rows but request maxdocs=5
+        set csv_data "name,value\nitem1,value1\nitem2,value2"
+        set csv_file [tmpfile "dataset.csv"]
+        set fd [open $csv_file w]
+        puts $fd $csv_data
+        close $fd
+
+        set cmd [valkeybenchmark $master_host $master_port "--dataset $csv_file --maxdocs 5 -n 4 -r 10 -- SET item:__rand_int__ \"__field:value__\""]
+        common_bench_setup $cmd
+        assert_match {*calls=4,*} [cmdstat set]
+
+        # Should gracefully use all available documents (2), cycling through them
+        set keys [r keys "item:*"]
+        assert {[llength $keys] > 0}
+
+        # All values should still come only from the available documents
+        foreach key $keys {
+            set value [r get $key]
+            assert {$value eq "value1" || $value eq "value2"}
+        }
+
+        file delete $csv_file
+    }
+
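+    # __rand_1st__ resolves to one shared value per command, so every
+    # occurrence inside a single HSET receives the same 12-digit,
+    # zero-padded key.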
+    test {benchmark: mixed placeholders - dataset fields and rand placeholders} {
+        # Test combining __field:name__ with __rand_int__ placeholders
+        set csv_data "category,description\nuser,User Management\norder,Order Processing"
+        set csv_file [tmpfile "dataset.csv"]
+        set fd [open $csv_file w]
+        puts $fd $csv_data
+        close $fd
+
+        set cmd [valkeybenchmark $master_host $master_port "--dataset $csv_file -n 6 -r 100 -- HSET mixed:__rand_int__ category \"__field:category__\" desc \"__field:description__\" score __rand_1st__"]
+        common_bench_setup $cmd
+        assert_match {*calls=6,*} [cmdstat hset]
+
+        # Verify that field and random placeholders work together
+        set keys [r keys "mixed:*"]
+        assert {[llength $keys] > 0}
+        set sample_key [lindex $keys 0]
+        set category [r hget $sample_key category]
+        set desc [r hget $sample_key desc]
+        set score [r hget $sample_key score]
+
+        # Field placeholders should contain dataset values
+        assert {$category eq "user" || $category eq "order"}
+        assert {$desc eq "User Management" || $desc eq "Order Processing"}
+
+        # Random placeholder should be a 12-digit zero-padded number. Use a
+        # regexp rather than [string is integer], which rejects values with
+        # leading zeros followed by 8 or 9 (legacy octal parsing).
+        assert {[regexp {^\d{12}$} $score]}
+
+        file delete $csv_file
+    }
+
+    test {benchmark: dataset mode requires field placeholders} {
+        set csv_data "name,value\nitem1,value1\nitem2,value2"
+        set csv_file [tmpfile "dataset.csv"]
+        set fd [open $csv_file w]
+        puts $fd $csv_data
+        close $fd
+
+        # Dataset mode should require field placeholders in the command
+        set cmd [valkeybenchmark $master_host $master_port "--dataset $csv_file -n 10 -r 10 -t set"]
+
+        # Should fail with an error about missing field placeholders
+        if {[catch { exec {*}$cmd } error]} {
+            assert_match "*Dataset mode requires a command with field placeholders*" $error
+        } else {
+            fail "Expected error for dataset mode without field placeholders"
+        }
+
+        file delete $csv_file
+    }
+
     test {benchmark: sequential zadd results in expected number of keys} {
         set cmd [valkeybenchmark $master_host $master_port "-r 50 -n 50 --sequential -t zadd"]
         common_bench_setup $cmd