Skip to content

Commit 107be00

Browse files
authored
Wait for HttpWorker to be ready before sending invocations (#5195)
1 parent 6d341d2 commit 107be00

13 files changed

+191
-53
lines changed

src/WebJobs.Script/Description/Workers/WorkerFunctionInvoker.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ private async Task DelayUntilFunctionDispatcherInitialized()
8989
if (_functionDispatcher != null && _functionDispatcher.State == FunctionInvocationDispatcherState.Initializing)
9090
{
9191
_logger.LogDebug($"functionDispatcher state: {_functionDispatcher.State}");
92-
await Utility.DelayAsync(WorkerConstants.ProcessStartTimeoutSeconds, 25, () =>
92+
await Utility.DelayAsync(WorkerConstants.ProcessStartTimeoutSeconds, WorkerConstants.WorkerReadyCheckPollingIntervalMilliseconds, () =>
9393
{
9494
return _functionDispatcher.State != FunctionInvocationDispatcherState.Initialized;
9595
});

src/WebJobs.Script/Host/ScriptHost.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ protected override async Task StartAsyncCore(CancellationToken cancellationToken
241241
{
242242
var ignore = LogInitializationAsync();
243243

244-
await InitializeAsync();
244+
await InitializeAsync(cancellationToken);
245245

246246
// Throw if cancellation occurred during initialization.
247247
cancellationToken.ThrowIfCancellationRequested();
@@ -255,7 +255,7 @@ protected override async Task StartAsyncCore(CancellationToken cancellationToken
255255
/// Performs all required initialization on the host.
256256
/// Must be called before the host is started.
257257
/// </summary>
258-
public async Task InitializeAsync()
258+
public async Task InitializeAsync(CancellationToken cancellationToken = default)
259259
{
260260
_stopwatch.Start();
261261
using (_metricsLogger.LatencyEvent(MetricEventNames.HostStartupLatency))
@@ -277,7 +277,7 @@ public async Task InitializeAsync()
277277
await InitializeFunctionDescriptorsAsync(functionMetadataList);
278278

279279
// Initialize worker function invocation dispatcher only for valid functions after creating function descriptors
280-
await _functionDispatcher.InitializeAsync(Utility.GetValidFunctions(functionMetadataList, Functions));
280+
await _functionDispatcher.InitializeAsync(Utility.GetValidFunctions(functionMetadataList, Functions), cancellationToken);
281281

282282
GenerateFunctions(directTypes);
283283

src/WebJobs.Script/Utility.cs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,17 +81,32 @@ internal static async Task InvokeWithRetriesAsync(Func<Task> action, int maxRetr
8181
/// <param name="pollingIntervalMilliseconds">The polling interval.</param>
8282
/// <param name="condition">The condition to check</param>
8383
/// <returns>A Task representing the delay.</returns>
84-
internal static async Task DelayAsync(int timeoutSeconds, int pollingIntervalMilliseconds, Func<bool> condition)
84+
internal static Task DelayAsync(int timeoutSeconds, int pollingIntervalMilliseconds, Func<bool> condition)
85+
{
86+
Task<bool> Condition() => Task.FromResult(condition());
87+
return DelayAsync(timeoutSeconds, pollingIntervalMilliseconds, Condition, CancellationToken.None);
88+
}
89+
90+
/// <summary>
91+
/// Delays while the specified condition remains true.
92+
/// </summary>
93+
/// <param name="timeoutSeconds">The maximum number of seconds to delay.</param>
94+
/// <param name="pollingIntervalMilliseconds">The polling interval.</param>
95+
/// <param name="condition">The async condition to check</param>
96+
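/// <returns>A Task whose result is the last evaluated condition value: true if the wait ended (timeout or cancellation) while the condition still held, false if the condition became false.</returns>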
97+
internal static async Task<bool> DelayAsync(int timeoutSeconds, int pollingIntervalMilliseconds, Func<Task<bool>> condition, CancellationToken cancellationToken)
8598
{
8699
TimeSpan timeout = TimeSpan.FromSeconds(timeoutSeconds);
87100
TimeSpan delay = TimeSpan.FromMilliseconds(pollingIntervalMilliseconds);
88101
TimeSpan timeWaited = TimeSpan.Zero;
89-
90-
while (condition() && (timeWaited < timeout))
102+
bool conditionResult = await condition();
103+
while (conditionResult && (timeWaited < timeout) && !cancellationToken.IsCancellationRequested)
91104
{
92105
await Task.Delay(delay);
93106
timeWaited += delay;
107+
conditionResult = await condition();
94108
}
109+
return conditionResult;
95110
}
96111

97112
/// <summary>

src/WebJobs.Script/Workers/Http/DefaultHttpWorkerService.cs

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,11 @@
44
using System;
55
using System.Collections.Generic;
66
using System.Linq;
7+
using System.Net;
78
using System.Net.Http;
89
using System.Net.Http.Formatting;
10+
using System.Net.Sockets;
11+
using System.Threading;
912
using System.Threading.Tasks;
1013
using Microsoft.AspNetCore.Http;
1114
using Microsoft.AspNetCore.Mvc.WebApiCompatShim;
@@ -24,15 +27,15 @@ public class DefaultHttpWorkerService : IHttpWorkerService
2427
private readonly ILogger _logger;
2528

2629
public DefaultHttpWorkerService(IOptions<HttpWorkerOptions> httpWorkerOptions, ILoggerFactory loggerFactory)
27-
: this(new HttpClient(), httpWorkerOptions, loggerFactory)
30+
: this(new HttpClient(), httpWorkerOptions, loggerFactory.CreateLogger<DefaultHttpWorkerService>())
2831
{
2932
}
3033

31-
internal DefaultHttpWorkerService(HttpClient httpClient, IOptions<HttpWorkerOptions> httpWorkerOptions, ILoggerFactory loggerFactory)
34+
internal DefaultHttpWorkerService(HttpClient httpClient, IOptions<HttpWorkerOptions> httpWorkerOptions, ILogger logger)
3235
{
3336
_httpClient = httpClient;
3437
_httpWorkerOptions = httpWorkerOptions.Value;
35-
_logger = loggerFactory.CreateLogger<DefaultHttpWorkerService>();
38+
_logger = logger;
3639
}
3740

3841
public Task InvokeAsync(ScriptInvocationContext scriptInvocationContext)
@@ -170,5 +173,38 @@ private void AddRequestHeadersAndSetRequestUri(HttpRequestMessage httpRequestMes
170173
httpRequestMessage.Headers.Add(HttpWorkerConstants.HostVersionHeaderName, ScriptHost.Version);
171174
httpRequestMessage.Headers.UserAgent.ParseAdd($"{HttpWorkerConstants.UserAgentHeaderValue}/{ScriptHost.Version}");
172175
}
176+
177+
public async Task<bool> IsWorkerReady(CancellationToken cancellationToken)
178+
{
179+
bool continueWaitingForWorker = await Utility.DelayAsync(WorkerConstants.WorkerInitTimeoutSeconds, WorkerConstants.WorkerReadyCheckPollingIntervalMilliseconds, async () =>
180+
{
181+
return await IsWorkerReadyForRequest();
182+
}, cancellationToken);
183+
return !continueWaitingForWorker;
184+
}
185+
186+
private async Task<bool> IsWorkerReadyForRequest()
187+
{
188+
string requestUri = new UriBuilder(WorkerConstants.HttpScheme, WorkerConstants.HostName, _httpWorkerOptions.Port).ToString();
189+
HttpRequestMessage httpRequestMessage = new HttpRequestMessage();
190+
httpRequestMessage.RequestUri = new Uri(requestUri);
191+
try
192+
{
193+
await _httpClient.SendAsync(httpRequestMessage);
194+
// Any Http response indicates a valid server Url
195+
return false;
196+
}
197+
catch (HttpRequestException httpRequestEx)
198+
{
199+
if (httpRequestEx.InnerException != null && httpRequestEx.InnerException is SocketException)
200+
{
201+
// Wait for the worker to be ready
202+
_logger.LogDebug("Waiting for HttpWorker to be initialized. Request to: {requestUri} failing with exception message: {message}", requestUri, httpRequestEx.Message);
203+
return true;
204+
}
205+
// Any other inner exception, consider HttpWorker to be ready
206+
return false;
207+
}
208+
}
173209
}
174210
}

src/WebJobs.Script/Workers/Http/HttpFunctionInvocationDispatcher.cs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,11 @@
66
using System.Collections.Generic;
77
using System.Linq;
88
using System.Reactive.Linq;
9+
using System.Threading;
910
using System.Threading.Tasks;
1011
using Microsoft.Azure.WebJobs.Script.Description;
1112
using Microsoft.Azure.WebJobs.Script.Diagnostics;
1213
using Microsoft.Azure.WebJobs.Script.Eventing;
13-
using Microsoft.Azure.WebJobs.Script.Workers.Rpc;
1414
using Microsoft.Extensions.Logging;
1515
using Microsoft.Extensions.Options;
1616

@@ -59,11 +59,11 @@ public HttpFunctionInvocationDispatcher(IOptions<ScriptJobHostOptions> scriptHos
5959

6060
public FunctionInvocationDispatcherState State { get; private set; }
6161

62-
internal Task InitializeJobhostLanguageWorkerChannelAsync(int attemptCount)
62+
internal Task InitializeHttpWorkerChannelAsync(int attemptCount, CancellationToken cancellationToken = default)
6363
{
6464
// TODO: Add process management for http invoker
6565
_httpWorkerChannel = _httpWorkerChannelFactory.Create(_scriptOptions.RootScriptPath, _metricsLogger, attemptCount);
66-
_httpWorkerChannel.StartWorkerProcessAsync().ContinueWith(workerInitTask =>
66+
_httpWorkerChannel.StartWorkerProcessAsync(cancellationToken).ContinueWith(workerInitTask =>
6767
{
6868
if (workerInitTask.IsCompleted)
6969
{
@@ -78,7 +78,7 @@ internal Task InitializeJobhostLanguageWorkerChannelAsync(int attemptCount)
7878
return Task.CompletedTask;
7979
}
8080

81-
public async Task InitializeAsync(IEnumerable<FunctionMetadata> functions)
81+
public async Task InitializeAsync(IEnumerable<FunctionMetadata> functions, CancellationToken cancellationToken = default)
8282
{
8383
if (functions == null || !functions.Any())
8484
{
@@ -87,7 +87,7 @@ public async Task InitializeAsync(IEnumerable<FunctionMetadata> functions)
8787
}
8888

8989
State = FunctionInvocationDispatcherState.Initializing;
90-
await InitializeJobhostLanguageWorkerChannelAsync(0);
90+
await InitializeHttpWorkerChannelAsync(0, cancellationToken);
9191
}
9292

9393
public Task InvokeAsync(ScriptInvocationContext invocationContext)
@@ -121,19 +121,19 @@ private async Task DisposeAndRestartWorkerChannel(string workerId)
121121
{
122122
(_httpWorkerChannel as IDisposable)?.Dispose();
123123
}
124-
_logger.LogDebug("Restarting http invoker channel");
125124
await RestartWorkerChannel(workerId);
126125
}
127126

128127
private async Task RestartWorkerChannel(string workerId)
129128
{
130129
if (_invokerErrors.Count < 3)
131130
{
132-
await InitializeJobhostLanguageWorkerChannelAsync(_invokerErrors.Count);
131+
_logger.LogDebug("Restarting http invoker channel");
132+
await InitializeHttpWorkerChannelAsync(_invokerErrors.Count);
133133
}
134-
else if (_httpWorkerChannel == null)
134+
else
135135
{
136-
_logger.LogError("Exceeded http invoker restart retry count. Shutting down Functions Host");
136+
_logger.LogError("Exceeded http worker restart retry count. Shutting down Functions Host");
137137
_scriptJobHostEnvironment.Shutdown();
138138
}
139139
}

src/WebJobs.Script/Workers/Http/HttpWorkerChannel.cs

Lines changed: 42 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,32 +2,36 @@
22
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
33

44
using System;
5+
using System.Threading;
56
using System.Threading.Tasks;
67
using Microsoft.Azure.WebJobs.Script.Description;
78
using Microsoft.Azure.WebJobs.Script.Diagnostics;
8-
using Microsoft.Azure.WebJobs.Script.Workers.Rpc;
9+
using Microsoft.Azure.WebJobs.Script.Eventing;
910
using Microsoft.Extensions.Logging;
1011

1112
namespace Microsoft.Azure.WebJobs.Script.Workers
1213
{
1314
public class HttpWorkerChannel : IHttpWorkerChannel, IDisposable
1415
{
16+
private readonly IScriptEventManager _eventManager;
1517
private bool _disposed;
1618
private IDisposable _startLatencyMetric;
1719
private ILogger _workerChannelLogger;
18-
private IWorkerProcess _rpcWorkerProcess;
20+
private IWorkerProcess _workerProcess;
1921
private IHttpWorkerService _httpWorkerService;
2022

2123
internal HttpWorkerChannel(
2224
string workerId,
25+
IScriptEventManager eventManager,
2326
IWorkerProcess rpcWorkerProcess,
2427
IHttpWorkerService httpWorkerService,
2528
ILogger logger,
2629
IMetricsLogger metricsLogger,
2730
int attemptCount)
2831
{
2932
Id = workerId;
30-
_rpcWorkerProcess = rpcWorkerProcess;
33+
_eventManager = eventManager;
34+
_workerProcess = rpcWorkerProcess;
3135
_workerChannelLogger = logger;
3236
_httpWorkerService = httpWorkerService;
3337
_startLatencyMetric = metricsLogger?.LatencyEvent(string.Format(MetricEventNames.WorkerInitializeLatency, "HttpWorker", attemptCount));
@@ -40,10 +44,42 @@ public Task InvokeFunction(ScriptInvocationContext context)
4044
return _httpWorkerService.InvokeAsync(context);
4145
}
4246

43-
public async Task StartWorkerProcessAsync()
47+
internal async Task DelayUntilWokerInitialized(CancellationToken cancellationToken)
48+
{
49+
_workerChannelLogger.LogDebug("Initializing HttpWorker.");
50+
try
51+
{
52+
bool isWorkerReady = await _httpWorkerService.IsWorkerReady(cancellationToken);
53+
if (!isWorkerReady)
54+
{
55+
PublishWorkerErrorEvent(new TimeoutException("Initializing HttpWorker timed out."));
56+
}
57+
else
58+
{
59+
_workerChannelLogger.LogDebug("HttpWorker is Initialized.");
60+
}
61+
}
62+
catch (Exception ex)
63+
{
64+
// HttpFunctionInvocationDispatcher will handle the worker error events
65+
PublishWorkerErrorEvent(ex);
66+
}
67+
}
68+
69+
public async Task StartWorkerProcessAsync(CancellationToken cancellationToken)
4470
{
4571
_workerChannelLogger.LogDebug("Initiating Worker Process start up");
46-
await _rpcWorkerProcess.StartProcessAsync();
72+
await _workerProcess.StartProcessAsync();
73+
await DelayUntilWokerInitialized(cancellationToken);
74+
}
75+
76+
private void PublishWorkerErrorEvent(Exception exc)
77+
{
78+
if (_disposed)
79+
{
80+
return;
81+
}
82+
_eventManager.Publish(new HttpWorkerErrorEvent(Id, exc));
4783
}
4884

4985
protected virtual void Dispose(bool disposing)
@@ -54,7 +90,7 @@ protected virtual void Dispose(bool disposing)
5490
{
5591
_startLatencyMetric?.Dispose();
5692

57-
(_rpcWorkerProcess as IDisposable)?.Dispose();
93+
(_workerProcess as IDisposable)?.Dispose();
5894
}
5995
_disposed = true;
6096
}

src/WebJobs.Script/Workers/Http/HttpWorkerChannelFactory.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33

44
using System;
55
using Microsoft.Azure.WebJobs.Script.Diagnostics;
6+
using Microsoft.Azure.WebJobs.Script.Eventing;
67
using Microsoft.Azure.WebJobs.Script.Workers.Http;
7-
using Microsoft.Azure.WebJobs.Script.Workers.Rpc;
88
using Microsoft.Extensions.Logging;
99
using Microsoft.Extensions.Options;
1010

@@ -15,11 +15,13 @@ public class HttpWorkerChannelFactory : IHttpWorkerChannelFactory
1515
private readonly ILoggerFactory _loggerFactory = null;
1616
private readonly IHttpWorkerProcessFactory _httpWorkerProcessFactory = null;
1717
private readonly HttpWorkerOptions _httpWorkerOptions = null;
18+
private readonly IScriptEventManager _eventManager;
1819
private IHttpWorkerService _httpWorkerService;
1920

20-
public HttpWorkerChannelFactory(IEnvironment environment, ILoggerFactory loggerFactory, IOptions<HttpWorkerOptions> httpWorkerOptions,
21+
public HttpWorkerChannelFactory(IEnvironment environment, ILoggerFactory loggerFactory, IOptions<HttpWorkerOptions> httpWorkerOptions, IScriptEventManager eventManager,
2122
IOptionsMonitor<ScriptApplicationHostOptions> applicationHostOptions, IHttpWorkerProcessFactory httpWorkerProcessFactory, IHttpWorkerService httpWorkerService)
2223
{
24+
_eventManager = eventManager ?? throw new ArgumentNullException(nameof(eventManager));
2325
_loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
2426
_httpWorkerOptions = httpWorkerOptions.Value ?? throw new ArgumentNullException(nameof(httpWorkerOptions.Value));
2527
_httpWorkerProcessFactory = httpWorkerProcessFactory ?? throw new ArgumentNullException(nameof(httpWorkerProcessFactory));
@@ -33,6 +35,7 @@ public IHttpWorkerChannel Create(string scriptRootPath, IMetricsLogger metricsLo
3335
IWorkerProcess httpWorkerProcess = _httpWorkerProcessFactory.Create(workerId, scriptRootPath, _httpWorkerOptions);
3436
return new HttpWorkerChannel(
3537
workerId,
38+
_eventManager,
3639
httpWorkerProcess,
3740
_httpWorkerService,
3841
workerLogger,

src/WebJobs.Script/Workers/Http/IHttpWorkerChannel.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Copyright (c) .NET Foundation. All rights reserved.
22
// Licensed under the MIT License. See License.txt in the project root for license information.
33

4+
using System.Threading;
45
using System.Threading.Tasks;
56
using Microsoft.Azure.WebJobs.Script.Description;
67

@@ -12,6 +13,6 @@ public interface IHttpWorkerChannel
1213

1314
Task InvokeFunction(ScriptInvocationContext context);
1415

15-
Task StartWorkerProcessAsync();
16+
Task StartWorkerProcessAsync(CancellationToken cancellationToken = default);
1617
}
1718
}

src/WebJobs.Script/Workers/Http/IHttpWorkerService.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Copyright (c) .NET Foundation. All rights reserved.
22
// Licensed under the MIT License. See License.txt in the project root for license information.
33

4+
using System.Threading;
45
using System.Threading.Tasks;
56
using Microsoft.Azure.WebJobs.Script.Description;
67

@@ -9,5 +10,7 @@ namespace Microsoft.Azure.WebJobs.Script.Workers
910
public interface IHttpWorkerService
1011
{
1112
Task InvokeAsync(ScriptInvocationContext scriptInvocationContext);
13+
14+
Task<bool> IsWorkerReady(CancellationToken cancellationToken);
1215
}
1316
}

src/WebJobs.Script/Workers/IFunctionInvocationDispatcher.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
using System;
55
using System.Collections.Generic;
6+
using System.Threading;
67
using System.Threading.Tasks;
78
using Microsoft.Azure.WebJobs.Script.Description;
89

@@ -14,7 +15,7 @@ public interface IFunctionInvocationDispatcher : IDisposable
1415

1516
Task InvokeAsync(ScriptInvocationContext invocationContext);
1617

17-
Task InitializeAsync(IEnumerable<FunctionMetadata> functions);
18+
Task InitializeAsync(IEnumerable<FunctionMetadata> functions, CancellationToken cancellationToken = default);
1819

1920
Task ShutdownAsync();
2021
}

0 commit comments

Comments
 (0)