Skip to content

Commit 99a2f67

Browse files
committed
Azure blob reads: add optional pooled buffers for streaming downloads
Introduce AzureBlobStorageOptions.UsePooledBufferForReads and switch ReadStateAsync to DownloadStreamingAsync with a rented buffer sized to ContentLength when enabled. This avoids large byte[] allocations from DownloadContentAsync, reducing LOH pressure and Gen2 GC churn for blob reads while keeping the behavior opt-in for safety with custom serializers.
1 parent f1fe6fd commit 99a2f67

File tree

7 files changed

+248
-23
lines changed

7 files changed

+248
-23
lines changed

src/Azure/Orleans.Persistence.AzureStorage/Providers/Storage/AzureBlobStorage.cs

Lines changed: 37 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
#nullable enable
2-
using System;
2+
using System.Buffers;
33
using System.Diagnostics;
4-
using System.Threading;
5-
using System.Threading.Tasks;
64
using Azure;
75
using Azure.Storage.Blobs;
86
using Azure.Storage.Blobs.Models;
@@ -11,7 +9,6 @@
119
using Microsoft.Extensions.Options;
1210
using Orleans.Configuration;
1311
using Orleans.Providers.Azure;
14-
using Orleans.Runtime;
1512
using Orleans.Serialization.Serializers;
1613
using LogLevel = Microsoft.Extensions.Logging.LogLevel;
1714

