Skip to content

Commit 8bdb09f

Browse files
authored
Fail executions when the worker goes down v2 (#5928)
* fail executions when the worker goes down
* matching test logic to behavior
* address ahmed pr comments
1 parent 62408d9 commit 8bdb09f

12 files changed

+97
-14
lines changed

src/WebJobs.Script/Workers/Rpc/FunctionRegistration/RpcFunctionInvocationDispatcher.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ public async Task InitializeAsync(IEnumerable<FunctionMetadata> functions, Cance
201201
catch (Exception ex)
202202
{
203203
_logger.LogWarning(ex, "Removing errored webhost language worker channel for runtime: {workerRuntime} workerId:{workerId}", _workerRuntime, workerId);
204-
await _webHostLanguageWorkerChannelManager.ShutdownChannelIfExistsAsync(_workerRuntime, workerId);
204+
await _webHostLanguageWorkerChannelManager.ShutdownChannelIfExistsAsync(_workerRuntime, workerId, ex);
205205
InitializeWebhostLanguageWorkerChannel();
206206
}
207207
}
@@ -289,7 +289,7 @@ public async void WorkerError(WorkerErrorEvent workerError)
289289
{
290290
_logger.LogDebug("Handling WorkerErrorEvent for runtime:{runtime}, workerId:{workerId}. Failed with: {exception}", workerError.Language, _workerRuntime, workerError.Exception);
291291
AddOrUpdateErrorBucket(workerError);
292-
await DisposeAndRestartWorkerChannel(workerError.Language, workerError.WorkerId);
292+
await DisposeAndRestartWorkerChannel(workerError.Language, workerError.WorkerId, workerError.Exception);
293293
}
294294
else
295295
{
@@ -308,15 +308,15 @@ public async void WorkerRestart(WorkerRestartEvent workerRestart)
308308
}
309309
}
310310

311-
private async Task DisposeAndRestartWorkerChannel(string runtime, string workerId)
311+
private async Task DisposeAndRestartWorkerChannel(string runtime, string workerId, Exception workerException = null)
312312
{
313313
_logger.LogDebug("Attempting to dispose webhost or jobhost channel for workerId: {channelId}, runtime:{language}", workerId, runtime);
314314

315-
bool isWebHostChannelDisposed = await _webHostLanguageWorkerChannelManager.ShutdownChannelIfExistsAsync(runtime, workerId);
315+
bool isWebHostChannelDisposed = await _webHostLanguageWorkerChannelManager.ShutdownChannelIfExistsAsync(runtime, workerId, workerException);
316316
bool isJobHostChannelDisposed = false;
317317
if (!isWebHostChannelDisposed)
318318
{
319-
isJobHostChannelDisposed = await _jobHostLanguageWorkerChannelManager.ShutdownChannelIfExistsAsync(workerId);
319+
isJobHostChannelDisposed = await _jobHostLanguageWorkerChannelManager.ShutdownChannelIfExistsAsync(workerId, workerException);
320320
}
321321

322322
if (!isWebHostChannelDisposed && !isJobHostChannelDisposed)

src/WebJobs.Script/Workers/Rpc/IJobHostRpcWorkerChannelManager.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Copyright (c) .NET Foundation. All rights reserved.
22
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
33

4+
using System;
45
using System.Collections.Generic;
56
using System.Threading.Tasks;
67

@@ -10,7 +11,7 @@ internal interface IJobHostRpcWorkerChannelManager
1011
{
1112
void AddChannel(IRpcWorkerChannel channel);
1213

13-
Task<bool> ShutdownChannelIfExistsAsync(string channelId);
14+
Task<bool> ShutdownChannelIfExistsAsync(string channelId, Exception workerException);
1415

1516
void ShutdownChannels();
1617

src/WebJobs.Script/Workers/Rpc/IRpcWorkerChannel.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,5 +29,7 @@ public interface IRpcWorkerChannel
2929
Task DrainInvocationsAsync();
3030

3131
bool IsExecutingInvocation(string invocationId);
32+
33+
bool TryFailExecutions(Exception workerException);
3234
}
3335
}

src/WebJobs.Script/Workers/Rpc/IWebHostRpcWorkerChannelManager.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Copyright (c) .NET Foundation. All rights reserved.
22
// Licensed under the MIT License. See License.txt in the project root for license information.
33

4+
using System;
45
using System.Collections.Generic;
56
using System.Threading.Tasks;
67

