Skip to content

Commit 46a8ecb

Browse files
committed
Expanded tests
Everything working except cancelling reserves
1 parent 89b5781 commit 46a8ecb

File tree

12 files changed

+257
-66
lines changed

12 files changed

+257
-66
lines changed

src/Temporalio/Bridge/CustomSlotSupplier.cs

Lines changed: 58 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using System.Collections.Generic;
33
using System.Runtime.InteropServices;
44
using System.Threading.Tasks;
5+
using Microsoft.Extensions.Logging;
56

67
namespace Temporalio.Bridge
78
{
@@ -10,6 +11,7 @@ namespace Temporalio.Bridge
1011
/// </summary>
1112
internal class CustomSlotSupplier : NativeInvokeableClass<Interop.CustomSlotSupplierCallbacks>
1213
{
14+
private readonly ILogger logger;
1315
private readonly Temporalio.Worker.Tuning.ICustomSlotSupplier userSupplier;
1416
private readonly Dictionary<uint, GCHandle> permits = new();
1517
private uint permitId = 1;
@@ -18,8 +20,12 @@ internal class CustomSlotSupplier : NativeInvokeableClass<Interop.CustomSlotSupp
1820
/// Initializes a new instance of the <see cref="CustomSlotSupplier" /> class.
1921
/// </summary>
2022
/// <param name="userSupplier">User's slot supplier implementation'.</param>
21-
internal unsafe CustomSlotSupplier(Temporalio.Worker.Tuning.ICustomSlotSupplier userSupplier)
23+
/// <param name="loggerFactory">Logger factory.</param>
24+
internal unsafe CustomSlotSupplier(
25+
Temporalio.Worker.Tuning.ICustomSlotSupplier userSupplier,
26+
ILoggerFactory loggerFactory)
2227
{
28+
this.logger = loggerFactory.CreateLogger<CustomSlotSupplier>();
2329
this.userSupplier = userSupplier;
2430

2531
var interopCallbacks = new Interop.CustomSlotSupplierCallbacks
@@ -42,26 +48,45 @@ private void SafeReserve(Interop.SlotReserveCtx ctx, IntPtr sender)
4248
{
4349
var reserveTask = Task.Run(async () =>
4450
{
45-
try
51+
while (true)
4652
{
47-
var permit = await userSupplier.ReserveSlotAsync(new(ctx)).ConfigureAwait(false);
48-
var usedPermitId = AddPermitToMap(permit);
49-
unsafe
53+
try
5054
{
51-
Interop.Methods.complete_async_reserve(sender.ToPointer(), new(usedPermitId));
55+
var permit = await userSupplier.ReserveSlotAsync(new(ctx)).ConfigureAwait(false);
56+
var usedPermitId = AddPermitToMap(permit);
57+
unsafe
58+
{
59+
Interop.Methods.complete_async_reserve(sender.ToPointer(), new(usedPermitId));
60+
}
61+
return;
5262
}
53-
}
54-
catch (Exception e)
55-
{
56-
Console.WriteLine("Exception in reserve: " + e.Message);
57-
throw;
63+
#pragma warning disable CA1031 // We are ok catching all exceptions here
64+
catch (Exception e)
65+
{
66+
#pragma warning restore CA1031
67+
logger.LogError(e, "Error reserving slot");
68+
}
69+
// Wait for a bit to avoid spamming errors
70+
await Task.Delay(1000).ConfigureAwait(false);
5871
}
5972
});
6073
}
6174

6275
private unsafe UIntPtr TryReserve(Interop.SlotReserveCtx ctx)
6376
{
64-
var maybePermit = userSupplier.TryReserveSlot(new(ctx));
77+
Temporalio.Worker.Tuning.ISlotPermit? maybePermit;
78+
try
79+
{
80+
maybePermit = userSupplier.TryReserveSlot(new(ctx));
81+
}
82+
#pragma warning disable CA1031 // We are ok catching all exceptions here
83+
catch (Exception e)
84+
{
85+
#pragma warning restore CA1031
86+
logger.LogError(e, "Error trying to reserve slot");
87+
return UIntPtr.Zero;
88+
}
89+
6590
if (maybePermit == null)
6691
{
6792
return UIntPtr.Zero;
@@ -72,17 +97,35 @@ private unsafe UIntPtr TryReserve(Interop.SlotReserveCtx ctx)
7297

7398
private void MarkUsed(Interop.SlotMarkUsedCtx ctx)
7499
{
75-
userSupplier.MarkSlotUsed(new(ctx, permits[ctx.slot_permit.ToUInt32()]));
100+
try
101+
{
102+
userSupplier.MarkSlotUsed(new(ctx, permits[ctx.slot_permit.ToUInt32()]));
103+
}
104+
#pragma warning disable CA1031 // We are ok catching all exceptions here
105+
catch (Exception e)
106+
{
107+
#pragma warning restore CA1031
108+
logger.LogError(e, "Error marking slot used");
109+
}
76110
}
77111

78112
private void Release(Interop.SlotReleaseCtx ctx)
79113
{
80114
var permitId = ctx.slot_permit.ToUInt32();
81-
userSupplier.ReleaseSlot(new(ctx, permits[permitId]));
115+
try
116+
{
117+
userSupplier.ReleaseSlot(new(ctx, permits[permitId]));
118+
}
119+
#pragma warning disable CA1031 // We are ok catching all exceptions here
120+
catch (Exception e)
121+
{
122+
#pragma warning restore CA1031
123+
logger.LogError(e, "Error releasing slot");
124+
}
82125
permits.Remove(permitId);
83126
}
84127

85-
private uint AddPermitToMap(Temporalio.Worker.Tuning.SlotPermit permit)
128+
private uint AddPermitToMap(Temporalio.Worker.Tuning.ISlotPermit permit)
86129
{
87130
var handle = GCHandle.Alloc(permit);
88131
lock (permits)

src/Temporalio/Bridge/OptionsExtensions.cs

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using System.Collections.Generic;
33
using System.Linq;
44
using System.Reflection;
5+
using Microsoft.Extensions.Logging;
56
using Temporalio.Bridge.Interop;
67
using Temporalio.Exceptions;
78

@@ -408,11 +409,13 @@ public static Interop.TestServerOptions ToInteropOptions(
408409
/// <param name="options">Options to convert.</param>
409410
/// <param name="scope">Scope to use.</param>
410411
/// <param name="namespace_">Namespace for the worker.</param>
412+
/// <param name="loggerFactory">Logger factory.</param>
411413
/// <returns>Converted options.</returns>
412414
public static Interop.WorkerOptions ToInteropOptions(
413415
this Temporalio.Worker.TemporalWorkerOptions options,
414416
Scope scope,
415-
string namespace_)
417+
string namespace_,
418+
ILoggerFactory loggerFactory)
416419
{
417420
if (options.TaskQueue == null)
418421
{
@@ -458,7 +461,7 @@ public static Interop.WorkerOptions ToInteropOptions(
458461
build_id = scope.ByteArray(buildId),
459462
identity_override = scope.ByteArray(options.Identity),
460463
max_cached_workflows = (uint)options.MaxCachedWorkflows,
461-
tuner = tuner.ToInteropTuner(scope),
464+
tuner = tuner.ToInteropTuner(scope, loggerFactory),
462465
no_remote_activities = (byte)(noRemoteActivities ? 1 : 0),
463466
sticky_queue_schedule_to_start_timeout_millis =
464467
(ulong)options.StickyQueueScheduleToStartTimeout.TotalMilliseconds,
@@ -504,7 +507,7 @@ public static Interop.WorkerOptions ToInteropOptions(
504507
build_id = scope.ByteArray(buildId),
505508
identity_override = scope.ByteArray(options.Identity),
506509
max_cached_workflows = 2,
507-
tuner = Temporalio.Worker.Tuning.WorkerTuner.CreateFixedSize(2, 1, 1).ToInteropTuner(scope),
510+
tuner = Temporalio.Worker.Tuning.WorkerTuner.CreateFixedSize(2, 1, 1).ToInteropTuner(scope, options.LoggerFactory),
508511
no_remote_activities = 1,
509512
sticky_queue_schedule_to_start_timeout_millis = 1000,
510513
max_heartbeat_throttle_interval_millis = 1000,
@@ -524,7 +527,8 @@ public static Interop.WorkerOptions ToInteropOptions(
524527

525528
private static Interop.TunerHolder ToInteropTuner(
526529
this Temporalio.Worker.Tuning.WorkerTuner tuner,
527-
Scope scope)
530+
Scope scope,
531+
ILoggerFactory loggerFactory)
528532
{
529533
Temporalio.Worker.Tuning.ResourceBasedTunerOptions? lastTunerOptions = null;
530534
Temporalio.Worker.Tuning.ISlotSupplier[] suppliers =
@@ -549,17 +553,18 @@ private static Interop.TunerHolder ToInteropTuner(
549553
return new()
550554
{
551555
workflow_slot_supplier =
552-
tuner.WorkflowTaskSlotSupplier.ToInteropSlotSupplier(true),
556+
tuner.WorkflowTaskSlotSupplier.ToInteropSlotSupplier(true, loggerFactory),
553557
activity_slot_supplier =
554-
tuner.ActivityTaskSlotSupplier.ToInteropSlotSupplier(false),
558+
tuner.ActivityTaskSlotSupplier.ToInteropSlotSupplier(false, loggerFactory),
555559
local_activity_slot_supplier =
556-
tuner.LocalActivitySlotSupplier.ToInteropSlotSupplier(false),
560+
tuner.LocalActivitySlotSupplier.ToInteropSlotSupplier(false, loggerFactory),
557561
};
558562
}
559563

560564
private static Interop.SlotSupplier ToInteropSlotSupplier(
561565
this Temporalio.Worker.Tuning.ISlotSupplier supplier,
562-
bool isWorkflow)
566+
bool isWorkflow,
567+
ILoggerFactory loggerFactory)
563568
{
564569
if (supplier is Temporalio.Worker.Tuning.FixedSizeSlotSupplier fixedSize)
565570
{
@@ -604,7 +609,7 @@ private static Interop.SlotSupplier ToInteropSlotSupplier(
604609
}
605610
else if (supplier is Temporalio.Worker.Tuning.ICustomSlotSupplier custom)
606611
{
607-
var wrapped = new CustomSlotSupplier(custom);
612+
var wrapped = new CustomSlotSupplier(custom, loggerFactory);
608613
unsafe
609614
{
610615
return new()

src/Temporalio/Bridge/Worker.cs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using System.Runtime.InteropServices;
33
using System.Threading.Tasks;
44
using Google.Protobuf;
5+
using Microsoft.Extensions.Logging;
56

67
namespace Temporalio.Bridge
78
{
@@ -16,11 +17,16 @@ internal class Worker : SafeHandle
1617
/// <param name="client">Client for the worker.</param>
1718
/// <param name="namespace_">Namespace for the worker.</param>
1819
/// <param name="options">Options for the worker.</param>
20+
/// <param name="loggerFactory">Logger factory, used instead of the one in options by
21+
/// anything in the bridge that needs it, since it's guaranteed to be set.</param>
1922
/// <exception cref="Exception">
2023
/// If any of the options are invalid including improperly defined workflows/activities.
2124
/// </exception>
2225
public Worker(
23-
Client client, string namespace_, Temporalio.Worker.TemporalWorkerOptions options)
26+
Client client,
27+
string namespace_,
28+
Temporalio.Worker.TemporalWorkerOptions options,
29+
ILoggerFactory loggerFactory)
2430
: base(IntPtr.Zero, true)
2531
{
2632
Runtime = client.Runtime;
@@ -29,7 +35,8 @@ public Worker(
2935
unsafe
3036
{
3137
var workerOrFail = Interop.Methods.worker_new(
32-
client.Ptr, scope.Pointer(options.ToInteropOptions(scope, namespace_)));
38+
client.Ptr,
39+
scope.Pointer(options.ToInteropOptions(scope, namespace_, loggerFactory)));
3340
if (workerOrFail.fail != null)
3441
{
3542
string failStr;
@@ -375,4 +382,4 @@ protected override unsafe bool ReleaseHandle()
375382
return true;
376383
}
377384
}
378-
}
385+
}

src/Temporalio/Worker/TemporalWorker.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ public class TemporalWorker : IDisposable
3434
public TemporalWorker(IWorkerClient client, TemporalWorkerOptions options)
3535
{
3636
this.client = client;
37+
var loggerFactory = options.LoggerFactory ?? client.Options.LoggerFactory;
3738
// Clone the options to discourage mutation (but we aren't completely disabling mutation
3839
// on the Options field herein).
3940
Options = (TemporalWorkerOptions)options.Clone();
@@ -42,7 +43,8 @@ public TemporalWorker(IWorkerClient client, TemporalWorkerOptions options)
4243
BridgeWorker = new(
4344
(Bridge.Client)bridgeClient,
4445
client.Options.Namespace,
45-
options);
46+
options,
47+
loggerFactory);
4648
if (options.Activities.Count == 0 && options.Workflows.Count == 0)
4749
{
4850
throw new ArgumentException("Must have at least one workflow and/or activity");
@@ -81,7 +83,7 @@ public TemporalWorker(IWorkerClient client, TemporalWorkerOptions options)
8183
Workflows: options.Workflows,
8284
DataConverter: client.Options.DataConverter,
8385
Interceptors: Interceptors,
84-
LoggerFactory: options.LoggerFactory ?? client.Options.LoggerFactory,
86+
LoggerFactory: loggerFactory,
8587
WorkflowInstanceFactory: options.WorkflowInstanceFactory,
8688
DebugMode: options.DebugMode,
8789
DisableWorkflowTracingEventListener: options.DisableWorkflowTracingEventListener,
@@ -400,4 +402,4 @@ private async Task ExecuteInternalAsync(
400402
}
401403
}
402404
}
403-
}
405+
}

src/Temporalio/Worker/Tuning/ICustomSlotSupplier.cs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,27 +18,36 @@ public interface ICustomSlotSupplier : ISlotSupplier
1818
/// invocations of this method may be cancelled. Any other exceptions thrown will be logged
1919
/// and ignored.
2020
/// </summary>
21+
/// <remarks>
22+
/// This method will be called concurrently from multiple threads, so it must be thread-safe.
23+
/// </remarks>
2124
/// <param name="ctx">The context for slot reservation.</param>
2225
/// <returns>A permit to use the slot which may be populated with your own data.</returns>
2326
/// <exception cref="OperationCanceledException">Cancellation requested.</exception>
24-
public Task<SlotPermit> ReserveSlotAsync(SlotReserveContext ctx);
27+
public Task<ISlotPermit> ReserveSlotAsync(SlotReserveContext ctx);
2528

2629
/// <summary>
2730
/// This function is called when trying to reserve slots for "eager" workflow and activity tasks.
2831
/// Eager tasks are those which are returned as a result of completing a workflow task, rather than
2932
/// from polling. Your implementation must not block, and if a slot is available, return a permit
3033
/// to use that slot.
3134
/// </summary>
35+
/// <remarks>
36+
/// This method will be called concurrently from multiple threads, so it must be thread-safe.
37+
/// </remarks>
3238
/// <param name="ctx">The context for slot reservation.</param>
3339
/// <returns>Maybe a permit to use the slot which may be populated with your own data.</returns>
34-
public SlotPermit? TryReserveSlot(SlotReserveContext ctx);
40+
public ISlotPermit? TryReserveSlot(SlotReserveContext ctx);
3541

3642
/// <summary>
3743
/// This function is called once a slot is actually being used to process some task, which may be
3844
/// some time after the slot was reserved originally. For example, if there is no work for a
3945
/// worker, a number of slots equal to the number of active pollers may already be reserved, but
4046
/// none of them are being used yet. This call should be non-blocking.
4147
/// </summary>
48+
/// <remarks>
49+
/// This method will be called concurrently from multiple threads, so it must be thread-safe.
50+
/// </remarks>
4251
/// <param name="ctx">The context for marking a slot as used.</param>
4352
public void MarkSlotUsed(SlotMarkUsedContext ctx);
4453

@@ -47,6 +56,9 @@ public interface ICustomSlotSupplier : ISlotSupplier
4756
/// finished, whether successfully or not, or because the slot was no longer needed (ex: the number
4857
/// of active pollers decreased). This call should be non-blocking.
4958
/// </summary>
59+
/// <remarks>
60+
/// This method will be called concurrently from multiple threads, so it must be thread-safe.
61+
/// </remarks>
5062
/// <param name="ctx">The context for releasing a slot.</param>
5163
public void ReleaseSlot(SlotReleaseContext ctx);
5264
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
namespace Temporalio.Worker.Tuning
2+
{
3+
// This interface is intended as a marker type
4+
#pragma warning disable CA1040
5+
/// <summary>
6+
/// A permit to use a slot for a workflow/activity/local activity task.
7+
/// You can implement this interface to add your own data to the permit.
8+
/// </summary>
9+
/// <remarks>
10+
/// WARNING: Custom slot suppliers are currently experimental.
11+
/// </remarks>
12+
public interface ISlotPermit
13+
{
14+
}
15+
#pragma warning restore CA1040
16+
}

src/Temporalio/Worker/Tuning/SlotMarkUsedContext.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ internal SlotMarkUsedContext(Temporalio.Bridge.Interop.SlotMarkUsedCtx ctx, GCHa
2020
this.SlotInfo = SlotInfo.FromBridge(ctx.slot_info);
2121
unsafe
2222
{
23-
this.Permit = (SlotPermit)userData.Target!;
23+
this.Permit = (ISlotPermit)userData.Target!;
2424
}
2525
}
2626

@@ -32,6 +32,6 @@ internal SlotMarkUsedContext(Temporalio.Bridge.Interop.SlotMarkUsedCtx ctx, GCHa
3232
/// <summary>
3333
/// Gets the permit that was issued when the slot was reserved.
3434
/// </summary>
35-
public SlotPermit Permit { get; }
35+
public ISlotPermit Permit { get; }
3636
}
3737
}

src/Temporalio/Worker/Tuning/SlotPermit.cs

Lines changed: 0 additions & 18 deletions
This file was deleted.

0 commit comments

Comments
 (0)