Skip to content

Commit 0a9de76

Browse files
committed
Callbacks mostly working
Silently hanging for some reason
1 parent e1d94b8 commit 0a9de76

File tree

7 files changed

+98
-37
lines changed

7 files changed

+98
-37
lines changed

.gitignore

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,6 @@ obj/
77
/tests/golangworker/golangworker
88
/.vs
99
/.vscode
10-
/.idea
10+
/.idea
11+
/.zed
12+
Temporalio.sln.DotSettings.user

src/Temporalio/Bridge/CustomSlotSupplier.cs

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
using System;
2+
using System.Runtime.InteropServices;
13
using System.Threading.Tasks;
24

35
namespace Temporalio.Bridge
@@ -28,15 +30,33 @@ internal unsafe CustomSlotSupplier(Temporalio.Worker.Tuning.ICustomSlotSupplier
2830
PinCallbackHolder(interopCallbacks);
2931
}
3032

31-
private void Reserve(Interop.SlotReserveCtx ctx)
33+
private unsafe void Reserve(Interop.SlotReserveCtx ctx, void* sender)
3234
{
33-
// TODO: Need to call callback with result that will put it in a channel to await in Rust
34-
var reserveTask = Task.Run(() => userSupplier.ReserveSlotAsync(new(ctx)));
35+
SafeReserve(ctx, new IntPtr(sender));
3536
}
3637

37-
private void TryReserve(Interop.SlotReserveCtx ctx)
38+
private void SafeReserve(Interop.SlotReserveCtx ctx, IntPtr sender)
3839
{
39-
userSupplier.TryReserveSlot(new(ctx));
40+
Console.WriteLine("Reserve called");
41+
var reserveTask = Task.Run(async () =>
42+
{
43+
var permit = await userSupplier.ReserveSlotAsync(new(ctx)).ConfigureAwait(false);
44+
Console.WriteLine("Reserve done user");
45+
unsafe
46+
{
47+
Console.WriteLine("Calling async reserve??");
48+
var handle = GCHandle.Alloc(permit, GCHandleType.Pinned);
49+
Interop.Methods.complete_async_reserve(sender.ToPointer(), handle.AddrOfPinnedObject().ToPointer());
50+
Console.WriteLine("Called async reserve??");
51+
}
52+
});
53+
}
54+
55+
private unsafe void* TryReserve(Interop.SlotReserveCtx ctx)
56+
{
57+
var returned = userSupplier.TryReserveSlot(new(ctx));
58+
var handle = GCHandle.Alloc(returned, GCHandleType.Pinned);
59+
return handle.AddrOfPinnedObject().ToPointer();
4060
}
4161

4262
private void MarkUsed(Interop.SlotMarkUsedCtx ctx)

src/Temporalio/Bridge/Interop/Interop.cs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -577,10 +577,11 @@ internal partial struct SlotReserveCtx
577577
}
578578

579579
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
580-
internal delegate void CustomReserveSlotCallback([NativeTypeName("struct SlotReserveCtx")] SlotReserveCtx ctx);
580+
internal unsafe delegate void CustomReserveSlotCallback([NativeTypeName("struct SlotReserveCtx")] SlotReserveCtx ctx, void* sender);
581581

582582
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
583-
internal delegate void CustomTryReserveSlotCallback([NativeTypeName("struct SlotReserveCtx")] SlotReserveCtx ctx);
583+
[return: NativeTypeName("const void *")]
584+
internal unsafe delegate void* CustomTryReserveSlotCallback([NativeTypeName("struct SlotReserveCtx")] SlotReserveCtx ctx);
584585