@@ -14,7 +15,7 @@ public interface IWebHostRpcWorkerChannelManager
1415

1516
Task SpecializeAsync();
1617

17-
Task<bool> ShutdownChannelIfExistsAsync(string language, string workerId);
18+
Task<bool> ShutdownChannelIfExistsAsync(string language, string workerId, Exception workerException);
1819

1920
Task ShutdownChannelsAsync();
2021
}

src/WebJobs.Script/Workers/Rpc/JobHostRpcWorkerChannelManager.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,12 @@ public void AddChannel(IRpcWorkerChannel channel)
2424
_channels.TryAdd(channel.Id, channel);
2525
}
2626

27-
public Task<bool> ShutdownChannelIfExistsAsync(string channelId)
27+
public Task<bool> ShutdownChannelIfExistsAsync(string channelId, Exception workerException)
2828
{
2929
if (_channels.TryRemove(channelId, out IRpcWorkerChannel removedChannel))
3030
{
3131
_logger.LogDebug("Disposing JobHost language worker channel with id:{workerId}", removedChannel.Id);
32+
removedChannel.TryFailExecutions(workerException);
3233
(removedChannel as IDisposable)?.Dispose();
3334
return Task.FromResult(true);
3435
}

src/WebJobs.Script/Workers/Rpc/RpcWorkerChannel.cs

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,8 @@ internal void SendInvocationRequest(ScriptInvocationContext context)
346346
internal void InvokeResponse(InvocationResponse invokeResponse)
347347
{
348348
_workerChannelLogger.LogDebug("InvocationResponse received for invocation id: {Id}", invokeResponse.InvocationId);
349-
if (_executingInvocations.TryRemove(invokeResponse.InvocationId, out ScriptInvocationContext context)
349+
bool invocationExists = _executingInvocations.TryRemove(invokeResponse.InvocationId, out ScriptInvocationContext context);
350+
if (invocationExists
350351
&& invokeResponse.Result.IsSuccess(context.ResultSource))
351352
{
352353
try
@@ -366,6 +367,11 @@ internal void InvokeResponse(InvocationResponse invokeResponse)
366367
context.ResultSource.TrySetException(responseEx);
367368
}
368369
}
370+
371+
if (!invocationExists)
372+
{
373+
_workerChannelLogger.LogDebug("Invocation was not found as an executing invocation (id={id})", invokeResponse.InvocationId);
374+
}
369375
}
370376

371377
internal void Log(RpcEvent msg)
@@ -507,5 +513,22 @@ public bool IsExecutingInvocation(string invocationId)
507513
{
508514
return _executingInvocations.ContainsKey(invocationId);
509515
}
516+
517+
/// <summary>
/// Fails every invocation currently tracked as executing on this channel by
/// completing its result source with <paramref name="workerException"/>.
/// </summary>
/// <param name="workerException">
/// The error that took the worker down. When null, nothing is failed.
/// </param>
/// <returns>
/// <c>false</c> when there is no exception to report or no invocation table to walk;
/// otherwise <c>true</c> (also when no invocation happened to be in flight).
/// </returns>
public bool TryFailExecutions(Exception workerException)
{
    if (workerException == null)
    {
        return false;
    }

    // A foreach over a null sequence throws, so guard explicitly instead of
    // relying on "?.Values" inside the loop header.
    if (_executingInvocations == null)
    {
        return false;
    }

    // Walk key/value pairs so removal always uses the real dictionary key. Deriving the key
    // from the context can yield null (null context or ExecutionContext), which is not a
    // usable key for TryRemove and would leave the entry behind.
    // NOTE(review): assumes entries are keyed by the invocation id string, as IsExecutingInvocation implies.
    foreach (var entry in _executingInvocations)
    {
        string invocationId = entry.Key;
        ScriptInvocationContext currContext = entry.Value;

        _workerChannelLogger.LogDebug("Worker '{workerId}' encountered a fatal error. Failing invocation id: {Id}", _workerId, invocationId);
        _executingInvocations.TryRemove(invocationId, out ScriptInvocationContext _);

        // TrySetException is a no-op if the invocation already completed.
        currContext?.ResultSource?.TrySetException(workerException);
    }

    return true;
}
510533
}
511534
}

src/WebJobs.Script/Workers/Rpc/WebHostRpcWorkerChannelManager.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ private bool UsePlaceholderChannel(string workerRuntime)
146146
return false;
147147
}
148148

