11using System ;
22using System . Collections . Generic ;
3+ using System . Runtime . CompilerServices ;
34using System . Runtime . InteropServices ;
45using System . Threading ;
56using System . Threading . Tasks ;
67using Microsoft . Extensions . Logging ;
7- using Temporalio . Bridge . Interop ;
88
99namespace Temporalio . Bridge
1010{
@@ -37,36 +37,19 @@ internal unsafe CustomSlotSupplier(
3737 try_reserve = FunctionPointer < Interop . CustomTryReserveSlotCallback > ( TryReserve ) ,
3838 mark_used = FunctionPointer < Interop . CustomMarkSlotUsedCallback > ( MarkUsed ) ,
3939 release = FunctionPointer < Interop . CustomReleaseSlotCallback > ( Release ) ,
40+ free = FunctionPointer < Interop . CustomSlotImplFreeCallback > ( Free ) ,
4041 } ;
4142
4243 PinCallbackHolder ( interopCallbacks ) ;
4344 }
4445
45- private static void SetCancelTokenOnCtx ( ref SlotReserveCtx ctx , CancellationTokenSource cancelTokenSrc )
46+ private unsafe void Reserve ( Interop . SlotReserveCtx * ctx , void * sender )
4647 {
47- unsafe
48- {
49- try
50- {
51- var handle = GCHandle . Alloc ( cancelTokenSrc ) ;
52- fixed ( Interop . SlotReserveCtx * p = & ctx )
53- {
54- Interop . Methods . set_reserve_cancel_target ( p , GCHandle . ToIntPtr ( handle ) . ToPointer ( ) ) ;
55- }
56- }
57- catch ( Exception e )
58- {
59- Console . WriteLine ( $ "Error setting cancel token on ctx: { e } ") ;
60- throw ;
61- }
62- }
63- }
64-
65- private unsafe void Reserve ( Interop . SlotReserveCtx ctx , void * sender )
66- {
67- SafeReserve ( ctx , new IntPtr ( sender ) ) ;
48+ SafeReserve ( new IntPtr ( ctx ) , new IntPtr ( sender ) ) ;
6849 }
6950
51+ // Note that this is always called by Rust, either because the call is cancelled or because
52+ // it completed. Therefore the GCHandle is always freed.
7053 private unsafe void CancelReserve ( void * tokenSrc )
7154 {
7255 var handle = GCHandle . FromIntPtr ( new IntPtr ( tokenSrc ) ) ;
@@ -75,44 +58,56 @@ private unsafe void CancelReserve(void* tokenSrc)
7558 handle . Free ( ) ;
7659 }
7760
78- private void SafeReserve ( Interop . SlotReserveCtx ctx , IntPtr sender )
61+ private void SafeReserve ( IntPtr ctx , IntPtr sender )
7962 {
80- var reserveTask = Task . Run ( async ( ) =>
63+ _ = Task . Run ( async ( ) =>
8164 {
82- var cancelTokenSrc = new System . Threading . CancellationTokenSource ( ) ;
83- SetCancelTokenOnCtx ( ref ctx , cancelTokenSrc ) ;
84- while ( true )
65+ using ( var cancelTokenSrc = new System . Threading . CancellationTokenSource ( ) )
8566 {
86- try
67+ unsafe
8768 {
88- var permit = await userSupplier . ReserveSlotAsync (
89- new ( ctx ) , cancelTokenSrc . Token ) . ConfigureAwait ( false ) ;
90- var usedPermitId = AddPermitToMap ( permit ) ;
91- unsafe
92- {
93- Interop . Methods . complete_async_reserve ( sender . ToPointer ( ) , new ( usedPermitId ) ) ;
94- }
95- cancelTokenSrc . Dispose ( ) ;
96- return ;
69+ var srcHandle = GCHandle . Alloc ( cancelTokenSrc ) ;
70+ Interop . Methods . set_reserve_cancel_target (
71+ ( Interop . SlotReserveCtx * ) ctx . ToPointer ( ) ,
72+ GCHandle . ToIntPtr ( srcHandle ) . ToPointer ( ) ) ;
9773 }
98- catch ( OperationCanceledException )
74+ while ( true )
9975 {
100- cancelTokenSrc . Dispose ( ) ;
101- return ;
102- }
76+ try
77+ {
78+ ConfiguredTaskAwaitable < Temporalio . Worker . Tuning . ISlotPermit > reserveTask ;
79+ unsafe
80+ {
81+ reserveTask = userSupplier . ReserveSlotAsync (
82+ new ( ( Interop . SlotReserveCtx * ) ctx . ToPointer ( ) ) ,
83+ cancelTokenSrc . Token ) . ConfigureAwait ( false ) ;
84+ }
85+ var permit = await reserveTask ;
86+ unsafe
87+ {
88+ var usedPermitId = AddPermitToMap ( permit ) ;
89+ Interop . Methods . complete_async_reserve ( sender . ToPointer ( ) , new ( usedPermitId ) ) ;
90+ }
91+ return ;
92+ }
93+ catch ( OperationCanceledException )
94+ {
95+ return ;
96+ }
10397#pragma warning disable CA1031 // We are ok catching all exceptions here
104- catch ( Exception e )
105- {
98+ catch ( Exception e )
99+ {
106100#pragma warning restore CA1031
107- logger . LogError ( e , "Error reserving slot" ) ;
101+ logger . LogError ( e , "Error reserving slot" ) ;
102+ }
103+ // Wait for a bit to avoid spamming errors
104+ await Task . Delay ( 1000 , cancelTokenSrc . Token ) . ConfigureAwait ( false ) ;
108105 }
109- // Wait for a bit to avoid spamming errors
110- await Task . Delay ( 1000 , cancelTokenSrc . Token ) . ConfigureAwait ( false ) ;
111106 }
112107 } ) ;
113108 }
114109
115- private unsafe UIntPtr TryReserve ( Interop . SlotReserveCtx ctx )
110+ private unsafe UIntPtr TryReserve ( Interop . SlotReserveCtx * ctx )
116111 {
117112 Temporalio . Worker . Tuning . ISlotPermit ? maybePermit ;
118113 try
@@ -135,11 +130,16 @@ private unsafe UIntPtr TryReserve(Interop.SlotReserveCtx ctx)
135130 return new ( usedPermitId ) ;
136131 }
137132
138- private void MarkUsed ( Interop . SlotMarkUsedCtx ctx )
133+ private unsafe void MarkUsed ( Interop . SlotMarkUsedCtx * ctx )
139134 {
140135 try
141136 {
142- userSupplier . MarkSlotUsed ( new ( ctx , permits [ ctx . slot_permit . ToUInt32 ( ) ] ) ) ;
137+ Temporalio . Worker . Tuning . ISlotPermit permit ;
138+ lock ( permits )
139+ {
140+ permit = permits [ ( * ctx ) . slot_permit . ToUInt32 ( ) ] ;
141+ }
142+ userSupplier . MarkSlotUsed ( new ( ctx , permit ) ) ;
143143 }
144144#pragma warning disable CA1031 // We are ok catching all exceptions here
145145 catch ( Exception e )
@@ -149,20 +149,31 @@ private void MarkUsed(Interop.SlotMarkUsedCtx ctx)
149149 }
150150 }
151151
152- private void Release ( Interop . SlotReleaseCtx ctx )
152+ private unsafe void Release ( Interop . SlotReleaseCtx * ctx )
153153 {
154- var permitId = ctx . slot_permit . ToUInt32 ( ) ;
154+ var permitId = ( * ctx ) . slot_permit . ToUInt32 ( ) ;
155+ Temporalio . Worker . Tuning . ISlotPermit permit ;
156+ lock ( permits )
157+ {
158+ permit = permits [ permitId ] ;
159+ }
155160 try
156161 {
157- userSupplier . ReleaseSlot ( new ( ctx , permits [ permitId ] ) ) ;
162+ userSupplier . ReleaseSlot ( new ( ctx , permit ) ) ;
158163 }
159164#pragma warning disable CA1031 // We are ok catching all exceptions here
160165 catch ( Exception e )
161166 {
162167#pragma warning restore CA1031
163168 logger . LogError ( e , "Error releasing slot" ) ;
164169 }
165- permits . Remove ( permitId ) ;
170+ finally
171+ {
172+ lock ( permits )
173+ {
174+ permits . Remove ( permitId ) ;
175+ }
176+ }
166177 }
167178
168179 private uint AddPermitToMap ( Temporalio . Worker . Tuning . ISlotPermit permit )
0 commit comments