Skip to content

Commit 15de604

Browse files
committed
Update sub-orchestration to use defaultVersion
Signed-off-by: halspang <[email protected]>
1 parent c78122d commit 15de604

File tree

8 files changed

+199
-25
lines changed

8 files changed

+199
-25
lines changed

src/WebJobs.Extensions.DurableTask/Bindings/OrchestrationTriggerAttributeBindingProvider.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
using Newtonsoft.Json;
1414
using Newtonsoft.Json.Linq;
1515
using static Microsoft.Azure.WebJobs.Extensions.DurableTask.OutOfProcOrchestrationShim;
16+
using proto = Google.Protobuf.WellKnownTypes;
1617

1718
namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
1819
{
@@ -172,6 +173,12 @@ public Task<ITriggerData> BindAsync(object? value, ValueBindingContext context)
172173
EntityParameters = remoteContext.EntityParameters.ToProtobuf(),
173174
};
174175

176+
// We only do a null check as an empty string is a valid version.
177+
if (this.config.Options.DefaultVersion != null)
178+
{
179+
orchestratorRequest.Properties.Add("defaultVersion", proto.Value.ForString(this.config.Options.DefaultVersion));
180+
}
181+
175182
// We convert the binary payload into a base64 string because that seems to be the most commonly supported
176183
// format for Azure Functions language workers. Attempts to send unencoded byte[] payloads were unsuccessful.
177184
string encodedRequest = ProtobufUtils.Base64Encode(orchestratorRequest);

src/WebJobs.Extensions.DurableTask/Grpc/orchestrator_service.proto

Lines changed: 60 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import "google/protobuf/timestamp.proto";
1111
import "google/protobuf/duration.proto";
1212
import "google/protobuf/wrappers.proto";
1313
import "google/protobuf/empty.proto";
14+
import "google/protobuf/struct.proto";
1415