149-
public Task<bool> ShutdownChannelIfExistsAsync(string language, string workerId)
149+
public Task<bool> ShutdownChannelIfExistsAsync(string language, string workerId, Exception workerException = null)
150150
{
151151
if (string.IsNullOrEmpty(language))
152152
{
@@ -168,6 +168,8 @@ public Task<bool> ShutdownChannelIfExistsAsync(string language, string workerId)
168168
if (workerChannel != null)
169169
{
170170
_logger.LogDebug("Disposing WebHost channel for workerId: {channelId}, for runtime:{language}", workerId, language);
171+
// Set exception if exists
172+
workerChannel.TryFailExecutions(workerException);
171173
(channelTask.Result as IDisposable)?.Dispose();
172174
}
173175
}

test/WebJobs.Script.Tests/Workers/Rpc/RpcFunctionInvocationDispatcherTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -506,7 +506,7 @@ private static RpcFunctionInvocationDispatcher GetTestFunctionDispatcher(string
506506
WorkerConfigs = TestHelpers.GetTestWorkerConfigs()
507507
};
508508
IRpcWorkerChannelFactory testLanguageWorkerChannelFactory = new TestRpcWorkerChannelFactory(eventManager, _testLogger, scriptOptions.Value.RootScriptPath, throwOnProcessStartUp);
509-
IWebHostRpcWorkerChannelManager testWebHostLanguageWorkerChannelManager = new TesRpcWorkerChannelManager(eventManager, _testLogger, scriptOptions.Value.RootScriptPath, testLanguageWorkerChannelFactory);
509+
IWebHostRpcWorkerChannelManager testWebHostLanguageWorkerChannelManager = new TestRpcWorkerChannelManager(eventManager, _testLogger, scriptOptions.Value.RootScriptPath, testLanguageWorkerChannelFactory);
510510
IJobHostRpcWorkerChannelManager jobHostLanguageWorkerChannelManager = new JobHostRpcWorkerChannelManager(loggerFactory);
511511
if (addWebhostChannel)
512512
{

test/WebJobs.Script.Tests/Workers/Rpc/RpcWorkerChannelTests.cs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,20 @@ public async Task Drain_Verify()
224224
Assert.Equal(result.Status, TaskStatus.RanToCompletion);
225225
}
226226

227+
// Verifies that TryFailExecutions removes an in-flight invocation from the channel
// and faults its result task with the supplied worker exception.
[Fact]
public void InFlight_Functions_FailedWithException()
{
    // Arrange: send one invocation so the channel tracks it as executing.
    var completionSource = new TaskCompletionSource<ScriptInvocationResult>();
    ScriptInvocationContext invocationContext = GetTestScriptInvocationContext(Guid.NewGuid(), completionSource);
    _workerChannel.SendInvocationRequest(invocationContext);
    string invocationId = invocationContext.ExecutionContext.InvocationId.ToString();
    Assert.True(_workerChannel.IsExecutingInvocation(invocationId));

    // Act: simulate the worker going down.
    Exception failure = new Exception("worker failed");
    _workerChannel.TryFailExecutions(failure);

    // Assert: the invocation is no longer tracked and its task carries the worker exception.
    Assert.False(_workerChannel.IsExecutingInvocation(invocationId));
    Assert.Equal(TaskStatus.Faulted, completionSource.Task.Status);
    Assert.Equal(failure, completionSource.Task.Exception.InnerException);
}
240+
227241
[Fact]
228242
public void SendLoadRequests_PublishesOutboundEvents()
229243
{

test/WebJobs.Script.Tests/Workers/Rpc/TestRpcWorkerChannel.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,5 +130,12 @@ public bool IsExecutingInvocation(string invocationId)
130130
{
131131
return _executingInvocations.Contains(invocationId);
132132
}
133+
134+
/// <summary>
/// Test double: forgets every tracked invocation, so IsExecutingInvocation
/// returns false for all ids afterwards. The supplied exception is not used.
/// </summary>
/// <returns>Always <c>true</c>.</returns>
public bool TryFailExecutions(Exception exception)
{
    // Swap in a brand-new empty set rather than mutating the existing one.
    var noInvocations = new HashSet<string>();
    _executingInvocations = noInvocations;
    return true;
}
133140
}
134141
}

0 commit comments

Comments
 (0)