Skip to content

Commit 99cb94e

Browse files
ShareFileClient uses PartitionedUploader (Azure#25732)
* initial file upload port ShareFileClient uses PartitionedUploader instead of custom implementation. TransactionalHashingTests are failing due to diagnostic scopes. * added ForwardsClientCalls * reset tests * Changelog Co-authored-by: jschrepp-MSFT <[email protected]>
1 parent eed2d1d commit 99cb94e

File tree

4 files changed

+85
-106
lines changed

4 files changed

+85
-106
lines changed

sdk/storage/Azure.Storage.Common/src/Shared/PartitionedUploader.cs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
namespace Azure.Storage
1717
{
18-
internal class PartitionedUploader<TServiceSpecificArgs, TCompleteUploadReturn>
18+
internal class PartitionedUploader<TServiceSpecificData, TCompleteUploadReturn>
1919
{
2020
#region Definitions
2121
// delegte for getting a partition from a stream based on the selected data management stragegy
@@ -29,25 +29,25 @@ private delegate Task<SlicedStream> GetNextStreamPartition(
2929

3030
// injected behaviors for services to use partitioned uploads
3131
public delegate DiagnosticScope CreateScope(string operationName);
32-
public delegate Task InitializeDestinationInternal(TServiceSpecificArgs args, bool async, CancellationToken cancellationToken);
32+
public delegate Task InitializeDestinationInternal(TServiceSpecificData args, bool async, CancellationToken cancellationToken);
3333
public delegate Task<Response<TCompleteUploadReturn>> SingleUploadInternal(
3434
Stream contentStream,
35-
TServiceSpecificArgs args,
35+
TServiceSpecificData args,
3636
IProgress<long> progressHandler,
3737
UploadTransactionalHashingOptions hashingOptions,
3838
string operationName,
3939
bool async,
4040
CancellationToken cancellationToken);
4141
public delegate Task UploadPartitionInternal(Stream contentStream,
4242
long offset,
43-
TServiceSpecificArgs args,
43+
TServiceSpecificData args,
4444
IProgress<long> progressHandler,
4545
UploadTransactionalHashingOptions hashingOptions,
4646
bool async,
4747
CancellationToken cancellationToken);
4848
public delegate Task<Response<TCompleteUploadReturn>> CommitPartitionedUploadInternal(
4949
List<(long Offset, long Size)> partitions,
50-
TServiceSpecificArgs args,
50+
TServiceSpecificData args,
5151
bool async,
5252
CancellationToken cancellationToken);
5353

@@ -165,7 +165,7 @@ public PartitionedUploader(
165165
public async Task<Response<TCompleteUploadReturn>> UploadInternal(
166166
Stream content,
167167
long? expectedContentLength,
168-
TServiceSpecificArgs args,
168+
TServiceSpecificData args,
169169
IProgress<long> progressHandler,
170170
bool async,
171171
CancellationToken cancellationToken = default)
@@ -270,7 +270,7 @@ private async Task<Response<TCompleteUploadReturn>> UploadInSequenceInternal(
270270
Stream content,
271271
long? contentLength,
272272
long partitionSize,
273-
TServiceSpecificArgs args,
273+
TServiceSpecificData args,
274274
IProgress<long> progressHandler,
275275
bool async,
276276
CancellationToken cancellationToken)
@@ -364,7 +364,7 @@ private async Task<Response<TCompleteUploadReturn>> UploadInParallelAsync(
364364
Stream content,
365365
long? contentLength,
366366
long blockSize,
367-
TServiceSpecificArgs args,
367+
TServiceSpecificData args,
368368
IProgress<long> progressHandler,
369369
CancellationToken cancellationToken)
370370
{
@@ -466,7 +466,7 @@ private async Task<Response<TCompleteUploadReturn>> UploadInParallelAsync(
466466
private async Task StagePartitionAndDisposeInternal(
467467
SlicedStream partition,
468468
long offset,
469-
TServiceSpecificArgs args,
469+
TServiceSpecificData args,
470470
IProgress<long> progressHandler,
471471
bool async,
472472
CancellationToken cancellationToken)

sdk/storage/Azure.Storage.Files.Shares/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
### Breaking Changes
88

99
### Bugs Fixed
10+
- Fixed a memory leak in ShareFileClient.UploadAsync().
1011

1112
### Other Changes
1213

sdk/storage/Azure.Storage.Files.Shares/src/Azure.Storage.Files.Shares.csproj

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
<Project Sdk="Microsoft.NET.Sdk">
1+
<Project Sdk="Microsoft.NET.Sdk">
22
<PropertyGroup>
33
<TargetFrameworks>$(RequiredTargetFrameworks)</TargetFrameworks>
44
</PropertyGroup>
@@ -57,6 +57,7 @@
5757
<Compile Include="$(AzureStorageSharedSources)IHasher.cs" LinkBase="Shared" />
5858
<Compile Include="$(AzureStorageSharedSources)NonCryptographicHashAlgorithmHasher.cs" LinkBase="Shared" />
5959
<Compile Include="$(AzureStorageSharedSources)NonDisposingStream.cs" LinkBase="Shared" />
60+
<Compile Include="$(AzureStorageSharedSources)PartitionedUploader.cs" LinkBase="Shared" />
6061
<Compile Include="$(AzureStorageSharedSources)PooledMemoryStream.cs" LinkBase="Shared" />
6162
<Compile Include="$(AzureStorageSharedSources)ProgressIncrementingStream.cs" LinkBase="Shared" />
6263
<Compile Include="$(AzureStorageSharedSources)SasExtensions.cs" LinkBase="Shared" />
@@ -81,6 +82,7 @@
8182
<Compile Include="$(AzureStorageSharedSources)StreamPartition.cs" LinkBase="Shared" />
8283
<Compile Include="$(AzureStorageSharedSources)UriExtensions.cs" LinkBase="Shared" />
8384
<Compile Include="$(AzureStorageSharedSources)UriQueryParamsCollection.cs" LinkBase="Shared" />
85+
<Compile Include="$(AzureStorageSharedSources)WindowStream.cs" LinkBase="Shared" />
8486
<Compile Include="$(AzureStorageSharedSources)StorageWriteStream.cs" LinkBase="Shared" />
8587
<Compile Include="$(AzureStorageSharedSources)LazyLoadingReadOnlyStream.cs" LinkBase="Shared" />
8688
<Compile Include="$(AzureStorageSharedSources)\StorageBearerTokenChallengeAuthorizationPolicy.cs" LinkBase="Shared" />

sdk/storage/Azure.Storage.Files.Shares/src/ShareFileClient.cs

Lines changed: 72 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -4543,6 +4543,7 @@ private async Task<Response<ShareFileUploadInfo>> UploadRangeFromUriInternal(
45434543
/// A <see cref="RequestFailedException"/> will be thrown if
45444544
/// a failure occurs.
45454545
/// </remarks>
4546+
[ForwardsClientCalls]
45464547
public virtual Response<ShareFileUploadInfo> Upload(
45474548
Stream stream,
45484549
ShareFileUploadOptions options,
@@ -4583,6 +4584,7 @@ public virtual Response<ShareFileUploadInfo> Upload(
45834584
/// A <see cref="RequestFailedException"/> will be thrown if
45844585
/// a failure occurs.
45854586
/// </remarks>
4587+
[ForwardsClientCalls]
45864588
public virtual async Task<Response<ShareFileUploadInfo>> UploadAsync(
45874589
Stream stream,
45884590
ShareFileUploadOptions options,
@@ -4833,115 +4835,89 @@ internal async Task<Response<ShareFileUploadInfo>> UploadInternal(
48334835
bool async,
48344836
CancellationToken cancellationToken)
48354837
{
4836-
Errors.VerifyStreamPosition(content, nameof(content));
4838+
var uploader = GetPartitionedUploader(
4839+
new StorageTransferOptions
4840+
{
4841+
// shares can't suppot parallel upload
4842+
MaximumConcurrency = 1,
4843+
MaximumTransferSize = singleRangeThreshold,
4844+
InitialTransferSize = singleRangeThreshold
4845+
},
4846+
hashingOptions,
4847+
operationName: $"{nameof(ShareFileClient)}.{nameof(Upload)}");
4848+
4849+
return await uploader.UploadInternal(
4850+
content,
4851+
expectedContentLength: default,
4852+
new ShareFileUploadData
4853+
{
4854+
Conditions = conditions,
4855+
},
4856+
progressHandler,
4857+
async,
4858+
cancellationToken)
4859+
.ConfigureAwait(false);
4860+
}
4861+
#endregion Upload
48374862

4838-
// partitioned uploads don't support pre-calculated hashes
4839-
if (hashingOptions?.PrecalculatedHash != default)
4840-
{
4841-
throw Errors.PrecalculatedHashNotSupportedOnSplit();
4842-
}
4863+
#region PartitionedUploader
4864+
internal class ShareFileUploadData
4865+
{
4866+
public ShareFileRequestConditions Conditions { get; set; }
4867+
public Response<ShareFileUploadInfo> LastUploadRangeResponse { get; set; }
4868+
}
48434869

4844-
DiagnosticScope scope = ClientConfiguration.ClientDiagnostics.CreateScope($"{nameof(ShareFileClient)}.{nameof(Upload)}");
4845-
try
4870+
internal PartitionedUploader<ShareFileUploadData, ShareFileUploadInfo> GetPartitionedUploader(
4871+
StorageTransferOptions transferOptions,
4872+
UploadTransactionalHashingOptions hashingOptions,
4873+
ArrayPool<byte> arrayPool = null,
4874+
string operationName = null)
4875+
=> new PartitionedUploader<ShareFileUploadData, ShareFileUploadInfo>(
4876+
GetPartitionedUploaderBehaviors(this),
4877+
transferOptions,
4878+
hashingOptions,
4879+
arrayPool,
4880+
operationName);
4881+
4882+
// static because it makes mocking easier in tests
4883+
internal static PartitionedUploader<ShareFileUploadData, ShareFileUploadInfo>.Behaviors GetPartitionedUploaderBehaviors(ShareFileClient client)
4884+
=> new PartitionedUploader<ShareFileUploadData, ShareFileUploadInfo>.Behaviors
48464885
{
4847-
scope.Start();
4848-
4849-
// Try to upload the file as a single range
4850-
Debug.Assert(singleRangeThreshold <= Constants.File.MaxFileUpdateRange);
4851-
var length = content?.Length - content?.Position;
4852-
if (length <= singleRangeThreshold)
4886+
SingleUpload = async (stream, data, progressHandler, hashingOptions, operationName, async, cancellationToken) =>
48534887
{
4854-
return await UploadRangeInternal(
4855-
new HttpRange(0, length),
4856-
content,
4888+
return await client.UploadRangeInternal(
4889+
new HttpRange(offset: 0, length: stream.Length),
4890+
stream,
48574891
new ShareFileUploadRangeOptions
48584892
{
4859-
TransactionalHashingOptions = hashingOptions,
4893+
Conditions = data.Conditions,
48604894
ProgressHandler = progressHandler,
4861-
Conditions = conditions
4895+
TransactionalHashingOptions = hashingOptions,
48624896
},
48634897
async,
48644898
cancellationToken)
48654899
.ConfigureAwait(false);
4866-
}
4867-
4868-
// Otherwise naively split the file into ranges and upload them individually
4869-
var response = default(Response<ShareFileUploadInfo>);
4870-
var pool = default(MemoryPool<byte>);
4871-
// erase potential precalculated hash now that we're splitting; we'll have to recalculate.
4872-
hashingOptions = hashingOptions == default
4873-
? default
4874-
: new UploadTransactionalHashingOptions
4875-
{
4876-
Algorithm = hashingOptions.Algorithm
4877-
};
4878-
4879-
long initalPosition = content.Position;
4880-
4881-
try
4900+
},
4901+
UploadPartition = async (stream, offset, data, progressHandler, hashingOptions, async, cancellationToken) =>
48824902
{
4883-
pool = (singleRangeThreshold < MemoryPool<byte>.Shared.MaxBufferSize) ?
4884-
MemoryPool<byte>.Shared :
4885-
new StorageMemoryPool(singleRangeThreshold, 1);
4886-
for (; ; )
4887-
{
4888-
// Get the next chunk of content
4889-
var parentPosition = content.Position;
4890-
IMemoryOwner<byte> buffer = pool.Rent(singleRangeThreshold);
4891-
if (!MemoryMarshal.TryGetArray<byte>(buffer.Memory, out ArraySegment<byte> segment))
4903+
data.LastUploadRangeResponse = await client.UploadRangeInternal(
4904+
new HttpRange(offset: offset, length: stream.Length),
4905+
stream,
4906+
new ShareFileUploadRangeOptions
48924907
{
4893-
throw Errors.UnableAccessArray();
4894-
}
4895-
var count = async ?
4896-
await content.ReadAsync(segment.Array, 0, singleRangeThreshold, cancellationToken).ConfigureAwait(false) :
4897-
content.Read(segment.Array, 0, singleRangeThreshold);
4898-
4899-
// Stop when we've exhausted the content
4900-
if (count <= 0)
4901-
{ break; }
4902-
4903-
// Upload the chunk
4904-
var partition = new StreamPartition(
4905-
buffer.Memory,
4906-
parentPosition,
4907-
count,
4908-
() => buffer.Dispose(),
4909-
cancellationToken);
4910-
response = await UploadRangeInternal(
4911-
new HttpRange(partition.ParentPosition - initalPosition, partition.Length),
4912-
partition,
4913-
new ShareFileUploadRangeOptions
4914-
{
4915-
TransactionalHashingOptions = hashingOptions,
4916-
ProgressHandler = progressHandler,
4917-
Conditions = conditions
4918-
},
4919-
async,
4920-
cancellationToken)
4921-
.ConfigureAwait(false);
4922-
}
4923-
}
4924-
finally
4925-
{
4926-
if (pool is StorageMemoryPool)
4927-
{
4928-
pool.Dispose();
4929-
}
4930-
}
4931-
return response;
4932-
}
4933-
catch (Exception ex)
4934-
{
4935-
ClientConfiguration.Pipeline.LogException(ex);
4936-
scope.Failed(ex);
4937-
throw;
4938-
}
4939-
finally
4940-
{
4941-
scope.Dispose();
4942-
}
4943-
}
4944-
#endregion Upload
4908+
Conditions = data.Conditions,
4909+
ProgressHandler = progressHandler,
4910+
TransactionalHashingOptions = hashingOptions,
4911+
},
4912+
async,
4913+
cancellationToken)
4914+
.ConfigureAwait(false);
4915+
},
4916+
CommitPartitionedUpload = (partitions, data, async, cancellationToken) => Task.FromResult(data.LastUploadRangeResponse),
4917+
Scope = operationName => client.ClientConfiguration.ClientDiagnostics.CreateScope(operationName ??
4918+
$"{nameof(Azure)}.{nameof(Storage)}.{nameof(Files)}.{nameof(Shares)}.{nameof(ShareFileClient)}.{nameof(ShareFileClient.Upload)}")
4919+
};
4920+
#endregion
49454921

49464922
#region GetRangeList
49474923
/// <summary>

0 commit comments

Comments
 (0)