Skip to content

Commit a403248

Browse files
authored
[feature] support pin/unpin for P/D (#36)
Signed-off-by: wangyi.ywq@bytedance.com
1 parent d7fcca2 commit a403248

File tree

24 files changed

+774
-86
lines changed

24 files changed

+774
-86
lines changed

client/benchmark.c

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -933,7 +933,8 @@ static void zc_acquire_cb(uint64_t rid, priskv_status status, void *result)
933933
zctx->pctx->job->mm->memcpy(zctx->value, (void *)region->addr, region->length);
934934
zctx->token = region->token;
935935
zctx->pctx->job->last_stage = "RELEASE";
936-
priskv_release_async(zctx->pctx->client, &zctx->token, (uint64_t)zctx, zc_release_cb);
936+
priskv_release_async(zctx->pctx->client, &zctx->token, false /* unpin_on_release */, (uint64_t)zctx,
937+
zc_release_cb);
937938
}
938939

939940
static void zc_seal_cb(uint64_t rid, priskv_status status, void *result)
@@ -962,7 +963,8 @@ static void zc_alloc_cb(uint64_t rid, priskv_status status, void *result)
962963
zctx->pctx->job->mm->memcpy((void *)region->addr, zctx->value, copy_len);
963964
zctx->token = region->token;
964965
zctx->pctx->job->last_stage = "SEAL";
965-
priskv_seal_async(zctx->pctx->client, &zctx->token, (uint64_t)zctx, zc_seal_cb);
966+
priskv_seal_async(zctx->pctx->client, &zctx->token, false /* pin_on_seal */, (uint64_t)zctx,
967+
zc_seal_cb);
966968
}
967969

968970
/* ZeroCopy DROP callback is no longer used (published keys use DELETE semantics) */
@@ -994,8 +996,8 @@ static void priskv_drv_get(void *ctx, const char *key, void *value, uint32_t val
994996
zctx->value = value;
995997
zctx->value_len = value_len;
996998
priskv_ctx->job->last_stage = "ACQUIRE";
997-
priskv_acquire_async(priskv_ctx->client, key, PRISKV_KEY_MAX_TIMEOUT, (uint64_t)zctx,
998-
zc_acquire_cb);
999+
priskv_acquire_async(priskv_ctx->client, key, PRISKV_KEY_MAX_TIMEOUT, false /* pin_on_acquire */,
1000+
(uint64_t)zctx, zc_acquire_cb);
9991001
return;
10001002
}
10011003
/* Remove duplicate ZeroCopy GET branch */

client/client.c

Lines changed: 82 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
#include <stdlib.h>
3333
#include <unistd.h>
3434
#include <getopt.h>
35+
#include <inttypes.h>
3536

