Skip to content

Commit 7747663

Browse files
authored
Add health-check timeout to gRPC workitem streaming (#390)
This commit adds a timeout to the gRPC stream used to communicate with the backend. Previously, if the backend restarted and dropped the connection, the worker would not detect it; the worker would then hang and never receive any new work items. The fix resets the connection once a long enough period has passed without receiving anything (including health checks) on the stream. Signed-off-by: halspang <[email protected]>
1 parent d9273d8 commit 7747663

File tree

2 files changed

+26
-3
lines changed

2 files changed

+26
-3
lines changed

src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
11
// Copyright (c) Microsoft Corporation.
22
// Licensed under the MIT License.
33

4-
using System.Diagnostics.CodeAnalysis;
54
using System.Text;
65
using DurableTask.Core;
76
using DurableTask.Core.Entities;
87
using DurableTask.Core.Entities.OperationFormat;
98
using DurableTask.Core.History;
10-
using Grpc.Core;
119
using Microsoft.DurableTask.Entities;
1210
using Microsoft.DurableTask.Worker.Shims;
1311
using Microsoft.Extensions.DependencyInjection;
@@ -186,10 +184,18 @@ async ValueTask<OrchestrationRuntimeState> BuildRuntimeStateAsync(
186184

187185
async Task ProcessWorkItemsAsync(AsyncServerStreamingCall<P.WorkItem> stream, CancellationToken cancellation)
188186
{
187+
// Create a new token source for timing out and a final token source that keys off of them both.
188+
// The timeout token is used to detect when we are no longer getting any messages, including health checks.
189+
// If this is the case, it signifies the connection has been dropped silently and we need to reconnect.
190+
using var timeoutSource = new CancellationTokenSource();
191+
timeoutSource.CancelAfter(TimeSpan.FromSeconds(60));
192+
using var tokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellation, timeoutSource.Token);
193+
189194
while (!cancellation.IsCancellationRequested)
190195
{
191-
await foreach (P.WorkItem workItem in stream.ResponseStream.ReadAllAsync(cancellation))
196+
await foreach (P.WorkItem workItem in stream.ResponseStream.ReadAllAsync(tokenSource.Token))
192197
{
198+
timeoutSource.CancelAfter(TimeSpan.FromSeconds(60));
193199
if (workItem.RequestCase == P.WorkItem.RequestOneofCase.OrchestratorRequest)
194200
{
195201
this.RunBackgroundTask(
@@ -237,6 +243,20 @@ async Task ProcessWorkItemsAsync(AsyncServerStreamingCall<P.WorkItem> stream, Ca
237243
this.Logger.UnexpectedWorkItemType(workItem.RequestCase.ToString());
238244
}
239245
}
246+
247+
if (tokenSource.IsCancellationRequested || tokenSource.Token.IsCancellationRequested)
248+
{
249+
// The token has cancelled, this means either:
250+
// 1. The broader 'cancellation' was triggered, return here to start a graceful shutdown.
251+
// 2. The timeoutSource was triggered, return here to trigger a reconnect to the backend.
252+
if (!cancellation.IsCancellationRequested)
253+
{
254+
// Since the cancellation came from the timeout, log a warning.
255+
this.Logger.ConnectionTimeout();
256+
}
257+
258+
return;
259+
}
240260
}
241261
}
242262

src/Worker/Grpc/Logs.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,5 +48,8 @@ static partial class Logs
4848

4949
[LoggerMessage(EventId = 55, Level = LogLevel.Information, Message = "{instanceId}: Evaluating custom retry handler for failed '{name}' task. Attempt = {attempt}.")]
5050
public static partial void RetryingTask(this ILogger logger, string instanceId, string name, int attempt);
51+
52+
[LoggerMessage(EventId = 56, Level = LogLevel.Warning, Message = "Channel to backend has stopped receiving traffic, will attempt to reconnect.")]
53+
public static partial void ConnectionTimeout(this ILogger logger);
5154
}
5255
}

0 commit comments

Comments (0)