Skip to content

Commit c6495bb

Browse files
authored
Add RPC extensibility feature to dev (#9393)
1 parent 388c4d2 commit c6495bb

16 files changed

+493
-35
lines changed

release_notes.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,5 @@
1717
- Microsoft.IdentityModel.Logging
1818
- Updated Grpc.AspNetCore package to 2.55.0 (https://github.com/Azure/azure-functions-host/pull/9373)
1919
- Update protobuf file to v1.10.0 (https://github.com/Azure/azure-functions-host/pull/9405)
20-
- Send an empty RpcHttp payload if proxying the http request to the worker (https://github.com/Azure/azure-functions-host/pull/9415)
20+
- Send an empty RpcHttp payload if proxying the http request to the worker (https://github.com/Azure/azure-functions-host/pull/9415)
21+
- Add new Host to Worker RPC extensibility feature for out-of-proc workers. (https://github.com/Azure/azure-functions-host/pull/9292)

src/WebJobs.Script.Grpc/Server/AspNetCoreGrpcHostBuilder.cs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,13 @@ namespace Microsoft.Azure.WebJobs.Script.Grpc
1414
{
1515
internal static class AspNetCoreGrpcHostBuilder
1616
{
17-
public static IHostBuilder CreateHostBuilder(FunctionRpc.FunctionRpcBase service, IScriptEventManager scriptEventManager, int port) =>
18-
new HostBuilder().ConfigureWebHost(webBuilder =>
17+
public static IHostBuilder CreateHostBuilder(
18+
FunctionRpc.FunctionRpcBase service,
19+
IScriptEventManager scriptEventManager,
20+
IScriptHostManager scriptHostManager,
21+
int port)
22+
{
23+
return new HostBuilder().ConfigureWebHost(webBuilder =>
1924
{
2025
webBuilder.UseKestrel(options =>
2126
{
@@ -26,12 +31,14 @@ public static IHostBuilder CreateHostBuilder(FunctionRpc.FunctionRpcBase service
2631
});
2732

2833
webBuilder.ConfigureServices(services =>
29-
{
30-
services.AddSingleton(scriptEventManager);
31-
services.AddSingleton(service);
32-
});
34+
{
35+
services.AddSingleton(scriptHostManager);
36+
services.AddSingleton(scriptEventManager);
37+
services.AddSingleton(service);
38+
});
3339

3440
webBuilder.UseStartup<Startup>();
3541
});
42+
}
3643
}
3744
}

src/WebJobs.Script.Grpc/Server/AspNetCoreGrpcServer.cs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,14 @@ public class AspNetCoreGrpcServer : IRpcServer, IDisposable, IAsyncDisposable
2323
private bool _disposed = false;
2424
private IHost _grpcHost;
2525

26-
public AspNetCoreGrpcServer(FunctionRpc.FunctionRpcBase service, IScriptEventManager scriptEventManager, ILogger<AspNetCoreGrpcServer> logger)
26+
public AspNetCoreGrpcServer(
27+
FunctionRpc.FunctionRpcBase service,
28+
IScriptEventManager scriptEventManager,
29+
IScriptHostManager scriptHostManager,
30+
ILogger<AspNetCoreGrpcServer> logger)
2731
{
2832
int port = WorkerUtilities.GetUnusedTcpPort();
29-
_grpcHostBuilder = AspNetCoreGrpcHostBuilder.CreateHostBuilder(service, scriptEventManager, port);
33+
_grpcHostBuilder = AspNetCoreGrpcHostBuilder.CreateHostBuilder(service, scriptEventManager, scriptHostManager, port);
3034
_logger = logger;
3135
Uri = new Uri($"http://{WorkerConstants.HostName}:{port}");
3236
}
Lines changed: 305 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,305 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the MIT License. See License.txt in the project root for license information.
3+
4+
using System;
5+
using System.Collections.Generic;
6+
using System.Diagnostics.CodeAnalysis;
7+
using System.Linq;
8+
using System.Threading;
9+
using Microsoft.AspNetCore.Http;
10+
using Microsoft.AspNetCore.Routing;
11+
using Microsoft.Azure.WebJobs.Rpc.Core.Internal;
12+
using Microsoft.Extensions.DependencyInjection;
13+
using Microsoft.Extensions.Primitives;
14+
15+
namespace Microsoft.Azure.WebJobs.Script.Grpc
16+
{
17+
/// <summary>
18+
/// Endpoint data source which composes all the WebJobs extension data sources together.
19+
/// </summary>
20+
/// <remarks>
21+
/// Implementation is adapted from <see cref="CompositeEndpointDataSource"/>.
22+
/// https://github.com/dotnet/aspnetcore/blob/main/src/Http/Routing/src/CompositeEndpointDataSource.cs.
23+
/// </remarks>
24+
internal sealed class ExtensionsCompositeEndpointDataSource : EndpointDataSource, IDisposable
25+
{
26+
private readonly object _lock = new();
27+
private readonly List<EndpointDataSource> _dataSources = new();
28+
private readonly IScriptHostManager _scriptHostManager;
29+
30+
private IServiceProvider _extensionServices;
31+
private List<Endpoint> _endpoints;
32+
private IChangeToken _consumerChangeToken;
33+
private CancellationTokenSource _cts;
34+
private List<IDisposable> _changeTokenRegistrations;
35+
private bool _disposed;
36+
37+
public ExtensionsCompositeEndpointDataSource(IScriptHostManager scriptHostManager)
38+
{
39+
_scriptHostManager = scriptHostManager;
40+
_scriptHostManager.ActiveHostChanged += OnHostChanged;
41+
}
42+
43+
/// <inheritdoc />
44+
public override IReadOnlyList<Endpoint> Endpoints
45+
{
46+
get
47+
{
48+
ThrowIfDisposed();
49+
EnsureEndpointsInitialized();
50+
return _endpoints;
51+
}
52+
}
53+
54+
/// <inheritdoc />
55+
public override IChangeToken GetChangeToken()
56+
{
57+
ThrowIfDisposed();
58+
EnsureChangeTokenInitialized();
59+
return _consumerChangeToken;
60+
}
61+
62+
/// <inheritdoc />
63+
public void Dispose()
64+
{
65+
List<IDisposable> disposables = null;
66+
lock (_lock)
67+
{
68+
if (_disposed)
69+
{
70+
return;
71+
}
72+
73+
_scriptHostManager.ActiveHostChanged -= OnHostChanged;
74+
_disposed = true;
75+
if (_changeTokenRegistrations is { Count: > 0 })
76+
{
77+
disposables ??= new List<IDisposable>();
78+
disposables.AddRange(_changeTokenRegistrations);
79+
_changeTokenRegistrations = null;
80+
}
81+
}
82+
83+
// Dispose everything outside of the lock in case a registration is blocking on HandleChange completing
84+
// on another thread or something.
85+
if (disposables is not null)
86+
{
87+
foreach (var disposable in disposables)
88+
{
89+
disposable.Dispose();
90+
}
91+
}
92+
93+
_cts?.Dispose();
94+
}
95+
96+
private Endpoint WrapEndpoint(Endpoint endpoint)
97+
{
98+
static RequestDelegate CreateDelegate(RequestDelegate next, IServiceProvider services)
99+
{
100+
// Incoming HttpContext has the gRPC script host services. Create a scope
101+
// for the JobHost services and swap out the contexts request services.
102+
return async context =>
103+
{
104+
if (next is null)
105+
{
106+
return;
107+
}
108+
109+
IServiceProvider original = context.RequestServices;
110+
111+
try
112+
{
113+
await using AsyncServiceScope scope = services.CreateAsyncScope();
114+
context.RequestServices = scope.ServiceProvider;
115+
await next(context);
116+
}
117+
finally
118+
{
119+
context.RequestServices = original;
120+
}
121+
};
122+
}
123+
124+
if (endpoint is not RouteEndpoint route)
125+
{
126+
// We only wrap URL-routeable endpoints (ie: RouteEndpoint).
127+
return endpoint;
128+
}
129+
130+
IServiceProvider services = _extensionServices
131+
?? throw new InvalidOperationException(
132+
"Trying to register extension endpoints, but no extension IServiceProvider available.");
133+
134+
return new RouteEndpoint(
135+
CreateDelegate(route.RequestDelegate, services),
136+
route.RoutePattern,
137+
route.Order,
138+
route.Metadata,
139+
route.DisplayName);
140+
}
141+
142+
[MemberNotNull(nameof(_consumerChangeToken))]
143+
private void EnsureChangeTokenInitialized()
144+
{
145+
if (_consumerChangeToken is not null)
146+
{
147+
return;
148+
}
149+
150+
lock (_lock)
151+
{
152+
if (_consumerChangeToken is not null)
153+
{
154+
return;
155+
}
156+
157+
// This is our first time initializing the change token, so the collection has "changed" from nothing.
158+
CreateChangeTokenUnsynchronized(collectionChanged: true);
159+
}
160+
}
161+
162+
[MemberNotNull(nameof(_consumerChangeToken))]
163+
private void CreateChangeTokenUnsynchronized(bool collectionChanged)
164+
{
165+
CancellationTokenSource cts = new();
166+
167+
if (collectionChanged)
168+
{
169+
_changeTokenRegistrations = new();
170+
foreach (var dataSource in _dataSources)
171+
{
172+
_changeTokenRegistrations.Add(ChangeToken.OnChange(
173+
dataSource.GetChangeToken,
174+
() => OnEndpointsChange(collectionChanged: false)));
175+
}
176+
}
177+
178+
_cts = cts;
179+
_consumerChangeToken = new CancellationChangeToken(cts.Token);
180+
}
181+
182+
private void OnHostChanged(object sender, ActiveHostChangedEventArgs args)
183+
{
184+
lock (_lock)
185+
{
186+
_dataSources.Clear();
187+
if (args?.NewHost?.Services is { } services)
188+
{
189+
_extensionServices = services;
190+
IEnumerable<WebJobsRpcEndpointDataSource> sources = services
191+
.GetService<IEnumerable<WebJobsRpcEndpointDataSource>>()
192+
?? Enumerable.Empty<WebJobsRpcEndpointDataSource>();
193+
_dataSources.AddRange(sources);
194+
}
195+
else
196+
{
197+
_extensionServices = null;
198+
}
199+
}
200+
201+
OnEndpointsChange(collectionChanged: true);
202+
}
203+
204+
private void OnEndpointsChange(bool collectionChanged)
205+
{
206+
CancellationTokenSource oldTokenSource = null;
207+
List<IDisposable> oldChangeTokenRegistrations = null;
208+
209+
lock (_lock)
210+
{
211+
if (_disposed)
212+
{
213+
return;
214+
}
215+
216+
// Prevent consumers from re-registering callback to in-flight events as that can
217+
// cause a stack overflow.
218+
// Example:
219+
// 1. B registers A.
220+
// 2. A fires event causing B's callback to get called.
221+
// 3. B executes some code in its callback, but needs to re-register callback
222+
// in the same callback.
223+
oldTokenSource = _cts;
224+
oldChangeTokenRegistrations = _changeTokenRegistrations;
225+
226+
// Don't create a new change token if no one is listening.
227+
if (oldTokenSource is not null)
228+
{
229+
// We have to hook to any OnChange callbacks before caching endpoints,
230+
// otherwise we might miss changes that occurred to one of the _dataSources after caching.
231+
CreateChangeTokenUnsynchronized(collectionChanged);
232+
}
233+
234+
// Don't update endpoints if no one has read them yet.
235+
if (_endpoints is not null)
236+
{
237+
// Refresh the endpoints from data source so that callbacks can get the latest endpoints.
238+
CreateEndpointsUnsynchronized();
239+
}
240+
}
241+
242+
// Disposing registrations can block on user defined code on running on other threads that could try to acquire the _lock.
243+
if (collectionChanged && oldChangeTokenRegistrations is not null)
244+
{
245+
foreach (var registration in oldChangeTokenRegistrations)
246+
{
247+
registration.Dispose();
248+
}
249+
}
250+
251+
// Raise consumer callbacks. Any new callback registration would happen on the new token created in earlier step.
252+
// Avoid raising callbacks inside a lock.
253+
oldTokenSource?.Cancel();
254+
}
255+
256+
[MemberNotNull(nameof(_endpoints))]
257+
private void CreateEndpointsUnsynchronized()
258+
{
259+
var endpoints = new List<Endpoint>();
260+
261+
foreach (var dataSource in _dataSources)
262+
{
263+
endpoints.AddRange(dataSource.Endpoints.Select(WrapEndpoint));
264+
}
265+
266+
// Only cache _endpoints after everything succeeds without throwing.
267+
// We don't want to create a negative cache which would cause 404s when there should be 500s.
268+
_endpoints = endpoints;
269+
}
270+
271+
// Defer initialization to avoid doing lots of reflection on startup.
272+
[MemberNotNull(nameof(_endpoints))]
273+
private void EnsureEndpointsInitialized()
274+
{
275+
if (_endpoints is not null)
276+
{
277+
return;
278+
}
279+
280+
lock (_lock)
281+
{
282+
if (_endpoints is not null)
283+
{
284+
return;
285+
}
286+
287+
// Now that we're caching the _enpoints, we're responsible for keeping them up-to-date even if the caller
288+
// hasn't started listening for changes themselves yet.
289+
EnsureChangeTokenInitialized();
290+
291+
// Note: we can't use DataSourceDependentCache here because we also need to handle a list of change
292+
// tokens, which is a complication most of our code doesn't have.
293+
CreateEndpointsUnsynchronized();
294+
}
295+
}
296+
297+
private void ThrowIfDisposed()
298+
{
299+
if (_disposed)
300+
{
301+
throw new ObjectDisposedException(nameof(ExtensionsCompositeEndpointDataSource));
302+
}
303+
}
304+
}
305+
}

src/WebJobs.Script.Grpc/Server/Startup.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ internal class Startup
1515

1616
public void ConfigureServices(IServiceCollection services)
1717
{
18+
services.AddSingleton<ExtensionsCompositeEndpointDataSource>();
1819
services.AddGrpc(options =>
1920
{
2021
options.MaxReceiveMessageSize = MaxMessageLengthBytes;
@@ -34,6 +35,7 @@ public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
3435
app.UseEndpoints(endpoints =>
3536
{
3637
endpoints.MapGrpcService<FunctionRpc.FunctionRpcBase>();
38+
endpoints.DataSources.Add(endpoints.ServiceProvider.GetRequiredService<ExtensionsCompositeEndpointDataSource>());
3739
});
3840
}
3941
}

0 commit comments

Comments
 (0)