Skip to content

Commit cc3410c

Browse files
committed
Add unit tests for worker exit
1 parent 45a4043 commit cc3410c

File tree

4 files changed

+153
-65
lines changed

4 files changed

+153
-65
lines changed

src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -388,10 +388,15 @@ public async Task StartWorkerProcessAsync(CancellationToken cancellationToken)
388388
_workerChannelLogger.LogDebug("Initiating Worker Process start up");
389389
await _rpcWorkerProcess.StartProcessAsync(cancellationToken);
390390
_state |= RpcWorkerChannelState.Initializing;
391-
Task winner = await Task.WhenAny(
392-
_workerInitTask.Task, _rpcWorkerProcess.WaitForExitAsync(cancellationToken))
393-
.WaitAsync(cancellationToken);
391+
Task exited = _rpcWorkerProcess.WaitForExitAsync(cancellationToken);
392+
Task winner = await Task.WhenAny(_workerInitTask.Task, exited).WaitAsync(cancellationToken);
394393
await winner;
394+
395+
if (winner == exited)
396+
{
397+
// process exited without throwing. We need to throw to indicate process is not running.
398+
throw new WorkerProcessExitException("Worker process exited before initializing.");
399+
}
395400
}
396401

397402
public async Task<WorkerStatus> GetWorkerStatusAsync()

