diff --git a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs
index 2a4610cf8..20cd63095 100644
--- a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs
+++ b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs
@@ -276,7 +276,8 @@ async Task ProcessWorkItemsAsync(AsyncServerStreamingCall
stream, Ca
() => this.OnRunOrchestratorAsync(
workItem.OrchestratorRequest,
workItem.CompletionToken,
- cancellation));
+ cancellation),
+ cancellation);
}
else if (workItem.RequestCase == P.WorkItem.RequestOneofCase.ActivityRequest)
{
@@ -285,13 +286,15 @@ async Task ProcessWorkItemsAsync(AsyncServerStreamingCall stream, Ca
() => this.OnRunActivityAsync(
workItem.ActivityRequest,
workItem.CompletionToken,
- cancellation));
+ cancellation),
+ cancellation);
}
else if (workItem.RequestCase == P.WorkItem.RequestOneofCase.EntityRequest)
{
this.RunBackgroundTask(
workItem,
- () => this.OnRunEntityBatchAsync(workItem.EntityRequest.ToEntityBatchRequest(), cancellation));
+ () => this.OnRunEntityBatchAsync(workItem.EntityRequest.ToEntityBatchRequest(), cancellation),
+ cancellation);
}
else if (workItem.RequestCase == P.WorkItem.RequestOneofCase.EntityRequestV2)
{
@@ -305,7 +308,8 @@ async Task ProcessWorkItemsAsync(AsyncServerStreamingCall stream, Ca
batchRequest,
cancellation,
workItem.CompletionToken,
- operationInfos));
+ operationInfos),
+ cancellation);
}
else if (workItem.RequestCase == P.WorkItem.RequestOneofCase.HealthPing)
{
@@ -333,10 +337,11 @@ async Task ProcessWorkItemsAsync(AsyncServerStreamingCall stream, Ca
}
}
- void RunBackgroundTask(P.WorkItem? workItem, Func handler)
+ void RunBackgroundTask(P.WorkItem? workItem, Func handler, CancellationToken cancellation)
{
// TODO: is Task.Run appropriate here? Should we have finer control over the tasks and their threads?
- _ = Task.Run(async () =>
+ _ = Task.Run(
+ async () =>
{
try
{
@@ -351,8 +356,99 @@ void RunBackgroundTask(P.WorkItem? workItem, Func handler)
string instanceId =
workItem?.OrchestratorRequest?.InstanceId ??
workItem?.ActivityRequest?.OrchestrationInstance?.InstanceId ??
+ workItem?.EntityRequest?.InstanceId ??
+ workItem?.EntityRequestV2?.InstanceId ??
string.Empty;
this.Logger.UnexpectedError(ex, instanceId);
+
+ if (workItem?.OrchestratorRequest != null)
+ {
+ try
+ {
+ this.Logger.AbandoningOrchestratorWorkItem(instanceId, workItem?.CompletionToken ?? string.Empty);
+ await this.client.AbandonTaskOrchestratorWorkItemAsync(
+ new P.AbandonOrchestrationTaskRequest
+ {
+ CompletionToken = workItem?.CompletionToken,
+ },
+ cancellationToken: cancellation);
+ this.Logger.AbandonedOrchestratorWorkItem(instanceId, workItem?.CompletionToken ?? string.Empty);
+ }
+ catch (Exception abandonException)
+ {
+ this.Logger.UnexpectedError(abandonException, instanceId);
+ }
+ }
+ else if (workItem?.ActivityRequest != null)
+ {
+ try
+ {
+ this.Logger.AbandoningActivityWorkItem(
+ instanceId,
+ workItem.ActivityRequest.Name,
+ workItem.ActivityRequest.TaskId,
+ workItem?.CompletionToken ?? string.Empty);
+ await this.client.AbandonTaskActivityWorkItemAsync(
+ new P.AbandonActivityTaskRequest
+ {
+ CompletionToken = workItem?.CompletionToken,
+ },
+ cancellationToken: cancellation);
+ this.Logger.AbandonedActivityWorkItem(
+ instanceId,
+ workItem.ActivityRequest.Name,
+ workItem.ActivityRequest.TaskId,
+ workItem?.CompletionToken ?? string.Empty);
+ }
+ catch (Exception abandonException)
+ {
+ this.Logger.UnexpectedError(abandonException, instanceId);
+ }
+ }
+ else if (workItem?.EntityRequest != null)
+ {
+ try
+ {
+ this.Logger.AbandoningEntityWorkItem(
+ workItem.EntityRequest.InstanceId,
+ workItem?.CompletionToken ?? string.Empty);
+ await this.client.AbandonTaskEntityWorkItemAsync(
+ new P.AbandonEntityTaskRequest
+ {
+ CompletionToken = workItem?.CompletionToken,
+ },
+ cancellationToken: cancellation);
+ this.Logger.AbandonedEntityWorkItem(
+ workItem.EntityRequest.InstanceId,
+ workItem?.CompletionToken ?? string.Empty);
+ }
+ catch (Exception abandonException)
+ {
+ this.Logger.UnexpectedError(abandonException, workItem.EntityRequest.InstanceId);
+ }
+ }
+ else if (workItem?.EntityRequestV2 != null)
+ {
+ try
+ {
+ this.Logger.AbandoningEntityWorkItem(
+ workItem.EntityRequestV2.InstanceId,
+ workItem?.CompletionToken ?? string.Empty);
+ await this.client.AbandonTaskEntityWorkItemAsync(
+ new P.AbandonEntityTaskRequest
+ {
+ CompletionToken = workItem?.CompletionToken,
+ },
+ cancellationToken: cancellation);
+ this.Logger.AbandonedEntityWorkItem(
+ workItem.EntityRequestV2.InstanceId,
+ workItem?.CompletionToken ?? string.Empty);
+ }
+ catch (Exception abandonException)
+ {
+ this.Logger.UnexpectedError(abandonException, workItem.EntityRequestV2.InstanceId);
+ }
+ }
}
});
}
diff --git a/src/Worker/Grpc/Logs.cs b/src/Worker/Grpc/Logs.cs
index b5cc0d9c2..b7d1f957e 100644
--- a/src/Worker/Grpc/Logs.cs
+++ b/src/Worker/Grpc/Logs.cs
@@ -60,5 +60,24 @@ static partial class Logs
[LoggerMessage(EventId = 59, Level = LogLevel.Information, Message = "Abandoning orchestration due to filtering. InstanceId = '{instanceId}'. Completion token = '{completionToken}'")]
public static partial void AbandoningOrchestrationDueToOrchestrationFilter(this ILogger logger, string instanceId, string completionToken);
+
+ // Abandoning/Abandoned logs for background task error handling
+ [LoggerMessage(EventId = 60, Level = LogLevel.Information, Message = "{instanceId}: Abandoning orchestrator work item. Completion token = '{completionToken}'")]
+ public static partial void AbandoningOrchestratorWorkItem(this ILogger logger, string instanceId, string completionToken);
+
+ [LoggerMessage(EventId = 61, Level = LogLevel.Information, Message = "{instanceId}: Abandoned orchestrator work item. Completion token = '{completionToken}'")]
+ public static partial void AbandonedOrchestratorWorkItem(this ILogger logger, string instanceId, string completionToken);
+
+ [LoggerMessage(EventId = 62, Level = LogLevel.Information, Message = "{instanceId}: Abandoning activity work item '{name}#{taskId}'. Completion token = '{completionToken}'")]
+ public static partial void AbandoningActivityWorkItem(this ILogger logger, string instanceId, string name, int taskId, string completionToken);
+
+ [LoggerMessage(EventId = 63, Level = LogLevel.Information, Message = "{instanceId}: Abandoned activity work item '{name}#{taskId}'. Completion token = '{completionToken}'")]
+ public static partial void AbandonedActivityWorkItem(this ILogger logger, string instanceId, string name, int taskId, string completionToken);
+
+ [LoggerMessage(EventId = 64, Level = LogLevel.Information, Message = "{instanceId}: Abandoning entity work item. Completion token = '{completionToken}'")]
+ public static partial void AbandoningEntityWorkItem(this ILogger logger, string instanceId, string completionToken);
+
+ [LoggerMessage(EventId = 65, Level = LogLevel.Information, Message = "{instanceId}: Abandoned entity work item. Completion token = '{completionToken}'")]
+ public static partial void AbandonedEntityWorkItem(this ILogger logger, string instanceId, string completionToken);
}
}
diff --git a/test/Worker/Grpc.Tests/RunBackgroundTaskLoggingTests.cs b/test/Worker/Grpc.Tests/RunBackgroundTaskLoggingTests.cs
new file mode 100644
index 000000000..86453ddef
--- /dev/null
+++ b/test/Worker/Grpc.Tests/RunBackgroundTaskLoggingTests.cs
@@ -0,0 +1,496 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+using System.Reflection;
+using FluentAssertions;
+using Microsoft.DurableTask;
+using Microsoft.DurableTask.Worker;
+using Microsoft.DurableTask.Worker.Grpc;
+using Microsoft.DurableTask.Tests.Logging;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+using Moq;
+using P = Microsoft.DurableTask.Protobuf;
+using Xunit;
+using Grpc.Core;
+using Xunit.Abstractions;
+
+namespace Microsoft.DurableTask.Worker.Grpc.Tests;
+
+public class RunBackgroundTaskLoggingTests
+{
+ const string Category = "Microsoft.DurableTask";
+
+ [Fact]
+ public async Task Logs_Abandoning_And_Abandoned_For_Orchestrator()
+ {
+ await using var fixture = await TestFixture.CreateAsync();
+
+ string instanceId = Guid.NewGuid().ToString("N");
+ string completionToken = Guid.NewGuid().ToString("N");
+
+ var tcs = new TaskCompletionSource();
+ fixture.ClientMock
+ .Setup(c => c.AbandonTaskOrchestratorWorkItemAsync(
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny()))
+ .Returns((P.AbandonOrchestrationTaskRequest r, Metadata h, DateTime? d, CancellationToken ct) =>
+ CompletedAsyncUnaryCall(new P.AbandonOrchestrationTaskResponse(), () => tcs.TrySetResult(true)));
+
+ P.WorkItem workItem = new()
+ {
+ OrchestratorRequest = new P.OrchestratorRequest { InstanceId = instanceId },
+ CompletionToken = completionToken,
+ };
+
+ fixture.InvokeRunBackgroundTask(workItem, () => Task.FromException(new Exception("boom")));
+
+ await WaitAsync(tcs.Task);
+
+ await AssertEventually(() => fixture.GetLogs().Any(l => l.Message.Contains("Abandoning orchestrator work item") && l.Message.Contains(instanceId)));
+ await AssertEventually(() => fixture.GetLogs().Any(l => l.Message.Contains("Abandoned orchestrator work item") && l.Message.Contains(instanceId)));
+ }
+
+ [Fact]
+ public async Task Logs_Abandoning_And_NoAbandoned_When_Orchestrator_Abandon_Fails()
+ {
+ await using var fixture = await TestFixture.CreateAsync();
+
+ string instanceId = Guid.NewGuid().ToString("N");
+ string completionToken = Guid.NewGuid().ToString("N");
+
+ fixture.ClientMock
+ .Setup(c => c.AbandonTaskOrchestratorWorkItemAsync(
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny()))
+ .Returns(() => FaultedAsyncUnaryCall(new Exception("abandon failure")));
+
+ P.WorkItem workItem = new()
+ {
+ OrchestratorRequest = new P.OrchestratorRequest { InstanceId = instanceId },
+ CompletionToken = completionToken,
+ };
+
+ fixture.InvokeRunBackgroundTask(workItem, () => Task.FromException(new Exception("boom")));
+
+ // Allow background task to execute
+ await Task.Delay(200);
+
+ await AssertEventually(() => fixture.GetLogs().Any(l => l.Message.Contains("Abandoning orchestrator work item") && l.Message.Contains(instanceId)));
+ Assert.DoesNotContain(fixture.GetLogs(), l => l.Message.Contains("Abandoned orchestrator work item") && l.Message.Contains(instanceId));
+ await AssertEventually(() => fixture.GetLogs().Any(l => l.Message.Contains("Unexpected error") && l.Message.Contains(instanceId)));
+ }
+
+ [Fact]
+ public async Task Logs_Abandoning_And_Abandoned_For_Activity()
+ {
+ await using var fixture = await TestFixture.CreateAsync();
+
+ string instanceId = Guid.NewGuid().ToString("N");
+ string completionToken = Guid.NewGuid().ToString("N");
+
+ var tcs = new TaskCompletionSource();
+ fixture.ClientMock
+ .Setup(c => c.AbandonTaskActivityWorkItemAsync(
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny()))
+ .Returns((P.AbandonActivityTaskRequest r, Metadata h, DateTime? d, CancellationToken ct) =>
+ CompletedAsyncUnaryCall(new P.AbandonActivityTaskResponse(), () => tcs.TrySetResult(true)));
+
+ P.WorkItem workItem = new()
+ {
+ ActivityRequest = new P.ActivityRequest
+ {
+ Name = "MyActivity",
+ TaskId = 42,
+ OrchestrationInstance = new P.OrchestrationInstance { InstanceId = instanceId },
+ },
+ CompletionToken = completionToken,
+ };
+
+ fixture.InvokeRunBackgroundTask(workItem, () => Task.FromException(new Exception("boom")));
+
+ await WaitAsync(tcs.Task);
+
+ await AssertEventually(() => fixture.GetLogs().Any(l => l.Message.Contains("Abandoning activity work item") && l.Message.Contains(instanceId)));
+ await AssertEventually(() => fixture.GetLogs().Any(l => l.Message.Contains("Abandoned activity work item") && l.Message.Contains(instanceId)));
+ }
+
+ [Fact]
+ public async Task Logs_Abandoning_And_NoAbandoned_When_Activity_Abandon_Fails()
+ {
+ await using var fixture = await TestFixture.CreateAsync();
+
+ string instanceId = Guid.NewGuid().ToString("N");
+ string completionToken = Guid.NewGuid().ToString("N");
+
+ fixture.ClientMock
+ .Setup(c => c.AbandonTaskActivityWorkItemAsync(
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny()))
+ .Returns(() => FaultedAsyncUnaryCall(new Exception("abandon failure")));
+
+ P.WorkItem workItem = new()
+ {
+ ActivityRequest = new P.ActivityRequest
+ {
+ Name = "MyActivity",
+ TaskId = 42,
+ OrchestrationInstance = new P.OrchestrationInstance { InstanceId = instanceId },
+ },
+ CompletionToken = completionToken,
+ };
+
+ fixture.InvokeRunBackgroundTask(workItem, () => Task.FromException(new Exception("boom")));
+
+ await Task.Delay(200);
+
+ await AssertEventually(() => fixture.GetLogs().Any(l => l.Message.Contains("Abandoning activity work item") && l.Message.Contains(instanceId)));
+ Assert.DoesNotContain(fixture.GetLogs(), l => l.Message.Contains("Abandoned activity work item") && l.Message.Contains(instanceId));
+ await AssertEventually(() => fixture.GetLogs().Any(l => l.Message.Contains("Unexpected error") && l.Message.Contains(instanceId)));
+ }
+
+ [Fact]
+ public async Task Logs_Abandoning_And_Abandoned_For_Entity_V1()
+ {
+ await using var fixture = await TestFixture.CreateAsync();
+
+ string instanceId = "entity@key";
+ string completionToken = Guid.NewGuid().ToString("N");
+
+ var tcs = new TaskCompletionSource();
+ fixture.ClientMock
+ .Setup(c => c.AbandonTaskEntityWorkItemAsync(
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny()))
+ .Returns((P.AbandonEntityTaskRequest r, Metadata h, DateTime? d, CancellationToken ct) =>
+ CompletedAsyncUnaryCall(new P.AbandonEntityTaskResponse(), () => tcs.TrySetResult(true)));
+
+ P.WorkItem workItem = new()
+ {
+ EntityRequest = new P.EntityBatchRequest { InstanceId = instanceId },
+ CompletionToken = completionToken,
+ };
+
+ fixture.InvokeRunBackgroundTask(workItem, () => Task.FromException(new Exception("boom")));
+
+ await WaitAsync(tcs.Task);
+
+ await AssertEventually(() => fixture.GetLogs().Any(l => l.Message.Contains("Abandoning entity work item") && l.Message.Contains(instanceId)));
+ await AssertEventually(() => fixture.GetLogs().Any(l => l.Message.Contains("Abandoned entity work item") && l.Message.Contains(instanceId)));
+ }
+
+ [Fact]
+ public async Task Logs_Abandoning_And_NoAbandoned_When_EntityV1_Abandon_Fails()
+ {
+ await using var fixture = await TestFixture.CreateAsync();
+
+ string instanceId = "entity@key";
+ string completionToken = Guid.NewGuid().ToString("N");
+
+ fixture.ClientMock
+ .Setup(c => c.AbandonTaskEntityWorkItemAsync(
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny()))
+ .Returns(() => FaultedAsyncUnaryCall(new Exception("abandon failure")));
+
+ P.WorkItem workItem = new()
+ {
+ EntityRequest = new P.EntityBatchRequest { InstanceId = instanceId },
+ CompletionToken = completionToken,
+ };
+
+ fixture.InvokeRunBackgroundTask(workItem, () => Task.FromException(new Exception("boom")));
+
+ await Task.Delay(200);
+
+ await AssertEventually(() => fixture.GetLogs().Any(l => l.Message.Contains("Abandoning entity work item") && l.Message.Contains(instanceId)));
+ Assert.DoesNotContain(fixture.GetLogs(), l => l.Message.Contains("Abandoned entity work item") && l.Message.Contains(instanceId));
+ await AssertEventually(() => fixture.GetLogs().Any(l => l.Message.Contains("Unexpected error") && l.Message.Contains(instanceId)));
+ }
+
+ [Fact]
+ public async Task Logs_Abandoning_And_Abandoned_For_Entity_V2()
+ {
+ await using var fixture = await TestFixture.CreateAsync();
+
+ string instanceId = "entity2@key";
+ string completionToken = Guid.NewGuid().ToString("N");
+
+ var tcs = new TaskCompletionSource();
+ fixture.ClientMock
+ .Setup(c => c.AbandonTaskEntityWorkItemAsync(
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny()))
+ .Returns((P.AbandonEntityTaskRequest r, Metadata h, DateTime? d, CancellationToken ct) =>
+ CompletedAsyncUnaryCall(new P.AbandonEntityTaskResponse(), () => tcs.TrySetResult(true)));
+
+ P.WorkItem workItem = new()
+ {
+ EntityRequestV2 = new P.EntityRequest { InstanceId = instanceId },
+ CompletionToken = completionToken,
+ };
+
+ fixture.InvokeRunBackgroundTask(workItem, () => Task.FromException(new Exception("boom")));
+
+ await WaitAsync(tcs.Task);
+
+ await AssertEventually(() => fixture.GetLogs().Any(l => l.Message.Contains("Abandoning entity work item") && l.Message.Contains(instanceId)));
+ await AssertEventually(() => fixture.GetLogs().Any(l => l.Message.Contains("Abandoned entity work item") && l.Message.Contains(instanceId)));
+ }
+
+ [Fact]
+ public async Task Logs_Abandoning_And_NoAbandoned_When_EntityV2_Abandon_Fails()
+ {
+ await using var fixture = await TestFixture.CreateAsync();
+
+ string instanceId = "entity2@key";
+ string completionToken = Guid.NewGuid().ToString("N");
+
+ fixture.ClientMock
+ .Setup(c => c.AbandonTaskEntityWorkItemAsync(
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny()))
+ .Returns(() => FaultedAsyncUnaryCall(new Exception("abandon failure")));
+
+ P.WorkItem workItem = new()
+ {
+ EntityRequestV2 = new P.EntityRequest { InstanceId = instanceId },
+ CompletionToken = completionToken,
+ };
+
+ fixture.InvokeRunBackgroundTask(workItem, () => Task.FromException(new Exception("boom")));
+
+ await Task.Delay(200);
+
+ await AssertEventually(() => fixture.GetLogs().Any(l => l.Message.Contains("Abandoning entity work item") && l.Message.Contains(instanceId)));
+ Assert.DoesNotContain(fixture.GetLogs(), l => l.Message.Contains("Abandoned entity work item") && l.Message.Contains(instanceId));
+ await AssertEventually(() => fixture.GetLogs().Any(l => l.Message.Contains("Unexpected error") && l.Message.Contains(instanceId)));
+ }
+
+ [Fact]
+ public async Task Forwards_CancellationToken_To_Abandon_Orchestrator()
+ {
+ await using var fixture = await TestFixture.CreateAsync();
+
+ string instanceId = Guid.NewGuid().ToString("N");
+ string completionToken = Guid.NewGuid().ToString("N");
+
+ var cts = new CancellationTokenSource();
+ var observed = new TaskCompletionSource();
+
+ fixture.ClientMock
+ .Setup(c => c.AbandonTaskOrchestratorWorkItemAsync(
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny()))
+ .Returns((P.AbandonOrchestrationTaskRequest r, Metadata h, DateTime? d, CancellationToken ct) =>
+ {
+ if (ct == cts.Token)
+ {
+ observed.TrySetResult(true);
+ }
+ return CompletedAsyncUnaryCall(new P.AbandonOrchestrationTaskResponse());
+ });
+
+ P.WorkItem workItem = new()
+ {
+ OrchestratorRequest = new P.OrchestratorRequest { InstanceId = instanceId },
+ CompletionToken = completionToken,
+ };
+
+ fixture.InvokeRunBackgroundTask(workItem, () => Task.FromException(new Exception("boom")), cts.Token);
+
+ await WaitAsync(observed.Task);
+ }
+
+ static async Task WaitAsync(Task task)
+ {
+ using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
+ Task completed = await Task.WhenAny(task, Task.Delay(Timeout.InfiniteTimeSpan, cts.Token));
+ if (completed != task)
+ {
+ throw new TimeoutException("Timed out waiting for abandon call");
+ }
+ }
+
+ static async Task AssertEventually(Func condition, int timeoutMs = 2000)
+ {
+ var sw = System.Diagnostics.Stopwatch.StartNew();
+ while (sw.ElapsedMilliseconds < timeoutMs)
+ {
+ if (condition())
+ {
+ return;
+ }
+ await Task.Delay(50);
+ }
+ Assert.True(false, "Condition not met within timeout");
+ }
+
+ sealed class TestFixture : IAsyncDisposable
+ {
+ readonly ServiceProvider services;
+ readonly TestLogProvider logProvider;
+
+ public Mock ClientMock { get; }
+ public GrpcDurableTaskWorker Worker { get; }
+ object ProcessorInstance { get; }
+ MethodInfo RunBackgroundTaskMethod { get; }
+
+ TestFixture(ServiceProvider services, TestLogProvider logProvider, Mock clientMock, GrpcDurableTaskWorker worker, object processorInstance, MethodInfo runBackgroundTaskMethod)
+ {
+ this.services = services;
+ this.logProvider = logProvider;
+ this.ClientMock = clientMock;
+ this.Worker = worker;
+ this.ProcessorInstance = processorInstance;
+ this.RunBackgroundTaskMethod = runBackgroundTaskMethod;
+ }
+
+ public static async Task CreateAsync()
+ {
+ // Logging
+ var logProvider = new TestLogProvider(new NullOutput());
+ // DI
+ var services = new ServiceCollection().BuildServiceProvider();
+ var loggerFactory = new SimpleLoggerFactory(logProvider);
+
+ // Options
+ var grpcOptions = new OptionsMonitorStub(new GrpcDurableTaskWorkerOptions());
+ var workerOptions = new OptionsMonitorStub(new DurableTaskWorkerOptions());
+
+ // Factory (not used in these tests)
+ var factoryMock = new Mock(MockBehavior.Strict);
+
+ // Worker
+ var worker = new GrpcDurableTaskWorker(
+ name: "Test",
+ factory: factoryMock.Object,
+ grpcOptions: grpcOptions,
+ workerOptions: workerOptions,
+ services: services,
+ loggerFactory: loggerFactory);
+
+ // Client mock
+ var callInvoker = Mock.Of();
+ var clientMock = new Mock(MockBehavior.Strict, new object[] { callInvoker });
+
+ // Build Processor via reflection
+ Type processorType = typeof(GrpcDurableTaskWorker).GetNestedType("Processor", BindingFlags.NonPublic)!;
+ object processorInstance = Activator.CreateInstance(
+ processorType,
+ BindingFlags.Public | BindingFlags.Instance,
+ binder: null,
+ args: new object?[] { worker, clientMock.Object, null },
+ culture: null)!;
+
+ MethodInfo runBackgroundTask = processorType.GetMethod("RunBackgroundTask", BindingFlags.Instance | BindingFlags.NonPublic)!;
+
+ return new TestFixture((ServiceProvider)services, logProvider, clientMock, worker, processorInstance, runBackgroundTask);
+ }
+
+ public void InvokeRunBackgroundTask(P.WorkItem workItem, Func handler, CancellationToken cancellationToken = default)
+ {
+ this.RunBackgroundTaskMethod.Invoke(this.ProcessorInstance, new object?[] { workItem, handler, cancellationToken });
+ }
+
+ public IReadOnlyCollection GetLogs()
+ {
+ this.logProvider.TryGetLogs(Category, out var logs);
+ return logs ?? Array.Empty();
+ }
+
+ public ValueTask DisposeAsync()
+ {
+ (this.services as IDisposable)?.Dispose();
+ return default;
+ }
+ }
+
+ static AsyncUnaryCall CompletedAsyncUnaryCall(T response, Action? onInvoke = null)
+ {
+ var respTask = Task.Run(() => { onInvoke?.Invoke(); return response; });
+ return new AsyncUnaryCall(
+ respTask,
+ Task.FromResult(new Metadata()),
+ () => new Status(StatusCode.OK, string.Empty),
+ () => new Metadata(),
+ () => { });
+ }
+
+ static AsyncUnaryCall FaultedAsyncUnaryCall(Exception ex)
+ {
+ var respTask = Task.FromException(ex);
+ return new AsyncUnaryCall(
+ respTask,
+ Task.FromResult(new Metadata()),
+ () => new Status(StatusCode.Unknown, ex.Message),
+ () => new Metadata(),
+ () => { });
+ }
+
+ sealed class NullOutput : ITestOutputHelper
+ {
+ public void WriteLine(string message) { }
+ public void WriteLine(string format, params object[] args) { }
+ }
+
+ sealed class OptionsMonitorStub : IOptionsMonitor where T : class, new()
+ {
+ readonly T value;
+
+ public OptionsMonitorStub(T value) => this.value = value;
+
+ public T CurrentValue => this.value;
+
+ public T Get(string? name) => this.value;
+
+ public IDisposable OnChange(Action listener) => NullDisposable.Instance;
+
+ sealed class NullDisposable : IDisposable
+ {
+ public static readonly NullDisposable Instance = new();
+ public void Dispose() { }
+ }
+ }
+
+ sealed class SimpleLoggerFactory : ILoggerFactory
+ {
+ readonly ILoggerProvider provider;
+
+ public SimpleLoggerFactory(ILoggerProvider provider)
+ {
+ this.provider = provider;
+ }
+
+ public void AddProvider(ILoggerProvider provider)
+ {
+ // No-op; single provider
+ }
+
+ public ILogger CreateLogger(string categoryName) => this.provider.CreateLogger(categoryName);
+
+ public void Dispose() { }
+ }
+}
+
+