Skip to content

Commit 2d05b30

Browse files
committed
add interceptor v1
1 parent 8b6b9f4 commit 2d05b30

File tree

3 files changed

+271
-0
lines changed

3 files changed

+271
-0
lines changed

src/Extensions/AzureBlobPayloads/DependencyInjection/DurableTaskClientBuilderExtensions.AzureBlobPayloads.cs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,27 @@ public static IDurableTaskClientBuilder UseExternalizedPayloads(
3131
return new BlobPayloadStore(opts);
3232
});
3333

34+
// Wrap the gRPC CallInvoker with our interceptor when using the gRPC client
35+
builder.Services
36+
.AddOptions<GrpcDurableTaskClientOptions>(builder.Name)
37+
.PostConfigure<IPayloadStore, IOptionsMonitor<LargePayloadStorageOptions>>((opt, store, monitor) =>
38+
{
39+
LargePayloadStorageOptions opts = monitor.Get(builder.Name);
40+
if (opt.Channel is not null)
41+
{
42+
var invoker = opt.Channel.Intercept(new Worker.Grpc.Internal.AzureBlobPayloadsInterceptor(store, opts));
43+
opt.CallInvoker = invoker;
44+
}
45+
else if (opt.CallInvoker is not null)
46+
{
47+
opt.CallInvoker = opt.CallInvoker.Intercept(new Worker.Grpc.Internal.AzureBlobPayloadsInterceptor(store, opts));
48+
}
49+
else if (!string.IsNullOrEmpty(opt.Address))
50+
{
51+
// Channel will be built later; we can't intercept here. This will be handled in the client if CallInvoker is null.
52+
}
53+
});
54+
3455
// builder.Services
3556
// .AddOptions<DurableTaskClientOptions>(builder.Name)
3657
// .PostConfigure<IPayloadStore, IOptionsMonitor<LargePayloadStorageOptions>>((opt, store, monitor) =>

src/Extensions/AzureBlobPayloads/DependencyInjection/DurableTaskWorkerBuilderExtensions.AzureBlobPayloads.cs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,27 @@ public static IDurableTaskWorkerBuilder UseExternalizedPayloads(
3030
return new BlobPayloadStore(opts);
3131
});
3232

