Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions src/acl.c
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,7 @@ static void ACLChangeSelectorPerm(aclSelector *selector, struct serverCommand *c
struct serverCommand *sub = next;
ACLSetSelectorCommandBit(selector, sub->id, allow);
}
hashtableResetIterator(&iter);
hashtableCleanupIterator(&iter);
}
}

Expand All @@ -672,7 +672,7 @@ static void ACLSetSelectorCommandBitsForCategory(hashtable *commands, aclSelecto
ACLSetSelectorCommandBitsForCategory(cmd->subcommands_ht, selector, cflag, value);
}
}
hashtableResetIterator(&iter);
hashtableCleanupIterator(&iter);
}

/* This function is responsible for recomputing the command bits for all selectors of the existing users.
Expand Down Expand Up @@ -1925,7 +1925,7 @@ static int ACLShouldKillPubsubClient(client *c, list *upcoming) {
int res = ACLCheckChannelAgainstList(upcoming, o->ptr, sdslen(o->ptr), 1);
kill = (res == ACL_DENIED_CHANNEL);
}
hashtableResetIterator(&iter);
hashtableCleanupIterator(&iter);

/* Check for channel violations. */
if (!kill) {
Expand All @@ -1938,7 +1938,7 @@ static int ACLShouldKillPubsubClient(client *c, list *upcoming) {
int res = ACLCheckChannelAgainstList(upcoming, o->ptr, sdslen(o->ptr), 0);
kill = (res == ACL_DENIED_CHANNEL);
}
hashtableResetIterator(&iter);
hashtableCleanupIterator(&iter);
}
if (!kill) {
/* Check for shard channels violation. */
Expand All @@ -1950,7 +1950,7 @@ static int ACLShouldKillPubsubClient(client *c, list *upcoming) {
int res = ACLCheckChannelAgainstList(upcoming, o->ptr, sdslen(o->ptr), 0);
kill = (res == ACL_DENIED_CHANNEL);
}
hashtableResetIterator(&iter);
hashtableCleanupIterator(&iter);
}

if (kill) {
Expand Down Expand Up @@ -2744,7 +2744,7 @@ static void aclCatWithFlags(client *c, hashtable *commands, uint64_t cflag, int
aclCatWithFlags(c, cmd->subcommands_ht, cflag, arraylen);
}
}
hashtableResetIterator(&iter);
hashtableCleanupIterator(&iter);
}

