Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -1385,6 +1385,8 @@ int VM_CreateCommand(ValkeyModuleCtx *ctx,
serverAssert(hashtableAdd(server.commands, cp->serverCmd));
serverAssert(hashtableAdd(server.orig_commands, cp->serverCmd));
cp->serverCmd->id = ACLGetCommandID(declared_name); /* ID used for ACL. */
/* Invalidate COMMAND response cache since we added a new command */
invalidateCommandCache();
return VALKEYMODULE_OK;
}

Expand Down Expand Up @@ -12530,6 +12532,14 @@ int moduleFreeCommand(struct ValkeyModule *module, struct serverCommand *cmd) {
hdr_close(cmd->latency_histogram);
cmd->latency_histogram = NULL;
}
if (cmd->info_cache_resp2) {
sdsfree(cmd->info_cache_resp2);
cmd->info_cache_resp2 = NULL;
}
if (cmd->info_cache_resp3) {
sdsfree(cmd->info_cache_resp3);
cmd->info_cache_resp3 = NULL;
}
moduleFreeArgs(cmd->args, cmd->num_args);
zfree(cp);

Expand Down Expand Up @@ -12571,6 +12581,8 @@ void moduleUnregisterCommands(struct ValkeyModule *module) {
zfree(cmd);
}
hashtableResetIterator(&iter);
/* Invalidate COMMAND response cache since we removed commands */
invalidateCommandCache();
}

/* We parse argv to add sds "NAME VALUE" pairs to the server.module_configs_queue list of configs.
Expand Down
133 changes: 111 additions & 22 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -2342,6 +2342,8 @@ void initServerConfig(void) {
* valkey.conf using the rename-command directive. */
server.commands = hashtableCreate(&commandSetType);
server.orig_commands = hashtableCreate(&originalCommandSetType);
server.command_response_cache_resp2 = NULL;
server.command_response_cache_resp3 = NULL;
populateCommandTable();

/* Debugging */
Expand Down Expand Up @@ -3282,6 +3284,10 @@ int populateCommandStructure(struct serverCommand *c) {
* has been issued for the first time */
c->latency_histogram = NULL;

/* Initialize command info cache */
c->info_cache_resp2 = NULL;
c->info_cache_resp3 = NULL;

/* Handle the legacy range spec and the "movablekeys" flag (must be done after populating all key specs). */
populateCommandLegacyRangeSpec(c);

Expand Down Expand Up @@ -5222,30 +5228,78 @@ void addReplyCommandSubCommands(client *c,
hashtableResetIterator(&iter);
}

/* Collect all output from a caching client (both buffer and reply list) */
static sds collectCachedResponse(client *c) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be a common function associated with a cached response client. It's unclear why this is different from aggregateClientOutputBuffer and I tagged @roshkhatri for info.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aggregateClientOutputBuffer assumes c->bufpos == 0 which is not the case here as Add* functions utilize c->buf

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI - I talked with @roshkhatri. He agrees that the existing function aggregateClientOutputBuffer should be updated rather than creating a new function. His existing function assumed c->bufpos == 0 for his specific use case, but this does not have to be generally true. We can update that function.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, we can remove the check from aggregateClientOutputBuffer and we can add this check in generateClusterSlotResponse

we would not need to add this function for the same utility.

sds response = sdsempty();

/* First, collect from the fixed buffer if any */
if (c->bufpos > 0) {
response = sdscatlen(response, c->buf, c->bufpos);
}

/* Then, collect from the reply list */
listIter li;
listNode *ln;
clientReplyBlock *val_block;
listRewind(c->reply, &li);
while ((ln = listNext(&li)) != NULL) {
val_block = (clientReplyBlock *)listNodeValue(ln);
response = sdscatlen(response, val_block->buf, val_block->used);
}

return response;
}

/* Forward declaration */
void addReplyCommandInfo(client *c, struct serverCommand *cmd);
Comment on lines +5253 to +5254
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just move cacheCommandInfo after addReplyCommandInfo, avoiding the need for a forward declaration.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is a recursion to process subcommands


/* Generate and cache the command info response for a given protocol version */
static void cacheCommandInfo(struct serverCommand *cmd, int resp) {
client *caching_client = createCachedResponseClient(resp);

int firstkey = 0, lastkey = 0, keystep = 0;
if (cmd->legacy_range_key_spec.begin_search_type != KSPEC_BS_INVALID) {
firstkey = cmd->legacy_range_key_spec.bs.index.pos;
lastkey = cmd->legacy_range_key_spec.fk.range.lastkey;
if (lastkey >= 0) lastkey += firstkey;
keystep = cmd->legacy_range_key_spec.fk.range.keystep;
}

addReplyArrayLen(caching_client, 10);
addReplyBulkCBuffer(caching_client, cmd->fullname, sdslen(cmd->fullname));
addReplyLongLong(caching_client, cmd->arity);
addReplyFlagsForCommand(caching_client, cmd);
addReplyLongLong(caching_client, firstkey);
addReplyLongLong(caching_client, lastkey);
addReplyLongLong(caching_client, keystep);
addReplyCommandCategories(caching_client, cmd);
addReplyCommandTips(caching_client, cmd);
addReplyCommandKeySpecs(caching_client, cmd);
addReplyCommandSubCommands(caching_client, cmd, addReplyCommandInfo, 0);

if (resp == 2) {
cmd->info_cache_resp2 = collectCachedResponse(caching_client);
} else {
cmd->info_cache_resp3 = collectCachedResponse(caching_client);
}

deleteCachedResponseClient(caching_client);
}