3637
#include "priskv.h"
3738
#include "priskv-log.h"
@@ -154,6 +155,7 @@ static void set_handler_base(client_context *ctx, char *args, bool alloc)
154155
{
155156
char *key, *value, *opt, *opt_val, *str_end;
156157
uint64_t expire_time_ms = 0;
158+
bool pin_on_seal = false;
157159
size_t valuelen;
158160
priskv_sgl sgl;
159161
priskv_status status;
@@ -194,6 +196,9 @@ static void set_handler_base(client_context *ctx, char *args, bool alloc)
194196
if (!strcmp(opt, "EX")) {
195197
expire_time_ms *= 1000;
196198
}
199+
} else if (!strcmp(opt, "PIN")) {
200+
/* Redis-style: allow 'PIN' to indicate pin-on-seal */
201+
pin_on_seal = true;
197202
} else {
198203
printf("%s\n", invalid_args_msg);
199204
return;
@@ -217,7 +222,7 @@ static void set_handler_base(client_context *ctx, char *args, bool alloc)
217222
printf("ALLOC_SET status(%d): %s, addr %p, length %u, token 0x%lx\n", status,
218223
priskv_status_str(status), (void *)region.addr, region.length, region.token);
219224
memcpy((void *)region.addr, value, (size_t)region.length);
220-
status = priskv_seal(ctx->client, &region.token);
225+
status = priskv_seal(ctx->client, &region.token, pin_on_seal);
221226
if (status != PRISKV_STATUS_OK) {
222227
printf("Failed to SEAL, status(%d): %s\n", status, priskv_status_str(status));
223228
return;
@@ -269,7 +274,8 @@ static void get_handler_base(client_context *ctx, char *args, bool acquire)
269274
if (acquire) {
270275
priskv_memory_region region = {0};
271276
printf("ACQUIRE key=%s\n", key);
272-
status = priskv_acquire(ctx->client, key, PRISKV_KEY_MAX_TIMEOUT, &region);
277+
/* Do not pin on acquire by default from CLI */
278+
status = priskv_acquire(ctx->client, key, PRISKV_KEY_MAX_TIMEOUT, false, &region);
273279
if (status != PRISKV_STATUS_OK) {
274280
printf("Failed to GET, status(%d): %s\n", status, priskv_status_str(status));
275281
return;
@@ -279,7 +285,8 @@ static void get_handler_base(client_context *ctx, char *args, bool acquire)
279285
printf("ACQUIRE GET status(%d): %s\n", status, priskv_status_str(status));
280286
printf("ACQUIRE GET value[%u]=%s\n", region.length, (char *)ctx->buf);
281287

282-
status = priskv_release(ctx->client, &region.token);
288+
/* Do not unpin on release by default from CLI */
289+
status = priskv_release(ctx->client, &region.token, false);
283290
if (status != PRISKV_STATUS_OK) {
284291
printf("Failed to RELEASE, status(%d): %s\n", status, priskv_status_str(status));
285292
return;
@@ -384,6 +391,7 @@ static void seal_token_handler(client_context *ctx, char *args)
384391
char *tokstr, *str_end;
385392
uint64_t token = 0;
386393
priskv_status status;
394+
bool pin_on_seal = false;
387395

388396
tokstr = strtok_r(args, " ", &args);
389397
if (!tokstr) {
@@ -400,7 +408,30 @@ static void seal_token_handler(client_context *ctx, char *args)
400408
return;
401409
}
402410
}
403-
status = priskv_seal(ctx->client, &token);
411+
/* Parse optional flags: 'PIN [TTL <ms>]' */
412+
while (args && strlen(args) > 0) {
413+
char *flag = strtok_r(args, " ", &args);
414+
if (!strcmp(flag, "PIN")) {
415+
pin_on_seal = true;
416+
} else if (!strcmp(flag, "TTL")) {
417+
/* TODO(wangyi): TTL passthrough is not implemented; parse and discard for now */
418+
char *ttl = strtok_r(args, " ", &args);
419+
if (!ttl || !strlen(ttl)) {
420+
printf("%s\n", invalid_args_msg);
421+
return;
422+
}
423+
} else {
424+
printf("%s\n", invalid_args_msg);
425+
return;
426+
}
427+
}
428+
429+
printf("SEAL token=%" PRIu64 " [PIN=%d]\n", token, pin_on_seal);
430+
/* TODO(wangyi): Support per-command TTL for pin-on-seal (e.g., 'PIN TTL N')
431+
* - Parse TTL value and pass through protocol once pin_ttl_ms is supported.
432+
* - Default to server-side TTL when not provided.
433+
*/
434+
status = priskv_seal(ctx->client, &token, pin_on_seal);
404435
printf("SEAL status(%d): %s\n", status, priskv_status_str(status));
405436
}
406437

@@ -409,15 +440,38 @@ static void acquire_only_handler(client_context *ctx, char *args)
409440
char *key;
410441
priskv_status status;
411442
priskv_memory_region region = {0};
443+
bool pin_on_acquire = false;
412444

413445
key = strtok_r(args, " ", &args);
414446
if (!key) {
415447
printf("%s\n", invalid_args_msg);
416448
return;
417449
}
418450

419-
printf("ACQUIRE key=%s\n", key);
420-
status = priskv_acquire(ctx->client, key, PRISKV_KEY_MAX_TIMEOUT, &region);
451+
/* Parse optional flags: 'PIN [TTL <ms>]' */
452+
while (args && strlen(args) > 0) {
453+
char *flag = strtok_r(args, " ", &args);
454+
if (!strcmp(flag, "PIN")) {
455+
pin_on_acquire = true;
456+
} else if (!strcmp(flag, "TTL")) {
457+
/* TODO(wangyi): TTL passthrough is not implemented; parse and discard for now */
458+
char *ttl = strtok_r(args, " ", &args);
459+
if (!ttl || !strlen(ttl)) {
460+
printf("%s\n", invalid_args_msg);
461+
return;
462+
}
463+
} else {
464+
printf("%s\n", invalid_args_msg);
465+
return;
466+
}
467+
}
468+
469+
/* Align output field name with CLI flag semantics */
470+
printf("ACQUIRE key=%s [PIN=%d]\n", key, pin_on_acquire);
471+
/* TODO(wangyi): Support per-command TTL for pin-on-acquire (e.g., 'PIN TTL N')
472+
* - Parse TTL value and pass through protocol once pin_ttl_ms is supported.
473+
*/
474+
status = priskv_acquire(ctx->client, key, PRISKV_KEY_MAX_TIMEOUT, pin_on_acquire, &region);
421475
printf("ACQUIRE status(%d): %s, addr %p, length %u, token 0x%lx\n", status,
422476
priskv_status_str(status), (void *)region.addr, region.length, region.token);
423477
if (status == PRISKV_STATUS_OK) {
@@ -434,6 +488,7 @@ static void release_token_handler(client_context *ctx, char *args)
434488
char *tokstr, *str_end;
435489
uint64_t token = 0;
436490
priskv_status status;
491+
bool unpin_on_release = false;
437492

438493
tokstr = strtok_r(args, " ", &args);
439494
if (!tokstr) {
@@ -450,7 +505,22 @@ static void release_token_handler(client_context *ctx, char *args)
450505
return;
451506
}
452507
}
453-
status = priskv_release(ctx->client, &token);
508+
/* Parse optional flags: 'UNPIN' */
509+
while (args && strlen(args) > 0) {
510+
char *flag = strtok_r(args, " ", &args);
511+
if (!strcmp(flag, "UNPIN")) {
512+
unpin_on_release = true;
513+
} else {
514+
printf("%s\n", invalid_args_msg);
515+
return;
516+
}
517+
}
518+
519+
printf("RELEASE token=%" PRIu64" [UNPIN=%d]\n", token, unpin_on_release);
520+
/* TODO(wangyi): Diagnostics for UNPIN_NOT_CLOSED and TTL interactions
521+
* - Consider printing hints when UNPIN_NOT_CLOSED occurs to aid debugging.
522+
*/
523+
status = priskv_release(ctx->client, &token, unpin_on_release);
454524
printf("RELEASE status(%d): %s\n", status, priskv_status_str(status));
455525
}
456526

@@ -475,6 +545,7 @@ static void drop_token_handler(client_context *ctx, char *args)
475545
return;
476546
}
477547
}
548+
printf("DROP token=% \n" PRIu64, token);
478549
status = priskv_drop(ctx->client, &token);
479550
printf("DROP status(%d): %s\n", status, priskv_status_str(status));
480551
}
@@ -649,14 +720,14 @@ static priskv_command commands[] = {
649720
{"set", set_handler,
650721
"set KEY VALUE [ EX seconds | PX milliseconds ]\tset key:value to priskv\n"},
651722
{"alloc_set", alloc_set_handler,
652-
"alloc set KEY VALUE [ EX seconds | PX milliseconds ]\tset key:value to priskv\n"},
723+
"alloc_set KEY VALUE [ EX seconds | PX milliseconds ] [ PIN ]\tzero-copy set with optional pin on seal\n"},
653724
{"get", get_handler, "get KEY\t\t\t\t\t\tget key:value from priskv\n"},
654725
{"acquire_get", acquire_get_handler, "acquire get KEY\t\t\t\t\t\tget key:value from priskv\n"},
655726
{"alloc", alloc_only_handler,
656727
"alloc KEY BYTES [ EX seconds | PX milliseconds ]\t\tallocate region and print token\n"},
657-
{"seal", seal_token_handler, "seal TOKEN|last\t\t\t\t\tseal previously alloc'ed token\n"},
658-
{"acquire", acquire_only_handler, "acquire KEY\t\t\t\t\t\tacquire region and print token\n"},
659-
{"release", release_token_handler, "release TOKEN|last\t\t\t\t\trelease previously acquired token\n"},
728+
{"seal", seal_token_handler, "seal TOKEN|last [ PIN [ TTL milliseconds ] ]\t\tseal previously alloc'ed token\n"},
729+
{"acquire", acquire_only_handler, "acquire KEY [ PIN [ TTL milliseconds ] ]\t\tacquire region and print token\n"},
730+
{"release", release_token_handler, "release TOKEN|last [ UNPIN ]\t\trelease previously acquired token\n"},
660731
{"drop", drop_token_handler, "drop TOKEN|last\t\t\t\t\tDrop unpublished ALLOC token\n"},
661732
{"test", test_handler, "test KEY\t\t\t\t\t\ttest if the key exists in priskv\n"},
662733
{"delete", delete_handler, "delete KEY\t\t\t\t\t\tdelete the key from priskv\n"},

