Skip to content

Commit 5d42b96

Browse files
github-actions[bot]surgupta-msftNickCraver
authored andcommitted
[v3.x] Figure out FUNCTIONS_WORKER_RUNTIME from function app content if Environment variable FUNCTIONS_WORKER_RUNTIME is not set (#8212)
* Taking worker runtime from files if not in Env setting * Added tests * Code cleanup * Added tests in Utility * minor restructuring in utility * RpcFunctionInvocationDispatcher: capture async void errors In test runs we're seeing failures like this: ``` The active test run was aborted. Reason: Test host process crashed : Unhandled exception. System.Threading.Tasks.TaskCanceledException: A task was canceled. at Microsoft.Azure.WebJobs.Script.Grpc.GrpcWorkerChannel.StartWorkerProcessAsync(CancellationToken cancellationToken) in /_/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs:line 155 at Microsoft.Azure.WebJobs.Script.Workers.Rpc.RpcFunctionInvocationDispatcher.InitializeJobhostLanguageWorkerChannelAsync(Int32 attemptCount) in /_/src/WebJobs.Script/Workers/Rpc/FunctionRegistration/RpcFunctionInvocationDispatcher.cs:line 119 at Microsoft.Azure.WebJobs.Script.Workers.Rpc.RpcFunctionInvocationDispatcher.StartWorkerChannel(String runtime) in /_/src/WebJobs.Script/Workers/Rpc/FunctionRegistration/RpcFunctionInvocationDispatcher.cs:line 535 at Microsoft.Azure.WebJobs.Script.Workers.Rpc.RpcFunctionInvocationDispatcher.StartWorkerChannel(String runtime) in /_/src/WebJobs.Script/Workers/Rpc/FunctionRegistration/RpcFunctionInvocationDispatcher.cs:line 548 at Microsoft.Azure.WebJobs.Script.Workers.Rpc.RpcFunctionInvocationDispatcher.DisposeAndRestartWorkerChannel(String runtime, String workerId, Exception workerException) in /_/src/WebJobs.Script/Workers/Rpc/FunctionRegistration/RpcFunctionInvocationDispatcher.cs:line 497 at Microsoft.Azure.WebJobs.Script.Workers.Rpc.RpcFunctionInvocationDispatcher.WorkerError(WorkerErrorEvent workerError) in /_/src/WebJobs.Script/Workers/Rpc/FunctionRegistration/RpcFunctionInvocationDispatcher.cs:line 442 at System.Threading.Tasks.Task.<>c.<ThrowAsync>b__128_1(Object state) at System.Threading.QueueUserWorkItemCallbackDefaultContext.Execute() at System.Threading.ThreadPoolWorkQueue.Dispatch() at System.Threading.PortableThreadPool.WorkerThread.WorkerThreadStart() at System.Threading.Thread.StartCallback() ``` This is overall because of the async void usage in the event handlers for the `ScriptEvent` pipe. By kicking off a task that could error (what `async void` does under the covers), its only option is to crash the runtime when an error happens. We have the options of: 1. Let the runtime crash (what happens today) 2. Capture only the `TaskCancelledException` case (what this PR does) 3. Capture and log all errors The problem with #3 is it introduces a state I'm tooo naïve to reason about: what happens when an error happens _and we fail to restart the worker_? This could be a downstream net win from a state standpoint of "well, we restart everything and recover". It's not fast though, vs. restarting just the worker. If that's _not_ a new state and it's handled correctly, 3 is the better/global option. The interaction here with the test suite is: 1. We throw an worker error (in testing) 2. An event is triggered kicking off an `async void` to restart the worker 3. We dispose of the Rpc bits (when the test finishes) 4. That background test is cancelled, throwing an error and crashing our runtime I think it's worth noting this is fixing this class only - similar usages (grep `async void`) lie in other areas and is something we should address globally. Writing this up to make sure the direction is correct/agreeable as a first step. Co-authored-by: Surgupta <[email protected]> Co-authored-by: Nick Craver <[email protected]>
1 parent 270a708 commit 5d42b96

File tree

4 files changed

+101
-12
lines changed

4 files changed

+101
-12
lines changed

src/WebJobs.Script/Utility.cs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -631,9 +631,21 @@ internal static bool IsSingleLanguage(IEnumerable<FunctionMetadata> functions, s
631631
return ContainsFunctionWithWorkerRuntime(filteredFunctions, workerRuntime);
632632
}
633633

634-
internal static string GetWorkerRuntime(IEnumerable<FunctionMetadata> functions)
634+
internal static string GetWorkerRuntime(IEnumerable<FunctionMetadata> functions, IEnvironment environment = null)
635635
{
636-
if (IsSingleLanguage(functions, null))
636+
string workerRuntime = null;
637+
638+
if (environment != null)
639+
{
640+
workerRuntime = environment.GetEnvironmentVariable(EnvironmentSettingNames.FunctionWorkerRuntime);
641+
642+
if (!string.IsNullOrEmpty(workerRuntime))
643+
{
644+
return workerRuntime;
645+
}
646+
}
647+
648+
if (functions != null && IsSingleLanguage(functions, null))
637649
{
638650
var filteredFunctions = functions?.Where(f => !f.IsCodeless());
639651
string functionLanguage = filteredFunctions.FirstOrDefault()?.Language;

src/WebJobs.Script/Workers/Rpc/FunctionRegistration/RpcFunctionInvocationDispatcher.cs

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,8 @@ public async Task InitializeAsync(IEnumerable<FunctionMetadata> functions, Cance
217217
return;
218218
}
219219

220-
_workerRuntime = _workerRuntime ?? _environment.GetEnvironmentVariable(EnvironmentSettingNames.FunctionWorkerRuntime);
220+
_workerRuntime = _workerRuntime ?? Utility.GetWorkerRuntime(functions, _environment);
221+
221222
if (string.IsNullOrEmpty(_workerRuntime) || _workerRuntime.Equals(RpcWorkerConstants.DotNetLanguageWorkerName, StringComparison.InvariantCultureIgnoreCase))
222223
{
223224
// Shutdown any placeholder channels for empty function apps or dotnet function apps.
@@ -434,16 +435,26 @@ public async void WorkerError(WorkerErrorEvent workerError)
434435
return;
435436
}
436437

437-
if (string.Equals(_workerRuntime, workerError.Language))
438+
try
438439
{
439-
_logger.LogDebug("Handling WorkerErrorEvent for runtime:{runtime}, workerId:{workerId}. Failed with: {exception}", workerError.Language, _workerRuntime, workerError.Exception);
440-
AddOrUpdateErrorBucket(workerError);
441-
await DisposeAndRestartWorkerChannel(workerError.Language, workerError.WorkerId, workerError.Exception);
440+
if (string.Equals(_workerRuntime, workerError.Language))
441+
{
442+
_logger.LogDebug("Handling WorkerErrorEvent for runtime:{runtime}, workerId:{workerId}. Failed with: {exception}", workerError.Language, _workerRuntime, workerError.Exception);
443+
AddOrUpdateErrorBucket(workerError);
444+
await DisposeAndRestartWorkerChannel(workerError.Language, workerError.WorkerId, workerError.Exception);
445+
}
446+
else
447+
{
448+
_logger.LogDebug("Received WorkerErrorEvent for runtime:{runtime}, workerId:{workerId}", workerError.Language, workerError.WorkerId);
449+
_logger.LogDebug("WorkerErrorEvent runtime:{runtime} does not match current runtime:{currentRuntime}. Failed with: {exception}", workerError.Language, _workerRuntime, workerError.Exception);
450+
}
442451
}
443-
else
452+
catch (TaskCanceledException)
444453
{
445-
_logger.LogDebug("Received WorkerErrorEvent for runtime:{runtime}, workerId:{workerId}", workerError.Language, workerError.WorkerId);
446-
_logger.LogDebug("WorkerErrorEvent runtime:{runtime} does not match current runtime:{currentRuntime}. Failed with: {exception}", workerError.Language, _workerRuntime, workerError.Exception);
454+
// Specifically in the "we were torn down while trying to restart" case, we want to catch here and ignore
455+
// If we don't catch the exception from an async void method, we'll end up tearing down the entire runtime instead
456+
// It's possible we want to catch *all* exceptions and log or ignore here, but taking the minimal change first
457+
// For example if we capture and log, we're left in a worker-less state with a working Host runtime - is that desired? Will it self recover elsewhere?
447458
}
448459
}
449460

@@ -454,8 +465,18 @@ public async void WorkerRestart(WorkerRestartEvent workerRestart)
454465
return;
455466
}
456467

457-
_logger.LogDebug("Handling WorkerRestartEvent for runtime:{runtime}, workerId:{workerId}", workerRestart.Language, workerRestart.WorkerId);
458-
await DisposeAndRestartWorkerChannel(workerRestart.Language, workerRestart.WorkerId);
468+
try
469+
{
470+
_logger.LogDebug("Handling WorkerRestartEvent for runtime:{runtime}, workerId:{workerId}", workerRestart.Language, workerRestart.WorkerId);
471+
await DisposeAndRestartWorkerChannel(workerRestart.Language, workerRestart.WorkerId);
472+
}
473+
catch (TaskCanceledException)
474+
{
475+
// Specifically in the "we were torn down while trying to restart" case, we want to catch here and ignore
476+
// If we don't catch the exception from an async void method, we'll end up tearing down the entire runtime instead
477+
// It's possible we want to catch *all* exceptions and log or ignore here, but taking the minimal change first
478+
// For example if we capture and log, we're left in a worker-less state with a working Host runtime - is that desired? Will it self recover elsewhere?
479+
}
459480
}
460481

