Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ internal enum AzureProviderErrorCode
AzureBlobProvider_ClearError = AzureBlobProviderBase + 14,
AzureBlobProvider_ClearingData = AzureBlobProviderBase + 15,
AzureBlobProvider_Cleared = AzureBlobProviderBase + 16,

AzureBlobProvider_LargePayloadFallback = AzureBlobProviderBase + 17,


}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
#nullable enable
using System;
using System.Buffers;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Azure;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
Expand All @@ -11,7 +9,7 @@
using Microsoft.Extensions.Options;
using Orleans.Configuration;
using Orleans.Providers.Azure;
using Orleans.Runtime;
using Orleans.Serialization.Buffers.Adaptors;
using Orleans.Serialization.Serializers;
using LogLevel = Microsoft.Extensions.Logging.LogLevel;

Expand All @@ -28,6 +26,7 @@ public partial class AzureBlobGrainStorage : IGrainStorage, ILifecycleParticipan
private readonly IActivatorProvider _activatorProvider;
private readonly AzureBlobStorageOptions options;
private readonly IGrainStorageSerializer grainStorageSerializer;
private readonly IGrainStorageStreamingSerializer? streamSerializer;

/// <summary> Default constructor </summary>
public AzureBlobGrainStorage(
Expand All @@ -42,6 +41,7 @@ public AzureBlobGrainStorage(
this.blobContainerFactory = blobContainerFactory;
_activatorProvider = activatorProvider;
this.grainStorageSerializer = options.GrainStorageSerializer;
this.streamSerializer = options.GrainStorageSerializer as IGrainStorageStreamingSerializer;
this.logger = logger;
}

Expand All @@ -58,20 +58,12 @@ public async Task ReadStateAsync<T>(string grainType, GrainId grainId, IGrainSta
{
var blob = container.GetBlobClient(blobName);

var response = await blob.DownloadContentAsync();
grainState.ETag = response.Value.Details.ETag.ToString();
var contents = response.Value.Content;
T? loadedState;
if (contents is null || contents.IsEmpty)
T? loadedState = streamSerializer switch
{
loadedState = default;
LogTraceBlobEmptyReading(grainType, grainId, grainState.ETag, blobName, container.Name);
}
else
{
loadedState = this.ConvertFromStorageFormat<T>(contents);
LogTraceDataRead(grainType, grainId, grainState.ETag, blobName, container.Name);
}
not null => await ReadStateWithStreamAsync<T>(blob, grainType, grainId, grainState, blobName, container.Name),
null when options.UsePooledBufferForReads => await ReadStateWithPooledBufferAsync<T>(blob, grainType, grainId, grainState, blobName, container.Name),
_ => await ReadStateWithBinaryDataAsync<T>(blob, grainType, grainId, grainState, blobName, container.Name),
};

grainState.State = loadedState ?? CreateInstance<T>();
grainState.RecordExists = loadedState is not null;
Expand Down Expand Up @@ -115,11 +107,17 @@ public async Task WriteStateAsync<T>(string grainType, GrainId grainId, IGrainSt
{
LogTraceWriting(grainType, grainId, grainState.ETag, blobName, container.Name);

var contents = ConvertToStorageFormat(grainState.State);

var blob = container.GetBlobClient(blobName);

await WriteStateAndCreateContainerIfNotExists(grainType, grainId, grainState, contents, "application/octet-stream", blob);
if (streamSerializer is null || options.WriteMode == AzureBlobStorageWriteMode.BinaryData)
{
var contents = ConvertToStorageFormat(grainState.State);
await WriteStateAndCreateContainerIfNotExists(grainType, grainId, grainState, contents, "application/octet-stream", blob);
}
else
{
await WriteStateBufferedStreamAndCreateContainerIfNotExists(grainType, grainId, grainState, "application/octet-stream", blob);
}

LogTraceDataWritten(grainType, grainId, grainState.ETag, blobName, container.Name);
}
Expand Down Expand Up @@ -200,8 +198,7 @@ private async Task WriteStateAndCreateContainerIfNotExists<T>(string grainType,
static state => state.blob.UploadAsync(state.contents, state.options),
(blob, contents, options),
blob,
grainState.ETag)
.ConfigureAwait(false);
grainState.ETag).ConfigureAwait(false);

grainState.ETag = result.Value.ETag.ToString();
grainState.RecordExists = true;
Expand All @@ -211,11 +208,131 @@ private async Task WriteStateAndCreateContainerIfNotExists<T>(string grainType,
// if the container does not exist, create it, and make another attempt
LogTraceContainerNotFound(grainType, grainId, grainState.ETag, blob.Name, container.Name);
await container.CreateIfNotExistsAsync().ConfigureAwait(false);

await WriteStateAndCreateContainerIfNotExists(grainType, grainId, grainState, contents, mimeType, blob).ConfigureAwait(false);
}
}

/// <summary>
/// Writes the grain state through the buffered stream-serializer path using an ETag-conditional upload.
/// If the target container is reported as missing, it is created and the write is attempted again.
/// </summary>
/// <param name="grainType">The grain type name, used for logging.</param>
/// <param name="grainId">The grain identifier; also used to resolve the blob container.</param>
/// <param name="grainState">The state to persist. Its ETag and RecordExists are updated on success.</param>
/// <param name="mimeType">The content type recorded on the uploaded blob.</param>
/// <param name="blob">The blob client to upload to.</param>
private async Task WriteStateBufferedStreamAndCreateContainerIfNotExists<T>(string grainType, GrainId grainId, IGrainState<T> grainState, string mimeType, BlobClient blob)
{
    var container = this.blobContainerFactory.GetBlobContainerClient(grainId);

    try
    {
        // Without a known ETag the request is conditioned on "If-None-Match: *";
        // otherwise it is conditioned on the ETag currently held by the grain state.
        BlobRequestConditions requestConditions;
        if (string.IsNullOrEmpty(grainState.ETag))
        {
            requestConditions = new BlobRequestConditions { IfNoneMatch = ETag.All };
        }
        else
        {
            requestConditions = new BlobRequestConditions { IfMatch = new ETag(grainState.ETag) };
        }

        var headers = new BlobHttpHeaders { ContentType = mimeType };
        var uploadOptions = new BlobUploadOptions
        {
            HttpHeaders = headers,
            Conditions = requestConditions,
        };

        // The lambda is static: everything it needs travels in the state tuple.
        var uploadResult = await DoOptimisticUpdate(
            static args => args.owner.UploadSerializedStateBufferedAsync(args.blob, args.uploadOptions, args.payload),
            (owner: this, blob, uploadOptions, payload: grainState.State),
            blob,
            grainState.ETag).ConfigureAwait(false);

        grainState.ETag = uploadResult.Value.ETag.ToString();
        grainState.RecordExists = true;
    }
    catch (RequestFailedException exception) when (exception.IsContainerNotFound())
    {
        // The container is missing: create it, then retry the whole write.
        LogTraceContainerNotFound(grainType, grainId, grainState.ETag, blob.Name, container.Name);
        await container.CreateIfNotExistsAsync().ConfigureAwait(false);
        await WriteStateBufferedStreamAndCreateContainerIfNotExists(grainType, grainId, grainState, mimeType, blob).ConfigureAwait(false);
    }
}

/// <summary>
/// Serializes <paramref name="value"/> into a rented <see cref="PooledBufferStream"/> and uploads that stream to the blob.
/// The rented stream is handed back to the pool whether or not the upload succeeds.
/// </summary>
/// <param name="blob">The blob client to upload to.</param>
/// <param name="options">The upload options (headers and request conditions).</param>
/// <param name="value">The value to serialize and upload.</param>
/// <returns>The upload response.</returns>
/// <exception cref="InvalidOperationException">No stream serializer is configured.</exception>
private async Task<Response<BlobContentInfo>> UploadSerializedStateBufferedAsync<T>(BlobClient blob, BlobUploadOptions options, T value)
{
    var serializer = streamSerializer ?? throw new InvalidOperationException("Stream serializer is not configured.");

    var pooledStream = PooledBufferStream.Rent();
    try
    {
        await serializer.SerializeAsync(value, pooledStream).ConfigureAwait(false);

        // Rewind so the upload starts reading from the first serialized byte.
        pooledStream.Position = 0;

        var uploadResponse = await blob.UploadAsync(pooledStream, options).ConfigureAwait(false);
        return uploadResponse;
    }
    finally
    {
        PooledBufferStream.Return(pooledStream);
    }
}

/// <summary>
/// Reads grain state by streaming the blob body into the configured stream serializer.
/// Updates <paramref name="grainState"/>'s ETag from the download response.
/// </summary>
/// <param name="blob">The blob client to download from.</param>
/// <param name="grainType">The grain type name, used for logging.</param>
/// <param name="grainId">The grain identifier, used for logging.</param>
/// <param name="grainState">The grain state whose ETag is updated.</param>
/// <param name="blobName">The blob name, used for logging.</param>
/// <param name="containerName">The container name, used for logging.</param>
/// <returns>The deserialized state, or <c>default</c> when the reported content length is zero or less.</returns>
/// <exception cref="InvalidOperationException">No stream serializer is configured.</exception>
private async Task<T?> ReadStateWithStreamAsync<T>(BlobClient blob, string grainType, GrainId grainId, IGrainState<T> grainState, string blobName, string containerName)
{
    // Fail with a descriptive error (matching the upload path) rather than a NullReferenceException.
    var serializer = streamSerializer ?? throw new InvalidOperationException("Stream serializer is not configured.");

    var response = await blob.DownloadStreamingAsync();

    // Take ownership of the response stream immediately so it is disposed on every
    // exit path, including the empty-blob early return below.
    await using var content = response.Value.Content;

    grainState.ETag = response.Value.Details.ETag.ToString();
    var contentLength = response.Value.Details.ContentLength;

    if (contentLength <= 0)
    {
        LogTraceBlobEmptyReading(grainType, grainId, grainState.ETag, blobName, containerName);
        return default;
    }

    var loadedState = await serializer.DeserializeAsync<T>(content).ConfigureAwait(false);
    LogTraceDataRead(grainType, grainId, grainState.ETag, blobName, containerName);
    return loadedState;
}

/// <summary>
/// Reads grain state by downloading the whole blob as <see cref="BinaryData"/> and
/// converting it with the binary storage format. Updates <paramref name="grainState"/>'s ETag.
/// </summary>
/// <returns>The deserialized state, or <c>default</c> when the blob has no content.</returns>
private async Task<T?> ReadStateWithBinaryDataAsync<T>(BlobClient blob, string grainType, GrainId grainId, IGrainState<T> grainState, string blobName, string containerName)
{
    var download = await blob.DownloadContentAsync();
    var result = download.Value;
    grainState.ETag = result.Details.ETag.ToString();

    // Non-null and non-empty payload: deserialize it.
    if (result.Content is { IsEmpty: false } payload)
    {
        var state = this.ConvertFromStorageFormat<T>(payload);
        LogTraceDataRead(grainType, grainId, grainState.ETag, blobName, containerName);
        return state;
    }

    // Nothing stored in the blob.
    LogTraceBlobEmptyReading(grainType, grainId, grainState.ETag, blobName, containerName);
    return default;
}

/// <summary>
/// Reads grain state by streaming the blob body into a buffer rented from <see cref="ArrayPool{T}.Shared"/>
/// and deserializing through the binary storage format. Updates <paramref name="grainState"/>'s ETag.
/// </summary>
/// <remarks>
/// The rented buffer is returned to the pool as soon as deserialization finishes (or fails).
/// Payloads whose content length exceeds <see cref="int.MaxValue"/> cannot use a single rented array;
/// for those a warning is logged and the stream is read into an unpooled <see cref="BinaryData"/> instead.
/// </remarks>
/// <param name="blob">The blob client to download from.</param>
/// <param name="grainType">The grain type name, used for logging.</param>
/// <param name="grainId">The grain identifier, used for logging.</param>
/// <param name="grainState">The grain state whose ETag is updated.</param>
/// <param name="blobName">The blob name, used for logging.</param>
/// <param name="containerName">The container name, used for logging.</param>
/// <returns>The deserialized state, or <c>default</c> when the reported content length is zero or less.</returns>
private async Task<T?> ReadStateWithPooledBufferAsync<T>(BlobClient blob, string grainType, GrainId grainId, IGrainState<T> grainState, string blobName, string containerName)
{
    var response = await blob.DownloadStreamingAsync();

    // Take ownership of the response stream immediately so it is disposed on every
    // exit path, including the empty-blob early return below.
    await using var content = response.Value.Content;

    grainState.ETag = response.Value.Details.ETag.ToString();
    var contentLength = response.Value.Details.ContentLength;

    if (contentLength <= 0)
    {
        LogTraceBlobEmptyReading(grainType, grainId, grainState.ETag, blobName, containerName);
        return default;
    }

    T? loadedState;
    if (contentLength <= int.MaxValue)
    {
        var length = (int)contentLength;
        var buffer = ArrayPool<byte>.Shared.Rent(length);
        try
        {
            // Rent may hand back a larger array; only the first `length` bytes belong to the payload.
            var memory = buffer.AsMemory(0, length);
            await content.ReadExactlyAsync(memory);
            loadedState = this.ConvertFromStorageFormat<T>(new BinaryData(memory));
        }
        finally
        {
            // Return the buffer even when the read or the deserializer throws.
            ArrayPool<byte>.Shared.Return(buffer);
        }
    }
    else
    {
        // Log first so the reason is visible even if buffering a payload this large fails.
        LogWarningLargePayloadFallback(contentLength, grainType, grainId, grainState.ETag, blobName, containerName);

        // Read the stream's bytes. `new BinaryData(stream)` would bind to the JSON-serializing
        // object constructor and serialize the Stream instance rather than its contents.
        var contents = await BinaryData.FromStreamAsync(content);
        loadedState = this.ConvertFromStorageFormat<T>(contents);
    }

    LogTraceDataRead(grainType, grainId, grainState.ETag, blobName, containerName);
    return loadedState;
}

private static async Task<TResult> DoOptimisticUpdate<TState, TResult>(Func<TState, Task<TResult>> updateOperation, TState state, BlobClient blob, string currentETag)
{
try
Expand Down Expand Up @@ -294,6 +411,13 @@ private async Task Init(CancellationToken ct)
)]
private partial void LogTraceDataRead(string grainType, GrainId grainId, string? eTag, string blobName, string containerName);

