Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
// limitations under the License.
// ----------------------------------------------------------------------------------
#nullable enable
using System.Collections.Generic;

namespace DurableTask.Core.Command
{
/// <summary>
Expand Down Expand Up @@ -41,5 +43,10 @@ public class ScheduleTaskOrchestratorAction : OrchestratorAction

// TODO: This property is not used and should be removed or made obsolete
internal string? Tasklist { get; set; }

/// <summary>
/// Gets or sets a dictionary of tags of string, string
/// </summary>
public IDictionary<string, string>? Tags { get; set; }
}
}
2 changes: 1 addition & 1 deletion src/DurableTask.Core/DurableTask.Core.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<PropertyGroup>
<MajorVersion>3</MajorVersion>
<MinorVersion>2</MinorVersion>
<PatchVersion>0</PatchVersion>
<PatchVersion>1</PatchVersion>
<VersionPrefix>$(MajorVersion).$(MinorVersion).$(PatchVersion)</VersionPrefix>
<FileVersion>$(VersionPrefix).0</FileVersion>
<!-- FileVersionRevision is expected to be set by the CI. This is useful for distinguishing between multiple builds of the same version. -->
Expand Down
7 changes: 7 additions & 0 deletions src/DurableTask.Core/History/TaskScheduledEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#nullable enable
namespace DurableTask.Core.History
{
using System.Collections.Generic;
using System.Runtime.Serialization;
using DurableTask.Core.Tracing;

Expand Down Expand Up @@ -84,5 +85,11 @@ public TaskScheduledEvent(int eventId)
/// </summary>
[DataMember]
public DistributedTraceContext? ParentTraceContext { get; set; }

/// <summary>
/// Gets or sets a dictionary of tags of string, string
/// </summary>
[DataMember]
public IDictionary<string, string>? Tags { get; set; }
}
}
13 changes: 13 additions & 0 deletions src/DurableTask.Core/OrchestrationContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,19 @@ public virtual Task<TResult> ScheduleTask<TResult>(Type activityType, params obj
NameVersionHelper.GetDefaultVersion(activityType), parameters);
}

/// <summary>
/// Schedule a TaskActivity by type, version, and tags.
/// </summary>
/// <typeparam name="TResult">Return Type of the TaskActivity.Execute method</typeparam>
/// <param name="name">Name of the orchestration as specified by the ObjectCreator</param>
/// <param name="version">Name of the orchestration as specified by the ObjectCreator</param>
/// <param name="parameters">Parameters for the TaskActivity.Execute method</param>
/// <param name="options">Options for scheduling a task</param>
public virtual Task<TResult> ScheduleTask<TResult>(string name, string version, ScheduleTaskOptions options, params object[] parameters)
{
throw new NotImplementedException();
}

/// <summary>
/// Schedule a TaskActivity by name and version.
/// </summary>
Expand Down
1 change: 1 addition & 0 deletions src/DurableTask.Core/OrchestrationRuntimeState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,7 @@ HistoryEvent GenerateAbridgedEvent(HistoryEvent evt)
Name = taskScheduledEvent.Name,
Version = taskScheduledEvent.Version,
Input = "[..snipped..]",
Tags = taskScheduledEvent.Tags,
};
}
else if (evt is TaskCompletedEvent taskCompletedEvent)
Expand Down
143 changes: 143 additions & 0 deletions src/DurableTask.Core/ScheduleTaskOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
// ----------------------------------------------------------------------------------
// Copyright Microsoft Corporation
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------------------------------------------------------------

#nullable enable

