Skip to content

Commit a784c1f

Browse files
CopilotYunchuWang
andcommitted
Resolve merge conflict with main branch
- Rebased on latest main (f3a7fca) - Added dedupe status tests from main alongside suspend/resume tests - All 15 tests now present (5 dedupe + 5 suspend/resume + 5 original) - Resolved import conflicts (added Grpc.Core using statements) - All builds passing Co-authored-by: YunchuWang <[email protected]>
1 parent 5d1414b commit a784c1f

File tree

16 files changed

+1328
-47
lines changed

16 files changed

+1328
-47
lines changed

src/Abstractions/TaskOptions.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,4 +134,12 @@ public record StartOrchestrationOptions(string? InstanceId = null, DateTimeOffse
134134
/// Gets the version to associate with the orchestration instance.
135135
/// </summary>
136136
public TaskVersion? Version { get; init; }
137+
138+
/// <summary>
139+
/// Gets the orchestration runtime statuses that should be considered for deduplication.
140+
/// </summary>
141+
/// <remarks>
142+
/// For type-safe usage, use the WithDedupeStatuses extension method.
143+
/// </remarks>
144+
public IReadOnlyList<string>? DedupeStatuses { get; init; }
137145
}

src/Analyzers/AnalyzerReleases.Shipped.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,15 @@
11
; Shipped analyzer releases
22
; https://github.com/dotnet/roslyn/blob/main/src/RoslynAnalyzers/Microsoft.CodeAnalysis.Analyzers/ReleaseTrackingAnalyzers.Help.md
33

4+
## Release 0.2.0
5+
6+
### New Rules
7+
8+
Rule ID | Category | Severity | Notes
9+
--------|----------|----------|-------
10+
DURABLE2003 | Activity | Warning | **FunctionNotFoundAnalyzer**: Warns when an activity function call references a name that does not match any defined activity in the compilation.
11+
DURABLE2004 | Orchestration | Warning | **FunctionNotFoundAnalyzer**: Warns when a sub-orchestration call references a name that does not match any defined orchestrator in the compilation.
12+
413
## Release 0.1.0
514

615
### New Rules

src/Analyzers/AnalyzerReleases.Unshipped.md

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,4 @@
44
### New Rules
55

66
Rule ID | Category | Severity | Notes
7-
--------|----------|----------|-------
8-
DURABLE2003 | Activity | Warning | **FunctionNotFoundAnalyzer**: Warns when an activity function call references a name that does not match any defined activity in the compilation.
9-
DURABLE2004 | Orchestration | Warning | **FunctionNotFoundAnalyzer**: Warns when a sub-orchestration call references a name that does not match any defined orchestrator in the compilation.
7+
--------|----------|----------|-------

src/Analyzers/Analyzers.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
</PropertyGroup>
1212

1313
<PropertyGroup>
14-
<VersionPrefix>0.1.0</VersionPrefix>
14+
<VersionPrefix>0.2.0</VersionPrefix>
1515
<VersionSuffix></VersionSuffix>
1616
<PackageDescription>.NET Analyzers for the Durable Task SDK.</PackageDescription>
1717
<NeutralLanguage>en</NeutralLanguage>
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
using System.Linq;
5+
6+
namespace Microsoft.DurableTask.Client;
7+
8+
/// <summary>
9+
/// Extension methods for <see cref="StartOrchestrationOptions"/> to provide type-safe deduplication status configuration.
10+
/// </summary>
11+
public static class StartOrchestrationOptionsExtensions
12+
{
13+
public static readonly OrchestrationRuntimeStatus[] ValidDedupeStatuses = new[]
14+
{
15+
OrchestrationRuntimeStatus.Completed,
16+
OrchestrationRuntimeStatus.Failed,
17+
OrchestrationRuntimeStatus.Terminated,
18+
OrchestrationRuntimeStatus.Canceled,
19+
};
20+
21+
/// <summary>
22+
/// Creates a new <see cref="StartOrchestrationOptions"/> with the specified deduplication statuses.
23+
/// </summary>
24+
/// <param name="options">The base options to extend.</param>
25+
/// <param name="dedupeStatuses">The orchestration runtime statuses that should be considered for deduplication.</param>
26+
/// <returns>A new <see cref="StartOrchestrationOptions"/> instance with the deduplication statuses set.</returns>
27+
public static StartOrchestrationOptions WithDedupeStatuses(
28+
this StartOrchestrationOptions options,
29+
params OrchestrationRuntimeStatus[] dedupeStatuses)
30+
{
31+
return options with
32+
{
33+
DedupeStatuses = dedupeStatuses.Select(s => s.ToString()).ToList(),
34+
};
35+
}
36+
}

