Skip to content

Commit b2ac4ba

Browse files
TransferManager task management (Azure#45730)
* ChannelProcessor Encapsulate the management of consuming tasks to a ChannelProcessor. Removes the code for managing Channels and performing actions on them from the TransferManager, allowing it to focus on defining those actions and linking outputs to more work queues. * remove dead delegate * miscellaneous remove extraneous methods. add some cancellation tokens to signatures for cleaner delegate passing * Dependency injection ChannelProcessors accept their channels at construction. ChannelProcessors can have process methods set after the fact. TransferManager gets dependency injection constructor where it links up the channel processors it's given.
1 parent f6e2c75 commit b2ac4ba

File tree

11 files changed

+212
-213
lines changed

11 files changed

+212
-213
lines changed

sdk/storage/Azure.Storage.DataMovement.Blobs/tests/BlobContainerServiceToServiceJobTests.cs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,6 @@ public async Task ProcessJobToJobPartAsync()
9292
Mock<BlobStorageResourceContainer> sourceMock = GetMockBlobContainerResource();
9393
Mock<BlobStorageResourceContainer> destinationMock = GetMockBlobContainerResource();
9494

95-
Mock<TransferJobInternal.QueueChunkTaskInternal> mockPartQueueChunkTask = MockQueueInternalTasks.GetQueueChunkTask();
96-
9795
// Set up default checkpointer with transfer job
9896
LocalTransferCheckpointer checkpointer = new(default);
9997
await checkpointer.AddNewJobAsync(
@@ -119,7 +117,6 @@ await checkpointer.AddNewJobAsync(
119117
sourceMock.Object,
120118
destinationMock.Object,
121119
new DataTransferOptions(),
122-
mockPartQueueChunkTask.Object,
123120
checkpointer,
124121
DataTransferErrorMode.StopOnAnyFailure,
125122
ArrayPool<byte>.Shared,
@@ -148,8 +145,6 @@ public async Task ProcessJobToJobPartAsync_AllBlobTypes()
148145
Mock<BlobStorageResourceContainer> sourceMock = GetMockBlobContainerResource();
149146
Mock<BlobStorageResourceContainer> destinationMock = GetMockBlobContainerResource();
150147

151-
Mock<TransferJobInternal.QueueChunkTaskInternal> mockPartQueueChunkTask = MockQueueInternalTasks.GetQueueChunkTask();
152-
153148
// Set up default checkpointer with transfer job
154149
LocalTransferCheckpointer checkpointer = new(default);
155150
await checkpointer.AddNewJobAsync(
@@ -178,7 +173,6 @@ await checkpointer.AddNewJobAsync(
178173
sourceMock.Object,
179174
destinationMock.Object,
180175
new DataTransferOptions(),
181-
mockPartQueueChunkTask.Object,
182176
checkpointer,
183177
DataTransferErrorMode.StopOnAnyFailure,
184178
ArrayPool<byte>.Shared,
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
using System;
5+
using System.Collections.Generic;
6+
using System.Threading;
7+
using System.Threading.Channels;
8+
using System.Threading.Tasks;
9+
using Azure.Storage.Common;
10+
using static Azure.Storage.DataMovement.ChannelProcessing;
11+
12+
namespace Azure.Storage.DataMovement;
13+
14+
internal interface IProcessor<TItem> : IDisposable
15+
{
16+
ValueTask QueueAsync(TItem item, CancellationToken cancellationToken = default);
17+
ProcessAsync<TItem> Process { get; set; }
18+
}
19+
20+
internal static class ChannelProcessing
21+
{
22+
public delegate Task ProcessAsync<T>(T item, CancellationToken cancellationToken);
23+
24+
public static IProcessor<T> NewProcessor<T>(int parallelism)
25+
{
26+
Argument.AssertInRange(parallelism, 1, int.MaxValue, nameof(parallelism));
27+
return parallelism == 1
28+
? new SequentialChannelProcessor<T>(
29+
Channel.CreateUnbounded<T>(new UnboundedChannelOptions()
30+
{
31+
AllowSynchronousContinuations = true,
32+
SingleReader = true,
33+
}))
34+
: new ParallelChannelProcessor<T>(
35+
Channel.CreateUnbounded<T>(new UnboundedChannelOptions()
36+
{
37+
AllowSynchronousContinuations = true,
38+
}),
39+
parallelism);
40+
}
41+
42+
private abstract class ChannelProcessor<TItem> : IProcessor<TItem>, IDisposable
43+
{
44+
/// <summary>
45+
/// Async channel reader task. Loops for lifetime of object.
46+
/// </summary>
47+
private Task _processorTask;
48+
49+
/// <summary>
50+
/// Channel of items to process.
51+
/// </summary>
52+
protected readonly Channel<TItem, TItem> _channel;
53+
54+
/// <summary>
55+
/// Cancellation token for disposal.
56+
/// </summary>
57+
private CancellationTokenSource _cancellationTokenSource;
58+
protected CancellationToken _cancellationToken => _cancellationTokenSource.Token;
59+
60+
private ProcessAsync<TItem> _process;
61+
public ProcessAsync<TItem> Process
62+
{
63+
get => _process;
64+
set
65+
{
66+
ProcessAsync<TItem> prev = Interlocked.Exchange(ref _process, value);
67+
if (prev == default)
68+
{
69+
_processorTask = Task.Run(NotifyOfPendingItemProcessing);
70+
}
71+
}
72+
}
73+
74+
protected ChannelProcessor(Channel<TItem, TItem> channel)
75+
{
76+
Argument.AssertNotNull(channel, nameof(channel));
77+
_channel = channel;
78+
_cancellationTokenSource = new();
79+
}
80+
81+
public async ValueTask QueueAsync(TItem item, CancellationToken cancellationToken = default)
82+
{
83+
await _channel.Writer.WriteAsync(item, cancellationToken).ConfigureAwait(false);
84+
}
85+
86+
protected abstract ValueTask NotifyOfPendingItemProcessing();
87+
88+
public void Dispose()
89+
{
90+
if (!_cancellationTokenSource.IsCancellationRequested)
91+
{
92+
_cancellationTokenSource.Cancel();
93+
}
94+
GC.SuppressFinalize(this);
95+
}
96+
}
97+
98+
private class SequentialChannelProcessor<TItem> : ChannelProcessor<TItem>
99+
{
100+
public SequentialChannelProcessor(Channel<TItem, TItem> channel)
101+
: base(channel)
102+
{ }
103+
104+
protected override async ValueTask NotifyOfPendingItemProcessing()
105+
{
106+
// Process all available items in the queue.
107+
while (await _channel.Reader.WaitToReadAsync(_cancellationToken).ConfigureAwait(false))
108+
{
109+
TItem item = await _channel.Reader.ReadAsync(_cancellationToken).ConfigureAwait(false);
110+
await Process(item, _cancellationToken).ConfigureAwait(false);
111+
}
112+
}
113+
}
114+
115+
private class ParallelChannelProcessor<TItem> : ChannelProcessor<TItem>
116+
{
117+
private readonly int _maxConcurrentProcessing;
118+
119+
public ParallelChannelProcessor(
120+
Channel<TItem, TItem> channel,
121+
int maxConcurrentProcessing)
122+
: base(channel)
123+
{
124+
Argument.AssertInRange(maxConcurrentProcessing, 2, int.MaxValue, nameof(maxConcurrentProcessing));
125+
_maxConcurrentProcessing = maxConcurrentProcessing;
126+
}
127+
128+
protected override async ValueTask NotifyOfPendingItemProcessing()
129+
{
130+
List<Task> chunkRunners = new List<Task>(DataMovementConstants.MaxJobPartReaders);
131+
while (await _channel.Reader.WaitToReadAsync(_cancellationToken).ConfigureAwait(false))
132+
{
133+
TItem item = await _channel.Reader.ReadAsync(_cancellationToken).ConfigureAwait(false);
134+
if (chunkRunners.Count >= DataMovementConstants.MaxJobPartReaders)
135+
{
136+
// Clear any completed blocks from the task list
137+
int removedRunners = chunkRunners.RemoveAll(x => x.IsCompleted || x.IsCanceled || x.IsFaulted);
138+
// If no runners have finished..
139+
if (removedRunners == 0)
140+
{
141+
// Wait for at least one runner to finish
142+
await Task.WhenAny(chunkRunners).ConfigureAwait(false);
143+
chunkRunners.RemoveAll(x => x.IsCompleted || x.IsCanceled || x.IsFaulted);
144+
}
145+
}
146+
chunkRunners.Add(Task.Run(async () => await Process(item, _cancellationToken).ConfigureAwait(false)));
147+
}
148+
}
149+
}
150+
}

sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ namespace Azure.Storage.DataMovement
1717
{
1818
internal abstract class JobPartInternal
1919
{
20-
public delegate Task QueueChunkDelegate(Func<Task> item);
20+
public delegate ValueTask QueueChunkDelegate(Func<Task> item, CancellationToken cancellationToken);
2121
public QueueChunkDelegate QueueChunk { get; internal set; }
2222

2323
/// <summary>
@@ -229,7 +229,8 @@ await QueueChunk(
229229
}
230230
Interlocked.Increment(ref _completedChunkCount);
231231
await CheckAndUpdateCancellationStateAsync().ConfigureAwait(false);
232-
}).ConfigureAwait(false);
232+
},
233+
default).ConfigureAwait(false);
233234
}
234235

235236
/// <summary>

sdk/storage/Azure.Storage.DataMovement/src/ServiceToServiceTransferJob.cs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ internal ServiceToServiceTransferJob(
2121
StorageResourceItem sourceResource,
2222
StorageResourceItem destinationResource,
2323
DataTransferOptions transferOptions,
24-
QueueChunkTaskInternal queueChunkTask,
2524
TransferCheckpointer CheckPointFolderPath,
2625
DataTransferErrorMode errorHandling,
2726
ArrayPool<byte> arrayPool,
@@ -30,7 +29,6 @@ internal ServiceToServiceTransferJob(
3029
sourceResource,
3130
destinationResource,
3231
transferOptions,
33-
queueChunkTask,
3432
CheckPointFolderPath,
3533
errorHandling,
3634
arrayPool,
@@ -46,7 +44,6 @@ internal ServiceToServiceTransferJob(
4644
StorageResourceContainer sourceResource,
4745
StorageResourceContainer destinationResource,
4846
DataTransferOptions transferOptions,
49-
QueueChunkTaskInternal queueChunkTask,
5047
TransferCheckpointer checkpointer,
5148
DataTransferErrorMode errorHandling,
5249
ArrayPool<byte> arrayPool,
@@ -55,7 +52,6 @@ internal ServiceToServiceTransferJob(
5552
sourceResource,
5653
destinationResource,
5754
transferOptions,
58-
queueChunkTask,
5955
checkpointer,
6056
errorHandling,
6157
arrayPool,

sdk/storage/Azure.Storage.DataMovement/src/StreamToUriTransferJob.cs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ internal StreamToUriTransferJob(
1919
StorageResourceItem sourceResource,
2020
StorageResourceItem destinationResource,
2121
DataTransferOptions transferOptions,
22-
QueueChunkTaskInternal queueChunkTask,
2322
TransferCheckpointer checkpointer,
2423
DataTransferErrorMode errorHandling,
2524
ArrayPool<byte> arrayPool,
@@ -28,7 +27,6 @@ internal StreamToUriTransferJob(
2827
sourceResource,
2928
destinationResource,
3029
transferOptions,
31-
queueChunkTask,
3230
checkpointer,
3331
errorHandling,
3432
arrayPool,
@@ -44,7 +42,6 @@ internal StreamToUriTransferJob(
4442
StorageResourceContainer sourceResource,
4543
StorageResourceContainer destinationResource,
4644
DataTransferOptions transferOptions,
47-
QueueChunkTaskInternal queueChunkTask,
4845
TransferCheckpointer checkpointer,
4946
DataTransferErrorMode errorHandling,
5047
ArrayPool<byte> arrayPool,
@@ -53,7 +50,6 @@ internal StreamToUriTransferJob(
5350
sourceResource,
5451
destinationResource,
5552
transferOptions,
56-
queueChunkTask,
5753
checkpointer,
5854
errorHandling,
5955
arrayPool,

sdk/storage/Azure.Storage.DataMovement/src/TransferJobInternal.cs

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,6 @@ namespace Azure.Storage.DataMovement
1414
{
1515
internal abstract class TransferJobInternal : IDisposable
1616
{
17-
#region Delegates
18-
public delegate Task QueueChunkTaskInternal(Func<Task> uploadTask);
19-
#endregion
20-
public QueueChunkTaskInternal QueueChunkTask { get; internal set; }
21-
2217
/// <summary>
2318
/// DataTransfer communicate when the transfer has finished and the progress
2419
/// </summary>
@@ -140,7 +135,6 @@ internal protected TransferJobInternal()
140135

141136
private TransferJobInternal(
142137
DataTransfer dataTransfer,
143-
QueueChunkTaskInternal queueChunkTask,
144138
TransferCheckpointer checkPointer,
145139
DataTransferErrorMode errorHandling,
146140
long? initialTransferSize,
@@ -158,7 +152,6 @@ private TransferJobInternal(
158152
_dataTransfer = dataTransfer ?? throw Errors.ArgumentNull(nameof(dataTransfer));
159153
_dataTransfer.TransferStatus.TrySetTransferStateChange(DataTransferState.Queued);
160154
_checkpointer = checkPointer;
161-
QueueChunkTask = queueChunkTask;
162155
_arrayPool = arrayPool;
163156
_jobParts = new List<JobPartInternal>();
164157
_enumerationComplete = false;
@@ -189,13 +182,11 @@ internal TransferJobInternal(
189182
StorageResourceItem sourceResource,
190183
StorageResourceItem destinationResource,
191184
DataTransferOptions transferOptions,
192-
QueueChunkTaskInternal queueChunkTask,
193185
TransferCheckpointer checkpointer,
194186
DataTransferErrorMode errorHandling,
195187
ArrayPool<byte> arrayPool,
196188
ClientDiagnostics clientDiagnostics)
197189
: this(dataTransfer,
198-
queueChunkTask,
199190
checkpointer,
200191
errorHandling,
201192
transferOptions.InitialTransferSize,
@@ -222,13 +213,11 @@ internal TransferJobInternal(
222213
StorageResourceContainer sourceResource,
223214
StorageResourceContainer destinationResource,
224215
DataTransferOptions transferOptions,
225-
QueueChunkTaskInternal queueChunkTask,
226216
TransferCheckpointer checkpointer,
227217
DataTransferErrorMode errorHandling,
228218
ArrayPool<byte> arrayPool,
229219
ClientDiagnostics clientDiagnostics)
230220
: this(dataTransfer,
231-
queueChunkTask,
232221
checkpointer,
233222
errorHandling,
234223
transferOptions.InitialTransferSize,
@@ -517,7 +506,7 @@ internal HashSet<Uri> GetJobPartSourceResourcePaths()
517506
return new HashSet<Uri>(_jobParts.Select(x => x._sourceResource.Uri));
518507
}
519508

520-
internal void QueueJobPart()
509+
internal void IncrementJobParts()
521510
{
522511
_progressTracker.IncrementQueuedFiles();
523512
}

0 commit comments

Comments
 (0)