Skip to content

Commit 86f5b72

Browse files
committed
Introduce WorkItemFilters into worker flow
This change adds WorkItemFilters into the grpc worker. This includes builder methods to specify them and the connection into the GetWorkItems flow inside the worker processor. Signed-off-by: Hal Spang <halspang@microsoft.com>
1 parent da748a4 commit 86f5b72

File tree

7 files changed

+423
-1
lines changed

7 files changed

+423
-1
lines changed

src/Grpc/orchestrator_service.proto

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -822,6 +822,7 @@ message GetWorkItemsRequest {
822822
int32 maxConcurrentEntityWorkItems = 3;
823823

824824
repeated WorkerCapability capabilities = 10;
825+
WorkItemFilters workItemFilters = 11;
825826
}
826827

827828
enum WorkerCapability {
@@ -844,6 +845,26 @@ enum WorkerCapability {
844845
WORKER_CAPABILITY_LARGE_PAYLOADS = 3;
845846
}
846847

848+
message WorkItemFilters {
849+
repeated OrchestrationFilter orchestrations = 1;
850+
repeated ActivityFilter activities = 2;
851+
repeated EntityFilter entities = 3;
852+
}
853+
854+
message OrchestrationFilter {
855+
string name = 1;
856+
repeated string versions = 2;
857+
}
858+
859+
message ActivityFilter {
860+
string name = 1;
861+
repeated string versions = 2;
862+
}
863+
864+
message EntityFilter {
865+
string name = 1;
866+
}
867+
847868
message WorkItem {
848869
oneof request {
849870
OrchestratorRequest orchestratorRequest = 1;

src/Worker/Core/DependencyInjection/DurableTaskWorkerBuilderExtensions.cs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,4 +137,32 @@ public static IDurableTaskWorkerBuilder UseOrchestrationFilter(this IDurableTask
137137
builder.Services.AddSingleton(filter);
138138
return builder;
139139
}
140+
141+
/// <summary>
142+
/// Adds <see cref="DurableTaskWorkerWorkItemFilters"/> to the specified <see cref="IDurableTaskWorkerBuilder"/>.
143+
/// </summary>
144+
/// <param name="builder">The builder to set the builder target for.</param>
145+
/// <param name="workItemFilters">The instance of a <see cref="DurableTaskWorkerWorkItemFilters"/> to use.</param>
146+
/// <returns>The same <see cref="IDurableTaskWorkerBuilder"/> instance, allowing for method chaining.</returns>
147+
/// <remarks>If this is called without specified filters, the filters will be constructed from the registered orchestrations, activities, and entities.</remarks>
148+
public static IDurableTaskWorkerBuilder UseWorkItemFilters(this IDurableTaskWorkerBuilder builder, DurableTaskWorkerWorkItemFilters? workItemFilters = null)
149+
{
150+
Check.NotNull(builder);
151+
if (workItemFilters != null)
152+
{
153+
builder.Services.AddSingleton(workItemFilters);
154+
}
155+
else
156+
{
157+
// Auto-generate the filters from registered orchestrations, activities, and entitites.
158+
builder.Services.AddSingleton(provider =>
159+
{
160+
DurableTaskRegistry registry = provider.GetRequiredService<IOptionsMonitor<DurableTaskRegistry>>().Get(builder.Name);
161+
DurableTaskWorkerOptions? options = provider.GetOptions<DurableTaskWorkerOptions>(builder.Name);
162+
return new DurableTaskWorkerWorkItemFilters(registry, options);
163+
});
164+
}
165+
166+
return builder;
167+
}
140168
}
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
namespace Microsoft.DurableTask.Worker;
5+
6+
/// <summary>
7+
/// A class that represents work item filters for a Durable Task Worker. These filters are passed to the backend
8+
/// and only work items matching the filters will be processed by the worker. If no filters are provided,
9+
/// the worker will process all work items.
10+
/// </summary>
11+
public class DurableTaskWorkerWorkItemFilters
12+
{
13+
/// <summary>
14+
/// Initializes a new instance of the <see cref="DurableTaskWorkerWorkItemFilters"/> class.
15+
/// </summary>
16+
public DurableTaskWorkerWorkItemFilters()
17+
{
18+
this.Orchestrations = [];
19+
this.Activities = [];
20+
this.Entities = [];
21+
}
22+
23+
/// <summary>
24+
/// Initializes a new instance of the <see cref="DurableTaskWorkerWorkItemFilters"/> class.
25+
/// </summary>
26+
/// <param name="registry"><see cref="DurableTaskRegistry"/> to construct the filter from.</param>
27+
/// <param name="workerOptions"><see cref="DurableTaskWorkerOptions"/> that optionally provides versioning information.</param>
28+
internal DurableTaskWorkerWorkItemFilters(DurableTaskRegistry registry, DurableTaskWorkerOptions? workerOptions)
29+
{
30+
List<OrchestrationFilter> orchestrationActions = new();
31+
foreach (var orchestration in registry.Orchestrators)
32+
{
33+
orchestrationActions.Add(new OrchestrationFilter
34+
{
35+
Name = orchestration.Key,
36+
37+
// TODO: Support multiple orchestration versions, for now, utilize the Worker's version.
38+
Versions = workerOptions?.Versioning != null ? [workerOptions.Versioning.DefaultVersion] : [],
39+
});
40+
}
41+
42+
this.Orchestrations = orchestrationActions;
43+
List<ActivityFilter> activityActions = new();
44+
foreach (var activity in registry.Activities)
45+
{
46+
activityActions.Add(new ActivityFilter
47+
{
48+
Name = activity.Key,
49+
50+
// TODO: Support multiple activity versions, for now, utilize the Worker's version.
51+
Versions = workerOptions?.Versioning != null ? [workerOptions.Versioning.DefaultVersion] : [],
52+
});
53+
}
54+
55+
this.Activities = activityActions;
56+
List<EntityFilter> entityActions = new();
57+
foreach (var entity in registry.Entities)
58+
{
59+
entityActions.Add(new EntityFilter
60+
{
61+
// Entity names are normalized to lowercase in the backend.
62+
Name = entity.Key.ToString().ToLowerInvariant(),
63+
});
64+
}
65+
66+
this.Entities = entityActions;
67+
}
68+
69+
/// <summary>
70+
/// Gets or initializes the orchestration filters.
71+
/// </summary>
72+
public IReadOnlyList<OrchestrationFilter> Orchestrations { get; init; }
73+
74+
/// <summary>
75+
/// Gets or initializes the activity filters.
76+
/// </summary>
77+
public IReadOnlyList<ActivityFilter> Activities { get; init; }
78+
79+
/// <summary>
80+
/// Gets or initializes the entity filters.
81+
/// </summary>
82+
public IReadOnlyList<EntityFilter> Entities { get; init; }
83+
84+
/// <summary>
85+
/// Struct specifying an orchestration filter.
86+
/// </summary>
87+
public struct OrchestrationFilter
88+
{
89+
/// <summary>
90+
/// Gets or initializes the name of the orchestration to filter.
91+
/// </summary>
92+
public string Name { get; init; }
93+
94+
/// <summary>
95+
/// Gets or initializes the versions of the orchestration to filter.
96+
/// </summary>
97+
public List<string> Versions { get; init; }
98+
}
99+
100+
/// <summary>
101+
/// Struct specifying an activity filter.
102+
/// </summary>
103+
public struct ActivityFilter
104+
{
105+
/// <summary>
106+
/// Gets or initializes the name of the activity to filter.
107+
/// </summary>
108+
public string Name { get; init; }
109+
110+
/// <summary>
111+
/// Gets or initializes the versions of the activity to filter.
112+
/// </summary>
113+
public List<string> Versions { get; init; }
114+
}
115+
116+
/// <summary>
117+
/// Struct specifying an entity filter.
118+
/// </summary>
119+
public struct EntityFilter
120+
{
121+
/// <summary>
122+
/// Gets or initializes the name of the entity to filter.
123+
/// </summary>
124+
public string Name { get; init; }
125+
}
126+
}

src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
using Microsoft.DurableTask.Abstractions;
1111
using Microsoft.DurableTask.Entities;
1212
using Microsoft.DurableTask.Tracing;
13+
using Microsoft.DurableTask.Worker.Grpc.Internal;
1314
using Microsoft.DurableTask.Worker.Shims;
1415
using Microsoft.Extensions.DependencyInjection;
1516
using Microsoft.Extensions.Logging;
@@ -255,6 +256,7 @@ async ValueTask<OrchestrationRuntimeState> BuildRuntimeStateAsync(
255256
MaxConcurrentEntityWorkItems =
256257
workerOptions.Concurrency.MaximumConcurrentEntityWorkItems,
257258
Capabilities = { this.worker.grpcOptions.Capabilities },
259+
WorkItemFilters = this.worker?.workItemFilters?.ToGrpcWorkItemFilters(),
258260
},
259261
cancellationToken: cancellation);
260262
}

