Skip to content

Commit 370139e

Browse files
committed
Add streaming grain storage serializer and Azure Blob pooled IO
Introduce IGrainStorageStreamingSerializer and stream overloads for OrleansJsonSerializer plus Orleans/Json grain storage serializers. Azure blob storage now supports pooled read buffers, a buffered stream write mode, and logs a warning when large payloads force a fallback to DownloadContentAsync. Add Azure Blob storage tests for pooled reads and streaming serializer behavior, and add focused grain storage benchmarks (binary vs streaming) across Orleans, Newtonsoft.Json, and STJ. Alternatives considered: full streaming OpenWriteAsync with separate ETag readback (rejected due to concurrency race), and IBufferWriter/ReadOnlySequence buffer paths (explored, but they didn't improve performance or allocation in a meaningful way, so they were dropped for now). Buffered stream uploads and pooled reads were kept as the best allocation/compatibility tradeoff while keeping BinaryData writes as the default.
1 parent 8024faf commit 370139e

20 files changed

+1245
-42
lines changed

src/Azure/Orleans.Persistence.AzureStorage/Providers/AzureProviderErrorCode.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ internal enum AzureProviderErrorCode
3434
AzureBlobProvider_ClearError = AzureBlobProviderBase + 14,
3535
AzureBlobProvider_ClearingData = AzureBlobProviderBase + 15,
3636
AzureBlobProvider_Cleared = AzureBlobProviderBase + 16,
37-
37+
AzureBlobProvider_LargePayloadFallback = AzureBlobProviderBase + 17,
3838

3939

4040
}

src/Azure/Orleans.Persistence.AzureStorage/Providers/Storage/AzureBlobStorage.cs

Lines changed: 149 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
#nullable enable
2-
using System;
2+
using System.Buffers;
33
using System.Diagnostics;
4-
using System.Threading;
5-
using System.Threading.Tasks;
64
using Azure;
75
using Azure.Storage.Blobs;
86
using Azure.Storage.Blobs.Models;
@@ -11,7 +9,7 @@
119
using Microsoft.Extensions.Options;
1210
using Orleans.Configuration;
1311
using Orleans.Providers.Azure;
14-
using Orleans.Runtime;
12+
using Orleans.Serialization.Buffers.Adaptors;
1513
using Orleans.Serialization.Serializers;
1614
using LogLevel = Microsoft.Extensions.Logging.LogLevel;
1715

@@ -28,6 +26,7 @@ public partial class AzureBlobGrainStorage : IGrainStorage, ILifecycleParticipan
2826
private readonly IActivatorProvider _activatorProvider;
2927
private readonly AzureBlobStorageOptions options;
3028
private readonly IGrainStorageSerializer grainStorageSerializer;
29+
private readonly IGrainStorageStreamingSerializer? streamSerializer;
3130

