Skip to content

Commit a3734f9

Browse files
authored
Ensure extension RPC endpoints ready before processing gRPC messages (#10255)
* Ensure extension RPC endpoints ready before processing gRPC messages * Add timeout and tests to waiting on RPC extensions.
1 parent 4e74882 commit a3734f9

File tree

4 files changed

+115
-1
lines changed

4 files changed

+115
-1
lines changed

release_notes.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,5 @@
1717
- Ordered invocations are now the default (#10201)
1818
- Skip worker description if none of the profile conditions are met (#9932)
1919
- Fixed incorrect function count in the log message.(#10220)
20+
- Fix race condition on startup with extension RPC endpoints not being available. (#10255)
2021
- Adding a timeout when retrieving function metadata from metadata providers (#10219)

src/WebJobs.Script.Grpc/Server/ExtensionsCompositeEndpointDataSource.cs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@
66
using System.Diagnostics.CodeAnalysis;
77
using System.Linq;
88
using System.Threading;
9+
using System.Threading.Tasks;
910
using Microsoft.AspNetCore.Http;
1011
using Microsoft.AspNetCore.Routing;
1112
using Microsoft.Azure.WebJobs.Rpc.Core.Internal;
1213
using Microsoft.Extensions.DependencyInjection;
14+
using Microsoft.Extensions.Logging;
1315
using Microsoft.Extensions.Primitives;
1416

1517
namespace Microsoft.Azure.WebJobs.Script.Grpc
@@ -26,6 +28,7 @@ internal sealed class ExtensionsCompositeEndpointDataSource : EndpointDataSource
2628
private readonly object _lock = new();
2729
private readonly List<EndpointDataSource> _dataSources = new();
2830
private readonly IScriptHostManager _scriptHostManager;
31+
private readonly TaskCompletionSource _initialized = new();
2932

3033
private IServiceProvider _extensionServices;
3134
private List<Endpoint> _endpoints;
@@ -191,6 +194,7 @@ private void OnHostChanged(object sender, ActiveHostChangedEventArgs args)
191194
.GetService<IEnumerable<WebJobsRpcEndpointDataSource>>()
192195
?? Enumerable.Empty<WebJobsRpcEndpointDataSource>();
193196
_dataSources.AddRange(sources);
197+
_initialized.TrySetResult(); // signal we have first initialized.
194198
}
195199
else
196200
{
@@ -301,5 +305,49 @@ private void ThrowIfDisposed()
301305
throw new ObjectDisposedException(nameof(ExtensionsCompositeEndpointDataSource));
302306
}
303307
}
308+
309+
/// <summary>
310+
/// Middleware to ensure <see cref="ExtensionsCompositeEndpointDataSource"/> is initialized before routing for the first time.
311+
/// Must be registered as a singleton service.
312+
/// </summary>
313+
/// <param name="dataSource">The <see cref="ExtensionsCompositeEndpointDataSource"/> to ensure is initialized.</param>
314+
/// <param name="logger">The logger.</param>
315+
public sealed class EnsureInitializedMiddleware(ExtensionsCompositeEndpointDataSource dataSource, ILogger<EnsureInitializedMiddleware> logger) : IMiddleware
316+
{
317+
private TaskCompletionSource _initialized = new();
318+
private bool _firstRun = true;
319+
320+
// used for testing to verify initialization success.
321+
internal Task Initialized => _initialized.Task;
322+
323+
// settable only for testing purposes.
324+
internal TimeSpan Timeout { get; init; } = TimeSpan.FromSeconds(2);
325+
326+
public Task InvokeAsync(HttpContext context, RequestDelegate next)
327+
{
328+
return _firstRun ? InvokeCoreAsync(context, next) : next(context);
329+
}
330+
331+
private async Task InvokeCoreAsync(HttpContext context, RequestDelegate next)
332+
{
333+
try
334+
{
335+
await dataSource._initialized.Task.WaitAsync(Timeout);
336+
}
337+
catch (TimeoutException ex)
338+
{
339+
// In case of deadlock we don't want to block all gRPC requests.
340+
// Log an error and continue.
341+
logger.LogError(ex, "Error initializing extension endpoints.");
342+
_initialized.TrySetException(ex);
343+
}
344+
345+
// Even in case of timeout we don't want to continually test for initialization on subsequent requests.
346+
// That would be a serious performance degredation.
347+
_firstRun = false;
348+
_initialized.TrySetResult();
349+
await next(context);
350+
}
351+
}
304352
}
305353
}

src/WebJobs.Script.Grpc/Server/Startup.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ internal class Startup
1616
public void ConfigureServices(IServiceCollection services)
1717
{
1818
services.AddSingleton<ExtensionsCompositeEndpointDataSource>();
19+
services.AddSingleton<ExtensionsCompositeEndpointDataSource.EnsureInitializedMiddleware>();
1920
services.AddGrpc(options =>
2021
{
2122
options.MaxReceiveMessageSize = MaxMessageLengthBytes;
@@ -30,12 +31,16 @@ public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
3031
app.UseDeveloperExceptionPage();
3132
}
3233

34+
// This must occur before 'UseRouting'. This ensures extension endpoints are registered before the
35+
// endpoints are collected by the routing middleware.
36+
app.UseMiddleware<ExtensionsCompositeEndpointDataSource.EnsureInitializedMiddleware>();
3337
app.UseRouting();
3438

3539
app.UseEndpoints(endpoints =>
3640
{
3741
endpoints.MapGrpcService<FunctionRpc.FunctionRpcBase>();
38-
endpoints.DataSources.Add(endpoints.ServiceProvider.GetRequiredService<ExtensionsCompositeEndpointDataSource>());
42+
endpoints.DataSources.Add(
43+
endpoints.ServiceProvider.GetRequiredService<ExtensionsCompositeEndpointDataSource>());
3944
});
4045
}
4146
}

test/WebJobs.Script.Tests/Workers/Rpc/ExtensionsCompositeEndpointDataSourceTests.cs

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,16 @@
33

44
using System;
55
using System.Collections.Generic;
6+
using System.Threading;
7+
using System.Threading.Tasks;
68
using Microsoft.AspNetCore.Http;
79
using Microsoft.Azure.WebJobs.Rpc.Core.Internal;
810
using Microsoft.Azure.WebJobs.Script.Grpc;
911
using Microsoft.Extensions.DependencyInjection;
1012
using Microsoft.Extensions.FileProviders;
1113
using Microsoft.Extensions.Hosting;
14+
using Microsoft.Extensions.Logging;
15+
using Microsoft.Extensions.Logging.Abstractions;
1216
using Microsoft.Extensions.Primitives;
1317
using Moq;
1418
using Xunit;
@@ -17,6 +21,9 @@ namespace Microsoft.Azure.WebJobs.Script.Tests.Workers.Rpc
1721
{
1822
public class ExtensionsCompositeEndpointDataSourceTests
1923
{
24+
private static readonly ILogger<ExtensionsCompositeEndpointDataSource.EnsureInitializedMiddleware> _logger
25+
= NullLogger<ExtensionsCompositeEndpointDataSource.EnsureInitializedMiddleware>.Instance;
26+
2027
[Fact]
2128
public void NoActiveHost_NoEndpoints()
2229
{
@@ -41,6 +48,7 @@ public void ActiveHostChanged_NullHost_NoEndpoints()
4148
public void ActiveHostChanged_NoExtensions_NoEndpoints()
4249
{
4350
Mock<IScriptHostManager> manager = new();
51+
4452
ExtensionsCompositeEndpointDataSource dataSource = new(manager.Object);
4553

4654
IChangeToken token = dataSource.GetChangeToken();
@@ -67,6 +75,45 @@ public void ActiveHostChanged_NewExtensions_NewEndpoints()
6775
endpoint => Assert.Equal("Test2", endpoint.DisplayName));
6876
}
6977

78+
[Fact]
79+
public async Task ActiveHostChanged_MiddlewareWaits_Success()
80+
{
81+
Mock<IScriptHostManager> manager = new();
82+
83+
ExtensionsCompositeEndpointDataSource dataSource = new(manager.Object);
84+
ExtensionsCompositeEndpointDataSource.EnsureInitializedMiddleware middleware =
85+
new(dataSource, _logger) { Timeout = Timeout.InfiniteTimeSpan };
86+
TestDelegate next = new();
87+
88+
Task waiter = middleware.InvokeAsync(null, next.InvokeAsync);
89+
Assert.False(waiter.IsCompleted); // should be blocked until we raise the event.
90+
91+
manager.Raise(x => x.ActiveHostChanged += null, new ActiveHostChangedEventArgs(null, GetHost()));
92+
await waiter.WaitAsync(TimeSpan.FromSeconds(5));
93+
await middleware.Initialized;
94+
await next.Invoked;
95+
}
96+
97+
[Fact]
98+
public async Task NoActiveHostChanged_MiddlewareWaits_Timeout()
99+
{
100+
Mock<IScriptHostManager> manager = new();
101+
102+
ExtensionsCompositeEndpointDataSource dataSource = new(manager.Object);
103+
ExtensionsCompositeEndpointDataSource.EnsureInitializedMiddleware middleware =
104+
new(dataSource, _logger) { Timeout = TimeSpan.Zero };
105+
TestDelegate next = new();
106+
107+
await middleware.InvokeAsync(null, next.InvokeAsync).WaitAsync(TimeSpan.FromSeconds(5)); // should not throw
108+
await Assert.ThrowsAsync<TimeoutException>(() => middleware.Initialized);
109+
await next.Invoked;
110+
111+
// invoke again to verify it processes the next request.
112+
next = new();
113+
await middleware.InvokeAsync(null, next.InvokeAsync);
114+
await next.Invoked;
115+
}
116+
70117
[Fact]
71118
public void Dispose_GetThrows()
72119
{
@@ -105,5 +152,18 @@ public TestEndpoints(params Endpoint[] endpoints)
105152

106153
public override IChangeToken GetChangeToken() => NullChangeToken.Singleton;
107154
}
155+
156+
private class TestDelegate
157+
{
158+
private readonly TaskCompletionSource _invoked = new();
159+
160+
public Task Invoked => _invoked.Task;
161+
162+
public Task InvokeAsync(HttpContext context)
163+
{
164+
_invoked.TrySetResult();
165+
return Task.CompletedTask;
166+
}
167+
}
108168
}
109169
}

0 commit comments

Comments
 (0)