Skip to content

Commit 11d728f

Browse files
rlrossiterRyan Rossiter (from Dev Box)
andauthored
Ensure all StorageWriteStream disposables are disposed (Azure#47783)
* Ensure all StorageWriteStream disposables are disposed Because the other calls within Dispose can error out, there needs to be a try around them so even if they fail, the _accumulatedDisposables can still be disposed after they are complete to prevent resource leakage. * Add test to exemplify memory leak --------- Co-authored-by: Ryan Rossiter (from Dev Box) <[email protected]>
1 parent 86d228d commit 11d728f

File tree

2 files changed

+114
-3
lines changed

2 files changed

+114
-3
lines changed

sdk/storage/Azure.Storage.Common/src/Shared/StorageWriteStream.cs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -330,9 +330,15 @@ protected override void Dispose(bool disposing)
330330

331331
if (disposing)
332332
{
333-
Flush();
334-
ValidateCallerCrcIfAny();
335-
_accumulatedDisposables.Dispose();
333+
try
334+
{
335+
Flush();
336+
ValidateCallerCrcIfAny();
337+
}
338+
finally
339+
{
340+
_accumulatedDisposables.Dispose();
341+
}
336342
}
337343

338344
_disposed = true;

sdk/storage/Azure.Storage.Common/tests/StorageWriteStreamTests.cs

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,48 @@ public async Task WritesLargerThanBufferNonAligned()
188188
mockBuffer.Verify(r => r.WriteAsync(data[1], 1072, 928, default));
189189
}
190190

191+
[Test]
192+
public async Task ErrorsInCommitCleanupArrayPoolOnDispose()
193+
{
194+
// Arrange
195+
int bufferSize = Constants.KB;
196+
int writeSize = 256;
197+
198+
Mock<PooledMemoryStream> mockBuffer = new(
199+
MockBehavior.Loose,
200+
ArrayPool<byte>.Shared,
201+
Constants.MB)
202+
{
203+
CallBase = true,
204+
};
205+
206+
RentReturnTrackingArrayPool<byte> bufferPool = new RentReturnTrackingArrayPool<byte>();
207+
208+
StorageWriteStreamWithFlushError stream = new StorageWriteStreamWithFlushError(
209+
position: 0,
210+
bufferSize: bufferSize,
211+
progressHandler: null,
212+
buffer: mockBuffer.Object,
213+
bufferPool: bufferPool);
214+
215+
// Act
216+
// Do one write, and then explicitly dispose
217+
// This dispose will raise an exception (as it has a Commit/Flush failure),
218+
// and then we will assert that after the dispose, the proper ArrayPool calls were made.
219+
await stream.WriteAsync(GetRandomBuffer(writeSize), 0, writeSize);
220+
221+
InvalidOperationException ex = Assert.Throws<InvalidOperationException>(stream.Dispose);
222+
223+
// Assert
224+
Assert.AreEqual(2, stream.ApiCalls.Count);
225+
Assert.AreEqual(s_append, stream.ApiCalls[0]);
226+
Assert.AreEqual(s_flush, stream.ApiCalls[1]);
227+
Assert.AreEqual(ex.Message, "Flush failed");
228+
229+
Assert.AreEqual(3, bufferPool.RentCount); // Crc, Checksum, and buffer is rented from the bufferPool
230+
Assert.AreEqual(3, bufferPool.ReturnCount, "Not all allocated array pool arrays were properly returned");
231+
}
232+
191233
internal class StorageWriteStreamImplementation : StorageWriteStream
192234
{
193235
public List<string> ApiCalls;
@@ -233,6 +275,69 @@ protected override void ValidateBufferSize(long bufferSize)
233275
{
234276
}
235277
}
278+
279+
internal class StorageWriteStreamWithFlushError : StorageWriteStream
280+
{
281+
public List<string> ApiCalls;
282+
283+
public StorageWriteStreamWithFlushError(
284+
long position,
285+
long bufferSize,
286+
IProgress<long> progressHandler,
287+
PooledMemoryStream buffer,
288+
RentReturnTrackingArrayPool<byte> bufferPool)
289+
: base(
290+
position,
291+
bufferSize,
292+
progressHandler,
293+
transferValidation: new UploadTransferValidationOptions
294+
{
295+
ChecksumAlgorithm = StorageChecksumAlgorithm.Auto
296+
},
297+
buffer,
298+
bufferPool)
299+
{
300+
ApiCalls = new List<string>();
301+
}
302+
303+
protected override Task AppendInternal(UploadTransferValidationOptions validationOptions, bool async, CancellationToken cancellationToken)
304+
{
305+
ApiCalls.Add(s_append);
306+
return Task.CompletedTask;
307+
}
308+
309+
protected override Task CommitInternal(bool async, CancellationToken cancellationToken)
310+
{
311+
ApiCalls.Add(s_flush);
312+
throw new InvalidOperationException("Flush failed");
313+
}
314+
315+
protected override void ValidateBufferSize(long bufferSize)
316+
{
317+
}
318+
}
319+
320+
internal class RentReturnTrackingArrayPool<T> : ArrayPool<T>
321+
{
322+
private int _rentCount = 0;
323+
private int _returnCount = 0;
324+
325+
public int RentCount => _rentCount;
326+
public int ReturnCount => _returnCount;
327+
328+
public override T[] Rent(int minimumLength)
329+
{
330+
Interlocked.Increment(ref _rentCount);
331+
return Shared.Rent(minimumLength);
332+
}
333+
334+
public override void Return(T[] array, bool clearArray = false)
335+
{
336+
Interlocked.Increment(ref _returnCount);
337+
Shared.Return(array, clearArray);
338+
}
339+
}
340+
236341
private static byte[] GetRandomBuffer(long size)
237342
{
238343
Random random = new Random(Environment.TickCount);

0 commit comments

Comments
 (0)