3231
/// <summary> Default constructor </summary>
3332
public AzureBlobGrainStorage(
@@ -42,6 +41,7 @@ public AzureBlobGrainStorage(
4241
this.blobContainerFactory = blobContainerFactory;
4342
_activatorProvider = activatorProvider;
4443
this.grainStorageSerializer = options.GrainStorageSerializer;
44+
this.streamSerializer = options.GrainStorageSerializer as IGrainStorageStreamingSerializer;
4545
this.logger = logger;
4646
}
4747

@@ -58,20 +58,12 @@ public async Task ReadStateAsync<T>(string grainType, GrainId grainId, IGrainSta
5858
{
5959
var blob = container.GetBlobClient(blobName);
6060

61-
var response = await blob.DownloadContentAsync();
62-
grainState.ETag = response.Value.Details.ETag.ToString();
63-
var contents = response.Value.Content;
64-
T? loadedState;
65-
if (contents is null || contents.IsEmpty)
61+
T? loadedState = streamSerializer switch
6662
{
67-
loadedState = default;
68-
LogTraceBlobEmptyReading(grainType, grainId, grainState.ETag, blobName, container.Name);
69-
}
70-
else
71-
{
72-
loadedState = this.ConvertFromStorageFormat<T>(contents);
73-
LogTraceDataRead(grainType, grainId, grainState.ETag, blobName, container.Name);
74-
}
63+
not null => await ReadStateWithStreamAsync<T>(blob, grainType, grainId, grainState, blobName, container.Name),
64+
null when options.UsePooledBufferForReads => await ReadStateWithPooledBufferAsync<T>(blob, grainType, grainId, grainState, blobName, container.Name),
65+
_ => await ReadStateWithBinaryDataAsync<T>(blob, grainType, grainId, grainState, blobName, container.Name),
66+
};
7567

7668
grainState.State = loadedState ?? CreateInstance<T>();
7769
grainState.RecordExists = loadedState is not null;
@@ -115,11 +107,17 @@ public async Task WriteStateAsync<T>(string grainType, GrainId grainId, IGrainSt
115107
{
116108
LogTraceWriting(grainType, grainId, grainState.ETag, blobName, container.Name);
117109

118-
var contents = ConvertToStorageFormat(grainState.State);
119-
120110
var blob = container.GetBlobClient(blobName);
121111

122-
await WriteStateAndCreateContainerIfNotExists(grainType, grainId, grainState, contents, "application/octet-stream", blob);
112+
if (streamSerializer is null || options.WriteMode == AzureBlobStorageWriteMode.BinaryData)
113+
{
114+
var contents = ConvertToStorageFormat(grainState.State);
115+
await WriteStateAndCreateContainerIfNotExists(grainType, grainId, grainState, contents, "application/octet-stream", blob);
116+
}
117+
else
118+
{
119+
await WriteStateBufferedStreamAndCreateContainerIfNotExists(grainType, grainId, grainState, "application/octet-stream", blob);
120+
}
123121

124122
LogTraceDataWritten(grainType, grainId, grainState.ETag, blobName, container.Name);
125123
}
@@ -200,8 +198,7 @@ private async Task WriteStateAndCreateContainerIfNotExists<T>(string grainType,
200198
static state => state.blob.UploadAsync(state.contents, state.options),
201199
(blob, contents, options),
202200
blob,
203-
grainState.ETag)
204-
.ConfigureAwait(false);
201+
grainState.ETag).ConfigureAwait(false);
205202

206203
grainState.ETag = result.Value.ETag.ToString();
207204
grainState.RecordExists = true;
@@ -211,11 +208,133 @@ private async Task WriteStateAndCreateContainerIfNotExists<T>(string grainType,
211208
// if the container does not exist, create it, and make another attempt
212209
LogTraceContainerNotFound(grainType, grainId, grainState.ETag, blob.Name, container.Name);
213210
await container.CreateIfNotExistsAsync().ConfigureAwait(false);
214-
215211
await WriteStateAndCreateContainerIfNotExists(grainType, grainId, grainState, contents, mimeType, blob).ConfigureAwait(false);
216212
}
217213
}
218214

215+
/// <summary>
/// Writes grain state through the buffered stream-serializer upload path, creating the
/// blob container and retrying when the container does not exist yet.
/// </summary>
/// <param name="grainType">Grain type name, used for trace logging.</param>
/// <param name="grainId">Grain identity; also selects the blob container.</param>
/// <param name="grainState">State to persist; its ETag and RecordExists are updated on success.</param>
/// <param name="mimeType">Content type stored in the blob's HTTP headers.</param>
/// <param name="blob">Target blob client.</param>
private async Task WriteStateBufferedStreamAndCreateContainerIfNotExists<T>(string grainType, GrainId grainId, IGrainState<T> grainState, string mimeType, BlobClient blob)
{
    var container = this.blobContainerFactory.GetBlobContainerClient(grainId);

    try
    {
        // No known ETag: only succeed if the blob does not exist yet.
        // Known ETag: only succeed if the stored blob still carries it.
        BlobRequestConditions accessConditions;
        if (string.IsNullOrEmpty(grainState.ETag))
        {
            accessConditions = new BlobRequestConditions { IfNoneMatch = ETag.All };
        }
        else
        {
            accessConditions = new BlobRequestConditions { IfMatch = new ETag(grainState.ETag) };
        }

        var uploadOptions = new BlobUploadOptions
        {
            HttpHeaders = new BlobHttpHeaders { ContentType = mimeType },
            Conditions = accessConditions,
        };

        // Everything the static lambda needs travels in the state tuple, so nothing is captured.
        var uploadState = (self: this, blob, uploadOptions, value: grainState.State);
        var uploaded = await DoOptimisticUpdate(
            static s => s.self.UploadSerializedStateBufferedAsync(s.blob, s.uploadOptions, s.value),
            uploadState,
            blob,
            grainState.ETag).ConfigureAwait(false);

        grainState.ETag = uploaded.Value.ETag.ToString();
        grainState.RecordExists = true;
    }
    catch (RequestFailedException exception) when (exception.IsContainerNotFound())
    {
        // The container is missing: create it, then attempt the write again.
        LogTraceContainerNotFound(grainType, grainId, grainState.ETag, blob.Name, container.Name);
        await container.CreateIfNotExistsAsync().ConfigureAwait(false);
        await WriteStateBufferedStreamAndCreateContainerIfNotExists(grainType, grainId, grainState, mimeType, blob).ConfigureAwait(false);
    }
}
248+
249+
/// <summary>
/// Serializes <paramref name="value"/> into a rented <c>PooledBufferStream</c> and uploads
/// that buffer to <paramref name="blob"/>.
/// </summary>
/// <param name="blob">Target blob client.</param>
/// <param name="options">Upload options (headers and request conditions).</param>
/// <param name="value">The state to serialize.</param>
/// <returns>The upload response.</returns>
/// <exception cref="InvalidOperationException">No stream serializer is configured.</exception>
private async Task<Response<BlobContentInfo>> UploadSerializedStateBufferedAsync<T>(BlobClient blob, BlobUploadOptions options, T value)
{
    var serializer = streamSerializer
        ?? throw new InvalidOperationException("Stream serializer is not configured.");

    var pooledStream = PooledBufferStream.Rent();
    try
    {
        await serializer.SerializeAsync(value, pooledStream).ConfigureAwait(false);

        // Rewind so the upload reads the payload from its first byte.
        pooledStream.Position = 0;

        var uploadResponse = await blob.UploadAsync(pooledStream, options).ConfigureAwait(false);
        return uploadResponse;
    }
    finally
    {
        // Hand the stream back whether serialization/upload succeeded or threw.
        PooledBufferStream.Return(pooledStream);
    }
}
268+
269+
/// <summary>
/// Downloads the blob as a stream and deserializes it with the configured stream serializer.
/// Sets <c>grainState.ETag</c> from the download response.
/// </summary>
/// <param name="blob">Blob to read.</param>
/// <param name="grainType">Grain type name, used for trace logging.</param>
/// <param name="grainId">Grain identity, used for trace logging.</param>
/// <param name="grainState">Grain state whose ETag is updated.</param>
/// <param name="blobName">Blob name, used for trace logging.</param>
/// <param name="containerName">Container name, used for trace logging.</param>
/// <returns>The deserialized state, or <c>default</c> when the reported content length is not positive.</returns>
private async Task<T?> ReadStateWithStreamAsync<T>(BlobClient blob, string grainType, GrainId grainId, IGrainState<T> grainState, string blobName, string containerName)
{
    var response = await blob.DownloadStreamingAsync();

    // Take ownership of the response body right away so it is disposed on every exit path,
    // including the empty-blob early return below (previously that path left it undisposed).
    await using var content = response.Value.Content;

    grainState.ETag = response.Value.Details.ETag.ToString();
    var contentLength = response.Value.Details.ContentLength;

    if (contentLength <= 0)
    {
        LogTraceBlobEmptyReading(grainType, grainId, grainState.ETag, blobName, containerName);
        return default;
    }

    // The caller only dispatches here when a stream serializer is configured.
    var loadedState = await streamSerializer!.DeserializeAsync<T>(content).ConfigureAwait(false);
    LogTraceDataRead(grainType, grainId, grainState.ETag, blobName, containerName);
    return loadedState;
}
286+
287+
/// <summary>
/// Downloads the whole blob as <see cref="BinaryData"/> and deserializes it through
/// <c>ConvertFromStorageFormat</c>. Sets <c>grainState.ETag</c> from the download response.
/// </summary>
/// <returns>The deserialized state, or <c>default</c> when the blob content is null or empty.</returns>
private async Task<T?> ReadStateWithBinaryDataAsync<T>(BlobClient blob, string grainType, GrainId grainId, IGrainState<T> grainState, string blobName, string containerName)
{
    var download = (await blob.DownloadContentAsync()).Value;
    grainState.ETag = download.Details.ETag.ToString();

    // Happy path first: a non-null, non-empty payload gets deserialized.
    if (download.Content is { IsEmpty: false } payload)
    {
        var state = this.ConvertFromStorageFormat<T>(payload);
        LogTraceDataRead(grainType, grainId, grainState.ETag, blobName, containerName);
        return state;
    }

    LogTraceBlobEmptyReading(grainType, grainId, grainState.ETag, blobName, containerName);
    return default;
}
303+
304+
/// <summary>
/// Downloads the blob as a stream, copies it into a buffer rented from
/// <see cref="ArrayPool{T}.Shared"/>, and deserializes it through <c>ConvertFromStorageFormat</c>.
/// Sets <c>grainState.ETag</c> from the download response.
/// </summary>
/// <remarks>
/// The rented buffer is returned to the pool as soon as deserialization finishes, so the
/// deserializer must not keep a reference to the <see cref="BinaryData"/> it is handed.
/// Payloads that cannot fit in a single array are logged and re-read via
/// <c>ReadStateWithBinaryDataAsync</c>.
/// </remarks>
/// <returns>The deserialized state, or <c>default</c> when the reported content length is not positive.</returns>
private async Task<T?> ReadStateWithPooledBufferAsync<T>(BlobClient blob, string grainType, GrainId grainId, IGrainState<T> grainState, string blobName, string containerName)
{
    var response = await blob.DownloadStreamingAsync();
    grainState.ETag = response.Value.Details.ETag.ToString();
    var contentLength = response.Value.Details.ContentLength;

    // Scope the response body so it is disposed on every path: the empty-blob return, the
    // pooled read, and - before the second download starts - the large-payload fallback.
    await using (var content = response.Value.Content)
    {
        if (contentLength <= 0)
        {
            LogTraceBlobEmptyReading(grainType, grainId, grainState.ETag, blobName, containerName);
            return default;
        }

        // Array.MaxLength (not int.MaxValue) is the largest array that can be allocated;
        // renting anything bigger would throw instead of reaching the fallback.
        if (contentLength <= Array.MaxLength)
        {
            var length = (int)contentLength;
            var buffer = ArrayPool<byte>.Shared.Rent(length);
            try
            {
                // Rent may hand back a larger array; only the first `length` bytes are payload.
                var memory = buffer.AsMemory(0, length);
                await content.ReadExactlyAsync(memory);
                var loadedState = this.ConvertFromStorageFormat<T>(new BinaryData(memory));
                LogTraceDataRead(grainType, grainId, grainState.ETag, blobName, containerName);
                return loadedState;
            }
            finally
            {
                ArrayPool<byte>.Shared.Return(buffer);
            }
        }
    }

    LogWarningLargePayloadFallback(contentLength, grainType, grainId, grainState.ETag, blobName, containerName);
    return await ReadStateWithBinaryDataAsync(blob, grainType, grainId, grainState, blobName, containerName);
}
337+
219338
private static async Task<TResult> DoOptimisticUpdate<TState, TResult>(Func<TState, Task<TResult>> updateOperation, TState state, BlobClient blob, string currentETag)
220339
{
221340
try
@@ -294,6 +413,13 @@ private async Task Init(CancellationToken ct)
294413
)]
295414
private partial void LogTraceDataRead(string grainType, GrainId grainId, string? eTag, string blobName, string containerName);
296415

