Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 23 additions & 3 deletions src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System.Diagnostics.CodeAnalysis;
using System.Text;
using DurableTask.Core;
using DurableTask.Core.Entities;
using DurableTask.Core.Entities.OperationFormat;
using DurableTask.Core.History;
using Grpc.Core;
using Microsoft.DurableTask.Entities;
using Microsoft.DurableTask.Worker.Shims;
using Microsoft.Extensions.DependencyInjection;
Expand Down Expand Up @@ -186,10 +184,18 @@ async ValueTask<OrchestrationRuntimeState> BuildRuntimeStateAsync(

async Task ProcessWorkItemsAsync(AsyncServerStreamingCall<P.WorkItem> stream, CancellationToken cancellation)
{
// Create a new token source for timing out and a final token source that keys off of them both.
// The timeout token is used to detect when we are no longer getting any messages, including health checks.
// If this is the case, it signifies the connection has been dropped silently and we need to reconnect.
using var timeoutSource = new CancellationTokenSource();
timeoutSource.CancelAfter(TimeSpan.FromSeconds(60));
using var tokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellation, timeoutSource.Token);

while (!cancellation.IsCancellationRequested)
{
await foreach (P.WorkItem workItem in stream.ResponseStream.ReadAllAsync(cancellation))
await foreach (P.WorkItem workItem in stream.ResponseStream.ReadAllAsync(tokenSource.Token))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would this not throw if the connection is closed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We thought it would too, but if you go into the IAsyncStreamReader, a cancellation is actually just treated as the end of the stream and it returns normally.

{
timeoutSource.CancelAfter(TimeSpan.FromSeconds(60));
if (workItem.RequestCase == P.WorkItem.RequestOneofCase.OrchestratorRequest)
{
this.RunBackgroundTask(
Expand Down Expand Up @@ -237,6 +243,20 @@ async Task ProcessWorkItemsAsync(AsyncServerStreamingCall<P.WorkItem> stream, Ca
this.Logger.UnexpectedWorkItemType(workItem.RequestCase.ToString());
}
}

if (tokenSource.IsCancellationRequested || tokenSource.Token.IsCancellationRequested)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I am missing something, but it seems unlikely this line would ever be true. If IsCancellationRequested, then more likely than not stream.ResponseStream.ReadAllAsync throw OpeationCancelledException.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See above. We thought this behavior was an odd choice for the stream reader as well, but it's documented that it doesn't throw.

{
// The token has cancelled, this means either:
// 1. The broader 'cancellation' was triggered, return here to start a graceful shutdown.
// 2. The timeoutSource was triggered, return here to trigger a reconnect to the backend.
if (!cancellation.IsCancellationRequested)
{
// Since the cancellation came from the timeout, log a warning.
this.Logger.ConnectionTimeout();
}

return;
}
}
}

Expand Down
3 changes: 3 additions & 0 deletions src/Worker/Grpc/Logs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,8 @@ static partial class Logs

[LoggerMessage(EventId = 55, Level = LogLevel.Information, Message = "{instanceId}: Evaluating custom retry handler for failed '{name}' task. Attempt = {attempt}.")]
public static partial void RetryingTask(this ILogger logger, string instanceId, string name, int attempt);

[LoggerMessage(EventId = 56, Level = LogLevel.Warning, Message = "Channel to backend has stopped receiving traffic, will attempt to reconnect.")]
public static partial void ConnectionTimeout(this ILogger logger);
}
}
Loading