diff --git a/src/Azure/Orleans.Persistence.AzureStorage/Providers/AzureProviderErrorCode.cs b/src/Azure/Orleans.Persistence.AzureStorage/Providers/AzureProviderErrorCode.cs index 97c8b8a7507..628bb99c73f 100644 --- a/src/Azure/Orleans.Persistence.AzureStorage/Providers/AzureProviderErrorCode.cs +++ b/src/Azure/Orleans.Persistence.AzureStorage/Providers/AzureProviderErrorCode.cs @@ -34,7 +34,7 @@ internal enum AzureProviderErrorCode AzureBlobProvider_ClearError = AzureBlobProviderBase + 14, AzureBlobProvider_ClearingData = AzureBlobProviderBase + 15, AzureBlobProvider_Cleared = AzureBlobProviderBase + 16, - + AzureBlobProvider_LargePayloadFallback = AzureBlobProviderBase + 17, } diff --git a/src/Azure/Orleans.Persistence.AzureStorage/Providers/Storage/AzureBlobStorage.cs b/src/Azure/Orleans.Persistence.AzureStorage/Providers/Storage/AzureBlobStorage.cs index 02bf13ba53b..faacb7f4e65 100644 --- a/src/Azure/Orleans.Persistence.AzureStorage/Providers/Storage/AzureBlobStorage.cs +++ b/src/Azure/Orleans.Persistence.AzureStorage/Providers/Storage/AzureBlobStorage.cs @@ -1,8 +1,6 @@ #nullable enable -using System; +using System.Buffers; using System.Diagnostics; -using System.Threading; -using System.Threading.Tasks; using Azure; using Azure.Storage.Blobs; using Azure.Storage.Blobs.Models; @@ -11,7 +9,7 @@ using Microsoft.Extensions.Options; using Orleans.Configuration; using Orleans.Providers.Azure; -using Orleans.Runtime; +using Orleans.Serialization.Buffers.Adaptors; using Orleans.Serialization.Serializers; using LogLevel = Microsoft.Extensions.Logging.LogLevel; @@ -28,6 +26,7 @@ public partial class AzureBlobGrainStorage : IGrainStorage, ILifecycleParticipan private readonly IActivatorProvider _activatorProvider; private readonly AzureBlobStorageOptions options; private readonly IGrainStorageSerializer grainStorageSerializer; + private readonly IGrainStorageStreamingSerializer? streamSerializer; /// Default constructor public AzureBlobGrainStorage( @@ -42,6 +41,7 @@ public AzureBlobGrainStorage( this.blobContainerFactory = blobContainerFactory; _activatorProvider = activatorProvider; this.grainStorageSerializer = options.GrainStorageSerializer; + this.streamSerializer = options.GrainStorageSerializer as IGrainStorageStreamingSerializer; this.logger = logger; } @@ -58,20 +58,12 @@ public async Task ReadStateAsync(string grainType, GrainId grainId, IGrainSta { var blob = container.GetBlobClient(blobName); - var response = await blob.DownloadContentAsync(); - grainState.ETag = response.Value.Details.ETag.ToString(); - var contents = response.Value.Content; - T? loadedState; - if (contents is null || contents.IsEmpty) + T? loadedState = streamSerializer switch { - loadedState = default; - LogTraceBlobEmptyReading(grainType, grainId, grainState.ETag, blobName, container.Name); - } - else - { - loadedState = this.ConvertFromStorageFormat(contents); - LogTraceDataRead(grainType, grainId, grainState.ETag, blobName, container.Name); - } + not null => await ReadStateWithStreamAsync(blob, grainType, grainId, grainState, blobName, container.Name), + null when options.UsePooledBufferForReads => await ReadStateWithPooledBufferAsync(blob, grainType, grainId, grainState, blobName, container.Name), + _ => await ReadStateWithBinaryDataAsync(blob, grainType, grainId, grainState, blobName, container.Name), + }; grainState.State = loadedState ?? CreateInstance(); grainState.RecordExists = loadedState is not null; @@ -115,11 +107,17 @@ public async Task WriteStateAsync(string grainType, GrainId grainId, IGrainSt { LogTraceWriting(grainType, grainId, grainState.ETag, blobName, container.Name); - var contents = ConvertToStorageFormat(grainState.State); - var blob = container.GetBlobClient(blobName); - await WriteStateAndCreateContainerIfNotExists(grainType, grainId, grainState, contents, "application/octet-stream", blob); + if (streamSerializer is null || options.WriteMode == AzureBlobStorageWriteMode.BinaryData) + { + var contents = ConvertToStorageFormat(grainState.State); + await WriteStateAndCreateContainerIfNotExists(grainType, grainId, grainState, contents, "application/octet-stream", blob); + } + else + { + await WriteStateBufferedStreamAndCreateContainerIfNotExists(grainType, grainId, grainState, "application/octet-stream", blob); + } LogTraceDataWritten(grainType, grainId, grainState.ETag, blobName, container.Name); } @@ -200,8 +198,7 @@ private async Task WriteStateAndCreateContainerIfNotExists(string grainType, static state => state.blob.UploadAsync(state.contents, state.options), (blob, contents, options), blob, - grainState.ETag) - .ConfigureAwait(false); + grainState.ETag).ConfigureAwait(false); grainState.ETag = result.Value.ETag.ToString(); grainState.RecordExists = true; @@ -211,11 +208,131 @@ private async Task WriteStateAndCreateContainerIfNotExists(string grainType, // if the container does not exist, create it, and make another attempt LogTraceContainerNotFound(grainType, grainId, grainState.ETag, blob.Name, container.Name); await container.CreateIfNotExistsAsync().ConfigureAwait(false); - await WriteStateAndCreateContainerIfNotExists(grainType, grainId, grainState, contents, mimeType, blob).ConfigureAwait(false); } } + private async Task WriteStateBufferedStreamAndCreateContainerIfNotExists(string grainType, GrainId grainId, IGrainState grainState, string mimeType, BlobClient blob) + { + var container = this.blobContainerFactory.GetBlobContainerClient(grainId); + + try + { + var conditions = string.IsNullOrEmpty(grainState.ETag) + ? new BlobRequestConditions { IfNoneMatch = ETag.All } + : new BlobRequestConditions { IfMatch = new ETag(grainState.ETag) }; + + var options = new BlobUploadOptions + { + HttpHeaders = new BlobHttpHeaders { ContentType = mimeType }, + Conditions = conditions, + }; + + var result = await DoOptimisticUpdate( + static state => state.self.UploadSerializedStateBufferedAsync(state.blob, state.options, state.value), + (self: this, blob, options, value: grainState.State), + blob, + grainState.ETag).ConfigureAwait(false); + + grainState.ETag = result.Value.ETag.ToString(); + grainState.RecordExists = true; + } + catch (RequestFailedException exception) when (exception.IsContainerNotFound()) + { + // if the container does not exist, create it, and make another attempt + LogTraceContainerNotFound(grainType, grainId, grainState.ETag, blob.Name, container.Name); + await container.CreateIfNotExistsAsync().ConfigureAwait(false); + await WriteStateBufferedStreamAndCreateContainerIfNotExists(grainType, grainId, grainState, mimeType, blob).ConfigureAwait(false); + } + } + + private async Task> UploadSerializedStateBufferedAsync(BlobClient blob, BlobUploadOptions options, T value) + { + if (streamSerializer is null) + { + throw new InvalidOperationException("Stream serializer is not configured."); + } + + var bufferStream = PooledBufferStream.Rent(); + try + { + await streamSerializer.SerializeAsync(value, bufferStream).ConfigureAwait(false); + bufferStream.Position = 0; + return await blob.UploadAsync(bufferStream, options).ConfigureAwait(false); + } + finally + { + PooledBufferStream.Return(bufferStream); + } + } + + private async Task ReadStateWithStreamAsync(BlobClient blob, string grainType, GrainId grainId, IGrainState grainState, string blobName, string containerName) + { + var response = await blob.DownloadStreamingAsync(); + grainState.ETag = response.Value.Details.ETag.ToString(); + var contentLength = response.Value.Details.ContentLength; + + if (contentLength <= 0) + { + LogTraceBlobEmptyReading(grainType, grainId, grainState.ETag, blobName, containerName); + return default; + } + + await using var content = response.Value.Content; + var loadedState = await streamSerializer!.DeserializeAsync(content).ConfigureAwait(false); + LogTraceDataRead(grainType, grainId, grainState.ETag, blobName, containerName); + return loadedState; + } + + private async Task ReadStateWithBinaryDataAsync(BlobClient blob, string grainType, GrainId grainId, IGrainState grainState, string blobName, string containerName) + { + var response = await blob.DownloadContentAsync(); + grainState.ETag = response.Value.Details.ETag.ToString(); + var contents = response.Value.Content; + + if (contents is null || contents.IsEmpty) + { + LogTraceBlobEmptyReading(grainType, grainId, grainState.ETag, blobName, containerName); + return default; + } + + var loadedState = this.ConvertFromStorageFormat(contents); + LogTraceDataRead(grainType, grainId, grainState.ETag, blobName, containerName); + return loadedState; + } + + private async Task ReadStateWithPooledBufferAsync(BlobClient blob, string grainType, GrainId grainId, IGrainState grainState, string blobName, string containerName) + { + var response = await blob.DownloadStreamingAsync(); + grainState.ETag = response.Value.Details.ETag.ToString(); + var contentLength = response.Value.Details.ContentLength; + + if (contentLength <= 0) + { + LogTraceBlobEmptyReading(grainType, grainId, grainState.ETag, blobName, containerName); + return default; + } + + await using var content = response.Value.Content; + T? loadedState; + if (contentLength <= int.MaxValue) + { + var buffer = ArrayPool.Shared.Rent((int)contentLength); + var memory = buffer.AsMemory(0, (int)contentLength); + await content.ReadExactlyAsync(memory); + loadedState = this.ConvertFromStorageFormat(new BinaryData(memory)); + ArrayPool.Shared.Return(buffer); + } + else + { + loadedState = this.ConvertFromStorageFormat(new BinaryData(content)); + LogWarningLargePayloadFallback(contentLength, grainType, grainId, grainState.ETag, blobName, containerName); + } + + LogTraceDataRead(grainType, grainId, grainState.ETag, blobName, containerName); + return loadedState; + } + private static async Task DoOptimisticUpdate(Func> updateOperation, TState state, BlobClient blob, string currentETag) { try @@ -294,6 +411,13 @@ private async Task Init(CancellationToken ct) )] private partial void LogTraceDataRead(string grainType, GrainId grainId, string? eTag, string blobName, string containerName); + [LoggerMessage( + Level = LogLevel.Warning, + EventId = (int)AzureProviderErrorCode.AzureBlobProvider_LargePayloadFallback, + Message = "ContentLength={ContentLength} exceeds max array size; falling back to DownloadContentAsync. GrainType={GrainType} GrainId={GrainId} ETag={ETag} BlobName={BlobName} in Container={ContainerName}" + )] + private partial void LogWarningLargePayloadFallback(long contentLength, string grainType, GrainId grainId, string? eTag, string blobName, string containerName); + [LoggerMessage( Level = LogLevel.Error, EventId = (int)AzureProviderErrorCode.AzureBlobProvider_ReadError, diff --git a/src/Azure/Orleans.Persistence.AzureStorage/Providers/Storage/AzureBlobStorageOptions.cs b/src/Azure/Orleans.Persistence.AzureStorage/Providers/Storage/AzureBlobStorageOptions.cs index 2a9bb3dbeab..9b3a7280011 100644 --- a/src/Azure/Orleans.Persistence.AzureStorage/Providers/Storage/AzureBlobStorageOptions.cs +++ b/src/Azure/Orleans.Persistence.AzureStorage/Providers/Storage/AzureBlobStorageOptions.cs @@ -11,6 +11,23 @@ namespace Orleans.Configuration { + public enum AzureBlobStorageWriteMode + { + /// + /// Serialize to and upload. + /// This uses the binary path, materializing the full payload in memory. + /// It is typically the fastest path but can create large allocations (including LOH) for big payloads. + /// + BinaryData, + + /// + /// Serialize using the stream serializer into a pooled in-memory stream and upload from that buffer. + /// This still buffers the full payload but avoids LOH churn by reusing pooled segments. + /// Requires ; otherwise the write falls back to . + /// + BufferedStream, + } + public class AzureBlobStorageOptions : IStorageProviderSerializerOptions { private BlobServiceClient _blobServiceClient; @@ -57,6 +74,20 @@ public BlobServiceClient BlobServiceClient /// public bool DeleteStateOnClear { get; set; } = true; + /// + /// Gets or sets a value indicating whether to use pooled buffers when reading blob contents. + /// The deserializer must not retain the or underlying buffer after deserialization. + /// When a stream serializer is configured, pooled buffers are used only if the content length fits in an . + /// When pooled buffers are used, deserialization goes through the binary path. + /// + public bool UsePooledBufferForReads { get; set; } = true; + + /// + /// Gets or sets the write path to use when a stream serializer is available. + /// If the stream serializer is not configured, writes always use . + /// + public AzureBlobStorageWriteMode WriteMode { get; set; } = AzureBlobStorageWriteMode.BinaryData; + /// /// A function for building container factory instances /// @@ -149,7 +180,7 @@ public void ValidateConfiguration() AzureBlobUtils.ValidateContainerName(options.ContainerName); AzureBlobUtils.ValidateBlobName(this.name); } - catch(ArgumentException e) + catch (ArgumentException e) { throw new OrleansConfigurationException( $"Configuration for AzureBlobStorageOptions {name} is invalid. {nameof(this.options.ContainerName)} is not valid", e); diff --git a/src/Orleans.Core/Providers/IGrainStorageSerializer.cs b/src/Orleans.Core/Providers/IGrainStorageSerializer.cs index 577f3b387e1..8a57409b2c5 100644 --- a/src/Orleans.Core/Providers/IGrainStorageSerializer.cs +++ b/src/Orleans.Core/Providers/IGrainStorageSerializer.cs @@ -1,4 +1,7 @@ using System; +using System.IO; +using System.Threading; +using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; using Orleans.Runtime; @@ -27,6 +30,32 @@ public interface IGrainStorageSerializer T Deserialize(BinaryData input); } +#nullable enable + /// + /// Optional stream-based serializer for grain state. + /// + public interface IGrainStorageStreamingSerializer + { + /// + /// Serializes the object input to a stream. + /// + /// The object to serialize. + /// The destination stream. + /// The cancellation token. + /// The input type. + ValueTask SerializeAsync(T input, Stream destination, CancellationToken cancellationToken = default); + + /// + /// Deserializes the provided data from a stream. + /// + /// The input stream. + /// The cancellation token. + /// The output type. + /// The deserialized object. + ValueTask DeserializeAsync(Stream input, CancellationToken cancellationToken = default); + } +#nullable restore + /// /// Extensions for . /// @@ -76,7 +105,7 @@ public void PostConfigure(string name, TOptions options) { if (options.GrainStorageSerializer == default) { - // First, try to get a IGrainStorageSerializer that was registered with + // First, try to get a IGrainStorageSerializer that was registered with // the same name as the storage provider // If none is found, fallback to system wide default options.GrainStorageSerializer = _serviceProvider.GetKeyedService(name) ?? _serviceProvider.GetRequiredService(); diff --git a/src/Orleans.Core/Providers/StorageSerializer/JsonGrainStorageSerializer.cs b/src/Orleans.Core/Providers/StorageSerializer/JsonGrainStorageSerializer.cs index ab14744d677..a5528764d54 100644 --- a/src/Orleans.Core/Providers/StorageSerializer/JsonGrainStorageSerializer.cs +++ b/src/Orleans.Core/Providers/StorageSerializer/JsonGrainStorageSerializer.cs @@ -1,4 +1,3 @@ -using System; using Orleans.Serialization; namespace Orleans.Storage @@ -6,7 +5,7 @@ namespace Orleans.Storage /// /// Grain storage serializer that uses Newtonsoft.Json /// - public class JsonGrainStorageSerializer : IGrainStorageSerializer + public class JsonGrainStorageSerializer : IGrainStorageSerializer, IGrainStorageStreamingSerializer { private readonly OrleansJsonSerializer _orleansJsonSerializer; @@ -30,5 +29,20 @@ public T Deserialize(BinaryData input) { return (T)_orleansJsonSerializer.Deserialize(typeof(T), input.ToString()); } + + /// + public ValueTask SerializeAsync(T value, Stream destination, CancellationToken cancellationToken = default) + { + cancellationToken.ThrowIfCancellationRequested(); + _orleansJsonSerializer.Serialize(value, typeof(T), destination); + return ValueTask.CompletedTask; + } + + /// + public ValueTask DeserializeAsync(Stream input, CancellationToken cancellationToken = default) + { + cancellationToken.ThrowIfCancellationRequested(); + return ValueTask.FromResult((T)_orleansJsonSerializer.Deserialize(typeof(T), input)); + } } } diff --git a/src/Orleans.Core/Providers/StorageSerializer/OrleansGrainStateSerializer.cs b/src/Orleans.Core/Providers/StorageSerializer/OrleansGrainStateSerializer.cs index 436895b6702..bdecc8d094a 100644 --- a/src/Orleans.Core/Providers/StorageSerializer/OrleansGrainStateSerializer.cs +++ b/src/Orleans.Core/Providers/StorageSerializer/OrleansGrainStateSerializer.cs @@ -1,13 +1,17 @@ using System; +using System.IO; +using System.Threading; +using System.Threading.Tasks; using System.Buffers; using Orleans.Serialization; +using Orleans.Serialization.Buffers.Adaptors; namespace Orleans.Storage { /// /// Grain storage serializer that uses the Orleans . /// - public class OrleansGrainStorageSerializer : IGrainStorageSerializer + public class OrleansGrainStorageSerializer : IGrainStorageSerializer, IGrainStorageStreamingSerializer { private readonly Serializer serializer; @@ -33,5 +37,42 @@ public T Deserialize(BinaryData input) { return this.serializer.Deserialize(input.ToMemory()); } + + /// + public ValueTask SerializeAsync(T value, Stream destination, CancellationToken cancellationToken = default) + { + this.serializer.Serialize(value, destination); + return ValueTask.CompletedTask; + } + + /// + public async ValueTask DeserializeAsync(Stream input, CancellationToken cancellationToken = default) + { + // Seekable streams (e.g., MemoryStream, FileStream) can be deserialized directly without buffering. + // Non-seekable streams (e.g., NetworkStream) require buffering to enable efficient multi-pass reading. + if (input.CanSeek) + { + return this.serializer.Deserialize(input); + } + + var bufferStream = PooledBufferStream.Rent(); + try + { + await input.CopyToAsync(bufferStream, cancellationToken).ConfigureAwait(false); + var sequence = bufferStream.RentReadOnlySequence(); + try + { + return this.serializer.Deserialize(sequence); + } + finally + { + bufferStream.ReturnReadOnlySequence(sequence); + } + } + finally + { + PooledBufferStream.Return(bufferStream); + } + } } } diff --git a/src/Orleans.Core/Serialization/OrleansJsonSerializer.cs b/src/Orleans.Core/Serialization/OrleansJsonSerializer.cs index 5ae2cd79b31..487c78c3d44 100644 --- a/src/Orleans.Core/Serialization/OrleansJsonSerializer.cs +++ b/src/Orleans.Core/Serialization/OrleansJsonSerializer.cs @@ -1,10 +1,9 @@ -using System; using System.Net; +using System.Text; +using Microsoft.Extensions.Options; using Newtonsoft.Json; using Newtonsoft.Json.Linq; -using Orleans.Runtime; using Orleans.GrainReferences; -using Microsoft.Extensions.Options; namespace Orleans.Serialization { @@ -43,12 +42,40 @@ public object Deserialize(Type expectedType, string input) return JsonConvert.DeserializeObject(input, expectedType, this.settings); } + /// + /// Deserializes an object of the specified expected type from the provided stream. + /// + /// The expected type. + /// The input stream. + /// The deserialized object. + public object Deserialize(Type expectedType, Stream input) + { + using var reader = new StreamReader(input, Encoding.UTF8, detectEncodingFromByteOrderMarks: true, bufferSize: 1024, leaveOpen: true); + using var jsonReader = new JsonTextReader(reader); + var serializer = JsonSerializer.Create(this.settings); + return serializer.Deserialize(jsonReader, expectedType); + } + /// /// Serializes an object to a JSON string. /// /// The object to serialize. /// The type the deserializer should expect. public string Serialize(object item, Type expectedType) => JsonConvert.SerializeObject(item, expectedType, this.settings); + + /// + /// Serializes an object to a stream. + /// + /// The object to serialize. + /// The type the deserializer should expect. + /// The destination stream. + public void Serialize(object item, Type expectedType, Stream destination) + { + using var writer = new StreamWriter(destination, new UTF8Encoding(encoderShouldEmitUTF8Identifier: false), 1024, leaveOpen: true); + using var jsonWriter = new JsonTextWriter(writer); + var serializer = JsonSerializer.Create(this.settings); + serializer.Serialize(jsonWriter, item, expectedType); + } } /// @@ -289,7 +316,7 @@ public override bool CanConvert(Type objectType) /// public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer) { - var val = ((IAddressable)value).AsReference(); + var val = ((IAddressable)value).AsReference(); writer.WriteStartObject(); writer.WritePropertyName("Id"); writer.WriteStartObject(); diff --git a/test/Benchmarks/GrainStorage/AzureBlobReadStateBenchmark.cs b/test/Benchmarks/GrainStorage/AzureBlobReadStateBenchmark.cs new file mode 100644 index 00000000000..9b8465caf32 --- /dev/null +++ b/test/Benchmarks/GrainStorage/AzureBlobReadStateBenchmark.cs @@ -0,0 +1,162 @@ +using Azure.Storage.Blobs; +using BenchmarkDotNet.Attributes; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Options; +using Orleans.Configuration; +using Orleans.Serialization; +using Orleans.Serialization.Serializers; +using Orleans.Storage; +using TestExtensions; + +namespace Benchmarks.GrainStorage; + +[SimpleJob(launchCount: 1, warmupCount: 2, iterationCount: 5)] +[MemoryDiagnoser(true)] +public class AzureBlobReadStateBenchmark +{ + private const int ReadIterations = 50; + private ServiceProvider _serviceProvider = null!; + private AzureBlobGrainStorage _nonPooledStorage = null!; + private AzureBlobGrainStorage _pooledStorage = null!; + private IGrainState _nonPooledState = null!; + private IGrainState _pooledState = null!; + private GrainId _grainId; + private string _grainType = null!; + private BlobContainerClient _containerClient = null!; + + [Params(4 * 1024, 64 * 1024, 128 * 1024)] + public int PayloadSize { get; set; } + + [Params(StorageSerializerKind.Orleans, StorageSerializerKind.NewtonsoftJson, StorageSerializerKind.SystemTextJson)] + public StorageSerializerKind SerializerKind { get; set; } + + [GlobalSetup] + public async Task SetupAsync() + { + var client = CreateBlobServiceClient(); + var services = new ServiceCollection() + .AddLogging() + .AddSerializer(); + services.AddOptions(); + services.AddSingleton, ConfigureOrleansJsonSerializerOptions>(); + services.AddSingleton(); + _serviceProvider = services.BuildServiceProvider(); + + var activatorProvider = _serviceProvider.GetRequiredService(); + var serializer = CreateSerializer(SerializerKind); + var nonStreamingSerializer = new NonStreamingGrainStorageSerializer(serializer); + var containerName = $"bench-grainstate-{Guid.NewGuid():N}"; + _grainType = "bench-grain"; + _grainId = GrainId.Create("bench-grain", Guid.NewGuid().ToString("N")); + + var nonPooledOptions = CreateOptions(client, nonStreamingSerializer, containerName, usePooledReads: false); + var pooledOptions = CreateOptions(client, nonStreamingSerializer, containerName, usePooledReads: true); + + (_nonPooledStorage, var nonPooledFactory) = CreateStorage("bench-nonpooled", nonPooledOptions, activatorProvider); + (_pooledStorage, var pooledFactory) = CreateStorage("bench-pooled", pooledOptions, activatorProvider); + + await nonPooledFactory.InitializeAsync(client); + await pooledFactory.InitializeAsync(client); + + var writeState = new GrainState + { + State = BenchmarkState.Create(PayloadSize) + }; + await _nonPooledStorage.WriteStateAsync(_grainType, _grainId, writeState); + + _nonPooledState = new GrainState(); + _pooledState = new GrainState(); + _containerClient = client.GetBlobContainerClient(containerName); + } + + [GlobalCleanup] + public async Task CleanupAsync() + { + if (_containerClient is not null) + { + await _containerClient.DeleteIfExistsAsync(); + } + + _serviceProvider?.Dispose(); + } + + [Benchmark(Baseline = true, OperationsPerInvoke = ReadIterations)] + public async Task ReadStateNonPooledAsync() + { + for (var i = 0; i < ReadIterations; i++) + { + await _nonPooledStorage.ReadStateAsync(_grainType, _grainId, _nonPooledState); + } + } + + [Benchmark(OperationsPerInvoke = ReadIterations)] + public async Task ReadStatePooledAsync() + { + for (var i = 0; i < ReadIterations; i++) + { + await _pooledStorage.ReadStateAsync(_grainType, _grainId, _pooledState); + } + } + + private static BlobServiceClient CreateBlobServiceClient() + { + if (TestDefaultConfiguration.UseAadAuthentication) + { + if (!TestDefaultConfiguration.GetValue(nameof(TestDefaultConfiguration.DataBlobUri), out var blobUriValue) || + string.IsNullOrWhiteSpace(blobUriValue)) + { + throw new InvalidOperationException("DataBlobUri is required when UseAadAuthentication is true."); + } + + return new BlobServiceClient(new Uri(blobUriValue), TestDefaultConfiguration.TokenCredential); + } + + if (string.IsNullOrWhiteSpace(TestDefaultConfiguration.DataConnectionString)) + { + throw new InvalidOperationException("OrleansDataConnectionString must be set for Azure Blob benchmarks."); + } + + return new BlobServiceClient(TestDefaultConfiguration.DataConnectionString); + } + + private static AzureBlobStorageOptions CreateOptions( + BlobServiceClient client, + IGrainStorageSerializer serializer, + string containerName, + bool usePooledReads) + { + return new AzureBlobStorageOptions + { + BlobServiceClient = client, + ContainerName = containerName, + GrainStorageSerializer = serializer, + UsePooledBufferForReads = usePooledReads + }; + } + + private (AzureBlobGrainStorage Storage, IBlobContainerFactory Factory) CreateStorage( + string name, + AzureBlobStorageOptions options, + IActivatorProvider activatorProvider) + { + var containerFactory = options.BuildContainerFactory(_serviceProvider, options); + var logger = NullLogger.Instance; + var storage = new AzureBlobGrainStorage(name, options, containerFactory, activatorProvider, logger); + return (storage, containerFactory); + } + + private IGrainStorageSerializer CreateSerializer(StorageSerializerKind kind) + { + return kind switch + { + StorageSerializerKind.Orleans => new OrleansGrainStorageSerializer(_serviceProvider.GetRequiredService()), + StorageSerializerKind.NewtonsoftJson => new JsonGrainStorageSerializer(CreateJsonSerializer()), + StorageSerializerKind.SystemTextJson => new SystemTextJsonGrainStorageSerializer(), + _ => throw new InvalidOperationException($"Unknown serializer kind '{kind}'.") + }; + } + + private static OrleansJsonSerializer CreateJsonSerializer() + => new(Options.Create(new OrleansJsonSerializerOptions())); +} diff --git a/test/Benchmarks/GrainStorage/AzureBlobReadStateStreamingBenchmark.cs b/test/Benchmarks/GrainStorage/AzureBlobReadStateStreamingBenchmark.cs new file mode 100644 index 00000000000..bd2853f39a2 --- /dev/null +++ b/test/Benchmarks/GrainStorage/AzureBlobReadStateStreamingBenchmark.cs @@ -0,0 +1,168 @@ +using Azure.Storage.Blobs; +using BenchmarkDotNet.Attributes; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Options; +using Orleans.Configuration; +using Orleans.Serialization; +using Orleans.Serialization.Serializers; +using Orleans.Storage; +using TestExtensions; + +namespace Benchmarks.GrainStorage; + +[SimpleJob(launchCount: 1, warmupCount: 2, iterationCount: 5)] +[MemoryDiagnoser(true)] +public class AzureBlobReadStateStreamingBenchmark +{ + private const int ReadIterations = 50; + private ServiceProvider _serviceProvider = null!; + private AzureBlobGrainStorage _binaryStorage = null!; + private AzureBlobGrainStorage _streamStorage = null!; + private IGrainState _binaryState = null!; + private IGrainState _streamState = null!; + private GrainId _binaryGrainId; + private GrainId _streamGrainId; + private string _grainType = null!; + private BlobContainerClient _containerClient = null!; + + [Params(4 * 1024, 64 * 1024, 128 * 1024)] + public int PayloadSize { get; set; } + + [Params(StorageSerializerKind.Orleans, StorageSerializerKind.NewtonsoftJson, StorageSerializerKind.SystemTextJson)] + public StorageSerializerKind SerializerKind { get; set; } + + [GlobalSetup] + public async Task SetupAsync() + { + var client = CreateBlobServiceClient(); + var services = new ServiceCollection() + .AddLogging() + .AddSerializer(); + services.AddOptions(); + services.AddSingleton, ConfigureOrleansJsonSerializerOptions>(); + services.AddSingleton(); + _serviceProvider = services.BuildServiceProvider(); + + var activatorProvider = _serviceProvider.GetRequiredService(); + var serializer = CreateSerializer(SerializerKind); + var containerName = $"bench-grainstate-{Guid.NewGuid():N}"; + _grainType = "bench-grain"; + _binaryGrainId = GrainId.Create("bench-grain", Guid.NewGuid().ToString("N")); + _streamGrainId = GrainId.Create("bench-grain", Guid.NewGuid().ToString("N")); + + var binaryOptions = CreateOptions(client, new NonStreamingGrainStorageSerializer(serializer), containerName, usePooledReads: false); + var streamOptions = CreateOptions(client, serializer, containerName, usePooledReads: true); + + (_binaryStorage, var binaryFactory) = CreateStorage("bench-binary", binaryOptions, activatorProvider); + (_streamStorage, var streamFactory) = CreateStorage("bench-stream", streamOptions, activatorProvider); + + await binaryFactory.InitializeAsync(client); + await streamFactory.InitializeAsync(client); + + var writeBinaryState = new GrainState + { + State = BenchmarkState.Create(PayloadSize) + }; + var writeStreamState = new GrainState + { + State = BenchmarkState.Create(PayloadSize) + }; + await _binaryStorage.WriteStateAsync(_grainType, _binaryGrainId, writeBinaryState); + await _streamStorage.WriteStateAsync(_grainType, _streamGrainId, writeStreamState); + + _binaryState = new GrainState(); + _streamState = new GrainState(); + _containerClient = client.GetBlobContainerClient(containerName); + } + + [GlobalCleanup] + public async Task CleanupAsync() + { + if (_containerClient is not null) + { + await _containerClient.DeleteIfExistsAsync(); + } + + _serviceProvider?.Dispose(); + } + + [Benchmark(Baseline = true, OperationsPerInvoke = ReadIterations)] + public async Task ReadStateBinaryAsync() + { + for (var i = 0; i < ReadIterations; i++) + { + await _binaryStorage.ReadStateAsync(_grainType, _binaryGrainId, _binaryState); + } + } + + [Benchmark(OperationsPerInvoke = ReadIterations)] + public async Task ReadStateStreamAsync() + { + for (var i = 0; i < ReadIterations; i++) + { + await _streamStorage.ReadStateAsync(_grainType, _streamGrainId, _streamState); + } + } + + private static BlobServiceClient CreateBlobServiceClient() + { + if (TestDefaultConfiguration.UseAadAuthentication) + { + if (!TestDefaultConfiguration.GetValue(nameof(TestDefaultConfiguration.DataBlobUri), out var blobUriValue) || + string.IsNullOrWhiteSpace(blobUriValue)) + { + throw new InvalidOperationException("DataBlobUri is required when UseAadAuthentication is true."); + } + + return new BlobServiceClient(new Uri(blobUriValue), TestDefaultConfiguration.TokenCredential); + } + + if (string.IsNullOrWhiteSpace(TestDefaultConfiguration.DataConnectionString)) + { + throw new InvalidOperationException("OrleansDataConnectionString must be set for Azure Blob benchmarks."); + } + + return new BlobServiceClient(TestDefaultConfiguration.DataConnectionString); + } + + private static AzureBlobStorageOptions CreateOptions( + BlobServiceClient client, + IGrainStorageSerializer serializer, + string containerName, + bool usePooledReads) + { + return new AzureBlobStorageOptions + { + BlobServiceClient = client, + ContainerName = containerName, + GrainStorageSerializer = serializer, + UsePooledBufferForReads = usePooledReads + }; + } + + private (AzureBlobGrainStorage Storage, IBlobContainerFactory Factory) CreateStorage( + string name, + AzureBlobStorageOptions options, + IActivatorProvider activatorProvider) + { + var containerFactory = options.BuildContainerFactory(_serviceProvider, options); + var logger = NullLogger.Instance; + var storage = new AzureBlobGrainStorage(name, options, containerFactory, activatorProvider, logger); + return (storage, containerFactory); + } + + private IGrainStorageSerializer CreateSerializer(StorageSerializerKind kind) + { + return kind switch + { + StorageSerializerKind.Orleans => new OrleansGrainStorageSerializer(_serviceProvider.GetRequiredService()), + StorageSerializerKind.NewtonsoftJson => new JsonGrainStorageSerializer(CreateJsonSerializer()), + StorageSerializerKind.SystemTextJson => new SystemTextJsonGrainStorageSerializer(), + _ => throw new InvalidOperationException($"Unknown serializer kind '{kind}'.") + }; + } + + private static OrleansJsonSerializer CreateJsonSerializer() + => new(Options.Create(new OrleansJsonSerializerOptions())); +} diff --git a/test/Benchmarks/GrainStorage/AzureBlobWriteStateStreamingBenchmark.cs b/test/Benchmarks/GrainStorage/AzureBlobWriteStateStreamingBenchmark.cs new file mode 100644 index 00000000000..a0079ac8fcf --- /dev/null +++ b/test/Benchmarks/GrainStorage/AzureBlobWriteStateStreamingBenchmark.cs @@ -0,0 +1,158 @@ +using Azure.Storage.Blobs; +using BenchmarkDotNet.Attributes; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Options; +using Orleans.Configuration; +using Orleans.Serialization; +using Orleans.Serialization.Serializers; +using Orleans.Storage; +using TestExtensions; + +namespace Benchmarks.GrainStorage; + +[SimpleJob(launchCount: 1, warmupCount: 2, iterationCount: 5)] +[MemoryDiagnoser(true)] +public class AzureBlobWriteStateStreamingBenchmark +{ + private const int WriteIterations = 50; + private ServiceProvider _serviceProvider = null!; + private AzureBlobGrainStorage _binaryStorage = null!; + private AzureBlobGrainStorage _streamStorage = null!; + private GrainState _binaryState = null!; + private GrainState _streamState = null!; + private GrainId _binaryGrainId; + private GrainId _streamGrainId; + private string _grainType = null!; + private BlobContainerClient _containerClient = null!; + + [Params(4 * 1024, 64 * 1024, 128 * 1024)] + public int PayloadSize { get; set; } + + [Params(StorageSerializerKind.Orleans, StorageSerializerKind.NewtonsoftJson, StorageSerializerKind.SystemTextJson)] + public StorageSerializerKind SerializerKind { get; set; } + + [GlobalSetup] + public async Task SetupAsync() + { + var client = CreateBlobServiceClient(); + var services = new ServiceCollection() + .AddLogging() + .AddSerializer(); + services.AddOptions(); + services.AddSingleton, ConfigureOrleansJsonSerializerOptions>(); + services.AddSingleton(); + _serviceProvider = services.BuildServiceProvider(); + + var activatorProvider = _serviceProvider.GetRequiredService(); + var serializer = CreateSerializer(SerializerKind); + var containerName = $"bench-grainstate-{Guid.NewGuid():N}"; + _grainType = "bench-grain"; + _binaryGrainId = GrainId.Create("bench-grain", Guid.NewGuid().ToString("N")); + _streamGrainId = GrainId.Create("bench-grain", Guid.NewGuid().ToString("N")); + + var binaryOptions = CreateOptions(client, new NonStreamingGrainStorageSerializer(serializer), containerName, AzureBlobStorageWriteMode.BinaryData); + var streamOptions = CreateOptions(client, serializer, containerName, AzureBlobStorageWriteMode.BufferedStream); + + (_binaryStorage, var binaryFactory) = CreateStorage("bench-binary", binaryOptions, activatorProvider); + (_streamStorage, var streamFactory) = CreateStorage("bench-stream", streamOptions, activatorProvider); + + await binaryFactory.InitializeAsync(client); + await streamFactory.InitializeAsync(client); + + _binaryState = new GrainState { State = BenchmarkState.Create(PayloadSize) }; + _streamState = new GrainState { State = BenchmarkState.Create(PayloadSize) }; + _containerClient = client.GetBlobContainerClient(containerName); + } + + [GlobalCleanup] + public async Task CleanupAsync() + { + if (_containerClient is not null) + { + await _containerClient.DeleteIfExistsAsync(); + } + + _serviceProvider?.Dispose(); + } + + [Benchmark(Baseline = true, OperationsPerInvoke = WriteIterations)] + public async Task WriteStateBinaryAsync() + { + for (var i = 0; i < WriteIterations; i++) + { + await _binaryStorage.WriteStateAsync(_grainType, _binaryGrainId, _binaryState); + } + } + + [Benchmark(OperationsPerInvoke = WriteIterations)] + public async Task WriteStateStreamAsync() + { + for (var i = 0; i < WriteIterations; i++) + { + await _streamStorage.WriteStateAsync(_grainType, _streamGrainId, _streamState); + } + } + + private static BlobServiceClient CreateBlobServiceClient() + { + if (TestDefaultConfiguration.UseAadAuthentication) + { + if (!TestDefaultConfiguration.GetValue(nameof(TestDefaultConfiguration.DataBlobUri), out var blobUriValue) || + string.IsNullOrWhiteSpace(blobUriValue)) + { + throw new InvalidOperationException("DataBlobUri is required when UseAadAuthentication is true."); + } + + return new BlobServiceClient(new Uri(blobUriValue), TestDefaultConfiguration.TokenCredential); + } + + if (string.IsNullOrWhiteSpace(TestDefaultConfiguration.DataConnectionString)) + { + throw new InvalidOperationException("OrleansDataConnectionString must be set for Azure Blob benchmarks."); + } + + return new BlobServiceClient(TestDefaultConfiguration.DataConnectionString); + } + + private static AzureBlobStorageOptions CreateOptions( + BlobServiceClient client, + IGrainStorageSerializer serializer, + string containerName, + AzureBlobStorageWriteMode writeMode) + { + return new AzureBlobStorageOptions + { + BlobServiceClient = client, + ContainerName = containerName, + GrainStorageSerializer = serializer, + UsePooledBufferForReads = false, + WriteMode = writeMode + }; + } + + private (AzureBlobGrainStorage Storage, IBlobContainerFactory Factory) CreateStorage( + string name, + AzureBlobStorageOptions options, + IActivatorProvider activatorProvider) + { + var containerFactory = options.BuildContainerFactory(_serviceProvider, options); + var logger = NullLogger.Instance; + var storage = new AzureBlobGrainStorage(name, options, containerFactory, activatorProvider, logger); + return (storage, containerFactory); + } + + private IGrainStorageSerializer CreateSerializer(StorageSerializerKind kind) + { + return kind switch + { + StorageSerializerKind.Orleans => new OrleansGrainStorageSerializer(_serviceProvider.GetRequiredService()), + StorageSerializerKind.NewtonsoftJson => new JsonGrainStorageSerializer(CreateJsonSerializer()), + StorageSerializerKind.SystemTextJson => new SystemTextJsonGrainStorageSerializer(), + _ => throw new InvalidOperationException($"Unknown serializer kind '{kind}'.") + }; + } + + private static OrleansJsonSerializer CreateJsonSerializer() + => new(Options.Create(new OrleansJsonSerializerOptions())); +} diff --git a/test/Benchmarks/GrainStorage/BenchmarkState.cs b/test/Benchmarks/GrainStorage/BenchmarkState.cs new file mode 100644 index 00000000000..c162f9b6684 --- /dev/null +++ b/test/Benchmarks/GrainStorage/BenchmarkState.cs @@ -0,0 +1,50 @@ +using Orleans.Serialization; + +namespace Benchmarks.GrainStorage; + +[GenerateSerializer] +public sealed class BenchmarkState +{ + [Id(0)] + public string PayloadA { get; set; } = string.Empty; + + [Id(1)] + public string PayloadB { get; set; } = string.Empty; + + [Id(2)] + public string PayloadC { get; set; } = string.Empty; + + [Id(3)] + public string PayloadD { get; set; } = string.Empty; + + public static BenchmarkState Create(int size) + { + var perPropertySize = size / 4; + var perPropertyChars = Math.Max(0, perPropertySize / 2); + var payload = CreatePayload(perPropertyChars); + + return new BenchmarkState + { + PayloadA = payload, + PayloadB = payload, + PayloadC = payload, + PayloadD = payload + }; + } + + private static string CreatePayload(int length) + { + if (length <= 0) + { + return string.Empty; + } + + var chars = new char[length]; + for (var i = 0; i < chars.Length; i++) + { + chars[i] = (char)('a' + Random.Shared.Next(0, 26)); + } + + return new string(chars); + } +} diff --git a/test/Benchmarks/GrainStorage/StorageSerializerKind.cs b/test/Benchmarks/GrainStorage/StorageSerializerKind.cs new file mode 100644 index 00000000000..f84b3bb4314 --- /dev/null +++ b/test/Benchmarks/GrainStorage/StorageSerializerKind.cs @@ -0,0 +1,8 @@ +namespace Benchmarks.GrainStorage; + +public enum StorageSerializerKind +{ + Orleans, + NewtonsoftJson, + SystemTextJson +} diff --git a/test/Benchmarks/GrainStorage/SystemTextJsonGrainStorageSerializer.cs b/test/Benchmarks/GrainStorage/SystemTextJsonGrainStorageSerializer.cs new file mode 100644 index 00000000000..e0ee93c0a47 --- /dev/null +++ b/test/Benchmarks/GrainStorage/SystemTextJsonGrainStorageSerializer.cs @@ -0,0 +1,37 @@ +using System.Text.Json; +using Orleans.Storage; + +namespace Benchmarks.GrainStorage; + +public sealed class SystemTextJsonGrainStorageSerializer : IGrainStorageSerializer, IGrainStorageStreamingSerializer +{ + private readonly JsonSerializerOptions _options; + + public SystemTextJsonGrainStorageSerializer(JsonSerializerOptions options = null) + { + _options = options ?? new JsonSerializerOptions(); + } + + public BinaryData Serialize(T input) + { + var payload = JsonSerializer.SerializeToUtf8Bytes(input, _options); + return new BinaryData(payload); + } + + public T Deserialize(BinaryData input) + { + var result = JsonSerializer.Deserialize(input.ToMemory().Span, _options); + return result!; + } + + public ValueTask SerializeAsync(T input, Stream destination, CancellationToken cancellationToken = default) + { + return new(JsonSerializer.SerializeAsync(destination, input, _options, cancellationToken)); + } + + public async ValueTask DeserializeAsync(Stream input, CancellationToken cancellationToken = default) + { + var result = await JsonSerializer.DeserializeAsync(input, _options, cancellationToken).ConfigureAwait(false); + return result!; + } +} diff --git a/test/Benchmarks/Program.cs b/test/Benchmarks/Program.cs index 34a628d484b..2016c7f55ab 100644 --- a/test/Benchmarks/Program.cs +++ b/test/Benchmarks/Program.cs @@ -260,6 +260,18 @@ internal class Program benchmark => benchmark.RunAsync().GetAwaiter().GetResult(), benchmark => benchmark.Teardown()); }, + ["GrainStorage.AzureBlob.ReadState"] = _ => + { + BenchmarkRunner.Run(); + }, + ["GrainStorage.AzureBlob.ReadState.Streaming"] = _ => + { + BenchmarkRunner.Run(); + }, + ["GrainStorage.AzureBlob.WriteState.Streaming"] = _ => + { + BenchmarkRunner.Run(); + }, ["GrainStorage.AdoNet"] = _ => { RunBenchmark( diff --git a/test/Benchmarks/Properties/launchSettings.json b/test/Benchmarks/Properties/launchSettings.json deleted file mode 100644 index 58217d2419a..00000000000 --- a/test/Benchmarks/Properties/launchSettings.json +++ /dev/null @@ -1,8 +0,0 @@ -{ - "profiles": { - "Benchmarks": { - "commandName": "Project", - "commandLineArgs": "FanoutForever" - } - } -} \ No newline at end of file diff --git a/test/Benchmarks/run_test.cmd b/test/Benchmarks/run_test.cmd index fb3acdfe15d..502387283eb 100644 --- a/test/Benchmarks/run_test.cmd +++ b/test/Benchmarks/run_test.cmd @@ -1,5 +1,5 @@ pushd %~dp0 git log -n 1 git --no-pager diff -dotnet run -c Release -- ConcurrentPing -popd \ No newline at end of file +dotnet run -c Release --framework net10.0 -- GrainStorage.AzureBlob.WriteState.Streaming +popd diff --git a/test/Extensions/TesterAzureUtils/Persistence/PersistenceGrainTests_AzureBlobStore_PooledReads.cs b/test/Extensions/TesterAzureUtils/Persistence/PersistenceGrainTests_AzureBlobStore_PooledReads.cs new file mode 100644 index 00000000000..69dc89ad668 --- /dev/null +++ b/test/Extensions/TesterAzureUtils/Persistence/PersistenceGrainTests_AzureBlobStore_PooledReads.cs @@ -0,0 +1,56 @@ +using Orleans.Configuration; +using Orleans.Storage; +using Orleans.TestingHost; +using TestExtensions; +using Xunit; +using Xunit.Abstractions; + +namespace Tester.AzureUtils.Persistence; + +/// +/// PersistenceGrainTests using AzureStore with pooled read buffers enabled. +/// +[TestCategory("Persistence"), TestCategory("AzureStorage")] +public class PersistenceGrainTests_AzureBlobStore_PooledReads : Base_PersistenceGrainTests_AzureStore, IClassFixture +{ + public class Fixture : BaseAzureTestClusterFixture + { + private class StorageSiloBuilderConfigurator : ISiloConfigurator + { + public void Configure(ISiloBuilder hostBuilder) + { + hostBuilder + .AddAzureBlobGrainStorage("GrainStorageForTest", optionsBuilder => + { + optionsBuilder.Configure(options => + { + options.ConfigureTestDefaults(); + options.UsePooledBufferForReads = true; + }); + optionsBuilder.Configure((options, serializer) => + { + // Use a non-streaming wrapper to ensure the pooled buffer path is exercised. + options.GrainStorageSerializer = new NonStreamingGrainStorageSerializer(serializer); + }); + }) + .AddMemoryGrainStorage("MemoryStore") + .AddMemoryGrainStorage("test1"); + } + } + + protected override void ConfigureTestCluster(TestClusterBuilder builder) + { + builder.Options.InitialSilosCount = 4; + builder.Options.UseTestClusterMembership = false; + builder.AddSiloBuilderConfigurator(); + builder.AddSiloBuilderConfigurator(); + builder.AddClientBuilderConfigurator(); + } + } + + public PersistenceGrainTests_AzureBlobStore_PooledReads(ITestOutputHelper output, Fixture fixture) : base(output, fixture) + { + fixture.EnsurePreconditionsMet(); + } + +} diff --git a/test/Extensions/TesterAzureUtils/Persistence/PersistenceGrainTests_AzureBlobStore_StreamSerializer.cs b/test/Extensions/TesterAzureUtils/Persistence/PersistenceGrainTests_AzureBlobStore_StreamSerializer.cs new file mode 100644 index 00000000000..f4c31cd01b3 --- /dev/null +++ b/test/Extensions/TesterAzureUtils/Persistence/PersistenceGrainTests_AzureBlobStore_StreamSerializer.cs @@ -0,0 +1,234 @@ +using Azure; +using Orleans.Configuration; +using Orleans.Storage; +using Orleans.TestingHost; +using System.Threading; +using UnitTests.GrainInterfaces; +using Xunit; +using Xunit.Abstractions; +using static Tester.AzureUtils.Persistence.Base_PersistenceGrainTests_AzureStore; + +namespace Tester.AzureUtils.Persistence; + +/// +/// Validates the default Azure blob storage serializer behavior when stream support is available. +/// +[TestCategory("Persistence"), TestCategory("AzureStorage")] +public class PersistenceGrainTests_AzureBlobStore_StreamSerializer : IClassFixture +{ + private readonly Fixture fixture; + + public PersistenceGrainTests_AzureBlobStore_StreamSerializer(ITestOutputHelper output, Fixture fixture) + { + this.fixture = fixture; + fixture.EnsurePreconditionsMet(); + } + + [SkippableFact, TestCategory("Functional")] + public async Task AzureBlobStorage_DefaultsToBinaryWritesAndStreamReads() + { + CountingGrainStorageSerializer.Reset(); + + var grain = fixture.GrainFactory.GetGrain(Guid.NewGuid(), "UnitTests.Grains"); + await grain.DoWrite(1); + _ = await grain.DoRead(); + + Assert.True(CountingGrainStorageSerializer.BinarySerializeCount > 0); + Assert.True(CountingGrainStorageSerializer.StreamDeserializeCount > 0); + Assert.Equal(0, CountingGrainStorageSerializer.StreamSerializeCount); + Assert.Equal(0, CountingGrainStorageSerializer.BinaryDeserializeCount); + } + + public class Fixture : BaseAzureTestClusterFixture + { + protected override void ConfigureTestCluster(TestClusterBuilder builder) + { + builder.Options.InitialSilosCount = 1; + builder.Options.UseTestClusterMembership = false; + builder.AddSiloBuilderConfigurator(); + builder.AddSiloBuilderConfigurator(); + builder.AddClientBuilderConfigurator(); + } + + private class StorageSiloBuilderConfigurator : ISiloConfigurator + { + public void Configure(ISiloBuilder hostBuilder) + { + hostBuilder.AddAzureBlobGrainStorage("GrainStorageForTest", optionsBuilder => + { + optionsBuilder.Configure(options => + { + options.ConfigureTestDefaults(); + options.UsePooledBufferForReads = true; + }); + optionsBuilder.Configure((options, serializer) => + { + options.GrainStorageSerializer = new CountingGrainStorageSerializer(serializer); + }); + }); + } + } + } + + private sealed class CountingGrainStorageSerializer : IGrainStorageSerializer, IGrainStorageStreamingSerializer + { + private readonly IGrainStorageSerializer _inner; + private readonly IGrainStorageStreamingSerializer _innerStream; + + public static long BinarySerializeCount; + public static long BinaryDeserializeCount; + public static long StreamSerializeCount; + public static long StreamDeserializeCount; + + public static void Reset() + { + Interlocked.Exchange(ref BinarySerializeCount, 0); + Interlocked.Exchange(ref BinaryDeserializeCount, 0); + Interlocked.Exchange(ref StreamSerializeCount, 0); + Interlocked.Exchange(ref StreamDeserializeCount, 0); + } + + public CountingGrainStorageSerializer(IGrainStorageSerializer inner) + { + _inner = inner; + _innerStream = inner as IGrainStorageStreamingSerializer + ?? throw new InvalidOperationException("Inner serializer must support stream operations for this test."); + } + + public BinaryData Serialize(T input) + { + Interlocked.Increment(ref BinarySerializeCount); + return _inner.Serialize(input); + } + + public T Deserialize(BinaryData input) + { + Interlocked.Increment(ref BinaryDeserializeCount); + return _inner.Deserialize(input); + } + + public ValueTask SerializeAsync(T input, Stream destination, CancellationToken cancellationToken = default) + { + Interlocked.Increment(ref StreamSerializeCount); + return _innerStream.SerializeAsync(input, destination, cancellationToken); + } + + public ValueTask DeserializeAsync(Stream input, CancellationToken cancellationToken = default) + { + Interlocked.Increment(ref StreamDeserializeCount); + return _innerStream.DeserializeAsync(input, cancellationToken); + } + } +} + +/// +/// Validates that Azure blob storage uses the stream serializer for buffered writes. +/// +[TestCategory("Persistence"), TestCategory("AzureStorage")] +public class PersistenceGrainTests_AzureBlobStore_StreamSerializerBufferedWrites : IClassFixture +{ + private readonly Fixture fixture; + + public PersistenceGrainTests_AzureBlobStore_StreamSerializerBufferedWrites(ITestOutputHelper output, Fixture fixture) + { + this.fixture = fixture; + fixture.EnsurePreconditionsMet(); + } + + [SkippableFact, TestCategory("Functional")] + public async Task AzureBlobStorage_UsesStreamSerializerForBufferedWrites() + { + CountingGrainStorageSerializer.Reset(); + + var grain = fixture.GrainFactory.GetGrain(Guid.NewGuid(), "UnitTests.Grains"); + await grain.DoWrite(1); + _ = await grain.DoRead(); + + Assert.True(CountingGrainStorageSerializer.StreamSerializeCount > 0); + Assert.True(CountingGrainStorageSerializer.StreamDeserializeCount > 0); + Assert.Equal(0, CountingGrainStorageSerializer.BinarySerializeCount); + Assert.Equal(0, CountingGrainStorageSerializer.BinaryDeserializeCount); + } + + public class Fixture : BaseAzureTestClusterFixture + { + protected override void ConfigureTestCluster(TestClusterBuilder builder) + { + builder.Options.InitialSilosCount = 1; + builder.Options.UseTestClusterMembership = false; + builder.AddSiloBuilderConfigurator(); + builder.AddSiloBuilderConfigurator(); + builder.AddClientBuilderConfigurator(); + } + + private class StorageSiloBuilderConfigurator : ISiloConfigurator + { + public void Configure(ISiloBuilder hostBuilder) + { + hostBuilder.AddAzureBlobGrainStorage("GrainStorageForTest", optionsBuilder => + { + optionsBuilder.Configure(options => + { + options.ConfigureTestDefaults(); + options.UsePooledBufferForReads = true; + options.WriteMode = AzureBlobStorageWriteMode.BufferedStream; + }); + optionsBuilder.Configure((options, serializer) => + { + options.GrainStorageSerializer = new CountingGrainStorageSerializer(serializer); + }); + }); + } + } + } + + private sealed class CountingGrainStorageSerializer : IGrainStorageSerializer, IGrainStorageStreamingSerializer + { + private readonly IGrainStorageSerializer _inner; + private readonly IGrainStorageStreamingSerializer _innerStream; + + public static long BinarySerializeCount; + public static long BinaryDeserializeCount; + public static long StreamSerializeCount; + public static long StreamDeserializeCount; + + public static void Reset() + { + Interlocked.Exchange(ref BinarySerializeCount, 0); + Interlocked.Exchange(ref BinaryDeserializeCount, 0); + Interlocked.Exchange(ref StreamSerializeCount, 0); + Interlocked.Exchange(ref StreamDeserializeCount, 0); + } + + public CountingGrainStorageSerializer(IGrainStorageSerializer inner) + { + _inner = inner; + _innerStream = inner as IGrainStorageStreamingSerializer + ?? throw new InvalidOperationException("Inner serializer must support stream operations for this test."); + } + + public BinaryData Serialize(T input) + { + Interlocked.Increment(ref BinarySerializeCount); + return _inner.Serialize(input); + } + + public T Deserialize(BinaryData input) + { + Interlocked.Increment(ref BinaryDeserializeCount); + return _inner.Deserialize(input); + } + + public ValueTask SerializeAsync(T input, Stream destination, CancellationToken cancellationToken = default) + { + Interlocked.Increment(ref StreamSerializeCount); + return _innerStream.SerializeAsync(input, destination, cancellationToken); + } + + public ValueTask DeserializeAsync(Stream input, CancellationToken cancellationToken = default) + { + Interlocked.Increment(ref StreamDeserializeCount); + return _innerStream.DeserializeAsync(input, cancellationToken); + } + } +} diff --git a/test/NonSilo.Tests/Serialization/OrleansJsonSerializerStreamTests.cs b/test/NonSilo.Tests/Serialization/OrleansJsonSerializerStreamTests.cs new file mode 100644 index 00000000000..4b9b0366d12 --- /dev/null +++ b/test/NonSilo.Tests/Serialization/OrleansJsonSerializerStreamTests.cs @@ -0,0 +1,69 @@ +using System.IO; +using System.Text; +using Microsoft.Extensions.Options; +using Newtonsoft.Json; +using Orleans.Serialization; +using Xunit; + +namespace NonSilo.Tests.Serialization; + +public class OrleansJsonSerializerStreamTests +{ + [Fact] + public void StreamRoundTrip_SerializesAndDeserializes() + { + var serializer = new OrleansJsonSerializer(Options.Create(new OrleansJsonSerializerOptions())); + var payload = new TestPayload { Name = "test", Value = 42 }; + + using var stream = new MemoryStream(); + serializer.Serialize(payload, typeof(TestPayload), stream); + stream.Position = 0; + + var result = (TestPayload)serializer.Deserialize(typeof(TestPayload), stream); + + Assert.NotNull(result); + Assert.Equal(payload.Name, result.Name); + Assert.Equal(payload.Value, result.Value); + } + + [Fact] + public void Deserialize_EmptyStream_ReturnsNull() + { + var serializer = new OrleansJsonSerializer(Options.Create(new OrleansJsonSerializerOptions())); + + using var stream = new MemoryStream(); + var result = serializer.Deserialize(typeof(TestPayload), stream); + + Assert.Null(result); + } + + [Fact] + public void StreamRoundTrip_AllowsNull() + { + var serializer = new OrleansJsonSerializer(Options.Create(new OrleansJsonSerializerOptions())); + + using var stream = new MemoryStream(); + serializer.Serialize(null, typeof(TestPayload), stream); + stream.Position = 0; + + var result = serializer.Deserialize(typeof(TestPayload), stream); + + Assert.Null(result); + } + + [Fact] + public void Deserialize_InvalidJson_Throws() + { + var serializer = new OrleansJsonSerializer(Options.Create(new OrleansJsonSerializerOptions())); + + using var stream = new MemoryStream(Encoding.UTF8.GetBytes("{invalid")); + + Assert.Throws(() => serializer.Deserialize(typeof(TestPayload), stream)); + } + + private sealed class TestPayload + { + public string Name { get; set; } + public int Value { get; set; } + } +} diff --git a/test/TestInfrastructure/TestExtensions/NonStreamingGrainStorageSerializer.cs b/test/TestInfrastructure/TestExtensions/NonStreamingGrainStorageSerializer.cs new file mode 100644 index 00000000000..2aa0b111fbb --- /dev/null +++ b/test/TestInfrastructure/TestExtensions/NonStreamingGrainStorageSerializer.cs @@ -0,0 +1,14 @@ +using Orleans.Storage; + +namespace TestExtensions; + +public sealed class NonStreamingGrainStorageSerializer : IGrainStorageSerializer +{ + private readonly IGrainStorageSerializer _inner; + + public NonStreamingGrainStorageSerializer(IGrainStorageSerializer inner) => _inner = inner; + + public BinaryData Serialize(T input) => _inner.Serialize(input); + + public T Deserialize(BinaryData input) => _inner.Deserialize(input); +}