Skip to content

Commit 4d4c4e7

Browse files
Remove variable blocksize (Azure#49416)
* CopyToExact stream extension * PooledMemoryStream changes Removed static method to buffer part of an existing stream netstandard2.1 impl * fix tests Lots of values adjusted to account for behavior change Some fixes for api changes * revert namechange * remove more plumbing of max&min counts
1 parent b8a3dc7 commit 4d4c4e7

File tree

9 files changed

+156
-241
lines changed

9 files changed

+156
-241
lines changed

sdk/storage/Azure.Storage.Blobs/tests/PartitionedUploaderTests.cs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public async Task UploadsStreamInBlocksIfLengthNotAvailable()
6161

6262
Assert.AreEqual(1, sink.Staged.Count);
6363
Assert.AreEqual(s_response, info);
64-
Assert.AreEqual(2, testPool.TotalRents); // while conceptually there is one rental, the second rental occurs upon checking for stream end on a Read() call
64+
Assert.AreEqual(1, testPool.TotalRents);
6565
Assert.AreEqual(0, testPool.CurrentCount);
6666
AssertStaged(sink, content);
6767
}
@@ -107,14 +107,14 @@ public async Task UploadsStreamInBlocksUnderSize()
107107
clientMock.SetupGet(c => c.ClientConfiguration).CallBase();
108108
SetupInternalStaging(clientMock, sink);
109109

110-
var uploader = new PartitionedUploader<BlobUploadOptions, BlobContentInfo>(BlockBlobClient.GetPartitionedUploaderBehaviors(clientMock.Object), new StorageTransferOptions() { MaximumTransferLength = 20}, s_validationEmpty, arrayPool: testPool);
110+
var uploader = new PartitionedUploader<BlobUploadOptions, BlobContentInfo>(BlockBlobClient.GetPartitionedUploaderBehaviors(clientMock.Object), new StorageTransferOptions() { MaximumTransferLength = 10}, s_validationEmpty, arrayPool: testPool);
111111
Response<BlobContentInfo> info = await InvokeUploadAsync(uploader, content);
112112

113113
Assert.AreEqual(2, sink.Staged.Count);
114114
Assert.AreEqual(s_response, info);
115115
AssertStaged(sink, content);
116116

117-
Assert.AreEqual(3, testPool.TotalRents);
117+
Assert.AreEqual(2, testPool.TotalRents);
118118
Assert.AreEqual(0, testPool.CurrentCount);
119119
}
120120

@@ -137,7 +137,7 @@ public async Task MergesLotsOfSmallBlocks()
137137

138138
Assert.AreEqual(1, sink.Staged.Count);
139139
Assert.AreEqual(s_response, info);
140-
Assert.AreEqual(2, testPool.TotalRents); // while conceptually there is one rental, the second rental occurs upon checking for stream end on a Read() call
140+
Assert.AreEqual(1, testPool.TotalRents);
141141
Assert.AreEqual(0, testPool.CurrentCount);
142142
AssertStaged(sink, content);
143143
}
@@ -161,7 +161,7 @@ public async Task SmallMaxWriteSize()
161161

162162
Assert.AreEqual(s_response, info);
163163
Assert.AreEqual(0, testPool.CurrentCount);
164-
Assert.AreEqual(41, testPool.TotalRents);
164+
Assert.AreEqual(20, testPool.TotalRents);
165165
AssertStaged(sink, content);
166166

167167
foreach (byte[] bytes in sink.Staged.Values.Select(val => val.Data))
@@ -186,14 +186,14 @@ public async Task MergesBlocksUntilTheyReachOverHalfMaxSize()
186186
clientMock.SetupGet(c => c.ClientConfiguration).CallBase();
187187
SetupInternalStaging(clientMock, sink);
188188

189-
var uploader = new PartitionedUploader<BlobUploadOptions, BlobContentInfo>(BlockBlobClient.GetPartitionedUploaderBehaviors(clientMock.Object), new StorageTransferOptions() { MaximumTransferLength = 20}, s_validationEmpty, arrayPool: testPool);
189+
var uploader = new PartitionedUploader<BlobUploadOptions, BlobContentInfo>(BlockBlobClient.GetPartitionedUploaderBehaviors(clientMock.Object), new StorageTransferOptions() { MaximumTransferLength = 10}, s_validationEmpty, arrayPool: testPool);
190190
Response<BlobContentInfo> info = await InvokeUploadAsync(uploader, content);
191191

192192
Assert.AreEqual(2, sink.Staged.Count);
193193
// First two should be merged
194194
CollectionAssert.AreEqual(new byte[] {0, 0, 0, 0, 0, 1, 1, 1, 1, 1 }, sink.Staged[sink.Blocks.First()].Data);
195195
Assert.AreEqual(s_response, info);
196-
Assert.AreEqual(3, testPool.TotalRents);
196+
Assert.AreEqual(2, testPool.TotalRents);
197197
Assert.AreEqual(0, testPool.CurrentCount);
198198
AssertStaged(sink, content);
199199
}
@@ -212,7 +212,7 @@ public async Task BlockIdsAre64BytesUniqueBase64Strings()
212212
clientMock.SetupGet(c => c.ClientConfiguration).CallBase();
213213
SetupInternalStaging(clientMock, sink);
214214