client/priskv.h

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ extern "C"
3333
#endif
3434

3535
#include <stdint.h>
36+
#include <stdbool.h>
37+
#include "priskv-protocol.h" /* Include protocol request flag definitions */
3638

3739
typedef struct priskv_client priskv_client;
3840
typedef struct priskv_memory priskv_memory;
@@ -105,6 +107,9 @@ typedef enum priskv_status {
105107
/* no such token */
106108
PRISKV_STATUS_NO_SUCH_TOKEN,
107109

110+
/* unpin requested when pin_count == 0 on latest version */
111+
PRISKV_STATUS_UNPIN_NOT_CLOSED,
112+
108113
/* invalid SGL. the number of SGL within a command must not exceed @max_sgl */
109114
PRISKV_STATUS_INVALID_SGL,
110115

@@ -169,6 +174,9 @@ static inline const char *priskv_status_str(priskv_status status)
169174
case PRISKV_STATUS_NO_SUCH_TOKEN:
170175
return "No such token";
171176

177+
case PRISKV_STATUS_UNPIN_NOT_CLOSED:
178+
return "Unpin operation not closed";
179+
172180
case PRISKV_STATUS_INVALID_SGL:
173181
return "Invalid SGL";
174182

@@ -262,16 +270,17 @@ int priskv_alloc_async(priskv_client *client, const char *key, uint32_t alloc_le
262270
uint64_t timeout, uint64_t request_id, priskv_generic_cb cb);
263271

264272
/* Seal memory region alloced for write (by token pointer, reuse key field) */
265-
int priskv_seal_async(priskv_client *client, const uint64_t *token, uint64_t request_id,
266-
priskv_generic_cb cb);
273+
int priskv_seal_async(priskv_client *client, const uint64_t *token, bool pin_on_seal,
274+
uint64_t request_id, priskv_generic_cb cb);
267275

268276
/* Acquire memory region for zero copy read */
269277
int priskv_acquire_async(priskv_client *client, const char *key, uint64_t timeout,
270-
uint64_t request_id, priskv_generic_cb cb);
278+
bool pin_on_acquire, uint64_t request_id, priskv_generic_cb cb);
271279