461482
public async Task StartWorkerChannel()

test/WebJobs.Script.Tests/UtilityTests.cs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -595,6 +595,27 @@ public void IsSupported_Returns_True(string language, string funcMetadataLanguag
595595
Assert.True(Utility.IsFunctionMetadataLanguageSupportedByWorkerRuntime(func1, language));
596596
}
597597

598+
[Theory]
599+
[InlineData(null)]
600+
[InlineData("java")]
601+
public void GetWorkerRuntimeTests(string workerRuntime)
602+
{
603+
FunctionMetadata func1 = new FunctionMetadata()
604+
{
605+
Name = "func1",
606+
Language = workerRuntime
607+
};
608+
609+
IEnumerable<FunctionMetadata> functionMetadatas = new List<FunctionMetadata>
610+
{
611+
func1
612+
};
613+
614+
var testEnv = new TestEnvironment();
615+
testEnv.SetEnvironmentVariable(EnvironmentSettingNames.FunctionWorkerRuntime, workerRuntime);
616+
Assert.True(Utility.GetWorkerRuntime(functionMetadatas, testEnv) == workerRuntime);
617+
}
618+
598619
[Theory]
599620
[InlineData("node", "java")]
600621
[InlineData("java", "node")]

test/WebJobs.Script.Tests/Workers/Rpc/RpcFunctionInvocationDispatcherTests.cs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,41 @@ public async Task WorkerIndexing_Setting_ChannelInitializationState_Succeeds()
126126
Assert.Equal(expectedProcessCount, initializedChannelsCount);
127127
}
128128

