Skip to content

Commit 3244c10

Browse files
committed
Merge branch 'wangbill/large-payload' of https://github.com/microsoft/durabletask-dotnet into wangbill/large-payload
2 parents c1a87b9 + 6a51f62 commit 3244c10

File tree

12 files changed

+485
-40
lines changed

12 files changed

+485
-40
lines changed

src/Client/Core/DurableTaskClient.cs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,47 @@ public virtual Task<PurgeResult> PurgeAllInstancesAsync(
399399
throw new NotSupportedException($"{this.GetType()} does not support purging of orchestration instances.");
400400
}
401401

402+
/// <summary>
403+
/// Restarts an orchestration instance with the same or a new instance ID.
404+
/// </summary>
405+
/// <remarks>
406+
/// <para>
407+
/// This method restarts an existing orchestration instance. If <paramref name="restartWithNewInstanceId"/> is <c>true</c>,
408+
/// a new instance ID will be generated for the restarted orchestration. If <c>false</c>, the original instance ID will be reused.
409+
/// </para><para>
410+
/// The restarted orchestration will use the same input data as the original instance. If the original orchestration
411+
/// instance is not found, an <see cref="ArgumentException"/> will be thrown.
412+
/// </para><para>
413+
/// Note that this operation is backend-specific and may not be supported by all durable task backends.
414+
/// If the backend does not support restart operations, a <see cref="NotSupportedException"/> will be thrown.
415+
/// </para>
416+
/// </remarks>
417+
/// <param name="instanceId">The ID of the orchestration instance to restart.</param>
418+
/// <param name="restartWithNewInstanceId">
419+
/// If <c>true</c>, a new instance ID will be generated for the restarted orchestration.
420+
/// If <c>false</c>, the original instance ID will be reused.
421+
/// </param>
422+
/// <param name="cancellation">
423+
/// The cancellation token. This only cancels enqueueing the restart request to the backend.
424+
/// Does not abort restarting the orchestration once enqueued.
425+
/// </param>
426+
/// <returns>
427+
/// A task that completes when the orchestration instance is successfully restarted.
428+
/// The value of this task is the instance ID of the restarted orchestration instance.
429+
/// </returns>
430+
/// <exception cref="ArgumentException">
431+
/// Thrown if an orchestration with the specified <paramref name="instanceId"/> was not found. </exception>
432+
/// <exception cref="InvalidOperationException">
433+
/// Thrown when attempting to restart an instance using the same instance Id
434+
/// while the instance has not yet reached a completed or terminal state. </exception>
435+
/// <exception cref="NotSupportedException">
436+
/// Thrown if the backend does not support restart operations. </exception>
437+
public virtual Task<string> RestartAsync(
438+
string instanceId,
439+
bool restartWithNewInstanceId = false,
440+
CancellationToken cancellation = default)
441+
=> throw new NotSupportedException($"{this.GetType()} does not support orchestration restart.");
442+
402443
// TODO: Create task hub
403444

404445
// TODO: Delete task hub

src/Client/Grpc/GrpcDurableTaskClient.cs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -414,6 +414,42 @@ public override Task<PurgeResult> PurgeAllInstancesAsync(
414414
return this.PurgeInstancesCoreAsync(request, cancellation);
415415
}
416416