272280
/* Release memory region (by token pointer, reuse key field) */
273-
int priskv_release_async(priskv_client *client, const uint64_t *token, uint64_t request_id,
274-
priskv_generic_cb cb);
281+
int priskv_release_async(priskv_client *client, const uint64_t *token, bool unpin_on_release,
282+
uint64_t request_id, priskv_generic_cb cb);
283+
275284

276285
/* Drop memory region (by token pointer, reuse key field) */
277286
int priskv_drop_async(priskv_client *client, const uint64_t *token, uint64_t request_id,
@@ -325,11 +334,11 @@ uint64_t priskv_capacity(priskv_client *client);
325334

326335
int priskv_alloc(priskv_client *client, const char *key, uint32_t alloc_length, uint64_t timeout,
327336
priskv_memory_region *region);
328-
int priskv_seal(priskv_client *client, const uint64_t *token);
337+
int priskv_seal(priskv_client *client, const uint64_t *token, bool pin_on_seal);
329338

330339
int priskv_acquire(priskv_client *client, const char *key, uint64_t timeout,
331-
priskv_memory_region *region);
332-
int priskv_release(priskv_client *client, const uint64_t *token);
340+
bool pin_on_acquire, priskv_memory_region *region);
341+
int priskv_release(priskv_client *client, const uint64_t *token, bool unpin_on_release);
333342

