Skip to content

Commit e5fe2af

Browse files
committed
Delay invocations until worker ready on restart due to error (hotfix commit) (#5668)
* minimal changes to cover most cases * Fix function invocation dispatcher change * Fix comments * No delays on placeholder mode or invalid. Added fix to http worker too * remove dupe * adding tests and addressing CR * don't remove log * log the exceptions * update unit tests * remove dupe test * fix * fix tests (cherry picked from commit 5e452aa) (cherry picked from commit e74710a)
1 parent f60c470 commit e5fe2af

File tree

7 files changed

+127
-10
lines changed

7 files changed

+127
-10
lines changed

src/WebJobs.Script/Description/Workers/WorkerFunctionInvoker.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,11 @@ protected override async Task<object> InvokeCore(object[] parameters, FunctionIn
9090

9191
private async Task DelayUntilFunctionDispatcherInitializedOrShutdown()
9292
{
93-
if (_functionDispatcher != null && _functionDispatcher.State == FunctionInvocationDispatcherState.Initializing)
93+
// Don't delay if functionDispatcher is already initialized OR is skipping initialization for one of
94+
// these reasons: started in placeholder, has no functions, functions do not match set language.
95+
bool ready = _functionDispatcher.State == FunctionInvocationDispatcherState.Initialized || _functionDispatcher.State == FunctionInvocationDispatcherState.Default;
96+
97+
if (!ready)
9498
{
9599
_logger.LogDebug($"functionDispatcher state: {_functionDispatcher.State}");
96100
bool result = await Utility.DelayAsync((_functionDispatcher.ErrorEventsThreshold + 1) * WorkerConstants.ProcessStartTimeoutSeconds, WorkerConstants.WorkerReadyCheckPollingIntervalMilliseconds, () =>

src/WebJobs.Script/Workers/FunctionInvocationDispatcherState.cs

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,30 @@ public enum FunctionInvocationDispatcherState
1111
Default,
1212

1313
/// <summary>
14-
/// The FunctionDispatcherState is starting.
14+
/// The FunctionDispatcher is starting.
1515
/// </summary>
1616
Initializing,
1717

1818
/// <summary>
19-
/// The FunctionDispatcherState has been fully initialized and can accept direct function
20-
/// invocations. All functions have been indexed. Listeners may not yet
21-
/// be not yet running.
19+
/// The FunctionDispatcher has been fully initialized and can accept function
20+
/// invocations. All functions have been indexed. At least one worker process is initialized.
2221
/// </summary>
23-
Initialized
22+
Initialized,
23+
24+
/// <summary>
25+
/// The FunctionDispatcher was previously "Initialized" but no longer has any initialized
26+
/// worker processes to handle function invocations.
27+
/// </summary>
28+
WorkerProcessRestarting,
29+
30+
/// <summary>
31+
/// The FunctionDispatcher is being disposed.
32+
/// </summary>
33+
Disposing,
34+
35+
/// <summary>
36+
/// The FunctionDispatcher has been disposed.
37+
/// </summary>
38+
Disposed
2439
}
2540
}

src/WebJobs.Script/Workers/Http/HttpFunctionInvocationDispatcher.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ public async void WorkerError(HttpWorkerErrorEvent workerError)
103103
{
104104
if (!_disposing)
105105
{
106-
_logger.LogDebug("Handling WorkerErrorEvent for workerId:{workerId}", workerError.WorkerId);
106+
_logger.LogDebug("Handling WorkerErrorEvent for workerId:{workerId}. Failed with: {exception}", workerError.WorkerId, workerError.Exception);
107107
AddOrUpdateErrorBucket(workerError);
108108
await DisposeAndRestartWorkerChannel(workerError.WorkerId);
109109
}
@@ -120,6 +120,9 @@ public async void WorkerRestart(HttpWorkerRestartEvent workerRestart)
120120

121121
private async Task DisposeAndRestartWorkerChannel(string workerId)
122122
{
123+
// Since we only have one HTTP worker process, as soon as we dispose it, InvokeAsync will fail. Set state to
124+
// indicate we are not ready to receive new requests.
125+
State = FunctionInvocationDispatcherState.WorkerProcessRestarting;
123126
_logger.LogDebug("Disposing channel for workerId: {channelId}", workerId);
124127
if (_httpWorkerChannel != null)
125128
{
@@ -166,13 +169,15 @@ protected virtual void Dispose(bool disposing)
166169
_workerErrorSubscription.Dispose();
167170
_workerRestartSubscription.Dispose();
168171
(_httpWorkerChannel as IDisposable)?.Dispose();
172+
State = FunctionInvocationDispatcherState.Disposed;
169173
_disposed = true;
170174
}
171175
}
172176

173177
public void Dispose()
174178
{
175179
_disposing = true;
180+
State = FunctionInvocationDispatcherState.Disposing;
176181
Dispose(true);
177182
}
178183

src/WebJobs.Script/Workers/Rpc/FunctionRegistration/RpcFunctionInvocationDispatcher.cs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,7 @@ public async void WorkerError(WorkerErrorEvent workerError)
284284
{
285285
if (string.Equals(_workerRuntime, workerError.Language))
286286
{
287-
_logger.LogDebug("Handling WorkerErrorEvent for runtime:{runtime}, workerId:{workerId}", workerError.Language, workerError.WorkerId);
287+
_logger.LogDebug("Handling WorkerErrorEvent for runtime:{runtime}, workerId:{workerId}. Failed with: {exception}", workerError.Language, workerError.WorkerId, workerError.Exception);
288288
AddOrUpdateErrorBucket(workerError);
289289
await DisposeAndRestartWorkerChannel(workerError.Language, workerError.WorkerId);
290290
}
@@ -332,8 +332,16 @@ private async Task DisposeAndRestartWorkerChannel(string runtime, string workerI
332332

333333
if (ShouldRestartWorkerChannel(runtime, isWebHostChannel, isJobHostChannel))
334334
{
335+
// Set state to "WorkerProcessRestarting" if there are no other workers to handle work
336+
if (!(await GetInitializedWorkerChannelsAsync()).Any())
337+
{
338+
State = FunctionInvocationDispatcherState.WorkerProcessRestarting;
339+
_logger.LogDebug("No initialized worker channels for runtime '{runtime}'. Delaying future invocations", runtime);
340+
}
341+
// Restart worker channel
335342
_logger.LogDebug("Restarting worker channel for runtime:{runtime}", runtime);
336343
await RestartWorkerChannel(runtime, workerId);
344+
// State is set back to "Initialized" when worker channel is up again
337345
}
338346
else
339347
{
@@ -386,13 +394,15 @@ protected virtual void Dispose(bool disposing)
386394
_processStartCancellationToken.Cancel();
387395
_processStartCancellationToken.Dispose();
388396
_jobHostLanguageWorkerChannelManager.DisposeAndRemoveChannels();
397+
State = FunctionInvocationDispatcherState.Disposed;
389398
_disposed = true;
390399
}
391400
}
392401

393402
public void Dispose()
394403
{
395404
_disposing = true;
405+
State = FunctionInvocationDispatcherState.Disposing;
396406
Dispose(true);
397407
}
398408
}

test/WebJobs.Script.Tests/Description/Worker/WorkerFunctionInvokerTests.cs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,29 @@ public async Task InvokeTimeout_CallsShutdown()
5656
_applicationLifetime.Verify(a => a.StopApplication(), Times.Once);
5757
}
5858

59+
[Theory]
60+
[InlineData(FunctionInvocationDispatcherState.Default, false)]
61+
[InlineData(FunctionInvocationDispatcherState.Initializing, true)]
62+
[InlineData(FunctionInvocationDispatcherState.Initialized, false)]
63+
[InlineData(FunctionInvocationDispatcherState.WorkerProcessRestarting, true)]
64+
[InlineData(FunctionInvocationDispatcherState.Disposing, true)]
65+
[InlineData(FunctionInvocationDispatcherState.Disposed, true)]
66+
public async Task FunctionDispatcher_DelaysInvoke_WhenNotReady(FunctionInvocationDispatcherState state, bool delaysExecution)
67+
{
68+
_mockFunctionInvocationDispatcher.Setup(a => a.State).Returns(state);
69+
var timeoutTask = Task.Delay(TimeSpan.FromSeconds(5));
70+
var invokeCoreTask = _testFunctionInvoker.InvokeCore(new object[] { }, null);
71+
var result = await Task.WhenAny(invokeCoreTask, timeoutTask);
72+
if (delaysExecution)
73+
{
74+
Assert.Equal(timeoutTask, result);
75+
}
76+
else
77+
{
78+
Assert.Equal(invokeCoreTask, result);
79+
}
80+
}
81+
5982
[Fact]
6083
public async Task InvokeInitialized_DoesNotCallShutdown()
6184
{

test/WebJobs.Script.Tests/Workers/Rpc/RpcFunctionInvocationDispatcherTests.cs

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,8 +200,31 @@ public async Task FunctionDispatcherState_Transitions_From_Starting_To_Initializ
200200
func1
201201
};
202202
await functionDispatcher.InitializeAsync(functions);
203-
Assert.True(functionDispatcher.State == FunctionInvocationDispatcherState.Initializing || functionDispatcher.State == FunctionInvocationDispatcherState.Initialized);
203+
Assert.Equal(FunctionInvocationDispatcherState.Initializing, functionDispatcher.State);
204204
await WaitForFunctionDispactherStateInitialized(functionDispatcher);
205+
Assert.Equal(FunctionInvocationDispatcherState.Initialized, functionDispatcher.State);
206+
}
207+
208+
[Fact]
209+
public async Task FunctionDispatcherState_Transitions_From_Default_To_Initialized_To_Disposing()
210+
{
211+
RpcFunctionInvocationDispatcher functionDispatcher = GetTestFunctionDispatcher();
212+
Assert.Equal(FunctionInvocationDispatcherState.Default, functionDispatcher.State);
213+
FunctionMetadata func1 = new FunctionMetadata()
214+
{
215+
Name = "func1",
216+
Language = "node"
217+
};
218+
var functions = new List<FunctionMetadata>()
219+
{
220+
func1
221+
};
222+
await functionDispatcher.InitializeAsync(functions);
223+
Assert.Equal(FunctionInvocationDispatcherState.Initializing, functionDispatcher.State);
224+
await WaitForFunctionDispactherStateInitialized(functionDispatcher);
225+
Assert.Equal(FunctionInvocationDispatcherState.Initialized, functionDispatcher.State);
226+
functionDispatcher.Dispose();
227+
Assert.True(functionDispatcher == null || functionDispatcher.State == FunctionInvocationDispatcherState.Disposing || functionDispatcher.State == FunctionInvocationDispatcherState.Disposed);
205228
}
206229

207230
[Fact]
@@ -225,6 +248,43 @@ public async Task FunctionDispatcher_Restart_ErroredChannels_Succeeds()
225248
Assert.Equal(expectedProcessCount, finalChannelCount);
226249
}
227250

251+
[Fact]
252+
public async Task FunctionDispatcher_Restart_ErroredChannels_And_Changes_State()
253+
{
254+
int expectedProcessCount = 1;
255+
RpcFunctionInvocationDispatcher functionDispatcher = GetTestFunctionDispatcher(expectedProcessCount.ToString());
256+
Assert.Equal(FunctionInvocationDispatcherState.Default, functionDispatcher.State);
257+
// Add worker
258+
await functionDispatcher.InitializeAsync(GetTestFunctionsList(RpcWorkerConstants.NodeLanguageWorkerName));
259+
await WaitForJobhostWorkerChannelsToStartup(functionDispatcher, expectedProcessCount);
260+
TestRpcWorkerChannel testWorkerChannel = (TestRpcWorkerChannel)functionDispatcher.JobHostLanguageWorkerChannelManager.GetChannels().FirstOrDefault();
261+
// Restart this channel
262+
testWorkerChannel.RaiseWorkerRestart();
263+
await TestHelpers.Await(() =>
264+
{
265+
return functionDispatcher.State == FunctionInvocationDispatcherState.WorkerProcessRestarting;
266+
}, 3000);
267+
await WaitForJobhostWorkerChannelsToStartup(functionDispatcher, expectedProcessCount);
268+
Assert.Equal(FunctionInvocationDispatcherState.Initialized, functionDispatcher.State);
269+
}
270+
271+
[Fact]
272+
public async Task FunctionDispatcher_Restart_ErroredChannels_And_DoesNot_Change_State()
273+
{
274+
int expectedProcessCount = 2;
275+
RpcFunctionInvocationDispatcher functionDispatcher = GetTestFunctionDispatcher(expectedProcessCount.ToString());
276+
Assert.Equal(FunctionInvocationDispatcherState.Default, functionDispatcher.State);
277+
// Add worker
278+
await functionDispatcher.InitializeAsync(GetTestFunctionsList(RpcWorkerConstants.NodeLanguageWorkerName));
279+
await WaitForJobhostWorkerChannelsToStartup(functionDispatcher, expectedProcessCount);
280+
TestRpcWorkerChannel testWorkerChannel = (TestRpcWorkerChannel)functionDispatcher.JobHostLanguageWorkerChannelManager.GetChannels().FirstOrDefault();
281+
// Restart one channel
282+
testWorkerChannel.RaiseWorkerRestart();
283+
Assert.Equal(FunctionInvocationDispatcherState.Initialized, functionDispatcher.State);
284+
await WaitForJobhostWorkerChannelsToStartup(functionDispatcher, expectedProcessCount);
285+
Assert.Equal(FunctionInvocationDispatcherState.Initialized, functionDispatcher.State);
286+
}
287+
228288
[Fact]
229289
public async Task FunctionDispatcher_Restart_ErroredChannels_ExceedsLimit()
230290
{

test/WebJobs.Script.Tests/Workers/Rpc/TestRpcWorkerChannel.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public void SendInvocationRequest(ScriptInvocationContext context)
7171
public async Task StartWorkerProcessAsync()
7272
{
7373
// To verify FunctionDispatcher transitions
74-
await Task.Delay(TimeSpan.FromMilliseconds(100));
74+
await Task.Delay(TimeSpan.FromMilliseconds(500));
7575

7676
if (_throwOnProcessStartUp)
7777
{

0 commit comments

Comments
 (0)