Skip to content

Commit 7ec8b60

Browse files
committed
Azure blob reads: add optional pooled buffers for streaming downloads
Introduce AzureBlobStorageOptions.UsePooledBufferForReads and switch ReadStateAsync to DownloadStreamingAsync with a rented buffer sized to ContentLength when enabled. This avoids large byte[] allocations from DownloadContentAsync, reducing LOH pressure and Gen2 GC churn for blob reads while keeping the behavior opt-in for safety with custom serializers.
1 parent f1fe6fd commit 7ec8b60

File tree

7 files changed

+252
-23
lines changed

7 files changed

+252
-23
lines changed

src/Azure/Orleans.Persistence.AzureStorage/Providers/Storage/AzureBlobStorage.cs

Lines changed: 37 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
#nullable enable
2-
using System;
2+
using System.Buffers;
33
using System.Diagnostics;
4-
using System.Threading;
5-
using System.Threading.Tasks;
64
using Azure;
75
using Azure.Storage.Blobs;
86
using Azure.Storage.Blobs.Models;
@@ -11,7 +9,6 @@
119
using Microsoft.Extensions.Options;
1210
using Orleans.Configuration;
1311
using Orleans.Providers.Azure;
14-
using Orleans.Runtime;
1512
using Orleans.Serialization.Serializers;
1613
using LogLevel = Microsoft.Extensions.Logging.LogLevel;
1714

