Skip to content

Commit 4197058

Browse files
authored
Add ability to filter orchestrations at worker (#443)
Add ability to filter orchestrations at worker This change allows a user to implement an interface that allows for programmatic orchestration filtering. As it is an interface, it can be plugged into DI. Returning true allows the orchestration to proceed, false will block the orchestration. In this initial commit, the filter can work off of the orchestration name and the orchestration tags. Signed-off-by: Hal Spang <[email protected]>
1 parent 49a0ffa commit 4197058

File tree

8 files changed

+288
-7
lines changed

8 files changed

+288
-7
lines changed

src/Worker/Core/DependencyInjection/DurableTaskWorkerBuilderExtensions.cs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,4 +109,32 @@ public static IDurableTaskWorkerBuilder UseVersioning(this IDurableTaskWorkerBui
109109
});
110110
return builder;
111111
}
112+
113+
/// <summary>
114+
/// Adds an orchestration filter to the specified <see cref="IDurableTaskWorkerBuilder"/>.
115+
/// </summary>
116+
/// <param name="builder">The builder to set the builder target for.</param>
117+
/// <typeparam name="TOrchestrationFilter">The implementation of a <see cref="IOrchestrationFilter"/> that will be bound.</typeparam>
118+
/// <returns>The same <see cref="IDurableTaskWorkerBuilder"/> instance, allowing for method chaining.</returns>
119+
[Obsolete("Experimental")]
120+
public static IDurableTaskWorkerBuilder UseOrchestrationFilter<TOrchestrationFilter>(this IDurableTaskWorkerBuilder builder) where TOrchestrationFilter : class, IOrchestrationFilter
121+
{
122+
Check.NotNull(builder);
123+
builder.Services.AddSingleton<IOrchestrationFilter, TOrchestrationFilter>();
124+
return builder;
125+
}
126+
127+
/// <summary>
128+
/// Adds an orchestration filter to the specified <see cref="IDurableTaskWorkerBuilder"/>.
129+
/// </summary>
130+
/// <param name="builder">The builder to set the builder target for.</param>
131+
/// <param name="filter">The instance of an <see cref="IOrchestrationFilter"/> to use.</param>
132+
/// <returns>The same <see cref="IDurableTaskWorkerBuilder"/> instance, allowing for method chaining.</returns>
133+
[Obsolete("Experimental")]
134+
public static IDurableTaskWorkerBuilder UseOrchestrationFilter(this IDurableTaskWorkerBuilder builder, IOrchestrationFilter filter)
135+
{
136+
Check.NotNull(builder);
137+
builder.Services.AddSingleton(filter);
138+
return builder;
139+
}
112140
}

src/Worker/Core/DurableTaskWorkerOptions.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,12 @@ public DataConverter DataConverter
145145
/// </summary>
146146
public bool IsVersioningSet { get; internal set; }
147147

148+
/// <summary>
149+
/// Gets or sets a callback function that determines whether an orchestration should be accepted for work.
150+
/// </summary>
151+
[Obsolete("Experimental")]
152+
public IOrchestrationFilter? OrchestrationFilter { get; set; }
153+
148154
/// <summary>
149155
/// Gets a value indicating whether <see cref="DataConverter" /> was explicitly set or not.
150156
/// </summary>
@@ -156,6 +162,7 @@ public DataConverter DataConverter
156162
/// </remarks>
157163
internal bool DataConverterExplicitlySet { get; private set; }
158164