417+
/// <inheritdoc/>
418+
public override async Task<string> RestartAsync(
419+
string instanceId,
420+
bool restartWithNewInstanceId = false,
421+
CancellationToken cancellation = default)
422+
{
423+
Check.NotNullOrEmpty(instanceId);
424+
Check.NotEntity(this.options.EnableEntitySupport, instanceId);
425+
426+
var request = new P.RestartInstanceRequest
427+
{
428+
InstanceId = instanceId,
429+
RestartWithNewInstanceId = restartWithNewInstanceId,
430+
};
431+
432+
try
433+
{
434+
P.RestartInstanceResponse result = await this.sidecarClient.RestartInstanceAsync(
435+
request, cancellationToken: cancellation);
436+
return result.InstanceId;
437+
}
438+
catch (RpcException e) when (e.StatusCode == StatusCode.NotFound)
439+
{
440+
throw new ArgumentException($"An orchestration with the instanceId {instanceId} was not found.", e);
441+
}
442+
catch (RpcException e) when (e.StatusCode == StatusCode.FailedPrecondition)
443+
{
444+
throw new InvalidOperationException($"An orchestration with the instanceId {instanceId} cannot be restarted.", e);
445+
}
446+
catch (RpcException e) when (e.StatusCode == StatusCode.Cancelled)
447+
{
448+
throw new OperationCanceledException(
449+
$"The {nameof(this.RestartAsync)} operation was canceled.", e, cancellation);
450+
}
451+
}
452+
417453
static AsyncDisposable GetCallInvoker(GrpcDurableTaskClientOptions options, out CallInvoker callInvoker)
418454
{
419455
if (options.Channel is GrpcChannel c)

src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,59 @@ public override async Task<OrchestrationMetadata> WaitForInstanceStartAsync(
265265
}
266266
}
267267

