Skip to content

Commit 713942c

Browse files
authored
[RPC]Lock process restarts on WorkerError (#6930)
[RPC]Lock process restarts on WorkerError
1 parent 6bcde79 commit 713942c

File tree

5 files changed

+88
-38
lines changed

5 files changed

+88
-38
lines changed

src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -524,7 +524,7 @@ internal void HandleWorkerInitError(Exception exc)
524524
internal void HandleWorkerFunctionLoadError(Exception exc)
525525
{
526526
_workerChannelLogger.LogError(exc, "Loading function failed.");
527-
if (_disposing)
527+
if (_disposing || _disposed)
528528
{
529529
return;
530530
}
@@ -534,7 +534,7 @@ internal void HandleWorkerFunctionLoadError(Exception exc)
534534
private void PublishWorkerErrorEvent(Exception exc)
535535
{
536536
_workerInitTask.SetException(exc);
537-
if (_disposing)
537+
if (_disposing || _disposed)
538538
{
539539
return;
540540
}

src/WebJobs.Script/Workers/ProcessManagement/WorkerProcess.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ internal abstract class WorkerProcess : IWorkerProcess, IDisposable
2626
private Process _process;
2727
private bool _useStdErrorStreamForErrorsOnly;
2828
private ProcessMonitor _processMonitor;
29-
private bool _disposing;
3029
private Queue<string> _processStdErrDataQueue = new Queue<string>(3);
3130

3231
internal WorkerProcess(IScriptEventManager eventManager, IProcessRegistry processRegistry, ILogger workerProcessLogger, IWorkerConsoleLogSource consoleLogSource, IMetricsLogger metricsLogger, bool useStdErrStreamForErrorsOnly = false)
@@ -39,6 +38,8 @@ internal WorkerProcess(IScriptEventManager eventManager, IProcessRegistry proces
3938
_useStdErrorStreamForErrorsOnly = useStdErrStreamForErrorsOnly;
4039
}
4140

41+
protected bool Disposing { get; private set; }
42+
4243
public int Id => _process.Id;
4344

4445
internal Queue<string> ProcessStdErrDataQueue => _processStdErrDataQueue;
@@ -129,7 +130,7 @@ private void LogError(string msg)
129130

130131
private void OnProcessExited(object sender, EventArgs e)
131132
{
132-
if (_disposing)
133+
if (Disposing)
133134
{
134135
// No action needed
135136
return;
@@ -192,7 +193,7 @@ internal void BuildAndLogConsoleLog(string msg, LogLevel level)
192193

193194
public void Dispose()
194195
{
195-
_disposing = true;
196+
Disposing = true;
196197
// best effort process disposal
197198
try
198199
{

src/WebJobs.Script/Workers/Rpc/FunctionRegistration/RpcFunctionInvocationDispatcher.cs

Lines changed: 73 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@ internal class RpcFunctionInvocationDispatcher : IFunctionInvocationDispatcher
2929
private readonly IEnvironment _environment;
3030
private readonly IApplicationLifetime _applicationLifetime;
3131
private readonly TimeSpan _shutdownTimeout = TimeSpan.FromSeconds(10);
32-
private readonly TimeSpan thresholdBetweenRestarts = TimeSpan.FromMinutes(WorkerConstants.WorkerRestartErrorIntervalThresholdInMinutes);
32+
private readonly TimeSpan _restartWait = TimeSpan.FromSeconds(10);
33+
private readonly SemaphoreSlim _restartWorkerProcessSLock = new SemaphoreSlim(1, 1);
34+
private readonly TimeSpan _thresholdBetweenRestarts = TimeSpan.FromMinutes(WorkerConstants.WorkerRestartErrorIntervalThresholdInMinutes);
3335

3436
private IScriptEventManager _eventManager;
3537
private IEnumerable<RpcWorkerConfig> _workerConfigs;
@@ -48,6 +50,7 @@ internal class RpcFunctionInvocationDispatcher : IFunctionInvocationDispatcher
4850
private IEnumerable<FunctionMetadata> _functions;
4951
private ConcurrentStack<WorkerErrorEvent> _languageWorkerErrors = new ConcurrentStack<WorkerErrorEvent>();
5052
private CancellationTokenSource _processStartCancellationToken = new CancellationTokenSource();
53+
private CancellationTokenSource _disposeToken = new CancellationTokenSource();
5154
private int _debounceMilliSeconds = (int)TimeSpan.FromSeconds(10).TotalMilliseconds;
5255

5356
public RpcFunctionInvocationDispatcher(IOptions<ScriptJobHostOptions> scriptHostOptions,
@@ -310,35 +313,43 @@ internal async Task<IEnumerable<IRpcWorkerChannel>> GetInitializedWorkerChannels
310313

311314
public async void WorkerError(WorkerErrorEvent workerError)
312315
{
313-
if (!_disposing)
316+
if (_disposing || _disposed)
314317
{
315-
if (string.Equals(_workerRuntime, workerError.Language))
316-
{
317-
_logger.LogDebug("Handling WorkerErrorEvent for runtime:{runtime}, workerId:{workerId}. Failed with: {exception}", workerError.Language, _workerRuntime, workerError.Exception);
318-
AddOrUpdateErrorBucket(workerError);
319-
await DisposeAndRestartWorkerChannel(workerError.Language, workerError.WorkerId, workerError.Exception);
320-
}
321-
else
322-
{
323-
_logger.LogDebug("Received WorkerErrorEvent for runtime:{runtime}, workerId:{workerId}", workerError.Language, workerError.WorkerId);
324-
_logger.LogDebug("WorkerErrorEvent runtime:{runtime} does not match current runtime:{currentRuntime}. Failed with: {exception}", workerError.Language, _workerRuntime, workerError.Exception);
325-
}
318+
return;
319+
}
320+
321+
if (string.Equals(_workerRuntime, workerError.Language))
322+
{
323+
_logger.LogDebug("Handling WorkerErrorEvent for runtime:{runtime}, workerId:{workerId}. Failed with: {exception}", workerError.Language, _workerRuntime, workerError.Exception);
324+
AddOrUpdateErrorBucket(workerError);
325+
await DisposeAndRestartWorkerChannel(workerError.Language, workerError.WorkerId, workerError.Exception);
326+
}
327+
else
328+
{
329+
_logger.LogDebug("Received WorkerErrorEvent for runtime:{runtime}, workerId:{workerId}", workerError.Language, workerError.WorkerId);
330+
_logger.LogDebug("WorkerErrorEvent runtime:{runtime} does not match current runtime:{currentRuntime}. Failed with: {exception}", workerError.Language, _workerRuntime, workerError.Exception);
326331
}
327332
}
328333

329334
public async void WorkerRestart(WorkerRestartEvent workerRestart)
330335
{
331-
if (!_disposing)
336+
if (_disposing || _disposed)
332337
{
333-
_logger.LogDebug("Handling WorkerRestartEvent for runtime:{runtime}, workerId:{workerId}", workerRestart.Language, workerRestart.WorkerId);
334-
await DisposeAndRestartWorkerChannel(workerRestart.Language, workerRestart.WorkerId);
338+
return;
335339
}
340+
341+
_logger.LogDebug("Handling WorkerRestartEvent for runtime:{runtime}, workerId:{workerId}", workerRestart.Language, workerRestart.WorkerId);
342+
await DisposeAndRestartWorkerChannel(workerRestart.Language, workerRestart.WorkerId);
336343
}
337344

338345
private async Task DisposeAndRestartWorkerChannel(string runtime, string workerId, Exception workerException = null)
339346
{
340-
_logger.LogDebug("Attempting to dispose webhost or jobhost channel for workerId: {channelId}, runtime:{language}", workerId, runtime);
347+
if (_disposing || _disposed)
348+
{
349+
return;
350+
}
341351

352+
_logger.LogDebug("Attempting to dispose webhost or jobhost channel for workerId: '{channelId}', runtime: '{language}'", workerId, runtime);
342353
bool isWebHostChannelDisposed = await _webHostLanguageWorkerChannelManager.ShutdownChannelIfExistsAsync(runtime, workerId, workerException);
343354
bool isJobHostChannelDisposed = false;
344355
if (!isWebHostChannelDisposed)
@@ -348,7 +359,7 @@ private async Task DisposeAndRestartWorkerChannel(string runtime, string workerI
348359

349360
if (!isWebHostChannelDisposed && !isJobHostChannelDisposed)
350361
{
351-
_logger.LogDebug("Did not find WebHost or JobHost channel to dispose for workerId: {channelId}, runtime:{language}", workerId, runtime);
362+
_logger.LogDebug("Did not find WebHost or JobHost channel to dispose for workerId: '{channelId}', runtime: '{language}'", workerId, runtime);
352363
}
353364

354365
if (ShouldRestartWorkerChannel(runtime, isWebHostChannelDisposed, isJobHostChannelDisposed))
@@ -360,13 +371,13 @@ private async Task DisposeAndRestartWorkerChannel(string runtime, string workerI
360371
_logger.LogDebug("No initialized worker channels for runtime '{runtime}'. Delaying future invocations", runtime);
361372
}
362373
// Restart worker channel
363-
_logger.LogDebug("Restarting worker channel for runtime:{runtime}", runtime);
364-
await RestartWorkerChannel(runtime, workerId);
374+
_logger.LogDebug("Restarting worker channel for runtime: '{runtime}'", runtime);
375+
await RestartWorkerChannel(runtime);
365376
// State is set back to "Initialized" when worker channel is up again
366377
}
367378
else
368379
{
369-
_logger.LogDebug("Skipping worker channel restart for errored worker runtime:{runtime}, current runtime:{currentRuntime}, isWebHostChannel:{isWebHostChannel}, isJobHostChannel:{isJobHostChannel}",
380+
_logger.LogDebug("Skipping worker channel restart for errored worker runtime: '{runtime}', current runtime: '{currentRuntime}', isWebHostChannel: '{isWebHostChannel}', isJobHostChannel: '{isJobHostChannel}'",
370381
runtime, _workerRuntime, isWebHostChannelDisposed, isJobHostChannelDisposed);
371382
}
372383
}
@@ -376,11 +387,33 @@ internal bool ShouldRestartWorkerChannel(string runtime, bool isWebHostChannel,
376387
return string.Equals(_workerRuntime, runtime, StringComparison.InvariantCultureIgnoreCase) && (isWebHostChannel || isJobHostChannel);
377388
}
378389

379-
private async Task RestartWorkerChannel(string runtime, string workerId)
390+
private async Task RestartWorkerChannel(string runtime)
380391
{
392+
if (_disposing || _disposed)
393+
{
394+
return;
395+
}
396+
381397
if (_languageWorkerErrors.Count < ErrorEventsThreshold)
382398
{
383-
await InitializeJobhostLanguageWorkerChannelAsync(_languageWorkerErrors.Count);
399+
try
400+
{
401+
// Issue only one restart at a time.
402+
await _restartWorkerProcessSLock.WaitAsync();
403+
await InitializeJobhostLanguageWorkerChannelAsync(_languageWorkerErrors.Count);
404+
}
405+
finally
406+
{
407+
// Wait before releasing the lock to give time for the process to startup and initialize.
408+
try
409+
{
410+
await Task.Delay(_restartWait, _disposeToken.Token);
411+
_restartWorkerProcessSLock.Release();
412+
}
413+
catch (TaskCanceledException)
414+
{
415+
}
416+
}
384417
}
385418
else if (_jobHostLanguageWorkerChannelManager.GetChannels().Count() == 0)
386419
{
@@ -393,12 +426,12 @@ private void AddOrUpdateErrorBucket(WorkerErrorEvent currentErrorEvent)
393426
{
394427
if (_languageWorkerErrors.TryPeek(out WorkerErrorEvent top))
395428
{
396-
if ((currentErrorEvent.CreatedAt - top.CreatedAt) > thresholdBetweenRestarts)
429+
if ((currentErrorEvent.CreatedAt - top.CreatedAt) > _thresholdBetweenRestarts)
397430
{
398431
while (!_languageWorkerErrors.IsEmpty)
399432
{
400433
_languageWorkerErrors.TryPop(out WorkerErrorEvent popped);
401-
_logger.LogDebug($"Popping out errorEvent createdAt:{popped.CreatedAt} workerId:{popped.WorkerId}");
434+
_logger.LogDebug($"Popping out errorEvent createdAt: '{popped.CreatedAt}' workerId: '{popped.WorkerId}'");
402435
}
403436
}
404437
}
@@ -407,14 +440,21 @@ private void AddOrUpdateErrorBucket(WorkerErrorEvent currentErrorEvent)
407440

408441
protected virtual void Dispose(bool disposing)
409442
{
410-
if (!_disposed && disposing)
443+
if (!_disposed)
411444
{
412-
_logger.LogDebug("Disposing FunctionDispatcher");
413-
_workerErrorSubscription.Dispose();
414-
_workerRestartSubscription.Dispose();
415-
_processStartCancellationToken.Cancel();
416-
_processStartCancellationToken.Dispose();
417-
_jobHostLanguageWorkerChannelManager.ShutdownChannels();
445+
if (_disposing)
446+
{
447+
_logger.LogDebug("Disposing FunctionDispatcher");
448+
_disposeToken.Cancel();
449+
_disposeToken.Dispose();
450+
_restartWorkerProcessSLock.Dispose();
451+
_workerErrorSubscription.Dispose();
452+
_workerRestartSubscription.Dispose();
453+
_processStartCancellationToken.Cancel();
454+
_processStartCancellationToken.Dispose();
455+
_jobHostLanguageWorkerChannelManager.ShutdownChannels();
456+
}
457+
418458
State = FunctionInvocationDispatcherState.Disposed;
419459
_disposed = true;
420460
}
@@ -435,7 +475,7 @@ public async Task<bool> RestartWorkerWithInvocationIdAsync(string invocationId)
435475
{
436476
if (channel.IsExecutingInvocation(invocationId))
437477
{
438-
_logger.LogInformation($"Restarting channel '{channel.Id}' that is executing invocation '{invocationId}' and timed out.");
478+
_logger.LogDebug($"Restarting channel with workerId: '{channel.Id}' that is executing invocation: '{invocationId}' and timed out.");
439479
await DisposeAndRestartWorkerChannel(_workerRuntime, channel.Id);
440480
return true;
441481
}

src/WebJobs.Script/Workers/Rpc/RpcWorkerProcess.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ internal override Process CreateWorkerProcess()
5454

5555
internal override void HandleWorkerProcessExitError(WorkerProcessExitException rpcWorkerProcessExitException)
5656
{
57+
if (Disposing)
58+
{
59+
return;
60+
}
5761
if (rpcWorkerProcessExitException == null)
5862
{
5963
throw new ArgumentNullException(nameof(rpcWorkerProcessExitException));

test/WebJobs.Script.Tests/Workers/Rpc/RpcFunctionInvocationDispatcherTests.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,11 @@ public async Task FunctionDispatcher_Error_WithinThreshold_BucketFills()
384384
{
385385
TestRpcWorkerChannel testWorkerChannel = channel as TestRpcWorkerChannel;
386386
testWorkerChannel.RaiseWorkerError();
387+
if (i < 2)
388+
{
389+
// wait for restart to complete before raising another error
390+
await WaitForJobhostWorkerChannelsToStartup(functionDispatcher, 1);
391+
}
387392
}
388393
}
389394

0 commit comments

Comments
 (0)