Skip to content

Commit 954b4a7

Browse files
committed
Update to use interface for DI
Signed-off-by: Hal Spang <halspang@microsoft.com>
1 parent b3fadcf commit 954b4a7

File tree

6 files changed

+90
-46
lines changed

6 files changed

+90
-46
lines changed

src/Worker/Core/DependencyInjection/DurableTaskWorkerBuilderExtensions.cs

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -114,16 +114,25 @@ public static IDurableTaskWorkerBuilder UseVersioning(this IDurableTaskWorkerBui
114114
/// Adds an orchestration filter to the specified <see cref="IDurableTaskWorkerBuilder"/>.
115115
/// </summary>
116116
/// <param name="builder">The builder to set the builder target for.</param>
117-
/// <param name="orchestrationFilter">The filter function that determines whether an orchestration should be processed.</param>
117+
/// <typeparam name="TOrchestrationFilter">The implementation of a <see cref="IOrchestrationFilter"/> that will be bound.</typeparam>
118118
/// <returns>The same <see cref="IDurableTaskWorkerBuilder"/> instance, allowing for method chaining.</returns>
119-
public static IDurableTaskWorkerBuilder UseOrchestrationFilter(this IDurableTaskWorkerBuilder builder, Func<OrchestrationInfo, bool> orchestrationFilter)
119+
public static IDurableTaskWorkerBuilder UseOrchestrationFilter<TOrchestrationFilter>(this IDurableTaskWorkerBuilder builder) where TOrchestrationFilter : class, IOrchestrationFilter
120120
{
121121
Check.NotNull(builder);
122-
Check.NotNull(orchestrationFilter);
123-
builder.Services.Configure<DurableTaskWorkerOptions>(builder.Name, options =>
124-
{
125-
options.OrchestrationFilter = orchestrationFilter;
126-
});
122+
builder.Services.AddSingleton<IOrchestrationFilter, TOrchestrationFilter>();
123+
return builder;
124+
}
125+
126+
/// <summary>
127+
/// Adds an orchestration filter to the specified <see cref="IDurableTaskWorkerBuilder"/>.
128+
/// </summary>
129+
/// <param name="builder">The builder to set the builder target for.</param>
130+
/// <param name="filter">The instance of an <see cref="IOrchestrationFilter"/> to use.</param>
131+
/// <returns>The same <see cref="IDurableTaskWorkerBuilder"/> instance, allowing for method chaining.</returns>
132+
public static IDurableTaskWorkerBuilder UseOrchestrationFilter(this IDurableTaskWorkerBuilder builder, IOrchestrationFilter filter)
133+
{
134+
Check.NotNull(builder);
135+
builder.Services.AddSingleton(filter);
127136
return builder;
128137
}
129138
}

src/Worker/Core/DurableTaskWorkerOptions.cs

Lines changed: 5 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -49,22 +49,6 @@ public enum VersionFailureStrategy
4949
Fail = 1,
5050
}
5151

52-
/// <summary>
53-
/// Struct representation of orchestration information.
54-
/// </summary>
55-
public struct OrchestrationInfo
56-
{
57-
/// <summary>
58-
/// Gets the name of the orchestration.
59-
/// </summary>
60-
public string Name { get; init; }
61-
62-
/// <summary>
63-
/// Gets the tags associated with the orchestration.
64-
/// </summary>
65-
public Dictionary<string, string> Tags { get; init; }
66-
}
67-
6852
/// <summary>
6953
/// Gets or sets the data converter. Default value is <see cref="JsonDataConverter.Default" />.
7054
/// </summary>
@@ -161,6 +145,11 @@ public DataConverter DataConverter
161145
/// </summary>
162146
public bool IsVersioningSet { get; internal set; }
163147

148+
/// <summary>
149+
/// Gets or sets a callback function that determines whether an orchestration should be accepted for work.
150+
/// </summary>
151+
public IOrchestrationFilter? OrchestrationFilter { get; set; }
152+
164153
/// <summary>
165154
/// Gets a value indicating whether <see cref="DataConverter" /> was explicitly set or not.
166155
/// </summary>
@@ -172,10 +161,6 @@ public DataConverter DataConverter
172161
/// </remarks>
173162
internal bool DataConverterExplicitlySet { get; private set; }
174163

