Skip to content

Commit 874f44b

Browse files
[Storage][DataMovement] Add bounds to transfer queues (Azure#47348)
1 parent: 68e212d · commit: 874f44b

File tree

9 files changed: +107 -105 lines changed

sdk/storage/Azure.Storage.DataMovement/src/ChannelProcessing.cs

Lines changed: 27 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -15,27 +15,35 @@ namespace Azure.Storage.DataMovement;
1515
internal interface IProcessor<TItem> : IDisposable
1616
{
1717
ValueTask QueueAsync(TItem item, CancellationToken cancellationToken = default);
18+
bool TryComplete();
1819
ProcessAsync<TItem> Process { get; set; }
1920
}
2021

2122
internal static class ChannelProcessing
2223
{
23-
public static IProcessor<T> NewProcessor<T>(int parallelism)
24+
public static IProcessor<T> NewProcessor<T>(int readers, int? capacity = null)
2425
{
25-
Argument.AssertInRange(parallelism, 1, int.MaxValue, nameof(parallelism));
26-
return parallelism == 1
27-
? new SequentialChannelProcessor<T>(
28-
Channel.CreateUnbounded<T>(new UnboundedChannelOptions()
29-
{
30-
AllowSynchronousContinuations = true,
31-
SingleReader = true,
32-
}))
33-
: new ParallelChannelProcessor<T>(
34-
Channel.CreateUnbounded<T>(new UnboundedChannelOptions()
35-
{
36-
AllowSynchronousContinuations = true,
37-
}),
38-
parallelism);
26+
Argument.AssertInRange(readers, 1, int.MaxValue, nameof(readers));
27+
if (capacity.HasValue)
28+
{
29+
Argument.AssertInRange(capacity.Value, 1, int.MaxValue, nameof(capacity));
30+
}
31+
32+
Channel<T> channel = capacity.HasValue
33+
? Channel.CreateBounded<T>(new BoundedChannelOptions(capacity.Value)
34+
{
35+
AllowSynchronousContinuations = true,
36+
SingleReader = readers == 1,
37+
FullMode = BoundedChannelFullMode.Wait,
38+
})
39+
: Channel.CreateUnbounded<T>(new UnboundedChannelOptions()
40+
{
41+
AllowSynchronousContinuations = true,
42+
SingleReader = readers == 1,
43+
});
44+
return readers == 1
45+
? new SequentialChannelProcessor<T>(channel)
46+
: new ParallelChannelProcessor<T>(channel, readers);
3947
}
4048

4149
private abstract class ChannelProcessor<TItem> : IProcessor<TItem>, IDisposable
@@ -82,6 +90,8 @@ public async ValueTask QueueAsync(TItem item, CancellationToken cancellationToke
8290
await _channel.Writer.WriteAsync(item, cancellationToken).ConfigureAwait(false);
8391
}
8492

93+
public bool TryComplete() => _channel.Writer.TryComplete();
94+
8595
protected abstract ValueTask NotifyOfPendingItemProcessing();
8696

8797
public void Dispose()
@@ -126,11 +136,11 @@ public ParallelChannelProcessor(
126136

127137
protected override async ValueTask NotifyOfPendingItemProcessing()
128138
{
129-
List<Task> chunkRunners = new List<Task>(DataMovementConstants.MaxJobPartReaders);
139+
List<Task> chunkRunners = new List<Task>(_maxConcurrentProcessing);
130140
while (await _channel.Reader.WaitToReadAsync(_cancellationToken).ConfigureAwait(false))
131141
{
132142
TItem item = await _channel.Reader.ReadAsync(_cancellationToken).ConfigureAwait(false);
133-
if (chunkRunners.Count >= DataMovementConstants.MaxJobPartReaders)
143+
if (chunkRunners.Count >= _maxConcurrentProcessing)
134144
{
135145
// Clear any completed blocks from the task list
136146
int removedRunners = chunkRunners.RemoveAll(x => x.IsCompleted || x.IsCanceled || x.IsFaulted);

sdk/storage/Azure.Storage.DataMovement/src/DownloadChunkHandler.cs

Lines changed: 32 additions & 51 deletions
Original file line number | Diff line number | Diff line change
@@ -4,7 +4,6 @@
44
using System;
55
using System.IO;
66
using System.Threading;
7-
using System.Threading.Channels;
87
using System.Threading.Tasks;
98
using Azure.Storage.Common;
109

@@ -33,11 +32,10 @@ public struct Behaviors
3332
}
3433

3534
/// <summary>
36-
/// Create channel of <see cref="QueueDownloadChunkArgs"/> to keep track to handle
37-
/// writing downloaded chunks to the destination as well as tracking overall progress.
35+
/// Create channel of <see cref="QueueDownloadChunkArgs"/> to handle writing
36+
/// downloaded chunks to the destination as well as tracking overall progress.
3837
/// </summary>
39-
private readonly Channel<QueueDownloadChunkArgs> _downloadRangeChannel;
40-
private readonly Task _processDownloadRangeEvents;
38+
private readonly IProcessor<QueueDownloadChunkArgs> _downloadRangeProcessor;
4139
private readonly CancellationToken _cancellationToken;
4240

4341
private long _bytesTransferred;
@@ -66,30 +64,18 @@ public DownloadChunkHandler(
6664
Behaviors behaviors,
6765
CancellationToken cancellationToken)
6866
{
69-
// Set bytes transferred to the length of bytes we got back from the initial
70-
// download request
71-
_bytesTransferred = currentTransferred;
72-
73-
// The size of the channel should never exceed 50k (limit on blocks in a block blob).
74-
// and that's in the worst case that we never read from the channel and had a maximum chunk blob.
75-
_downloadRangeChannel = Channel.CreateUnbounded<QueueDownloadChunkArgs>(
76-
new UnboundedChannelOptions()
77-
{
78-
// Single reader is required as we can only have one writer to the destination.
79-
SingleReader = true,
80-
});
81-
_processDownloadRangeEvents = Task.Run(NotifyOfPendingChunkDownloadEvents);
82-
_cancellationToken = cancellationToken;
83-
84-
_expectedLength = expectedLength;
85-
8667
if (expectedLength <= 0)
8768
{
8869
throw Errors.InvalidExpectedLength(expectedLength);
8970
}
9071
Argument.AssertNotNull(behaviors, nameof(behaviors));
9172

92-
// Set values
73+
_cancellationToken = cancellationToken;
74+
// Set bytes transferred to the length of bytes we got back from the initial
75+
// download request
76+
_bytesTransferred = currentTransferred;
77+
_expectedLength = expectedLength;
78+
9379
_copyToDestinationFile = behaviors.CopyToDestinationFile
9480
?? throw Errors.ArgumentNull(nameof(behaviors.CopyToDestinationFile));
9581
_reportProgressInBytes = behaviors.ReportProgressInBytes
@@ -98,44 +84,43 @@ public DownloadChunkHandler(
9884
?? throw Errors.ArgumentNull(nameof(behaviors.InvokeFailedHandler));
9985
_queueCompleteFileDownload = behaviors.QueueCompleteFileDownload
10086
?? throw Errors.ArgumentNull(nameof(behaviors.QueueCompleteFileDownload));
87+
88+
_downloadRangeProcessor = ChannelProcessing.NewProcessor<QueueDownloadChunkArgs>(
89+
readers: 1,
90+
capacity: DataMovementConstants.Channels.DownloadChunkCapacity);
91+
_downloadRangeProcessor.Process = ProcessDownloadRange;
10192
}
10293

10394
public void Dispose()
10495
{
105-
_downloadRangeChannel.Writer.TryComplete();
96+
_downloadRangeProcessor.TryComplete();
10697
}
10798

108-
public void QueueChunk(QueueDownloadChunkArgs args)
99+
public async ValueTask QueueChunkAsync(QueueDownloadChunkArgs args)
109100
{
110-
_downloadRangeChannel.Writer.TryWrite(args);
101+
await _downloadRangeProcessor.QueueAsync(args).ConfigureAwait(false);
111102
}
112103

113-
private async Task NotifyOfPendingChunkDownloadEvents()
104+
private async Task ProcessDownloadRange(QueueDownloadChunkArgs args, CancellationToken cancellationToken = default)
114105
{
115106
try
116107
{
117-
while (await _downloadRangeChannel.Reader.WaitToReadAsync(_cancellationToken).ConfigureAwait(false))
108+
// Copy the current chunk to the destination
109+
using (Stream content = args.Content)
118110
{
119-
// Read one event argument at a time.
120-
QueueDownloadChunkArgs args = await _downloadRangeChannel.Reader.ReadAsync(_cancellationToken).ConfigureAwait(false);
121-
122-
// Copy the current chunk to the destination
123-
using (Stream content = args.Content)
124-
{
125-
await _copyToDestinationFile(
126-
args.Offset,
127-
args.Length,
128-
content,
129-
_expectedLength,
130-
initial: _bytesTransferred == 0).ConfigureAwait(false);
131-
}
132-
UpdateBytesAndRange(args.Length);
111+
await _copyToDestinationFile(
112+
args.Offset,
113+
args.Length,
114+
content,
115+
_expectedLength,
116+
initial: _bytesTransferred == 0).ConfigureAwait(false);
117+
}
118+
UpdateBytesAndRange(args.Length);
133119

134-
// Check if we finished downloading the blob
135-
if (_bytesTransferred == _expectedLength)
136-
{
137-
await _queueCompleteFileDownload().ConfigureAwait(false);
138-
}
120+
// Check if we finished downloading the blob
121+
if (_bytesTransferred == _expectedLength)
122+
{
123+
await _queueCompleteFileDownload().ConfigureAwait(false);
139124
}
140125
}
141126
catch (Exception ex)
@@ -145,10 +130,6 @@ await _copyToDestinationFile(
145130
}
146131
}
147132

148-
/// <summary>
149-
/// Moves the downloader to the next range and updates/reports bytes transferred.
150-
/// </summary>
151-
/// <param name="bytesDownloaded"></param>
152133
private void UpdateBytesAndRange(long bytesDownloaded)
153134
{
154135
_bytesTransferred += bytesDownloaded;

sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -7,6 +7,7 @@
77
using System.IO;
88
using System.Linq;
99
using System.Threading;
10+
using System.Threading.Channels;
1011
using System.Threading.Tasks;
1112
using Azure.Core;
1213
using Azure.Core.Pipeline;
@@ -379,6 +380,7 @@ public async virtual Task InvokeFailedArgAsync(Exception ex)
379380
{
380381
if (ex is not OperationCanceledException &&
381382
ex is not TaskCanceledException &&
383+
ex is not ChannelClosedException &&
382384
ex.InnerException is not TaskCanceledException &&
383385
!ex.Message.Contains("The request was canceled."))
384386
{

sdk/storage/Azure.Storage.DataMovement/src/Shared/DataMovementConstants.cs

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,27 @@
11
// Copyright (c) Microsoft Corporation. All rights reserved.
22
// Licensed under the MIT License.
33

4+
using System;
5+
46
namespace Azure.Storage.DataMovement
57
{
68
internal class DataMovementConstants
79
{
8-
/// <summary>
9-
/// Constants of the Data Movement library
10-
/// </summary>
11-
internal const int InitialMainPoolSize = 32;
12-
internal const int InitialDownloadFileThreads = 32; // Max is 3000
13-
internal const int CpuTuningMultiplier = 16;
14-
internal const int MaxJobPartReaders = 64;
15-
internal const int MaxJobChunkTasks = 3000;
16-
internal const int StatusCheckInSec = 10;
1710
internal const int DefaultStreamCopyBufferSize = 81920; // Use the .NET default
18-
1911
internal const long DefaultInitialTransferSize = 32 * Constants.MB;
2012
internal const long DefaultChunkSize = 4 * Constants.MB;
2113

2214
public const char PathForwardSlashDelimiterChar = '/';
2315

16+
internal static class Channels
17+
{
18+
internal const int MaxJobPartReaders = 32;
19+
internal static int MaxJobChunkReaders = Environment.ProcessorCount * 8;
20+
internal const int JobPartCapacity = 1000;
21+
internal const int JobChunkCapacity = 1000;
22+
internal const int DownloadChunkCapacity = 16;
23+
}
24+
2425
internal static class ConcurrencyTuner
2526
{
2627
internal const int StandardMultiplier = 2;

sdk/storage/Azure.Storage.DataMovement/src/TransferManager.cs

Lines changed: 7 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -60,9 +60,13 @@ protected TransferManager()
6060
/// <param name="options">Options that will apply to all transfers started by this TransferManager.</param>
6161
public TransferManager(TransferManagerOptions options = default)
6262
: this(
63-
ChannelProcessing.NewProcessor<TransferJobInternal>(parallelism: 1),
64-
ChannelProcessing.NewProcessor<JobPartInternal>(DataMovementConstants.MaxJobPartReaders),
65-
ChannelProcessing.NewProcessor<Func<Task>>(options?.MaximumConcurrency ?? DataMovementConstants.MaxJobChunkTasks),
63+
ChannelProcessing.NewProcessor<TransferJobInternal>(readers: 1),
64+
ChannelProcessing.NewProcessor<JobPartInternal>(
65+
readers: DataMovementConstants.Channels.MaxJobPartReaders,
66+
capacity: DataMovementConstants.Channels.JobPartCapacity),
67+
ChannelProcessing.NewProcessor<Func<Task>>(
68+
readers: options?.MaximumConcurrency ?? DataMovementConstants.Channels.MaxJobChunkReaders,
69+
capacity: DataMovementConstants.Channels.JobChunkCapacity),
6670
new(ArrayPool<byte>.Shared,
6771
options?.ErrorHandling ?? DataTransferErrorMode.StopOnAnyFailure,
6872
new ClientDiagnostics(options?.ClientOptions ?? ClientOptions.Default)),

sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs

Lines changed: 7 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -405,10 +405,13 @@ await dataStream.CopyToAsync(
405405
content.Position = 0;
406406

407407
// The chunk handler may have been disposed in failure case
408-
_downloadChunkHandler?.QueueChunk(new QueueDownloadChunkArgs(
409-
offset: range.Offset,
410-
length: (long)range.Length,
411-
content: content));
408+
if (_downloadChunkHandler != null)
409+
{
410+
await _downloadChunkHandler.QueueChunkAsync(new QueueDownloadChunkArgs(
411+
offset: range.Offset,
412+
length: (long)range.Length,
413+
content: content)).ConfigureAwait(false);
414+
}
412415
}
413416
catch (Exception ex)
414417
{

0 commit comments

Comments
 (0)