Skip to content

Commit c112509

Browse files
mgravellliliankasemMarc GravellNickCraver
authored
remove RX as the primary pipe for gRPC communications (#8281)
### Issue describing the changes in this PR resolves #8280; performance locally is a 5x+ speedup in throughput. The main crux of the new flow is described in src/WebJobs.Script.Grpc/Eventing/GrpcEventExtensions.cs, but in short: we establish a per-worker `Channel<InboundGrpcEvent>` and `Channel<OutboundGrpcEvent>` to handle the backlog; instead of publishing to RX, we publish to the writer of the relevant queue. Separately, we have an async worker dequeue data from the queues and process it accordingly. This is much more direct, avoids a *lot* of RX machinery, and creates isolation between workers. ### Pull request checklist * [ ] My changes **do not** require documentation changes * [ ] Otherwise: Documentation issue linked to PR * [ ] My changes **should not** be added to the release notes for the next release * [ ] Otherwise: I've added my notes to `release_notes.md` * [ ] My changes **do not** need to be backported to a previous version * [ ] Otherwise: Backport tracked by issue/PR #issue_or_pr * [ ] I have added all required tests (Unit tests, E2E tests) <!-- Optional: delete if not applicable --> ### Additional information Additional PR information Co-authored-by: Lilian Kasem <[email protected]> Co-authored-by: Marc Gravell <[email protected]> Co-authored-by: Nick Craver <[email protected]> Co-authored-by: Nick Craver <[email protected]>
1 parent 5d4e651 commit c112509

File tree

18 files changed

+967
-411
lines changed

18 files changed

+967
-411
lines changed

src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs

Lines changed: 278 additions & 62 deletions
Large diffs are not rendered by default.

src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannelFactory.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
using System.Reactive.Linq;
88
using Microsoft.Azure.WebJobs.Script.Diagnostics;
99
using Microsoft.Azure.WebJobs.Script.Eventing;
10+
using Microsoft.Azure.WebJobs.Script.Grpc.Eventing;
1011
using Microsoft.Azure.WebJobs.Script.Workers;
1112
using Microsoft.Azure.WebJobs.Script.Workers.Rpc;
1213
using Microsoft.Azure.WebJobs.Script.Workers.SharedMemoryDataTransfer;
@@ -47,6 +48,7 @@ public IRpcWorkerChannel Create(string scriptRootPath, string runtime, IMetricsL
4748
throw new InvalidOperationException($"WorkerCofig for runtime: {runtime} not found");
4849
}
4950
string workerId = Guid.NewGuid().ToString();
51+
_eventManager.AddGrpcChannels(workerId); // prepare the inbound/outbound dedicated channels
5052
ILogger workerLogger = _loggerFactory.CreateLogger($"Worker.LanguageWorkerChannel.{runtime}.{workerId}");
5153
IWorkerProcess rpcWorkerProcess = _rpcWorkerProcessFactory.Create(workerId, runtime, scriptRootPath, languageWorkerConfig);
5254
return new GrpcWorkerChannel(
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the MIT License. See License.txt in the project root for license information.
3+
4+
using System;
5+
using System.Threading.Channels;
6+
using Microsoft.Azure.WebJobs.Script.Eventing;
7+
8+
namespace Microsoft.Azure.WebJobs.Script.Grpc.Eventing;
9+
10+
/// <summary>
/// Extension methods on <see cref="IScriptEventManager"/> that create, look up and tear down the
/// per-worker inbound/outbound gRPC channels, which are stored as worker state on the event manager.
/// </summary>
internal static class GrpcEventExtensions
{
    // flow here is:
    // 1) external request is proxied to the GrpcWorkerChannel via one of the many Send* APIs, which writes
    //    to outbound-writer; this means we can have concurrent writes to outbound
    // 2) if an out-of-process function is connected, a FunctionRpcService-EventStream will consume
    //    from outbound-reader (we'll allow for the multi-stream possibility, hence concurrent), and push it via gRPC
    // 3) when the out-of-process function provides a response to FunctionRpcService-EventStream, it is written to
    //    inbound-writer (note we will allow for multi-stream possibility)
    // 4) the GrpcWorkerChannel has a single dedicated consumer of inbound-reader, which it then marries to
    //    in-flight operations

    /// <summary>Options for the worker-to-host (inbound) channel: many writers, one reader.</summary>
    internal static readonly UnboundedChannelOptions InboundOptions = new UnboundedChannelOptions
    {
        SingleReader = true, // see 4
        SingleWriter = false, // see 3
        AllowSynchronousContinuations = false,
    };

    /// <summary>Options for the host-to-worker (outbound) channel: many writers, many readers.</summary>
    internal static readonly UnboundedChannelOptions OutboundOptions = new UnboundedChannelOptions
    {
        SingleReader = false, // see 2
        SingleWriter = false, // see 1
        AllowSynchronousContinuations = false,
    };

    /// <summary>
    /// Creates the inbound and outbound channels for <paramref name="workerId"/> and registers both
    /// with <paramref name="manager"/>. Either both are registered or neither is.
    /// </summary>
    /// <param name="manager">The event manager that stores the per-worker channels.</param>
    /// <param name="workerId">The worker the channels belong to.</param>
    /// <exception cref="ArgumentException">
    /// A channel is already registered for <paramref name="workerId"/>.
    /// </exception>
    public static void AddGrpcChannels(this IScriptEventManager manager, string workerId)
    {
        var inbound = Channel.CreateUnbounded<InboundGrpcEvent>(InboundOptions);
        if (manager.TryAddWorkerState(workerId, inbound))
        {
            var outbound = Channel.CreateUnbounded<OutboundGrpcEvent>(OutboundOptions);
            if (manager.TryAddWorkerState(workerId, outbound))
            {
                return; // successfully added both
            }

            // we added the inbound but not the outbound; revert
            manager.TryRemoveWorkerState(workerId, out inbound);
        }

        // this is not anticipated, so don't panic about the allocs above
        throw new ArgumentException("Duplicate worker id: " + workerId, nameof(workerId));
    }

    /// <summary>
    /// Looks up both channels registered for <paramref name="workerId"/>.
    /// </summary>
    /// <returns><c>true</c> only when both the inbound and the outbound channel were found.</returns>
    /// <remarks>
    /// The non-short-circuiting <c>&amp;</c> is deliberate: both lookups must run so that both
    /// <c>out</c> parameters are assigned on every path.
    /// </remarks>
    public static bool TryGetGrpcChannels(this IScriptEventManager manager, string workerId, out Channel<InboundGrpcEvent> inbound, out Channel<OutboundGrpcEvent> outbound)
        => manager.TryGetWorkerState(workerId, out inbound) & manager.TryGetWorkerState(workerId, out outbound);

    /// <summary>
    /// Unregisters the channels for <paramref name="workerId"/> from <paramref name="manager"/> and
    /// completes their writers so that readers drain and finish. Missing channels are ignored.
    /// </summary>
    /// <param name="manager">The event manager that stores the per-worker channels.</param>
    /// <param name="workerId">The worker whose channels should be removed.</param>
    public static void RemoveGrpcChannels(this IScriptEventManager manager, string workerId)
    {
        // Use TryRemove rather than TryGet: a plain lookup would complete the writers but leave
        // both channels registered with the event manager after the worker is gone.
        if (manager.TryRemoveWorkerState<Channel<InboundGrpcEvent>>(workerId, out var inbound))
        {
            inbound.Writer.TryComplete();
        }

        if (manager.TryRemoveWorkerState<Channel<OutboundGrpcEvent>>(workerId, out var outbound))
        {
            outbound.Writer.TryComplete();
        }
    }
}

src/WebJobs.Script.Grpc/Extensions/InboundGrpcEventExtensions.cs

Lines changed: 0 additions & 22 deletions
This file was deleted.

src/WebJobs.Script.Grpc/Server/FunctionRpcService.cs

Lines changed: 64 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,9 @@
22
// Licensed under the MIT License. See License.txt in the project root for license information.
33

44
using System;
5-
using System.Collections.Generic;
6-
using System.Reactive.Concurrency;
75
using System.Reactive.Linq;
86
using System.Threading;
7+
using System.Threading.Channels;
98
using System.Threading.Tasks;
109
using Grpc.Core;
1110
using Microsoft.Azure.WebJobs.Script.Eventing;
@@ -21,7 +20,6 @@ namespace Microsoft.Azure.WebJobs.Script.Grpc
2120
// TODO: move to WebJobs.Script.Grpc package and provide event stream abstraction
2221
internal class FunctionRpcService : FunctionRpc.FunctionRpcBase
2322
{
24-
private readonly SemaphoreSlim _writeLock = new SemaphoreSlim(1, 1);
2523
private readonly IScriptEventManager _eventManager;
2624
private readonly ILogger _logger;
2725

@@ -33,64 +31,48 @@ public FunctionRpcService(IScriptEventManager eventManager, ILogger<FunctionRpcS
3331

3432
public override async Task EventStream(IAsyncStreamReader<StreamingMessage> requestStream, IServerStreamWriter<StreamingMessage> responseStream, ServerCallContext context)
3533
{
36-
var cancelSource = new TaskCompletionSource<bool>();
37-
IDictionary<string, IDisposable> outboundEventSubscriptions = new Dictionary<string, IDisposable>();
38-
34+
var cancelSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
35+
var cts = CancellationTokenSource.CreateLinkedTokenSource(context.CancellationToken);
36+
CancellationTokenRegistration ctr = cts.Token.Register(static state => ((TaskCompletionSource<bool>)state).TrySetResult(false), cancelSource);
3937
try
4038
{
41-
context.CancellationToken.Register(() => cancelSource.TrySetResult(false));
42-
Func<Task<bool>> messageAvailable = async () =>
39+
static Task<Task<bool>> MoveNextAsync(IAsyncStreamReader<StreamingMessage> requestStream, TaskCompletionSource<bool> cancelSource)
4340
{
4441
// GRPC does not accept cancellation tokens for individual reads, hence wrapper
4542
var requestTask = requestStream.MoveNext(CancellationToken.None);
46-
var completed = await Task.WhenAny(cancelSource.Task, requestTask);
47-
return completed.Result;
48-
};
43+
return Task.WhenAny(cancelSource.Task, requestTask);
44+
}
4945

50-
if (await messageAvailable())
46+
if (await await MoveNextAsync(requestStream, cancelSource))
5147
{
52-
string workerId = requestStream.Current.StartStream.WorkerId;
53-
_logger.LogDebug("Established RPC channel. WorkerId: {workerId}", workerId);
54-
outboundEventSubscriptions.Add(workerId, _eventManager.OfType<OutboundGrpcEvent>()
55-
.Where(evt => evt.WorkerId == workerId)
56-
.ObserveOn(NewThreadScheduler.Default)
57-
.Subscribe(async evt =>
48+
var currentMessage = requestStream.Current;
49+
// expect first operation (and only the first; we don't support re-registration) to be StartStream
50+
if (currentMessage.ContentCase == MsgType.StartStream)
51+
{
52+
var workerId = currentMessage.StartStream?.WorkerId;
53+
if (!string.IsNullOrEmpty(workerId) && _eventManager.TryGetGrpcChannels(workerId, out var inbound, out var outbound))
5854
{
59-
try
60-
{
61-
if (evt.MessageType == MsgType.InvocationRequest)
62-
{
63-
_logger.LogTrace("Writing invocation request invocationId: {invocationId} to workerId: {workerId}", evt.Message.InvocationRequest.InvocationId, workerId);
64-
}
55+
// send work
56+
_ = PushFromOutboundToGrpc(workerId, responseStream, outbound.Reader, cts.Token);
6557

66-
try
58+
// this loop is "pull from gRPC and push to inbound"
59+
do
60+
{
61+
currentMessage = requestStream.Current;
62+
if (currentMessage.ContentCase == MsgType.InvocationResponse && !string.IsNullOrEmpty(currentMessage.InvocationResponse?.InvocationId))
6763
{
68-
// WriteAsync only allows one pending write at a time, so we
69-
// serialize access to the stream for each subscription
70-
await _writeLock.WaitAsync();
71-
await responseStream.WriteAsync(evt.Message);
64+
_logger.LogTrace("Received invocation response for invocationId: {invocationId} from workerId: {workerId}", currentMessage.InvocationResponse.InvocationId, workerId);
7265
}
73-
finally
66+
var newInbound = new InboundGrpcEvent(workerId, currentMessage);
67+
if (!inbound.Writer.TryWrite(newInbound))
7468
{
75-
_writeLock.Release();
69+
await inbound.Writer.WriteAsync(newInbound);
7670
}
71+
currentMessage = null; // allow old messages to be collected while we wait
7772
}
78-
catch (Exception subscribeEventEx)
79-
{
80-
_logger.LogError(subscribeEventEx, "Error writing message type {messageType} to workerId: {workerId}", evt.MessageType, workerId);
81-
}
82-
}));
83-
84-
do
85-
{
86-
var currentMessage = requestStream.Current;
87-
if (currentMessage.InvocationResponse != null && !string.IsNullOrEmpty(currentMessage.InvocationResponse.InvocationId))
88-
{
89-
_logger.LogTrace("Received invocation response for invocationId: {invocationId} from workerId: {workerId}", currentMessage.InvocationResponse.InvocationId, workerId);
73+
while (await await MoveNextAsync(requestStream, cancelSource));
9074
}
91-
_eventManager.Publish(new InboundGrpcEvent(workerId, currentMessage));
9275
}
93-
while (await messageAvailable());
9476
}
9577
}
9678
catch (Exception rpcException)
@@ -101,14 +83,47 @@ public override async Task EventStream(IAsyncStreamReader<StreamingMessage> requ
10183
}
10284
finally
10385
{
104-
foreach (var sub in outboundEventSubscriptions)
105-
{
106-
sub.Value?.Dispose();
107-
}
86+
cts.Cancel();
87+
ctr.Dispose();
10888

10989
// ensure cancellationSource task completes
11090
cancelSource.TrySetResult(false);
11191
}
11292
}
93+
94+
/// <summary>
/// Pump that reads outbound events from <paramref name="source"/> and writes each message to
/// <paramref name="responseStream"/>, until the reader completes or
/// <paramref name="cancellationToken"/> is cancelled. Never throws: failures are logged.
/// </summary>
/// <param name="workerId">Worker id, used for logging only.</param>
/// <param name="responseStream">The gRPC stream the messages are written to.</param>
/// <param name="source">Reader side of the worker's outbound channel.</param>
/// <param name="cancellationToken">Stops the wait for further outbound events.</param>
private async Task PushFromOutboundToGrpc(string workerId, IServerStreamWriter<StreamingMessage> responseStream, ChannelReader<OutboundGrpcEvent> source, CancellationToken cancellationToken)
{
    // Writes one event to the stream. A failed write is logged and swallowed so the pump keeps going.
    async Task WriteToStreamAsync(OutboundGrpcEvent outboundEvent)
    {
        if (outboundEvent.MessageType == MsgType.InvocationRequest)
        {
            _logger.LogTrace("Writing invocation request invocationId: {invocationId} to workerId: {workerId}", outboundEvent.Message.InvocationRequest.InvocationId, workerId);
        }

        try
        {
            await responseStream.WriteAsync(outboundEvent.Message);
        }
        catch (Exception writeError)
        {
            _logger.LogError(writeError, "Error writing message type {messageType} to workerId: {workerId}", outboundEvent.MessageType, workerId);
        }
    }

    try
    {
        _logger.LogDebug("Established RPC channel. WorkerId: {workerId}", workerId);
        await Task.Yield(); // free up the caller

        while (true)
        {
            bool moreData = await source.WaitToReadAsync(cancellationToken);
            if (!moreData)
            {
                break; // channel was completed; nothing further will arrive
            }

            // drain everything that is currently buffered before waiting again
            while (source.TryRead(out var pending))
            {
                await WriteToStreamAsync(pending);
            }
        }
    }
    catch (OperationCanceledException canceled) when (canceled.CancellationToken == cancellationToken)
    {
        // that's fine; normal exit through cancellation
    }
    catch (Exception pumpError)
    {
        _logger.LogError(pumpError, "Error pushing from outbound to gRPC");
    }
}
113128
}
114129
}

src/WebJobs.Script.Grpc/WebJobs.Script.Grpc.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
<PackageReference Include="StyleCop.Analyzers" Version="1.1.0-beta004">
3131
<PrivateAssets>all</PrivateAssets>
3232
</PackageReference>
33+
<PackageReference Include="System.Threading.Channels" Version="6.0.0" />
3334
</ItemGroup>
3435

3536
<ItemGroup>

src/WebJobs.Script/Eventing/IScriptEventManager.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,11 @@ namespace Microsoft.Azure.WebJobs.Script.Eventing
88
/// <summary>
/// Observable source of <see cref="ScriptEvent"/> instances that also stores per-worker state
/// objects, looked up by worker id and state type.
/// </summary>
public interface IScriptEventManager : IObservable<ScriptEvent>
99
{
1010
/// <summary>
/// Publishes <paramref name="scriptEvent"/>.
/// NOTE(review): presumably delivered to the observers of this instance; the implementation is not shown here, so confirm.
/// </summary>
void Publish(ScriptEvent scriptEvent);
11+
12+
/// <summary>
/// Tries to register <paramref name="state"/> for <paramref name="workerId"/> under the type <typeparamref name="T"/>.
/// </summary>
/// <returns><c>true</c> if the state was added; otherwise <c>false</c>.</returns>
bool TryAddWorkerState<T>(string workerId, T state);
13+
14+
/// <summary>
/// Tries to look up the state of type <typeparamref name="T"/> registered for <paramref name="workerId"/>.
/// </summary>
/// <returns><c>true</c> if a state was found and assigned to <paramref name="state"/>; otherwise <c>false</c>.</returns>
bool TryGetWorkerState<T>(string workerId, out T state);
15+
16+
/// <summary>
/// Tries to unregister the state of type <typeparamref name="T"/> registered for <paramref name="workerId"/>.
/// </summary>
/// <returns><c>true</c> if a state was removed and assigned to <paramref name="state"/>; otherwise <c>false</c>.</returns>
bool TryRemoveWorkerState<T>(string workerId, out T state);
1117
}
1218
}

src/WebJobs.Script/Eventing/ScriptEventManager.cs

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,16 @@
22
// Licensed under the MIT License. See License.txt in the project root for license information.
33

44
using System;
5+
using System.Collections.Concurrent;
56
using System.Reactive.Subjects;
67

78
namespace Microsoft.Azure.WebJobs.Script.Eventing
89
{
9-
public sealed class ScriptEventManager : IScriptEventManager, IDisposable
10+
public class ScriptEventManager : IScriptEventManager, IDisposable
1011
{
1112
private readonly Subject<ScriptEvent> _subject = new Subject<ScriptEvent>();
13+
private readonly ConcurrentDictionary<(string, Type), object> _workerState = new ();
14+
1215
private bool _disposed = false;
1316

1417
public void Publish(ScriptEvent scriptEvent)
@@ -47,5 +50,35 @@ private void Dispose(bool disposing)
4750
}
4851

4952
public void Dispose() => Dispose(true);
53+
54+
/// <summary>
/// Stores <paramref name="state"/> under the key (<paramref name="workerId"/>, <c>typeof(T)</c>).
/// </summary>
/// <returns><c>true</c> if the entry was added; <c>false</c> if that key already exists.</returns>
bool IScriptEventManager.TryAddWorkerState<T>(string workerId, T state)
    => _workerState.TryAdd((workerId, typeof(T)), state);
59+
60+
/// <summary>
/// Reads the value stored under the key (<paramref name="workerId"/>, <c>typeof(T)</c>).
/// </summary>
/// <returns>
/// <c>true</c> if an entry exists and its value is a <typeparamref name="T"/>; otherwise
/// <c>false</c>, with <paramref name="state"/> set to <c>default</c>.
/// </returns>
bool IScriptEventManager.TryGetWorkerState<T>(string workerId, out T state)
{
    // guard: no entry for this key, or the stored value is not a T
    if (!_workerState.TryGetValue((workerId, typeof(T)), out object stored) || stored is not T match)
    {
        state = default;
        return false;
    }

    state = match;
    return true;
}
71+
72+
/// <summary>
/// Removes the entry stored under the key (<paramref name="workerId"/>, <c>typeof(T)</c>).
/// </summary>
/// <returns>
/// <c>true</c> if an entry was removed and its value is a <typeparamref name="T"/>; otherwise
/// <c>false</c>, with <paramref name="state"/> set to <c>default</c>.
/// </returns>
bool IScriptEventManager.TryRemoveWorkerState<T>(string workerId, out T state)
{
    // the removal happens first; the type test only decides what is reported back to the caller
    if (!_workerState.TryRemove((workerId, typeof(T)), out object removed) || removed is not T match)
    {
        state = default;
        return false;
    }

    state = match;
    return true;
}
5083
}
5184
}

src/WebJobs.Script/Workers/Rpc/RpcWorkerConstants.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ public static class RpcWorkerConstants
6262

6363
// Host Capabilities
6464
public const string V2Compatable = "V2Compatable";
65+
public const string MultiStream = nameof(MultiStream);
6566

6667
// dotnet executable file path components
6768
public const string DotNetExecutableName = "dotnet";

0 commit comments

Comments
 (0)