Skip to content

Commit 04eeddd

Browse files
committed
PR suggestions
1 parent 99c6815 commit 04eeddd

File tree

1 file changed

+49
-47
lines changed

1 file changed

+49
-47
lines changed

src/Temporalio/Bridge/CustomSlotSupplier.cs

Lines changed: 49 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using System.Threading;
55
using System.Threading.Tasks;
66
using Microsoft.Extensions.Logging;
7+
using Temporalio.Worker.Tuning;
78

89
namespace Temporalio.Bridge
910
{
@@ -15,7 +16,7 @@ internal class CustomSlotSupplier : NativeInvokeableClass<Interop.TemporalCoreCu
1516
private readonly ILogger logger;
1617
private readonly Temporalio.Worker.Tuning.CustomSlotSupplier userSupplier;
1718
private readonly ConcurrentDictionary<IntPtr, CancellationTokenSource> reservationCancelSources = new();
18-
private readonly ConcurrentDictionary<IntPtr, Temporalio.Worker.Tuning.SlotPermit> permits = new();
19+
private readonly ConcurrentDictionary<IntPtr, SlotPermit> permits = new();
1920

2021
/// <summary>
2122
/// Initializes a new instance of the <see cref="CustomSlotSupplier" /> class.
@@ -44,32 +45,32 @@ internal unsafe CustomSlotSupplier(
4445
PinCallbackHolder(interopCallbacks);
4546
}
4647

47-
private static Temporalio.Worker.Tuning.SlotInfo SlotInfoFromBridge(Interop.TemporalCoreSlotInfo slotInfo)
48+
private static SlotInfo SlotInfoFromBridge(Interop.TemporalCoreSlotInfo slotInfo)
4849
{
4950
return slotInfo.tag switch
5051
{
5152
Interop.TemporalCoreSlotInfo_Tag.WorkflowSlotInfo =>
52-
new Temporalio.Worker.Tuning.SlotInfo.WorkflowSlotInfo(
53+
new SlotInfo.WorkflowSlotInfo(
5354
ByteArrayRef.ToUtf8(slotInfo.workflow_slot_info.workflow_type), slotInfo.workflow_slot_info.is_sticky != 0),
5455
Interop.TemporalCoreSlotInfo_Tag.ActivitySlotInfo =>
55-
new Temporalio.Worker.Tuning.SlotInfo.ActivitySlotInfo(
56+
new SlotInfo.ActivitySlotInfo(
5657
ByteArrayRef.ToUtf8(slotInfo.activity_slot_info.activity_type)),
5758
Interop.TemporalCoreSlotInfo_Tag.LocalActivitySlotInfo =>
58-
new Temporalio.Worker.Tuning.SlotInfo.LocalActivitySlotInfo(
59+
new SlotInfo.LocalActivitySlotInfo(
5960
ByteArrayRef.ToUtf8(slotInfo.local_activity_slot_info.activity_type)),
60-
_ => throw new System.ArgumentOutOfRangeException(nameof(slotInfo)),
61+
_ => throw new ArgumentOutOfRangeException(nameof(slotInfo)),
6162
};
6263
}
6364

64-
private static unsafe Temporalio.Worker.Tuning.SlotReserveContext ReserveCtxFromBridge(Interop.TemporalCoreSlotReserveCtx* ctx)
65+
private static unsafe SlotReserveContext ReserveCtxFromBridge(Interop.TemporalCoreSlotReserveCtx* ctx)
6566
{
6667
return new(
6768
SlotType: (*ctx).slot_type switch
6869
{
69-
Interop.TemporalCoreSlotKindType.WorkflowSlotKindType => Temporalio.Worker.Tuning.SlotType.Workflow,
70-
Interop.TemporalCoreSlotKindType.ActivitySlotKindType => Temporalio.Worker.Tuning.SlotType.Activity,
71-
Interop.TemporalCoreSlotKindType.LocalActivitySlotKindType => Temporalio.Worker.Tuning.SlotType.LocalActivity,
72-
_ => throw new System.ArgumentOutOfRangeException(nameof(ctx)),
70+
Interop.TemporalCoreSlotKindType.WorkflowSlotKindType => SlotType.Workflow,
71+
Interop.TemporalCoreSlotKindType.ActivitySlotKindType => SlotType.Activity,
72+
Interop.TemporalCoreSlotKindType.LocalActivitySlotKindType => SlotType.LocalActivity,
73+
_ => throw new ArgumentOutOfRangeException(nameof(ctx)),
7374
},
7475
TaskQueue: ByteArrayRef.ToUtf8((*ctx).task_queue),
7576
WorkerIdentity: ByteArrayRef.ToUtf8((*ctx).worker_identity),
@@ -90,7 +91,7 @@ private unsafe void CancelReserve(Interop.TemporalCoreSlotReserveCompletionCtx*
9091
}
9192
}
9293

93-
private void SafeReserve(Temporalio.Worker.Tuning.SlotReserveContext ctx, IntPtr completionCtx)
94+
private void SafeReserve(SlotReserveContext ctx, IntPtr completionCtx)
9495
{
9596
var cancelTokenSrc = new CancellationTokenSource();
9697
if (!reservationCancelSources.TryAdd(completionCtx, cancelTokenSrc))
@@ -104,61 +105,62 @@ private void SafeReserve(Temporalio.Worker.Tuning.SlotReserveContext ctx, IntPtr
104105
{
105106
try
106107
{
108+
SlotPermit permit;
107109
while (true)
108110
{
109111
try
110112
{
111113
cancelTokenSrc.Token.ThrowIfCancellationRequested();
112-
var permit = await userSupplier.ReserveSlotAsync(ctx, cancelTokenSrc.Token)
114+
permit = await userSupplier.ReserveSlotAsync(ctx, cancelTokenSrc.Token)
113115
.ConfigureAwait(false);
114-
var permitId = StorePermit(permit);
115-
116-
byte result;
117-
unsafe
118-
{
119-
result = Interop.Methods.temporal_core_complete_async_reserve(
120-
(Interop.TemporalCoreSlotReserveCompletionCtx*)completionCtx.ToPointer(),
121-
new(permitId.ToPointer()));
122-
}
123-
124-
if (result == 0)
125-
{
126-
// We need to undo the reservation
127-
CompleteCancelReserve(completionCtx);
128-
Release(null, permitId);
129-
}
130-
131116
break;
132117
}
133118
catch (OperationCanceledException) when (cancelTokenSrc.Token.IsCancellationRequested)
134119
{
135120
CompleteCancelReserve(completionCtx);
136-
break;
121+
return;
137122
}
138123
#pragma warning disable CA1031 // We are ok catching all exceptions here
139124
catch (Exception e)
140125
{
141126
#pragma warning restore CA1031
142127
logger.LogError(e, "Error reserving slot");
128+
// Wait for a bit to avoid spamming errors
129+
try
130+
{
131+
await Task.Delay(1000, cancelTokenSrc.Token).ConfigureAwait(false);
132+
}
133+
catch (OperationCanceledException)
134+
{
135+
CompleteCancelReserve(completionCtx);
136+
return;
137+
}
143138
}
139+
}
144140

145-
// Wait for a bit to avoid spamming errors
146-
try
147-
{
148-
await Task.Delay(1000, cancelTokenSrc.Token).ConfigureAwait(false);
149-
}
150-
catch (OperationCanceledException)
151-
{
152-
CompleteCancelReserve(completionCtx);
153-
break;
154-
}
141+
// There should be no exceptions possible from here onward
142+
var permitId = StorePermit(permit);
143+
144+
byte result;
145+
unsafe
146+
{
147+
result = Interop.Methods.temporal_core_complete_async_reserve(
148+
(Interop.TemporalCoreSlotReserveCompletionCtx*)completionCtx.ToPointer(),
149+
new(permitId.ToPointer()));
150+
}
151+
152+
if (result == 0)
153+
{
154+
CompleteCancelReserve(completionCtx);
155+
// We need to undo the reservation
156+
Release(null, permitId);
155157
}
156158
}
157159
#pragma warning disable CA1031 // The task is detached, logging here is the only way to observe exceptions.
158160
catch (Exception e)
159161
{
160162
#pragma warning restore CA1031
161-
logger.LogError(e, "Exception escaped reserve slot retry loop");
163+
logger.LogError(e, "Exception happened outside of retry loop when reserving slot");
162164
}
163165
finally
164166
{
@@ -216,7 +218,7 @@ private unsafe void Release(Interop.TemporalCoreSlotReleaseCtx* ctx, void* userD
216218
Release(slotInfo, permitId);
217219
}
218220

219-
private void Release(Temporalio.Worker.Tuning.SlotInfo? slotInfo, IntPtr permitId)
221+
private void Release(SlotInfo? slotInfo, IntPtr permitId)
220222
{
221223
try
222224
{
@@ -226,7 +228,7 @@ private void Release(Temporalio.Worker.Tuning.SlotInfo? slotInfo, IntPtr permitI
226228
return;
227229
}
228230

229-
GCHandle.FromIntPtr(permitId).Free();
231+
Marshal.FreeHGlobal(permitId);
230232
userSupplier.ReleaseSlot(new(slotInfo, permit));
231233
}
232234
#pragma warning disable CA1031 // We are ok catching all exceptions here
@@ -239,9 +241,9 @@ private void Release(Temporalio.Worker.Tuning.SlotInfo? slotInfo, IntPtr permitI
239241

240242
private IntPtr StorePermit(Temporalio.Worker.Tuning.SlotPermit permit)
241243
{
242-
// We use an address of a newly created pinned object as a unique permit ID.
243-
// We cannot pin the permit itself because the type of permit may be unpinnable.
244-
var permitId = GCHandle.ToIntPtr(GCHandle.Alloc(new byte[1], GCHandleType.Pinned));
244+
// We use an address of a fresh allocation as a unique permit ID.
245+
// We cannot use the address of the permit itself because the type of permit may be unpinnable.
246+
var permitId = Marshal.AllocHGlobal(1);
245247
permits[permitId] = permit;
246248
return permitId;
247249
}

0 commit comments

Comments
 (0)