Skip to content

Commit db91bf6

Browse files
committed
retry on blob upload/download
1 parent 5ebe0b0 commit db91bf6

File tree

1 file changed

+113
-37
lines changed

1 file changed

+113
-37
lines changed

src/Extensions/AzureBlobPayloads/Converters/BlobPayloadStore.cs

Lines changed: 113 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
using System.Globalization;
55
using System.IO.Compression;
66
using System.Text;
7+
using Azure;
8+
using Azure.Core;
79
using Azure.Storage.Blobs;
810
using Azure.Storage.Blobs.Models;
911

@@ -16,6 +18,10 @@ namespace Microsoft.DurableTask.Converters;
1618
public sealed class BlobPayloadStore : IPayloadStore
1719
{
1820
const string TokenPrefix = "blob:v1:";
21+
22+
// Jitter RNG for retry backoff
23+
static readonly object RandomLock = new object();
24+
static readonly Random SharedRandom = new Random();
1925
readonly BlobContainerClient containerClient;
2026
readonly LargePayloadStorageOptions options;
2127

@@ -32,47 +38,64 @@ public BlobPayloadStore(LargePayloadStorageOptions options)
3238
Check.NotNullOrEmpty(options.ConnectionString, nameof(options.ConnectionString));
3339
Check.NotNullOrEmpty(options.ContainerName, nameof(options.ContainerName));
3440

35-
BlobServiceClient serviceClient = new(options.ConnectionString);
41+
BlobClientOptions clientOptions = new()
42+
{
43+
Retry =
44+
{
45+
Mode = RetryMode.Exponential,
46+
MaxRetries = 8,
47+
Delay = TimeSpan.FromMilliseconds(250),
48+
MaxDelay = TimeSpan.FromSeconds(10),
49+
NetworkTimeout = TimeSpan.FromMinutes(2),
50+
},
51+
};
52+
BlobServiceClient serviceClient = new(options.ConnectionString, clientOptions);
3653
this.containerClient = serviceClient.GetBlobContainerClient(options.ContainerName);
3754
}
3855

3956
/// <inheritdoc/>
4057
public async Task<string> UploadAsync(ReadOnlyMemory<byte> payloadBytes, CancellationToken cancellationToken)
{
    // One blob per payload. The name is generated once, outside the retried delegate, so every
    // attempt targets the same blob and a retry overwrites whatever an earlier attempt left behind.
    string timestamp = DateTimeOffset.UtcNow.ToString("yyyy/MM/dd/HH/mm/ss", CultureInfo.InvariantCulture);
    string blobName = $"{timestamp}/{Guid.NewGuid():N}";
    BlobClient blob = this.containerClient.GetBlobClient(blobName);

    // Materialize the payload once so each attempt can re-read it from the beginning.
    byte[] payloadBuffer = payloadBytes.ToArray();

    return await WithTransientRetryAsync(
        async ct =>
        {
            // Ensure container exists (idempotent, so it is safe to repeat on every attempt).
            await this.containerClient.CreateIfNotExistsAsync(PublicAccessType.None, default, default, ct);

            if (this.options.CompressPayloads)
            {
                // Mark the blob as gzip so the download path knows to decompress it.
                BlobOpenWriteOptions writeOptions = new()
                {
                    HttpHeaders = new BlobHttpHeaders { ContentEncoding = "gzip" },
                };
                using Stream blobStream = await blob.OpenWriteAsync(true, writeOptions, ct);

                // GZipStream only emits its final block and trailer when it is disposed, so the
                // compressor is closed (leaving the blob stream open) BEFORE the blob stream is
                // flushed. Flushing first would push an incomplete gzip stream.
                using (GZipStream compressedBlobStream = new(blobStream, CompressionLevel.Optimal, leaveOpen: true))
                using (MemoryStream payloadStream = new(payloadBuffer, writable: false))
                {
                    await payloadStream.CopyToAsync(compressedBlobStream, bufferSize: 81920, ct);
                }

                await blobStream.FlushAsync(ct);
            }
            else
            {
                using Stream blobStream = await blob.OpenWriteAsync(true, default, ct);
                using MemoryStream payloadStream = new(payloadBuffer, writable: false);
                await payloadStream.CopyToAsync(blobStream, bufferSize: 81920, ct);
                await blobStream.FlushAsync(ct);
            }

            return EncodeToken(this.containerClient.Name, blobName);
        },
        cancellationToken);
}
77100

78101
/// <inheritdoc/>
@@ -85,20 +108,26 @@ public async Task<string> DownloadAsync(string token, CancellationToken cancella
85108
}
86109

87110
BlobClient blob = this.containerClient.GetBlobClient(name);
88-
using BlobDownloadStreamingResult result = await blob.DownloadStreamingAsync(cancellationToken: cancellationToken);
89-
Stream contentStream = result.Content;
90-
bool isGzip = string.Equals(
91-
result.Details.ContentEncoding, "gzip", StringComparison.OrdinalIgnoreCase);
92111

