diff --git a/core-c-bridge/Cargo.toml b/core-c-bridge/Cargo.toml index ca20af40b..958f47d07 100644 --- a/core-c-bridge/Cargo.toml +++ b/core-c-bridge/Cargo.toml @@ -10,6 +10,7 @@ crate-type = ["cdylib"] [dependencies] anyhow = "1.0" async-trait = "0.1" +crossbeam-utils = "0.8" futures-util = { version = "0.3", default-features = false } http = "1.3" libc = "0.2" diff --git a/core-c-bridge/include/temporal-sdk-core-c-bridge.h b/core-c-bridge/include/temporal-sdk-core-c-bridge.h index 10a66e0fa..0bc69d4f5 100644 --- a/core-c-bridge/include/temporal-sdk-core-c-bridge.h +++ b/core-c-bridge/include/temporal-sdk-core-c-bridge.h @@ -81,6 +81,8 @@ typedef struct TemporalCoreRandom TemporalCoreRandom; typedef struct TemporalCoreRuntime TemporalCoreRuntime; +typedef struct TemporalCoreSlotReserveCompletionCtx TemporalCoreSlotReserveCompletionCtx; + typedef struct TemporalCoreWorker TemporalCoreWorker; typedef struct TemporalCoreWorkerReplayPusher TemporalCoreWorkerReplayPusher; @@ -570,18 +572,17 @@ typedef struct TemporalCoreSlotReserveCtx { struct TemporalCoreByteArrayRef worker_identity; struct TemporalCoreByteArrayRef worker_build_id; bool is_sticky; - void *token_src; } TemporalCoreSlotReserveCtx; -typedef void (*TemporalCoreCustomReserveSlotCallback)(const struct TemporalCoreSlotReserveCtx *ctx, - void *sender); +typedef void (*TemporalCoreCustomSlotSupplierReserveCallback)(const struct TemporalCoreSlotReserveCtx *ctx, + const struct TemporalCoreSlotReserveCompletionCtx *completion_ctx, + void *user_data); -typedef void (*TemporalCoreCustomCancelReserveCallback)(void *token_source); +typedef void (*TemporalCoreCustomSlotSupplierCancelReserveCallback)(const struct TemporalCoreSlotReserveCompletionCtx *completion_ctx, + void *user_data); -/** - * Must return C#-tracked id for the permit. A zero value means no permit was reserved. - */ -typedef uintptr_t (*TemporalCoreCustomTryReserveSlotCallback)(const struct TemporalCoreSlotReserveCtx *ctx); +typedef uintptr_t (*TemporalCoreCustomSlotSupplierTryReserveCallback)(const struct TemporalCoreSlotReserveCtx *ctx, + void *user_data); typedef enum TemporalCoreSlotInfo_Tag { WorkflowSlotInfo, @@ -621,32 +622,87 @@ typedef struct TemporalCoreSlotInfo { typedef struct TemporalCoreSlotMarkUsedCtx { struct TemporalCoreSlotInfo slot_info; /** - * C# id for the slot permit. + * Lang-issued permit ID. */ uintptr_t slot_permit; } TemporalCoreSlotMarkUsedCtx; -typedef void (*TemporalCoreCustomMarkSlotUsedCallback)(const struct TemporalCoreSlotMarkUsedCtx *ctx); +typedef void (*TemporalCoreCustomSlotSupplierMarkUsedCallback)(const struct TemporalCoreSlotMarkUsedCtx *ctx, + void *user_data); typedef struct TemporalCoreSlotReleaseCtx { const struct TemporalCoreSlotInfo *slot_info; /** - * C# id for the slot permit. + * Lang-issued permit ID. */ uintptr_t slot_permit; } TemporalCoreSlotReleaseCtx; -typedef void (*TemporalCoreCustomReleaseSlotCallback)(const struct TemporalCoreSlotReleaseCtx *ctx); +typedef void (*TemporalCoreCustomSlotSupplierReleaseCallback)(const struct TemporalCoreSlotReleaseCtx *ctx, + void *user_data); -typedef void (*TemporalCoreCustomSlotImplFreeCallback)(const struct TemporalCoreCustomSlotSupplierCallbacks *userimpl); +typedef bool (*TemporalCoreCustomSlotSupplierAvailableSlotsCallback)(uintptr_t *available_slots, + void *user_data); + +typedef void (*TemporalCoreCustomSlotSupplierFreeCallback)(const struct TemporalCoreCustomSlotSupplierCallbacks *userimpl); typedef struct TemporalCoreCustomSlotSupplierCallbacks { - TemporalCoreCustomReserveSlotCallback reserve; - TemporalCoreCustomCancelReserveCallback cancel_reserve; - TemporalCoreCustomTryReserveSlotCallback try_reserve; - TemporalCoreCustomMarkSlotUsedCallback mark_used; - TemporalCoreCustomReleaseSlotCallback release; - TemporalCoreCustomSlotImplFreeCallback free; + /** + * Called to initiate asynchronous slot reservation. `ctx` contains information about + * reservation request. The pointer is only valid for the duration of the function call; the + * implementation should copy the data out of it for later use, and return as soon as possible. + * + * When slot is reserved, the implementation should call [`temporal_core_complete_async_reserve`] + * with the same `completion_ctx` as passed to this function. Reservation cannot be cancelled + * by Lang, but it can be cancelled by Core through [`cancel_reserve`](Self::cancel_reserve) + * callback. If reservation was cancelled, [`temporal_core_complete_async_cancel_reserve`] + * should be called instead. + * + * Slot reservation cannot error. The implementation should recover from errors and keep trying + * to reserve a slot until it eventually succeeds, or until reservation is cancelled by Core. + */ + TemporalCoreCustomSlotSupplierReserveCallback reserve; + /** + * Called to cancel slot reservation. `completion_ctx` specifies which reservation is being + * cancelled; the matching [`reserve`](Self::reserve) call was made with the same `completion_ctx`. + * After cancellation, the implementation should call [`temporal_core_complete_async_cancel_reserve`] + * with the same `completion_ctx`. Calling [`temporal_core_complete_async_reserve`] is not + * needed after cancellation. + */ + TemporalCoreCustomSlotSupplierCancelReserveCallback cancel_reserve; + /** + * Called to try an immediate slot reservation. The callback should return 0 if immediate + * reservation is not currently possible, or permit ID if reservation was successful. Permit ID + * is arbitrary, but must be unique among live reservations as it's later used for [`mark_used`](Self::mark_used) + * and [`release`](Self::release) callbacks. + */ + TemporalCoreCustomSlotSupplierTryReserveCallback try_reserve; + /** + * Called after successful reservation to mark slot as used. See [`SlotSupplier`](temporal_sdk_core_api::worker::SlotSupplier) + * trait for details. + */ + TemporalCoreCustomSlotSupplierMarkUsedCallback mark_used; + /** + * Called to free a previously reserved slot. + */ + TemporalCoreCustomSlotSupplierReleaseCallback release; + /** + * Called to retrieve the number of available slots if known. If the implementation knows how + * many slots are available at the moment, it should set the value behind the `available_slots` + * pointer and return true. If that number is unknown, it should return false. + * + * This function pointer can be set to null. It will be treated as if the number of available + * slots is never known. + */ + TemporalCoreCustomSlotSupplierAvailableSlotsCallback available_slots; + /** + * Called when the slot supplier is being dropped. All resources should be freed. + */ + TemporalCoreCustomSlotSupplierFreeCallback free; + /** + * Passed as an extra argument to the callbacks. + */ + void *user_data; } TemporalCoreCustomSlotSupplierCallbacks; typedef struct TemporalCoreCustomSlotSupplierCallbacksImpl { @@ -984,10 +1040,44 @@ struct TemporalCoreWorkerReplayPushResult temporal_core_worker_replay_push(struc struct TemporalCoreByteArrayRef workflow_id, struct TemporalCoreByteArrayRef history); -void temporal_core_complete_async_reserve(void *sender, uintptr_t permit_id); +/** + * Completes asynchronous slot reservation started by a call to [`CustomSlotSupplierCallbacks::reserve`]. + * + * `completion_ctx` must be the same as the one passed to the matching [`reserve`](CustomSlotSupplierCallbacks::reserve) + * call. `permit_id` is arbitrary, but must be unique among live reservations as it's later used + * for [`mark_used`](CustomSlotSupplierCallbacks::mark_used) and [`release`](CustomSlotSupplierCallbacks::release) + * callbacks. + * + * This function returns true if the reservation was completed successfully, or false if the + * reservation was cancelled before completion. If this function returns false, the implementation + * should call [`temporal_core_complete_async_cancel_reserve`] with the same `completion_ctx`. + * + * **Caution:** if this function returns true, `completion_ctx` gets freed. Afterwards, calling + * either [`temporal_core_complete_async_reserve`] or [`temporal_core_complete_async_cancel_reserve`] + * with the same `completion_ctx` will cause **memory corruption!** + */ +bool temporal_core_complete_async_reserve(const struct TemporalCoreSlotReserveCompletionCtx *completion_ctx, + uintptr_t permit_id); -void temporal_core_set_reserve_cancel_target(struct TemporalCoreSlotReserveCtx *ctx, - void *token_ptr); +/** + * Completes cancellation of asynchronous slot reservation. + * + * Cancellation can only be initiated by Core. It's done by calling [`CustomSlotSupplierCallbacks::cancel_reserve`] + * after an earlier call to [`CustomSlotSupplierCallbacks::reserve`]. + * + * `completion_ctx` must be the same as the one passed to the matching [`cancel_reserve`](CustomSlotSupplierCallbacks::cancel_reserve) + * call. + * + * This function returns true on successful cancellation, or false if cancellation was not + * requested for the given `completion_ctx`. A false value indicates there's likely a logic bug in + * the implementation where it doesn't correctly wait for [`cancel_reserve`](CustomSlotSupplierCallbacks::cancel_reserve) + * callback to be called. + * + * **Caution:** if this function returns true, `completion_ctx` gets freed. Afterwards, calling + * either [`temporal_core_complete_async_reserve`] or [`temporal_core_complete_async_cancel_reserve`] + * with the same `completion_ctx` will cause **memory corruption!** + */ +bool temporal_core_complete_async_cancel_reserve(const struct TemporalCoreSlotReserveCompletionCtx *completion_ctx); #ifdef __cplusplus } // extern "C" diff --git a/core-c-bridge/src/worker.rs b/core-c-bridge/src/worker.rs index 8eaa13f26..6308dd8bc 100644 --- a/core-c-bridge/src/worker.rs +++ b/core-c-bridge/src/worker.rs @@ -2,9 +2,11 @@ use crate::{ ByteArray, ByteArrayRef, ByteArrayRefArray, UserDataHandle, client::Client, runtime::Runtime, }; use anyhow::{Context, bail}; +use crossbeam_utils::atomic::AtomicCell; use prost::Message; use std::{ collections::{HashMap, HashSet}, + num::NonZero, sync::Arc, time::Duration, }; @@ -28,8 +30,8 @@ use temporal_sdk_core_protos::{ temporal::api::history::v1::History, }; use tokio::sync::{ + Notify, mpsc::{Sender, channel}, - oneshot, }; use tokio_stream::wrappers::ReceiverStream; @@ -165,14 +167,24 @@ struct CustomSlotSupplier { unsafe impl Send for CustomSlotSupplier {} unsafe impl Sync for CustomSlotSupplier {} -pub type CustomReserveSlotCallback = - unsafe extern "C" fn(ctx: *const SlotReserveCtx, sender: *mut libc::c_void); -pub type CustomCancelReserveCallback = unsafe extern "C" fn(token_source: *mut libc::c_void); -/// Must return C#-tracked id for the permit. A zero value means no permit was reserved. -pub type CustomTryReserveSlotCallback = unsafe extern "C" fn(ctx: *const SlotReserveCtx) -> usize; -pub type CustomMarkSlotUsedCallback = unsafe extern "C" fn(ctx: *const SlotMarkUsedCtx); -pub type CustomReleaseSlotCallback = unsafe extern "C" fn(ctx: *const SlotReleaseCtx); -pub type CustomSlotImplFreeCallback = +pub type CustomSlotSupplierReserveCallback = unsafe extern "C" fn( + ctx: *const SlotReserveCtx, + completion_ctx: *const SlotReserveCompletionCtx, + user_data: *mut libc::c_void, +); +pub type CustomSlotSupplierCancelReserveCallback = unsafe extern "C" fn( + completion_ctx: *const SlotReserveCompletionCtx, + user_data: *mut libc::c_void, +); +pub type CustomSlotSupplierTryReserveCallback = + unsafe extern "C" fn(ctx: *const SlotReserveCtx, user_data: *mut libc::c_void) -> usize; +pub type CustomSlotSupplierMarkUsedCallback = + unsafe extern "C" fn(ctx: *const SlotMarkUsedCtx, user_data: *mut libc::c_void); +pub type CustomSlotSupplierReleaseCallback = + unsafe extern "C" fn(ctx: *const SlotReleaseCtx, user_data: *mut libc::c_void); +pub type CustomSlotSupplierAvailableSlotsCallback = + Option bool>; +pub type CustomSlotSupplierFreeCallback = unsafe extern "C" fn(userimpl: *const CustomSlotSupplierCallbacks); #[repr(C)] @@ -181,12 +193,46 @@ pub struct CustomSlotSupplierCallbacksImpl(pub *const CustomSlotSupplierCallback #[repr(C)] pub struct CustomSlotSupplierCallbacks { - pub reserve: CustomReserveSlotCallback, - pub cancel_reserve: CustomCancelReserveCallback, - pub try_reserve: CustomTryReserveSlotCallback, - pub mark_used: CustomMarkSlotUsedCallback, - pub release: CustomReleaseSlotCallback, - pub free: CustomSlotImplFreeCallback, + /// Called to initiate asynchronous slot reservation. `ctx` contains information about + /// reservation request. The pointer is only valid for the duration of the function call; the + /// implementation should copy the data out of it for later use, and return as soon as possible. + /// + /// When slot is reserved, the implementation should call [`temporal_core_complete_async_reserve`] + /// with the same `completion_ctx` as passed to this function. Reservation cannot be cancelled + /// by Lang, but it can be cancelled by Core through [`cancel_reserve`](Self::cancel_reserve) + /// callback. If reservation was cancelled, [`temporal_core_complete_async_cancel_reserve`] + /// should be called instead. + /// + /// Slot reservation cannot error. The implementation should recover from errors and keep trying + /// to reserve a slot until it eventually succeeds, or until reservation is cancelled by Core. + pub reserve: CustomSlotSupplierReserveCallback, + /// Called to cancel slot reservation. `completion_ctx` specifies which reservation is being + /// cancelled; the matching [`reserve`](Self::reserve) call was made with the same `completion_ctx`. + /// After cancellation, the implementation should call [`temporal_core_complete_async_cancel_reserve`] + /// with the same `completion_ctx`. Calling [`temporal_core_complete_async_reserve`] is not + /// needed after cancellation. + pub cancel_reserve: CustomSlotSupplierCancelReserveCallback, + /// Called to try an immediate slot reservation. The callback should return 0 if immediate + /// reservation is not currently possible, or permit ID if reservation was successful. Permit ID + /// is arbitrary, but must be unique among live reservations as it's later used for [`mark_used`](Self::mark_used) + /// and [`release`](Self::release) callbacks. + pub try_reserve: CustomSlotSupplierTryReserveCallback, + /// Called after successful reservation to mark slot as used. See [`SlotSupplier`](temporal_sdk_core_api::worker::SlotSupplier) + /// trait for details. + pub mark_used: CustomSlotSupplierMarkUsedCallback, + /// Called to free a previously reserved slot. + pub release: CustomSlotSupplierReleaseCallback, + /// Called to retrieve the number of available slots if known. If the implementation knows how + /// many slots are available at the moment, it should set the value behind the `available_slots` + /// pointer and return true. If that number is unknown, it should return false. + /// + /// This function pointer can be set to null. It will be treated as if the number of available + /// slots is never known. + pub available_slots: CustomSlotSupplierAvailableSlotsCallback, + /// Called when the slot supplier is being dropped. All resources should be freed. + pub free: CustomSlotSupplierFreeCallback, + /// Passed as an extra argument to the callbacks. + pub user_data: *mut libc::c_void, } impl CustomSlotSupplierCallbacksImpl { @@ -223,8 +269,6 @@ pub struct SlotReserveCtx { pub worker_identity: ByteArrayRef, pub worker_build_id: ByteArrayRef, pub is_sticky: bool, - // The C# side will store a pointer here to the cancellation token source - pub token_src: *mut libc::c_void, } unsafe impl Send for SlotReserveCtx {} @@ -249,31 +293,67 @@ pub enum SlotInfo { #[repr(C)] pub struct SlotMarkUsedCtx { pub slot_info: SlotInfo, - /// C# id for the slot permit. + /// Lang-issued permit ID. pub slot_permit: usize, } #[repr(C)] pub struct SlotReleaseCtx { pub slot_info: *const SlotInfo, - /// C# id for the slot permit. + /// Lang-issued permit ID. pub slot_permit: usize, } -struct CancelReserveGuard { - token_src: *mut libc::c_void, - callback: CustomCancelReserveCallback, +pub struct SlotReserveCompletionCtx { + state: AtomicCell, + notify: Notify, } -impl Drop for CancelReserveGuard { + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum SlotReserveOperationState { + Pending, + Cancelled, + Completed(NonZero), +} + +struct CancelReserveGuard<'a, SK: SlotKind + Send + Sync> { + slot_supplier: &'a CustomSlotSupplier, + completion_ctx: Arc, + completed: bool, +} + +impl<'a, SK: SlotKind + Send + Sync> Drop for CancelReserveGuard<'a, SK> { fn drop(&mut self) { - if !self.token_src.is_null() { + // do not cancel if already completed + if !self.completed { + let state = self + .completion_ctx + .state + .swap(SlotReserveOperationState::Cancelled); unsafe { - (self.callback)(self.token_src); + let inner = &*self.slot_supplier.inner.0; + match state { + SlotReserveOperationState::Cancelled => { + // This situation should never happen, but on the other hand, it doesn't + // result in any unsafety, deadlock or leak. It's safe to ignore it, but in + // debug builds we'd like to know it happened. + debug_assert!(false, "slot reservation cancelled twice") + } + SlotReserveOperationState::Pending => { + (inner.cancel_reserve)(Arc::as_ptr(&self.completion_ctx), inner.user_data) + } + SlotReserveOperationState::Completed(slot_permit) => (inner.release)( + &SlotReleaseCtx { + slot_info: std::ptr::null(), + slot_permit: slot_permit.into(), + }, + inner.user_data, + ), + } } } } } -unsafe impl Send for CancelReserveGuard {} #[async_trait::async_trait] impl temporal_sdk_core_api::worker::SlotSupplier @@ -282,36 +362,53 @@ impl temporal_sdk_core_api::worker::SlotSupplier type SlotKind = SK; async fn reserve_slot(&self, ctx: &dyn SlotReservationContext) -> SlotSupplierPermit { - let (tx, rx) = oneshot::channel(); let ctx = Self::convert_reserve_ctx(ctx); - let tx = Box::into_raw(Box::new(tx)) as *mut libc::c_void; + let completion_ctx = Arc::new(SlotReserveCompletionCtx { + state: AtomicCell::new(SlotReserveOperationState::Pending), + notify: Notify::new(), + }); unsafe { - let _drop_guard = CancelReserveGuard { - token_src: ctx.token_src, - callback: (*self.inner.0).cancel_reserve, - }; - ((*self.inner.0).reserve)(&ctx, tx); - rx.await.expect("reserve channel is not closed") + let inner = &*self.inner.0; + (inner.reserve)(&ctx, Arc::into_raw(completion_ctx.clone()), inner.user_data); + } + let mut guard = CancelReserveGuard { + slot_supplier: self, + completion_ctx, + completed: false, + }; + // if the future is dropped before this await resolves, the guard is dropped which triggers cancellation + guard.completion_ctx.notify.notified().await; + guard.completed = true; + match guard.completion_ctx.state.load() { + SlotReserveOperationState::Completed(permit_id) => { + SlotSupplierPermit::with_user_data::(permit_id.get()) + } + other => panic!("Unexpected slot reservation state: expected Completed, got {other:?}"), } } fn try_reserve_slot(&self, ctx: &dyn SlotReservationContext) -> Option { let ctx = Self::convert_reserve_ctx(ctx); - let permit_id = unsafe { ((*self.inner.0).try_reserve)(&ctx) }; + let permit_id = unsafe { ((*self.inner.0).try_reserve)(&ctx, (*self.inner.0).user_data) }; if permit_id == 0 { None } else { - Some(SlotSupplierPermit::with_user_data(permit_id)) + Some(SlotSupplierPermit::with_user_data::(permit_id)) } } fn mark_slot_used(&self, ctx: &dyn SlotMarkUsedContext) { let ctx = SlotMarkUsedCtx { slot_info: Self::convert_slot_info(ctx.info().downcast()), - slot_permit: ctx.permit().user_data::().copied().unwrap_or(0), + slot_permit: ctx + .permit() + .user_data::() + .copied() + .expect("permit user data should be usize"), }; unsafe { - ((*self.inner.0).mark_used)(&ctx); + let inner = &*self.inner.0; + (inner.mark_used)(&ctx, inner.user_data); } } @@ -323,15 +420,26 @@ impl temporal_sdk_core_api::worker::SlotSupplier } let ctx = SlotReleaseCtx { slot_info: info_ptr, - slot_permit: ctx.permit().user_data::().copied().unwrap_or(0), + slot_permit: ctx + .permit() + .user_data::() + .copied() + .expect("permit user data should be usize"), }; unsafe { - ((*self.inner.0).release)(&ctx); + let inner = &*self.inner.0; + (inner.release)(&ctx, inner.user_data); } } fn available_slots(&self) -> Option { - None + unsafe { + let inner = &*self.inner.0; + inner.available_slots.and_then(|f| { + let mut available_slots = 0; + f(&mut available_slots, inner.user_data).then_some(available_slots) + }) + } } } @@ -360,7 +468,6 @@ impl CustomSlotSupplier { ByteArrayRef::empty() }, is_sticky: ctx.is_sticky(), - token_src: std::ptr::null_mut(), } } @@ -939,29 +1046,88 @@ pub extern "C" fn temporal_core_worker_replay_push( } } +/// Completes asynchronous slot reservation started by a call to [`CustomSlotSupplierCallbacks::reserve`]. +/// +/// `completion_ctx` must be the same as the one passed to the matching [`reserve`](CustomSlotSupplierCallbacks::reserve) +/// call. `permit_id` is arbitrary, but must be unique among live reservations as it's later used +/// for [`mark_used`](CustomSlotSupplierCallbacks::mark_used) and [`release`](CustomSlotSupplierCallbacks::release) +/// callbacks. +/// +/// This function returns true if the reservation was completed successfully, or false if the +/// reservation was cancelled before completion. If this function returns false, the implementation +/// should call [`temporal_core_complete_async_cancel_reserve`] with the same `completion_ctx`. +/// +/// **Caution:** if this function returns true, `completion_ctx` gets freed. Afterwards, calling +/// either [`temporal_core_complete_async_reserve`] or [`temporal_core_complete_async_cancel_reserve`] +/// with the same `completion_ctx` will cause **memory corruption!** #[unsafe(no_mangle)] pub extern "C" fn temporal_core_complete_async_reserve( - sender: *mut libc::c_void, + completion_ctx: *const SlotReserveCompletionCtx, permit_id: usize, -) { - if !sender.is_null() { - unsafe { - let sender = Box::from_raw(sender as *mut oneshot::Sender); - let permit = SlotSupplierPermit::with_user_data(permit_id); - let _ = sender.send(permit); +) -> bool { + if completion_ctx.is_null() { + panic!("completion_ctx is null"); + } + let permit_id = + NonZero::new(permit_id).expect("permit_id cannot be 0 on successful reservation"); + let prev_state = unsafe { + // Not turning completion_ctx into Arc yet as we only want to deallocate it on success + (*completion_ctx).state.compare_exchange( + SlotReserveOperationState::Pending, + SlotReserveOperationState::Completed(permit_id), + ) + }; + match prev_state { + Ok(_) => { + let completion_ctx = unsafe { Arc::from_raw(completion_ctx) }; + completion_ctx.notify.notify_one(); + true } - } else { - panic!("ReserveSlot sender must not be null!"); + Err(SlotReserveOperationState::Cancelled) => false, + Err(SlotReserveOperationState::Completed(prev_permit_id)) => { + panic!( + "temporal_core_complete_async_reserve called twice for the same reservation - first permit ID {prev_permit_id}, second permit ID {permit_id}" + ) + } + Err(SlotReserveOperationState::Pending) => unreachable!(), } } +/// Completes cancellation of asynchronous slot reservation. +/// +/// Cancellation can only be initiated by Core. It's done by calling [`CustomSlotSupplierCallbacks::cancel_reserve`] +/// after an earlier call to [`CustomSlotSupplierCallbacks::reserve`]. +/// +/// `completion_ctx` must be the same as the one passed to the matching [`cancel_reserve`](CustomSlotSupplierCallbacks::cancel_reserve) +/// call. +/// +/// This function returns true on successful cancellation, or false if cancellation was not +/// requested for the given `completion_ctx`. A false value indicates there's likely a logic bug in +/// the implementation where it doesn't correctly wait for [`cancel_reserve`](CustomSlotSupplierCallbacks::cancel_reserve) +/// callback to be called. +/// +/// **Caution:** if this function returns true, `completion_ctx` gets freed. Afterwards, calling +/// either [`temporal_core_complete_async_reserve`] or [`temporal_core_complete_async_cancel_reserve`] +/// with the same `completion_ctx` will cause **memory corruption!** #[unsafe(no_mangle)] -pub extern "C" fn temporal_core_set_reserve_cancel_target( - ctx: *mut SlotReserveCtx, - token_ptr: *mut libc::c_void, -) { - if let Some(ctx) = unsafe { ctx.as_mut() } { - ctx.token_src = token_ptr; +pub extern "C" fn temporal_core_complete_async_cancel_reserve( + completion_ctx: *const SlotReserveCompletionCtx, +) -> bool { + if completion_ctx.is_null() { + panic!("completion_ctx is null"); + } + let state = unsafe { (*completion_ctx).state.load() }; + match state { + SlotReserveOperationState::Cancelled => { + drop(unsafe { Arc::from_raw(completion_ctx) }); + true + } + SlotReserveOperationState::Pending => false, + SlotReserveOperationState::Completed(permit_id) => { + panic!( + "temporal_core_complete_async_cancel_reserve called on completed reservation - permit ID {permit_id}" + ) + } } }