diff --git a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs index 81733a063..193cbd919 100644 --- a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs +++ b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs @@ -1,13 +1,11 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. -using System.Diagnostics.CodeAnalysis; using System.Text; using DurableTask.Core; using DurableTask.Core.Entities; using DurableTask.Core.Entities.OperationFormat; using DurableTask.Core.History; -using Grpc.Core; using Microsoft.DurableTask.Entities; using Microsoft.DurableTask.Worker.Shims; using Microsoft.Extensions.DependencyInjection; @@ -186,10 +184,18 @@ async ValueTask BuildRuntimeStateAsync( async Task ProcessWorkItemsAsync(AsyncServerStreamingCall stream, CancellationToken cancellation) { + // Create a new token source for timing out and a final token source that keys off of them both. + // The timeout token is used to detect when we are no longer getting any messages, including health checks. + // If this is the case, it signifies the connection has been dropped silently and we need to reconnect. + using var timeoutSource = new CancellationTokenSource(); + timeoutSource.CancelAfter(TimeSpan.FromSeconds(60)); + using var tokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellation, timeoutSource.Token); + while (!cancellation.IsCancellationRequested) { - await foreach (P.WorkItem workItem in stream.ResponseStream.ReadAllAsync(cancellation)) + await foreach (P.WorkItem workItem in stream.ResponseStream.ReadAllAsync(tokenSource.Token)) { + timeoutSource.CancelAfter(TimeSpan.FromSeconds(60)); if (workItem.RequestCase == P.WorkItem.RequestOneofCase.OrchestratorRequest) { this.RunBackgroundTask( @@ -237,6 +243,20 @@ async Task ProcessWorkItemsAsync(AsyncServerStreamingCall stream, Ca this.Logger.UnexpectedWorkItemType(workItem.RequestCase.ToString()); } } + + if (tokenSource.IsCancellationRequested || tokenSource.Token.IsCancellationRequested) + { + // The token has cancelled, this means either: + // 1. The broader 'cancellation' was triggered, return here to start a graceful shutdown. + // 2. The timeoutSource was triggered, return here to trigger a reconnect to the backend. + if (!cancellation.IsCancellationRequested) + { + // Since the cancellation came from the timeout, log a warning. + this.Logger.ConnectionTimeout(); + } + + return; + } } } diff --git a/src/Worker/Grpc/Logs.cs b/src/Worker/Grpc/Logs.cs index 2ff8d40ec..0cdea3ebd 100644 --- a/src/Worker/Grpc/Logs.cs +++ b/src/Worker/Grpc/Logs.cs @@ -48,5 +48,8 @@ static partial class Logs [LoggerMessage(EventId = 55, Level = LogLevel.Information, Message = "{instanceId}: Evaluating custom retry handler for failed '{name}' task. Attempt = {attempt}.")] public static partial void RetryingTask(this ILogger logger, string instanceId, string name, int attempt); + + [LoggerMessage(EventId = 56, Level = LogLevel.Warning, Message = "Channel to backend has stopped receiving traffic, will attempt to reconnect.")] + public static partial void ConnectionTimeout(this ILogger logger); } }