Skip to content

Commit 5c6c379

Browse files
authored
[Enhanced Http] Private preview support for http proxying scenarios (#9193)
1 parent b40039f commit 5c6c379

File tree

14 files changed

+211
-28
lines changed

14 files changed

+211
-28
lines changed

src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
using Microsoft.Azure.WebJobs.Script.Workers.SharedMemoryDataTransfer;
3232
using Microsoft.Extensions.Logging;
3333
using Microsoft.Extensions.Options;
34+
using Yarp.ReverseProxy.Forwarder;
3435

3536
using static Microsoft.Azure.WebJobs.Script.Grpc.Messages.RpcLog.Types;
3637
using FunctionMetadata = Microsoft.Azure.WebJobs.Script.Description.FunctionMetadata;
@@ -81,7 +82,8 @@ internal class GrpcWorkerChannel : IRpcWorkerChannel, IDisposable
8182
private bool _isSharedMemoryDataTransferEnabled;
8283
private bool? _cancelCapabilityEnabled;
8384
private bool _isWorkerApplicationInsightsLoggingEnabled;
84-
85+
private IHttpProxyService _httpProxyService;
86+
private Uri _httpProxyEndpoint;
8587
private System.Timers.Timer _timer;
8688

8789
internal GrpcWorkerChannel(
@@ -96,7 +98,8 @@ internal GrpcWorkerChannel(
9698
IOptionsMonitor<ScriptApplicationHostOptions> applicationHostOptions,
9799
ISharedMemoryManager sharedMemoryManager,
98100
IOptions<WorkerConcurrencyOptions> workerConcurrencyOptions,
99-
IOptions<FunctionsHostingConfigOptions> hostingConfigOptions)
101+
IOptions<FunctionsHostingConfigOptions> hostingConfigOptions,
102+
IHttpProxyService httpProxyService)
100103
{
101104
_workerId = workerId;
102105
_eventManager = eventManager;
@@ -112,6 +115,7 @@ internal GrpcWorkerChannel(
112115
_processInbound = state => ProcessItem((InboundGrpcEvent)state);
113116
_hostingConfigOptions = hostingConfigOptions;
114117

118+
_httpProxyService = httpProxyService;
115119
_workerCapabilities = new GrpcCapabilities(_workerChannelLogger);
116120

117121
if (!_eventManager.TryGetGrpcChannels(workerId, out var inbound, out var outbound))
@@ -133,6 +137,8 @@ internal GrpcWorkerChannel(
133137
_state = RpcWorkerChannelState.Default;
134138
}
135139

140+
private bool IsHttpProxyingWorker => _httpProxyEndpoint is not null;
141+
136142
public string Id => _workerId;
137143

138144
public IDictionary<string, BufferBlock<ScriptInvocationContext>> FunctionInputBuffers => _functionInputBuffers;
@@ -420,6 +426,20 @@ internal void WorkerInitResponse(GrpcEvent initEvent)
420426
_isWorkerApplicationInsightsLoggingEnabled = true;
421427
}
422428

429+
// If http proxying is enabled, we need to get the proxying endpoint of this worker
430+
var httpUri = _workerCapabilities.GetCapabilityState(RpcWorkerConstants.HttpUri);
431+
if (!string.IsNullOrEmpty(httpUri) && FeatureFlags.IsEnabled(ScriptConstants.FeatureFlagEnableHttpProxying))
432+
{
433+
try
434+
{
435+
_httpProxyEndpoint = new Uri(httpUri);
436+
}
437+
catch (Exception ex)
438+
{
439+
HandleWorkerInitError(ex);
440+
}
441+
}
442+
423443
_workerInitTask.TrySetResult(true);
424444
}
425445

@@ -738,6 +758,13 @@ await SendStreamingMessageAsync(new StreamingMessage
738758
{
739759
context.CancellationToken.Register(() => SendInvocationCancel(invocationRequest.InvocationId));
740760
}
761+
762+
if (IsHttpProxyingWorker && FeatureFlags.IsEnabled(ScriptConstants.FeatureFlagEnableHttpProxying) && context.FunctionMetadata.IsHttpTriggerFunction())
763+
{
764+
var aspNetTask = _httpProxyService.ForwardAsync(context, _httpProxyEndpoint).AsTask();
765+
766+
context.Properties.Add(ScriptConstants.HttpProxyTask, aspNetTask);
767+
}
741768
}
742769
catch (Exception invokeEx)
743770
{
@@ -902,6 +929,20 @@ internal async Task InvokeResponse(InvocationResponse invokeResponse)
902929
{
903930
if (invokeResponse.Result.IsInvocationSuccess(context.ResultSource, capabilityEnabled))
904931
{
932+
if (FeatureFlags.IsEnabled(ScriptConstants.FeatureFlagEnableHttpProxying) && IsHttpProxyingWorker)
933+
{
934+
if (context.Properties.TryGetValue(ScriptConstants.HttpProxyTask, out Task<ForwarderError> httpProxyTask))
935+
{
936+
ForwarderError httpProxyTaskResult = await httpProxyTask;
937+
938+
if (httpProxyTaskResult is not ForwarderError.None)
939+
{
940+
// TODO: Understand scenarios where function invocation succeeds but there is an error proxying
941+
// need to investigate different ForwarderErrors and consider how they will be relayed through other services and to users
942+
}
943+
}
944+
}
945+
905946
_metricsLogger.LogEvent(string.Format(MetricEventNames.WorkerInvokeSucceeded, Id));
906947

907948
try

src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannelFactory.cs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,11 @@ public class GrpcWorkerChannelFactory : IRpcWorkerChannelFactory
2626
private readonly ISharedMemoryManager _sharedMemoryManager = null;
2727
private readonly IOptions<WorkerConcurrencyOptions> _workerConcurrencyOptions;
2828
private readonly IOptions<FunctionsHostingConfigOptions> _hostingConfigOptions;
29+
private readonly IHttpProxyService _httpProxyService;
2930

3031
public GrpcWorkerChannelFactory(IScriptEventManager eventManager, IEnvironment environment, ILoggerFactory loggerFactory,
3132
IOptionsMonitor<ScriptApplicationHostOptions> applicationHostOptions, IRpcWorkerProcessFactory rpcWorkerProcessManager, ISharedMemoryManager sharedMemoryManager,
32-
IOptions<WorkerConcurrencyOptions> workerConcurrencyOptions, IOptions<FunctionsHostingConfigOptions> hostingConfigOptions)
33+
IOptions<WorkerConcurrencyOptions> workerConcurrencyOptions, IOptions<FunctionsHostingConfigOptions> hostingConfigOptions, IHttpProxyService httpProxyService)
3334
{
3435
_eventManager = eventManager;
3536
_loggerFactory = loggerFactory;
@@ -39,6 +40,7 @@ public GrpcWorkerChannelFactory(IScriptEventManager eventManager, IEnvironment e
3940
_sharedMemoryManager = sharedMemoryManager;
4041
_workerConcurrencyOptions = workerConcurrencyOptions;
4142
_hostingConfigOptions = hostingConfigOptions;
43+
_httpProxyService = httpProxyService;
4244
}
4345

4446
public IRpcWorkerChannel Create(string scriptRootPath, string runtime, IMetricsLogger metricsLogger, int attemptCount, IEnumerable<RpcWorkerConfig> workerConfigs)
@@ -54,12 +56,12 @@ public IRpcWorkerChannel Create(string scriptRootPath, string runtime, IMetricsL
5456
IWorkerProcess rpcWorkerProcess = _rpcWorkerProcessFactory.Create(workerId, runtime, scriptRootPath, languageWorkerConfig);
5557

5658
return CreateInternal(workerId, _eventManager, languageWorkerConfig, rpcWorkerProcess, workerLogger, metricsLogger, attemptCount,
57-
_environment, _applicationHostOptions, _sharedMemoryManager, _workerConcurrencyOptions, _hostingConfigOptions);
59+
_environment, _applicationHostOptions, _sharedMemoryManager, _workerConcurrencyOptions, _hostingConfigOptions, _httpProxyService);
5860
}
5961

6062
internal virtual IRpcWorkerChannel CreateInternal(string workerId, IScriptEventManager eventManager, RpcWorkerConfig languageWorkerConfig, IWorkerProcess rpcWorkerProcess,
6163
ILogger workerLogger, IMetricsLogger metricsLogger, int attemptCount, IEnvironment environment, IOptionsMonitor<ScriptApplicationHostOptions> applicationHostOptions,
62-
ISharedMemoryManager sharedMemoryManager, IOptions<WorkerConcurrencyOptions> workerConcurrencyOptions, IOptions<FunctionsHostingConfigOptions> hostingConfigOptions)
64+
ISharedMemoryManager sharedMemoryManager, IOptions<WorkerConcurrencyOptions> workerConcurrencyOptions, IOptions<FunctionsHostingConfigOptions> hostingConfigOptions, IHttpProxyService httpProxyService)
6365
{
6466
return new GrpcWorkerChannel(
6567
workerId,
@@ -73,7 +75,8 @@ internal virtual IRpcWorkerChannel CreateInternal(string workerId, IScriptEventM
7375
applicationHostOptions,
7476
sharedMemoryManager,
7577
workerConcurrencyOptions,
76-
hostingConfigOptions);
78+
hostingConfigOptions,
79+
httpProxyService);
7780
}
7881
}
7982
}