@@ -58,19 +55,46 @@ public async Task ReadStateAsync<T>(string grainType, GrainId grainId, IGrainSta
5855
{
5956
var blob = container.GetBlobClient(blobName);
6057

61-
var response = await blob.DownloadContentAsync();
62-
grainState.ETag = response.Value.Details.ETag.ToString();
63-
var contents = response.Value.Content;
64-
T? loadedState;
65-
if (contents is null || contents.IsEmpty)
58+
T? loadedState = default;
59+
if (options.UsePooledBufferForReads)
6660
{
67-
loadedState = default;
68-
LogTraceBlobEmptyReading(grainType, grainId, grainState.ETag, blobName, container.Name);
61+
var response = await blob.DownloadStreamingAsync();
62+
grainState.ETag = response.Value.Details.ETag.ToString();
63+
64+
var contentLength = response.Value.Details.ContentLength;
65+
if (contentLength <= 0)
66+
{
67+
LogTraceBlobEmptyReading(grainType, grainId, grainState.ETag, blobName, container.Name);
68+
}
69+
else if (contentLength > int.MaxValue)
70+
{
71+
throw new InvalidOperationException($"Blob content length {contentLength} exceeds the maximum supported size.");
72+
}
73+
else
74+
{
75+
await using var content = response.Value.Content;
76+
var buffer = ArrayPool<byte>.Shared.Rent((int)contentLength);
77+
var memory = buffer.AsMemory(0, (int)contentLength);
78+
await content.ReadExactlyAsync(memory);
79+
loadedState = this.ConvertFromStorageFormat<T>(new BinaryData(memory));
80+
ArrayPool<byte>.Shared.Return(buffer);
81+
LogTraceDataRead(grainType, grainId, grainState.ETag, blobName, container.Name);
82+
}
6983
}
7084
else
7185
{
72-
loadedState = this.ConvertFromStorageFormat<T>(contents);
73-
LogTraceDataRead(grainType, grainId, grainState.ETag, blobName, container.Name);
86+
var response = await blob.DownloadContentAsync();
87+
grainState.ETag = response.Value.Details.ETag.ToString();
88+
var contents = response.Value.Content;
89+
if (contents is null || contents.IsEmpty)
90+
{
91+
LogTraceBlobEmptyReading(grainType, grainId, grainState.ETag, blobName, container.Name);
92+
}
93+
else
94+
{
95+
loadedState = this.ConvertFromStorageFormat<T>(contents);
96+
LogTraceDataRead(grainType, grainId, grainState.ETag, blobName, container.Name);
97+
}
7498
}
7599

76100
grainState.State = loadedState ?? CreateInstance<T>();

src/Azure/Orleans.Persistence.AzureStorage/Providers/Storage/AzureBlobStorageOptions.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,12 @@ public BlobServiceClient BlobServiceClient
5757
/// </summary>
5858
public bool DeleteStateOnClear { get; set; } = true;
5959

60+
/// <summary>
61+
/// Gets or sets a value indicating whether to use pooled buffers when reading blob contents.
62+
/// The deserializer must not retain the <see cref="BinaryData"/> or underlying buffer after deserialization.
63+
/// </summary>
64+
public bool UsePooledBufferForReads { get; set; }
65+
6066
/// <summary>
6167
/// A function for building container factory instances
6268
/// </summary>
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
using Azure.Storage.Blobs;
2+
using BenchmarkDotNet.Attributes;
3+
using Microsoft.Extensions.DependencyInjection;
4+
using Microsoft.Extensions.Logging;
5+
using Microsoft.Extensions.Logging.Abstractions;
6+
using Orleans.Configuration;
7+
using Orleans.Serialization;
8+
using Orleans.Serialization.Serializers;
9+
using Orleans.Storage;
10+
using TestExtensions;
11+
12+
namespace Benchmarks.GrainStorage;
13+
14+
[MemoryDiagnoser]
15+
public class AzureBlobReadStateBenchmark
16+
{
17+
private const int ReadIterations = 50;
18+
private ServiceProvider _serviceProvider = null!;
19+
private AzureBlobGrainStorage _nonPooledStorage = null!;
20+
private AzureBlobGrainStorage _pooledStorage = null!;
21+
private IGrainState<BenchmarkState> _nonPooledState = null!;
22+
private IGrainState<BenchmarkState> _pooledState = null!;
23+
private GrainId _grainId;
24+
private string _grainType = null!;
25+
private BlobContainerClient _containerClient = null!;
26+
27+
[Params(4 * 1024, 64 * 1024, 128 * 1024)]
28+
public int PayloadSize { get; set; }
29+
30+
[GlobalSetup]
31+
public async Task SetupAsync()
32+
{
33+
var client = CreateBlobServiceClient();
34+
_serviceProvider = new ServiceCollection()
35+
.AddLogging()
36+
.AddSerializer()
37+
.BuildServiceProvider();
38+
39+
var activatorProvider = _serviceProvider.GetRequiredService<IActivatorProvider>();
40+
var serializer = new OrleansGrainStorageSerializer(_serviceProvider.GetRequiredService<Serializer>());
41+
var containerName = $"bench-grainstate-{Guid.NewGuid():N}";
42+
_grainType = "bench-grain";
43+
_grainId = GrainId.Create("bench-grain", Guid.NewGuid().ToString("N"));
44+
45+
var nonPooledOptions = CreateOptions(client, serializer, containerName, usePooledReads: false);
46+
var pooledOptions = CreateOptions(client, serializer, containerName, usePooledReads: true);
47+
48+
(_nonPooledStorage, var nonPooledFactory) = CreateStorage("bench-nonpooled", nonPooledOptions, activatorProvider);
49+
(_pooledStorage, var pooledFactory) = CreateStorage("bench-pooled", pooledOptions, activatorProvider);
50+
51+
await nonPooledFactory.InitializeAsync(client);
52+
await pooledFactory.InitializeAsync(client);
53+
54+
var writeState = new GrainState<BenchmarkState>
55+
{
56+
State = BenchmarkState.Create(PayloadSize)
57+
};
58+
await _nonPooledStorage.WriteStateAsync(_grainType, _grainId, writeState);
59+
60+
_nonPooledState = new GrainState<BenchmarkState>();
61+
_pooledState = new GrainState<BenchmarkState>();
62+
_containerClient = client.GetBlobContainerClient(containerName);
63+
}
64+
65+
[GlobalCleanup]
66+
public async Task CleanupAsync()
67+
{
68+
if (_containerClient is not null)
69+
{
70+
await _containerClient.DeleteIfExistsAsync();
71+
}
72+
73+
_serviceProvider?.Dispose();
74+
}
75+
76+
[Benchmark(Baseline = true, OperationsPerInvoke = ReadIterations)]
77+
public async Task ReadStateNonPooledAsync()
78+
{
79+
for (var i = 0; i < ReadIterations; i++)
80+
{
81+
await _nonPooledStorage.ReadStateAsync(_grainType, _grainId, _nonPooledState);
82+
}
83+
}
84+
85+
[Benchmark(OperationsPerInvoke = ReadIterations)]
86+
public async Task ReadStatePooledAsync()
87+
{
88+
for (var i = 0; i < ReadIterations; i++)
89+
{
90+
await _pooledStorage.ReadStateAsync(_grainType, _grainId, _pooledState);
91+
}
92+
}
93+
94+
private static BlobServiceClient CreateBlobServiceClient()
95+
{
96+
if (TestDefaultConfiguration.UseAadAuthentication)
97+
{
98+
if (!TestDefaultConfiguration.GetValue(nameof(TestDefaultConfiguration.DataBlobUri), out var blobUriValue) ||
99+
string.IsNullOrWhiteSpace(blobUriValue))
100+
{
101+
throw new InvalidOperationException("DataBlobUri is required when UseAadAuthentication is true.");
102+
}
103+
104+
return new BlobServiceClient(new Uri(blobUriValue), TestDefaultConfiguration.TokenCredential);
105+
}
106+
107+
if (string.IsNullOrWhiteSpace(TestDefaultConfiguration.DataConnectionString))
108+
{
109+
throw new InvalidOperationException("OrleansDataConnectionString must be set for Azure Blob benchmarks.");
110+
}
111+
112+
return new BlobServiceClient(TestDefaultConfiguration.DataConnectionString);
113+
}
114+
115+
private static AzureBlobStorageOptions CreateOptions(
116+
BlobServiceClient client,
117+
IGrainStorageSerializer serializer,
118+
string containerName,
119+
bool usePooledReads)
120+
{
121+
return new AzureBlobStorageOptions
122+
{
123+
BlobServiceClient = client,
124+
ContainerName = containerName,
125+
GrainStorageSerializer = serializer,
126+
UsePooledBufferForReads = usePooledReads
127+
};
128+
}
129+
130+
private (AzureBlobGrainStorage Storage, IBlobContainerFactory Factory) CreateStorage(
131+
string name,
132+
AzureBlobStorageOptions options,
133+
IActivatorProvider activatorProvider)
134+
{
135+
var containerFactory = options.BuildContainerFactory(_serviceProvider, options);
136+
var logger = NullLoggerFactory.Instance.CreateLogger<AzureBlobGrainStorage>();
137+
var storage = new AzureBlobGrainStorage(name, options, containerFactory, activatorProvider, logger);
138+
return (storage, containerFactory);
139+
}
140+
141+
[GenerateSerializer]
142+
public sealed class BenchmarkState
143+
{
144+
[Id(0)]
145+
public byte[] Payload { get; set; } = Array.Empty<byte>();
146+
147+
public static BenchmarkState Create(int size)
148+
{
149+
var payload = new byte[size];
150+
Random.Shared.NextBytes(payload);
151+
return new BenchmarkState { Payload = payload };
152+
}
153+
}
154+
}

test/Benchmarks/Program.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,10 @@ internal class Program
260260
benchmark => benchmark.RunAsync().GetAwaiter().GetResult(),
261261
benchmark => benchmark.Teardown());
262262
},
263+
["GrainStorage.AzureBlob.ReadState"] = _ =>
264+
{
265+
BenchmarkRunner.Run<AzureBlobReadStateBenchmark>();
266+
},
263267
["GrainStorage.AdoNet"] = _ =>
264268
{
265269
RunBenchmark(

test/Benchmarks/Properties/launchSettings.json

Lines changed: 0 additions & 8 deletions
This file was deleted.

test/Benchmarks/run_test.cmd

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
pushd %~dp0
22
git log -n 1
33
git --no-pager diff
4-
dotnet run -c Release -- ConcurrentPing
5-
popd
4+
dotnet run -c Release --framework net10.0 -- GrainStorage.AzureBlob.ReadState
5+
popd
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
using Orleans.Configuration;
2+
using Orleans.TestingHost;
3+
using Xunit;
4+
using Xunit.Abstractions;
5+
6+
namespace Tester.AzureUtils.Persistence;
7+
8+
/// <summary>
9+
/// PersistenceGrainTests using AzureStore with pooled read buffers enabled.
10+
/// </summary>
11+
[TestCategory("Persistence"), TestCategory("AzureStorage")]
12+
public class PersistenceGrainTests_AzureBlobStore_PooledReads : Base_PersistenceGrainTests_AzureStore, IClassFixture<PersistenceGrainTests_AzureBlobStore_PooledReads.Fixture>
13+
{
14+
public class Fixture : BaseAzureTestClusterFixture
15+
{
16+
private class StorageSiloBuilderConfigurator : ISiloConfigurator
17+
{
18+
public void Configure(ISiloBuilder hostBuilder)
19+
{
20+
hostBuilder
21+
.AddAzureBlobGrainStorage("GrainStorageForTest", (AzureBlobStorageOptions options) =>
22+
{
23+
options.ConfigureTestDefaults();
24+
options.UsePooledBufferForReads = true;
25+
})
26+
.AddMemoryGrainStorage("MemoryStore")
27+
.AddMemoryGrainStorage("test1");
28+
}
29+
}
30+
31+
protected override void ConfigureTestCluster(TestClusterBuilder builder)
32+
{
33+
builder.Options.InitialSilosCount = 4;
34+
builder.Options.UseTestClusterMembership = false;
35+
builder.AddSiloBuilderConfigurator<SiloBuilderConfigurator>();
36+
builder.AddSiloBuilderConfigurator<StorageSiloBuilderConfigurator>();
37+
builder.AddClientBuilderConfigurator<ClientBuilderConfigurator>();
38+
}
39+
}
40+
41+
public PersistenceGrainTests_AzureBlobStore_PooledReads(ITestOutputHelper output, Fixture fixture) : base(output, fixture)
42+
{
43+
fixture.EnsurePreconditionsMet();
44+
}
45+
}

0 commit comments

Comments
 (0)