src/Client/Grpc/GrpcDurableTaskClient.cs

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Copyright (c) Microsoft Corporation.
22
// Licensed under the MIT License.
33

4+
using System.Collections.Immutable;
45
using System.Diagnostics;
56
using System.Text;
67
using DurableTask.Core.History;
@@ -91,7 +92,7 @@ public override async Task<string> ScheduleNewOrchestrationInstanceAsync(
9192
version = this.options.DefaultVersion;
9293
}
9394

94-
var request = new P.CreateInstanceRequest
95+
P.CreateInstanceRequest request = new()
9596
{
9697
Name = orchestratorName.Name,
9798
Version = version,
@@ -122,6 +123,34 @@ public override async Task<string> ScheduleNewOrchestrationInstanceAsync(
122123
request.ScheduledStartTimestamp = Timestamp.FromDateTimeOffset(startAt.Value.ToUniversalTime());
123124
}
124125

126+
// Set orchestration ID reuse policy for deduplication support
127+
// Note: This requires the protobuf to support OrchestrationIdReusePolicy field
128+
// If the protobuf doesn't support it yet, this will need to be updated when the protobuf is updated
129+
if (options?.DedupeStatuses != null && options.DedupeStatuses.Count > 0)
130+
{
131+
// Parse and validate all status strings to enum first
132+
ImmutableHashSet<OrchestrationRuntimeStatus> dedupeStatuses = options.DedupeStatuses
133+
.Select(s =>
134+
{
135+
if (!System.Enum.TryParse<OrchestrationRuntimeStatus>(s, ignoreCase: true, out OrchestrationRuntimeStatus status))
136+
{
137+
throw new ArgumentException(
138+
$"Invalid orchestration runtime status: '{s}' for deduplication.");
139+
}
140+
141+
return status;
142+
}).ToImmutableHashSet();
143+
144+
// Convert dedupe statuses to protobuf statuses and create reuse policy
145+
IEnumerable<P.OrchestrationStatus> dedupeStatusesProto = dedupeStatuses.Select(s => s.ToGrpcStatus());
146+
P.OrchestrationIdReusePolicy? policy = ProtoUtils.ConvertDedupeStatusesToReusePolicy(dedupeStatusesProto);
147+
148+
if (policy != null)
149+
{
150+
request.OrchestrationIdReusePolicy = policy;
151+
}
152+
}
153+
125154
using Activity? newActivity = TraceHelper.StartActivityForNewOrchestration(request);
126155

127156
P.CreateInstanceResponse? result = await this.sidecarClient.StartInstanceAsync(
@@ -405,7 +434,7 @@ public override async Task<string> RestartAsync(
405434
Check.NotNullOrEmpty(instanceId);
406435
Check.NotEntity(this.options.EnableEntitySupport, instanceId);
407436

408-
var request = new P.RestartInstanceRequest
437+
P.RestartInstanceRequest request = new P.RestartInstanceRequest
409438
{
410439
InstanceId = instanceId,
411440
RestartWithNewInstanceId = restartWithNewInstanceId,
@@ -441,7 +470,7 @@ public override async Task RewindInstanceAsync(
441470
Check.NotNullOrEmpty(instanceId);
442471
Check.NotEntity(this.options.EnableEntitySupport, instanceId);
443472

444-
var request = new P.RewindInstanceRequest
473+
P.RewindInstanceRequest request = new P.RewindInstanceRequest
445474
{
446475
InstanceId = instanceId,
447476
Reason = reason,
@@ -573,7 +602,7 @@ async Task<PurgeResult> PurgeInstancesCoreAsync(
573602

574603
OrchestrationMetadata CreateMetadata(P.OrchestrationState state, bool includeInputsAndOutputs)
575604
{
576-
var metadata = new OrchestrationMetadata(state.Name, state.InstanceId)
605+
OrchestrationMetadata metadata = new OrchestrationMetadata(state.Name, state.InstanceId)
577606
{
578607
CreatedAt = state.CreatedTimestamp.ToDateTimeOffset(),
579608
LastUpdatedAt = state.LastUpdatedTimestamp.ToDateTimeOffset(),

src/Client/Grpc/ProtoUtils.cs

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,93 @@
11
// Copyright (c) Microsoft Corporation.
22
// Licensed under the MIT License.
33

4+
using System.Collections.Immutable;
5+
using System.Linq;
46
using P = Microsoft.DurableTask.Protobuf;
57

68
namespace Microsoft.DurableTask.Client.Grpc;
79

810
/// <summary>
911
/// Protobuf helpers and utilities.
1012
/// </summary>
11-
static class ProtoUtils
13+
public static class ProtoUtils
1214
{
15+
/// <summary>
16+
/// Gets the terminal orchestration statuses that are commonly used for deduplication.
17+
/// These are the statuses that can be used in OrchestrationIdReusePolicy.
18+
/// </summary>
19+
/// <returns>An immutable array of terminal orchestration statuses.</returns>
20+
public static ImmutableArray<P.OrchestrationStatus> GetTerminalStatuses()
21+
{
22+
#pragma warning disable CS0618 // Type or member is obsolete - Canceled is intentionally included for compatibility
23+
return ImmutableArray.Create(
24+
P.OrchestrationStatus.Completed,
25+
P.OrchestrationStatus.Failed,
26+
P.OrchestrationStatus.Terminated,
27+
P.OrchestrationStatus.Canceled);
28+
#pragma warning restore CS0618
29+
}
30+
31+
/// <summary>
32+
/// Converts dedupe statuses (statuses that should NOT be replaced) to an OrchestrationIdReusePolicy
33+
/// with replaceable statuses (statuses that CAN be replaced).
34+
/// </summary>
35+
/// <param name="dedupeStatuses">The orchestration statuses that should NOT be replaced. These are statuses for which an exception should be thrown if an orchestration already exists.</param>
36+
/// <returns>An OrchestrationIdReusePolicy with replaceable statuses set, or null if all terminal statuses are dedupe statuses.</returns>
37+
/// <remarks>
38+
/// The policy uses "replaceableStatus" - these are statuses that CAN be replaced.
39+
/// dedupeStatuses are statuses that should NOT be replaced.
40+
/// So replaceableStatus = all terminal statuses MINUS dedupeStatuses.
41+
/// </remarks>
42+
public static P.OrchestrationIdReusePolicy? ConvertDedupeStatusesToReusePolicy(
43+
IEnumerable<P.OrchestrationStatus>? dedupeStatuses)
44+
{
45+
ImmutableArray<P.OrchestrationStatus> terminalStatuses = GetTerminalStatuses();
46+
ImmutableHashSet<P.OrchestrationStatus> dedupeStatusSet = dedupeStatuses?.ToImmutableHashSet() ?? ImmutableHashSet<P.OrchestrationStatus>.Empty;
47+
48+
P.OrchestrationIdReusePolicy policy = new();
49+
50+
// Add terminal statuses that are NOT in dedupeStatuses as replaceable
51+
foreach (P.OrchestrationStatus terminalStatus in terminalStatuses.Where(status => !dedupeStatusSet.Contains(status)))
52+
{
53+
policy.ReplaceableStatus.Add(terminalStatus);
54+
}
55+
56+
// Only return policy if we have replaceable statuses
57+
return policy.ReplaceableStatus.Count > 0 ? policy : null;
58+
}
59+
60+
/// <summary>
61+
/// Converts an OrchestrationIdReusePolicy with replaceable statuses to dedupe statuses
62+
/// (statuses that should NOT be replaced).
63+
/// </summary>
64+
/// <param name="policy">The OrchestrationIdReusePolicy containing replaceable statuses.</param>
65+
/// <returns>An array of orchestration statuses that should NOT be replaced, or null if all terminal statuses are replaceable.</returns>
66+
/// <remarks>
67+
/// The policy uses "replaceableStatus" - these are statuses that CAN be replaced.
68+
/// dedupeStatuses are statuses that should NOT be replaced (should throw exception).
69+
/// So dedupeStatuses = all terminal statuses MINUS replaceableStatus.
70+
/// </remarks>
71+
public static P.OrchestrationStatus[]? ConvertReusePolicyToDedupeStatuses(
72+
P.OrchestrationIdReusePolicy? policy)
73+
{
74+
if (policy == null || policy.ReplaceableStatus.Count == 0)
75+
{
76+
return null;
77+
}
78+
79+
ImmutableArray<P.OrchestrationStatus> terminalStatuses = GetTerminalStatuses();
80+
ImmutableHashSet<P.OrchestrationStatus> replaceableStatusSet = policy.ReplaceableStatus.ToImmutableHashSet();
81+
82+
// Calculate dedupe statuses = terminal statuses - replaceable statuses
83+
P.OrchestrationStatus[] dedupeStatuses = terminalStatuses
84+
.Where(terminalStatus => !replaceableStatusSet.Contains(terminalStatus))
85+
.ToArray();
86+
87+
// Only return if there are dedupe statuses
88+
return dedupeStatuses.Length > 0 ? dedupeStatuses : null;
89+
}
90+
1391
#pragma warning disable 0618 // Referencing Obsolete member. This is intention as we are only converting it.
1492
/// <summary>
1593
/// Converts <see cref="OrchestrationRuntimeStatus" /> to <see cref="P.OrchestrationStatus" />.

src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
using DurableTask.Core;
88
using DurableTask.Core.History;
99
using DurableTask.Core.Query;
10+
using Microsoft.DurableTask.Client;
1011
using Microsoft.DurableTask.Client.Entities;
1112
using Microsoft.Extensions.DependencyInjection;
1213
using Microsoft.Extensions.Options;
@@ -192,7 +193,23 @@ public override async Task<string> ScheduleNewOrchestrationInstanceAsync(
192193
},
193194
};
194195

195-
await this.Client.CreateTaskOrchestrationAsync(message);
196+
Core.OrchestrationStatus[]? dedupeStatuses = null;
197+
if (options?.DedupeStatuses != null && options.DedupeStatuses.Count > 0)
198+
{
199+
dedupeStatuses = options.DedupeStatuses
200+
.Select(s =>
201+
{
202+
if (!Enum.TryParse<OrchestrationRuntimeStatus>(s, ignoreCase: true, out var status))
203+
{
204+
throw new ArgumentException(
205+
$"Invalid orchestration runtime status: '{s}' for deduplication.");
206+
}
207+
return status.ConvertToCore();
208+
})
209+
.ToArray();
210+
}
211+
212+
await this.Client.CreateTaskOrchestrationAsync(message, dedupeStatuses);
196213
return instanceId;
197214
}
198215

@@ -303,7 +320,7 @@ public override async Task<string> RestartAsync(
303320
},
304321
};
305322

306-
await this.Client.CreateTaskOrchestrationAsync(message);
323+
await this.Client.CreateTaskOrchestrationAsync(message, dedupeStatuses: null);
307324
return newInstanceId;
308325
}
309326

src/Extensions/AzureBlobPayloads/PayloadStore/BlobPayloadStore.cs

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22
// Licensed under the MIT License.
33

44
using System.IO.Compression;
5+
using System.Net;
56
using System.Text;
7+
using Azure;
68
using Azure.Core;
79
using Azure.Storage.Blobs;
810
using Azure.Storage.Blobs.Models;
@@ -117,20 +119,30 @@ public override async Task<string> DownloadAsync(string token, CancellationToken
117119

118120
BlobClient blob = this.containerClient.GetBlobClient(name);
119121

120-
using BlobDownloadStreamingResult result = await blob.DownloadStreamingAsync(cancellationToken: cancellationToken);
121-
Stream contentStream = result.Content;
122-
bool isGzip = string.Equals(
123-
result.Details.ContentEncoding, ContentEncodingGzip, StringComparison.OrdinalIgnoreCase);
122+
try
123+
{
124+
using BlobDownloadStreamingResult result = await blob.DownloadStreamingAsync(cancellationToken: cancellationToken);
125+
Stream contentStream = result.Content;
126+
bool isGzip = string.Equals(
127+
result.Details.ContentEncoding, ContentEncodingGzip, StringComparison.OrdinalIgnoreCase);
128+
129+
if (isGzip)
130+
{
131+
using GZipStream decompressed = new(contentStream, CompressionMode.Decompress);
132+
using StreamReader reader = new(decompressed, Encoding.UTF8);
133+
return await ReadToEndAsync(reader, cancellationToken);
134+
}
124135

125-
if (isGzip)
136+
using StreamReader uncompressedReader = new(contentStream, Encoding.UTF8);
137+
return await ReadToEndAsync(uncompressedReader, cancellationToken);
138+
}
139+
catch (RequestFailedException ex) when (ex.Status == (int)HttpStatusCode.NotFound)
126140
{
127-
using GZipStream decompressed = new(contentStream, CompressionMode.Decompress);
128-
using StreamReader reader = new(decompressed, Encoding.UTF8);
129-
return await ReadToEndAsync(reader, cancellationToken);
141+
throw new InvalidOperationException(
142+
$"The blob '{name}' was not found in container '{container}'. " +
143+
"The payload may have been deleted or the container was never created.",
144+
ex);
130145
}
131-
132-
using StreamReader uncompressedReader = new(contentStream, Encoding.UTF8);
133-
return await ReadToEndAsync(uncompressedReader, cancellationToken);
134146
}
135147

136148
/// <inheritdoc/>

0 commit comments

Comments
 (0)