src/WebJobs.Script.Grpc/GrpcServiceCollectionsExtensions.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ public static IServiceCollection AddScriptGrpc(this IServiceCollection services)
1616

1717
services.AddSingleton<IRpcServer, AspNetCoreGrpcServer>();
1818

19+
services.AddHttpForwarder();
20+
services.AddSingleton<IHttpProxyService, DefaultHttpProxyService>();
21+
1922
return services;
2023
}
2124
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the MIT License. See License.txt in the project root for license information.
3+
4+
using System;
5+
using System.Collections.Generic;
6+
using System.Linq;
7+
using System.Net.Http;
8+
using System.Threading.Tasks;
9+
using Microsoft.AspNetCore.Http;
10+
using Microsoft.Azure.WebJobs.Script.Description;
11+
using Microsoft.Azure.WebJobs.Script.Workers.Rpc;
12+
using Yarp.ReverseProxy.Forwarder;
13+
14+
namespace Microsoft.Azure.WebJobs.Script.Grpc
15+
{
16+
internal class DefaultHttpProxyService : IHttpProxyService, IDisposable
17+
{
18+
private readonly SocketsHttpHandler _handler;
19+
private readonly IHttpForwarder _httpForwarder;
20+
private readonly HttpMessageInvoker _messageInvoker;
21+
private readonly ForwarderRequestConfig _forwarderRequestConfig;
22+
23+
public DefaultHttpProxyService(IHttpForwarder httpForwarder)
24+
{
25+
_httpForwarder = httpForwarder ?? throw new ArgumentNullException(nameof(httpForwarder));
26+
27+
_handler = new SocketsHttpHandler();
28+
_messageInvoker = new HttpMessageInvoker(_handler);
29+
_forwarderRequestConfig = new ForwarderRequestConfig();
30+
}
31+
32+
public void Dispose()
33+
{
34+
_handler?.Dispose();
35+
_messageInvoker?.Dispose();
36+
}
37+
38+
public ValueTask<ForwarderError> ForwardAsync(ScriptInvocationContext context, Uri httpUri)
39+
{
40+
ArgumentNullException.ThrowIfNull(context);
41+
42+
if (context.Inputs is null || context.Inputs?.Count() == 0)
43+
{
44+
throw new InvalidOperationException($"The function {context.FunctionMetadata.Name} can not be evaluated since it has no inputs.");
45+
}
46+
47+
HttpRequest httpRequest = context.Inputs.FirstOrDefault(i => i.Val is HttpRequest).Val as HttpRequest;
48+
49+
if (httpRequest is null)
50+
{
51+
throw new InvalidOperationException($"Cannot proxy the HttpTrigger function {context.FunctionMetadata.Name} without an input of type {nameof(HttpRequest)}.");
52+
}
53+
54+
HttpContext httpContext = httpRequest.HttpContext;
55+
56+
httpContext.Items.Add(ScriptConstants.HttpProxyingEnabled, bool.TrueString);
57+
58+
// add invocation id as correlation id
59+
httpRequest.Headers.TryAdd(ScriptConstants.HttpProxyCorrelationHeader, context.ExecutionContext.InvocationId.ToString());
60+
61+
var aspNetTask = _httpForwarder.SendAsync(httpContext, httpUri.ToString(), _messageInvoker, _forwarderRequestConfig);
62+
63+
return aspNetTask;
64+
}
65+
}
66+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the MIT License. See License.txt in the project root for license information.
3+
4+
using System;
5+
using System.Threading.Tasks;
6+
using Microsoft.Azure.WebJobs.Script.Description;
7+
using Yarp.ReverseProxy.Forwarder;
8+
9+
namespace Microsoft.Azure.WebJobs.Script.Grpc
10+
{
11+
public interface IHttpProxyService
12+
{
13+
ValueTask<ForwarderError> ForwardAsync(ScriptInvocationContext context, Uri httpUri);
14+
}
15+
}

src/WebJobs.Script.Grpc/WebJobs.Script.Grpc.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
<PrivateAssets>all</PrivateAssets>
3232
</PackageReference>
3333
<PackageReference Include="System.Threading.Channels" Version="6.0.0" />
34+
<PackageReference Include="Yarp.ReverseProxy" Version="2.0.0" />
3435
</ItemGroup>
3536

3637
<ItemGroup>

src/WebJobs.Script.WebHost/Middleware/FunctionInvocationMiddleware.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ public FunctionInvocationMiddleware(RequestDelegate next)
3333
_next = next;
3434
}
3535

