Skip to content

Commit 92d51f0

Browse files
authored
Enable ordering of RpcLog and InvocationResponse messages (#9657)
1 parent 6686ecc commit 92d51f0

11 files changed

+595
-20
lines changed

src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs

Lines changed: 105 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
using Microsoft.Extensions.Logging;
3333
using Microsoft.Extensions.Options;
3434
using Yarp.ReverseProxy.Forwarder;
35-
3635
using static Microsoft.Azure.WebJobs.Script.Grpc.Messages.RpcLog.Types;
3736
using FunctionMetadata = Microsoft.Azure.WebJobs.Script.Description.FunctionMetadata;
3837
using MsgType = Microsoft.Azure.WebJobs.Script.Grpc.Messages.StreamingMessage.ContentOneofCase;
@@ -51,6 +50,7 @@ internal partial class GrpcWorkerChannel : IRpcWorkerChannel, IDisposable
5150
private readonly List<TimeSpan> _workerStatusLatencyHistory = new List<TimeSpan>();
5251
private readonly IOptions<WorkerConcurrencyOptions> _workerConcurrencyOptions;
5352
private readonly WaitCallback _processInbound;
53+
private readonly IInvocationMessageDispatcherFactory _messageDispatcherFactory;
5454
private readonly object _syncLock = new object();
5555
private readonly object _metadataLock = new object();
5656
private readonly Dictionary<MsgType, Queue<PendingItem>> _pendingActions = new();
@@ -65,7 +65,7 @@ internal partial class GrpcWorkerChannel : IRpcWorkerChannel, IDisposable
6565
private RpcWorkerChannelState _state;
6666
private IDictionary<string, Exception> _functionLoadErrors = new Dictionary<string, Exception>();
6767
private IDictionary<string, Exception> _metadataRequestErrors = new Dictionary<string, Exception>();
68-
private ConcurrentDictionary<string, ScriptInvocationContext> _executingInvocations = new ConcurrentDictionary<string, ScriptInvocationContext>();
68+
private ConcurrentDictionary<string, ExecutingInvocation> _executingInvocations = new();
6969
private IDictionary<string, BufferBlock<ScriptInvocationContext>> _functionInputBuffers = new ConcurrentDictionary<string, BufferBlock<ScriptInvocationContext>>();
7070
private ConcurrentDictionary<string, TaskCompletionSource<bool>> _workerStatusRequests = new ConcurrentDictionary<string, TaskCompletionSource<bool>>();
7171
private List<IDisposable> _inputLinks = new List<IDisposable>();
@@ -137,6 +137,9 @@ internal GrpcWorkerChannel(
137137
_startLatencyMetric = metricsLogger?.LatencyEvent(string.Format(MetricEventNames.WorkerInitializeLatency, workerConfig.Description.Language, attemptCount));
138138

139139
_state = RpcWorkerChannelState.Default;
140+
141+
// Temporary switch to allow fully testing new algorithm in production
142+
_messageDispatcherFactory = GetProcessorFactory();
140143
}
141144

142145
private bool IsHttpProxyingWorker => _httpProxyEndpoint is not null;
@@ -149,6 +152,23 @@ internal GrpcWorkerChannel(
149152

150153
public RpcWorkerConfig WorkerConfig => _workerConfig;
151154

155+
// Temporary switch that allows us to move between the "old" ThreadPool-only processor
156+
// and a "new" Channel processor (for proper ordering of messages).
157+
private IInvocationMessageDispatcherFactory GetProcessorFactory()
158+
{
159+
if (_hostingConfigOptions.Value.EnableOrderedInvocationMessages ||
160+
FeatureFlags.IsEnabled(ScriptConstants.FeatureFlagEnableOrderedInvocationmessages, _environment))
161+
{
162+
_workerChannelLogger.LogDebug($"Using {nameof(OrderedInvocationMessageDispatcherFactory)}.");
163+
return new OrderedInvocationMessageDispatcherFactory(ProcessItem, _workerChannelLogger);
164+
}
165+
else
166+
{
167+
_workerChannelLogger.LogDebug($"Using {nameof(ThreadPoolInvocationProcessorFactory)}.");
168+
return new ThreadPoolInvocationProcessorFactory(_processInbound);
169+
}
170+
}
171+
152172
private void ProcessItem(InboundGrpcEvent msg)
153173
{
154174
// note this method is a thread-pool (QueueUserWorkItem) entry-point
@@ -251,7 +271,8 @@ private async Task ProcessInbound()
251271
{
252272
Logger.ChannelReceivedMessage(_workerChannelLogger, msg.WorkerId, msg.MessageType);
253273
}
254-
ThreadPool.QueueUserWorkItem(_processInbound, msg);
274+
275+
DispatchMessage(msg);
255276
}
256277
}
257278
}
@@ -266,6 +287,40 @@ private async Task ProcessInbound()
266287
}
267288
}
268289

