Skip to content

Commit 790a293

Browse files
committed
Expand obsolete usage add tests
Signed-off-by: Hal Spang <[email protected]>
1 parent a27a0f0 commit 790a293

File tree

6 files changed

+105
-19
lines changed

6 files changed

+105
-19
lines changed

src/Worker/Core/DependencyInjection/DurableTaskWorkerBuilderExtensions.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,6 @@ public static IDurableTaskWorkerBuilder UseVersioning(this IDurableTaskWorkerBui
116116
/// <param name="builder">The builder to set the builder target for.</param>
117117
/// <typeparam name="TOrchestrationFilter">The implementation of a <see cref="IOrchestrationFilter"/> that will be bound.</typeparam>
118118
/// <returns>The same <see cref="IDurableTaskWorkerBuilder"/> instance, allowing for method chaining.</returns>
119-
120119
[Obsolete("Experimental")]
121120
public static IDurableTaskWorkerBuilder UseOrchestrationFilter<TOrchestrationFilter>(this IDurableTaskWorkerBuilder builder) where TOrchestrationFilter : class, IOrchestrationFilter
122121
{

src/Worker/Core/DurableTaskWorkerOptions.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ public DataConverter DataConverter
148148
/// <summary>
149149
/// Gets or sets a callback function that determines whether an orchestration should be accepted for work.
150150
/// </summary>
151+
[Obsolete("Experimental")]
151152
public IOrchestrationFilter? OrchestrationFilter { get; set; }
152153

153154
/// <summary>

src/Worker/Core/IOrchestrationFilter.cs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ namespace Microsoft.DurableTask.Worker;
66
/// <summary>
77
/// Defines a filter for validating orchestrations.
88
/// </summary>
9+
[Obsolete("Experimental")]
910
public interface IOrchestrationFilter
1011
{
1112
/// <summary>
@@ -14,21 +15,21 @@ public interface IOrchestrationFilter
1415
/// <param name="info">The information on the orchestration to validate.</param>
1516
/// <param name="cancellationToken">The cancellation token for the request to timeout.</param>
1617
/// <returns><code>true</code> if the orchestration is valid <code>false</code> otherwise.</returns>
17-
Task<bool> IsOrchestrationValidAsync(OrchestrationInfo info, CancellationToken cancellationToken = default);
18+
ValueTask<bool> IsOrchestrationValidAsync(OrchestrationFilterParameters info, CancellationToken cancellationToken = default);
1819
}
1920

2021
/// <summary>
2122
/// Struct representation of orchestration information.
2223
/// </summary>
23-
public struct OrchestrationInfo
24+
public struct OrchestrationFilterParameters
2425
{
2526
/// <summary>
2627
/// Gets the name of the orchestration.
2728
/// </summary>
28-
public string Name { get; init; }
29+
public string? Name { get; init; }
2930

3031
/// <summary>
3132
/// Gets the tags associated with the orchestration.
3233
/// </summary>
33-
public Dictionary<string, string> Tags { get; init; }
34-
}
34+
public IReadOnlyDictionary<string, string>? Tags { get; init; }
35+
}

src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
// Copyright (c) Microsoft Corporation.
22
// Licensed under the MIT License.
33

4-
using System.Collections.Immutable;
54
using System.Text;
65
using DurableTask.Core;
76
using DurableTask.Core.Entities;
@@ -31,7 +30,8 @@ class Processor
3130
readonly TaskHubSidecarServiceClient client;
3231
readonly DurableTaskShimFactory shimFactory;
3332
readonly GrpcDurableTaskWorkerOptions.InternalOptions internalOptions;
34-
readonly IOrchestrationFilter? orchestrationFilter = null;
33+
[Obsolete("Experimental")]
34+
readonly IOrchestrationFilter? orchestrationFilter;
3535

3636
public Processor(GrpcDurableTaskWorker worker, TaskHubSidecarServiceClient client, IOrchestrationFilter? orchestrationFilter = null)
3737
{
@@ -381,10 +381,10 @@ async Task OnRunOrchestratorAsync(
381381
if (this.orchestrationFilter != null)
382382
{
383383
filterPassed = await this.orchestrationFilter.IsOrchestrationValidAsync(
384-
new OrchestrationInfo
384+
new OrchestrationFilterParameters
385385
{
386386
Name = runtimeState.Name,
387-
Tags = new Dictionary<string, string>(runtimeState.Tags ?? ImmutableDictionary<string, string>.Empty),
387+
Tags = runtimeState.Tags != null ? new Dictionary<string, string>(runtimeState.Tags) : null,
388388
},
389389
cancellationToken);
390390
}

src/Worker/Grpc/GrpcDurableTaskWorker.cs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
// Licensed under the MIT License.
33

44
using Microsoft.DurableTask.Worker.Hosting;
5-
using Microsoft.Extensions.DependencyInjection;
65
using Microsoft.Extensions.Logging;
76
using Microsoft.Extensions.Options;
87

@@ -18,6 +17,7 @@ sealed partial class GrpcDurableTaskWorker : DurableTaskWorker
1817
readonly IServiceProvider services;
1918
readonly ILoggerFactory loggerFactory;
2019
readonly ILogger logger;
20+
readonly IOrchestrationFilter? orchestrationFilter;
2121

2222
/// <summary>
2323
/// Initializes a new instance of the <see cref="GrpcDurableTaskWorker" /> class.
@@ -28,29 +28,31 @@ sealed partial class GrpcDurableTaskWorker : DurableTaskWorker
2828
/// <param name="workerOptions">The generic worker options.</param>
2929
/// <param name="services">The service provider.</param>
3030
/// <param name="loggerFactory">The logger.</param>
31+
/// <param name="orchestrationFilter">The optional <see cref="IOrchestrationFilter"/> used to filter orchestration execution.</param>
3132
public GrpcDurableTaskWorker(
3233
string name,
3334
IDurableTaskFactory factory,
3435
IOptionsMonitor<GrpcDurableTaskWorkerOptions> grpcOptions,
3536
IOptionsMonitor<DurableTaskWorkerOptions> workerOptions,
3637
IServiceProvider services,
37-
ILoggerFactory loggerFactory)
38+
ILoggerFactory loggerFactory,
39+
IOrchestrationFilter? orchestrationFilter = null)
3840
: base(name, factory)
3941
{
4042
this.grpcOptions = Check.NotNull(grpcOptions).Get(name);
4143
this.workerOptions = Check.NotNull(workerOptions).Get(name);
4244
this.services = Check.NotNull(services);
4345
this.loggerFactory = Check.NotNull(loggerFactory);
4446
this.logger = loggerFactory.CreateLogger("Microsoft.DurableTask"); // TODO: use better category name.
47+
this.orchestrationFilter = orchestrationFilter;
4548
}
4649

4750
/// <inheritdoc />
4851
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
4952
{
5053
await using AsyncDisposable disposable = this.GetCallInvoker(out CallInvoker callInvoker, out string address);
5154
this.logger.StartingTaskHubWorker(address);
52-
IOrchestrationFilter? filter = this.services.GetService<IOrchestrationFilter>();
53-
await new Processor(this, new(callInvoker), filter).ExecuteAsync(stoppingToken);
55+
await new Processor(this, new(callInvoker), this.orchestrationFilter).ExecuteAsync(stoppingToken);
5456
}
5557

5658
#if NET6_0_OR_GREATER

test/Grpc.IntegrationTests/OrchestrationPatterns.cs

Lines changed: 88 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1006,7 +1006,43 @@ public async Task FilterOrchestrationsByName()
10061006

10071007
// This should throw as the work is denied.
10081008
instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName);
1009-
await Assert.ThrowsAsync<OperationCanceledException>(async () => await server.Client.WaitForInstanceCompletionAsync(instanceId, new CancellationTokenSource(TimeSpan.FromSeconds(5)).Token));
1009+
using CancellationTokenSource cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
1010+
await Assert.ThrowsAsync<OperationCanceledException>(async () => await server.Client.WaitForInstanceCompletionAsync(instanceId, cts.Token));
1011+
}
1012+
1013+
[Obsolete("Experimental")]
1014+
[Fact]
1015+
public async Task FilterOrchestrationsByNamePassesWhenNotMatching()
1016+
{
1017+
// Setup a worker with an Orchestration Filter.
1018+
TaskName orchestratorName = nameof(EmptyOrchestration);
1019+
var orchestrationFilter = new OrchestrationFilter();
1020+
await using HostTestLifetime server = await this.StartWorkerAsync(b =>
1021+
{
1022+
b.AddTasks(tasks => tasks.AddOrchestratorFunc(orchestratorName, ctx => Task.FromResult<object?>(null)));
1023+
b.UseOrchestrationFilter(orchestrationFilter);
1024+
});
1025+
1026+
// Nothing in the filter set, the orchestration should complete.
1027+
string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName);
1028+
OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync(
1029+
instanceId, this.TimeoutToken);
1030+
1031+
Assert.NotNull(metadata);
1032+
Assert.Equal(instanceId, metadata.InstanceId);
1033+
Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus);
1034+
1035+
// Update the filter and re-enqueue. The name doesn't match so the filter should be OK.
1036+
orchestrationFilter.NameDenySet.Add($"not-{orchestratorName}");
1037+
1038+
// This should throw as the work is denied.
1039+
instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName);
1040+
metadata = await server.Client.WaitForInstanceCompletionAsync(
1041+
instanceId, this.TimeoutToken);
1042+
1043+
Assert.NotNull(metadata);
1044+
Assert.Equal(instanceId, metadata.InstanceId);
1045+
Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus);
10101046
}
10111047