334343
int priskv_drop(priskv_client *client, const uint64_t *token);
335344

client/sync.c

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -177,20 +177,22 @@ int priskv_alloc(priskv_client *client, const char *key, uint32_t alloc_length,
177177
return req_sync.status;
178178
}
179179

180-
int priskv_seal(priskv_client *client, const uint64_t *token)
180+
int priskv_seal(priskv_client *client, const uint64_t *token, bool pin_on_seal)
181181
{
182182
priskv_transport_req_sync req_sync = {.status = 0xffff, .done = false};
183-
priskv_seal_async(client, token, (uint64_t)&req_sync, priskv_common_sync_cb);
183+
priskv_seal_async(client, token, pin_on_seal, (uint64_t)&req_sync,
184+
priskv_common_sync_cb);
184185

185186
priskv_sync_wait(client, &req_sync.done);
186187
return req_sync.status;
187188
}
188189

189190
int priskv_acquire(priskv_client *client, const char *key, uint64_t timeout,
190-
priskv_memory_region *region)
191+
bool pin_on_acquire, priskv_memory_region *region)
191192
{
192193
priskv_transport_zero_copy_req_sync req_sync = {.status = 0xffff, .done = false};
193-
priskv_acquire_async(client, key, timeout, (uint64_t)&req_sync, priskv_zero_copy_req_sync_cb);
194+
priskv_acquire_async(client, key, timeout, pin_on_acquire, (uint64_t)&req_sync,
195+
priskv_zero_copy_req_sync_cb);
194196

195197
priskv_sync_wait(client, &req_sync.done);
196198
if (req_sync.status == PRISKV_STATUS_OK && region) {
@@ -201,10 +203,11 @@ int priskv_acquire(priskv_client *client, const char *key, uint64_t timeout,
201203
return req_sync.status;
202204
}
203205

204-
int priskv_release(priskv_client *client, const uint64_t *token)
206+
int priskv_release(priskv_client *client, const uint64_t *token, bool unpin_on_release)
205207
{
206208
priskv_transport_req_sync req_sync = {.status = 0xffff, .done = false};
207-
priskv_release_async(client, token, (uint64_t)&req_sync, priskv_common_sync_cb);
209+
priskv_release_async(client, token, unpin_on_release, (uint64_t)&req_sync,
210+
priskv_common_sync_cb);
208211
priskv_sync_wait(client, &req_sync.done);
209212

210213
return req_sync.status;

client/transport/rdma.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1256,6 +1256,7 @@ static int priskv_rdma_req_send(void *arg)
12561256
req->command = htobe16(rdma_req->cmd);
12571257
req->nsgl = htobe16(rdma_req->nsgl);
12581258
req->timeout = htobe64(rdma_req->timeout);
1259+
req->flags = htobe32(rdma_req->req_flags);
12591260
req->key_length = htobe16(rdma_req->keylen);
12601261

12611262
struct timeval client_metadata_send_time;

0 commit comments

Comments
 (0)