290+
private void DispatchMessage(InboundGrpcEvent msg)
291+
{
292+
// RpcLog and InvocationResponse messages are special. They need to be handled by the InvocationMessageDispatcher
293+
switch (msg.MessageType)
294+
{
295+
case MsgType.RpcLog when msg.Message.RpcLog.LogCategory == RpcLogCategory.User || msg.Message.RpcLog.LogCategory == RpcLogCategory.CustomMetric:
296+
if (_executingInvocations.TryGetValue(msg.Message.RpcLog.InvocationId, out var invocation))
297+
{
298+
invocation.Dispatcher.DispatchRpcLog(msg);
299+
}
300+
else
301+
{
302+
// We received a log outside of a invocation
303+
ThreadPool.QueueUserWorkItem(_processInbound, msg);
304+
}
305+
break;
306+
case MsgType.InvocationResponse:
307+
if (_executingInvocations.TryGetValue(msg.Message.InvocationResponse.InvocationId, out invocation))
308+
{
309+
invocation.Dispatcher.DispatchInvocationResponse(msg);
310+
}
311+
else
312+
{
313+
// This should never happen, but if it does, just send it to the ThreadPool.
314+
ThreadPool.QueueUserWorkItem(_processInbound, msg);
315+
}
316+
break;
317+
default:
318+
// All other messages can go to the thread pool.
319+
ThreadPool.QueueUserWorkItem(_processInbound, msg);
320+
break;
321+
}
322+
}
323+
269324
public bool IsChannelReadyForInvocations()
270325
{
271326
return !_disposing && !_disposed && _state.HasFlag(RpcWorkerChannelState.InvocationBuffersInitialized | RpcWorkerChannelState.Initialized);
@@ -750,14 +805,14 @@ internal async Task SendInvocationRequest(ScriptInvocationContext context)
750805
{
751806
_workerChannelLogger.LogDebug("Function {functionName} failed to load", context.FunctionMetadata.Name);
752807
context.ResultSource.TrySetException(_functionLoadErrors[context.FunctionMetadata.GetFunctionId()]);
753-
_executingInvocations.TryRemove(invocationId, out ScriptInvocationContext _);
808+
RemoveExecutingInvocation(invocationId);
754809
return;
755810
}
756811
else if (_metadataRequestErrors.ContainsKey(context.FunctionMetadata.GetFunctionId()))
757812
{
758813
_workerChannelLogger.LogDebug("Worker failed to load metadata for {functionName}", context.FunctionMetadata.Name);
759814
context.ResultSource.TrySetException(_metadataRequestErrors[context.FunctionMetadata.GetFunctionId()]);
760-
_executingInvocations.TryRemove(invocationId, out ScriptInvocationContext _);
815+
RemoveExecutingInvocation(invocationId);
761816
return;
762817
}
763818

@@ -771,7 +826,7 @@ internal async Task SendInvocationRequest(ScriptInvocationContext context)
771826

772827
var invocationRequest = await context.ToRpcInvocationRequest(_workerChannelLogger, _workerCapabilities, _isSharedMemoryDataTransferEnabled, _sharedMemoryManager);
773828
AddAdditionalTraceContext(invocationRequest.TraceContext.Attributes, context);
774-
_executingInvocations.TryAdd(invocationRequest.InvocationId, context);
829+
_executingInvocations.TryAdd(invocationRequest.InvocationId, new(context, _messageDispatcherFactory.Create(invocationRequest.InvocationId)));
775830
_metricsLogger.LogEvent(string.Format(MetricEventNames.WorkerInvoked, Id), functionName: Sanitizer.Sanitize(context.FunctionMetadata.Name));
776831

777832
await SendStreamingMessageAsync(new StreamingMessage
@@ -980,8 +1035,9 @@ internal async Task InvokeResponse(InvocationResponse invokeResponse)
9801035
// Check if the worker supports logging user-code-thrown exceptions to app insights
9811036
bool capabilityEnabled = !string.IsNullOrEmpty(_workerCapabilities.GetCapabilityState(RpcWorkerConstants.EnableUserCodeException));
9821037

983-
if (_executingInvocations.TryRemove(invokeResponse.InvocationId, out ScriptInvocationContext context))
1038+
if (_executingInvocations.TryRemove(invokeResponse.InvocationId, out var invocation))
9841039
{
1040+
var context = invocation.Context;
9851041
if (invokeResponse.Result.IsInvocationSuccess(context.ResultSource, capabilityEnabled))
9861042
{
9871043
_metricsLogger.LogEvent(string.Format(MetricEventNames.WorkerInvokeSucceeded, Id));
@@ -1051,6 +1107,8 @@ internal async Task InvokeResponse(InvocationResponse invokeResponse)
10511107
SendCloseSharedMemoryResourcesForInvocationRequest(outputMaps);
10521108
}
10531109
}
1110+
1111+
invocation.Dispose();
10541112
}
10551113
else
10561114
{
@@ -1083,8 +1141,10 @@ internal void SendCloseSharedMemoryResourcesForInvocationRequest(IList<string> o
10831141
internal void Log(GrpcEvent msg)
10841142
{
10851143
var rpcLog = msg.Message.RpcLog;
1086-
if (_executingInvocations.TryGetValue(rpcLog.InvocationId, out ScriptInvocationContext context))
1144+
if (_executingInvocations.TryGetValue(rpcLog.InvocationId, out var invocation))
10871145
{
1146+
var context = invocation.Context;
1147+
10881148
// Restore the execution context from the original invocation. This allows AsyncLocal state to flow to loggers.
10891149
System.Threading.ExecutionContext.Run(context.AsyncExecutionContext, static (state) =>
10901150
{
@@ -1273,12 +1333,25 @@ internal void ReceiveWorkerStatusResponse(string requestId, WorkerStatusResponse
12731333
}
12741334
}
12751335

1336+
private void RemoveExecutingInvocation(string invocationId)
1337+
{
1338+
if (_executingInvocations.TryRemove(invocationId, out var invocation))
1339+
{
1340+
invocation.Dispose();
1341+
}
1342+
}
1343+
12761344
protected virtual void Dispose(bool disposing)
12771345
{
12781346
if (!_disposed)
12791347
{
12801348
if (disposing)
12811349
{
1350+
foreach (var id in _executingInvocations.Keys)
1351+
{
1352+
RemoveExecutingInvocation(id);
1353+
}
1354+
12821355
_startLatencyMetric?.Dispose();
12831356
_workerInitTask?.TrySetCanceled();
12841357
_timer?.Dispose();
@@ -1366,9 +1439,9 @@ private void StopWorkerProcess()
13661439
public async Task DrainInvocationsAsync()
13671440
{
13681441
_workerChannelLogger.LogDebug("Count of in-buffer invocations waiting to be drained out: {invocationCount}", _executingInvocations.Count);
1369-
foreach (ScriptInvocationContext currContext in _executingInvocations.Values)
1442+
foreach (var invocation in _executingInvocations.Values)
13701443
{
1371-
await currContext.ResultSource.Task;
1444+
await invocation.Context.ResultSource.Task;
13721445
}
13731446
}
13741447

@@ -1384,12 +1457,12 @@ public bool TryFailExecutions(Exception workerException)
13841457
return false;
13851458
}
13861459

1387-
foreach (ScriptInvocationContext currContext in _executingInvocations?.Values)
1460+
foreach (var invocation in _executingInvocations?.Values)
13881461
{
1389-
string invocationId = currContext?.ExecutionContext?.InvocationId.ToString();
1462+
string invocationId = invocation.Context?.ExecutionContext?.InvocationId.ToString();
13901463
_workerChannelLogger.LogDebug("Worker '{workerId}' encountered a fatal error. Failing invocation: '{invocationId}'", _workerId, invocationId);
1391-
currContext?.ResultSource?.TrySetException(workerException);
1392-
_executingInvocations.TryRemove(invocationId, out ScriptInvocationContext _);
1464+
invocation.Context?.ResultSource?.TrySetException(workerException);
1465+
RemoveExecutingInvocation(invocationId);
13931466
}
13941467
return true;
13951468
}
@@ -1529,6 +1602,24 @@ private void AddAdditionalTraceContext(MapField<string, string> attributes, Scri
15291602
}
15301603
}
15311604

1605+
private sealed class ExecutingInvocation : IDisposable
1606+
{
1607+
public ExecutingInvocation(ScriptInvocationContext context, IInvocationMessageDispatcher dispatcher)
1608+
{
1609+
Context = context;
1610+
Dispatcher = dispatcher;
1611+
}
1612+
1613+
public ScriptInvocationContext Context { get; }
1614+
1615+
public IInvocationMessageDispatcher Dispatcher { get; }
1616+
1617+
public void Dispose()
1618+
{
1619+
(Dispatcher as IDisposable)?.Dispose();
1620+
}
1621+
}
1622+
15321623
private sealed class PendingItem
15331624
{
15341625
private readonly Action<InboundGrpcEvent> _callback;
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the MIT License. See License.txt in the project root for license information.
3+
4+
using Microsoft.Azure.WebJobs.Script.Grpc.Eventing;
5+
6+
namespace Microsoft.Azure.WebJobs.Script.Grpc;
7+
8+
/// <summary>
9+
/// Interface for processing grpc messages that may come from the worker that are
10+
/// related to an invocation (RpcLog and InvocationResponse). The contract here is as-follows:
11+
/// - The contract with a worker is that, during an invocation (i.e an InvocationRequest has been sent), there are only
12+
/// two messages that the worker can send us related to that invocation
13+
/// - one or many RpcLog messages
14+
/// - one final InvocationResponse that effectively ends the invocation. Once the InvocationResponse is received, no
15+
/// more RpcLog messages from this specific invocation will be processed.
16+
/// - The GrpcChannel is looping to dequeue grpc messages from a worker. When it finds one that is either an RpcLog
17+
/// or an InvocationResponse, it will will call the matching method on this interface (i.e. RpcLog -> DispatchRpcLog)
18+
/// from the same thread that is looping and dequeuing items from the grpc channel. The implementors of this interface
19+
/// must quickly dispatch the message to a background Task or Thread for handling, so as to not block the
20+
/// main loop from dequeuing more messages.
21+
/// - Because the methods on this interface are all on being called from the same thread, they do not need to be
22+
/// thread-safe. They can assume that they will not be called multiple times from different threads.
23+
/// </summary>
24+
internal interface IInvocationMessageDispatcher
25+
{
26+
/// <summary>
27+
/// Inspects the incoming RpcLog and dispatches to a Thread or background Task as quickly as possible. This method is
28+
/// called from a loop processing incoming grpc messages and any thread blocking will delay the processing of that loop.
29+
/// It can be assumed that this method will never be called from multiple threads simultaneously and thus does not need
30+
/// to be thread-safe.
31+
/// </summary>
32+
/// <param name="msg">The RpcLog message. Implementors can assume that this message is an RpcLog.</param>
33+
void DispatchRpcLog(InboundGrpcEvent msg);
34+
35+
/// <summary>
36+
/// Inspects an incoming InvocationResponse message and dispatches to a Thread or background Task as quickly as possible.
37+
/// This method is called from a loop processing incoming grpc messages and any thread blocking will delay the processing
38+
/// of that loop. It can be assumed that this method will never be called from multiple threads simultaneously and thus
39+
/// does not need to be thread-safe.
40+
/// </summary>
41+
/// <param name="msg">The InvocationResponse message. Implementors can assume that this message is an InvocationResponse.</param>
42+
void DispatchInvocationResponse(InboundGrpcEvent msg);
43+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the MIT License. See License.txt in the project root for license information.
3+
4+
namespace Microsoft.Azure.WebJobs.Script.Grpc;
5+
6+
internal interface IInvocationMessageDispatcherFactory
7+
{
8+
IInvocationMessageDispatcher Create(string invocationId);
9+
}

0 commit comments

Comments
 (0)