10121048
[Obsolete("Experimental")]
@@ -1046,19 +1082,66 @@ public async Task FilterOrchestrationsByTag()
10461082
{
10471083
Tags = orchestratorTags,
10481084
});
1049-
await Assert.ThrowsAsync<OperationCanceledException>(async () => await server.Client.WaitForInstanceCompletionAsync(instanceId, new CancellationTokenSource(TimeSpan.FromSeconds(5)).Token));
1085+
using CancellationTokenSource cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
1086+
await Assert.ThrowsAsync<OperationCanceledException>(async () => await server.Client.WaitForInstanceCompletionAsync(instanceId, cts.Token));
10501087
}
10511088

1089+
[Obsolete("Experimental")]
1090+
[Fact]
1091+
public async Task FilterOrchestrationsByTagPassesWithNoMatch()
1092+
{
1093+
// Setup a worker with an Orchestration Filter.
1094+
TaskName orchestratorName = nameof(EmptyOrchestration);
1095+
IReadOnlyDictionary<string, string> orchestratorTags = new Dictionary<string, string>
1096+
{
1097+
{ "test", "true" }
1098+
};
1099+
var orchestrationFilter = new OrchestrationFilter();
1100+
await using HostTestLifetime server = await this.StartWorkerAsync(b =>
1101+
{
1102+
b.AddTasks(tasks => tasks.AddOrchestratorFunc(orchestratorName, ctx => Task.FromResult<object?>(null)));
1103+
b.UseOrchestrationFilter(orchestrationFilter);
1104+
});
1105+
1106+
// Nothing in the filter set, the orchestration should complete.
1107+
string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName, new StartOrchestrationOptions
1108+
{
1109+
Tags = orchestratorTags,
1110+
});
1111+
OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync(
1112+
instanceId, this.TimeoutToken);
1113+
1114+
Assert.NotNull(metadata);
1115+
Assert.Equal(instanceId, metadata.InstanceId);
1116+
Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus);
1117+
1118+
// Update the filter and re-enqueue. The tags don't match so the orchestration should be OK.
1119+
orchestrationFilter.TagDenyDict.Add("test", "false");
1120+
1121+
// This should throw as the work is denied.
1122+
instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName, new StartOrchestrationOptions
1123+
{
1124+
Tags = orchestratorTags,
1125+
});
1126+
metadata = await server.Client.WaitForInstanceCompletionAsync(
1127+
instanceId, this.TimeoutToken);
1128+
1129+
Assert.NotNull(metadata);
1130+
Assert.Equal(instanceId, metadata.InstanceId);
1131+
Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus);
1132+
}
1133+
1134+
[Obsolete("Experimental")]
10521135
class OrchestrationFilter : IOrchestrationFilter
10531136
{
10541137
public ISet<string> NameDenySet { get; set; } = new HashSet<string>();
10551138
public IDictionary<string, string> TagDenyDict = new Dictionary<string, string>();
10561139

1057-
public Task<bool> IsOrchestrationValidAsync(OrchestrationInfo info, CancellationToken cancellationToken = default)
1140+
public ValueTask<bool> IsOrchestrationValidAsync(OrchestrationFilterParameters info, CancellationToken cancellationToken = default)
10581141
{
1059-
return Task.FromResult(
1142+
return ValueTask.FromResult(
10601143
!this.NameDenySet.Contains(info.Name)
1061-
&& !this.TagDenyDict.Any(kvp => info.Tags.ContainsKey(kvp.Key) && info.Tags[kvp.Key] == kvp.Value));
1144+
&& !this.TagDenyDict.Any(kvp => info.Tags != null && info.Tags.ContainsKey(kvp.Key) && info.Tags[kvp.Key] == kvp.Value));
10621145
}
10631146
}
10641147

0 commit comments

Comments
 (0)