268+
/// <inheritdoc/>
269+
public override async Task<string> RestartAsync(
270+
string instanceId,
271+
bool restartWithNewInstanceId = false,
272+
CancellationToken cancellation = default)
273+
{
274+
Check.NotNullOrEmpty(instanceId);
275+
cancellation.ThrowIfCancellationRequested();
276+
277+
// Get the current orchestration status to retrieve the name and input
278+
OrchestrationMetadata? status = await this.GetInstanceAsync(instanceId, getInputsAndOutputs: true, cancellation);
279+
280+
if (status == null)
281+
{
282+
throw new ArgumentException($"An orchestration with the instanceId {instanceId} was not found.");
283+
}
284+
285+
bool isInstaceNotCompleted = status.RuntimeStatus == OrchestrationRuntimeStatus.Running ||
286+
status.RuntimeStatus == OrchestrationRuntimeStatus.Pending ||
287+
status.RuntimeStatus == OrchestrationRuntimeStatus.Suspended;
288+
289+
if (isInstaceNotCompleted && !restartWithNewInstanceId)
290+
{
291+
throw new InvalidOperationException($"Instance '{instanceId}' cannot be restarted while it is in state '{status.RuntimeStatus}'. " +
292+
"Wait until it has completed, or restart with a new instance ID.");
293+
}
294+
295+
// Determine the instance ID for the restarted orchestration
296+
string newInstanceId = restartWithNewInstanceId ? Guid.NewGuid().ToString("N") : instanceId;
297+
298+
OrchestrationInstance instance = new()
299+
{
300+
InstanceId = newInstanceId,
301+
ExecutionId = Guid.NewGuid().ToString("N"),
302+
};
303+
304+
// Use the original serialized input directly to avoid double serialization
305+
// TODO: OrchestrationMetada doesn't have version property so we don't support version here.
306+
// Issue link: https://github.com/microsoft/durabletask-dotnet/issues/463
307+
TaskMessage message = new()
308+
{
309+
OrchestrationInstance = instance,
310+
Event = new ExecutionStartedEvent(-1, status.SerializedInput)
311+
{
312+
Name = status.Name,
313+
OrchestrationInstance = instance,
314+
},
315+
};
316+
317+
await this.Client.CreateTaskOrchestrationAsync(message);
318+
return newInstanceId;
319+
}
320+
268321
[return: NotNullIfNotNull("state")]
269322
OrchestrationMetadata? ToMetadata(Core.OrchestrationState? state, bool getInputsAndOutputs)
270323
{

src/Extensions/AzureBlobPayloads/Converters/BlobPayloadStore.cs

Lines changed: 113 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
using System.Globalization;
55
using System.IO.Compression;
66
using System.Text;
7+
using Azure;
8+
using Azure.Core;
79
using Azure.Storage.Blobs;
810
using Azure.Storage.Blobs.Models;
911

@@ -16,6 +18,10 @@ namespace Microsoft.DurableTask.Converters;
1618
public sealed class BlobPayloadStore : IPayloadStore
1719
{
1820
const string TokenPrefix = "blob:v1:";
21+
22+
// Jitter RNG for retry backoff
23+
static readonly object RandomLock = new object();
24+
static readonly Random SharedRandom = new Random();
1925
readonly BlobContainerClient containerClient;
2026
readonly LargePayloadStorageOptions options;
2127

@@ -32,47 +38,64 @@ public BlobPayloadStore(LargePayloadStorageOptions options)
3238
Check.NotNullOrEmpty(options.ConnectionString, nameof(options.ConnectionString));
3339
Check.NotNullOrEmpty(options.ContainerName, nameof(options.ContainerName));
3440

35-
BlobServiceClient serviceClient = new(options.ConnectionString);
41+
BlobClientOptions clientOptions = new()
42+
{
43+
Retry =
44+
{
45+
Mode = RetryMode.Exponential,
46+
MaxRetries = 8,
47+
Delay = TimeSpan.FromMilliseconds(250),
48+
MaxDelay = TimeSpan.FromSeconds(10),
49+
NetworkTimeout = TimeSpan.FromMinutes(2),
50+
},
51+
};
52+
BlobServiceClient serviceClient = new(options.ConnectionString, clientOptions);
3653
this.containerClient = serviceClient.GetBlobContainerClient(options.ContainerName);
3754
}
3855

3956
/// <inheritdoc/>
4057
public async Task<string> UploadAsync(ReadOnlyMemory<byte> payloadBytes, CancellationToken cancellationToken)
4158
{
42-
// Ensure container exists
43-
await this.containerClient.CreateIfNotExistsAsync(PublicAccessType.None, default, default, cancellationToken);
44-
45-
// One blob per payload using GUID-based name for uniqueness
59+
// One blob per payload using GUID-based name for uniqueness (stable across retries)
4660
string timestamp = DateTimeOffset.UtcNow.ToString("yyyy/MM/dd/HH/mm/ss", CultureInfo.InvariantCulture);
4761
string blobName = $"{timestamp}/{Guid.NewGuid():N}";
4862
BlobClient blob = this.containerClient.GetBlobClient(blobName);
4963

5064
byte[] payloadBuffer = payloadBytes.ToArray();
5165

52-
// Upload streaming, optionally compressing and marking ContentEncoding
53-
if (this.options.CompressPayloads)
66+
string token = await WithTransientRetryAsync(
67+
async ct =>
5468
{
55-
BlobOpenWriteOptions writeOptions = new()
69+
// Ensure container exists (idempotent)
70+
await this.containerClient.CreateIfNotExistsAsync(PublicAccessType.None, default, default, ct);
71+
72+
if (this.options.CompressPayloads)
5673
{
57-
HttpHeaders = new BlobHttpHeaders { ContentEncoding = "gzip" },
58-
};
59-
using Stream blobStream = await blob.OpenWriteAsync(true, writeOptions, cancellationToken);
60-
using GZipStream compressedBlobStream = new(blobStream, CompressionLevel.Optimal, leaveOpen: true);
61-
using MemoryStream payloadStream = new(payloadBuffer, writable: false);
62-
63-
await payloadStream.CopyToAsync(compressedBlobStream, bufferSize: 81920, cancellationToken);
64-
await compressedBlobStream.FlushAsync(cancellationToken);
65-
await blobStream.FlushAsync(cancellationToken);
66-
}
67-
else
68-
{
69-
using Stream blobStream = await blob.OpenWriteAsync(true, default, cancellationToken);
70-
using MemoryStream payloadStream = new(payloadBuffer, writable: false);
71-
await payloadStream.CopyToAsync(blobStream, bufferSize: 81920, cancellationToken);
72-
await blobStream.FlushAsync(cancellationToken);
73-
}
74+
BlobOpenWriteOptions writeOptions = new()
75+
{
76+
HttpHeaders = new BlobHttpHeaders { ContentEncoding = "gzip" },
77+
};
78+
using Stream blobStream = await blob.OpenWriteAsync(true, writeOptions, ct);
79+
using GZipStream compressedBlobStream = new(blobStream, CompressionLevel.Optimal, leaveOpen: true);
80+
using MemoryStream payloadStream = new(payloadBuffer, writable: false);
81+
82+
await payloadStream.CopyToAsync(compressedBlobStream, bufferSize: 81920, ct);
83+
await compressedBlobStream.FlushAsync(ct);
84+
await blobStream.FlushAsync(ct);
85+
}
86+
else
87+
{
88+
using Stream blobStream = await blob.OpenWriteAsync(true, default, ct);
89+
using MemoryStream payloadStream = new(payloadBuffer, writable: false);
90+
await payloadStream.CopyToAsync(blobStream, bufferSize: 81920, ct);
91+
await blobStream.FlushAsync(ct);
92+
}
93+
94+
return EncodeToken(this.containerClient.Name, blobName);
95+
},
96+
cancellationToken);
7497

