Skip to content

Commit 4e5fc1a

Browse files
sophiatevSophia TevosyanCopilot
authored
Add support for dedupe statuses in isolated (#3316)
* first commit, will add e2e tests in the next one * added the e2e tests: * removing unnecessary changes * made hellocities more succinct * missed instance ID assignment * PR comment updates * fixing whitespace * removed linked cancellation token use * updated the exception handling in the e2e test to only catch OrchestrationAlreadyExistsException * removing question comments * removed unnecessary usings * fixing a comment * updating the terminating poll to end if the orchestration reaches any terminal status * updated HTTP request to include cancellation token * fixing the build bug * fixed CreateTaskOrchestration overrides to ultimately call the same method * PR comments: * updating the termination poll logic to use WaitForOrchestration * Adding an ArgumentException for invalid dedupe statuses (any running + terminated) * added support to terminate existing running instances for restart * Update src/WebJobs.Extensions.DurableTask/HttpApiHandler.cs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update test/e2e/Tests/Tests/DedupeStatusesTests.cs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update test/e2e/Tests/Tests/RestartOrchestrationTests.cs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * fixing the typoe * added log assertions to the tests, removed the huge RestartOrchestrationTests diff, addressed more copilot comments * Update test/e2e/Apps/BasicJava/src/main/java/com/function/HelloCities.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * fixing the build warnings and addressing PR comments * removing the version specified in one of the package references in VSSample.csproj since we use CPM * further fixing the nuget build errors * continuing the nuget attempts, addressing some copilot comments * fixed the failing test * updated .NET SDK dependencies * attempting to fix the e2e dts and mssql errors * Update test/e2e/Apps/BasicPowerShell/LargeOutputOrchestrator/run.ps1 Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * moving the placement of the log checks in the restart tests to try to reduce the flakiness * added conditional skip on the Suspended status for MSSQL since it doesnt support terminating suspended orchestrations * changing the logic in the restart test to wait for the restart to complete to remove flakiness * Update test/e2e/Apps/BasicNode/src/functions/LargeOutputOrchestrator.ts Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * more changes to try to fix the flakiness in the tests * missed a pending case in DedupeStatusesTests * fixing the ScheduledStartTime typo in the tests * updated the DTS and MSSQL dependencies which unblocked certain test cases * updated the mssql dependency to the working package --------- Co-authored-by: Sophia Tevosyan <stevosyan@microsoft.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent 182cb5e commit 4e5fc1a

29 files changed

+1050
-169
lines changed

Directory.Packages.props

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,9 @@
2727
<PackageVersion Include="Microsoft.CodeAnalysis.CSharp" Version="3.9.0" />
2828
<PackageVersion Include="Microsoft.CodeAnalysis.CSharp.Workspaces" Version="3.9.0" />
2929
<PackageVersion Include="Microsoft.CodeAnalysis.Workspaces.Common" Version="3.9.0" />
30-
<PackageVersion Include="Microsoft.DurableTask.Client.Grpc" Version="1.20.1" />
31-
<PackageVersion Include="Microsoft.DurableTask.Worker.Grpc" Version="1.20.1" />
32-
<PackageVersion Include="Microsoft.DurableTask.Abstractions" Version="1.20.1" />
30+
<PackageVersion Include="Microsoft.DurableTask.Client.Grpc" Version="1.22.0" />
31+
<PackageVersion Include="Microsoft.DurableTask.Worker.Grpc" Version="1.22.0" />
32+
<PackageVersion Include="Microsoft.DurableTask.Abstractions" Version="1.22.0" />
3333
<PackageVersion Include="Microsoft.DurableTask.Analyzers" Version="0.2.0" />
3434
<PackageVersion Include="Microsoft.Extensions.Azure" Version="1.7.0" />
3535
<PackageVersion Include="Microsoft.Extensions.Caching.Memory" Version="10.0.0" />
@@ -58,8 +58,8 @@
5858
<PackageVersion Include="Microsoft.Azure.DurableTask.Redis" Version="0.1.9-alpha" />
5959
<PackageVersion Include="Microsoft.Azure.Functions.Worker" Version="2.2.0" />
6060
<PackageVersion Include="Microsoft.Azure.Functions.Worker.ApplicationInsights" Version="2.0.0" />
61-
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Extensions.DurableTask.AzureManaged" Version="0.4.2-alpha" />
62-
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Extensions.DurableTask.SqlServer" Version="1.5.2" />
61+
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Extensions.DurableTask.AzureManaged" Version="1.4.0" />
62+
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Extensions.DurableTask.SqlServer" Version="1.5.4" />
6363
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Extensions.Http" Version="3.3.0" />
6464
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Extensions.Http.AspNetCore" Version="2.0.2" />
6565
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Sdk" Version="2.0.5"/>

src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableClient.cs

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1140,16 +1140,6 @@ async Task<string> IDurableOrchestrationClient.RestartAsync(string instanceId, b
11401140
// GetOrchestrationInstanceStateAsync will throw ArgumentException if the provided instanceid is not found.
11411141
OrchestrationState state = await this.GetOrchestrationInstanceStateAsync(instanceId);
11421142

1143-
bool isInstanceNotCompleted = state.OrchestrationStatus == OrchestrationStatus.Running ||
1144-
state.OrchestrationStatus == OrchestrationStatus.Pending ||
1145-
state.OrchestrationStatus == OrchestrationStatus.Suspended;
1146-
1147-
if (isInstanceNotCompleted && !restartWithNewInstanceId)
1148-
{
1149-
throw new InvalidOperationException($"Instance '{instanceId}' cannot be restarted while it is in state '{state.OrchestrationStatus}'. " +
1150-
"Wait until it has completed, or restart with a new instance ID.");
1151-
}
1152-
11531143
JToken input = ParseToJToken(state.Input);
11541144

11551145
string newInstanceId = null;

src/WebJobs.Extensions.DurableTask/DurabilityProvider.cs

Lines changed: 143 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
1-
// Copyright (c) .NET Foundation. All rights reserved.
1+
// Copyright (c) .NET Foundation. All rights reserved.
22
// Licensed under the MIT License. See LICENSE in the project root for license information.
33

44
using System;
55
using System.Collections.Generic;
6+
using System.Linq;
67
using System.Threading;
78
using System.Threading.Tasks;
89
using DurableTask.Core;
910
using DurableTask.Core.Entities;
11+
using DurableTask.Core.Exceptions;
1012
using DurableTask.Core.History;
1113
using DurableTask.Core.Query;
1214
using Microsoft.Azure.WebJobs.Host.Scale;
@@ -38,6 +40,7 @@ public class DurabilityProvider :
3840
private readonly IOrchestrationServiceClient innerServiceClient;
3941
private readonly IEntityOrchestrationService entityOrchestrationService;
4042
private readonly string connectionName;
43+
private readonly int orchestrationCreationRequestTimeoutInSeconds = 180;
4144

4245
/// <summary>
4346
/// Creates the default <see cref="DurabilityProvider"/>.
@@ -406,10 +409,63 @@ public Task CreateTaskOrchestrationAsync(TaskMessage creationMessage)
406409
return this.GetOrchestrationServiceClient().CreateTaskOrchestrationAsync(creationMessage);
407410
}
408411

409-
/// <inheritdoc />
410-
public Task CreateTaskOrchestrationAsync(TaskMessage creationMessage, OrchestrationStatus[] dedupeStatuses)
411-
{
412-
return this.GetOrchestrationServiceClient().CreateTaskOrchestrationAsync(creationMessage, dedupeStatuses);
412+
/// <summary>
413+
/// Creates a new task orchestration instance using the specified creation message and dedupe statuses.
414+
/// </summary>
415+
/// <param name="creationMessage">The creation message for the orchestration.</param>
416+
/// <param name="dedupeStatuses">An array of orchestration statuses used for "dedupping":
417+
/// If an orchestration with the same instance ID already exists, and its status is in this array, then a
418+
/// <see cref="OrchestrationAlreadyExistsException"/> will be thrown.
419+
/// If the array contains all of the running statuses (<see cref="OrchestrationStatus.Pending"/>, <see cref="OrchestrationStatus.Running"/>,
420+
/// and <see cref="OrchestrationStatus.Suspended"/>), then only terminal statuses can be reused.
421+
/// If at least one of these statuses is not included in the array, then if an instance with that status is found, it will first be terminated
422+
/// before a new orchestration is created. If the existing instance does not reach a terminal state within 3 minutes, the operation will be cancelled.
423+
/// </param>
424+
/// <returns>A task that completes when the creation message for the task orchestration instance is enqueued.</returns>
425+
/// <exception cref="OrchestrationAlreadyExistsException">Thrown if an orchestration with the same instance ID already exists and its status
426+
/// is in <paramref name="dedupeStatuses"/>.</exception>
427+
/// <exception cref="OperationCanceledException">Thrown if an existing running instance does not reach a terminal state within 3 minutes.</exception>
428+
/// <exception cref="ArgumentException">
429+
/// Thrown if <paramref name="dedupeStatuses"/> contains <see cref="OrchestrationStatus.Terminated"/> but also allows at least one running status
430+
/// to be reusable. In this case, an existing orchestration with that running status would be terminated, but the creation of the new orchestration
431+
/// would immediately fail due to the existing orchestration now having status <see cref="OrchestrationStatus.Terminated"/>.
432+
/// </exception>
433+
public async Task CreateTaskOrchestrationAsync(TaskMessage creationMessage, OrchestrationStatus[] dedupeStatuses)
434+
{
435+
using var timeoutCts = new CancellationTokenSource(TimeSpan.FromSeconds(this.orchestrationCreationRequestTimeoutInSeconds));
436+
await this.CreateTaskOrchestrationAsync(creationMessage, dedupeStatuses, timeoutCts.Token);
437+
}
438+
439+
/// <summary>
440+
/// Creates a new task orchestration instance using the specified creation message and dedupe statuses.
441+
/// </summary>
442+
/// <param name="creationMessage">The creation message for the orchestration.</param>
443+
/// <param name="dedupeStatuses">An array of orchestration statuses used for "dedupping":
444+
/// If an orchestration with the same instance ID already exists, and its status is in this array, then a
445+
/// <see cref="OrchestrationAlreadyExistsException"/> will be thrown.
446+
/// If the array contains all of the running statuses (<see cref="OrchestrationStatus.Pending"/>, <see cref="OrchestrationStatus.Running"/>,
447+
/// and <see cref="OrchestrationStatus.Suspended"/>), then only terminal statuses can be reused.
448+
/// If at least one of these statuses is not included in the array, then if an instance with that status is found, it will first be terminated
449+
/// before a new orchestration is created. This method will wait for the instance to reach a terminal status for a maximum of one hour or
450+
/// until the <paramref name="cancellationToken"/> is invoked, whichever occurs first.</param>
451+
/// <param name="cancellationToken">The cancellation token used to cancel waiting for an existing instance to terminate in the case that a
452+
/// non-terminal instance is found whose runtime status is not included in <paramref name="dedupeStatuses"/>.</param>
453+
/// <returns>A task that completes when the creation message for the task orchestration instance is enqueued.</returns>
454+
/// <exception cref="OrchestrationAlreadyExistsException">Thrown if an orchestration with the same instance ID already exists and its status
455+
/// is in <paramref name="dedupeStatuses"/>.</exception>
456+
/// <exception cref="OperationCanceledException">Thrown if the operation is cancelled via <paramref name="cancellationToken"/>.</exception>
457+
/// <exception cref="ArgumentException">
458+
/// Thrown if <paramref name="dedupeStatuses"/> contains <see cref="OrchestrationStatus.Terminated"/> but also allows at least one running status
459+
/// to be reusable. In this case, an existing orchestration with that running status would be terminated, but the creation of the new orchestration
460+
/// would immediately fail due to the existing orchestration now having status <see cref="OrchestrationStatus.Terminated"/>.
461+
/// </exception>
462+
public async virtual Task CreateTaskOrchestrationAsync(TaskMessage creationMessage, OrchestrationStatus[] dedupeStatuses, CancellationToken cancellationToken)
463+
{
464+
await this.TerminateTaskOrchestrationWithReusableRunningStatusAndWaitAsync(
465+
creationMessage.OrchestrationInstance.InstanceId,
466+
dedupeStatuses,
467+
cancellationToken);
468+
await this.GetOrchestrationServiceClient().CreateTaskOrchestrationAsync(creationMessage, dedupeStatuses);
413469
}
414470

415471
/// <inheritdoc />
@@ -614,10 +670,90 @@ public virtual Task<IAsyncEnumerable<HistoryEvent>> StreamOrchestrationHistoryAs
614670
/// <summary>
615671
/// Attempts to modify the durability service's UseSeparateQueueForEntityWorkItems property.
616672
/// </summary>
617-
/// <param name="newValue">The value to set</param>
673+
/// <param name="newValue">The value to set.</param>
618674
public virtual void SetUseSeparateQueueForEntityWorkItems(bool newValue)
619675
{
620676
throw this.GetNotImplementedException(nameof(this.SetUseSeparateQueueForEntityWorkItems));
621677
}
678+
679+
/// <summary>
680+
/// If an orchestration exists with a status that is not in <paramref name="dedupeStatuses"/> and has a running status (one of
681+
/// <see cref="OrchestrationStatus.Pending"/>, <see cref="OrchestrationStatus.Running"/>, or <see cref="OrchestrationStatus.Suspended"/>),
682+
/// then this method terminates the specified orchestration instance and waits until:
683+
/// - The orchestration's status changes to <see cref="OrchestrationStatus.Terminated"/>,
684+
/// - or the orchestration is deleted,
685+
/// - or the operation is cancelled via the <paramref name="cancellationToken"/>.
686+
/// </summary>
687+
/// <param name="instanceId">The instance ID of the orchestration.</param>
688+
/// <param name="dedupeStatuses">The dedupe statuses of the orchestration.</param>
689+
/// <param name="cancellationToken">The cancellation token.</param>
690+
/// <returns>A task that completes when any of the above conditions are reached.</returns>
691+
/// <exception cref="OperationCanceledException">Thrown if the operation is cancelled via the <paramref name="cancellationToken"/>.</exception>
692+
/// <exception cref="OrchestrationAlreadyExistsException">Thrown if an orchestration already exists with status in <paramref name="dedupeStatuses"/>.</exception>
693+
/// <exception cref="ArgumentException">Thrown if <paramref name="dedupeStatuses"/> contains <see cref="OrchestrationStatus.Terminated"/> but allows
694+
/// at least one running status to be reusable.</exception>
695+
private async Task TerminateTaskOrchestrationWithReusableRunningStatusAndWaitAsync(
696+
string instanceId,
697+
OrchestrationStatus[] dedupeStatuses,
698+
CancellationToken cancellationToken)
699+
{
700+
var runningStatuses = new List<OrchestrationStatus>()
701+
{
702+
OrchestrationStatus.Running,
703+
OrchestrationStatus.Pending,
704+
OrchestrationStatus.Suspended,
705+
};
706+
707+
if (dedupeStatuses != null && runningStatuses.Any(
708+
status => !dedupeStatuses.Contains(status)) && dedupeStatuses.Contains(OrchestrationStatus.Terminated))
709+
{
710+
throw new ArgumentException(
711+
"Invalid dedupe statuses: cannot include 'Terminated' while also allowing reuse of running instances, " +
712+
"because the running instance would be terminated and then immediately conflict with the dedupe check.");
713+
}
714+
715+
bool IsRunning(OrchestrationStatus status) =>
716+
runningStatuses.Contains(status);
717+
718+
// At least one running status is reusable, so determine if an orchestration already exists with this status and terminate it if so
719+
if (dedupeStatuses == null || runningStatuses.Any(status => !dedupeStatuses.Contains(status)))
720+
{
721+
OrchestrationState orchestrationState = await this.GetOrchestrationStateAsync(instanceId, executionId: null);
722+
723+
if (orchestrationState != null)
724+
{
725+
if (dedupeStatuses?.Contains(orchestrationState.OrchestrationStatus) == true)
726+
{
727+
throw new OrchestrationAlreadyExistsException($"An orchestration with instance ID '{instanceId}' and status " +
728+
$"'{orchestrationState.OrchestrationStatus}' already exists");
729+
}
730+
731+
if (IsRunning(orchestrationState.OrchestrationStatus))
732+
{
733+
// Check for cancellation before attempting to terminate the orchestration
734+
cancellationToken.ThrowIfCancellationRequested();
735+
736+
string dedupeStatusesDescription = dedupeStatuses == null
737+
? "null (all statuses reusable)"
738+
: dedupeStatuses.Length == 0
739+
? "[] (all statuses reusable)"
740+
: $"[{string.Join(", ", dedupeStatuses)}]";
741+
742+
string terminationReason = $"A new instance creation request has been issued for instance {instanceId} which " +
743+
$"currently has status {orchestrationState.OrchestrationStatus}. Since the dedupe statuses of the creation request, " +
744+
$"{dedupeStatusesDescription}, do not contain the orchestration's status, the orchestration has been terminated " +
745+
$"and a new instance with the same instance ID will be created.";
746+
747+
await this.ForceTerminateTaskOrchestrationAsync(instanceId, terminationReason);
748+
749+
await this.WaitForOrchestrationAsync(
750+
instanceId,
751+
orchestrationState.OrchestrationInstance.ExecutionId,
752+
TimeSpan.FromHours(1),
753+
cancellationToken);
754+
}
755+
}
756+
}
757+
}
622758
}
623-
}
759+
}