src/WebJobs.Script/Workers/ProcessManagement/WorkerProcess.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,9 +85,9 @@ public Task StartProcessAsync(CancellationToken cancellationToken = default)
8585
_processExit = new();
8686
try
8787
{
88-
Process.ErrorDataReceived += (sender, e) => OnErrorDataReceived(sender, e);
89-
Process.OutputDataReceived += (sender, e) => OnOutputDataReceived(sender, e);
90-
Process.Exited += (sender, e) => OnProcessExited(sender, e);
88+
Process.ErrorDataReceived += OnErrorDataReceived;
89+
Process.OutputDataReceived += OnOutputDataReceived;
90+
Process.Exited += OnProcessExited;
9191
Process.EnableRaisingEvents = true;
9292
string sanitizedArguments = Sanitizer.Sanitize(Process.StartInfo.Arguments);
9393

test/WebJobs.Script.Tests/Workers/Rpc/GrpcWorkerChannelTests.cs

Lines changed: 83 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,9 @@ public class GrpcWorkerChannelTests : IDisposable
3939
private static string _expectedSystemLogMessage = "Random system log message";
4040
private static string _expectedLoadMsgPartial = "Sending FunctionLoadRequest for ";
4141

42-
private readonly Mock<IWorkerProcess> _mockrpcWorkerProcess = new Mock<IWorkerProcess>();
42+
private readonly Mock<IWorkerProcess> _mockRpcWorkerProcess = new Mock<IWorkerProcess>();
4343
private readonly string _workerId = "testWorkerId";
44-
private readonly string _scriptRootPath = "c:\testdir";
44+
private readonly string _scriptRootPath = "c:\\testdir";
4545
private readonly IScriptEventManager _eventManager = new ScriptEventManager();
4646
private readonly Mock<IScriptHostManager> _mockScriptHostManager = new Mock<IScriptHostManager>(MockBehavior.Strict);
4747
private readonly TestMetricsLogger _metricsLogger = new TestMetricsLogger();
@@ -57,7 +57,6 @@ public class GrpcWorkerChannelTests : IDisposable
5757
private readonly IOptionsMonitor<ScriptApplicationHostOptions> _hostOptionsMonitor;
5858
private readonly IMemoryMappedFileAccessor _mapAccessor;
5959
private readonly ISharedMemoryManager _sharedMemoryManager;
60-
private readonly IFunctionDataCache _functionDataCache;
6160
private readonly IOptions<WorkerConcurrencyOptions> _workerConcurrencyOptions;
6261
private readonly ITestOutputHelper _testOutput;
6362
private readonly IOptions<FunctionsHostingConfigOptions> _hostingConfigOptions;
@@ -76,24 +75,24 @@ public GrpcWorkerChannelTests(ITestOutputHelper testOutput)
7675
_testWorkerConfig.CountOptions.InitializationTimeout = TimeSpan.FromSeconds(5);
7776
_testWorkerConfig.CountOptions.EnvironmentReloadTimeout = TimeSpan.FromSeconds(5);
7877

79-
_mockrpcWorkerProcess.Setup(m => m.StartProcessAsync(default)).Returns(Task.CompletedTask);
80-
_mockrpcWorkerProcess.Setup(m => m.Id).Returns(910);
78+
_mockRpcWorkerProcess.Setup(m => m.StartProcessAsync(It.IsAny<CancellationToken>())).Returns(Task.CompletedTask);
79+
_mockRpcWorkerProcess.Setup(m => m.WaitForExitAsync(It.IsAny<CancellationToken>())).Returns(Task.Delay(Timeout.Infinite));
80+
_mockRpcWorkerProcess.Setup(m => m.Id).Returns(910);
8181
_testEnvironment = new TestEnvironment();
8282
_testEnvironment.SetEnvironmentVariable(FunctionDataCacheConstants.FunctionDataCacheEnabledSettingName, "1");
8383
_workerConcurrencyOptions = Options.Create(new WorkerConcurrencyOptions());
8484
_workerConcurrencyOptions.Value.CheckInterval = TimeSpan.FromSeconds(1);
8585

86-
ILogger<MemoryMappedFileAccessor> mmapAccessorLogger = NullLogger<MemoryMappedFileAccessor>.Instance;
86+
ILogger<MemoryMappedFileAccessor> mMapAccessorLogger = NullLogger<MemoryMappedFileAccessor>.Instance;
8787
if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
8888
{
89-
_mapAccessor = new MemoryMappedFileAccessorWindows(mmapAccessorLogger);
89+
_mapAccessor = new MemoryMappedFileAccessorWindows(mMapAccessorLogger);
9090
}
9191
else
9292
{
93-
_mapAccessor = new MemoryMappedFileAccessorUnix(mmapAccessorLogger, _testEnvironment);
93+
_mapAccessor = new MemoryMappedFileAccessorUnix(mMapAccessorLogger, _testEnvironment);
9494
}
9595
_sharedMemoryManager = new SharedMemoryManager(_loggerFactory, _mapAccessor);
96-
_functionDataCache = new FunctionDataCache(_sharedMemoryManager, _loggerFactory, _testEnvironment);
9796

9897
var hostOptions = new ScriptApplicationHostOptions
9998
{
@@ -127,7 +126,7 @@ private Task CreateDefaultWorkerChannel(bool autoStart = true, IDictionary<strin
127126
_eventManager,
128127
_mockScriptHostManager.Object,
129128
_testWorkerConfig,
130-
_mockrpcWorkerProcess.Object,
129+
_mockRpcWorkerProcess.Object,
131130
_logger,
132131
_metricsLogger,
133132
0,
@@ -182,6 +181,77 @@ await Assert.ThrowsAsync<TaskCanceledException>(async () =>
182181
});
183182
}
184183

184+
[Fact]
185+
public async Task StartWorkerProcessAsync_ProcessExits_Throws()
186+
{
187+
_mockRpcWorkerProcess.Setup(m => m.WaitForExitAsync(It.IsAny<CancellationToken>()))
188+
.Returns(Task.CompletedTask);
189+
await CreateDefaultWorkerChannel(autoStart: false);
190+
191+
WorkerProcessExitException ex = await Assert.ThrowsAsync<WorkerProcessExitException>(
192+
() => _workerChannel.StartWorkerProcessAsync(default))
193+
.WaitAsync(TimeSpan.FromMilliseconds(100));
194+
Assert.Equal(0, ex.ExitCode);
195+
Assert.Equal("Worker process exited before initializing.", ex.Message);
196+
}
197+
198+
[Fact]
199+
public async Task StartWorkerProcessAsync_ProcessFaults_Throws()
200+
{
201+
WorkerProcessExitException expected = new("Process has faulted.") { ExitCode = -1 };
202+
_mockRpcWorkerProcess.Setup(m => m.WaitForExitAsync(It.IsAny<CancellationToken>()))
203+
.ThrowsAsync(expected);
204+
await CreateDefaultWorkerChannel(autoStart: false);
205+
206+
WorkerProcessExitException actual = await Assert.ThrowsAsync<WorkerProcessExitException>(
207+
() => _workerChannel.StartWorkerProcessAsync(default))
208+
.WaitAsync(TimeSpan.FromMilliseconds(100));
209+
Assert.Equal(expected, actual);
210+
}
211+
212+
[Fact]
213+
public async Task StartWorkerProcessAsync_TimesOut()
214+
{
215+
await CreateDefaultWorkerChannel(autoStart: false); // suppress for timeout
216+
var initTask = _workerChannel.StartWorkerProcessAsync(CancellationToken.None);
217+
await Assert.ThrowsAsync<TimeoutException>(async () => await initTask);
218+
}
219+
220+
[Fact]
221+
public async Task StartWorkerProcessAsync_WorkerProcess_Throws()
222+
{
223+
// note: uses custom worker channel
224+
Mock<IWorkerProcess> mockrpcWorkerProcessThatThrows = new Mock<IWorkerProcess>();
225+
mockrpcWorkerProcessThatThrows.Setup(m => m.StartProcessAsync(default)).Throws<FileNotFoundException>();
226+
227+
_workerChannel = new GrpcWorkerChannel(
228+
_workerId,
229+
_eventManager,
230+
_mockScriptHostManager.Object,
231+
_testWorkerConfig,
232+
mockrpcWorkerProcessThatThrows.Object,
233+
_logger,
234+
_metricsLogger,
235+
0,
236+
_testEnvironment,
237+
_hostOptionsMonitor,
238+
_sharedMemoryManager,
239+
_workerConcurrencyOptions,
240+
_hostingConfigOptions,
241+
_httpProxyService);
242+
await Assert.ThrowsAsync<FileNotFoundException>(async () => await _workerChannel.StartWorkerProcessAsync(CancellationToken.None));
243+
}
244+
245+
[Fact]
246+
public async Task StartWorkerProcessAsync_Invoked_SetupFunctionBuffers_Verify_ReadyForInvocation()
247+
{
248+
await CreateDefaultWorkerChannel();
249+
_mockRpcWorkerProcess.Verify(m => m.StartProcessAsync(default), Times.Once);
250+
Assert.False(_workerChannel.IsChannelReadyForInvocations());
251+
_workerChannel.SetupFunctionInvocationBuffers(GetTestFunctionsList("node"));
252+
Assert.True(_workerChannel.IsChannelReadyForInvocations());
253+
}
254+
185255
[Fact]
186256
public async Task WorkerChannel_Dispose_With_WorkerTerminateCapability()
187257
{
@@ -221,16 +291,6 @@ public async Task WorkerChannel_Dispose_Without_WorkerTerminateCapability()
221291
Assert.False(traces.Any(m => string.Equals(m.FormattedMessage, expectedLogMsg)));
222292
}
223293

224-
[Fact]
225-
public async Task StartWorkerProcessAsync_Invoked_SetupFunctionBuffers_Verify_ReadyForInvocation()
226-
{
227-
await CreateDefaultWorkerChannel();
228-
_mockrpcWorkerProcess.Verify(m => m.StartProcessAsync(default), Times.Once);
229-
Assert.False(_workerChannel.IsChannelReadyForInvocations());
230-
_workerChannel.SetupFunctionInvocationBuffers(GetTestFunctionsList("node"));
231-
Assert.True(_workerChannel.IsChannelReadyForInvocations());
232-
}
233-
234294
[Fact]
235295
public async Task DisposingChannel_NotReadyForInvocation()
236296
{
@@ -259,14 +319,6 @@ public void SetupFunctionBuffers_Verify_ReadyForInvocation_Returns_False()
259319
Assert.False(_workerChannel.IsChannelReadyForInvocations());
260320
}
261321

262-
[Fact]
263-
public async Task StartWorkerProcessAsync_TimesOut()
264-
{
265-
await CreateDefaultWorkerChannel(autoStart: false); // suppress for timeout
266-
var initTask = _workerChannel.StartWorkerProcessAsync(CancellationToken.None);
267-
await Assert.ThrowsAsync<TimeoutException>(async () => await initTask);
268-
}
269-
270322
[Fact]
271323
public async Task SendEnvironmentReloadRequest_Generates_ExpectedMetrics()
272324
{
@@ -283,31 +335,6 @@ public async Task SendEnvironmentReloadRequest_Generates_ExpectedMetrics()
283335
Assert.True(_metricsLogger.EventsBegan.Contains(MetricEventNames.SpecializationEnvironmentReloadRequestResponse));
284336
}
285337

286-
[Fact]
287-
public async Task StartWorkerProcessAsync_WorkerProcess_Throws()
288-
{
289-
// note: uses custom worker channel
290-
Mock<IWorkerProcess> mockrpcWorkerProcessThatThrows = new Mock<IWorkerProcess>();
291-
mockrpcWorkerProcessThatThrows.Setup(m => m.StartProcessAsync(default)).Throws<FileNotFoundException>();
292-
293-
_workerChannel = new GrpcWorkerChannel(
294-
_workerId,
295-
_eventManager,
296-
_mockScriptHostManager.Object,
297-
_testWorkerConfig,
298-
mockrpcWorkerProcessThatThrows.Object,
299-
_logger,
300-
_metricsLogger,
301-
0,
302-
_testEnvironment,
303-
_hostOptionsMonitor,
304-
_sharedMemoryManager,
305-
_workerConcurrencyOptions,
306-
_hostingConfigOptions,
307-
_httpProxyService);
308-
await Assert.ThrowsAsync<FileNotFoundException>(async () => await _workerChannel.StartWorkerProcessAsync(CancellationToken.None));
309-
}
310-
311338
[Fact]
312339
public async Task SendWorkerInitRequest_PublishesOutboundEvent()
313340
{
@@ -547,7 +574,7 @@ public async Task Drain_Verify()
547574
_eventManager,
548575
_mockScriptHostManager.Object,
549576
_testWorkerConfig,
550-
_mockrpcWorkerProcess.Object,
577+
_mockRpcWorkerProcess.Object,
551578
_logger,
552579
_metricsLogger,
553580
0,
@@ -1256,7 +1283,7 @@ public async Task GetLatencies_StartsTimer_WhenDynamicConcurrencyEnabled()
12561283
_eventManager,
12571284
_mockScriptHostManager.Object,
12581285
config,
1259-
_mockrpcWorkerProcess.Object,
1286+
_mockRpcWorkerProcess.Object,
12601287
_logger,
12611288
_metricsLogger,
12621289
0,
@@ -1297,7 +1324,7 @@ public async Task GetLatencies_DoesNot_StartTimer_WhenDynamicConcurrencyDisabled
12971324
_eventManager,
12981325
_mockScriptHostManager.Object,
12991326
config,
1300-
_mockrpcWorkerProcess.Object,
1327+
_mockRpcWorkerProcess.Object,
13011328
_logger,
13021329
_metricsLogger,
13031330
0,

test/WebJobs.Script.Tests/Workers/Rpc/RpcWorkerProcessTests.cs

Lines changed: 59 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,9 @@
22
// Licensed under the MIT License. See License.txt in the project root for license information.
33

44
using System;
5-
using System.Collections;
6-
using System.Collections.Generic;
75
using System.Diagnostics;
86
using System.Linq;
7+
using System.Threading.Tasks;
98
using Microsoft.Azure.WebJobs.Host.Scale;
109
using Microsoft.Azure.WebJobs.Script.Config;
1110
using Microsoft.Azure.WebJobs.Script.Eventing;
@@ -260,5 +259,62 @@ public void WorkerProcess_WaitForExit_AfterExit_DoesNotThrow()
260259
Exception ex = traces.Single().Exception;
261260
Assert.IsType<InvalidOperationException>(ex);
262261
}
262+
263+
[Fact]
264+
public async Task WorkerProcess_WaitForExit_NotStarted_Throws()
265+
{
266+
using var rpcWorkerProcess = GetRpcWorkerConfigProcess(
267+
TestHelpers.GetTestWorkerConfigsWithExecutableWorkingDirectory().ElementAt(0));
268+
await Assert.ThrowsAsync<InvalidOperationException>(() => rpcWorkerProcess.WaitForExitAsync());
269+
}
270+
271+
[Fact]
272+
public async Task WorkerProcess_WaitForExit_Success_TaskCompletes()
273+
{
274+
// arrange
275+
using Process process = GetProcess(exitCode: 0);
276+
_hostProcessMonitorMock.Setup(m => m.RegisterChildProcess(process));
277+
_workerProcessFactory.Setup(m => m.CreateWorkerProcess(It.IsNotNull<WorkerContext>())).Returns(process);
278+
using var rpcWorkerProcess = GetRpcWorkerConfigProcess(
279+
TestHelpers.GetTestWorkerConfigsWithExecutableWorkingDirectory().ElementAt(0));
280+
281+
// act
282+
await rpcWorkerProcess.StartProcessAsync();
283+
284+
// assert
285+
await rpcWorkerProcess.WaitForExitAsync();
286+
}
287+
288+
[Fact]
289+
public async Task WorkerProcess_WaitForExit_Error_Rethrows()
290+
{
291+
// arrange
292+
using Process process = GetProcess(exitCode: -1);
293+
_hostProcessMonitorMock.Setup(m => m.RegisterChildProcess(process));
294+
_workerProcessFactory.Setup(m => m.CreateWorkerProcess(It.IsNotNull<WorkerContext>())).Returns(process);
295+
using var rpcWorkerProcess = GetRpcWorkerConfigProcess(
296+
TestHelpers.GetTestWorkerConfigsWithExecutableWorkingDirectory().ElementAt(0));
297+
298+
// act
299+
await rpcWorkerProcess.StartProcessAsync();
300+
301+
// assert
302+
await Assert.ThrowsAnyAsync<WorkerProcessExitException>(() => rpcWorkerProcess.WaitForExitAsync());
303+
}
304+
305+
private static Process GetProcess(int exitCode)
306+
{
307+
return new()
308+
{
309+
StartInfo = new()
310+
{
311+
WindowStyle = ProcessWindowStyle.Hidden,
312+
FileName = OperatingSystem.IsWindows() ? "cmd" : "bash",
313+
Arguments = OperatingSystem.IsWindows() ? $"/C exit {exitCode}" : $"-c \"exit {exitCode}\"",
314+
RedirectStandardError = true,
315+
RedirectStandardOutput = true,
316+
}
317+
};
318+
}
263319
}
264-
}
320+
}

0 commit comments

Comments
 (0)