Skip to content

Commit cb3e88b

Browse files
committed
kv/transport: Make publish+pin atomic; pin/unpin target latest version
• Add priskv_publish_node_with_pin(kv, keynode, pin_on_publish) and make priskv_publish_node delegate to it. Publishing now runs under the hash-bucket lock: it inherits pin_count from the old version, removes the old visible node, and inserts the new one. The old node is then cleaned up outside the lock. This ensures that only a single version of a key is visible at any time.
• Replace the deprecated priskv_key_pin with priskv_key_pin_latest, and update the semantics so that both pin and unpin operate on the latest visible version under the bucket lock. If the key is missing, NO_SUCH_KEY is returned; if it is expired, it is removed inline and NO_SUCH_KEY is returned as well.
• Transport SEAL now calls priskv_publish_node_with_pin, which removes the race between publish and pin; ACQUIRE uses priskv_key_pin_latest; RELEASE uses priskv_key_unpin_latest.
• Add concurrency tests (two-thread and stress) that verify single-version visibility and pin_count inheritance; extend coverage of negative/permission cases and failure paths.
1 parent fbfa3e1 commit cb3e88b

File tree

4 files changed

+396
-79
lines changed

4 files changed

+396
-79
lines changed

server/kv.c

Lines changed: 138 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -451,11 +451,79 @@ static priskv_key *priskv_find_key(priskv_kv *kv, uint8_t *key, uint16_t keylen,
451451
return NULL;
452452
}
453453

454+
/*
455+
* Apply a delta to pin_count on the latest visible version of a key.
456+
* - Look up the latest version under the hash-bucket lock.
457+
* - If the key is expired, remove it and return NO_SUCH_KEY.
458+
* - For delta > 0: increment pin_count by delta.
459+
* - For delta < 0: decrement pin_count by |delta| if possible; otherwise return
460+
* PRISKV_RESP_STATUS_UNPIN_NOT_CLOSED without modifying pin_count.
461+
* - Does not update kv->pin_stats; the caller is responsible for stats accounting.
462+
*/
463+
static priskv_resp_status priskv_pin_count_delta_latest(priskv_kv *kv, uint8_t *key,
464+
uint16_t keylen, int32_t delta);
465+
454466
static void __priskv_del_key(priskv_kv *kv, priskv_key *keynode)
455467
{
456468
priskv_keynode_deref(keynode);
457469
}
458470

