Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/Dapr.Workflow/Logging.cs
Original file line number Diff line number Diff line change
Expand Up @@ -244,4 +244,7 @@ public static partial void LogWaitForStartException(this ILogger logger, Invalid

[LoggerMessage(LogLevel.Information, "Rerun workflow from event: source='{SourceInstanceId}', eventId={EventId}, newInstanceId='{NewInstanceId}'")]
public static partial void LogRerunWorkflowFromEvent(this ILogger logger, string sourceInstanceId, uint eventId, string newInstanceId);

[LoggerMessage(LogLevel.Debug, "gRPC protocol handler keepalive Hello call failed")]
public static partial void LogGrpcProtocolHandlerKeepaliveFailed(this ILogger logger, Exception ex);
}
41 changes: 41 additions & 0 deletions src/Dapr.Workflow/Worker/Grpc/GrpcProtocolHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
using System.Threading.Tasks;
using Dapr.Common;
using Dapr.DurableTask.Protobuf;
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
using Microsoft.Extensions.Logging;

Expand All @@ -33,6 +34,7 @@ internal sealed class GrpcProtocolHandler(
string? daprApiToken = null) : IAsyncDisposable
{
private static readonly TimeSpan ReconnectDelay = TimeSpan.FromSeconds(5);
private static readonly TimeSpan KeepaliveInterval = TimeSpan.FromSeconds(30);

private readonly CancellationTokenSource _disposalCts = new();
private readonly ILogger<GrpcProtocolHandler> _logger = loggerFactory?.CreateLogger<GrpcProtocolHandler>() ?? throw new ArgumentNullException(nameof(loggerFactory));
Expand Down Expand Up @@ -67,6 +69,9 @@ public async Task StartAsync(

while (!token.IsCancellationRequested)
{
CancellationTokenSource? keepaliveCts = null;
Task? keepaliveTask = null;

try
{
_logger.LogGrpcProtocolHandlerStartStream();
Expand All @@ -77,6 +82,10 @@ public async Task StartAsync(

_logger.LogGrpcProtocolHandlerStreamEstablished();

// Start the background keepalive loop to keep the connection alive
keepaliveCts = CancellationTokenSource.CreateLinkedTokenSource(token);
keepaliveTask = KeepaliveLoopAsync(keepaliveCts.Token);

// Process work items from the stream
await ReceiveLoopAsync(_streamingCall.ResponseStream, workflowHandler, activityHandler, token);

Expand Down Expand Up @@ -110,6 +119,18 @@ public async Task StartAsync(
}
finally
{
// Stop the keepalive loop when the receive loop ends (reconnect or shutdown).
// This runs after catch filters evaluate, avoiding a race where teardown delay
// allows external cancellation to change filter outcomes.
if (keepaliveCts != null)
{
await keepaliveCts.CancelAsync();
try { await keepaliveTask!; }
catch (OperationCanceledException) { }
catch (Exception ex) { _logger.LogGrpcProtocolHandlerKeepaliveFailed(ex); }
keepaliveCts.Dispose();
}

_streamingCall?.Dispose();
_streamingCall = null;
}
Expand Down Expand Up @@ -322,6 +343,26 @@ private static OrchestratorResponse CreateWorkflowFailureResult(OrchestratorRequ
}
};

/// <summary>
/// Periodically calls Hello on the sidecar to prevent idle HTTP/2 connections from being
/// closed by intermediary load balancers (e.g. AWS ALB).
/// </summary>
private async Task KeepaliveLoopAsync(CancellationToken cancellation)
{
using var timer = new PeriodicTimer(KeepaliveInterval);
while (await timer.WaitForNextTickAsync(cancellation))
{
try
{
await _grpcClient.HelloAsync(new Empty(), CreateCallOptions(cancellation));
}
catch (Exception ex) when (ex is not OperationCanceledException)
{
_logger.LogGrpcProtocolHandlerKeepaliveFailed(ex);
}
}
}

/// <inheritdoc />
public async ValueTask DisposeAsync()
{
Expand Down
Loading