diff --git a/samples/ScheduleWebApp/Activities/CacheClearingActivity.cs b/samples/ScheduleWebApp/Activities/CacheClearingActivity.cs new file mode 100644 index 000000000..17a6ee0b1 --- /dev/null +++ b/samples/ScheduleWebApp/Activities/CacheClearingActivity.cs @@ -0,0 +1,17 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using Microsoft.DurableTask; + +namespace ScheduleWebApp.Activities; + +[DurableTask] // Optional: enables code generation for type-safe calls +public class CacheClearingActivity : TaskActivity +{ + public override async Task RunAsync(TaskActivityContext context, object input) + { + // Simulate cache clearing + await Task.Delay(TimeSpan.FromSeconds(5)); + return "Cache cleared"; + } +} \ No newline at end of file diff --git a/samples/ScheduleWebApp/Orchestrations/CacheClearingOrchestrator.cs b/samples/ScheduleWebApp/Orchestrations/CacheClearingOrchestrator.cs index 4113f07ab..4aad5716a 100644 --- a/samples/ScheduleWebApp/Orchestrations/CacheClearingOrchestrator.cs +++ b/samples/ScheduleWebApp/Orchestrations/CacheClearingOrchestrator.cs @@ -2,6 +2,7 @@ // Licensed under the MIT License. using Microsoft.DurableTask; +using ScheduleWebApp.Activities; namespace ScheduleWebApp.Orchestrations; @@ -14,8 +15,12 @@ public override async Task RunAsync(TaskOrchestrationContext context, st { logger.LogInformation("Starting CacheClearingOrchestration for schedule ID: {ScheduleId}", scheduleId); - // Simulate cache clearing - await Task.Delay(TimeSpan.FromSeconds(5)); + TaskOptions options = new TaskOptions(tags: new Dictionary + { + { "scheduleId", scheduleId } + }); + + await context.CallActivityAsync(nameof(CacheClearingActivity), options); logger.LogInformation("CacheClearingOrchestration completed for schedule ID: {ScheduleId}", scheduleId); diff --git a/samples/ScheduleWebApp/Program.cs b/samples/ScheduleWebApp/Program.cs index fd89cc93f..9ab021a81 100644 --- a/samples/ScheduleWebApp/Program.cs +++ b/samples/ScheduleWebApp/Program.cs @@ -7,6 +7,7 @@ using Microsoft.DurableTask.ScheduledTasks; using Microsoft.DurableTask.Worker; using Microsoft.DurableTask.Worker.AzureManaged; +using ScheduleWebApp.Activities; using ScheduleWebApp.Orchestrations; WebApplicationBuilder builder = WebApplication.CreateBuilder(args); @@ -19,13 +20,14 @@ // Add all the generated orchestrations and activities automatically builder.Services.AddDurableTaskWorker(builder => { + builder.UseDurableTaskScheduler(connectionString); + builder.UseScheduledTasks(); builder.AddTasks(r => { // Add your orchestrators and activities here + r.AddActivity(); r.AddOrchestrator(); }); - builder.UseDurableTaskScheduler(connectionString); - builder.UseScheduledTasks(); }); // Register the client, which can be used to start orchestrations diff --git a/src/Abstractions/TaskOptions.cs b/src/Abstractions/TaskOptions.cs index aed400528..8d4fbd63a 100644 --- a/src/Abstractions/TaskOptions.cs +++ b/src/Abstractions/TaskOptions.cs @@ -19,11 +19,27 @@ public TaskOptions(TaskRetryOptions? retry = null) this.Retry = retry; } + /// + /// Initializes a new instance of the class. + /// + /// The task retry options. + /// The tags to associate with the task. + public TaskOptions(TaskRetryOptions? retry = null, IDictionary? tags = null) + { + this.Retry = retry; + this.Tags = tags; + } + /// /// Gets the task retry options. /// public TaskRetryOptions? Retry { get; init; } + /// + /// Gets the tags to associate with the task. + /// + public IDictionary? Tags { get; init; } + /// /// Returns a new from the provided . /// diff --git a/src/Shared/Grpc/ProtoUtils.cs b/src/Shared/Grpc/ProtoUtils.cs index b73d24374..868ecc661 100644 --- a/src/Shared/Grpc/ProtoUtils.cs +++ b/src/Shared/Grpc/ProtoUtils.cs @@ -4,7 +4,6 @@ using System.Buffers; using System.Buffers.Text; using System.Diagnostics.CodeAnalysis; -using System.Runtime.CompilerServices; using System.Text; using DurableTask.Core; using DurableTask.Core.Command; @@ -91,7 +90,10 @@ internal static HistoryEvent ConvertHistoryEvent(P.HistoryEvent proto, EntityCon proto.EventId, proto.TaskScheduled.Name, proto.TaskScheduled.Version, - proto.TaskScheduled.Input); + proto.TaskScheduled.Input) + { + Tags = proto.TaskScheduled.Tags, + }; break; case P.HistoryEvent.EventTypeOneofCase.TaskCompleted: historyEvent = new TaskCompletedEvent( @@ -304,6 +306,15 @@ internal static P.OrchestratorResponse ConstructOrchestratorResponse( Version = scheduleTaskAction.Version, Input = scheduleTaskAction.Input, }; + + if (scheduleTaskAction.Tags != null) + { + foreach (KeyValuePair tag in scheduleTaskAction.Tags) + { + protoAction.ScheduleTask.Tags[tag.Key] = tag.Value; + } + } + break; case OrchestratorActionType.CreateSubOrchestration: var subOrchestrationAction = (CreateSubOrchestrationAction)action; diff --git a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs index db2abe258..c7c5dea57 100644 --- a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs +++ b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs @@ -1,6 +1,7 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. +using System.Collections.Immutable; using System.Globalization; using System.Security.Cryptography; using System.Text; @@ -138,27 +139,51 @@ public override async Task CallActivityAsync( try { + IDictionary tags = ImmutableDictionary.Empty; + if (options is TaskOptions callActivityOptions) + { + if (callActivityOptions.Tags is not null) + { + tags = callActivityOptions.Tags; + } + } + // TODO: Cancellation (https://github.com/microsoft/durabletask-dotnet/issues/7) #pragma warning disable 0618 if (options?.Retry?.Policy is RetryPolicy policy) { - return await this.innerContext.ScheduleWithRetry( + return await this.innerContext.ScheduleTask( name.Name, name.Version, - policy.ToDurableTaskCoreRetryOptions(), - input); + options: ScheduleTaskOptions.CreateBuilder() + .WithRetryOptions(policy.ToDurableTaskCoreRetryOptions()) + .WithTags(tags) + .Build(), + parameters: input); } else if (options?.Retry?.Handler is AsyncRetryHandler handler) { return await this.InvokeWithCustomRetryHandler( - () => this.innerContext.ScheduleTask(name.Name, name.Version, input), + () => this.innerContext.ScheduleTask( + name.Name, + name.Version, + options: ScheduleTaskOptions.CreateBuilder() + .WithTags(tags) + .Build(), + parameters: input), name.Name, handler, default); } else { - return await this.innerContext.ScheduleTask(name.Name, name.Version, input); + return await this.innerContext.ScheduleTask( + name.Name, + name.Version, + options: ScheduleTaskOptions.CreateBuilder() + .WithTags(tags) + .Build(), + parameters: input); } } catch (global::DurableTask.Core.Exceptions.TaskFailedException e) @@ -492,7 +517,7 @@ string GetDefaultVersion() } // Secondary choice. - if (this.Properties.TryGetValue("defaultVersion", out var propVersion) && propVersion is string v2) + if (this.Properties.TryGetValue("defaultVersion", out object? propVersion) && propVersion is string v2) { return v2; } diff --git a/test/Grpc.IntegrationTests/OrchestrationPatterns.cs b/test/Grpc.IntegrationTests/OrchestrationPatterns.cs index b64a5fa87..2634935e5 100644 --- a/test/Grpc.IntegrationTests/OrchestrationPatterns.cs +++ b/test/Grpc.IntegrationTests/OrchestrationPatterns.cs @@ -944,6 +944,42 @@ public async Task SubOrchestrationTaskVersionOverridesDefaultVersion(string over Assert.Equal($"Sub Orchestration version: {overrideVersion}", output); } + [Fact] + public async Task RunActivityWithTags() + { + TaskName orchestratorName = nameof(RunActivityWithTags); + TaskName taggedActivityName = "TaggedActivity"; + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks + .AddOrchestratorFunc( + orchestratorName, (ctx, input) => ctx.CallActivityAsync(taggedActivityName, input)) + .AddActivityFunc(taggedActivityName, (ctx, name) => $"Hello from tagged activity, {name}!")); + }); + + // Schedule orchestration with tags + StartOrchestrationOptions options = new() + { + Tags = new Dictionary + { + { "activityTag", "taggedExecution" }, + { "testType", "activityTagTest" } + } + }; + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync( + orchestratorName, input: "World", options); + + OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, this.TimeoutToken); + + Assert.NotNull(metadata); + Assert.Equal(instanceId, metadata.InstanceId); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + Assert.Equal("Hello from tagged activity, World!", metadata.ReadOutputAs()); + } + // TODO: Test for multiple external events with the same name // TODO: Test for ContinueAsNew with external events that carry over // TODO: Test for catching activity exceptions of specific types