175-
/// <summary>
176-
/// Gets or sets a callback function that determines whether an orchestration should be accepted for work.
177-
/// </summary>
178-
public Func<OrchestrationInfo, bool>? OrchestrationFilter { get; set; }
179164

180165
/// <summary>
181166
/// Applies these option values to another.
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
namespace Microsoft.DurableTask.Worker;
5+
6+
/// <summary>
7+
/// Defines a filter for validating orchestrations.
8+
/// </summary>
9+
public interface IOrchestrationFilter
10+
{
11+
/// <summary>
12+
/// Validate the orchestration against the filter represented by this interface.
13+
/// </summary>
14+
/// <param name="info">The information on the orchestration to validate.</param>
15+
/// <param name="cancellationToken">The cancellation token for the request to timeout.</param>
16+
/// <returns><code>true</code> if the orchestration is valid <code>false</code> otherwise.</returns>
17+
Task<bool> IsOrchestrationValidAsync(OrchestrationInfo info, CancellationToken cancellationToken = default);
18+
}
19+
20+
/// <summary>
21+
/// Struct representation of orchestration information.
22+
/// </summary>
23+
public struct OrchestrationInfo
24+
{
25+
/// <summary>
26+
/// Gets the name of the orchestration.
27+
/// </summary>
28+
public string Name { get; init; }
29+
30+
/// <summary>
31+
/// Gets the tags associated with the orchestration.
32+
/// </summary>
33+
public Dictionary<string, string> Tags { get; init; }
34+
}

src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
using Microsoft.Extensions.DependencyInjection;
1414
using Microsoft.Extensions.Logging;
1515
using static Microsoft.DurableTask.Protobuf.TaskHubSidecarService;
16-
using static Microsoft.DurableTask.Worker.DurableTaskWorkerOptions;
1716
using DTCore = DurableTask.Core;
1817
using P = Microsoft.DurableTask.Protobuf;
1918

@@ -32,13 +31,15 @@ class Processor
3231
readonly TaskHubSidecarServiceClient client;
3332
readonly DurableTaskShimFactory shimFactory;
3433
readonly GrpcDurableTaskWorkerOptions.InternalOptions internalOptions;
34+
readonly IOrchestrationFilter? orchestrationFilter = null;
3535

36-
public Processor(GrpcDurableTaskWorker worker, TaskHubSidecarServiceClient client)
36+
public Processor(GrpcDurableTaskWorker worker, TaskHubSidecarServiceClient client, IOrchestrationFilter? orchestrationFilter = null)
3737
{
3838
this.worker = worker;
3939
this.client = client;
4040
this.shimFactory = new DurableTaskShimFactory(this.worker.grpcOptions, this.worker.loggerFactory);
4141
this.internalOptions = this.worker.grpcOptions.Internal;
42+
this.orchestrationFilter = orchestrationFilter;
4243
}
4344

4445
ILogger Logger => this.worker.logger;
@@ -376,11 +377,17 @@ async Task OnRunOrchestratorAsync(
376377
entityConversionState,
377378
cancellationToken);
378379

379-
bool filterPassed = this.worker.workerOptions.OrchestrationFilter?.Invoke(new OrchestrationInfo
380+
bool filterPassed = true;
381+
if (this.orchestrationFilter != null)
380382
{
381-
Name = runtimeState.Name,
382-
Tags = new(runtimeState.Tags ?? ImmutableDictionary<string, string>.Empty),
383-
}) ?? true;
383+
filterPassed = await this.orchestrationFilter.IsOrchestrationValidAsync(
384+
new OrchestrationInfo
385+
{
386+
Name = runtimeState.Name,
387+
Tags = new Dictionary<string, string>(runtimeState.Tags ?? ImmutableDictionary<string, string>.Empty),
388+
},
389+
cancellationToken);
390+
}
384391