93-
if (isGzip)
112+
return await WithTransientRetryAsync(
113+
async ct =>
94114
{
95-
using GZipStream decompressed = new(contentStream, CompressionMode.Decompress);
96-
using StreamReader reader = new(decompressed, Encoding.UTF8);
97-
return await reader.ReadToEndAsync();
98-
}
115+
using BlobDownloadStreamingResult result = await blob.DownloadStreamingAsync(cancellationToken: ct);
116+
Stream contentStream = result.Content;
117+
bool isGzip = string.Equals(
118+
result.Details.ContentEncoding, "gzip", StringComparison.OrdinalIgnoreCase);
99119

100-
using StreamReader uncompressedReader = new(contentStream, Encoding.UTF8);
101-
return await uncompressedReader.ReadToEndAsync();
120+
if (isGzip)
121+
{
122+
using GZipStream decompressed = new(contentStream, CompressionMode.Decompress);
123+
using StreamReader reader = new(decompressed, Encoding.UTF8);
124+
return await reader.ReadToEndAsync();
125+
}
126+
127+
using StreamReader uncompressedReader = new(contentStream, Encoding.UTF8);
128+
return await uncompressedReader.ReadToEndAsync();
129+
},
130+
cancellationToken);
102131
}
103132

104133
/// <inheritdoc/>
@@ -130,4 +159,51 @@ public bool IsKnownPayloadToken(string value)
130159

131160
return (rest.Substring(0, sep), rest.Substring(sep + 1));
132161
}
162+
163+
/// <summary>
/// Runs <paramref name="operation"/> and runs it again after a backoff delay when it fails with an
/// <see cref="IOException"/> or with a <see cref="RequestFailedException"/> that
/// <see cref="IsTransient"/> accepts. At most eight attempts are made in total.
/// </summary>
/// <typeparam name="T">Type of the value produced by the operation.</typeparam>
/// <param name="operation">Work to execute; it is invoked again from scratch on every attempt.</param>
/// <param name="cancellationToken">
/// Checked before each attempt, handed to the operation, and observed while waiting between attempts.
/// </param>
/// <returns>The value returned by the first attempt that completes without throwing.</returns>
static async Task<T> WithTransientRetryAsync<T>(Func<CancellationToken, Task<T>> operation, CancellationToken cancellationToken)
{
    const int maxAttempts = 8;
    TimeSpan initialDelay = TimeSpan.FromMilliseconds(250);
    int failedAttempts = 0;

    for (; ; )
    {
        cancellationToken.ThrowIfCancellationRequested();

        try
        {
            return await operation(cancellationToken);
        }
        catch (Exception error) when (
            failedAttempts < maxAttempts - 1 &&
            (error is IOException || (error is RequestFailedException failure && IsTransient(failure))))
        {
            // Retryable failure with attempts still remaining; anything else never enters
            // this handler and propagates to the caller unchanged.
            failedAttempts++;
        }

        // Only reached after a retryable failure: wait before the next attempt.
        await Task.Delay(ComputeBackoff(initialDelay, failedAttempts), cancellationToken);
    }
}
190+
191+
/// <summary>
/// Decides whether a failed storage request is worth retrying.
/// </summary>
/// <param name="ex">The failure reported by the Azure SDK.</param>
/// <returns>
/// <c>true</c> for HTTP 408, 429, 500, 502, 503 and 504, or when the service error code is
/// <c>ServerBusy</c> or <c>OperationTimedOut</c> (compared case-insensitively); otherwise <c>false</c>.
/// </returns>
static bool IsTransient(RequestFailedException ex)
{
    switch (ex.Status)
    {
        case 408: // Request Timeout
        case 429: // Too Many Requests
        case 500: // Internal Server Error
        case 502: // Bad Gateway
        case 503: // Service Unavailable
        case 504: // Gateway Timeout
            return true;
    }

    // Some throttling/timeout failures are identified by error code rather than by status.
    return string.Equals(ex.ErrorCode, "ServerBusy", StringComparison.OrdinalIgnoreCase) ||
        string.Equals(ex.ErrorCode, "OperationTimedOut", StringComparison.OrdinalIgnoreCase);
}
197+
198+
/// <summary>
/// Computes the wait before the next retry: <paramref name="baseDelay"/> doubled once per attempt
/// (the exponent is capped at 6), plus 0-99 ms of random jitter, never exceeding 10 seconds.
/// </summary>
/// <param name="baseDelay">Delay that is scaled exponentially.</param>
/// <param name="attempt">Number of attempts that have failed so far.</param>
/// <returns>The delay to wait before retrying.</returns>
static TimeSpan ComputeBackoff(TimeSpan baseDelay, int attempt)
{
    const double maxDelayMs = 10_000;
    const int jitterRangeMs = 100;

    double factor = Math.Pow(2, Math.Min(attempt, 6));

    // Clamp the exponential part so the jitter still fits under the overall cap. Clamping the
    // sum instead would make every capped attempt wait exactly the same time, which removes
    // the jitter precisely when many callers are backing off together.
    double exponentialMs = Math.Min(baseDelay.TotalMilliseconds * factor, maxDelayMs - jitterRangeMs);

    int jitterMs;

    // System.Random instances are not thread-safe, so access to the shared one is serialized.
    lock (RandomLock)
    {
        jitterMs = SharedRandom.Next(0, jitterRangeMs);
    }

    return TimeSpan.FromMilliseconds(exponentialMs + jitterMs);
}
208+
133209
}

0 commit comments

Comments
 (0)