Skip to content

Commit 14951b4

Browse files
committed
Add timeout to gRPC workitem streaming
This commit adds a timeout to the gRPC stream used to communicate with the backend. This was done because the backend could restart and drop the connection and the worker would not know. This causes the worker to hang and not receive any new work items. The fix is to reset the connection if a long enough period of time has passed between receiving anything on the stream. Signed-off-by: halspang <[email protected]>
1 parent 65126d2 commit 14951b4

File tree

1 file changed

+15
-3
lines changed

1 file changed

+15
-3
lines changed

src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
11
// Copyright (c) Microsoft Corporation.
22
// Licensed under the MIT License.
33

4-
using System.Diagnostics.CodeAnalysis;
54
using System.Text;
65
using DurableTask.Core;
76
using DurableTask.Core.Entities;
87
using DurableTask.Core.Entities.OperationFormat;
98
using DurableTask.Core.History;
10-
using Grpc.Core;
119
using Microsoft.DurableTask.Entities;
1210
using Microsoft.DurableTask.Worker.Shims;
1311
using Microsoft.Extensions.DependencyInjection;
@@ -186,10 +184,16 @@ async ValueTask<OrchestrationRuntimeState> BuildRuntimeStateAsync(
186184

187185
async Task ProcessWorkItemsAsync(AsyncServerStreamingCall<P.WorkItem> stream, CancellationToken cancellation)
188186
{
187+
// Create a new token source for timing out and a final token source that keys off of them both.
188+
var timeoutSource = new CancellationTokenSource();
189+
timeoutSource.CancelAfter(TimeSpan.FromSeconds(60));
190+
var tokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellation, timeoutSource.Token);
191+
189192
while (!cancellation.IsCancellationRequested)
190193
{
191-
await foreach (P.WorkItem workItem in stream.ResponseStream.ReadAllAsync(cancellation))
194+
await foreach (P.WorkItem workItem in stream.ResponseStream.ReadAllAsync(tokenSource.Token))
192195
{
196+
timeoutSource.CancelAfter(TimeSpan.FromSeconds(60));
193197
if (workItem.RequestCase == P.WorkItem.RequestOneofCase.OrchestratorRequest)
194198
{
195199
this.RunBackgroundTask(
@@ -237,6 +241,14 @@ async Task ProcessWorkItemsAsync(AsyncServerStreamingCall<P.WorkItem> stream, Ca
237241
this.Logger.UnexpectedWorkItemType(workItem.RequestCase.ToString());
238242
}
239243
}
244+
245+
if (tokenSource.IsCancellationRequested || tokenSource.Token.IsCancellationRequested)
246+
{
247+
// The token has cancelled, this means either:
248+
// 1. The broader 'cancellation' was triggered, return here to start a graceful shutdown.
249+
// 2. The timeoutSource was triggered, return here to trigger a reconnect to the backend.
250+
return;
251+
}
240252
}
241253
}
242254

0 commit comments

Comments
 (0)