165+
159166
/// <summary>
160167
/// Applies these option values to another.
161168
/// </summary>
@@ -169,6 +176,7 @@ internal void ApplyTo(DurableTaskWorkerOptions other)
169176
other.MaximumTimerInterval = this.MaximumTimerInterval;
170177
other.EnableEntitySupport = this.EnableEntitySupport;
171178
other.Versioning = this.Versioning;
179+
other.OrchestrationFilter = this.OrchestrationFilter;
172180
}
173181
}
174182

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
namespace Microsoft.DurableTask.Worker;
5+
6+
/// <summary>
7+
/// Defines a filter for validating orchestrations.
8+
/// </summary>
9+
[Obsolete("Experimental")]
10+
public interface IOrchestrationFilter
11+
{
12+
/// <summary>
13+
/// Validate the orchestration against the filter represented by this interface.
14+
/// </summary>
15+
/// <param name="info">The information on the orchestration to validate.</param>
16+
/// <param name="cancellationToken">The cancellation token for the request to timeout.</param>
17+
/// <returns><code>true</code> if the orchestration is valid <code>false</code> otherwise.</returns>
18+
ValueTask<bool> IsOrchestrationValidAsync(OrchestrationFilterParameters info, CancellationToken cancellationToken = default);
19+
}
20+
21+
/// <summary>
22+
/// Struct representation of orchestration information.
23+
/// </summary>
24+
public struct OrchestrationFilterParameters
25+
{
26+
/// <summary>
27+
/// Gets the name of the orchestration.
28+
/// </summary>
29+
public string? Name { get; init; }
30+
31+
/// <summary>
32+
/// Gets the tags associated with the orchestration.
33+
/// </summary>
34+
public IReadOnlyDictionary<string, string>? Tags { get; init; }
35+
}

src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,16 @@ class Processor
3030
readonly TaskHubSidecarServiceClient client;
3131
readonly DurableTaskShimFactory shimFactory;
3232
readonly GrpcDurableTaskWorkerOptions.InternalOptions internalOptions;
33+
[Obsolete("Experimental")]
34+
readonly IOrchestrationFilter? orchestrationFilter;
3335

34-
public Processor(GrpcDurableTaskWorker worker, TaskHubSidecarServiceClient client)
36+
public Processor(GrpcDurableTaskWorker worker, TaskHubSidecarServiceClient client, IOrchestrationFilter? orchestrationFilter = null)
3537
{
3638
this.worker = worker;
3739
this.client = client;
3840
this.shimFactory = new DurableTaskShimFactory(this.worker.grpcOptions, this.worker.loggerFactory);
3941
this.internalOptions = this.worker.grpcOptions.Internal;
42+
this.orchestrationFilter = orchestrationFilter;
4043
}
4144

4245
ILogger Logger => this.worker.logger;
@@ -374,6 +377,31 @@ async Task OnRunOrchestratorAsync(
374377
entityConversionState,
375378
cancellationToken);
376379

380+
bool filterPassed = true;
381+
if (this.orchestrationFilter != null)
382+
{
383+
filterPassed = await this.orchestrationFilter.IsOrchestrationValidAsync(
384+
new OrchestrationFilterParameters
385+
{
386+
Name = runtimeState.Name,
387+
Tags = runtimeState.Tags != null ? new Dictionary<string, string>(runtimeState.Tags) : null,
388+
},
389+
cancellationToken);
390+
}
391+
392+
if (!filterPassed)
393+
{
394+
this.Logger.AbandoningOrchestrationDueToOrchestrationFilter(request.InstanceId, completionToken);
395+
await this.client.AbandonTaskOrchestratorWorkItemAsync(
396+
new P.AbandonOrchestrationTaskRequest
397+
{
398+
CompletionToken = completionToken,
399+
},
400+
cancellationToken: cancellationToken);
401+
402+
return;
403+
}
404+
377405
// If versioning has been explicitly set, we attempt to follow that pattern. If it is not set, we don't compare versions here.
378406
failureDetails = EvaluateOrchestrationVersioning(versioning, runtimeState.Version, out versionFailure);
379407

src/Worker/Grpc/GrpcDurableTaskWorker.cs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ sealed partial class GrpcDurableTaskWorker : DurableTaskWorker
1717
readonly IServiceProvider services;
1818
readonly ILoggerFactory loggerFactory;
1919
readonly ILogger logger;
20+
readonly IOrchestrationFilter? orchestrationFilter;
2021

