diff --git a/src/Dapr.Workflow/Logging.cs b/src/Dapr.Workflow/Logging.cs index a44a2249b..7fd05def5 100644 --- a/src/Dapr.Workflow/Logging.cs +++ b/src/Dapr.Workflow/Logging.cs @@ -244,4 +244,7 @@ public static partial void LogWaitForStartException(this ILogger logger, Invalid [LoggerMessage(LogLevel.Information, "Rerun workflow from event: source='{SourceInstanceId}', eventId={EventId}, newInstanceId='{NewInstanceId}'")] public static partial void LogRerunWorkflowFromEvent(this ILogger logger, string sourceInstanceId, uint eventId, string newInstanceId); + + [LoggerMessage(LogLevel.Debug, "gRPC protocol handler keepalive Hello call failed")] + public static partial void LogGrpcProtocolHandlerKeepaliveFailed(this ILogger logger, Exception ex); } diff --git a/src/Dapr.Workflow/Worker/Grpc/GrpcProtocolHandler.cs b/src/Dapr.Workflow/Worker/Grpc/GrpcProtocolHandler.cs index 8a6d9bc69..f727a3315 100644 --- a/src/Dapr.Workflow/Worker/Grpc/GrpcProtocolHandler.cs +++ b/src/Dapr.Workflow/Worker/Grpc/GrpcProtocolHandler.cs @@ -17,6 +17,7 @@ using System.Threading.Tasks; using Dapr.Common; using Dapr.DurableTask.Protobuf; +using Google.Protobuf.WellKnownTypes; using Grpc.Core; using Microsoft.Extensions.Logging; @@ -33,6 +34,7 @@ internal sealed class GrpcProtocolHandler( string? daprApiToken = null) : IAsyncDisposable { private static readonly TimeSpan ReconnectDelay = TimeSpan.FromSeconds(5); + private static readonly TimeSpan KeepaliveInterval = TimeSpan.FromSeconds(30); private readonly CancellationTokenSource _disposalCts = new(); private readonly ILogger _logger = loggerFactory?.CreateLogger() ?? throw new ArgumentNullException(nameof(loggerFactory)); @@ -67,6 +69,9 @@ public async Task StartAsync( while (!token.IsCancellationRequested) { + CancellationTokenSource? keepaliveCts = null; + Task? keepaliveTask = null; + try { _logger.LogGrpcProtocolHandlerStartStream(); @@ -77,6 +82,10 @@ public async Task StartAsync( _logger.LogGrpcProtocolHandlerStreamEstablished(); + // Start the background keepalive loop to keep the connection alive + keepaliveCts = CancellationTokenSource.CreateLinkedTokenSource(token); + keepaliveTask = KeepaliveLoopAsync(keepaliveCts.Token); + // Process work items from the stream await ReceiveLoopAsync(_streamingCall.ResponseStream, workflowHandler, activityHandler, token); @@ -110,6 +119,18 @@ public async Task StartAsync( } finally { + // Stop the keepalive loop when the receive loop ends (reconnect or shutdown). + // This runs after catch filters evaluate, avoiding a race where teardown delay + // allows external cancellation to change filter outcomes. + if (keepaliveCts != null) + { + await keepaliveCts.CancelAsync(); + try { await keepaliveTask!; } + catch (OperationCanceledException) { } + catch (Exception ex) { _logger.LogGrpcProtocolHandlerKeepaliveFailed(ex); } + keepaliveCts.Dispose(); + } + _streamingCall?.Dispose(); _streamingCall = null; } @@ -322,6 +343,26 @@ private static OrchestratorResponse CreateWorkflowFailureResult(OrchestratorRequ } }; + /// + /// Periodically calls Hello on the sidecar to prevent idle HTTP/2 connections from being + /// closed by intermediary load balancers (e.g. AWS ALB). + /// + private async Task KeepaliveLoopAsync(CancellationToken cancellation) + { + using var timer = new PeriodicTimer(KeepaliveInterval); + while (await timer.WaitForNextTickAsync(cancellation)) + { + try + { + await _grpcClient.HelloAsync(new Empty(), CreateCallOptions(cancellation)); + } + catch (Exception ex) when (ex is not OperationCanceledException) + { + _logger.LogGrpcProtocolHandlerKeepaliveFailed(ex); + } + } + } + /// public async ValueTask DisposeAsync() {