Skip to content

Commit c5ad842

Browse files
committed
HttpWorker multiple restarts (#6350)
1 parent 2d2b276 commit c5ad842

File tree

7 files changed

+203
-51
lines changed

7 files changed

+203
-51
lines changed

src/WebJobs.Script/Workers/Http/DefaultHttpWorkerService.cs

Lines changed: 21 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -185,33 +185,29 @@ public async Task<bool> IsWorkerReady(CancellationToken cancellationToken)
185185
{
186186
bool continueWaitingForWorker = await Utility.DelayAsync(WorkerConstants.WorkerInitTimeoutSeconds, WorkerConstants.WorkerReadyCheckPollingIntervalMilliseconds, async () =>
187187
{
188-
return await IsWorkerReadyForRequest();
189-
}, cancellationToken);
190-
return !continueWaitingForWorker;
191-
}
192-
193-
private async Task<bool> IsWorkerReadyForRequest()
194-
{
195-
string requestUri = new UriBuilder(WorkerConstants.HttpScheme, WorkerConstants.HostName, _httpWorkerOptions.Port).ToString();
196-
HttpRequestMessage httpRequestMessage = new HttpRequestMessage();
197-
httpRequestMessage.RequestUri = new Uri(requestUri);
198-
try
199-
{
200-
await _httpClient.SendAsync(httpRequestMessage);
201-
// Any Http response indicates a valid server Url
202-
return false;
203-
}
204-
catch (HttpRequestException httpRequestEx)
205-
{
206-
if (httpRequestEx.InnerException != null && httpRequestEx.InnerException is SocketException)
188+
string requestUri = new UriBuilder(WorkerConstants.HttpScheme, WorkerConstants.HostName, _httpWorkerOptions.Port).ToString();
189+
HttpRequestMessage httpRequestMessage = new HttpRequestMessage();
190+
httpRequestMessage.RequestUri = new Uri(requestUri);
191+
try
207192
{
208-
// Wait for the worker to be ready
209-
_logger.LogDebug("Waiting for HttpWorker to be initialized. Request to: {requestUri} failing with exception message: {message}", requestUri, httpRequestEx.Message);
210-
return true;
193+
await _httpClient.SendAsync(httpRequestMessage);
194+
// Any Http response indicates a valid server Url
195+
return false;
211196
}
212-
// Any other inner exception, consider HttpWorker to be ready
213-
return false;
214-
}
197+
catch (HttpRequestException httpRequestEx)
198+
{
199+
if (httpRequestEx.InnerException != null && httpRequestEx.InnerException is SocketException)
200+
{
201+
// Wait for the worker to be ready
202+
_logger.LogDebug("Waiting for HttpWorker to be initialized. Request to: {requestUri} failing with exception message: {message}", requestUri, httpRequestEx.Message);
203+
return true;
204+
}
205+
// Any other inner exception, consider HttpWorker to be ready
206+
return false;
207+
}
208+
}, cancellationToken);
209+
210+
return !continueWaitingForWorker;
215211
}
216212

217213
internal string BuildAndGetUri(string pathValue = null)

src/WebJobs.Script/Workers/Http/HttpFunctionInvocationDispatcher.cs

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -67,17 +67,11 @@ internal Task InitializeHttpWorkerChannelAsync(int attemptCount, CancellationTok
6767
{
6868
_httpWorkerChannel = _httpWorkerChannelFactory.Create(_scriptOptions.RootScriptPath, _metricsLogger, attemptCount);
6969
_httpWorkerChannel.StartWorkerProcessAsync(cancellationToken).ContinueWith(workerInitTask =>
70-
{
71-
if (workerInitTask.IsCompleted)
72-
{
73-
_logger.LogDebug("Adding http worker channel. workerId:{id}", _httpWorkerChannel.Id);
74-
SetFunctionDispatcherStateToInitializedAndLog();
75-
}
76-
else
77-
{
78-
_logger.LogWarning("Failed to start http worker process. workerId:{id}", _httpWorkerChannel.Id);
79-
}
80-
});
70+
{
71+
_logger.LogDebug("Adding http worker channel. workerId:{id}", _httpWorkerChannel.Id);
72+
SetFunctionDispatcherStateToInitializedAndLog();
73+
}, TaskContinuationOptions.OnlyOnRanToCompletion);
74+
8175
return Task.CompletedTask;
8276
}
8377