2122
/// <summary>
2223
/// Initializes a new instance of the <see cref="GrpcDurableTaskWorker" /> class.
@@ -27,28 +28,31 @@ sealed partial class GrpcDurableTaskWorker : DurableTaskWorker
2728
/// <param name="workerOptions">The generic worker options.</param>
2829
/// <param name="services">The service provider.</param>
2930
/// <param name="loggerFactory">The logger.</param>
31+
/// <param name="orchestrationFilter">The optional <see cref="IOrchestrationFilter"/> used to filter orchestration execution.</param>
3032
public GrpcDurableTaskWorker(
3133
string name,
3234
IDurableTaskFactory factory,
3335
IOptionsMonitor<GrpcDurableTaskWorkerOptions> grpcOptions,
3436
IOptionsMonitor<DurableTaskWorkerOptions> workerOptions,
3537
IServiceProvider services,
36-
ILoggerFactory loggerFactory)
38+
ILoggerFactory loggerFactory,
39+
IOrchestrationFilter? orchestrationFilter = null)
3740
: base(name, factory)
3841
{
3942
this.grpcOptions = Check.NotNull(grpcOptions).Get(name);
4043
this.workerOptions = Check.NotNull(workerOptions).Get(name);
4144
this.services = Check.NotNull(services);
4245
this.loggerFactory = Check.NotNull(loggerFactory);
4346
this.logger = loggerFactory.CreateLogger("Microsoft.DurableTask"); // TODO: use better category name.
47+
this.orchestrationFilter = orchestrationFilter;
4448
}
4549

4650
/// <inheritdoc />
4751
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
4852
{
4953
await using AsyncDisposable disposable = this.GetCallInvoker(out CallInvoker callInvoker, out string address);
5054
this.logger.StartingTaskHubWorker(address);
51-
await new Processor(this, new(callInvoker)).ExecuteAsync(stoppingToken);
55+
await new Processor(this, new(callInvoker), this.orchestrationFilter).ExecuteAsync(stoppingToken);
5256
}
5357

5458
#if NET6_0_OR_GREATER

src/Worker/Grpc/Logs.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,5 +57,8 @@ static partial class Logs
5757

5858
[LoggerMessage(EventId = 58, Level = LogLevel.Information, Message = "Abandoning orchestration. InstanceId = '{instanceId}'. Completion token = '{completionToken}'")]
5959
public static partial void AbandoningOrchestrationDueToVersioning(this ILogger logger, string instanceId, string completionToken);
60+
61+
[LoggerMessage(EventId = 59, Level = LogLevel.Information, Message = "Abandoning orchestration due to filtering. InstanceId = '{instanceId}'. Completion token = '{completionToken}'")]
62+
public static partial void AbandoningOrchestrationDueToOrchestrationFilter(this ILogger logger, string instanceId, string completionToken);
6063
}
6164
}

test/Grpc.IntegrationTests/GrpcSidecar/Grpc/TaskHubGrpcServer.cs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@
66
using DurableTask.Core;
77
using DurableTask.Core.History;
88
using DurableTask.Core.Query;
9-
using Microsoft.DurableTask.Sidecar.Dispatcher;
109
using Google.Protobuf.WellKnownTypes;
1110
using Grpc.Core;
11+
using Microsoft.DurableTask.Sidecar.Dispatcher;
1212
using Microsoft.Extensions.Hosting;
1313
using Microsoft.Extensions.Logging;
1414
using Microsoft.Extensions.Options;
@@ -589,6 +589,16 @@ static string GetTaskIdKey(string instanceId, int taskId)
589589
return string.Concat(instanceId, "__", taskId.ToString());
590590
}
591591

592+
public override Task<P.AbandonActivityTaskResponse> AbandonTaskActivityWorkItem(P.AbandonActivityTaskRequest request, ServerCallContext context)
593+
{
594+
return Task.FromResult<P.AbandonActivityTaskResponse>(new());
595+
}
596+
597+
public override Task<P.AbandonOrchestrationTaskResponse> AbandonTaskOrchestratorWorkItem(P.AbandonOrchestrationTaskRequest request, ServerCallContext context)
598+
{
599+
return Task.FromResult<P.AbandonOrchestrationTaskResponse>(new());
600+
}
601+
592602
/// <summary>
593603
/// A <see cref="ITrafficSignal"/> implementation that is used to control whether the task hub
594604
/// dispatcher can fetch new work-items, based on whether a client is currently connected.

0 commit comments

Comments
 (0)