Skip to content

Commit 8f822c1

Browse files
committed
Make return values work
1 parent e1d94b8 commit 8f822c1

File tree

3 files changed

+43
-16
lines changed

3 files changed

+43
-16
lines changed

.gitignore

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,6 @@ obj/
77
/tests/golangworker/golangworker
88
/.vs
99
/.vscode
10-
/.idea
10+
/.idea
11+
/.zed
12+
Temporalio.sln.DotSettings.user

src/Temporalio/Bridge/include/temporal-sdk-bridge.h

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -391,9 +391,12 @@ typedef struct SlotReserveCtx {
391391
bool is_sticky;
392392
} SlotReserveCtx;
393393

394-
typedef void (*CustomReserveSlotCallback)(struct SlotReserveCtx ctx);
394+
typedef void (*CustomReserveSlotCallback)(struct SlotReserveCtx ctx, void *sender);
395395

396-
typedef void (*CustomTryReserveSlotCallback)(struct SlotReserveCtx ctx);
396+
/**
397+
* Must return pointer to a C# object inheriting from SlotPermit
398+
*/
399+
typedef const void *(*CustomTryReserveSlotCallback)(struct SlotReserveCtx ctx);
397400

398401
typedef enum SlotInfo_Tag {
399402
WorkflowSlotInfo,
@@ -689,6 +692,8 @@ struct WorkerReplayPushResult worker_replay_push(struct Worker *worker,
689692
struct ByteArrayRef workflow_id,
690693
struct ByteArrayRef history);
691694

695+
void complete_async_reserve(void *sender, const void *permit);
696+
692697
#ifdef __cplusplus
693698
} // extern "C"
694699
#endif // __cplusplus

src/Temporalio/Bridge/src/worker.rs

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use temporal_sdk_core_protos::coresdk::ActivityHeartbeat;
2424
use temporal_sdk_core_protos::coresdk::ActivityTaskCompletion;
2525
use temporal_sdk_core_protos::temporal::api::history::v1::History;
2626
use tokio::sync::mpsc::{channel, Sender};
27+
use tokio::sync::oneshot;
2728
use tokio_stream::wrappers::ReceiverStream;
2829

2930
use std::collections::HashMap;
@@ -84,8 +85,20 @@ pub struct ResourceBasedSlotSupplier {
8485
tuner_options: ResourceBasedTunerOptions,
8586
}
8687

87-
type CustomReserveSlotCallback = unsafe extern "C" fn(ctx: SlotReserveCtx);
88-
type CustomTryReserveSlotCallback = unsafe extern "C" fn(ctx: SlotReserveCtx);
88+
#[repr(C)]
89+
pub struct CustomSlotSupplier<SK> {
90+
inner: CustomSlotSupplierCallbacksImpl,
91+
_pd: std::marker::PhantomData<SK>,
92+
}
93+
94+
unsafe impl<SK> Send for CustomSlotSupplier<SK> {}
95+
unsafe impl<SK> Sync for CustomSlotSupplier<SK> {}
96+
97+
type CustomReserveSlotCallback =
98+
unsafe extern "C" fn(ctx: SlotReserveCtx, sender: *mut libc::c_void);
99+
/// Must return pointer to a C# object inheriting from SlotPermit
100+
type CustomTryReserveSlotCallback =
101+
unsafe extern "C" fn(ctx: SlotReserveCtx) -> *const libc::c_void;
89102
type CustomMarkSlotUsedCallback = unsafe extern "C" fn(ctx: SlotMarkUsedCtx);
90103
type CustomReleaseSlotCallback = unsafe extern "C" fn(ctx: SlotReleaseCtx);
91104

@@ -113,15 +126,6 @@ impl CustomSlotSupplierCallbacksImpl {
113126
}
114127
}
115128

116-
#[repr(C)]
117-
pub struct CustomSlotSupplier<SK> {
118-
inner: CustomSlotSupplierCallbacksImpl,
119-
_pd: std::marker::PhantomData<SK>,
120-
}
121-
122-
unsafe impl<SK> Send for CustomSlotSupplier<SK> {}
123-
unsafe impl<SK> Sync for CustomSlotSupplier<SK> {}
124-
125129
#[repr(C)]
126130
pub enum SlotKindType {
127131
WorkflowSlotKindType,
@@ -173,11 +177,13 @@ impl<SK: SlotKind + Send + Sync> temporal_sdk_core_api::worker::SlotSupplier
173177
type SlotKind = SK;
174178

175179
async fn reserve_slot(&self, ctx: &dyn SlotReservationContext) -> SlotSupplierPermit {
180+
let (tx, rx) = oneshot::channel();
176181
let ctx = Self::convert_reserve_ctx(ctx);
182+
let tx = Box::into_raw(Box::new(tx)) as *mut libc::c_void;
177183
unsafe {
178-
((*self.inner.0).reserve)(ctx);
184+
((*self.inner.0).reserve)(ctx, tx);
179185
}
180-
unimplemented!()
186+
rx.await.expect("reserve channel is not closed")
181187
}
182188

183189
fn try_reserve_slot(&self, ctx: &dyn SlotReservationContext) -> Option<SlotSupplierPermit> {
@@ -727,6 +733,20 @@ pub extern "C" fn worker_replay_push(
727733
}
728734
}
729735

736+
#[no_mangle]
737+
pub extern "C" fn complete_async_reserve(sender: *mut libc::c_void, permit: *const libc::c_void) {
738+
if !sender.is_null() && !permit.is_null() {
739+
unsafe {
740+
let sender = Box::from_raw(sender as *mut Sender<SlotSupplierPermit>);
741+
let permit =
742+
SlotSupplierPermit::with_user_data(UserDataHandle(permit as *mut libc::c_void));
743+
let _ = sender.send(permit);
744+
}
745+
} else {
746+
panic!("ReserveSlot sender & permit must not be null!");
747+
}
748+
}
749+
730750
impl TryFrom<&WorkerOptions> for temporal_sdk_core::WorkerConfig {
731751
type Error = anyhow::Error;
732752

0 commit comments

Comments
 (0)