75-
return EncodeToken(this.containerClient.Name, blobName);
98+
return token;
7699
}
77100

78101
/// <inheritdoc/>
@@ -85,20 +108,26 @@ public async Task<string> DownloadAsync(string token, CancellationToken cancella
85108
}
86109

87110
BlobClient blob = this.containerClient.GetBlobClient(name);
88-
using BlobDownloadStreamingResult result = await blob.DownloadStreamingAsync(cancellationToken: cancellationToken);
89-
Stream contentStream = result.Content;
90-
bool isGzip = string.Equals(
91-
result.Details.ContentEncoding, "gzip", StringComparison.OrdinalIgnoreCase);
92111

93-
if (isGzip)
112+
return await WithTransientRetryAsync(
113+
async ct =>
94114
{
95-
using GZipStream decompressed = new(contentStream, CompressionMode.Decompress);
96-
using StreamReader reader = new(decompressed, Encoding.UTF8);
97-
return await reader.ReadToEndAsync();
98-
}
115+
using BlobDownloadStreamingResult result = await blob.DownloadStreamingAsync(cancellationToken: ct);
116+
Stream contentStream = result.Content;
117+
bool isGzip = string.Equals(
118+
result.Details.ContentEncoding, "gzip", StringComparison.OrdinalIgnoreCase);
99119

100-
using StreamReader uncompressedReader = new(contentStream, Encoding.UTF8);
101-
return await uncompressedReader.ReadToEndAsync();
120+
if (isGzip)
121+
{
122+
using GZipStream decompressed = new(contentStream, CompressionMode.Decompress);
123+
using StreamReader reader = new(decompressed, Encoding.UTF8);
124+
return await reader.ReadToEndAsync();
125+
}
126+
127+
using StreamReader uncompressedReader = new(contentStream, Encoding.UTF8);
128+
return await uncompressedReader.ReadToEndAsync();
129+
},
130+
cancellationToken);
102131
}
103132

104133
/// <inheritdoc/>
@@ -130,4 +159,51 @@ public bool IsKnownPayloadToken(string value)
130159