/// <summary>
/// Logs a warning that a blob's content length was too large for the pooled-buffer read path.
/// </summary>
[LoggerMessage(
    EventId = (int)AzureProviderErrorCode.AzureBlobProvider_LargePayloadFallback,
    Level = LogLevel.Warning,
    Message = "ContentLength={ContentLength} exceeds max array size; falling back to DownloadContentAsync. GrainType={GrainType} GrainId={GrainId} ETag={ETag} BlobName={BlobName} in Container={ContainerName}"
)]
private partial void LogWarningLargePayloadFallback(long contentLength, string grainType, GrainId grainId, string? eTag, string blobName, string containerName);

[LoggerMessage(
Level = LogLevel.Error,
EventId = (int)AzureProviderErrorCode.AzureBlobProvider_ReadError,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,23 @@

namespace Orleans.Configuration
{
/// <summary>
/// Selects how grain state is serialized and uploaded to Azure Blob Storage.
/// </summary>
public enum AzureBlobStorageWriteMode
{
    /// <summary>
    /// Serialize through the <see cref="IGrainStorageSerializer"/> binary path into a
    /// <see cref="BinaryData"/> instance, then upload it.
    /// The whole payload is materialized in memory; this is typically the fastest path but
    /// can create large allocations (including LOH) for big payloads.
    /// </summary>
    BinaryData = 0,

    /// <summary>
    /// Serialize through the stream serializer into a pooled in-memory stream, then upload from that buffer.
    /// The whole payload is still buffered, but pooled segments are reused to avoid LOH churn.
    /// Requires <see cref="IGrainStorageStreamingSerializer"/>; otherwise the write falls back to <see cref="BinaryData"/>.
    /// </summary>
    BufferedStream = 1,
}

public class AzureBlobStorageOptions : IStorageProviderSerializerOptions
{
private BlobServiceClient _blobServiceClient;
Expand Down Expand Up @@ -57,6 +74,20 @@ public BlobServiceClient BlobServiceClient
/// </summary>
public bool DeleteStateOnClear { get; set; } = true;

/// <summary>
/// Gets or sets a value indicating whether to use pooled buffers when reading blob contents.
/// The deserializer must not retain the <see cref="BinaryData"/> or underlying buffer after deserialization.
/// Pooled buffers are used only when the configured serializer does not implement
/// <see cref="IGrainStorageStreamingSerializer"/> and the content length fits in an <see cref="int"/>;
/// when a stream serializer is configured, reads go through the streaming path instead.
/// When pooled buffers are used, deserialization goes through the <see cref="IGrainStorageSerializer"/> binary path.
/// </summary>
public bool UsePooledBufferForReads { get; set; } = true;

/// <summary>
/// Gets or sets the write mode to use when a stream serializer is available.
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

write path -> write mode

/// If the stream serializer is not configured, writes always use <see cref="BinaryData"/>.
/// </summary>
public AzureBlobStorageWriteMode WriteMode { get; set; } = AzureBlobStorageWriteMode.BinaryData;

/// <summary>
/// A function for building container factory instances
/// </summary>
Expand Down Expand Up @@ -149,7 +180,7 @@ public void ValidateConfiguration()
AzureBlobUtils.ValidateContainerName(options.ContainerName);
AzureBlobUtils.ValidateBlobName(this.name);
}
catch(ArgumentException e)
catch (ArgumentException e)
{
throw new OrleansConfigurationException(
$"Configuration for AzureBlobStorageOptions {name} is invalid. {nameof(this.options.ContainerName)} is not valid", e);
Expand Down
31 changes: 30 additions & 1 deletion src/Orleans.Core/Providers/IGrainStorageSerializer.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Orleans.Runtime;
Expand Down Expand Up @@ -27,6 +30,32 @@ public interface IGrainStorageSerializer
T Deserialize<T>(BinaryData input);
}

#nullable enable
/// <summary>
/// Optional stream-based serializer for grain state.
/// </summary>
public interface IGrainStorageStreamingSerializer
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ReubenBond said on Discord he prefers something like this for the abstraction.

/// <summary>
/// Optional stream-based serializer for grain state.
/// </summary>
public interface IGrainStorageStreamingSerializer
{
    /// <summary>
    /// Serializes the object input to a stream.
    /// </summary>
    /// <param name="input">The object to serialize.</param>
    /// <param name="destination">The destination buffer writer.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    /// <typeparam name="T">The input type.</typeparam>
    ValueTask SerializeAsync<T>(T input, IBufferWriter<byte> destination, CancellationToken cancellationToken = default);

    /// <summary>
    /// Deserializes the provided data from a stream.
    /// </summary>
    /// <param name="input">The input byte sequence.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    /// <typeparam name="T">The output type.</typeparam>
    /// <returns>The deserialized object, or null.</returns>
    ValueTask<T?> DeserializeAsync<T>(ReadOnlySequence<byte> input, CancellationToken cancellationToken = default);
}

For the data providers where Stream is native, we can then include helper extension methods that map to/from IBufferWriter<byte> and ReadOnlySequence<byte>.

Copy link
Copy Markdown
Contributor Author

@egil egil Jan 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For serialization, we can use the ready-built adapters in Orleans:

public static class GrainStorageStreamingSerializerExtensions
{
    /// <summary>
    /// Serializes the object input to a stream.
    /// </summary>
    /// <param name="input">The object to serialize.</param>
    /// <param name="destination">The destination stream.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    /// <typeparam name="T">The input type.</typeparam>
    public static ValueTask SerializeAsync<T>(this IGrainStorageStreamingSerializer serializer, T input, Stream destination, CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(serializer);
        ArgumentNullException.ThrowIfNull(destination);

        if (destination is MemoryStream memoryStream)
        {
            return serializer.SerializeAsync(input, new MemoryStreamBufferWriter(memoryStream), cancellationToken);
        }
        else
        {
            return serializer.SerializeAsync(input, new ArrayStreamBufferWriter(destination), cancellationToken);
        }
    }
}

For DeserializeAsync, I would love a suggestion for how to adapt a Stream to ReadOnlySequence<byte> without loading all data into memory first. Suggestions?

Copy link
Copy Markdown

@SilentBlueD666 SilentBlueD666 Jan 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The naming feels off, now you've switched from stream, maybe change to IGrainStateBufferSerializer or something.

My gut is telling me, drop the ValueTask, by this point all IO should be done, and Task/Await is just overhead 99% of the time, unless I am missing why you would need...

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The naming feels off, now you've switched from stream, maybe change to IGrainStateBufferSerializer or something.

My gut is telling to drop the ValueTask, by this point all IO should be done, and Task/Await is just overhead 99% of the time, unless I am missing why you would need...

I hope IO is not done at this point, no, since that would mean the data has already been loaded into memory, which was the main problem I was trying to avoid. With large datasets, e.g., blobs, that leads to more GC churn. So there needs to be support for streaming data from blob storage to the serializer and then to objects.

Calling the interface IGrainStateBufferSerializer is fine though, no objections there.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking about the IO and buffering in the serializer implementations (JsonGrainStorageSerializer, OrleansGrainStorageSerializer) and how it flows into AzureBlobGrainStorage.

Note: overhead here is tiny — pure micro-optimization territory!

The overall solution I was getting at earlier on Discord before I dropped off 😅 has been implemented at the calling level (in the provider) rather than inside the serializer.

// In AzureBlobGrainStorage.UploadSerializedStateBufferedAsync<T>
var bufferStream = PooledBufferStream.Rent();
try
{
    // Serialize: sync write to the pooled stream (no real await needed inside most impls)
    await streamSerializer.SerializeAsync(value, bufferStream).ConfigureAwait(false);
    
    bufferStream.Position = 0;
    
    // Actual IO: upload from the pooled stream
    return await blob.UploadAsync(bufferStream, options).ConfigureAwait(false);
}
finally
{
    PooledBufferStream.Return(bufferStream);
}

In JsonGrainStorageSerializer.SerializeAsync (similar for Orleans serializer):

public ValueTask SerializeAsync<T>(T value, Stream destination, CancellationToken ct = default)
{
    ct.ThrowIfCancellationRequested();
    _orleansJsonSerializer.Serialize(value, typeof(T), destination);  // sync write
    return ValueTask.CompletedTask;  // no suspension
}

The await on serialize is still basically zero-cost (completed ValueTask), but unnecessary.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh I see. If there is no need for asynchrony in the serializer implementations, then dropping ValueTask is fine. Looks like there are no asynchronous methods on ReadOnlySequence nor on IBufferWriter, so probably a good indicator ValueTask is not needed?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep 😁

{
/// <summary>
/// Serializes <paramref name="input"/> and writes the result to the <paramref name="destination"/> stream.
/// </summary>
/// <param name="input">The object to serialize.</param>
/// <param name="destination">The destination stream.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <typeparam name="T">The input type.</typeparam>
/// <returns>A <see cref="ValueTask"/> representing the serialization operation.</returns>
ValueTask SerializeAsync<T>(T input, Stream destination, CancellationToken cancellationToken = default);

/// <summary>
/// Deserializes an object of type <typeparamref name="T"/> from the <paramref name="input"/> stream.
/// </summary>
/// <param name="input">The input stream.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <typeparam name="T">The output type.</typeparam>
/// <returns>The deserialized object, which may be <see langword="null"/>.</returns>
ValueTask<T?> DeserializeAsync<T>(Stream input, CancellationToken cancellationToken = default);
}
#nullable restore

/// <summary>
/// Extensions for <see cref="IGrainStorageSerializer"/>.
/// </summary>
Expand Down Expand Up @@ -76,7 +105,7 @@ public void PostConfigure(string name, TOptions options)
{
if (options.GrainStorageSerializer == default)
{
// First, try to get a IGrainStorageSerializer that was registered with
// First, try to get a IGrainStorageSerializer that was registered with
// the same name as the storage provider
// If none is found, fallback to system wide default
options.GrainStorageSerializer = _serviceProvider.GetKeyedService<IGrainStorageSerializer>(name) ?? _serviceProvider.GetRequiredService<IGrainStorageSerializer>();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
using System;
using Orleans.Serialization;

namespace Orleans.Storage
{
/// <summary>
/// Grain storage serializer that uses Newtonsoft.Json
/// </summary>
public class JsonGrainStorageSerializer : IGrainStorageSerializer
public class JsonGrainStorageSerializer : IGrainStorageSerializer, IGrainStorageStreamingSerializer
{
private readonly OrleansJsonSerializer _orleansJsonSerializer;

Expand All @@ -30,5 +29,20 @@ public T Deserialize<T>(BinaryData input)
{
return (T)_orleansJsonSerializer.Deserialize(typeof(T), input.ToString());
}

/// <inheritdoc/>
public ValueTask SerializeAsync<T>(T value, Stream destination, CancellationToken cancellationToken = default)
{
    // Honor cancellation before doing any work; the write below is synchronous.
    cancellationToken.ThrowIfCancellationRequested();

    var stateType = typeof(T);
    _orleansJsonSerializer.Serialize(value, stateType, destination);

    // Nothing is awaited, so hand back an already-completed ValueTask.
    return default;
}

/// <inheritdoc/>
public ValueTask<T> DeserializeAsync<T>(Stream input, CancellationToken cancellationToken = default)
{
    // Honor cancellation before doing any work; the read below is synchronous.
    cancellationToken.ThrowIfCancellationRequested();

    var deserialized = _orleansJsonSerializer.Deserialize(typeof(T), input);

    // Wrap the already-available result in a completed ValueTask.
    return new ValueTask<T>((T)deserialized);
}
}
}
Loading
Loading