Skip to content

Commit 738892b

Browse files
committed
refactor
1 parent a5549a3 commit 738892b

File tree

3 files changed

+31
-15
lines changed

3 files changed

+31
-15
lines changed

src/Extensions/AzureBlobPayloads/Converters/BlobPayloadStore.cs

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,12 @@ namespace Microsoft.DurableTask.Converters;
1818
public sealed class BlobPayloadStore : IPayloadStore
1919
{
2020
const string TokenPrefix = "blob:v1:";
21+
const string ContentEncodingGzip = "gzip";
22+
const int DefaultCopyBufferSize = 81920;
23+
const int MaxRetryAttempts = 8;
24+
const int BaseDelayMs = 250;
25+
const int MaxDelayMs = 10_000;
26+
const int MaxJitterMs = 100;
2127

2228
// Jitter RNG for retry backoff
2329
static readonly object RandomLock = new object();
@@ -43,8 +49,8 @@ public BlobPayloadStore(LargePayloadStorageOptions options)
4349
Retry =
4450
{
4551
Mode = RetryMode.Exponential,
46-
MaxRetries = 8,
47-
Delay = TimeSpan.FromMilliseconds(250),
52+
MaxRetries = MaxRetryAttempts,
53+
Delay = TimeSpan.FromMilliseconds(BaseDelayMs),
4854
MaxDelay = TimeSpan.FromSeconds(10),
4955
NetworkTimeout = TimeSpan.FromMinutes(2),
5056
},
@@ -73,21 +79,21 @@ public async Task<string> UploadAsync(ReadOnlyMemory<byte> payloadBytes, Cancell
7379
{
7480
BlobOpenWriteOptions writeOptions = new()
7581
{
76-
HttpHeaders = new BlobHttpHeaders { ContentEncoding = "gzip" },
82+
HttpHeaders = new BlobHttpHeaders { ContentEncoding = ContentEncodingGzip },
7783
};
7884
using Stream blobStream = await blob.OpenWriteAsync(true, writeOptions, ct);
7985
using GZipStream compressedBlobStream = new(blobStream, CompressionLevel.Optimal, leaveOpen: true);
8086
using MemoryStream payloadStream = new(payloadBuffer, writable: false);
8187

82-
await payloadStream.CopyToAsync(compressedBlobStream, bufferSize: 81920, ct);
88+
await payloadStream.CopyToAsync(compressedBlobStream, bufferSize: DefaultCopyBufferSize, ct);
8389
await compressedBlobStream.FlushAsync(ct);
8490
await blobStream.FlushAsync(ct);
8591
}
8692
else
8793
{
8894
using Stream blobStream = await blob.OpenWriteAsync(true, default, ct);
8995
using MemoryStream payloadStream = new(payloadBuffer, writable: false);
90-
await payloadStream.CopyToAsync(blobStream, bufferSize: 81920, ct);
96+
await payloadStream.CopyToAsync(blobStream, bufferSize: DefaultCopyBufferSize, ct);
9197
await blobStream.FlushAsync(ct);
9298
}
9399

@@ -115,7 +121,7 @@ public async Task<string> DownloadAsync(string token, CancellationToken cancella
115121
using BlobDownloadStreamingResult result = await blob.DownloadStreamingAsync(cancellationToken: ct);
116122
Stream contentStream = result.Content;
117123
bool isGzip = string.Equals(
118-
result.Details.ContentEncoding, "gzip", StringComparison.OrdinalIgnoreCase);
124+
result.Details.ContentEncoding, ContentEncodingGzip, StringComparison.OrdinalIgnoreCase);
119125

120126
if (isGzip)
121127
{
@@ -162,8 +168,8 @@ public bool IsKnownPayloadToken(string value)
162168

163169
static async Task<T> WithTransientRetryAsync<T>(Func<CancellationToken, Task<T>> operation, CancellationToken cancellationToken)
164170
{
165-
const int maxAttempts = 8;
166-
TimeSpan baseDelay = TimeSpan.FromMilliseconds(250);
171+
const int maxAttempts = MaxRetryAttempts;
172+
TimeSpan baseDelay = TimeSpan.FromMilliseconds(BaseDelayMs);
167173
int attempt = 0;
168174

169175
while (true)
@@ -201,9 +207,9 @@ static TimeSpan ComputeBackoff(TimeSpan baseDelay, int attempt)
201207
int jitterMs;
202208
lock (RandomLock)
203209
{
204-
jitterMs = SharedRandom.Next(0, 100);
210+
jitterMs = SharedRandom.Next(0, MaxJitterMs);
205211
}
206-
return TimeSpan.FromMilliseconds(Math.Min((baseDelay.TotalMilliseconds * factor) + jitterMs, 10_000));
207-
}
208212

213+
return TimeSpan.FromMilliseconds(Math.Min((baseDelay.TotalMilliseconds * factor) + jitterMs, MaxDelayMs));
214+
}
209215
}

src/Extensions/AzureBlobPayloads/Converters/LargePayloadDataConverter.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,14 @@ public sealed class LargePayloadDataConverter(
3737
[return: NotNullIfNotNull("value")]
3838
public override string? Serialize(object? value)
3939
{
40-
throw new NotImplementedException();
40+
throw new NotSupportedException();
4141
}
4242

4343
/// <inheritdoc/>
4444
[return: NotNullIfNotNull("data")]
4545
public override object? Deserialize(string? data, Type targetType)
4646
{
47-
throw new NotImplementedException();
47+
throw new NotSupportedException();
4848
}
4949

5050
/// <summary>

src/Worker/Core/Shims/DurableTaskShimFactory.cs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,12 @@ public TaskActivity CreateActivity(TaskName name, ITaskActivity activity)
5050
{
5151
Check.NotDefault(name);
5252
Check.NotNull(activity);
53-
return new TaskActivityShim(this.loggerFactory, this.options.DataConverter, name, activity, this.options.EnableLargePayloadSupport);
53+
return new TaskActivityShim(
54+
this.loggerFactory,
55+
this.options.DataConverter,
56+
name,
57+
activity,
58+
this.options.EnableLargePayloadSupport);
5459
}
5560

5661
/// <summary>
@@ -151,6 +156,11 @@ public TaskEntity CreateEntity(TaskName name, ITaskEntity entity, EntityId entit
151156
// In the future we may consider caching those shims and reusing them, which can reduce
152157
// deserialization and allocation overheads.
153158
ILogger logger = this.loggerFactory.CreateLogger(entity.GetType());
154-
return new TaskEntityShim(this.options.DataConverter, entity, entityId, logger, this.options.EnableLargePayloadSupport);
159+
return new TaskEntityShim(
160+
this.options.DataConverter,
161+
entity,
162+
entityId,
163+
logger,
164+
enableLargePayloadSupport: this.options.EnableLargePayloadSupport);
155165
}
156166
}

0 commit comments

Comments
 (0)