src/Worker/Grpc/GrpcDurableTaskWorker.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ sealed partial class GrpcDurableTaskWorker : DurableTaskWorker
1818
readonly ILoggerFactory loggerFactory;
1919
readonly ILogger logger;
2020
readonly IOrchestrationFilter? orchestrationFilter;
21+
readonly DurableTaskWorkerWorkItemFilters? workItemFilters;
2122

2223
/// <summary>
2324
/// Initializes a new instance of the <see cref="GrpcDurableTaskWorker" /> class.
@@ -30,6 +31,7 @@ sealed partial class GrpcDurableTaskWorker : DurableTaskWorker
3031
/// <param name="loggerFactory">The logger.</param>
3132
/// <param name="orchestrationFilter">The optional <see cref="IOrchestrationFilter"/> used to filter orchestration execution.</param>
3233
/// <param name="exceptionPropertiesProvider">The custom exception properties provider that help build failure details.</param>
34+
/// <param name="workItemFilters">The optional <see cref="DurableTaskWorkerWorkItemFilters"/> used to filter work items in the backend.</param>
3335
public GrpcDurableTaskWorker(
3436
string name,
3537
IDurableTaskFactory factory,
@@ -38,7 +40,8 @@ public GrpcDurableTaskWorker(
3840
IServiceProvider services,
3941
ILoggerFactory loggerFactory,
4042
IOrchestrationFilter? orchestrationFilter = null,
41-
IExceptionPropertiesProvider? exceptionPropertiesProvider = null)
43+
IExceptionPropertiesProvider? exceptionPropertiesProvider = null,
44+
DurableTaskWorkerWorkItemFilters? workItemFilters = null)
4245
: base(name, factory)
4346
{
4447
this.grpcOptions = Check.NotNull(grpcOptions).Get(name);
@@ -48,6 +51,7 @@ public GrpcDurableTaskWorker(
4851
this.logger = CreateLogger(loggerFactory, this.workerOptions);
4952
this.orchestrationFilter = orchestrationFilter;
5053
this.ExceptionPropertiesProvider = exceptionPropertiesProvider;
54+
this.workItemFilters = workItemFilters;
5155
}
5256

5357
/// <inheritdoc />
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
using P = Microsoft.DurableTask.Protobuf;
5+
6+
namespace Microsoft.DurableTask.Worker.Grpc.Internal;
7+
8+
/// <summary>
9+
/// Extension for <see cref="DurableTaskWorkerWorkItemFilters"/> to convert to gRPC types.
10+
/// </summary>
11+
public static class DurableTaskWorkerWorkItemFiltersExtensions
12+
{
13+
/// <summary>
14+
/// Converts a <see cref="DurableTaskWorkerWorkItemFilters"/> to a gRPC <see cref="P.WorkItemFilters"/>.
15+
/// </summary>
16+
/// <param name="workItemFilter">The <see cref="DurableTaskWorkerWorkItemFilters"/> to convert.</param>
17+
/// <returns>A gRPC <see cref="P.WorkItemFilters"/>.</returns>
18+
public static P.WorkItemFilters ToGrpcWorkItemFilters(this DurableTaskWorkerWorkItemFilters workItemFilter)
19+
{
20+
var grpcWorkItemFilters = new P.WorkItemFilters();
21+
foreach (var orchestrationFilter in workItemFilter.Orchestrations)
22+
{
23+
var grpcOrchestrationFilter = new P.OrchestrationFilter
24+
{
25+
Name = orchestrationFilter.Name,
26+
};
27+
grpcOrchestrationFilter.Versions.AddRange(orchestrationFilter.Versions);
28+
grpcWorkItemFilters.Orchestrations.Add(grpcOrchestrationFilter);
29+
}
30+
31+
foreach (var activityFilter in workItemFilter.Activities)
32+
{
33+
var grpcActivityAction = new P.ActivityFilter
34+
{
35+
Name = activityFilter.Name,
36+
};
37+
grpcActivityAction.Versions.AddRange(activityFilter.Versions);
38+
grpcWorkItemFilters.Activities.Add(grpcActivityAction);
39+
}
40+
41+
foreach (var entityFilter in workItemFilter.Entities)
42+
{
43+
var grpcEntityAction = new P.EntityFilter
44+
{
45+
Name = entityFilter.Name,
46+
};
47+
grpcWorkItemFilters.Entities.Add(grpcEntityAction);
48+
}
49+
50+
return grpcWorkItemFilters;
51+
}
52+
}

0 commit comments

Comments
 (0)