471+
static priskv_resp_status priskv_pin_count_delta_latest(priskv_kv *kv, uint8_t *key,
472+
uint16_t keylen, int32_t delta)
473+
{
474+
if (!kv || !key || !keylen) {
475+
return PRISKV_RESP_STATUS_SERVER_ERROR;
476+
}
477+
478+
/* Locate bucket and the latest visible node for the key */
479+
uint32_t crc = priskv_crc32(key, keylen);
480+
priskv_hash_head *hash_head = &kv->hash_heads[crc % kv->bucket_count];
481+
482+
struct timeval now;
483+
gettimeofday(&now, NULL);
484+
485+
pthread_spin_lock(&hash_head->lock);
486+
priskv_key *cur; priskv_key *latest = NULL;
487+
list_for_each (&hash_head->head, cur, entry) {
488+
if (cur->keylen == keylen && memcmp(cur->key, key, keylen) == 0) {
489+
latest = cur;
490+
break;
491+
}
492+
}
493+
494+
if (latest == NULL) {
495+
pthread_spin_unlock(&hash_head->lock);
496+
return PRISKV_RESP_STATUS_NO_SUCH_KEY;
497+
}
498+
499+
/* If expired, remove from hash while holding the bucket lock and cleanup outside */
500+
if (priskv_key_timeout(latest, now)) {
501+
list_del(&latest->entry);
502+
pthread_spin_unlock(&hash_head->lock);
503+
priskv_lru_del_key(latest);
504+
__priskv_del_key(kv, latest);
505+
return PRISKV_RESP_STATUS_NO_SUCH_KEY;
506+
}
507+
508+
priskv_resp_status resp = PRISKV_RESP_STATUS_OK;
509+
pthread_spin_lock(&latest->lock);
510+
if (delta >= 0) {
511+
latest->pin_count += (uint32_t)delta;
512+
} else {
513+
uint32_t need = (uint32_t)(-delta);
514+
if (latest->pin_count >= need) {
515+
latest->pin_count -= need;
516+
} else {
517+
/* Not enough pins closed by the caller */
518+
resp = PRISKV_RESP_STATUS_UNPIN_NOT_CLOSED;
519+
}
520+
}
521+
pthread_spin_unlock(&latest->lock);
522+
pthread_spin_unlock(&hash_head->lock);
523+
524+
return resp;
525+
}
526+
459527
int priskv_get_key_for_seal(void *_kv, uint8_t *key, uint16_t keylen, uint8_t **val,
460528
uint32_t *valuelen, void **_keynode)
461529
{
@@ -722,39 +790,81 @@ int priskv_alloc_node_private(void *_kv, uint8_t *key, uint16_t keylen, uint8_t
722790
* new node into the hash table.
723791
* - Join LRU and clear inprocess so it becomes readable (ACQUIRE/GET).
724792
*/
725-
int priskv_publish_node(void *_kv, void *_keynode)
793+
int priskv_publish_node_with_pin(void *_kv, void *_keynode, bool pin_on_publish)
726794
{
727795
priskv_kv *kv = _kv;
728796
priskv_key *keynode = (priskv_key *)_keynode;
729-
priskv_key *old_keynode;
797+
priskv_key *old_keynode = NULL;
730798

731799
if (!keynode || keynode->kv != kv) {
732800
return PRISKV_RESP_STATUS_SERVER_ERROR;
733801
}
734802

735-
old_keynode = priskv_find_key(kv, (uint8_t *)keynode->key, keynode->keylen,
736-
PRISKV_KEY_MAX_TIMEOUT, true, NULL);
803+
/* Atomically replace the visible version under the hash-bucket lock to avoid
804+
* a window where multiple versions can be inserted concurrently. */
805+
uint32_t crc = priskv_crc32(keynode->key, keynode->keylen);
806+
priskv_hash_head *hash_head = &kv->hash_heads[crc % kv->bucket_count];
807+
808+
pthread_spin_lock(&hash_head->lock);
809+
/* Find existing visible key (if any) */
810+
priskv_key *iter;
811+
list_for_each (&hash_head->head, iter, entry) {
812+
if (iter->keylen == keynode->keylen &&
813+
memcmp(iter->key, keynode->key, keynode->keylen) == 0) {
814+
old_keynode = iter;
815+
break;
816+
}
817+
}
818+
819+
/* Inherit pin_count from old version before making the new one visible. */
737820
if (old_keynode) {
738-
/* inherit pin_count from old version to keep lifecycle semantics */
739821
pthread_spin_lock(&old_keynode->lock);
740822
uint32_t old_pins = old_keynode->pin_count;
741823
pthread_spin_unlock(&old_keynode->lock);
742824

743825
pthread_spin_lock(&keynode->lock);
744826
keynode->pin_count = old_pins;
745827
pthread_spin_unlock(&keynode->lock);
746-
priskv_lru_del_key(old_keynode);
747-
__priskv_del_key(kv, old_keynode);
828+
829+
/* Remove old from hash list (visibility) while holding the bucket lock. */
830+
list_del(&old_keynode->entry);
831+
} else {
832+
/* No old version: initialize pin_count to 0 for safety. */
833+
pthread_spin_lock(&keynode->lock);
834+
keynode->pin_count = 0;
835+
pthread_spin_unlock(&keynode->lock);
748836
}
749837

750-
/* Insert new node and add to LRU */
751-
priskv_insert_keynode(kv, keynode);
838+
/* Optional: pin on publish within the same critical section to ensure atomicity
839+
* relative to visibility. */
840+
if (pin_on_publish) {
841+
pthread_spin_lock(&keynode->lock);
842+
keynode->pin_count++;
843+
pthread_spin_unlock(&keynode->lock);
844+
}
845+
846+
/* Insert new node into hash list under the same lock to ensure single visible version. */
847+
list_add_tail(&hash_head->head, &keynode->entry);
848+
pthread_spin_unlock(&hash_head->lock);
849+
850+
/* Add new node to LRU and finalize publish state. */
752851
priskv_lru_access(keynode, false);
753852
keynode->inprocess = false;
754853

854+
/* Cleanup the old version outside the bucket lock. */
855+
if (old_keynode) {
856+
priskv_lru_del_key(old_keynode);
857+
__priskv_del_key(kv, old_keynode);
858+
}
859+
755860
return PRISKV_RESP_STATUS_OK;
756861
}
757862

863+
int priskv_publish_node(void *_kv, void *_keynode)
864+
{
865+
return priskv_publish_node_with_pin(_kv, _keynode, false);
866+
}
867+
758868
/*
759869
* Drop for ALLOC-private nodes (unpublished):
760870
* - This function expects a private keynode allocated via priskv_alloc_node_private.
@@ -782,22 +892,6 @@ int priskv_drop_node(void *_kv, void *_keynode)
782892
return PRISKV_RESP_STATUS_OK;
783893
}
784894

785-
/* Increment pin_count on the given keynode. */
786-
int priskv_key_pin(void *_kv, void *_keynode)
787-
{
788-
priskv_kv *kv = (priskv_kv *)_kv;
789-
priskv_key *keynode = (priskv_key *)_keynode;
790-
if (!keynode) {
791-
return PRISKV_RESP_STATUS_SERVER_ERROR;
792-
}
793-
pthread_spin_lock(&keynode->lock);
794-
keynode->pin_count++;
795-
pthread_spin_unlock(&keynode->lock);
796-
if (kv) {
797-
kv->pin_stats.pin_ops++;
798-
}
799-
return PRISKV_RESP_STATUS_OK;
800-
}
801895

802896
/* Decrement pin_count on the latest version of the key corresponding to keynode. */
803897
int priskv_key_unpin_latest(void *_kv, void *_keynode)
@@ -808,39 +902,30 @@ int priskv_key_unpin_latest(void *_kv, void *_keynode)
808902
return PRISKV_RESP_STATUS_SERVER_ERROR;
809903
}
810904

811-
uint8_t *key = node->key;
812-
uint16_t keylen = node->keylen;
813-
bool expired = false;
814-
priskv_key *latest = priskv_find_key(kv, key, keylen, PRISKV_KEY_MAX_TIMEOUT, false, &expired);
815-
if (!latest || expired) {
816-
/* latest not found or expired already; nothing to unpin */
817-
if (kv) {
818-
kv->pin_stats.unpin_ops++;
819-
}
820-
return PRISKV_RESP_STATUS_NO_SUCH_KEY;
905+
priskv_resp_status resp = priskv_pin_count_delta_latest(kv, node->key, node->keylen, -1);
906+
907+
/* Stats: count every UNPIN attempt; record not-closed cases. */
908+
kv->pin_stats.unpin_ops++;
909+
if (resp == PRISKV_RESP_STATUS_UNPIN_NOT_CLOSED) {
910+
kv->pin_stats.unpin_not_closed++;
821911
}
912+
return resp;
913+
}
822914

823-
pthread_spin_lock(&latest->lock);
824-
if (latest->pin_count > 0) {
825-
latest->pin_count--;
826-
} else {
827-
/* indicate unpin is not closed (mismatched) */
828-
pthread_spin_unlock(&latest->lock);
829-
priskv_keynode_deref(latest);
830-
if (kv) {
831-
kv->pin_stats.unpin_ops++;
832-
kv->pin_stats.unpin_not_closed++;
833-
}
834-
return PRISKV_RESP_STATUS_UNPIN_NOT_CLOSED;
915+
/* Increment pin_count on the latest visible version of the key corresponding to keynode. */
916+
int priskv_key_pin_latest(void *_kv, void *_keynode)
917+
{
918+
priskv_kv *kv = (priskv_kv *)_kv;
919+
priskv_key *node = (priskv_key *)_keynode;
920+
if (!kv || !node) {
921+
return PRISKV_RESP_STATUS_SERVER_ERROR;
835922
}
836-
pthread_spin_unlock(&latest->lock);
837923

838-
/* release the temporary ref from priskv_find_key */
839-
priskv_keynode_deref(latest);
840-
if (kv) {
841-
kv->pin_stats.unpin_ops++;
924+
priskv_resp_status resp = priskv_pin_count_delta_latest(kv, node->key, node->keylen, +1);
925+
if (resp == PRISKV_RESP_STATUS_OK) {
926+
kv->pin_stats.pin_ops++;
842927
}
843-
return PRISKV_RESP_STATUS_OK;
928+
return resp;
844929
}
845930

846931
uint64_t priskv_get_pin_ops(void *_kv)

server/kv.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,11 +159,13 @@ void priskv_key_serialize_exit(struct priskv_tiering_req *completed_req);
159159
int priskv_alloc_node_private(void *_kv, uint8_t *key, uint16_t keylen, uint8_t **val,
160160
uint32_t alloc_length, uint64_t timeout, void **_keynode);
161161
int priskv_publish_node(void *_kv, void *_keynode);
162+
/* Atomically publish and optionally pin on publish (for SEAL with PIN). */
163+
int priskv_publish_node_with_pin(void *_kv, void *_keynode, bool pin_on_publish);
162164
int priskv_drop_node(void *_kv, void *_keynode);
163165

164-
/* Pin/Unpin controls for lifecycle protection */
165-
int priskv_key_pin(void *_kv, void *_keynode);
166166
int priskv_key_unpin_latest(void *_kv, void *_keynode);
167+
/* Pin on the latest version of the key corresponding to keynode */
168+
int priskv_key_pin_latest(void *_kv, void *_keynode);
167169

168170
/* Pin/Unpin observability */
169171
uint64_t priskv_get_pin_ops(void *_kv);

0 commit comments

Comments
 (0)