131160
return (rest.Substring(0, sep), rest.Substring(sep + 1));
132161
}
162+
163+
static async Task<T> WithTransientRetryAsync<T>(Func<CancellationToken, Task<T>> operation, CancellationToken cancellationToken)
164+
{
165+
const int maxAttempts = 8;
166+
TimeSpan baseDelay = TimeSpan.FromMilliseconds(250);
167+
int attempt = 0;
168+
169+
while (true)
170+
{
171+
cancellationToken.ThrowIfCancellationRequested();
172+
try
173+
{
174+
return await operation(cancellationToken);
175+
}
176+
catch (RequestFailedException ex) when (IsTransient(ex) && attempt < maxAttempts - 1)
177+
{
178+
attempt++;
179+
TimeSpan delay = ComputeBackoff(baseDelay, attempt);
180+
await Task.Delay(delay, cancellationToken);
181+
}
182+
catch (IOException) when (attempt < maxAttempts - 1)
183+
{
184+
attempt++;
185+
TimeSpan delay = ComputeBackoff(baseDelay, attempt);
186+
await Task.Delay(delay, cancellationToken);
187+
}
188+
}
189+
}
190+
191+
static bool IsTransient(RequestFailedException ex)
192+
{
193+
return ex.Status == 503 || ex.Status == 502 || ex.Status == 500 || ex.Status == 429 ||
194+
string.Equals(ex.ErrorCode, "ServerBusy", StringComparison.OrdinalIgnoreCase) ||
195+
string.Equals(ex.ErrorCode, "OperationTimedOut", StringComparison.OrdinalIgnoreCase);
196+
}
197+
198+
static TimeSpan ComputeBackoff(TimeSpan baseDelay, int attempt)
199+
{
200+
double factor = Math.Pow(2, Math.Min(attempt, 6));
201+
int jitterMs;
202+
lock (RandomLock)
203+
{
204+
jitterMs = SharedRandom.Next(0, 100);
205+
}
206+
return TimeSpan.FromMilliseconds(Math.Min((baseDelay.TotalMilliseconds * factor) + jitterMs, 10_000));
207+
}
208+
133209
}

src/Grpc/orchestrator_service.proto

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -482,6 +482,15 @@ message PurgeInstancesResponse {
482482
google.protobuf.BoolValue isComplete = 2;
483483
}
484484

485+
message RestartInstanceRequest {
486+
string instanceId = 1;
487+
bool restartWithNewInstanceId = 2;
488+
}
489+
490+
message RestartInstanceResponse {
491+
string instanceId = 1;
492+
}
493+
485494
message CreateTaskHubRequest {
486495
bool recreateIfExists = 1;
487496
}
@@ -682,6 +691,9 @@ service TaskHubSidecarService {
682691
// Rewinds an orchestration instance to last known good state and replays from there.
683692
rpc RewindInstance(RewindInstanceRequest) returns (RewindInstanceResponse);
684693

694+
// Restarts an orchestration instance.
695+
rpc RestartInstance(RestartInstanceRequest) returns (RestartInstanceResponse);
696+
685697
// Waits for an orchestration instance to reach a running or completion state.
686698
rpc WaitForInstanceStart(GetInstanceRequest) returns (GetInstanceResponse);
687699

src/Grpc/versions.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
# The following files were downloaded from branch main at 2025-08-08 16:46:11 UTC
2-
https://raw.githubusercontent.com/microsoft/durabletask-protobuf/e88acbd07ae38b499dbe8c4e333e9e3feeb2a9cc/protos/orchestrator_service.proto
1+
# The following files were downloaded from branch main at 2025-09-10 22:50:45 UTC
2+
https://raw.githubusercontent.com/microsoft/durabletask-protobuf/985035a0890575ae18be0eb2a3ac93c10824498a/protos/orchestrator_service.proto

test/Client/Core.Tests/DependencyInjection/DefaultDurableTaskClientBuilderTests.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,14 @@ public override Task<OrchestrationMetadata> WaitForInstanceStartAsync(
144144
{
145145
throw new NotImplementedException();
146146
}
147+
148+
public override Task<string> RestartAsync(
149+
string instanceId,
150+
bool restartWithNewInstanceId = false,
151+
CancellationToken cancellation = default)
152+
{
153+
throw new NotImplementedException();
154+
}
147155
}
148156

149157
class CustomDataConverter : DataConverter

0 commit comments

Comments
 (0)