215-
var uploader = new PartitionedUploader<BlobUploadOptions, BlobContentInfo>(BlockBlobClient.GetPartitionedUploaderBehaviors(clientMock.Object), new StorageTransferOptions() { MaximumTransferLength = 20 }, s_validationEmpty);
215+
var uploader = new PartitionedUploader<BlobUploadOptions, BlobContentInfo>(BlockBlobClient.GetPartitionedUploaderBehaviors(clientMock.Object), new StorageTransferOptions() { MaximumTransferLength = 10 }, s_validationEmpty);
216216
Response<BlobContentInfo> info = await InvokeUploadAsync(uploader, content);
217217

218218
Assert.AreEqual(2, sink.Staged.Count);

sdk/storage/Azure.Storage.Common/src/Shared/PartitionedUploader.cs

Lines changed: 25 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,7 @@ private delegate Task StageContentPartitionAsync<TContent>(
8585
/// </summary>
8686
private delegate Task<(Stream PartitionContent, ReadOnlyMemory<byte> PartitionChecksum)> GetNextStreamPartition(
8787
Stream stream,
88-
long minCount,
89-
long maxCount,
88+
long count,
9089
long absolutePosition,
9190
bool async,
9291
CancellationToken cancellationToken);
@@ -373,14 +372,14 @@ public async Task<Response<TCompleteUploadReturn>> UploadInternal(
373372
Stream bufferedContent;
374373
if (UseMasterCrc && _masterCrcSupplier != default)
375374
{
376-
bufferedContent = await PooledMemoryStream.BufferStreamPartitionInternal(
377-
content, length.Value, length.Value, _arrayPool,
378-
maxArrayPoolRentalSize: default, async, cancellationToken).ConfigureAwait(false);
375+
bufferedContent = new PooledMemoryStream(_arrayPool, Constants.MB);
376+
await content.CopyToInternal(bufferedContent, async, cancellationToken).ConfigureAwait(false);
377+
bufferedContent.Position = 0;
379378
}
380379
else
381380
{
382381
(bufferedContent, oneshotValidationOptions) = await BufferAndOptionalChecksumStreamInternal(
383-
content, length.Value, length.Value, oneshotValidationOptions, async, cancellationToken)
382+
content, length.Value, oneshotValidationOptions, async, cancellationToken)
384383
.ConfigureAwait(false);
385384
}
386385
bucket.Add(bufferedContent);
@@ -438,11 +437,8 @@ private static long GetActualBlockSize(long? blockSize, long? totalLength)
438437
/// <param name="source">
439438
/// Stream to buffer.
440439
/// </param>
441-
/// <param name="minCount">
442-
/// Minimum count to buffer from the stream.
443-
/// </param>
444-
/// <param name="maxCount">
445-
/// Maximum count to buffer from the stream.
440+
/// <param name="count">
441+
/// Exact count to buffer from the stream.
446442
/// </param>
447443
/// <param name="validationOptions">
448444
/// Validation options for the upload to determine if buffering is needed.
@@ -467,8 +463,7 @@ private static long GetActualBlockSize(long? blockSize, long? totalLength)
467463
private async Task<(Stream Stream, UploadTransferValidationOptions ValidationOptions)>
468464
BufferAndOptionalChecksumStreamInternal(
469465
Stream source,
470-
long minCount,
471-
long maxCount,
466+
long? count,
472467
UploadTransferValidationOptions validationOptions,
473468
bool async,
474469
CancellationToken cancellationToken)
@@ -488,14 +483,16 @@ private static long GetActualBlockSize(long? blockSize, long? totalLength)
488483
.SetupChecksumCalculatingReadStream(source, validationOptions.ChecksumAlgorithm);
489484
}
490485

491-
PooledMemoryStream bufferedContent = await PooledMemoryStream.BufferStreamPartitionInternal(
492-
source,
493-
minCount,
494-
maxCount,
495-
_arrayPool,
496-
maxArrayPoolRentalSize: default,
497-
async,
498-
cancellationToken).ConfigureAwait(false);
486+
Stream bufferedContent = new PooledMemoryStream(_arrayPool, Constants.MB);
487+
if (count.HasValue)
488+
{
489+
await source.CopyToExactInternal(bufferedContent, count.Value, async, cancellationToken).ConfigureAwait(false);
490+
}
491+
else
492+
{
493+
await source.CopyToInternal(bufferedContent, async, cancellationToken).ConfigureAwait(false);
494+
}
495+
bufferedContent.Position = 0;
499496