@@ -103,7 +97,7 @@ public async Task InitializeAsync(IEnumerable<FunctionMetadata> functions, Cance
10397

10498
public Task InvokeAsync(ScriptInvocationContext invocationContext)
10599
{
106-
return _httpWorkerChannel.InvokeFunction(invocationContext);
100+
return _httpWorkerChannel.InvokeAsync(invocationContext);
107101
}
108102

109103
public async void WorkerError(HttpWorkerErrorEvent workerError)

src/WebJobs.Script/Workers/Http/HttpWorkerChannel.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ internal HttpWorkerChannel(
4141

4242
public string Id { get; }
4343

44-
public Task InvokeFunction(ScriptInvocationContext context)
44+
public Task InvokeAsync(ScriptInvocationContext context)
4545
{
4646
return _httpWorkerService.InvokeAsync(context);
4747
}
@@ -56,7 +56,7 @@ internal async Task DelayUntilWokerInitialized(CancellationToken cancellationTok
5656
bool isWorkerReady = await _httpWorkerService.IsWorkerReady(cancellationToken);
5757
if (!isWorkerReady)
5858
{
59-
PublishWorkerErrorEvent(new TimeoutException("Initializing HttpWorker timed out."));
59+
throw new TimeoutException("Initializing HttpWorker timed out.");
6060
}
6161
else
6262
{
@@ -66,7 +66,9 @@ internal async Task DelayUntilWokerInitialized(CancellationToken cancellationTok
6666
catch (Exception ex)
6767
{
6868
// HttpFunctionInvocationDispatcher will handdle the worker error events
69+
_workerChannelLogger.LogError(ex, "Failed to start http worker process. workerId:{id}", Id);
6970
PublishWorkerErrorEvent(ex);
71+
throw;
7072
}
7173
}
7274
}

src/WebJobs.Script/Workers/Http/IHttpWorkerChannel.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ public interface IHttpWorkerChannel
1111
{
1212
string Id { get; }
1313

14-
Task InvokeFunction(ScriptInvocationContext context);
14+
Task InvokeAsync(ScriptInvocationContext context);
1515

1616
Task StartWorkerProcessAsync(CancellationToken cancellationToken = default);
1717
}

src/WebJobs.Script/Workers/Rpc/FunctionRegistration/RpcFunctionInvocationDispatcher.cs

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -118,17 +118,10 @@ internal Task InitializeJobhostLanguageWorkerChannelAsync(int attemptCount)
118118
_jobHostLanguageWorkerChannelManager.AddChannel(rpcWorkerChannel);
119119
rpcWorkerChannel.StartWorkerProcessAsync().ContinueWith(workerInitTask =>
120120
{
121-
if (workerInitTask.IsCompleted)
122-
{
123-
_logger.LogDebug("Adding jobhost language worker channel for runtime: {language}. workerId:{id}", _workerRuntime, rpcWorkerChannel.Id);
124-
rpcWorkerChannel.SendFunctionLoadRequests(_managedDependencyOptions.Value);
125-
SetFunctionDispatcherStateToInitializedAndLog();
126-
}
127-
else
128-
{
129-
_logger.LogWarning("Failed to start language worker process for runtime: {language}. workerId:{id}", _workerRuntime, rpcWorkerChannel.Id);
130-
}
131-
});
121+
_logger.LogDebug("Adding jobhost language worker channel for runtime: {language}. workerId:{id}", _workerRuntime, rpcWorkerChannel.Id);
122+
rpcWorkerChannel.SendFunctionLoadRequests(_managedDependencyOptions.Value);
123+
SetFunctionDispatcherStateToInitializedAndLog();
124+
}, TaskContinuationOptions.OnlyOnRanToCompletion);
132125
return Task.CompletedTask;
133126
}
134127

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the MIT License. See License.txt in the project root for license information.
3+
4+
using System;
5+
using System.Reactive.Linq;
6+
using System.Threading;
7+
using System.Threading.Tasks;
8+
using Microsoft.Azure.WebJobs.Script.Diagnostics;
9+
using Microsoft.Azure.WebJobs.Script.Eventing;
10+
using Microsoft.Azure.WebJobs.Script.Workers;
11+
using Microsoft.Extensions.Logging.Abstractions;
12+
using Microsoft.Extensions.Options;
13+
using Moq;
14+
using Xunit;
15+
16+
namespace Microsoft.Azure.WebJobs.Script.Tests.Workers.Http
17+
{
18+
public class HttpFunctionInvocationDispatcherTests
19+
{
20+
[Theory]
21+
[InlineData(true)]
22+
[InlineData(false)]
23+
public async Task TestDelay_StartProcess(bool startWorkerProcessResult)
24+
{
25+
Mock<IOptions<ScriptJobHostOptions>> mockOptions = new Mock<IOptions<ScriptJobHostOptions>>();
26+
Mock<IScriptEventManager> mockEventManager = new Mock<IScriptEventManager>();
27+
Mock<IHttpWorkerChannelFactory> mockFactory = new Mock<IHttpWorkerChannelFactory>();
28+
Mock<IHttpWorkerChannel> mockChannel = new Mock<IHttpWorkerChannel>();
29+
30+
mockOptions.Setup(a => a.Value).Returns(new ScriptJobHostOptions());
31+
if (startWorkerProcessResult)
32+
{
33+
mockChannel.Setup(a => a.StartWorkerProcessAsync(It.IsAny<CancellationToken>())).Returns(Task.FromResult(startWorkerProcessResult));
34+
}
35+
else
36+
{
37+
mockChannel.Setup(a => a.StartWorkerProcessAsync(It.IsAny<CancellationToken>())).Throws(new Exception("Random exception"));
38+
}
39+
40+
mockFactory.Setup(a => a.Create(It.IsAny<string>(), It.IsAny<IMetricsLogger>(), It.IsAny<int>())).Returns(mockChannel.Object);
41+
42+
HttpFunctionInvocationDispatcher dispatcher = new HttpFunctionInvocationDispatcher(mockOptions.Object, null, null, mockEventManager.Object, NullLoggerFactory.Instance, mockFactory.Object);
43+
Assert.Equal(dispatcher.State, FunctionInvocationDispatcherState.Default);
44+
try
45+
{
46+
await dispatcher.InitializeHttpWorkerChannelAsync(3);
47+
}
48+
catch (Exception)
49+
{
50+
}
51+
52+
await Task.Delay(TimeSpan.FromMilliseconds(500));
53+
if (startWorkerProcessResult)
54+
{
55+
Assert.Equal(dispatcher.State, FunctionInvocationDispatcherState.Initialized);
56+
}
57+
else
58+
{
59+
Assert.NotEqual(dispatcher.State, FunctionInvocationDispatcherState.Initialized);
60+
}
61+
}
62+
}
63+
}
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the MIT License. See License.txt in the project root for license information.
3+
4+
using System;
5+
using System.Linq;
6+
using System.Threading;
7+
using System.Threading.Tasks;
8+
using Microsoft.Azure.WebJobs.Script.Diagnostics;
9+
using Microsoft.Azure.WebJobs.Script.Eventing;
10+
using Microsoft.Azure.WebJobs.Script.Workers;
11+
using Moq;
12+
using Xunit;
13+
14+
namespace Microsoft.Azure.WebJobs.Script.Tests.Workers.Http
15+
{
16+
public class HttpWorkerChannelTests
17+
{
18+
private readonly IMetricsLogger _metricsLogger;
19+
private readonly IScriptEventManager _eventManager;
20+
21+
public HttpWorkerChannelTests()
22+
{
23+
_metricsLogger = new TestMetricsLogger();
24+
Mock<IScriptEventManager> mockEventManager = new Mock<IScriptEventManager>();
25+
mockEventManager.Setup(a => a.Publish(It.IsAny<ScriptEvent>()));
26+
_eventManager = mockEventManager.Object;
27+
}
28+
29+
[Theory]
30+
[InlineData(true)]
31+
[InlineData(false)]
32+
public async Task TestStartWorkerProcess(bool isWorkerReady)
33+
{
34+
Mock<IWorkerProcess> workerProcess = new Mock<IWorkerProcess>();
35+
Mock<IHttpWorkerService> httpWorkerService = new Mock<IHttpWorkerService>();
36+
37+
httpWorkerService.Setup(a => a.IsWorkerReady(It.IsAny<CancellationToken>())).Returns(Task.FromResult(isWorkerReady));
38+
workerProcess.Setup(a => a.StartProcessAsync()).Returns(Task.CompletedTask);
39+
TestLogger logger = new TestLogger("HttpWorkerChannel");
40+
IHttpWorkerChannel testWorkerChannel = new HttpWorkerChannel("RandomWorkerId", _eventManager, workerProcess.Object, httpWorkerService.Object, logger, _metricsLogger, 3);
41+
Task resultTask = null;
42+
try
43+
{
44+
resultTask = testWorkerChannel.StartWorkerProcessAsync();
45+
await resultTask;
46+
logger.GetLogMessages().Select(a => a.FormattedMessage).Contains("HttpWorker is Initialized.");
47+
Assert.Equal(resultTask.Status, TaskStatus.RanToCompletion);
48+
}
49+
catch (Exception)
50+
{
51+
logger.GetLogMessages().Select(a => a.FormattedMessage).Contains("Failed to start http worker process. workerId:");
52+
Assert.NotEqual(resultTask.Status, TaskStatus.RanToCompletion);
53+
}
54+
}
55+
56+
[Fact]
57+
public async Task TestStartWorkerProcess_WorkerServiceThrowsException()
58+
{
59+
Mock<IWorkerProcess> workerProcess = new Mock<IWorkerProcess>();
60+
Mock<IHttpWorkerService> httpWorkerService = new Mock<IHttpWorkerService>();
61+
IMetricsLogger metricsLogger = new TestMetricsLogger();
62+
63+
httpWorkerService.Setup(a => a.IsWorkerReady(It.IsAny<CancellationToken>())).Throws(new Exception("RandomException"));
64+
workerProcess.Setup(a => a.StartProcessAsync()).Returns(Task.CompletedTask);
65+
TestLogger logger = new TestLogger("HttpWorkerChannel");
66+
IHttpWorkerChannel testWorkerChannel = new HttpWorkerChannel("RandomWorkerId", _eventManager, workerProcess.Object, httpWorkerService.Object, logger, _metricsLogger, 3);
67+
Task resultTask = null;
68+
try
69+
{
70+
resultTask = testWorkerChannel.StartWorkerProcessAsync();
71+
await resultTask;
72+
}
73+
catch (Exception)
74+
{
75+
logger.GetLogMessages().Select(a => a.FormattedMessage).Contains("Failed to start http worker process. workerId:");
76+
Assert.NotEqual(resultTask.Status, TaskStatus.RanToCompletion);
77+
}
78+
}
79+
80+
[Fact]
81+
public async Task TestStartWorkerProcess_WorkerProcessThrowsException()
82+
{
83+
Mock<IWorkerProcess> workerProcess = new Mock<IWorkerProcess>();
84+
Mock<IHttpWorkerService> httpWorkerService = new Mock<IHttpWorkerService>();
85+
IMetricsLogger metricsLogger = new TestMetricsLogger();
86+
87+
httpWorkerService.Setup(a => a.IsWorkerReady(It.IsAny<CancellationToken>())).Throws(new Exception("RandomException"));
88+
workerProcess.Setup(a => a.StartProcessAsync()).Throws(new Exception("RandomException"));
89+
TestLogger logger = new TestLogger("HttpWorkerChannel");
90+
IHttpWorkerChannel testWorkerChannel = new HttpWorkerChannel("RandomWorkerId", _eventManager, workerProcess.Object, httpWorkerService.Object, logger, _metricsLogger, 3);
91+
Task resultTask = null;
92+
try
93+
{
94+
resultTask = testWorkerChannel.StartWorkerProcessAsync();
95+
await resultTask;
96+
}
97+
catch (Exception)
98+
{
99+
logger.GetLogMessages().Select(a => a.FormattedMessage).Contains("Failed to start http worker process. workerId:");
100+
Assert.NotEqual(resultTask.Status, TaskStatus.RanToCompletion);
101+
}
102+
}
103+
}
104+
}

0 commit comments

Comments
 (0)