/* Add the formatted response from a single selector to the ACL GETUSER
Expand Down
6 changes: 3 additions & 3 deletions src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -1906,19 +1906,19 @@ int rewriteSortedSetObject(rio *r, robj *key, robj *o) {

if (!rioWriteBulkCount(r, '*', 2 + cmd_items * 2) || !rioWriteBulkString(r, "ZADD", 4) ||
!rioWriteBulkObject(r, key)) {
hashtableResetIterator(&iter);
hashtableCleanupIterator(&iter);
return 0;
}
}
sds ele = node->ele;
if (!rioWriteBulkDouble(r, node->score) || !rioWriteBulkString(r, ele, sdslen(ele))) {
hashtableResetIterator(&iter);
hashtableCleanupIterator(&iter);
return 0;
}
if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0;
items--;
}
hashtableResetIterator(&iter);
hashtableCleanupIterator(&iter);
} else {
serverPanic("Unknown sorted zset encoding");
}
Expand Down
2 changes: 1 addition & 1 deletion src/debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ void xorObjectDigest(serverDb *db, robj *keyobj, unsigned char *digest, robj *o)
mixDigest(eledigest, buf, strlen(buf));
xorDigest(digest, eledigest, 20);
}
hashtableResetIterator(&iter);
hashtableCleanupIterator(&iter);
} else {
serverPanic("Unknown sorted set encoding");
}
Expand Down
2 changes: 1 addition & 1 deletion src/defrag.c
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,7 @@ static void defragPubsubScanCallback(void *privdata, void *elemref) {
bool replaced = hashtableReplaceReallocatedEntry(client_channels, channel, newchannel);
serverAssert(replaced);
}
hashtableResetIterator(&iter);
hashtableCleanupIterator(&iter);
}

/* Try to defrag the dictionary of clients that is stored as the value part. */
Expand Down
77 changes: 65 additions & 12 deletions src/hashtable.c
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,9 @@ typedef struct hashtableBucket {
/* A key property is that the bucket size is one cache line. */
static_assert(sizeof(bucket) == HASHTABLE_BUCKET_SIZE, "Bucket size mismatch");

/* Forward declaration for iter type */
typedef struct iter iter;

struct hashtable {
hashtableType *type;
ssize_t rehash_idx; /* -1 = rehashing not in progress. */
Expand All @@ -293,10 +296,11 @@ struct hashtable {
int16_t pause_rehash; /* Non-zero = rehashing is paused */
int16_t pause_auto_shrink; /* Non-zero = automatic resizing disallowed. */
size_t child_buckets[2]; /* Number of allocated child buckets. */
iter *safe_iterators; /* Head of linked list of safe iterators */
void *metadata[];
};

typedef struct {
struct iter {
hashtable *hashtable;
bucket *bucket;
long index;
Expand All @@ -309,7 +313,8 @@ typedef struct {
/* Safe iterator temporary storage for bucket chain compaction. */
uint64_t last_seen_size;
};
} iter;
iter *next_safe_iter; /* Next safe iterator in hashtable's list */
};

/* The opaque hashtableIterator is defined as a blob of bytes. */
static_assert(sizeof(hashtableIterator) >= sizeof(iter),
Expand Down Expand Up @@ -1084,6 +1089,37 @@ static inline int shouldPrefetchValues(iter *iter) {
return (iter->flags & HASHTABLE_ITER_PREFETCH_VALUES);
}

/* Add a safe iterator to the hashtable's tracking list */
static void trackSafeIterator(iter *it) {
assert(it->next_safe_iter == NULL);
hashtable *ht = it->hashtable;
it->next_safe_iter = ht->safe_iterators;
ht->safe_iterators = it;
}

/* Remove a safe iterator from the hashtable's tracking list */
static void untrackSafeIterator(iter *it) {
hashtable *ht = it->hashtable;
if (ht->safe_iterators == it) {
ht->safe_iterators = it->next_safe_iter;
} else {
iter *current = ht->safe_iterators;
assert(current != NULL);
while (current->next_safe_iter != it) {
current = current->next_safe_iter;
assert(current != NULL);
}
current->next_safe_iter = it->next_safe_iter;
}
it->next_safe_iter = NULL;
it->hashtable = NULL; /* Mark as invalid */
}

/* Invalidate all safe iterators by setting hashtable = NULL */
static void invalidateAllSafeIterators(hashtable *ht) {
while (ht->safe_iterators) untrackSafeIterator(ht->safe_iterators);
}

/* --- API functions --- */

/* Allocates and initializes a new hashtable specified by the given type. */
Expand All @@ -1098,6 +1134,7 @@ hashtable *hashtableCreate(hashtableType *type) {
ht->rehash_idx = -1;
ht->pause_rehash = 0;
ht->pause_auto_shrink = 0;
ht->safe_iterators = NULL;
resetTable(ht, 0);
resetTable(ht, 1);
if (type->trackMemUsage) type->trackMemUsage(ht, alloc_size);
Expand Down Expand Up @@ -1153,6 +1190,7 @@ void hashtableEmpty(hashtable *ht, void(callback)(hashtable *)) {

/* Deletes all the entries and frees the table. */
void hashtableRelease(hashtable *ht) {
invalidateAllSafeIterators(ht);
hashtableEmpty(ht, NULL);
/* Call trackMemUsage before zfree, so trackMemUsage can access ht. */
if (ht->type->trackMemUsage) {
Expand Down Expand Up @@ -1242,6 +1280,7 @@ static void hashtablePauseRehashing(hashtable *ht) {
/* Resumes incremental rehashing, after pausing it. */
static void hashtableResumeRehashing(hashtable *ht) {
ht->pause_rehash--;
assert(ht->pause_rehash >= 0);
hashtableResumeAutoShrink(ht);
}

Expand Down Expand Up @@ -1962,7 +2001,9 @@ size_t hashtableScanDefrag(hashtable *ht, size_t cursor, hashtableScanFunction f
* rehashing to prevent entries from moving around. It's allowed to insert and
* replace entries. Deleting entries is only allowed for the entry that was just
* returned by hashtableNext. Deleting other entries is possible, but doing so
* can cause internal fragmentation, so don't.
* can cause internal fragmentation, so don't. The hash table itself can be
* safely deleted while safe iterators exist - they will be invalidated and
* subsequent calls to hashtableNext will return false.
*
* Guarantees for safe iterators:
*
Expand All @@ -1978,7 +2019,7 @@ size_t hashtableScanDefrag(hashtable *ht, size_t cursor, hashtableScanFunction f
* - Entries that are inserted during the iteration may or may not be returned
* by the iterator.
*
* Call hashtableNext to fetch each entry. You must call hashtableResetIterator
* Call hashtableNext to fetch each entry. You must call hashtableCleanupIterator
* when you are done with the iterator.
*/
void hashtableInitIterator(hashtableIterator *iterator, hashtable *ht, uint8_t flags) {
Expand All @@ -1988,22 +2029,31 @@ void hashtableInitIterator(hashtableIterator *iterator, hashtable *ht, uint8_t f
iter->table = 0;
iter->index = -1;
iter->flags = flags;
iter->next_safe_iter = NULL;
if (isSafe(iter) && ht != NULL) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How can ht be NULL here? Maybe put an assertion at the top if you want to check this.

Copy link
Contributor Author

@rainsupreme rainsupreme Nov 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

kvstore's iterator contains a hashtableIterator, and in kvstoreIteratorInit() they want to init their hashtableIterator to something and redo it later when they decide which hashtable to actually start with. Or that was my impression. I discovered this from a failed tcl test 😅

trackSafeIterator(iter);
}
}

/* Reinitializes the iterator for the provided hashtable while
* preserving the flags from its previous initialization. */
void hashtableReinitIterator(hashtableIterator *iterator, hashtable *ht) {
/* Reinitializes the iterator to begin a new iteration of the provided hashtable
* while preserving the flags from its previous initialization. */
void hashtableRetargetIterator(hashtableIterator *iterator, hashtable *ht) {
iter *iter = iteratorFromOpaque(iterator);
hashtableInitIterator(iterator, ht, iter->flags);
uint8_t flags = iter->flags;

hashtableCleanupIterator(iterator);
hashtableInitIterator(iterator, ht, flags);
}

/* Resets a stack-allocated iterator. */
void hashtableResetIterator(hashtableIterator *iterator) {
/* Performs required cleanup for a stack-allocated iterator. */
void hashtableCleanupIterator(hashtableIterator *iterator) {
iter *iter = iteratorFromOpaque(iterator);
if (iter->hashtable == NULL) return;

if (!(iter->index == -1 && iter->table == 0)) {
if (isSafe(iter)) {
hashtableResumeRehashing(iter->hashtable);
assert(iter->hashtable->pause_rehash >= 0);
untrackSafeIterator(iter);
} else {
assert(iter->fingerprint == hashtableFingerprint(iter->hashtable));
}
Expand All @@ -2021,7 +2071,7 @@ hashtableIterator *hashtableCreateIterator(hashtable *ht, uint8_t flags) {
/* Resets and frees the memory of an allocated iterator, i.e. one created using
* hashtableCreate(Safe)Iterator. */
void hashtableReleaseIterator(hashtableIterator *iterator) {
hashtableResetIterator(iterator);
hashtableCleanupIterator(iterator);
iter *iter = iteratorFromOpaque(iterator);
zfree(iter);
}
Expand All @@ -2030,6 +2080,9 @@ void hashtableReleaseIterator(hashtableIterator *iterator) {
* Returns false if there are no more entries. */
bool hashtableNext(hashtableIterator *iterator, void **elemptr) {
iter *iter = iteratorFromOpaque(iterator);
/* Check if iterator has been invalidated */
if (iter->hashtable == NULL) return false;

while (1) {
if (iter->index == -1 && iter->table == 0) {
/* It's the first call to next. */
Expand Down
6 changes: 3 additions & 3 deletions src/hashtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ typedef struct hashtable hashtable;
typedef struct hashtableStats hashtableStats;

/* Can types that can be stack allocated. */
typedef uint64_t hashtableIterator[5];
typedef uint64_t hashtableIterator[6];
typedef uint64_t hashtablePosition[2];
typedef uint64_t hashtableIncrementalFindState[5];

Expand Down Expand Up @@ -160,8 +160,8 @@ bool hashtableIncrementalFindGetResult(hashtableIncrementalFindState *state, voi
size_t hashtableScan(hashtable *ht, size_t cursor, hashtableScanFunction fn, void *privdata);
size_t hashtableScanDefrag(hashtable *ht, size_t cursor, hashtableScanFunction fn, void *privdata, void *(*defragfn)(void *), int flags);
void hashtableInitIterator(hashtableIterator *iter, hashtable *ht, uint8_t flags);
void hashtableReinitIterator(hashtableIterator *iterator, hashtable *ht);
void hashtableResetIterator(hashtableIterator *iter);
void hashtableRetargetIterator(hashtableIterator *iterator, hashtable *ht);
void hashtableCleanupIterator(hashtableIterator *iter);
hashtableIterator *hashtableCreateIterator(hashtable *ht, uint8_t flags);
void hashtableReleaseIterator(hashtableIterator *iter);
bool hashtableNext(hashtableIterator *iter, void **elemptr);
Expand Down
8 changes: 4 additions & 4 deletions src/kvstore.c
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,7 @@ kvstoreIterator *kvstoreIteratorInit(kvstore *kvs, uint8_t flags) {
/* Free the kvs_it returned by kvstoreIteratorInit. */
void kvstoreIteratorRelease(kvstoreIterator *kvs_it) {
hashtableIterator *iter = &kvs_it->di;
hashtableResetIterator(iter);
hashtableCleanupIterator(iter);
/* In the safe iterator context, we may delete entries. */
if (kvs_it->didx != KVSTORE_INDEX_NOT_FOUND) {
freeHashtableIfNeeded(kvs_it->kvs, kvs_it->didx);
Expand Down Expand Up @@ -672,7 +672,7 @@ static hashtable *kvstoreIteratorNextHashtable(kvstoreIterator *kvs_it) {
if (kvs_it->didx != KVSTORE_INDEX_NOT_FOUND && kvstoreGetHashtable(kvs_it->kvs, kvs_it->didx)) {
/* Before we move to the next hashtable, reset the iter of the previous hashtable. */
hashtableIterator *iter = &kvs_it->di;
hashtableResetIterator(iter);
hashtableCleanupIterator(iter);
/* In the safe iterator context, we may delete entries. */
freeHashtableIfNeeded(kvs_it->kvs, kvs_it->didx);
}
Expand All @@ -697,7 +697,7 @@ bool kvstoreIteratorNext(kvstoreIterator *kvs_it, void **next) {
/* No current hashtable or reached the end of the hash table. */
hashtable *ht = kvstoreIteratorNextHashtable(kvs_it);
if (!ht) return false;
hashtableReinitIterator(&kvs_it->di, ht);
hashtableRetargetIterator(&kvs_it->di, ht);
return hashtableNext(&kvs_it->di, next);
}
}
Expand Down Expand Up @@ -779,7 +779,7 @@ kvstoreHashtableIterator *kvstoreGetHashtableIterator(kvstore *kvs, int didx, ui
void kvstoreReleaseHashtableIterator(kvstoreHashtableIterator *kvs_di) {
/* The hashtable may be deleted during the iteration process, so here need to check for NULL. */
if (kvstoreGetHashtable(kvs_di->kvs, kvs_di->didx)) {
hashtableResetIterator(&kvs_di->di);
hashtableCleanupIterator(&kvs_di->di);
/* In the safe iterator context, we may delete entries. */
freeHashtableIfNeeded(kvs_di->kvs, kvs_di->didx);
}
Expand Down
4 changes: 2 additions & 2 deletions src/latency.c
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ void latencyAllCommandsFillCDF(client *c, hashtable *commands, int *command_with
latencyAllCommandsFillCDF(c, cmd->subcommands_ht, command_with_data);
}
}
hashtableResetIterator(&iter);
hashtableCleanupIterator(&iter);
}

/* latencyCommand() helper to produce for a specific command set,
Expand Down Expand Up @@ -580,7 +580,7 @@ void latencySpecificCommandsFillCDF(client *c) {
command_with_data++;
}
}
hashtableResetIterator(&iter);
hashtableCleanupIterator(&iter);
}
}
setDeferredMapLen(c, replylen, command_with_data);
Expand Down
4 changes: 2 additions & 2 deletions src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -12520,7 +12520,7 @@ int moduleFreeCommand(struct ValkeyModule *module, struct serverCommand *cmd) {
sdsfree(sub->fullname);
zfree(sub);
}
hashtableResetIterator(&iter);
hashtableCleanupIterator(&iter);
hashtableRelease(cmd->subcommands_ht);
}

Expand All @@ -12544,7 +12544,7 @@ void moduleUnregisterCommands(struct ValkeyModule *module) {
sdsfree(cmd->fullname);
zfree(cmd);
}
hashtableResetIterator(&iter);
hashtableCleanupIterator(&iter);
}

/* We parse argv to add sds "NAME VALUE" pairs to the server.module_configs_queue list of configs.
Expand Down
8 changes: 4 additions & 4 deletions src/object.c
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,7 @@ void dismissSetObject(robj *o, size_t size_hint) {
sds item = next;
dismissSds(item);
}
hashtableResetIterator(&iter);
hashtableCleanupIterator(&iter);
}

dismissHashtable(ht);
Expand Down Expand Up @@ -694,7 +694,7 @@ void dismissHashObject(robj *o, size_t size_hint) {
while (hashtableNext(&iter, &next)) {
entryDismissMemory(next);
}
hashtableResetIterator(&iter);
hashtableCleanupIterator(&iter);
}

dismissHashtable(ht);
Expand Down Expand Up @@ -1171,7 +1171,7 @@ size_t objectComputeSize(robj *key, robj *o, size_t sample_size, int dbid) {
elesize += sdsAllocSize(element);
samples++;
}
hashtableResetIterator(&iter);
hashtableCleanupIterator(&iter);
if (samples) asize += (double)elesize / samples * hashtableSize(ht);
} else if (o->encoding == OBJ_ENCODING_INTSET) {
asize += zmalloc_size(o->ptr);
Expand Down Expand Up @@ -1214,7 +1214,7 @@ size_t objectComputeSize(robj *key, robj *o, size_t sample_size, int dbid) {
elesize += entryMemUsage(next);
samples++;
}
hashtableResetIterator(&iter);
hashtableCleanupIterator(&iter);
if (samples) asize += (double)elesize / samples * hashtableSize(ht);
if (vsetIsValid(volatile_fields)) asize += vsetMemUsage(volatile_fields);
} else {
Expand Down
Loading
Loading