33+
// Wrap the gRPC CallInvoker with our interceptor when using the gRPC worker
34+
builder.Services
35+
.AddOptions<GrpcDurableTaskWorkerOptions>(builder.Name)
36+
.PostConfigure<IPayloadStore, IOptionsMonitor<LargePayloadStorageOptions>>((opt, store, monitor) =>
37+
{
38+
LargePayloadStorageOptions opts = monitor.Get(builder.Name);
39+
if (opt.Channel is not null)
40+
{
41+
var invoker = opt.Channel.Intercept(new Grpc.Internal.AzureBlobPayloadsInterceptor(store, opts));
42+
opt.CallInvoker = invoker;
43+
}
44+
else if (opt.CallInvoker is not null)
45+
{
46+
opt.CallInvoker = opt.CallInvoker.Intercept(new Grpc.Internal.AzureBlobPayloadsInterceptor(store, opts));
47+
}
48+
else if (!string.IsNullOrEmpty(opt.Address))
49+
{
50+
// Channel will be built later; worker will build it, intercept when possible through CallInvoker path.
51+
}
52+
});
53+
3354
// builder.Services
3455
// .AddOptions<DurableTaskWorkerOptions>(builder.Name)
3556
// .PostConfigure<IPayloadStore, IOptionsMonitor<LargePayloadStorageOptions>>((opt, store, monitor) =>
Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
using System.Text;
5+
using Grpc.Core;
6+
using Grpc.Core.Interceptors;
7+
using Microsoft.DurableTask.Converters;
8+
using P = Microsoft.DurableTask.Protobuf;
9+
10+
namespace Microsoft.DurableTask.Worker.Grpc.Internal;
11+
12+
/// <summary>
13+
/// gRPC interceptor that externalizes large payloads to an <see cref="IPayloadStore"/> on requests
14+
/// and resolves known payload tokens on responses.
15+
/// </summary>
16+
sealed class AzureBlobPayloadsInterceptor : Interceptor
17+
{
18+
readonly IPayloadStore payloadStore;
19+
readonly LargePayloadStorageOptions options;
20+
21+
public AzureBlobPayloadsInterceptor(IPayloadStore payloadStore, LargePayloadStorageOptions options)
22+
{
23+
this.payloadStore = payloadStore;
24+
this.options = options;
25+
}
26+
27+
// Unary: externalize on request, resolve on response
28+
public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(
29+
TRequest request,
30+
ClientInterceptorContext<TRequest, TResponse> context,
31+
AsyncUnaryCallContinuation<TRequest, TResponse> continuation)
32+
{
33+
// Mutate request payloads before sending
34+
this.ExternalizeRequestPayloads(request, context);
35+
36+
AsyncUnaryCall<TResponse> call = continuation(request, context);
37+
38+
// Wrap response task to resolve payloads
39+
async Task<TResponse> ResolveAsync(Task<TResponse> inner)
40+
{
41+
TResponse response = await inner.ConfigureAwait(false);
42+
await this.ResolveResponsePayloadsAsync(response, context.CancellationToken);
43+
return response;
44+
}
45+
46+
return new AsyncUnaryCall<TResponse>(
47+
ResolveAsync(call.ResponseAsync),
48+
call.ResponseHeadersAsync,
49+
call.GetStatus,
50+
call.GetTrailers,
51+
call.Dispose);
52+
}
53+
54+
// Server streaming: resolve payloads in streamed responses (e.g., GetWorkItems)
55+
public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(
56+
TRequest request,
57+
ClientInterceptorContext<TRequest, TResponse> context,
58+
AsyncServerStreamingCallContinuation<TRequest, TResponse> continuation)
59+
{
60+
this.ExternalizeRequestPayloads(request, context);
61+
62+
AsyncServerStreamingCall<TResponse> call = continuation(request, context);
63+
64+
IAsyncStreamReader<TResponse> wrapped = new TransformingStreamReader<TResponse>(call.ResponseStream, async (msg, ct) =>
65+
{
66+
await this.ResolveResponsePayloadsAsync(msg, ct).ConfigureAwait(false);
67+
return msg;
68+
});
69+
70+
return new AsyncServerStreamingCall<TResponse>(
71+
wrapped,
72+
call.ResponseHeadersAsync,
73+
call.GetStatus,
74+
call.GetTrailers,
75+
call.Dispose);
76+
}
77+
78+
void ExternalizeRequestPayloads<TRequest>(TRequest request, ClientInterceptorContext<TRequest, object> context)
79+
{
80+
// Client -> sidecar
81+
switch (request)
82+
{
83+
case P.CreateInstanceRequest r:
84+
this.MaybeExternalize(ref r.Input);
85+
break;
86+
case P.RaiseEventRequest r:
87+
this.MaybeExternalize(ref r.Input);
88+
break;
89+
case P.TerminateRequest r:
90+
this.MaybeExternalize(ref r.Output);
91+
break;
92+
case P.ActivityResponse r:
93+
this.MaybeExternalize(ref r.Result);
94+
break;
95+
case P.OrchestratorResponse r:
96+
this.MaybeExternalize(ref r.CustomStatus);
97+
foreach (P.OrchestratorAction a in r.Actions)
98+
{
99+
if (a.CompleteOrchestration is { } complete)
100+
{
101+
this.MaybeExternalize(ref complete.Result);
102+
}
103+
}
104+
break;
105+
}
106+
}
107+
108+
async Task ResolveResponsePayloadsAsync<TResponse>(TResponse response, CancellationToken cancellation)
109+
{
110+
// Sidecar -> client/worker
111+
switch (response)
112+
{
113+
case P.GetInstanceResponse r when r.OrchestrationState is { } s:
114+
this.MaybeResolve(ref s.Input, cancellation);
115+
this.MaybeResolve(ref s.Output, cancellation);
116+
this.MaybeResolve(ref s.CustomStatus, cancellation);
117+
break;
118+
case P.QueryInstancesResponse r:
119+
foreach (P.OrchestrationState s in r.OrchestrationState)
120+
{
121+
this.MaybeResolve(ref s.Input, cancellation);
122+
this.MaybeResolve(ref s.Output, cancellation);
123+
this.MaybeResolve(ref s.CustomStatus, cancellation);
124+
}
125+
break;
126+
case P.WorkItem wi:
127+
// Resolve activity input
128+
if (wi.ActivityRequest is { } ar)
129+
{
130+
this.MaybeResolve(ref ar.Input, cancellation);
131+
}
132+
133+
// Resolve orchestration input embedded in ExecutionStarted event and external events
134+
if (wi.OrchestratorRequest is { } or)
135+
{
136+
foreach (var e in or.PastEvents)
137+
{
138+
this.ResolveEventPayloads(e, cancellation);
139+
}
140+
foreach (var e in or.NewEvents)
141+
{
142+
this.ResolveEventPayloads(e, cancellation);
143+
}
144+
}
145+
break;
146+
}
147+
await Task.CompletedTask;
148+
}
149+
150+
void ResolveEventPayloads(P.HistoryEvent e, CancellationToken cancellation)
151+
{
152+
switch (e.EventTypeCase)
153+
{
154+
case P.HistoryEvent.EventTypeOneofCase.ExecutionStarted:
155+
if (e.ExecutionStarted is { } es)
156+
{
157+
this.MaybeResolve(ref es.Input, cancellation);
158+
}
159+
break;
160+
case P.HistoryEvent.EventTypeOneofCase.EventRaised:
161+
if (e.EventRaised is { } er)
162+
{
163+
this.MaybeResolve(ref er.Input, cancellation);
164+
}
165+
break;
166+
}
167+
}
168+
169+
void MaybeExternalize(ref string? value)
170+
{
171+
if (string.IsNullOrEmpty(value))
172+
{
173+
return;
174+
}
175+
176+
int size = Encoding.UTF8.GetByteCount(value);
177+
if (size < this.options.ExternalizeThresholdBytes)
178+
{
179+
return;
180+
}
181+
182+
// Upload synchronously via .GetAwaiter().GetResult() because interceptor API is sync for requests
183+
string token = this.payloadStore.UploadAsync(Encoding.UTF8.GetBytes(value), CancellationToken.None)
184+
.GetAwaiter()
185+
.GetResult();
186+
value = token;
187+
}
188+
189+
void MaybeResolve(ref string? value, CancellationToken cancellation)
190+
{
191+
if (string.IsNullOrEmpty(value) || !this.payloadStore.IsKnownPayloadToken(value))
192+
{
193+
return;
194+
}
195+
196+
string resolved = this.payloadStore.DownloadAsync(value, cancellation)
197+
.GetAwaiter()
198+
.GetResult();
199+
value = resolved;
200+
}
201+
202+
sealed class TransformingStreamReader<T> : IAsyncStreamReader<T>
203+
{
204+
readonly IAsyncStreamReader<T> inner;
205+
readonly Func<T, CancellationToken, ValueTask<T>> transform;
206+
207+
public TransformingStreamReader(IAsyncStreamReader<T> inner, Func<T, CancellationToken, ValueTask<T>> transform)
208+
{
209+
this.inner = inner;
210+
this.transform = transform;
211+
}
212+
213+
public T Current { get; private set; } = default!;
214+
215+
public async Task<bool> MoveNext(CancellationToken cancellationToken)
216+
{
217+
bool hasNext = await this.inner.MoveNext(cancellationToken).ConfigureAwait(false);
218+
if (!hasNext)
219+
{
220+
return false;
221+
}
222+
223+
this.Current = await this.transform(this.inner.Current, cancellationToken).ConfigureAwait(false);
224+
return true;
225+
}
226+
}
227+
}
228+
229+

0 commit comments

Comments
 (0)