namespace DurableTask.Core
{
using System;
using System.Collections.Generic;

/// <summary>
/// Options for scheduling a task.
/// </summary>
public class ScheduleTaskOptions
{
/// <summary>
/// Initializes a new instance of the <see cref="ScheduleTaskOptions"/> class.
/// </summary>
protected ScheduleTaskOptions()
{
}

/// <summary>
/// Dictionary of key/value tags associated with this instance.
/// </summary>
public IDictionary<string, string>? Tags { get; internal set; }

/// <summary>
/// Gets or sets the retry options for the scheduled task.
/// </summary>
public RetryOptions? RetryOptions { get; internal set; }

/// <summary>
/// Creates a new builder for constructing a <see cref="ScheduleTaskOptions"/> instance.
/// </summary>
/// <returns>A new builder for creating schedule task options.</returns>
public static Builder CreateBuilder()
{
return new Builder();
}

/// <summary>
/// Builder class for creating instances of <see cref="ScheduleTaskOptions"/>.
/// </summary>
public class Builder
{
private readonly ScheduleTaskOptions options;

/// <summary>
/// Initializes a new instance of the <see cref="Builder"/> class.
/// </summary>
internal Builder()
{
this.options = new ScheduleTaskOptions();
}

/// <summary>
/// Sets the tags for the schedule task options.
/// </summary>
/// <param name="tags">The dictionary of key/value tags.</param>
/// <returns>The builder instance.</returns>
public Builder WithTags(IDictionary<string, string> tags)
{
this.options.Tags = tags;
return this;
}

/// <summary>
/// Adds a tag to the schedule task options.
/// </summary>
/// <param name="key">The tag key.</param>
/// <param name="value">The tag value.</param>
/// <returns>The builder instance.</returns>
public Builder AddTag(string key, string value)
{
if (this.options.Tags == null)
{
this.options.Tags = new Dictionary<string, string>();
}

this.options.Tags[key] = value;
return this;
}

/// <summary>
/// Sets the retry options for the scheduled task.
/// </summary>
/// <param name="retryOptions">The retry options to use.</param>
/// <returns>The builder instance.</returns>
public Builder WithRetryOptions(RetryOptions retryOptions)
{
this.options.RetryOptions = retryOptions;
return this;
}

/// <summary>
/// Sets the retry options for the scheduled task with the specified parameters.
/// </summary>
/// <param name="firstRetryInterval">Timespan to wait for the first retry.</param>
/// <param name="maxNumberOfAttempts">Max number of attempts to retry.</param>
/// <returns>The builder instance.</returns>
public Builder WithRetryOptions(TimeSpan firstRetryInterval, int maxNumberOfAttempts)
{
this.options.RetryOptions = new RetryOptions(firstRetryInterval, maxNumberOfAttempts);
return this;
}

/// <summary>
/// Sets the retry options for the scheduled task with the specified parameters and configures additional properties.
/// </summary>
/// <param name="firstRetryInterval">Timespan to wait for the first retry.</param>
/// <param name="maxNumberOfAttempts">Max number of attempts to retry.</param>
/// <param name="configureRetryOptions">Action to configure additional retry option properties.</param>
/// <returns>The builder instance.</returns>
public Builder WithRetryOptions(TimeSpan firstRetryInterval, int maxNumberOfAttempts, Action<RetryOptions> configureRetryOptions)
{
var retryOptions = new RetryOptions(firstRetryInterval, maxNumberOfAttempts);
configureRetryOptions?.Invoke(retryOptions);
this.options.RetryOptions = retryOptions;
return this;
}

/// <summary>
/// Builds the <see cref="ScheduleTaskOptions"/> instance.
/// </summary>
/// <returns>The built schedule task options.</returns>
public ScheduleTaskOptions Build()
{
return this.options;
}
}
}
}
35 changes: 32 additions & 3 deletions src/DurableTask.Core/TaskOrchestrationContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,25 @@ public override async Task<TResult> ScheduleTask<TResult>(string name, string ve
return result;
}

public override async Task<TResult> ScheduleTask<TResult>(string name, string version,
ScheduleTaskOptions options, params object[] parameters)
{
if (options.RetryOptions != null)
{
Task<TResult> RetryCall() => ScheduleTask<TResult>(name, version, ScheduleTaskOptions.CreateBuilder().WithTags(options.Tags).Build(), parameters);
var retryInterceptor = new RetryInterceptor<TResult>(this, options.RetryOptions, RetryCall);
return await retryInterceptor.Invoke();
}

TResult result = await ScheduleTaskToWorker<TResult>(name, version, null, options, parameters);

return result;
}