36+
// TODO: Confirm that only HttpTrigger requests would flow through here
3637
public async Task Invoke(HttpContext context)
3738
{
3839
if (_next != null)
@@ -52,6 +53,15 @@ public async Task Invoke(HttpContext context)
5253

5354
int nestedProxiesCount = GetNestedProxiesCount(context, functionExecution);
5455
IActionResult result = await GetResultAsync(context, functionExecution);
56+
57+
if (context.Items.TryGetValue(ScriptConstants.HttpProxyingEnabled, out var value))
58+
{
59+
if (value?.ToString() == bool.TrueString)
60+
{
61+
return;
62+
}
63+
}
64+
5565
if (nestedProxiesCount > 0)
5666
{
5767
// if Proxy, the rest of the pipeline will be processed by Proxies in

src/WebJobs.Script/Description/Workers/ScriptInvocationContext.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,5 +31,7 @@ public class ScriptInvocationContext
3131
public ILogger Logger { get; set; }
3232

3333
public System.Threading.ExecutionContext AsyncExecutionContext { get; set; }
34+
35+
public Dictionary<string, object> Properties { get; set; } = new Dictionary<string, object>();
3436
}
3537
}

src/WebJobs.Script/Extensions/FunctionMetadataExtensions.cs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ public static class FunctionMetadataExtensions
1414
private const string IsDisabledKey = "IsDisabled";
1515
private const string IsCodelessKey = "IsCodeless";
1616
private const string FunctionIdKey = "FunctionId";
17+
private const string HttpTriggerKey = "HttpTrigger";
18+
private const string HttpOutputKey = "Http";
1719