src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1480,7 +1480,7 @@ Task<HttpResponseMessage> IAsyncConverter<HttpRequestMessage, HttpResponseMessag
14801480
HttpRequestMessage request,
14811481
CancellationToken cancellationToken)
14821482
{
1483-
return this.HttpApiHandler.HandleRequestAsync(request);
1483+
return this.HttpApiHandler.HandleRequestAsync(request, cancellationToken);
14841484
}
14851485

14861486
internal static string ValidatePayloadSize(string payload)

src/WebJobs.Extensions.DurableTask/HttpApiHandler.cs

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
using System.Threading;
1313
using System.Threading.Tasks;
1414
using DurableTask.Core;
15+
using DurableTask.Core.Exceptions;
1516
using DurableTask.Core.History;
1617
using Microsoft.AspNetCore.Routing;
1718
using Microsoft.AspNetCore.Routing.Template;
@@ -285,7 +286,7 @@ internal async Task<HttpResponseMessage> WaitForCompletionOrCreateCheckStatusRes
285286
}
286287
}
287288

288-
public async Task<HttpResponseMessage> HandleRequestAsync(HttpRequestMessage request)
289+
public async Task<HttpResponseMessage> HandleRequestAsync(HttpRequestMessage request, CancellationToken cancellationToken)
289290
{
290291
try
291292
{
@@ -309,7 +310,7 @@ public async Task<HttpResponseMessage> HandleRequestAsync(HttpRequestMessage req
309310
string instanceId = (string)routeValues[InstanceIdRouteParameter];
310311
if (request.Method == HttpMethod.Post)
311312
{
312-
return await this.HandleStartOrchestratorRequestAsync(request, functionName, instanceId);
313+
return await this.HandleStartOrchestratorRequestAsync(request, functionName, instanceId, cancellationToken);
313314
}
314315
else
315316
{
@@ -906,7 +907,8 @@ private async Task<HttpResponseMessage> HandleResumeInstanceRequestAsync(
906907
private async Task<HttpResponseMessage> HandleStartOrchestratorRequestAsync(
907908
HttpRequestMessage request,
908909
string functionName,
909-
string instanceId)
910+
string instanceId,
911+
CancellationToken cancellationToken)
910912
{
911913
try
912914
{
@@ -965,7 +967,8 @@ await durableClient.DurabilityProvider.CreateTaskOrchestrationAsync(
965967
Event = executionStartedEvent,
966968
OrchestrationInstance = instance,
967969
},
968-
this.config.Options.OverridableExistingInstanceStates.ToDedupeStatuses());
970+
this.config.Options.OverridableExistingInstanceStates.ToDedupeStatuses(),
971+
cancellationToken);
969972
}
970973
else
971974
{
@@ -994,6 +997,14 @@ await durableClient.DurabilityProvider.CreateTaskOrchestrationAsync(
994997
{
995998
return request.CreateErrorResponse(HttpStatusCode.BadRequest, "Invalid JSON content", e);
996999
}
1000+
catch (OrchestrationAlreadyExistsException e)
1001+
{
1002+
return request.CreateErrorResponse(HttpStatusCode.Conflict, e.Message);
1003+
}
1004+
catch (ArgumentException e)
1005+
{
1006+
return request.CreateErrorResponse(HttpStatusCode.BadRequest, e.Message);
1007+
}
9971008
}
9981009

9991010
private static string GetHeaderValueFromHeaders(string header, HttpRequestHeaders headers)
@@ -1068,6 +1079,10 @@ private async Task<HttpResponseMessage> HandleRestartInstanceRequestAsync(
10681079
{
10691080
return request.CreateErrorResponse(HttpStatusCode.BadRequest, "InstanceId does not match a valid orchestration instance.", e);
10701081
}
1082+
catch (OrchestrationAlreadyExistsException e)
1083+
{
1084+
return request.CreateErrorResponse(HttpStatusCode.Conflict, "A non-terminal instance with this instance ID already exists.", e);
1085+
}
10711086
catch (JsonReaderException e)
10721087
{
10731088
return request.CreateErrorResponse(HttpStatusCode.BadRequest, "Invalid JSON content", e);

0 commit comments

Comments
 (0)