public async Task<TResult> ScheduleTaskToWorker<TResult>(string name, string version, string taskList,
params object[] parameters)
ScheduleTaskOptions options, params object[] parameters)
{
object result = await ScheduleTaskInternal(name, version, taskList, typeof(TResult), parameters);
object result = await ScheduleTaskInternal(name, version, taskList, typeof(TResult), options, parameters);

if (result == null)
{
Expand All @@ -99,8 +114,14 @@ public async Task<TResult> ScheduleTaskToWorker<TResult>(string name, string ver
return (TResult)result;
}

public async Task<object> ScheduleTaskInternal(string name, string version, string taskList, Type resultType,
public async Task<TResult> ScheduleTaskToWorker<TResult>(string name, string version, string taskList,
params object[] parameters)
{
return await ScheduleTaskToWorker<TResult>(name, version, taskList, ScheduleTaskOptions.CreateBuilder().Build(), parameters);
}

public async Task<object> ScheduleTaskInternal(string name, string version, string taskList, Type resultType,
ScheduleTaskOptions options, params object[] parameters)
{
int id = this.idCounter++;
string serializedInput = this.MessageDataConverter.SerializeInternal(parameters);
Expand All @@ -111,6 +132,7 @@ public async Task<object> ScheduleTaskInternal(string name, string version, stri
Version = version,
Tasklist = taskList,
Input = serializedInput,
Tags = options.Tags,
};

this.orchestratorActionsMap.Add(id, scheduleTaskTaskAction);
Expand All @@ -123,6 +145,13 @@ public async Task<object> ScheduleTaskInternal(string name, string version, stri
return this.MessageDataConverter.Deserialize(serializedResult, resultType);
}


public async Task<object> ScheduleTaskInternal(string name, string version, string taskList, Type resultType,
params object[] parameters)
{
return await ScheduleTaskInternal(name, version, taskList, resultType, ScheduleTaskOptions.CreateBuilder().Build(), parameters);
}

public override Task<T> CreateSubOrchestrationInstance<T>(
string name,
string version,
Expand Down
9 changes: 6 additions & 3 deletions src/DurableTask.Core/TaskOrchestrationDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1050,12 +1050,13 @@ TaskMessage ProcessScheduleTaskDecision(
}

var taskMessage = new TaskMessage();

var scheduledEvent = new TaskScheduledEvent(
eventId: scheduleTaskOrchestratorAction.Id,
name: scheduleTaskOrchestratorAction.Name,
version: scheduleTaskOrchestratorAction.Version,
input: scheduleTaskOrchestratorAction.Input);
input: scheduleTaskOrchestratorAction.Input) {
Tags = scheduleTaskOrchestratorAction.Tags
};

ActivitySpanId clientSpanId = ActivitySpanId.CreateRandom();

Expand All @@ -1074,7 +1075,9 @@ TaskMessage ProcessScheduleTaskDecision(
scheduledEvent = new TaskScheduledEvent(
eventId: scheduleTaskOrchestratorAction.Id,
name: scheduleTaskOrchestratorAction.Name,
version: scheduleTaskOrchestratorAction.Version);
version: scheduleTaskOrchestratorAction.Version) {
Tags = scheduleTaskOrchestratorAction.Tags
};

if (parentTraceActivity != null)
{
Expand Down
5 changes: 3 additions & 2 deletions test/DurableTask.AzureStorage.Tests/ScheduleTaskTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@

namespace DurableTask.AzureStorage.Tests
{
using System.Threading.Tasks;
using DurableTask.Core;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System.Collections.Generic;
using System.Threading.Tasks;

[TestClass]
public class ScheduleTaskTests
Expand All @@ -40,7 +41,7 @@ public async Task TaskReturnsVoid_OrchestratorFails()
TestInstance<int> instance = await host.StartInlineOrchestration(
input: 123,
orchestrationName: "TestOrchestration",
implementation: (ctx, input) => ctx.ScheduleTask<int>("Activity", "", input),
implementation: (ctx, input) => ctx.ScheduleTask<int>("Activity", "", new Dictionary<string, string>()),
activities: ("Activity", activity));

// The expectedOutput value is the string that's passed into the InvalidOperationException
Expand Down
Loading