585586
internal enum SlotInfo_Tag
586587
{
@@ -614,7 +615,7 @@ internal unsafe partial struct SlotInfo
614615
{
615616
public SlotInfo_Tag tag;
616617

617-
[NativeTypeName("__AnonymousRecord_temporal-sdk-bridge_L419_C3")]
618+
[NativeTypeName("__AnonymousRecord_temporal-sdk-bridge_L422_C3")]
618619
public _Anonymous_e__Union Anonymous;
619620

620621
internal ref WorkflowSlotInfo_Body workflow_slot_info
@@ -720,7 +721,7 @@ internal unsafe partial struct SlotSupplier
720721
{
721722
public SlotSupplier_Tag tag;
722723

723-
[NativeTypeName("__AnonymousRecord_temporal-sdk-bridge_L465_C3")]
724+
[NativeTypeName("__AnonymousRecord_temporal-sdk-bridge_L468_C3")]
724725
public _Anonymous_e__Union Anonymous;
725726

726727
internal ref FixedSizeSlotSupplier fixed_size
@@ -760,15 +761,15 @@ internal ref CustomSlotSupplierCallbacksImpl custom
760761
internal unsafe partial struct _Anonymous_e__Union
761762
{
762763
[FieldOffset(0)]
763-
[NativeTypeName("__AnonymousRecord_temporal-sdk-bridge_L466_C5")]
764+
[NativeTypeName("__AnonymousRecord_temporal-sdk-bridge_L469_C5")]
764765
public _Anonymous1_e__Struct Anonymous1;
765766

766767
[FieldOffset(0)]
767-
[NativeTypeName("__AnonymousRecord_temporal-sdk-bridge_L469_C5")]
768+
[NativeTypeName("__AnonymousRecord_temporal-sdk-bridge_L472_C5")]
768769
public _Anonymous2_e__Struct Anonymous2;
769770

770771
[FieldOffset(0)]
771-
[NativeTypeName("__AnonymousRecord_temporal-sdk-bridge_L472_C5")]
772+
[NativeTypeName("__AnonymousRecord_temporal-sdk-bridge_L475_C5")]
772773
public _Anonymous3_e__Struct Anonymous3;
773774

774775
internal partial struct _Anonymous1_e__Struct
@@ -1057,5 +1058,8 @@ internal static unsafe partial class Methods
10571058
[DllImport("temporal_sdk_bridge", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)]
10581059
[return: NativeTypeName("struct WorkerReplayPushResult")]
10591060
public static extern WorkerReplayPushResult worker_replay_push([NativeTypeName("struct Worker *")] Worker* worker, [NativeTypeName("struct WorkerReplayPusher *")] WorkerReplayPusher* worker_replay_pusher, [NativeTypeName("struct ByteArrayRef")] ByteArrayRef workflow_id, [NativeTypeName("struct ByteArrayRef")] ByteArrayRef history);
1061+
1062+
[DllImport("temporal_sdk_bridge", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)]
1063+
public static extern void complete_async_reserve(void* sender, [NativeTypeName("const void *")] void* permit);
10601064
}
10611065
}

src/Temporalio/Bridge/include/temporal-sdk-bridge.h

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -391,9 +391,12 @@ typedef struct SlotReserveCtx {
391391
bool is_sticky;
392392
} SlotReserveCtx;
393393

394-
typedef void (*CustomReserveSlotCallback)(struct SlotReserveCtx ctx);
394+
typedef void (*CustomReserveSlotCallback)(struct SlotReserveCtx ctx, void *sender);
395395

396-
typedef void (*CustomTryReserveSlotCallback)(struct SlotReserveCtx ctx);
396+
/**
397+
* Must return pointer to a C# object inheriting from SlotPermit
398+
*/
399+
typedef const void *(*CustomTryReserveSlotCallback)(struct SlotReserveCtx ctx);
397400

398401
typedef enum SlotInfo_Tag {
399402
WorkflowSlotInfo,
@@ -689,6 +692,8 @@ struct WorkerReplayPushResult worker_replay_push(struct Worker *worker,
689692
struct ByteArrayRef workflow_id,
690693
struct ByteArrayRef history);
691694

695+
void complete_async_reserve(void *sender, const void *permit);
696+
692697
#ifdef __cplusplus
693698
} // extern "C"
694699
#endif // __cplusplus

src/Temporalio/Bridge/src/worker.rs

Lines changed: 37 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use temporal_sdk_core_protos::coresdk::ActivityHeartbeat;
2424
use temporal_sdk_core_protos::coresdk::ActivityTaskCompletion;
2525
use temporal_sdk_core_protos::temporal::api::history::v1::History;
2626
use tokio::sync::mpsc::{channel, Sender};
27+
use tokio::sync::oneshot;
2728
use tokio_stream::wrappers::ReceiverStream;
2829

2930
use std::collections::HashMap;
@@ -84,8 +85,20 @@ pub struct ResourceBasedSlotSupplier {
8485
tuner_options: ResourceBasedTunerOptions,
8586
}
8687

87-
type CustomReserveSlotCallback = unsafe extern "C" fn(ctx: SlotReserveCtx);
88-
type CustomTryReserveSlotCallback = unsafe extern "C" fn(ctx: SlotReserveCtx);
88+
#[repr(C)]
89+
pub struct CustomSlotSupplier<SK> {
90+
inner: CustomSlotSupplierCallbacksImpl,
91+
_pd: std::marker::PhantomData<SK>,
92+
}
93+
94+
unsafe impl<SK> Send for CustomSlotSupplier<SK> {}
95+
unsafe impl<SK> Sync for CustomSlotSupplier<SK> {}
96+
97+
type CustomReserveSlotCallback =
98+
unsafe extern "C" fn(ctx: SlotReserveCtx, sender: *mut libc::c_void);
99+
/// Must return pointer to a C# object inheriting from SlotPermit
100+
type CustomTryReserveSlotCallback =
101+
unsafe extern "C" fn(ctx: SlotReserveCtx) -> *const libc::c_void;
89102
type CustomMarkSlotUsedCallback = unsafe extern "C" fn(ctx: SlotMarkUsedCtx);
90103
type CustomReleaseSlotCallback = unsafe extern "C" fn(ctx: SlotReleaseCtx);
91104

@@ -113,15 +126,6 @@ impl CustomSlotSupplierCallbacksImpl {
113126
}
114127
}
115128

116-
#[repr(C)]
117-
pub struct CustomSlotSupplier<SK> {
118-
inner: CustomSlotSupplierCallbacksImpl,
119-
_pd: std::marker::PhantomData<SK>,
120-
}
121-
122-
unsafe impl<SK> Send for CustomSlotSupplier<SK> {}
123-
unsafe impl<SK> Sync for CustomSlotSupplier<SK> {}
124-
125129
#[repr(C)]
126130
pub enum SlotKindType {
127131
WorkflowSlotKindType,
@@ -173,11 +177,13 @@ impl<SK: SlotKind + Send + Sync> temporal_sdk_core_api::worker::SlotSupplier
173177
type SlotKind = SK;
174178

175179
async fn reserve_slot(&self, ctx: &dyn SlotReservationContext) -> SlotSupplierPermit {
180+
let (tx, rx) = oneshot::channel();
176181
let ctx = Self::convert_reserve_ctx(ctx);
182+
let tx = Box::into_raw(Box::new(tx)) as *mut libc::c_void;
177183
unsafe {
178-
((*self.inner.0).reserve)(ctx);
184+
((*self.inner.0).reserve)(ctx, tx);
179185
}
180-
unimplemented!()
186+
rx.await.expect("reserve channel is not closed")
181187
}
182188

183189
fn try_reserve_slot(&self, ctx: &dyn SlotReservationContext) -> Option<SlotSupplierPermit> {
@@ -727,6 +733,24 @@ pub extern "C" fn worker_replay_push(
727733
}
728734
}
729735

736+
#[no_mangle]
737+
pub extern "C" fn complete_async_reserve(sender: *mut libc::c_void, permit: *const libc::c_void) {
738+
dbg!("Trying to complete async reserve");
739+
if !sender.is_null() {
740+
dbg!("Completing async reserve");
741+
unsafe {
742+
let sender = Box::from_raw(sender as *mut Sender<SlotSupplierPermit>);
743+
let permit =
744+
SlotSupplierPermit::with_user_data(UserDataHandle(permit as *mut libc::c_void));
745+
dbg!("sending");
746+
let _ = sender.send(permit);
747+
dbg!("Sent");
748+
}
749+
} else {
750+
panic!("ReserveSlot sender must not be null!");
751+
}
752+
}
753+
730754
impl TryFrom<&WorkerOptions> for temporal_sdk_core::WorkerConfig {
731755
type Error = anyhow::Error;
732756

src/Temporalio/Worker/Tuning/SlotPermit.cs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,6 @@ namespace Temporalio.Worker.Tuning
1313
/// </remarks>
1414
public class SlotPermit
1515
{
16-
private SlotPermit()
17-
{
18-
}
19-
2016
/// <summary>
2117
/// Reconstruct a permit from a pointer.
2218
/// </summary>

tests/Temporalio.Tests/Worker/WorkerTuningTests.cs

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -112,26 +112,36 @@ public async Task Cannot_Mix_MaxConcurrent_And_Tuner()
112112
Assert.Contains("Cannot set both Tuner and any of", argumentException.Message);
113113
}
114114

115+
private class MyPermit : SlotPermit
116+
{
117+
private readonly int dat;
118+
119+
public MyPermit(int v)
120+
{
121+
this.dat = v;
122+
}
123+
}
124+
115125
private class MySlotSupplier : ICustomSlotSupplier
116126
{
117-
public Task<SlotPermit> ReserveSlotAsync(SlotReserveContext ctx)
127+
public async Task<SlotPermit> ReserveSlotAsync(SlotReserveContext ctx)
118128
{
119-
throw new NotImplementedException();
129+
// Do something async to make sure that works
130+
await Task.Delay(10);
131+
return new MyPermit(1);
120132
}
121133

122134
public SlotPermit? TryReserveSlot(SlotReserveContext ctx)
123135
{
124-
throw new NotImplementedException();
136+
return new MyPermit(1);
125137
}
126138

127139
public void MarkSlotUsed(SlotMarkUsedContext ctx)
128140
{
129-
throw new NotImplementedException();
130141
}
131142

132143
public void ReleaseSlot(SlotReleaseContext ctx)
133144
{
134-
throw new NotImplementedException();
135145
}
136146
}
137147

0 commit comments

Comments
 (0)