Skip to content

Commit 8913ece

Browse files
authored
Fail in-flight invocations when worker channel shuts down (#11159)
1 parent 63d5954 commit 8913ece

15 files changed

+250
-41
lines changed

release_notes.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,4 @@
1212
- Setting current activity status for failed invocations (#11313)
1313
- Adding test coverage for `Utility.IsAzureMonitorLoggingEnabled` (#11322)
1414
- Reduce allocations in `Utility.IsAzureMonitorLoggingEnabled` (#11323)
15+
- Bug fix that fails in-flight invocations when a worker channel shuts down (#11159)

src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@
1616
using System.Threading.Tasks.Dataflow;
1717
using Google.Protobuf.Collections;
1818
using Google.Protobuf.WellKnownTypes;
19+
using Microsoft.Azure.WebJobs.Host;
1920
using Microsoft.Azure.WebJobs.Logging;
2021
using Microsoft.Azure.WebJobs.Script.Config;
2122
using Microsoft.Azure.WebJobs.Script.Description;
2223
using Microsoft.Azure.WebJobs.Script.Diagnostics;
2324
using Microsoft.Azure.WebJobs.Script.Diagnostics.OpenTelemetry;
2425
using Microsoft.Azure.WebJobs.Script.Eventing;
26+
using Microsoft.Azure.WebJobs.Script.Exceptions;
2527
using Microsoft.Azure.WebJobs.Script.Extensions;
2628
using Microsoft.Azure.WebJobs.Script.Grpc.Eventing;
2729
using Microsoft.Azure.WebJobs.Script.Grpc.Extensions;
@@ -1552,21 +1554,22 @@ public bool IsExecutingInvocation(string invocationId)
15521554
return _executingInvocations.ContainsKey(invocationId);
15531555
}
15541556

1555-
public bool TryFailExecutions(Exception workerException)
1557+
public void Shutdown(Exception workerException)
15561558
{
1557-
if (workerException == null)
1559+
var shutdownException = workerException;
1560+
1561+
if (workerException is null || workerException is FunctionTimeoutException)
15581562
{
1559-
return false;
1563+
shutdownException = new FunctionTimeoutAbortException(workerException?.Message ?? "Worker channel is shutting down. Aborting function.", workerException);
15601564
}
15611565

15621566
foreach (var invocation in _executingInvocations?.Values)
15631567
{
15641568
string invocationId = invocation.Context?.ExecutionContext?.InvocationId.ToString();
15651569
_workerChannelLogger.LogDebug("Worker '{workerId}' encountered a fatal error. Failing invocation: '{invocationId}'", _workerId, invocationId);
1566-
invocation.Context?.SetException(workerException);
1570+
invocation.Context?.SetException(shutdownException);
15671571
RemoveExecutingInvocation(invocationId);
15681572
}
1569-
return true;
15701573
}
15711574

15721575
/// <summary>

src/WebJobs.Script.WebHost/WebScriptHostExceptionHandler.cs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) .NET Foundation. All rights reserved.
1+
// Copyright (c) .NET Foundation. All rights reserved.
22
// Licensed under the MIT License. See License.txt in the project root for license information.
33

44
using System;
@@ -48,14 +48,16 @@ public async Task OnTimeoutExceptionAsync(ExceptionDispatchInfo exceptionInfo, T
4848
// explicitly in order to pass the timeout.
4949
// Task ignoreTask = _hostManager.StopAsync();
5050
// Give the manager and all running tasks some time to shut down gracefully.
51-
//await Task.Delay(timeoutGracePeriod);
51+
// await Task.Delay(timeoutGracePeriod);
5252
IFunctionInvocationDispatcher functionInvocationDispatcher = _functionInvocationDispatcherFactory.GetFunctionDispatcher();
5353
if (!functionInvocationDispatcher.State.Equals(FunctionInvocationDispatcherState.Default))
5454
{
5555
_logger.LogWarning($"A function timeout has occurred. Restarting worker process executing invocationId '{timeoutException.InstanceId}'.", exceptionInfo.SourceException);
56+
5657
// If invocation id is not found in any of the workers => worker is already disposed. No action needed.
57-
await functionInvocationDispatcher.RestartWorkerWithInvocationIdAsync(timeoutException.InstanceId.ToString());
58-
_logger.LogWarning("Restart of language worker process(es) completed.", exceptionInfo.SourceException);
58+
await functionInvocationDispatcher.RestartWorkerWithInvocationIdAsync(timeoutException.InstanceId.ToString(), timeoutException);
59+
60+
_logger.LogWarning("Attempt to restart language worker process(es) completed.", exceptionInfo.SourceException);
5961
}
6062
else
6163
{

src/WebJobs.Script/Http/RetryProxyHandler.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) .NET Foundation. All rights reserved.
1+
// Copyright (c) .NET Foundation. All rights reserved.
22
// Licensed under the MIT License. See License.txt in the project root for license information.
33

44
using System;
@@ -59,7 +59,7 @@ protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage
5959
{
6060
if (resultSource is not null && (resultSource.Task.IsFaulted || resultSource.Task.IsCanceled))
6161
{
62-
_logger.LogWarning("HTTP request will not be retried. The associated function invocation has failed.");
62+
// if the ScriptInvocation has already failed, we do not need to retry any more because there's no request to proxy.
6363
throw;
6464
}
6565

src/WebJobs.Script/Workers/Http/HttpFunctionInvocationDispatcher.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ public Task ShutdownAsync()
206206
return Task.CompletedTask;
207207
}
208208

209-
public Task<bool> RestartWorkerWithInvocationIdAsync(string invocationId)
209+
public Task<bool> RestartWorkerWithInvocationIdAsync(string invocationId, Exception exception = null)
210210
{
211211
// Since there's only one channel for httpworker
212212
DisposeAndRestartWorkerChannel(_httpWorkerChannel.Id);

src/WebJobs.Script/Workers/IFunctionInvocationDispatcher.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public interface IFunctionInvocationDispatcher : IDisposable
2323

2424
Task ShutdownAsync();
2525

26-
Task<bool> RestartWorkerWithInvocationIdAsync(string invocationId);
26+
Task<bool> RestartWorkerWithInvocationIdAsync(string invocationId, Exception exception);
2727

2828
Task StartWorkerChannel();
2929

src/WebJobs.Script/Workers/Rpc/FunctionRegistration/RpcFunctionInvocationDispatcher.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) .NET Foundation. All rights reserved.
1+
// Copyright (c) .NET Foundation. All rights reserved.
22
// Licensed under the MIT License. See License.txt in the project root for license information.
33

44
using System;
@@ -676,7 +676,7 @@ public void Dispose()
676676
Dispose(true);
677677
}
678678

679-
public async Task<bool> RestartWorkerWithInvocationIdAsync(string invocationId)
679+
public async Task<bool> RestartWorkerWithInvocationIdAsync(string invocationId, Exception exception = null)
680680
{
681681
// Dispose and restart errored channel with the particular invocation id
682682
var channels = await GetInitializedWorkerChannelsAsync();
@@ -685,10 +685,13 @@ public async Task<bool> RestartWorkerWithInvocationIdAsync(string invocationId)
685685
if (channel.IsExecutingInvocation(invocationId))
686686
{
687687
_logger.LogDebug($"Restarting channel with workerId: '{channel.Id}' that is executing invocation: '{invocationId}' and timed out.");
688-
await DisposeAndRestartWorkerChannel(_workerRuntime, channel.Id);
688+
await DisposeAndRestartWorkerChannel(_workerRuntime, channel.Id, exception);
689689
return true;
690690
}
691691
}
692+
693+
_logger.LogDebug("The channel that is executing invocation {invocationId} no longer exists.", invocationId);
694+
692695
return false;
693696
}
694697

src/WebJobs.Script/Workers/Rpc/IRpcWorkerChannel.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,6 @@ public interface IRpcWorkerChannel : IWorkerChannel
3232

3333
bool IsExecutingInvocation(string invocationId);
3434

35-
bool TryFailExecutions(Exception workerException);
35+
void Shutdown(Exception workerException);
3636
}
3737
}

