Skip to content
Merged
17 changes: 17 additions & 0 deletions samples/ScheduleWebApp/Activities/CacheClearingActivity.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using Microsoft.DurableTask;

namespace ScheduleWebApp.Activities;

[DurableTask] // Optional: enables code generation for type-safe calls
public class CacheClearingActivity : TaskActivity<object, string>
{
public override async Task<string> RunAsync(TaskActivityContext context, object input)
{
// Simulate cache clearing
await Task.Delay(TimeSpan.FromSeconds(5));
return "Cache cleared";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT License.

using Microsoft.DurableTask;
using ScheduleWebApp.Activities;

namespace ScheduleWebApp.Orchestrations;

Expand All @@ -14,8 +15,12 @@ public override async Task<string> RunAsync(TaskOrchestrationContext context, st
{
logger.LogInformation("Starting CacheClearingOrchestration for schedule ID: {ScheduleId}", scheduleId);

// Simulate cache clearing
await Task.Delay(TimeSpan.FromSeconds(5));
TaskOptions options = new TaskOptions(tags: new Dictionary<string, string>
{
{ "scheduleId", scheduleId }
});

await context.CallActivityAsync(nameof(CacheClearingActivity), options);

logger.LogInformation("CacheClearingOrchestration completed for schedule ID: {ScheduleId}", scheduleId);

Expand Down
6 changes: 4 additions & 2 deletions samples/ScheduleWebApp/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using Microsoft.DurableTask.ScheduledTasks;
using Microsoft.DurableTask.Worker;
using Microsoft.DurableTask.Worker.AzureManaged;
using ScheduleWebApp.Activities;
using ScheduleWebApp.Orchestrations;

WebApplicationBuilder builder = WebApplication.CreateBuilder(args);
Expand All @@ -19,13 +20,14 @@
// Add all the generated orchestrations and activities automatically
builder.Services.AddDurableTaskWorker(builder =>
{
builder.UseDurableTaskScheduler(connectionString);
builder.UseScheduledTasks();
builder.AddTasks(r =>
{
// Add your orchestrators and activities here
r.AddActivity<CacheClearingActivity>();
r.AddOrchestrator<CacheClearingOrchestrator>();
});
builder.UseDurableTaskScheduler(connectionString);
builder.UseScheduledTasks();
});

// Register the client, which can be used to start orchestrations
Expand Down
16 changes: 16 additions & 0 deletions src/Abstractions/TaskOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,27 @@ public TaskOptions(TaskRetryOptions? retry = null)
this.Retry = retry;
}

/// <summary>
/// Initializes a new instance of the <see cref="TaskOptions"/> class.
/// </summary>
/// <param name="retry">The task retry options.</param>
/// <param name="tags">The tags to associate with the task.</param>
public TaskOptions(TaskRetryOptions? retry = null, IDictionary<string, string>? tags = null)
{
this.Retry = retry;
this.Tags = tags;
}

/// <summary>
/// Gets the task retry options.
/// </summary>
public TaskRetryOptions? Retry { get; init; }

/// <summary>
/// Gets the tags to associate with the task.
/// </summary>
public IDictionary<string, string>? Tags { get; init; }

/// <summary>
/// Returns a new <see cref="TaskOptions" /> from the provided <see cref="RetryPolicy" />.
/// </summary>
Expand Down
15 changes: 13 additions & 2 deletions src/Shared/Grpc/ProtoUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
using System.Buffers;
using System.Buffers.Text;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using System.Text;
using DurableTask.Core;
using DurableTask.Core.Command;
Expand Down Expand Up @@ -91,7 +90,10 @@ internal static HistoryEvent ConvertHistoryEvent(P.HistoryEvent proto, EntityCon
proto.EventId,
proto.TaskScheduled.Name,
proto.TaskScheduled.Version,
proto.TaskScheduled.Input);
proto.TaskScheduled.Input)
{
Tags = proto.TaskScheduled.Tags,
};
break;
case P.HistoryEvent.EventTypeOneofCase.TaskCompleted:
historyEvent = new TaskCompletedEvent(
Expand Down Expand Up @@ -304,6 +306,15 @@ internal static P.OrchestratorResponse ConstructOrchestratorResponse(
Version = scheduleTaskAction.Version,
Input = scheduleTaskAction.Input,
};

if (scheduleTaskAction.Tags != null)
{
foreach (KeyValuePair<string, string> tag in scheduleTaskAction.Tags)
{
protoAction.ScheduleTask.Tags[tag.Key] = tag.Value;
}
}

break;
case OrchestratorActionType.CreateSubOrchestration:
var subOrchestrationAction = (CreateSubOrchestrationAction)action;
Expand Down
37 changes: 31 additions & 6 deletions src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System.Collections.Immutable;
using System.Globalization;
using System.Security.Cryptography;
using System.Text;
Expand Down Expand Up @@ -138,27 +139,51 @@ public override async Task<T> CallActivityAsync<T>(

try
{
IDictionary<string, string> tags = ImmutableDictionary<string, string>.Empty;
if (options is TaskOptions callActivityOptions)
{
if (callActivityOptions.Tags is not null)
{
tags = callActivityOptions.Tags;
}
}

// TODO: Cancellation (https://github.com/microsoft/durabletask-dotnet/issues/7)
#pragma warning disable 0618
if (options?.Retry?.Policy is RetryPolicy policy)
{
return await this.innerContext.ScheduleWithRetry<T>(
return await this.innerContext.ScheduleTask<T>(
name.Name,
name.Version,
policy.ToDurableTaskCoreRetryOptions(),
input);
options: ScheduleTaskOptions.CreateBuilder()
.WithRetryOptions(policy.ToDurableTaskCoreRetryOptions())
.WithTags(tags)
.Build(),
parameters: input);
}
else if (options?.Retry?.Handler is AsyncRetryHandler handler)
{
return await this.InvokeWithCustomRetryHandler(
() => this.innerContext.ScheduleTask<T>(name.Name, name.Version, input),
() => this.innerContext.ScheduleTask<T>(
name.Name,
name.Version,
options: ScheduleTaskOptions.CreateBuilder()
.WithTags(tags)
.Build(),
parameters: input),
name.Name,
handler,
default);
}
else
{
return await this.innerContext.ScheduleTask<T>(name.Name, name.Version, input);
return await this.innerContext.ScheduleTask<T>(
name.Name,
name.Version,
options: ScheduleTaskOptions.CreateBuilder()
.WithTags(tags)
.Build(),
parameters: input);
}
}
catch (global::DurableTask.Core.Exceptions.TaskFailedException e)
Expand Down Expand Up @@ -492,7 +517,7 @@ string GetDefaultVersion()
}

// Secondary choice.
if (this.Properties.TryGetValue("defaultVersion", out var propVersion) && propVersion is string v2)
if (this.Properties.TryGetValue("defaultVersion", out object? propVersion) && propVersion is string v2)
{
return v2;
}
Expand Down
36 changes: 36 additions & 0 deletions test/Grpc.IntegrationTests/OrchestrationPatterns.cs
Original file line number Diff line number Diff line change
Expand Up @@ -944,6 +944,42 @@ public async Task SubOrchestrationTaskVersionOverridesDefaultVersion(string over
Assert.Equal($"Sub Orchestration version: {overrideVersion}", output);
}

[Fact]
public async Task RunActivityWithTags()
{
TaskName orchestratorName = nameof(RunActivityWithTags);
TaskName taggedActivityName = "TaggedActivity";

await using HostTestLifetime server = await this.StartWorkerAsync(b =>
{
b.AddTasks(tasks => tasks
.AddOrchestratorFunc<string, string>(
orchestratorName, (ctx, input) => ctx.CallActivityAsync<string>(taggedActivityName, input))
.AddActivityFunc<string, string>(taggedActivityName, (ctx, name) => $"Hello from tagged activity, {name}!"));
});

// Schedule orchestration with tags
StartOrchestrationOptions options = new()
{
Tags = new Dictionary<string, string>
{
{ "activityTag", "taggedExecution" },
{ "testType", "activityTagTest" }
}
};

string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(
orchestratorName, input: "World", options);

OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync(
instanceId, getInputsAndOutputs: true, this.TimeoutToken);

Assert.NotNull(metadata);
Assert.Equal(instanceId, metadata.InstanceId);
Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus);
Assert.Equal("Hello from tagged activity, World!", metadata.ReadOutputAs<string>());
}

// TODO: Test for multiple external events with the same name
// TODO: Test for ContinueAsNew with external events that carry over
// TODO: Test for catching activity exceptions of specific types
Expand Down
Loading