Skip to content

Commit 9a796c2

Browse files
authored
Activity tag support (#426)
1 parent 7ea9c0d commit 9a796c2

File tree

7 files changed

+124
-12
lines changed

7 files changed

+124
-12
lines changed
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
using Microsoft.DurableTask;
5+
6+
namespace ScheduleWebApp.Activities;
7+
8+
[DurableTask] // Optional: enables code generation for type-safe calls
9+
public class CacheClearingActivity : TaskActivity<object, string>
10+
{
11+
public override async Task<string> RunAsync(TaskActivityContext context, object input)
12+
{
13+
// Simulate cache clearing
14+
await Task.Delay(TimeSpan.FromSeconds(5));
15+
return "Cache cleared";
16+
}
17+
}

samples/ScheduleWebApp/Orchestrations/CacheClearingOrchestrator.cs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// Licensed under the MIT License.
33

44
using Microsoft.DurableTask;
5+
using ScheduleWebApp.Activities;
56

67
namespace ScheduleWebApp.Orchestrations;
78

@@ -14,8 +15,12 @@ public override async Task<string> RunAsync(TaskOrchestrationContext context, st
1415
{
1516
logger.LogInformation("Starting CacheClearingOrchestration for schedule ID: {ScheduleId}", scheduleId);
1617

17-
// Simulate cache clearing
18-
await Task.Delay(TimeSpan.FromSeconds(5));
18+
TaskOptions options = new TaskOptions(tags: new Dictionary<string, string>
19+
{
20+
{ "scheduleId", scheduleId }
21+
});
22+
23+
await context.CallActivityAsync(nameof(CacheClearingActivity), options);
1924

2025
logger.LogInformation("CacheClearingOrchestration completed for schedule ID: {ScheduleId}", scheduleId);
2126

samples/ScheduleWebApp/Program.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
using Microsoft.DurableTask.ScheduledTasks;
88
using Microsoft.DurableTask.Worker;
99
using Microsoft.DurableTask.Worker.AzureManaged;
10+
using ScheduleWebApp.Activities;
1011
using ScheduleWebApp.Orchestrations;
1112

1213
WebApplicationBuilder builder = WebApplication.CreateBuilder(args);
@@ -19,13 +20,14 @@
1920
// Add all the generated orchestrations and activities automatically
2021
builder.Services.AddDurableTaskWorker(builder =>
2122
{
23+
builder.UseDurableTaskScheduler(connectionString);
24+
builder.UseScheduledTasks();
2225
builder.AddTasks(r =>
2326
{
2427
// Add your orchestrators and activities here
28+
r.AddActivity<CacheClearingActivity>();
2529
r.AddOrchestrator<CacheClearingOrchestrator>();
2630
});
27-
builder.UseDurableTaskScheduler(connectionString);
28-
builder.UseScheduledTasks();
2931
});
3032

3133
// Register the client, which can be used to start orchestrations

src/Abstractions/TaskOptions.cs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,27 @@ public TaskOptions(TaskRetryOptions? retry = null)
1919
this.Retry = retry;
2020
}
2121

22+
/// <summary>
23+
/// Initializes a new instance of the <see cref="TaskOptions"/> class.
24+
/// </summary>
25+
/// <param name="retry">The task retry options.</param>
26+
/// <param name="tags">The tags to associate with the task.</param>
27+
public TaskOptions(TaskRetryOptions? retry = null, IDictionary<string, string>? tags = null)
28+
{
29+
this.Retry = retry;
30+
this.Tags = tags;
31+
}
32+
2233
/// <summary>
2334
/// Gets the task retry options.
2435
/// </summary>
2536
public TaskRetryOptions? Retry { get; init; }
2637

38+
/// <summary>
39+
/// Gets the tags to associate with the task.
40+
/// </summary>
41+
public IDictionary<string, string>? Tags { get; init; }
42+
2743
/// <summary>
2844
/// Returns a new <see cref="TaskOptions" /> from the provided <see cref="RetryPolicy" />.
2945
/// </summary>

src/Shared/Grpc/ProtoUtils.cs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
using System.Buffers;
55
using System.Buffers.Text;
66
using System.Diagnostics.CodeAnalysis;
7-
using System.Runtime.CompilerServices;
87
using System.Text;
98
using DurableTask.Core;
109
using DurableTask.Core.Command;
@@ -91,7 +90,10 @@ internal static HistoryEvent ConvertHistoryEvent(P.HistoryEvent proto, EntityCon
9190
proto.EventId,
9291
proto.TaskScheduled.Name,
9392
proto.TaskScheduled.Version,
94-
proto.TaskScheduled.Input);
93+
proto.TaskScheduled.Input)
94+
{
95+
Tags = proto.TaskScheduled.Tags,
96+
};
9597
break;
9698
case P.HistoryEvent.EventTypeOneofCase.TaskCompleted:
9799
historyEvent = new TaskCompletedEvent(
@@ -304,6 +306,15 @@ internal static P.OrchestratorResponse ConstructOrchestratorResponse(
304306
Version = scheduleTaskAction.Version,
305307
Input = scheduleTaskAction.Input,
306308
};
309+
310+
if (scheduleTaskAction.Tags != null)
311+
{
312+
foreach (KeyValuePair<string, string> tag in scheduleTaskAction.Tags)
313+
{
314+
protoAction.ScheduleTask.Tags[tag.Key] = tag.Value;
315+
}
316+
}
317+
307318
break;
308319
case OrchestratorActionType.CreateSubOrchestration:
309320
var subOrchestrationAction = (CreateSubOrchestrationAction)action;

src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Copyright (c) Microsoft Corporation.
22
// Licensed under the MIT License.
33

4+
using System.Collections.Immutable;
45
using System.Globalization;
56
using System.Security.Cryptography;
67
using System.Text;
@@ -138,27 +139,51 @@ public override async Task<T> CallActivityAsync<T>(
138139

139140
try
140141
{
142+
IDictionary<string, string> tags = ImmutableDictionary<string, string>.Empty;
143+
if (options is TaskOptions callActivityOptions)
144+
{
145+
if (callActivityOptions.Tags is not null)
146+
{
147+
tags = callActivityOptions.Tags;
148+
}
149+
}
150+
141151
// TODO: Cancellation (https://github.com/microsoft/durabletask-dotnet/issues/7)
142152
#pragma warning disable 0618
143153
if (options?.Retry?.Policy is RetryPolicy policy)
144154
{
145-
return await this.innerContext.ScheduleWithRetry<T>(
155+
return await this.innerContext.ScheduleTask<T>(
146156
name.Name,
147157
name.Version,
148-
policy.ToDurableTaskCoreRetryOptions(),
149-
input);
158+
options: ScheduleTaskOptions.CreateBuilder()
159+
.WithRetryOptions(policy.ToDurableTaskCoreRetryOptions())
160+
.WithTags(tags)
161+
.Build(),
162+
parameters: input);
150163
}
151164
else if (options?.Retry?.Handler is AsyncRetryHandler handler)
152165
{
153166
return await this.InvokeWithCustomRetryHandler(
154-
() => this.innerContext.ScheduleTask<T>(name.Name, name.Version, input),
167+
() => this.innerContext.ScheduleTask<T>(
168+
name.Name,
169+
name.Version,
170+
options: ScheduleTaskOptions.CreateBuilder()
171+
.WithTags(tags)
172+
.Build(),
173+
parameters: input),
155174
name.Name,
156175
handler,
157176
default);
158177
}
159178
else
160179
{
161-
return await this.innerContext.ScheduleTask<T>(name.Name, name.Version, input);
180+
return await this.innerContext.ScheduleTask<T>(
181+
name.Name,
182+
name.Version,
183+
options: ScheduleTaskOptions.CreateBuilder()
184+
.WithTags(tags)
185+
.Build(),
186+
parameters: input);
162187
}
163188
}
164189
catch (global::DurableTask.Core.Exceptions.TaskFailedException e)
@@ -492,7 +517,7 @@ string GetDefaultVersion()
492517
}
493518

494519
// Secondary choice.
495-
if (this.Properties.TryGetValue("defaultVersion", out var propVersion) && propVersion is string v2)
520+
if (this.Properties.TryGetValue("defaultVersion", out object? propVersion) && propVersion is string v2)
496521
{
497522
return v2;
498523
}

test/Grpc.IntegrationTests/OrchestrationPatterns.cs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -944,6 +944,42 @@ public async Task SubOrchestrationTaskVersionOverridesDefaultVersion(string over
944944
Assert.Equal($"Sub Orchestration version: {overrideVersion}", output);
945945
}
946946

947+
[Fact]
948+
public async Task RunActivityWithTags()
949+
{
950+
TaskName orchestratorName = nameof(RunActivityWithTags);
951+
TaskName taggedActivityName = "TaggedActivity";
952+
953+
await using HostTestLifetime server = await this.StartWorkerAsync(b =>
954+
{
955+
b.AddTasks(tasks => tasks
956+
.AddOrchestratorFunc<string, string>(
957+
orchestratorName, (ctx, input) => ctx.CallActivityAsync<string>(taggedActivityName, input))
958+
.AddActivityFunc<string, string>(taggedActivityName, (ctx, name) => $"Hello from tagged activity, {name}!"));
959+
});
960+
961+
// Schedule orchestration with tags
962+
StartOrchestrationOptions options = new()
963+
{
964+
Tags = new Dictionary<string, string>
965+
{
966+
{ "activityTag", "taggedExecution" },
967+
{ "testType", "activityTagTest" }
968+
}
969+
};
970+
971+
string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(
972+
orchestratorName, input: "World", options);
973+
974+
OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync(
975+
instanceId, getInputsAndOutputs: true, this.TimeoutToken);
976+
977+
Assert.NotNull(metadata);
978+
Assert.Equal(instanceId, metadata.InstanceId);
979+
Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus);
980+
Assert.Equal("Hello from tagged activity, World!", metadata.ReadOutputAs<string>());
981+
}
982+
947983
// TODO: Test for multiple external events with the same name
948984
// TODO: Test for ContinueAsNew with external events that carry over
949985
// TODO: Test for catching activity exceptions of specific types

0 commit comments

Comments
 (0)