129+
[Theory]
130+
[InlineData(null)]
131+
[InlineData(RpcWorkerConstants.JavaLanguageWorkerName)]
132+
public async Task WorkerRuntime_Setting_ChannelInitializationState_Succeeds(string workerRuntime)
133+
{
134+
_testLoggerProvider.ClearAllLogMessages();
135+
int expectedProcessCount = 1;
136+
RpcFunctionInvocationDispatcher functionDispatcher = GetTestFunctionDispatcher(expectedProcessCount, false, runtime: workerRuntime, workerIndexing: true);
137+
138+
// create channels and ensure that they aren't ready for invocation requests yet
139+
await functionDispatcher.InitializeAsync(new List<FunctionMetadata>());
140+
141+
if (!string.IsNullOrEmpty(workerRuntime))
142+
{
143+
int createdChannelsCount = await WaitForJobhostWorkerChannelsToStartup(functionDispatcher, expectedProcessCount, false);
144+
Assert.Equal(expectedProcessCount, createdChannelsCount);
145+
146+
IEnumerable<IRpcWorkerChannel> channels = await functionDispatcher.GetInitializedWorkerChannelsAsync();
147+
Assert.Equal(0, channels.Count());
148+
149+
// set up invocation buffers, send load requests, and ensure that the channels are now set up for invocation requests
150+
var functions = GetTestFunctionsList(RpcWorkerConstants.JavaLanguageWorkerName);
151+
await functionDispatcher.FinishInitialization(functions);
152+
int initializedChannelsCount = await WaitForJobhostWorkerChannelsToStartup(functionDispatcher, expectedProcessCount, true);
153+
Assert.Equal(expectedProcessCount, initializedChannelsCount);
154+
}
155+
else
156+
{
157+
foreach (var currChannel in functionDispatcher.JobHostLanguageWorkerChannelManager.GetChannels())
158+
{
159+
Assert.True(((TestRpcWorkerChannel)currChannel).ExecutionContexts.Count == 0);
160+
}
161+
}
162+
}
163+
129164
[Fact]
130165
public async Task Starting_MultipleJobhostChannels_Failed()
131166
{

0 commit comments

Comments
 (0)