src/WebJobs.Script/Workers/Rpc/JobHostRpcWorkerChannelManager.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public Task<bool> ShutdownChannelIfExistsAsync(string channelId, Exception worke
4949
{
5050
string id = rpcChannel.Id;
5151
_logger.LogDebug("Disposing language worker channel with id:{workerId}", id);
52-
rpcChannel.TryFailExecutions(workerException);
52+
rpcChannel.Shutdown(workerException);
5353

5454
(rpcChannel as IDisposable)?.Dispose();
5555
_logger.LogDebug("Disposed language worker channel with id:{workerId}", id);

src/WebJobs.Script/Workers/Rpc/WebHostRpcWorkerChannelManager.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) .NET Foundation. All rights reserved.
1+
// Copyright (c) .NET Foundation. All rights reserved.
22
// Licensed under the MIT License. See License.txt in the project root for license information.
33

44
using System;
@@ -269,7 +269,7 @@ public Task<bool> ShutdownChannelIfExistsAsync(string language, string workerId,
269269
if (workerChannel != null)
270270
{
271271
_logger.LogDebug("Disposing WebHost channel for workerId: {channelId}, for runtime:{language}", workerId, language);
272-
workerChannel.TryFailExecutions(workerException);
272+
workerChannel.Shutdown(workerException);
273273
(channelTask.Result as IDisposable)?.Dispose();
274274
}
275275
}
@@ -295,7 +295,7 @@ public Task<bool> ShutdownChannelIfExistsAsync(string language, string workerId,
295295
if (workerChannel != null)
296296
{
297297
_logger.LogDebug("Disposing WebHost channel for workerId: {channelId}, for runtime:{language}", workerId, language);
298-
workerChannel.TryFailExecutions(workerException);
298+
workerChannel.Shutdown(workerException);
299299
(channelTask.Result as IDisposable)?.Dispose();
300300
}
301301
}

0 commit comments

Comments
 (0)