1820
public static bool IsHttpInAndOutFunction(this FunctionMetadata metadata)
1921
{
@@ -24,15 +26,20 @@ public static bool IsHttpInAndOutFunction(this FunctionMetadata metadata)
2426

2527
BindingMetadata inputBindingMetadata = metadata.InputBindings.ElementAt(0);
2628
BindingMetadata outputBindingMetadata = metadata.OutputBindings.ElementAt(0);
27-
if (string.Equals("httptrigger", inputBindingMetadata.Type, StringComparison.OrdinalIgnoreCase) &&
28-
string.Equals("http", outputBindingMetadata.Type, StringComparison.OrdinalIgnoreCase))
29+
if (string.Equals(HttpTriggerKey, inputBindingMetadata.Type, StringComparison.OrdinalIgnoreCase) &&
30+
string.Equals(HttpOutputKey, outputBindingMetadata.Type, StringComparison.OrdinalIgnoreCase))
2931
{
3032
return true;
3133
}
3234

3335
return false;
3436
}
3537

38+
public static bool IsHttpTriggerFunction(this FunctionMetadata metadata)
39+
{
40+
return metadata.InputBindings.Any(b => string.Equals(HttpTriggerKey, b.Type, StringComparison.OrdinalIgnoreCase));
41+
}
42+
3643
public static string GetFunctionId(this FunctionMetadata metadata)
3744
{
3845
if (!metadata.Properties.TryGetValue(FunctionIdKey, out object idObj)

src/WebJobs.Script/ScriptConstants.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ public static class ScriptConstants
125125
public const string FeatureFlagEnableWorkerIndexing = "EnableWorkerIndexing";
126126
public const string FeatureFlagEnableDebugTracing = "EnableDebugTracing";
127127
public const string FeatureFlagEnableProxies = "EnableProxies";
128+
public const string FeatureFlagEnableHttpProxying = "EnableHttpProxying";
128129
public const string HostingConfigDisableLinuxAppServiceDetailedExecutionEvents = "DisableLinuxExecutionDetails";
129130
public const string HostingConfigDisableLinuxAppServiceExecutionEventLogBackoff = "DisableLinuxLogBackoff";
130131

@@ -225,5 +226,10 @@ public static class ScriptConstants
225226
public static readonly string LiveLogsSessionAIKey = "#AzFuncLiveLogsSessionId";
226227

227228
public static readonly string FunctionsHostingConfigSectionName = "FunctionsHostingConfig";
229+
230+
// HTTP Proxying constants
231+
public static readonly string HttpProxyingEnabled = "HttpProxyingEnabled";
232+
public static readonly string HttpProxyCorrelationHeader = "x-ms-invocation-id";
233+
public static readonly string HttpProxyTask = "HttpProxyTask";
228234
}
229235
}

0 commit comments

Comments
 (0)