/* Output the representation of a server command. Used by the COMMAND command and COMMAND INFO. */
void addReplyCommandInfo(client *c, struct serverCommand *cmd) {
if (!cmd) {
addReplyNull(c);
} else {
int firstkey = 0, lastkey = 0, keystep = 0;
if (cmd->legacy_range_key_spec.begin_search_type != KSPEC_BS_INVALID) {
firstkey = cmd->legacy_range_key_spec.bs.index.pos;
lastkey = cmd->legacy_range_key_spec.fk.range.lastkey;
if (lastkey >= 0) lastkey += firstkey;
keystep = cmd->legacy_range_key_spec.fk.range.keystep;
/* Use cached response if available for the client's protocol version */
sds cache = (c->resp == 2) ? cmd->info_cache_resp2 : cmd->info_cache_resp3;

if (cache == NULL) {
cacheCommandInfo(cmd, c->resp);
cache = (c->resp == 2) ? cmd->info_cache_resp2 : cmd->info_cache_resp3;
}

addReplyArrayLen(c, 10);
addReplyBulkCBuffer(c, cmd->fullname, sdslen(cmd->fullname));
addReplyLongLong(c, cmd->arity);
addReplyFlagsForCommand(c, cmd);
addReplyLongLong(c, firstkey);
addReplyLongLong(c, lastkey);
addReplyLongLong(c, keystep);
addReplyCommandCategories(c, cmd);
addReplyCommandTips(c, cmd);
addReplyCommandKeySpecs(c, cmd);
addReplyCommandSubCommands(c, cmd, addReplyCommandInfo, 0);

addReplyProto(c, cache, sdslen(cache));
}
}

Expand Down Expand Up @@ -5370,17 +5424,52 @@ void getKeysSubcommand(client *c) {
getKeysSubcommandImpl(c, 0);
}

/* COMMAND (no args) */
void commandCommand(client *c) {
/* Invalidate the cached COMMAND response when command table changes */
void invalidateCommandCache(void) {
if (server.command_response_cache_resp2) {
sdsfree(server.command_response_cache_resp2);
server.command_response_cache_resp2 = NULL;
}
if (server.command_response_cache_resp3) {
sdsfree(server.command_response_cache_resp3);
server.command_response_cache_resp3 = NULL;
}
}

/* Generate and cache the full COMMAND response */
static void cacheCommandResponse(int resp) {
client *caching_client = createCachedResponseClient(resp);

hashtableIterator iter;
void *next;
addReplyArrayLen(c, hashtableSize(server.commands));
addReplyArrayLen(caching_client, hashtableSize(server.commands));
hashtableInitIterator(&iter, server.commands, 0);
while (hashtableNext(&iter, &next)) {
struct serverCommand *cmd = next;
addReplyCommandInfo(c, cmd);
addReplyCommandInfo(caching_client, cmd);
}
hashtableResetIterator(&iter);

if (resp == 2) {
server.command_response_cache_resp2 = collectCachedResponse(caching_client);
} else {
server.command_response_cache_resp3 = collectCachedResponse(caching_client);
}

deleteCachedResponseClient(caching_client);
}

/* COMMAND (no args) */
void commandCommand(client *c) {
/* Use cached response if available for the client's protocol version */
sds cache = (c->resp == 2) ? server.command_response_cache_resp2 : server.command_response_cache_resp3;

if (cache == NULL) {
cacheCommandResponse(c->resp);
cache = (c->resp == 2) ? server.command_response_cache_resp2 : server.command_response_cache_resp3;
}

addReplyProto(c, cache, sdslen(cache));
}

/* COMMAND COUNT */
Expand Down
5 changes: 5 additions & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1677,6 +1677,8 @@ struct valkeyServer {
serverDb **db; /* each db created when it's first used */
hashtable *commands; /* Command table */
hashtable *orig_commands; /* Command table before command renaming. */
sds command_response_cache_resp2; /* Cached COMMAND response for RESP2 */
sds command_response_cache_resp3; /* Cached COMMAND response for RESP3 */
Comment on lines +1680 to +1681
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add an sds array here?

aeEventLoop *el;
_Atomic AeIoState io_poll_state; /* Indicates the state of the IO polling. */
int io_ae_fired_events; /* Number of poll events received by the IO thread. */
Expand Down Expand Up @@ -2626,6 +2628,8 @@ struct serverCommand {
* (not the fullname), and the value is the serverCommand structure pointer. */
struct serverCommand *parent;
struct ValkeyModuleCommand *module_cmd; /* A pointer to the module command data (NULL if native command) */
sds info_cache_resp2; /* Cached COMMAND INFO response for RESP2 */
sds info_cache_resp3; /* Cached COMMAND INFO response for RESP3 */
Comment on lines +2631 to +2632
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would recommend building an array similar to cached_cluster_slot_info.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

};

struct serverError {
Expand Down Expand Up @@ -3783,6 +3787,7 @@ void commandCommand(client *c);
void commandCountCommand(client *c);
void commandListCommand(client *c);
void commandInfoCommand(client *c);
void invalidateCommandCache(void);
void commandGetKeysCommand(client *c);
void commandGetKeysAndFlagsCommand(client *c);
void commandHelpCommand(client *c);
Expand Down