500497
if (usingChecksumStream)
501498
{
@@ -893,12 +890,6 @@ private static async IAsyncEnumerable<ContentPartition<Stream>> GetStreamPartiti
893890
bool async,
894891
[EnumeratorCancellation] CancellationToken cancellationToken)
895892
{
896-
// The minimum amount of data we'll accept from a stream before
897-
// splitting another block. Code that sets `blockSize` will always
898-
// set it to a positive number. Min() only avoids edge case where
899-
// user sets their block size to 1.
900-
long acceptableBlockSize = Math.Max(1, blockSize / 2);
901-
902893
// if we know the data length, assert boundaries before spending resources uploading beyond service capabilities
903894
if (streamLength.HasValue)
904895
{
@@ -910,8 +901,6 @@ private static async IAsyncEnumerable<ContentPartition<Stream>> GetStreamPartiti
910901
{
911902
throw Errors.InsufficientStorageTransferOptions(streamLength.Value, blockSize, minRequiredBlockSize);
912903
}
913-
// bring min up to our min required by the service
914-
acceptableBlockSize = Math.Max(acceptableBlockSize, minRequiredBlockSize);
915904
}
916905

917906
long read;
@@ -920,7 +909,6 @@ private static async IAsyncEnumerable<ContentPartition<Stream>> GetStreamPartiti
920909
{
921910
(Stream partition, ReadOnlyMemory<byte> partitionChecksum) = await getNextPartition(
922911
stream,
923-
acceptableBlockSize,
924912
blockSize,
925913
absolutePosition,
926914
async,
@@ -954,11 +942,8 @@ private static async IAsyncEnumerable<ContentPartition<Stream>> GetStreamPartiti
954942
/// <param name="stream">
955943
/// Stream to buffer a partition from.
956944
/// </param>
957-
/// <param name="minCount">
958-
/// Minimum amount of data to wait on before finalizing buffer.
959-
/// </param>
960-
/// <param name="maxCount">
961-
/// Max amount of data to buffer before cutting off for the next.
945+
/// <param name="count">
946+
/// Amount of data to wait on before finalizing buffer.
962947
/// </param>
963948
/// <param name="absolutePosition">
964949
/// Offset of this stream relative to the large stream.
@@ -974,8 +959,7 @@ private static async IAsyncEnumerable<ContentPartition<Stream>> GetStreamPartiti
974959
/// </returns>
975960
private async Task<(Stream PartitionContent, ReadOnlyMemory<byte> PartitionChecksum)> GetBufferedPartitionInternal(
976961
Stream stream,
977-
long minCount,
978-
long maxCount,
962+
long count,
979963
long absolutePosition,
980964
bool async,
981965
CancellationToken cancellationToken)
@@ -984,8 +968,7 @@ private static async IAsyncEnumerable<ContentPartition<Stream>> GetStreamPartiti
984968
(Stream slicedStream, UploadTransferValidationOptions validationOptions)
985969
= await BufferAndOptionalChecksumStreamInternal(
986970
stream,
987-
minCount,
988-
maxCount,
971+
count,
989972
new UploadTransferValidationOptions { ChecksumAlgorithm = _validationAlgorithm },
990973
async,
991974
cancellationToken).ConfigureAwait(false);
@@ -1002,10 +985,7 @@ private static async IAsyncEnumerable<ContentPartition<Stream>> GetStreamPartiti
1002985
/// <param name="stream">
1003986
/// Stream to wrap.
1004987
/// </param>
1005-
/// <param name="minCount">
1006-
/// Unused, but part of <see cref="GetNextStreamPartition"/> definition.
1007-
/// </param>
1008-
/// <param name="maxCount">
988+
/// <param name="count">
1009989
/// Length of this facade stream.
1010990
/// </param>
1011991
/// <param name="absolutePosition">
@@ -1020,8 +1000,7 @@ private static async IAsyncEnumerable<ContentPartition<Stream>> GetStreamPartiti
10201000
/// </returns>
10211001
private async Task<(Stream PartitionContent, ReadOnlyMemory<byte> PartitionChecksum)> GetStreamedPartitionInternal(
10221002
Stream stream,
1023-
long minCount,
1024-
long maxCount,
1003+
long count,
10251004
long absolutePosition,
10261005
bool async,
10271006
CancellationToken cancellationToken)
@@ -1030,7 +1009,7 @@ private static async IAsyncEnumerable<ContentPartition<Stream>> GetStreamPartiti
10301009
{
10311010
throw Errors.InvalidArgument(nameof(stream));
10321011
}
1033-
var partitionStream = WindowStream.GetWindow(stream, maxCount);
1012+
var partitionStream = WindowStream.GetWindow(stream, count);
10341013
// this resets stream position for us
10351014
var checksum = await ContentHasher.GetHashOrDefaultInternal(
10361015
partitionStream,

0 commit comments

Comments
 (0)