Skip to content

Commit 05c1287

Browse files
committed
add interceptor v2
1 parent 2d05b30 commit 05c1287

File tree

5 files changed

+333
-108
lines changed

5 files changed

+333
-108
lines changed

src/Extensions/AzureBlobPayloads/AzureBlobPayloads.csproj

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,19 @@
1212
<PackageReference Include="Azure.Storage.Blobs" />
1313
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" />
1414
<PackageReference Include="Microsoft.Extensions.Options" />
15+
<PackageReference Include="Grpc.Net.Client" />
1516
</ItemGroup>
1617

1718
<ItemGroup>
1819
<ProjectReference Include="..\..\Abstractions\Abstractions.csproj" />
1920
<ProjectReference Include="..\..\Client\Core\Client.csproj" />
2021
<ProjectReference Include="..\..\Worker\Core\Worker.csproj" />
22+
<ProjectReference Include="..\..\Client\Grpc\Client.Grpc.csproj" />
23+
<ProjectReference Include="..\..\Worker\Grpc\Worker.Grpc.csproj" />
2124
</ItemGroup>
2225

2326
<ItemGroup>
2427
<SharedSection Include="Core" />
2528
</ItemGroup>
2629

27-
</Project>
28-
30+
</Project>
Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
// Copyright (c) Microsoft Corporation.
22
// Licensed under the MIT License.
33

4+
using Grpc.Core.Interceptors;
45
using Microsoft.DurableTask.Client;
6+
using Microsoft.DurableTask.Client.Grpc;
57
using Microsoft.DurableTask.Converters;
8+
using Microsoft.DurableTask.Worker.Grpc.Internal;
69
using Microsoft.Extensions.DependencyInjection;
7-
using Microsoft.Extensions.DependencyInjection.Extensions;
810
using Microsoft.Extensions.Options;
911

10-
namespace Microsoft.DurableTask.Client;
12+
namespace Microsoft.DurableTask;
1113

1214
/// <summary>
1315
/// Extension methods to enable externalized payloads using Azure Blob Storage for Durable Task Client.
@@ -17,6 +19,10 @@ public static class DurableTaskClientBuilderExtensionsAzureBlobPayloads
1719
/// <summary>
1820
/// Enables externalized payload storage using Azure Blob Storage for the specified client builder.
1921
/// </summary>
22+
/// <param name="builder">The builder to configure.</param>
23+
/// <param name="configure">The callback to configure the storage options.</param>
24+
/// <returns>The original builder, for call chaining.</returns>
25+
/// <returns></returns>
2026
public static IDurableTaskClientBuilder UseExternalizedPayloads(
2127
this IDurableTaskClientBuilder builder,
2228
Action<LargePayloadStorageOptions> configure)
@@ -39,30 +45,23 @@ public static IDurableTaskClientBuilder UseExternalizedPayloads(
3945
LargePayloadStorageOptions opts = monitor.Get(builder.Name);
4046
if (opt.Channel is not null)
4147
{
42-
var invoker = opt.Channel.Intercept(new Worker.Grpc.Internal.AzureBlobPayloadsInterceptor(store, opts));
48+
Grpc.Core.CallInvoker invoker = opt.Channel.Intercept(new AzureBlobPayloadsInterceptor(store, opts));
4349
opt.CallInvoker = invoker;
50+
51+
// Ensure client uses the intercepted invoker path
52+
opt.Channel = null;
4453
}
4554
else if (opt.CallInvoker is not null)
4655
{
47-
opt.CallInvoker = opt.CallInvoker.Intercept(new Worker.Grpc.Internal.AzureBlobPayloadsInterceptor(store, opts));
56+
opt.CallInvoker = opt.CallInvoker.Intercept(new AzureBlobPayloadsInterceptor(store, opts));
4857
}
49-
else if (!string.IsNullOrEmpty(opt.Address))
58+
else
5059
{
51-
// Channel will be built later; we can't intercept here. This will be handled in the client if CallInvoker is null.
60+
throw new ArgumentException(
61+
"Channel or CallInvoker must be provided to use Azure Blob Payload Externalization feature");
5262
}
5363
});
5464