416+
[LoggerMessage(
417+
Level = LogLevel.Warning,
418+
EventId = (int)AzureProviderErrorCode.AzureBlobProvider_LargePayloadFallback,
419+
Message = "ContentLength={ContentLength} exceeds max array size; falling back to DownloadContentAsync. GrainType={GrainType} GrainId={GrainId} ETag={ETag} BlobName={BlobName} in Container={ContainerName}"
420+
)]
421+
private partial void LogWarningLargePayloadFallback(long contentLength, string grainType, GrainId grainId, string? eTag, string blobName, string containerName);
422+
297423
[LoggerMessage(
298424
Level = LogLevel.Error,
299425
EventId = (int)AzureProviderErrorCode.AzureBlobProvider_ReadError,

src/Azure/Orleans.Persistence.AzureStorage/Providers/Storage/AzureBlobStorageOptions.cs

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,23 @@
1111

1212
namespace Orleans.Configuration
1313
{
14+
/// <summary>
/// Selects how grain state is serialized and uploaded to Azure Blob storage.
/// </summary>
public enum AzureBlobStorageWriteMode
{
    /// <summary>
    /// Serialize the state to a <see cref="BinaryData"/> via the <see cref="IGrainStorageSerializer"/>
    /// binary path and upload it. The entire payload is materialized in memory; this is usually the
    /// fastest option but may cause large allocations (including on the LOH) for big payloads.
    /// </summary>
    BinaryData = 0,

    /// <summary>
    /// Serialize the state with the stream serializer into a pooled in-memory stream and upload from
    /// that buffer. The full payload is still buffered, but pooled segments are reused to avoid LOH churn.
    /// Needs an <see cref="IGrainStorageStreamingSerializer"/>; without one, writes fall back to
    /// <see cref="BinaryData"/>.
    /// </summary>
    BufferedStream = 1,
}
30+
1431
public class AzureBlobStorageOptions : IStorageProviderSerializerOptions
1532
{
1633
private BlobServiceClient _blobServiceClient;
@@ -57,6 +74,20 @@ public BlobServiceClient BlobServiceClient
5774
/// </summary>
5875
public bool DeleteStateOnClear { get; set; } = true;
5976

77+
/// <summary>
78+
/// Gets or sets a value indicating whether to use pooled buffers when reading blob contents.
79+
/// The deserializer must not retain the <see cref="BinaryData"/> or underlying buffer after deserialization.
80+
/// When a stream serializer is configured, reads go through it and this setting has no effect; otherwise pooled buffers are used only if the content length fits in an <see cref="int"/>.
81+
/// When pooled buffers are used, deserialization goes through the <see cref="IGrainStorageSerializer"/> binary path.
82+
/// </summary>
83+
public bool UsePooledBufferForReads { get; set; } = true;
84+
85+
/// <summary>
86+
/// Gets or sets the write path to use when a stream serializer is available.
87+
/// If the stream serializer is not configured, writes always use <see cref="BinaryData"/>.
88+
/// </summary>
89+
public AzureBlobStorageWriteMode WriteMode { get; set; } = AzureBlobStorageWriteMode.BinaryData;
90+
6091
/// <summary>
6192
/// A function for building container factory instances
6293
/// </summary>
@@ -149,7 +180,7 @@ public void ValidateConfiguration()
149180
AzureBlobUtils.ValidateContainerName(options.ContainerName);
150181
AzureBlobUtils.ValidateBlobName(this.name);
151182
}
152-
catch(ArgumentException e)
183+
catch (ArgumentException e)
153184
{
154185
throw new OrleansConfigurationException(
155186
$"Configuration for AzureBlobStorageOptions {name} is invalid. {nameof(this.options.ContainerName)} is not valid", e);

src/Orleans.Core/Providers/IGrainStorageSerializer.cs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
using System;
2+
using System.IO;
3+
using System.Threading;
4+
using System.Threading.Tasks;
25
using Microsoft.Extensions.DependencyInjection;
36
using Microsoft.Extensions.Options;
47
using Orleans.Runtime;
@@ -27,6 +30,30 @@ public interface IGrainStorageSerializer
2730
T Deserialize<T>(BinaryData input);
2831
}
2932

33+
/// <summary>
34+
/// Optional stream-based serializer for grain state.
35+
/// </summary>
36+
/// <summary>
/// Optional companion to <see cref="IGrainStorageSerializer"/> that reads and writes grain state
/// directly against a <see cref="Stream"/>.
/// </summary>
public interface IGrainStorageStreamingSerializer
{
    /// <summary>
    /// Writes the serialized form of <paramref name="input"/> to <paramref name="destination"/>.
    /// </summary>
    /// <typeparam name="T">Type of the value being serialized.</typeparam>
    /// <param name="input">Value to serialize.</param>
    /// <param name="destination">Stream that receives the serialized data.</param>
    /// <param name="cancellationToken">Token used to cancel the operation.</param>
    /// <returns>A task-like that completes when serialization is done.</returns>
    ValueTask SerializeAsync<T>(T input, Stream destination, CancellationToken cancellationToken = default);

    /// <summary>
    /// Reads a value of type <typeparamref name="T"/> from <paramref name="input"/>.
    /// </summary>
    /// <typeparam name="T">Type of the value to produce.</typeparam>
    /// <param name="input">Stream holding the serialized data.</param>
    /// <param name="cancellationToken">Token used to cancel the operation.</param>
    /// <returns>The deserialized value.</returns>
    ValueTask<T> DeserializeAsync<T>(Stream input, CancellationToken cancellationToken = default);
}
56+
3057
/// <summary>
3158
/// Extensions for <see cref="IGrainStorageSerializer"/>.
3259
/// </summary>

src/Orleans.Core/Providers/StorageSerializer/JsonGrainStorageSerializer.cs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
1-
using System;
21
using Orleans.Serialization;
32

43
namespace Orleans.Storage
54
{
65
/// <summary>
76
/// Grain storage serializer that uses Newtonsoft.Json
87
/// </summary>
9-
public class JsonGrainStorageSerializer : IGrainStorageSerializer
8+
public class JsonGrainStorageSerializer : IGrainStorageSerializer, IGrainStorageStreamingSerializer
109
{
1110
private readonly OrleansJsonSerializer _orleansJsonSerializer;
1211

@@ -30,5 +29,20 @@ public T Deserialize<T>(BinaryData input)
3029
{
3130
return (T)_orleansJsonSerializer.Deserialize(typeof(T), input.ToString());
3231
}
32+
33+
/// <inheritdoc/>
34+
public ValueTask SerializeAsync<T>(T value, Stream destination, CancellationToken cancellationToken = default)
{
    // Cancellation is checked once up front; the serialization call itself is synchronous.
    cancellationToken.ThrowIfCancellationRequested();

    var stateType = typeof(T);
    _orleansJsonSerializer.Serialize(value, stateType, destination);

    // The work is already done, so hand back a completed ValueTask.
    return default;
}
40+
41+
/// <inheritdoc/>
42+
public ValueTask<T> DeserializeAsync<T>(Stream input, CancellationToken cancellationToken = default)
{
    // Cancellation is checked once up front; the deserialization call itself is synchronous.
    cancellationToken.ThrowIfCancellationRequested();

    var state = (T)_orleansJsonSerializer.Deserialize(typeof(T), input);
    return new ValueTask<T>(state);
}
3347
}
3448
}

0 commit comments

Comments
 (0)