385392
if (!filterPassed)
386393
{

src/Worker/Grpc/GrpcDurableTaskWorker.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// Licensed under the MIT License.
33

44
using Microsoft.DurableTask.Worker.Hosting;
5+
using Microsoft.Extensions.DependencyInjection;
56
using Microsoft.Extensions.Logging;
67
using Microsoft.Extensions.Options;
78

@@ -48,7 +49,8 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
4849
{
4950
await using AsyncDisposable disposable = this.GetCallInvoker(out CallInvoker callInvoker, out string address);
5051
this.logger.StartingTaskHubWorker(address);
51-
await new Processor(this, new(callInvoker)).ExecuteAsync(stoppingToken);
52+
IOrchestrationFilter? filter = this.services.GetService<IOrchestrationFilter>();
53+
await new Processor(this, new(callInvoker), filter).ExecuteAsync(stoppingToken);
5254
}
5355

5456
#if NET6_0_OR_GREATER

test/Grpc.IntegrationTests/OrchestrationPatterns.cs

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -984,14 +984,11 @@ public async Task FilterOrchestrationsByName()
984984
{
985985
// Setup a worker with an Orchestration Filter.
986986
TaskName orchestratorName = nameof(EmptyOrchestration);
987-
ISet<string> orchestratorNames = new HashSet<string>();
987+
var orchestrationFilter = new OrchestrationFilter();
988988
await using HostTestLifetime server = await this.StartWorkerAsync(b =>
989989
{
990990
b.AddTasks(tasks => tasks.AddOrchestratorFunc(orchestratorName, ctx => Task.FromResult<object?>(null)));
991-
b.UseOrchestrationFilter((info) =>
992-
{
993-
return !orchestratorNames.Contains(info.Name);
994-
});
991+
b.UseOrchestrationFilter(orchestrationFilter);
995992
});
996993

997994
// Nothing in the filter set, the orchestration should complete.
@@ -1004,7 +1001,7 @@ public async Task FilterOrchestrationsByName()
10041001
Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus);
10051002

10061003
// Update the filter and re-enqueue. We should see the orchestration denied.
1007-
orchestratorNames.Add(orchestratorName);
1004+
orchestrationFilter.NameDenySet.Add(orchestratorName);
10081005

10091006
// This should throw as the work is denied.
10101007
instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName);
@@ -1020,14 +1017,11 @@ public async Task FilterOrchestrationsByTag()
10201017
{
10211018
{ "test", "true" }
10221019
};
1023-
IDictionary<string, string> orchestratorTagFilter = new Dictionary<string, string>();
1020+
var orchestrationFilter = new OrchestrationFilter();
10241021
await using HostTestLifetime server = await this.StartWorkerAsync(b =>
10251022
{
10261023
b.AddTasks(tasks => tasks.AddOrchestratorFunc(orchestratorName, ctx => Task.FromResult<object?>(null)));
1027-
b.UseOrchestrationFilter((info) =>
1028-
{
1029-
return !orchestratorTagFilter.Any(kvp => info.Tags.ContainsKey(kvp.Key) && info.Tags[kvp.Key] == kvp.Value);
1030-
});
1024+
b.UseOrchestrationFilter(orchestrationFilter);
10311025
});
10321026

10331027
// Nothing in the filter set, the orchestration should complete.
@@ -1043,7 +1037,7 @@ public async Task FilterOrchestrationsByTag()
10431037
Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus);
10441038

10451039
// Update the filter and re-enqueue. We should see the orchestration denied.
1046-
orchestratorTagFilter.Add("test", "true");
1040+
orchestrationFilter.TagDenyDict.Add("test", "true");
10471041

10481042
// This should throw as the work is denied.
10491043
instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName, new StartOrchestrationOptions
@@ -1053,6 +1047,19 @@ public async Task FilterOrchestrationsByTag()
10531047
await Assert.ThrowsAsync<OperationCanceledException>(async () => await server.Client.WaitForInstanceCompletionAsync(instanceId, new CancellationTokenSource(TimeSpan.FromSeconds(5)).Token));
10541048
}
10551049

1050+
class OrchestrationFilter : IOrchestrationFilter
1051+
{
1052+
public ISet<string> NameDenySet { get; set; } = new HashSet<string>();
1053+
public IDictionary<string, string> TagDenyDict = new Dictionary<string, string>();
1054+
1055+
public Task<bool> IsOrchestrationValidAsync(OrchestrationInfo info, CancellationToken cancellationToken = default)
1056+
{
1057+
return Task.FromResult(
1058+
!this.NameDenySet.Contains(info.Name)
1059+
&& !this.TagDenyDict.Any(kvp => info.Tags.ContainsKey(kvp.Key) && info.Tags[kvp.Key] == kvp.Value));
1060+
}
1061+
}
1062+
10561063
// TODO: Test for multiple external events with the same name
10571064
// TODO: Test for ContinueAsNew with external events that carry over
10581065
// TODO: Test for catching activity exceptions of specific types

0 commit comments

Comments
 (0)