@@ -58,19 +55,46 @@ public async Task ReadStateAsync<T>(string grainType, GrainId grainId, IGrainSta
5855
{
5956
var blob = container.GetBlobClient(blobName);
6057

61-
var response = await blob.DownloadContentAsync();
62-
grainState.ETag = response.Value.Details.ETag.ToString();
63-
var contents = response.Value.Content;
64-
T? loadedState;
65-
if (contents is null || contents.IsEmpty)
58+
T? loadedState = default;
59+
if (options.UsePooledBufferForReads)
6660
{
67-
loadedState = default;
68-
LogTraceBlobEmptyReading(grainType, grainId, grainState.ETag, blobName, container.Name);
61+
var response = await blob.DownloadStreamingAsync();
62+
grainState.ETag = response.Value.Details.ETag.ToString();
63+
64+
var contentLength = response.Value.Details.ContentLength;
65+
if (contentLength <= 0)
66+
{
67+
LogTraceBlobEmptyReading(grainType, grainId, grainState.ETag, blobName, container.Name);
68+
}
69+
else if (contentLength > int.MaxValue)
70+
{
71+
throw new InvalidOperationException($"Blob content length {contentLength} exceeds the maximum supported size.");
72+
}
73+
else
74+
{
75+
await using var content = response.Value.Content;
76+
var buffer = ArrayPool<byte>.Shared.Rent((int)contentLength);
77+
var memory = buffer.AsMemory(0, (int)contentLength);
78+
await content.ReadExactlyAsync(memory);
79+
loadedState = this.ConvertFromStorageFormat<T>(new BinaryData(memory));
80+
ArrayPool<byte>.Shared.Return(buffer);
81+
LogTraceDataRead(grainType, grainId, grainState.ETag, blobName, container.Name);
82+
}
6983
}
7084
else
7185
{
72-
loadedState = this.ConvertFromStorageFormat<T>(contents);
73-
LogTraceDataRead(grainType, grainId, grainState.ETag, blobName, container.Name);
86+
var response = await blob.DownloadContentAsync();
87+
grainState.ETag = response.Value.Details.ETag.ToString();
88+
var contents = response.Value.Content;
89+
if (contents is null || contents.IsEmpty)
90+
{
91+
LogTraceBlobEmptyReading(grainType, grainId, grainState.ETag, blobName, container.Name);
92+
}
93+
else
94+
{
95+
loadedState = this.ConvertFromStorageFormat<T>(contents);
96+
LogTraceDataRead(grainType, grainId, grainState.ETag, blobName, container.Name);
97+
}
7498
}
7599

76100
grainState.State = loadedState ?? CreateInstance<T>();

src/Azure/Orleans.Persistence.AzureStorage/Providers/Storage/AzureBlobStorageOptions.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,12 @@ public BlobServiceClient BlobServiceClient
5757
/// </summary>
5858
public bool DeleteStateOnClear { get; set; } = true;
5959

60+
/// <summary>
61+
/// Gets or sets a value indicating whether to use pooled buffers when reading blob contents.
62+
/// The deserializer must not retain the <see cref="BinaryData"/> or underlying buffer after deserialization.
63+
/// </summary>
64+
public bool UsePooledBufferForReads { get; set; }
65+
6066
/// <summary>
6167
/// A function for building container factory instances
6268
/// </summary>
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
using Azure.Storage.Blobs;
4+
using BenchmarkDotNet.Attributes;
5+
using Microsoft.Extensions.DependencyInjection;
6+
using Microsoft.Extensions.Logging;
7+
using Microsoft.Extensions.Logging.Abstractions;
8+
using Orleans.Configuration;
9+
using Orleans.Runtime;
10+
using Orleans.Serialization;
11+
using Orleans.Serialization.Serializers;
12+
using Orleans.Storage;
13+
using TestExtensions;
14+
15+
namespace Benchmarks.GrainStorage;
16+
17+
[MemoryDiagnoser]
18+
[SimpleJob(launchCount: 1, warmupCount: 1, iterationCount: 6)]
19+
public class AzureBlobReadStateBenchmark
20+
{
21+
private const int ReadIterations = 50;
22+
private ServiceProvider _serviceProvider = null!;
23+
private AzureBlobGrainStorage _nonPooledStorage = null!;
24+
private AzureBlobGrainStorage _pooledStorage = null!;
25+
private IGrainState<BenchmarkState> _nonPooledState = null!;
26+
private IGrainState<BenchmarkState> _pooledState = null!;
27+
private GrainId _grainId;
28+
private string _grainType = null!;
29+
private BlobContainerClient _containerClient = null!;
30+
31+
[Params(64 * 1024, 128 * 1024)]
32+
public int PayloadSize { get; set; }
33+
34+
[GlobalSetup]
35+
public async Task SetupAsync()
36+
{
37+
var client = CreateBlobServiceClient();
38+
_serviceProvider = new ServiceCollection()
39+
.AddLogging()
40+
.AddSerializer()
41+
.BuildServiceProvider();
42+
43+
var activatorProvider = _serviceProvider.GetRequiredService<IActivatorProvider>();
44+
var serializer = new OrleansGrainStorageSerializer(_serviceProvider.GetRequiredService<Serializer>());
45+
var containerName = $"bench-grainstate-{Guid.NewGuid():N}";
46+
_grainType = "bench-grain";
47+
_grainId = GrainId.Create("bench-grain", Guid.NewGuid().ToString("N"));
48+
49+
var nonPooledOptions = CreateOptions(client, serializer, containerName, usePooledReads: false);
50+
var pooledOptions = CreateOptions(client, serializer, containerName, usePooledReads: true);
51+
52+
(_nonPooledStorage, var nonPooledFactory) = CreateStorage("bench-nonpooled", nonPooledOptions, activatorProvider);
53+
(_pooledStorage, var pooledFactory) = CreateStorage("bench-pooled", pooledOptions, activatorProvider);
54+
55+
await nonPooledFactory.InitializeAsync(client);
56+
await pooledFactory.InitializeAsync(client);
57+
58+
var writeState = new GrainState<BenchmarkState>
59+
{
60+
State = BenchmarkState.Create(PayloadSize)
61+
};
62+
await _nonPooledStorage.WriteStateAsync(_grainType, _grainId, writeState);
63+
64+
_nonPooledState = new GrainState<BenchmarkState>();
65+
_pooledState = new GrainState<BenchmarkState>();
66+
_containerClient = client.GetBlobContainerClient(containerName);
67+
}
68+
69+
[GlobalCleanup]
70+
public async Task CleanupAsync()
71+
{
72+
if (_containerClient is not null)
73+
{
74+
await _containerClient.DeleteIfExistsAsync();
75+
}
76+
77+
_serviceProvider?.Dispose();
78+
}
79+
80+
[Benchmark(Baseline = true, OperationsPerInvoke = ReadIterations)]
81+
public async Task ReadStateNonPooledAsync()
82+
{
83+
for (var i = 0; i < ReadIterations; i++)
84+
{
85+
await _nonPooledStorage.ReadStateAsync(_grainType, _grainId, _nonPooledState);
86+
}
87+
}
88+
89+
[Benchmark(OperationsPerInvoke = ReadIterations)]
90+
public async Task ReadStatePooledAsync()
91+
{
92+
for (var i = 0; i < ReadIterations; i++)
93+
{
94+
await _pooledStorage.ReadStateAsync(_grainType, _grainId, _pooledState);
95+
}
96+
}
97+
98+
private static BlobServiceClient CreateBlobServiceClient()
99+
{
100+
if (TestDefaultConfiguration.UseAadAuthentication)
101+
{
102+
if (!TestDefaultConfiguration.GetValue(nameof(TestDefaultConfiguration.DataBlobUri), out var blobUriValue) ||
103+
string.IsNullOrWhiteSpace(blobUriValue))
104+
{
105+
throw new InvalidOperationException("DataBlobUri is required when UseAadAuthentication is true.");
106+
}
107+
108+
return new BlobServiceClient(new Uri(blobUriValue), TestDefaultConfiguration.TokenCredential);
109+
}
110+
111+
if (string.IsNullOrWhiteSpace(TestDefaultConfiguration.DataConnectionString))
112+
{
113+
throw new InvalidOperationException("OrleansDataConnectionString must be set for Azure Blob benchmarks.");
114+
}
115+
116+
return new BlobServiceClient(TestDefaultConfiguration.DataConnectionString);
117+
}
118+
119+
private static AzureBlobStorageOptions CreateOptions(
120+
BlobServiceClient client,
121+
IGrainStorageSerializer serializer,
122+
string containerName,
123+
bool usePooledReads)
124+
{
125+
return new AzureBlobStorageOptions
126+
{
127+
BlobServiceClient = client,
128+
ContainerName = containerName,
129+
GrainStorageSerializer = serializer,
130+
UsePooledBufferForReads = usePooledReads
131+
};
132+
}
133+
134+
private (AzureBlobGrainStorage Storage, IBlobContainerFactory Factory) CreateStorage(
135+
string name,
136+
AzureBlobStorageOptions options,
137+
IActivatorProvider activatorProvider)
138+
{
139+
var containerFactory = options.BuildContainerFactory(_serviceProvider, options);
140+
var logger = NullLoggerFactory.Instance.CreateLogger<AzureBlobGrainStorage>();
141+
var storage = new AzureBlobGrainStorage(name, options, containerFactory, activatorProvider, logger);
142+
return (storage, containerFactory);
143+
}
144+
145+
[GenerateSerializer]
146+
public sealed class BenchmarkState
147+
{
148+
[Id(0)]
149+
public byte[] Payload { get; set; } = Array.Empty<byte>();
150+
151+
public static BenchmarkState Create(int size)
152+
{
153+
var payload = new byte[size];
154+
Random.Shared.NextBytes(payload);
155+
return new BenchmarkState { Payload = payload };
156+
}
157+
}
158+
}

test/Benchmarks/Program.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,10 @@ internal class Program
260260
benchmark => benchmark.RunAsync().GetAwaiter().GetResult(),
261261
benchmark => benchmark.Teardown());
262262
},
263+
["GrainStorage.AzureBlob.ReadState"] = _ =>
264+
{
265+
BenchmarkRunner.Run<AzureBlobReadStateBenchmark>();
266+
},
263267
["GrainStorage.AdoNet"] = _ =>
264268
{
265269
RunBenchmark(

test/Benchmarks/Properties/launchSettings.json

Lines changed: 0 additions & 8 deletions
This file was deleted.

test/Benchmarks/run_test.cmd

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
pushd %~dp0
22
git log -n 1
33
git --no-pager diff
4-
dotnet run -c Release -- ConcurrentPing
5-
popd
4+
dotnet run -c Release --framework net10.0 -- GrainStorage.AzureBlob.ReadState
5+
popd
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
using Orleans.Configuration;
2+
using Orleans.TestingHost;
3+
using Xunit;
4+
using Xunit.Abstractions;
5+
6+
namespace Tester.AzureUtils.Persistence;
7+
8+
/// <summary>
9+
/// PersistenceGrainTests using AzureStore with pooled read buffers enabled.
10+
/// </summary>
11+
[TestCategory("Persistence"), TestCategory("AzureStorage")]
12+
public class PersistenceGrainTests_AzureBlobStore_PooledReads : Base_PersistenceGrainTests_AzureStore, IClassFixture<PersistenceGrainTests_AzureBlobStore_PooledReads.Fixture>
13+
{
14+
public class Fixture : BaseAzureTestClusterFixture
15+
{
16+
private class StorageSiloBuilderConfigurator : ISiloConfigurator
17+
{
18+
public void Configure(ISiloBuilder hostBuilder)
19+
{
20+
hostBuilder
21+
.AddAzureBlobGrainStorage("GrainStorageForTest", (AzureBlobStorageOptions options) =>
22+
{
23+
options.ConfigureTestDefaults();
24+
options.UsePooledBufferForReads = true;
25+
})
26+
.AddMemoryGrainStorage("MemoryStore")
27+
.AddMemoryGrainStorage("test1");
28+
}
29+
}
30+
31+
protected override void ConfigureTestCluster(TestClusterBuilder builder)
32+
{
33+
builder.Options.InitialSilosCount = 4;
34+
builder.Options.UseTestClusterMembership = false;
35+
builder.AddSiloBuilderConfigurator<SiloBuilderConfigurator>();
36+
builder.AddSiloBuilderConfigurator<StorageSiloBuilderConfigurator>();
37+
builder.AddClientBuilderConfigurator<ClientBuilderConfigurator>();
38+
}
39+
}
40+
41+
public PersistenceGrainTests_AzureBlobStore_PooledReads(ITestOutputHelper output, Fixture fixture) : base(output, fixture)
42+
{
43+
fixture.EnsurePreconditionsMet();
44+
}
45+
}

0 commit comments

Comments
 (0)