Skip to content

Commit ee88c04

Browse files
authored
💥 C Bridge: Custom slot supplier rework (#1025)
1 parent de67417 commit ee88c04

File tree

3 files changed

+336
-79
lines changed

3 files changed

+336
-79
lines changed

core-c-bridge/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ crate-type = ["cdylib"]
1010
[dependencies]
1111
anyhow = "1.0"
1212
async-trait = "0.1"
13+
crossbeam-utils = "0.8"
1314
futures-util = { version = "0.3", default-features = false }
1415
http = "1.3"
1516
libc = "0.2"

core-c-bridge/include/temporal-sdk-core-c-bridge.h

Lines changed: 112 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@ typedef struct TemporalCoreRandom TemporalCoreRandom;
8181

8282
typedef struct TemporalCoreRuntime TemporalCoreRuntime;
8383

84+
typedef struct TemporalCoreSlotReserveCompletionCtx TemporalCoreSlotReserveCompletionCtx;
85+
8486
typedef struct TemporalCoreWorker TemporalCoreWorker;
8587

8688
typedef struct TemporalCoreWorkerReplayPusher TemporalCoreWorkerReplayPusher;
@@ -570,18 +572,17 @@ typedef struct TemporalCoreSlotReserveCtx {
570572
struct TemporalCoreByteArrayRef worker_identity;
571573
struct TemporalCoreByteArrayRef worker_build_id;
572574
bool is_sticky;
573-
void *token_src;
574575
} TemporalCoreSlotReserveCtx;
575576

576-
typedef void (*TemporalCoreCustomReserveSlotCallback)(const struct TemporalCoreSlotReserveCtx *ctx,
577-
void *sender);
577+
typedef void (*TemporalCoreCustomSlotSupplierReserveCallback)(const struct TemporalCoreSlotReserveCtx *ctx,
578+
const struct TemporalCoreSlotReserveCompletionCtx *completion_ctx,
579+
void *user_data);
578580

579-
typedef void (*TemporalCoreCustomCancelReserveCallback)(void *token_source);
581+
typedef void (*TemporalCoreCustomSlotSupplierCancelReserveCallback)(const struct TemporalCoreSlotReserveCompletionCtx *completion_ctx,
582+
void *user_data);
580583

581-
/**
582-
* Must return C#-tracked id for the permit. A zero value means no permit was reserved.
583-
*/
584-
typedef uintptr_t (*TemporalCoreCustomTryReserveSlotCallback)(const struct TemporalCoreSlotReserveCtx *ctx);
584+
typedef uintptr_t (*TemporalCoreCustomSlotSupplierTryReserveCallback)(const struct TemporalCoreSlotReserveCtx *ctx,
585+
void *user_data);
585586

586587
typedef enum TemporalCoreSlotInfo_Tag {
587588
WorkflowSlotInfo,
@@ -621,32 +622,87 @@ typedef struct TemporalCoreSlotInfo {
621622
typedef struct TemporalCoreSlotMarkUsedCtx {
622623
struct TemporalCoreSlotInfo slot_info;
623624
/**
624-
* C# id for the slot permit.
625+
* Lang-issued permit ID.
625626
*/
626627
uintptr_t slot_permit;
627628
} TemporalCoreSlotMarkUsedCtx;
628629

629-
typedef void (*TemporalCoreCustomMarkSlotUsedCallback)(const struct TemporalCoreSlotMarkUsedCtx *ctx);
630+
typedef void (*TemporalCoreCustomSlotSupplierMarkUsedCallback)(const struct TemporalCoreSlotMarkUsedCtx *ctx,
631+
void *user_data);
630632

631633
typedef struct TemporalCoreSlotReleaseCtx {
632634
const struct TemporalCoreSlotInfo *slot_info;
633635
/**
634-
* C# id for the slot permit.
636+
* Lang-issued permit ID.
635637
*/
636638
uintptr_t slot_permit;
637639
} TemporalCoreSlotReleaseCtx;
638640

639-
typedef void (*TemporalCoreCustomReleaseSlotCallback)(const struct TemporalCoreSlotReleaseCtx *ctx);
641+
typedef void (*TemporalCoreCustomSlotSupplierReleaseCallback)(const struct TemporalCoreSlotReleaseCtx *ctx,
642+
void *user_data);
640643

641-
typedef void (*TemporalCoreCustomSlotImplFreeCallback)(const struct TemporalCoreCustomSlotSupplierCallbacks *userimpl);
644+
typedef bool (*TemporalCoreCustomSlotSupplierAvailableSlotsCallback)(uintptr_t *available_slots,
645+
void *user_data);
646+
647+
typedef void (*TemporalCoreCustomSlotSupplierFreeCallback)(const struct TemporalCoreCustomSlotSupplierCallbacks *userimpl);
642648

643649
typedef struct TemporalCoreCustomSlotSupplierCallbacks {
644-
TemporalCoreCustomReserveSlotCallback reserve;
645-
TemporalCoreCustomCancelReserveCallback cancel_reserve;
646-
TemporalCoreCustomTryReserveSlotCallback try_reserve;
647-
TemporalCoreCustomMarkSlotUsedCallback mark_used;
648-
TemporalCoreCustomReleaseSlotCallback release;
649-
TemporalCoreCustomSlotImplFreeCallback free;
650+
/**
651+
* Called to initiate asynchronous slot reservation. `ctx` contains information about
652+
* reservation request. The pointer is only valid for the duration of the function call; the
653+
* implementation should copy the data out of it for later use, and return as soon as possible.
654+
*
655+
* When slot is reserved, the implementation should call [`temporal_core_complete_async_reserve`]
656+
* with the same `completion_ctx` as passed to this function. Reservation cannot be cancelled
657+
* by Lang, but it can be cancelled by Core through [`cancel_reserve`](Self::cancel_reserve)
658+
* callback. If reservation was cancelled, [`temporal_core_complete_async_cancel_reserve`]
659+
* should be called instead.
660+
*
661+
* Slot reservation cannot error. The implementation should recover from errors and keep trying
662+
* to reserve a slot until it eventually succeeds, or until reservation is cancelled by Core.
663+
*/
664+
TemporalCoreCustomSlotSupplierReserveCallback reserve;
665+
/**
666+
* Called to cancel slot reservation. `completion_ctx` specifies which reservation is being
667+
* cancelled; the matching [`reserve`](Self::reserve) call was made with the same `completion_ctx`.
668+
* After cancellation, the implementation should call [`temporal_core_complete_async_cancel_reserve`]
669+
* with the same `completion_ctx`. Calling [`temporal_core_complete_async_reserve`] is not
670+
* needed after cancellation.
671+
*/
672+
TemporalCoreCustomSlotSupplierCancelReserveCallback cancel_reserve;
673+
/**
674+
* Called to try an immediate slot reservation. The callback should return 0 if immediate
675+
* reservation is not currently possible, or permit ID if reservation was successful. Permit ID
676+
* is arbitrary, but must be unique among live reservations as it's later used for [`mark_used`](Self::mark_used)
677+
* and [`release`](Self::release) callbacks.
678+
*/
679+
TemporalCoreCustomSlotSupplierTryReserveCallback try_reserve;
680+
/**
681+
* Called after successful reservation to mark slot as used. See [`SlotSupplier`](temporal_sdk_core_api::worker::SlotSupplier)
682+
* trait for details.
683+
*/
684+
TemporalCoreCustomSlotSupplierMarkUsedCallback mark_used;
685+
/**
686+
* Called to free a previously reserved slot.
687+
*/
688+
TemporalCoreCustomSlotSupplierReleaseCallback release;
689+
/**
690+
* Called to retrieve the number of available slots if known. If the implementation knows how
691+
* many slots are available at the moment, it should set the value behind the `available_slots`
692+
* pointer and return true. If that number is unknown, it should return false.
693+
*
694+
* This function pointer can be set to null. It will be treated as if the number of available
695+
* slots is never known.
696+
*/
697+
TemporalCoreCustomSlotSupplierAvailableSlotsCallback available_slots;
698+
/**
699+
* Called when the slot supplier is being dropped. All resources should be freed.
700+
*/
701+
TemporalCoreCustomSlotSupplierFreeCallback free;
702+
/**
703+
* Passed as an extra argument to the callbacks.
704+
*/
705+
void *user_data;
650706
} TemporalCoreCustomSlotSupplierCallbacks;
651707

652708
typedef struct TemporalCoreCustomSlotSupplierCallbacksImpl {
@@ -984,10 +1040,44 @@ struct TemporalCoreWorkerReplayPushResult temporal_core_worker_replay_push(struc
9841040
struct TemporalCoreByteArrayRef workflow_id,
9851041
struct TemporalCoreByteArrayRef history);
9861042

987-
void temporal_core_complete_async_reserve(void *sender, uintptr_t permit_id);
1043+
/**
1044+
* Completes asynchronous slot reservation started by a call to [`CustomSlotSupplierCallbacks::reserve`].
1045+
*
1046+
* `completion_ctx` must be the same as the one passed to the matching [`reserve`](CustomSlotSupplierCallbacks::reserve)
1047+
* call. `permit_id` is arbitrary, but must be unique among live reservations as it's later used
1048+
* for [`mark_used`](CustomSlotSupplierCallbacks::mark_used) and [`release`](CustomSlotSupplierCallbacks::release)
1049+
* callbacks.
1050+
*
1051+
* This function returns true if the reservation was completed successfully, or false if the
1052+
* reservation was cancelled before completion. If this function returns false, the implementation
1053+
* should call [`temporal_core_complete_async_cancel_reserve`] with the same `completion_ctx`.
1054+
*
1055+
* **Caution:** if this function returns true, `completion_ctx` gets freed. Afterwards, calling
1056+
* either [`temporal_core_complete_async_reserve`] or [`temporal_core_complete_async_cancel_reserve`]
1057+
* with the same `completion_ctx` will cause **memory corruption!**
1058+
*/
1059+
bool temporal_core_complete_async_reserve(const struct TemporalCoreSlotReserveCompletionCtx *completion_ctx,
1060+
uintptr_t permit_id);
9881061

989-
void temporal_core_set_reserve_cancel_target(struct TemporalCoreSlotReserveCtx *ctx,
990-
void *token_ptr);
1062+
/**
1063+
* Completes cancellation of asynchronous slot reservation.
1064+
*
1065+
* Cancellation can only be initiated by Core. It's done by calling [`CustomSlotSupplierCallbacks::cancel_reserve`]
1066+
* after an earlier call to [`CustomSlotSupplierCallbacks::reserve`].
1067+
*
1068+
* `completion_ctx` must be the same as the one passed to the matching [`cancel_reserve`](CustomSlotSupplierCallbacks::cancel_reserve)
1069+
* call.
1070+
*
1071+
* This function returns true on successful cancellation, or false if cancellation was not
1072+
* requested for the given `completion_ctx`. A false value indicates there's likely a logic bug in
1073+
* the implementation where it doesn't correctly wait for [`cancel_reserve`](CustomSlotSupplierCallbacks::cancel_reserve)
1074+
* callback to be called.
1075+
*
1076+
* **Caution:** if this function returns true, `completion_ctx` gets freed. Afterwards, calling
1077+
* either [`temporal_core_complete_async_reserve`] or [`temporal_core_complete_async_cancel_reserve`]
1078+
* with the same `completion_ctx` will cause **memory corruption!**
1079+
*/
1080+
bool temporal_core_complete_async_cancel_reserve(const struct TemporalCoreSlotReserveCompletionCtx *completion_ctx);
9911081

9921082
#ifdef __cplusplus
9931083
} // extern "C"

0 commit comments

Comments
 (0)