@@ -16,20 +16,15 @@ namespace Microsoft.DurableTask;
1616/// Azure Blob Storage implementation of <see cref="IPayloadStore"/>.
1717/// Stores payloads as blobs and returns opaque tokens in the form "blob:v1:<container>:<blobName>".
1818/// </summary>
19- internal sealed class BlobPayloadStore : IPayloadStore
19+ sealed class BlobPayloadStore : IPayloadStore
2020{
2121 const string TokenPrefix = "blob:v1:" ;
2222 const string ContentEncodingGzip = "gzip" ;
2323 const int DefaultCopyBufferSize = 81920 ;
2424 const int MaxRetryAttempts = 8 ;
2525 const int BaseDelayMs = 250 ;
2626 const int MaxDelayMs = 10_000 ;
27- const int MaxJitterMs = 100 ;
2827 const int NetworkTimeoutMinutes = 2 ;
29-
30- // Jitter RNG for retry backoff
31- static readonly object RandomLock = new object ( ) ;
32- static readonly Random SharedRandom = new Random ( ) ;
3328 readonly BlobContainerClient containerClient ;
3429 readonly LargePayloadStorageOptions options ;
3530
@@ -65,45 +60,37 @@ public BlobPayloadStore(LargePayloadStorageOptions options)
6560 public override async Task < string > UploadAsync ( ReadOnlyMemory < byte > payloadBytes , CancellationToken cancellationToken )
6661 {
6762 // One blob per payload using GUID-based name for uniqueness (stable across retries)
68- string timestamp = DateTimeOffset . UtcNow . ToString ( "yyyy/MM/dd/HH/mm/ss" , CultureInfo . InvariantCulture ) ;
69- string blobName = $ "{ timestamp } /{ Guid . NewGuid ( ) : N} ";
63+ string blobName = $ "{ Guid . NewGuid ( ) : N} ";
7064 BlobClient blob = this . containerClient . GetBlobClient ( blobName ) ;
7165
7266 byte [ ] payloadBuffer = payloadBytes . ToArray ( ) ;
7367
74- string token = await WithTransientRetryAsync (
75- async ct =>
76- {
77- // Ensure container exists (idempotent)
78- await this . containerClient . CreateIfNotExistsAsync ( PublicAccessType . None , default , default , ct ) ;
68+ // Ensure container exists (idempotent)
69+ await this . containerClient . CreateIfNotExistsAsync ( PublicAccessType . None , default , default , cancellationToken ) ;
7970
80- if ( this . options . CompressPayloads )
81- {
82- BlobOpenWriteOptions writeOptions = new ( )
83- {
84- HttpHeaders = new BlobHttpHeaders { ContentEncoding = ContentEncodingGzip } ,
85- } ;
86- using Stream blobStream = await blob . OpenWriteAsync ( true , writeOptions , ct ) ;
87- using GZipStream compressedBlobStream = new ( blobStream , System . IO . Compression . CompressionLevel . Optimal , leaveOpen : true ) ;
88- using MemoryStream payloadStream = new ( payloadBuffer , writable : false ) ;
89-
90- await payloadStream . CopyToAsync ( compressedBlobStream , bufferSize : DefaultCopyBufferSize , ct ) ;
91- await compressedBlobStream . FlushAsync ( ct ) ;
92- await blobStream . FlushAsync ( ct ) ;
93- }
94- else
71+ if ( this . options . CompressPayloads )
72+ {
73+ BlobOpenWriteOptions writeOptions = new ( )
9574 {
96- using Stream blobStream = await blob . OpenWriteAsync ( true , default , ct ) ;
97- using MemoryStream payloadStream = new ( payloadBuffer , writable : false ) ;
98- await payloadStream . CopyToAsync ( blobStream , bufferSize : DefaultCopyBufferSize , ct ) ;
99- await blobStream . FlushAsync ( ct ) ;
100- }
101-
102- return EncodeToken ( this . containerClient . Name , blobName ) ;
103- } ,
104- cancellationToken ) ;
75+ HttpHeaders = new BlobHttpHeaders { ContentEncoding = ContentEncodingGzip } ,
76+ } ;
77+ using Stream blobStream = await blob . OpenWriteAsync ( true , writeOptions , cancellationToken ) ;
78+ using GZipStream compressedBlobStream = new ( blobStream , System . IO . Compression . CompressionLevel . Optimal , leaveOpen : true ) ;
79+ using MemoryStream payloadStream = new ( payloadBuffer , writable : false ) ;
80+
81+ await payloadStream . CopyToAsync ( compressedBlobStream , bufferSize : DefaultCopyBufferSize , cancellationToken ) ;
82+ await compressedBlobStream . FlushAsync ( cancellationToken ) ;
83+ await blobStream . FlushAsync ( cancellationToken ) ;
84+ }
85+ else
86+ {
87+ using Stream blobStream = await blob . OpenWriteAsync ( true , default , cancellationToken ) ;
88+ using MemoryStream payloadStream = new ( payloadBuffer , writable : false ) ;
89+ await payloadStream . CopyToAsync ( blobStream , bufferSize : DefaultCopyBufferSize , cancellationToken ) ;
90+ await blobStream . FlushAsync ( cancellationToken ) ;
91+ }
10592
106- return token ;
93+ return EncodeToken ( this . containerClient . Name , blobName ) ;
10794 }
10895
10996 /// <inheritdoc/>
@@ -117,25 +104,20 @@ public override async Task<string> DownloadAsync(string token, CancellationToken
117104
118105 BlobClient blob = this . containerClient . GetBlobClient ( name ) ;
119106
120- return await WithTransientRetryAsync (
121- async ct =>
107+ using BlobDownloadStreamingResult result = await blob . DownloadStreamingAsync ( cancellationToken : cancellationToken ) ;
108+ Stream contentStream = result . Content ;
109+ bool isGzip = string . Equals (
110+ result . Details . ContentEncoding , ContentEncodingGzip , StringComparison . OrdinalIgnoreCase ) ;
111+
112+ if ( isGzip )
122113 {
123- using BlobDownloadStreamingResult result = await blob . DownloadStreamingAsync ( cancellationToken : ct ) ;
124- Stream contentStream = result . Content ;
125- bool isGzip = string . Equals (
126- result . Details . ContentEncoding , ContentEncodingGzip , StringComparison . OrdinalIgnoreCase ) ;
114+ using GZipStream decompressed = new ( contentStream , CompressionMode . Decompress ) ;
115+ using StreamReader reader = new ( decompressed , Encoding . UTF8 ) ;
116+ return await reader . ReadToEndAsync ( ) ;
117+ }
127118
128- if ( isGzip )
129- {
130- using GZipStream decompressed = new ( contentStream , CompressionMode . Decompress ) ;
131- using StreamReader reader = new ( decompressed , Encoding . UTF8 ) ;
132- return await reader . ReadToEndAsync ( ) ;
133- }
134-
135- using StreamReader uncompressedReader = new ( contentStream , Encoding . UTF8 ) ;
136- return await uncompressedReader . ReadToEndAsync ( ) ;
137- } ,
138- cancellationToken ) ;
119+ using StreamReader uncompressedReader = new ( contentStream , Encoding . UTF8 ) ;
120+ return await uncompressedReader . ReadToEndAsync ( ) ;
139121 }
140122
141123 /// <inheritdoc/>
@@ -167,49 +149,4 @@ public override bool IsKnownPayloadToken(string value)
167149
168150 return ( rest . Substring ( 0 , sep ) , rest . Substring ( sep + 1 ) ) ;
169151 }
170-
171- static async Task < T > WithTransientRetryAsync < T > ( Func < CancellationToken , Task < T > > operation , CancellationToken cancellationToken )
172- {
173- const int maxAttempts = MaxRetryAttempts ;
174- TimeSpan baseDelay = TimeSpan . FromMilliseconds ( BaseDelayMs ) ;
175- int attempt = 0 ;
176-
177- while ( true )
178- {
179- cancellationToken . ThrowIfCancellationRequested ( ) ;
180- try
181- {
182- return await operation ( cancellationToken ) ;
183- }
184- catch ( RequestFailedException ex ) when ( IsTransient ( ex ) && attempt < maxAttempts - 1 )
185- {
186- attempt ++ ;
187- TimeSpan delay = ComputeBackoff ( baseDelay , attempt ) ;
188- await Task . Delay ( delay , cancellationToken ) ;
189- }
190- catch ( IOException ) when ( attempt < maxAttempts - 1 )
191- {
192- attempt ++ ;
193- TimeSpan delay = ComputeBackoff ( baseDelay , attempt ) ;
194- await Task . Delay ( delay , cancellationToken ) ;
195- }
196- }
197- }
198-
199- static bool IsTransient ( RequestFailedException ex )
200- {
201- return ex . Status == 503 || ex . Status == 502 || ex . Status == 500 || ex . Status == 429 ;
202- }
203-
204- static TimeSpan ComputeBackoff ( TimeSpan baseDelay , int attempt )
205- {
206- double factor = Math . Pow ( 2 , Math . Min ( attempt , 6 ) ) ;
207- int jitterMs ;
208- lock ( RandomLock )
209- {
210- jitterMs = SharedRandom . Next ( 0 , MaxJitterMs ) ;
211- }
212-
213- return TimeSpan . FromMilliseconds ( Math . Min ( ( baseDelay . TotalMilliseconds * factor ) + jitterMs , MaxDelayMs ) ) ;
214- }
215- }
152+ }
0 commit comments