1516
message OrchestrationInstance {
1617
string instanceId = 1;
@@ -75,6 +76,7 @@ message ExecutionStartedEvent {
7576
google.protobuf.Timestamp scheduledStartTimestamp = 6;
7677
TraceContext parentTraceContext = 7;
7778
google.protobuf.StringValue orchestrationSpanID = 8;
79+
map<string, string> tags = 9;
7880
}
7981

8082
message ExecutionCompletedEvent {
@@ -93,6 +95,7 @@ message TaskScheduledEvent {
9395
google.protobuf.StringValue version = 2;
9496
google.protobuf.StringValue input = 3;
9597
TraceContext parentTraceContext = 4;
98+
map<string, string> tags = 5;
9699
}
97100

98101
message TaskCompletedEvent {
@@ -254,6 +257,7 @@ message ScheduleTaskAction {
254257
string name = 1;
255258
google.protobuf.StringValue version = 2;
256259
google.protobuf.StringValue input = 3;
260+
map<string, string> tags = 4;
257261
}
258262

259263
message CreateSubOrchestrationAction {
@@ -288,6 +292,15 @@ message TerminateOrchestrationAction {
288292
bool recurse = 3;
289293
}
290294

295+
message SendEntityMessageAction {
296+
oneof EntityMessageType {
297+
EntityOperationSignaledEvent entityOperationSignaled = 1;
298+
EntityOperationCalledEvent entityOperationCalled = 2;
299+
EntityLockRequestedEvent entityLockRequested = 3;
300+
EntityUnlockSentEvent entityUnlockSent = 4;
301+
}
302+
}
303+
291304
message OrchestratorAction {
292305
int32 id = 1;
293306
oneof orchestratorActionType {
@@ -297,6 +310,7 @@ message OrchestratorAction {
297310
SendEventAction sendEvent = 5;
298311
CompleteOrchestrationAction completeOrchestration = 6;
299312
TerminateOrchestrationAction terminateOrchestration = 7;
313+
SendEntityMessageAction sendEntityMessage = 8;
300314
}
301315
}
302316

@@ -307,6 +321,7 @@ message OrchestratorRequest {
307321
repeated HistoryEvent newEvents = 4;
308322
OrchestratorEntityParameters entityParameters = 5;
309323
bool requiresHistoryStreaming = 6;
324+
map<string, google.protobuf.Value> properties = 7;
310325
}
311326

312327
message OrchestratorResponse {
@@ -333,14 +348,8 @@ message CreateInstanceRequest {
333348
}
334349

335350
message OrchestrationIdReusePolicy {
336-
repeated OrchestrationStatus operationStatus = 1;
337-
CreateOrchestrationAction action = 2;
338-
}
339-
340-
enum CreateOrchestrationAction {
341-
ERROR = 0;
342-
IGNORE = 1;
343-
TERMINATE = 2;
351+
repeated OrchestrationStatus replaceableStatus = 1;
352+
reserved 2;
344353
}
345354

346355
message CreateInstanceResponse {
@@ -381,6 +390,7 @@ message OrchestrationState {
381390
google.protobuf.StringValue executionId = 12;
382391
google.protobuf.Timestamp completedTimestamp = 13;
383392
google.protobuf.StringValue parentInstanceId = 14;
393+
map<string, string> tags = 15;
384394
}
385395

386396
message RaiseEventRequest {
@@ -457,6 +467,7 @@ message PurgeInstanceFilter {
457467

458468
message PurgeInstancesResponse {
459469
int32 deletedInstanceCount = 1;
470+
google.protobuf.BoolValue isComplete = 2;
460471
}
461472

462473
message CreateTaskHubRequest {
@@ -551,6 +562,8 @@ message EntityBatchResult {
551562
repeated OperationAction actions = 2;
552563
google.protobuf.StringValue entityState = 3;
553564
TaskFailureDetails failureDetails = 4;
565+
string completionToken = 5;
566+
repeated OperationInfo operationInfos = 6; // used only with DTS
554567
}
555568

556569
message EntityRequest {
@@ -573,6 +586,11 @@ message OperationResult {
573586
}
574587
}
575588

589+
message OperationInfo {
590+
string requestId = 1;
591+
OrchestrationInstance responseDestination = 2; // null for signals
592+
}
593+
576594
message OperationResultSuccess {
577595
google.protobuf.StringValue result = 1;
578596
}
@@ -604,6 +622,30 @@ message StartNewOrchestrationAction {
604622
google.protobuf.Timestamp scheduledTime = 5;
605623
}
606624

625+
message AbandonActivityTaskRequest {
626+
string completionToken = 1;
627+
}
628+
629+
message AbandonActivityTaskResponse {
630+
// Empty.
631+
}
632+
633+
message AbandonOrchestrationTaskRequest {
634+
string completionToken = 1;
635+
}
636+
637+
message AbandonOrchestrationTaskResponse {
638+
// Empty.
639+
}
640+
641+
message AbandonEntityTaskRequest {
642+
string completionToken = 1;
643+
}
644+
645+
message AbandonEntityTaskResponse {
646+
// Empty.
647+
}
648+
607649
service TaskHubSidecarService {
608650
// Sends a hello request to the sidecar service.
609651
rpc Hello(google.protobuf.Empty) returns (google.protobuf.Empty);
@@ -665,6 +707,15 @@ service TaskHubSidecarService {
665707

666708
// clean entity storage
667709
rpc CleanEntityStorage(CleanEntityStorageRequest) returns (CleanEntityStorageResponse);
710+
711+
// Abandons a single work item
712+
rpc AbandonTaskActivityWorkItem(AbandonActivityTaskRequest) returns (AbandonActivityTaskResponse);
713+
714+
// Abandon an orchestration work item
715+
rpc AbandonTaskOrchestratorWorkItem(AbandonOrchestrationTaskRequest) returns (AbandonOrchestrationTaskResponse);
716+
717+
// Abandon an entity work item
718+
rpc AbandonTaskEntityWorkItem(AbandonEntityTaskRequest) returns (AbandonEntityTaskResponse);
668719
}
669720

670721
message GetWorkItemsRequest {
@@ -714,4 +765,4 @@ message StreamInstanceHistoryRequest {
714765

715766
message HistoryChunk {
716767
repeated HistoryEvent events = 1;
717-
}
768+
}
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
# The following files were downloaded from branch main at 2025-02-07 22:22:03 UTC
2-
https://raw.githubusercontent.com/microsoft/durabletask-protobuf/6000187e90d79fba297dfd75d42095abc1462eba/protos/orchestrator_service.proto
1+
# The following files were downloaded from branch main at 2025-05-29 22:35:16 UTC
2+
https://raw.githubusercontent.com/microsoft/durabletask-protobuf/d02c540473cf8fe6fc09bcf1fe2ac041b583eb9e/protos/orchestrator_service.proto

src/WebJobs.Extensions.DurableTask/LocalGrpcListener.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ public override Task<Empty> Hello(Empty request, ServerCallContext context)
225225
ExecutionStartedEvent executionStartedEvent = new ExecutionStartedEvent(-1, request.Input)
226226
{
227227
Name = request.Name,
228-
Version = !string.IsNullOrEmpty(request.Version) ? request.Version : this.extension.Options.DefaultVersion,
228+
Version = request.Version != null ? request.Version : this.extension.Options.DefaultVersion,
229229
OrchestrationInstance = instance,
230230
ScheduledStartTime = request.ScheduledStartTimestamp?.ToDateTime(),
231231
};

src/WebJobs.Extensions.DurableTask/Options/DurableTaskOptions.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public class DurableTaskOptions
3434
/// <summary>
3535
/// The strategy that will be used for matching versions when running an orchestration. See <see cref="VersioningSettings.VersionMatchStrategy"/> for more information.
3636
/// </summary>
37-
public VersioningSettings.VersionMatchStrategy VersionMatchStrategy { get; set; } = VersioningSettings.VersionMatchStrategy.None;
37+
public VersioningSettings.VersionMatchStrategy VersionMatchStrategy { get; set; } = VersioningSettings.VersionMatchStrategy.CurrentOrOlder;
3838

3939
/// <summary>
4040
/// The strategy that will be used if a versioning failure is detected. See <see cref="VersioningSettings.VersionFailureStrategy"/> for more information.

src/Worker.Extensions.DurableTask/FunctionsOrchestrationContext.cs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,31 @@ public override Task<TResult> CallSubOrchestratorAsync<TResult>(
8989
TaskName orchestratorName, object? input = null, TaskOptions? options = null)
9090
{
9191
this.EnsureLegalAccess();
92+
// If we have a default version, add it to the TaskOptions and call the inner context with it.
93+
if (this.innerContext.Properties.TryGetValue("defaultVersion", out var propVersion) && propVersion is string defaultVersion)
94+
{
95+
SubOrchestrationOptions subOptions;
96+
if (options is SubOrchestrationOptions subOrchestrationOptions)
97+
{
98+
subOptions = new SubOrchestrationOptions
99+
{
100+
InstanceId = subOrchestrationOptions.InstanceId,
101+
Retry = subOrchestrationOptions.Retry,
102+
Version = subOrchestrationOptions.Version?.Version ?? defaultVersion,
103+
};
104+
}
105+
else
106+
{
107+
subOptions = new SubOrchestrationOptions
108+
{
109+
InstanceId = null, // No instance ID specified, so we use the default one.
110+
Retry = options?.Retry,
111+
Version = defaultVersion
112+
};
113+
}
114+
return this.innerContext.CallSubOrchestratorAsync<TResult>(orchestratorName, input, subOptions);
115+
}
116+
92117
return this.innerContext.CallSubOrchestratorAsync<TResult>(orchestratorName, input, options);
93118
}
94119

test/e2e/Apps/BasicDotNetIsolated/VersionedOrchestration.cs

Lines changed: 68 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,40 @@ public static class VersionedOrchestration
1616
public static async Task<string> RunOrchestrator(
1717
[OrchestrationTrigger] TaskOrchestrationContext context)
1818
{
19-
ILogger logger = context.CreateReplaySafeLogger(nameof(HelloCities));
20-
logger.LogInformation($"Versioned orchestration! Version: {context.Version}");
19+
ILogger logger = context.CreateReplaySafeLogger(nameof(VersionedOrchestration));
20+
logger.LogInformation($"Versioned orchestration! Version: '{context.Version}'");
21+
22+
return await context.CallActivityAsync<string>(nameof(SayVersion), context.Version);
23+
}
24+
25+
[Function(nameof(RunWithSubOrchestrator))]
26+
public static async Task<string> RunWithSubOrchestrator(
27+
[OrchestrationTrigger] TaskOrchestrationContext context,
28+
string? subVersion)
29+
{
30+
ILogger logger = context.CreateReplaySafeLogger(nameof(VersionedOrchestration));
31+
logger.LogInformation($"Versioned orchestration! Version: '{context.Version}' Sub Version: '{subVersion}'");
32+
33+
string subOrchestrationResponse;
34+
if (subVersion == null)
35+
{
36+
subOrchestrationResponse = await context.CallSubOrchestratorAsync<string>(nameof(VersionedSubOrchestration));
37+
}
38+
else
39+
{
40+
subOrchestrationResponse = await context.CallSubOrchestratorAsync<string>(nameof(VersionedSubOrchestration), new SubOrchestrationOptions
41+
{
42+
Version = subVersion,
43+
});
44+
}
45+
return $"Parent Version: '{context.Version}' | Sub {subOrchestrationResponse}";
46+
}
47+
48+
[Function(nameof(VersionedSubOrchestration))]
49+
public static async Task<string> VersionedSubOrchestration([OrchestrationTrigger] TaskOrchestrationContext context)
50+
{
51+
ILogger logger = context.CreateReplaySafeLogger(nameof(VersionedOrchestration));
52+
logger.LogInformation($"Versioned sub-orchestration! Version: '{context.Version}'");
2153

2254
return await context.CallActivityAsync<string>(nameof(SayVersion), context.Version);
2355
}
@@ -27,23 +59,26 @@ public static string SayVersion([ActivityTrigger] string version, FunctionContex
2759
{
2860
ILogger logger = executionContext.GetLogger("SayVersion");
2961
logger.LogInformation("Activity running with version: {name}.", version);
30-
return $"Version: {version}";
62+
return $"Version: '{version}'";
3163
}
3264

3365
[Function("OrchestrationVersion_HttpStart")]
3466
public static async Task<HttpResponseData> HttpStart(
3567
[HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestData req,
3668
[DurableClient] DurableTaskClient client,
3769
FunctionContext executionContext,
38-
string version)
70+
string? version)
3971
{
4072
ILogger logger = executionContext.GetLogger("VersionedOrchestration_HttpStart");
4173

4274
// Function input comes from the request content.
4375
string instanceId;
44-
if (!string.IsNullOrEmpty(version))
76+
if (version != null)
4577
{
46-
instanceId = await client.ScheduleNewOrchestrationInstanceAsync(new TaskName(nameof(VersionedOrchestration), version));
78+
instanceId = await client.ScheduleNewOrchestrationInstanceAsync(nameof(VersionedOrchestration), new StartOrchestrationOptions
79+
{
80+
Version = version,
81+
});
4782
}
4883
else
4984
{
@@ -56,4 +91,31 @@ public static async Task<HttpResponseData> HttpStart(
5691
// See https://learn.microsoft.com/azure/azure-functions/durable/durable-functions-http-api#start-orchestration
5792
return await client.CreateCheckStatusResponseAsync(req, instanceId);
5893
}
94+
95+
[Function("OrchestrationSubVersion_HttpStart")]
96+
public static async Task<HttpResponseData> HttpSubStart(
97+
[HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestData req,
98+
[DurableClient] DurableTaskClient client,
99+
FunctionContext executionContext,
100+
string? version)
101+
{
102+
ILogger logger = executionContext.GetLogger("OrchestrationSubVersion_HttpStart");
103+
104+
// Function input comes from the request content.
105+
string instanceId;
106+
if (version != null)
107+
{
108+
instanceId = await client.ScheduleNewOrchestrationInstanceAsync(nameof(RunWithSubOrchestrator), input: version);
109+
}
110+
else
111+
{
112+
instanceId = await client.ScheduleNewOrchestrationInstanceAsync(nameof(RunWithSubOrchestrator));
113+
}
114+
115+
logger.LogInformation("Started orchestration with ID = '{instanceId}' and Version = '{version}'.", instanceId, version);
116+
117+
// Returns an HTTP 202 response with an instance management payload.
118+
// See https://learn.microsoft.com/azure/azure-functions/durable/durable-functions-http-api#start-orchestration
119+
return await client.CreateCheckStatusResponseAsync(req, instanceId);
120+
}
59121
}

0 commit comments

Comments
 (0)