55-
// builder.Services
56-
// .AddOptions<DurableTaskClientOptions>(builder.Name)
57-
// .PostConfigure<IPayloadStore, IOptionsMonitor<LargePayloadStorageOptions>>((opt, store, monitor) =>
58-
// {
59-
// LargePayloadStorageOptions opts = monitor.Get(builder.Name);
60-
// DataConverter inner = opt.DataConverter ?? Converters.JsonDataConverter.Default;
61-
// opt.DataConverter = new LargePayloadDataConverter(inner, store, opts);
62-
// });
63-
6465
return builder;
6566
}
6667
}
67-
68-

src/Extensions/AzureBlobPayloads/DependencyInjection/DurableTaskWorkerBuilderExtensions.AzureBlobPayloads.cs

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,11 @@
55
using Microsoft.DurableTask.Worker;
66
using Microsoft.Extensions.DependencyInjection;
77
using Microsoft.Extensions.Options;
8+
using Microsoft.DurableTask.Worker.Grpc;
9+
using Grpc.Net.Client;
10+
using Grpc.Core.Interceptors;
811

9-
namespace Microsoft.DurableTask.Worker;
12+
namespace Microsoft.DurableTask;
1013

1114
/// <summary>
1215
/// Extension methods to enable externalized payloads using Azure Blob Storage for Durable Task Worker.
@@ -16,6 +19,9 @@ public static class DurableTaskWorkerBuilderExtensionsAzureBlobPayloads
1619
/// <summary>
1720
/// Enables externalized payload storage using Azure Blob Storage for the specified worker builder.
1821
/// </summary>
22+
/// <param name="builder">The builder to configure.</param>
23+
/// <param name="configure">The callback to configure the storage options.</param>
24+
/// <returns>The original builder, for call chaining.</returns>
1925
public static IDurableTaskWorkerBuilder UseExternalizedPayloads(
2026
this IDurableTaskWorkerBuilder builder,
2127
Action<LargePayloadStorageOptions> configure)
@@ -38,28 +44,23 @@ public static IDurableTaskWorkerBuilder UseExternalizedPayloads(
3844
LargePayloadStorageOptions opts = monitor.Get(builder.Name);
3945
if (opt.Channel is not null)
4046
{
41-
var invoker = opt.Channel.Intercept(new Grpc.Internal.AzureBlobPayloadsInterceptor(store, opts));
47+
var invoker = opt.Channel.Intercept(new AzureBlobPayloadsInterceptor(store, opts));
4248
opt.CallInvoker = invoker;
49+
// Ensure worker uses the intercepted invoker path
50+
opt.Channel = null;
4351
}
4452
else if (opt.CallInvoker is not null)
4553
{
46-
opt.CallInvoker = opt.CallInvoker.Intercept(new Grpc.Internal.AzureBlobPayloadsInterceptor(store, opts));
54+
opt.CallInvoker = opt.CallInvoker.Intercept(new AzureBlobPayloadsInterceptor(store, opts));
4755
}
48-
else if (!string.IsNullOrEmpty(opt.Address))
56+
else
4957
{
50-
// Channel will be built later; worker will build it, intercept when possible through CallInvoker path.
58+
throw new ArgumentException(
59+
"Channel or CallInvoker must be provided to use Azure Blob Payload Externalization feature"
60+
);
5161
}
5262
});
5363

54-
// builder.Services
55-
// .AddOptions<DurableTaskWorkerOptions>(builder.Name)
56-
// .PostConfigure<IPayloadStore, IOptionsMonitor<LargePayloadStorageOptions>>((opt, store, monitor) =>
57-
// {
58-
// LargePayloadStorageOptions opts = monitor.Get(builder.Name);
59-
// DataConverter inner = opt.DataConverter ?? Converters.JsonDataConverter.Default;
60-
// opt.DataConverter = new LargePayloadDataConverter(inner, store, opts);
61-
// });
62-
6364
return builder;
6465
}
6566
}

0 commit comments

Comments
 (0)