Skip to content

Commit fa515eb

Browse files
[Storage][DataMovement] Refactor/add bounds to CommitChunkHandler (Azure#47417)
1 parent 40f2b24 commit fa515eb

File tree

8 files changed

+129
-422
lines changed

8 files changed

+129
-422
lines changed

sdk/storage/Azure.Storage.DataMovement/src/CommitChunkHandler.cs

Lines changed: 39 additions & 134 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,14 @@
22
// Licensed under the MIT License.
33

44
using System;
5-
using Azure.Core;
65
using System.Threading.Tasks;
76
using System.Threading;
8-
using System.Threading.Channels;
9-
using Azure.Core.Pipeline;
107
using Azure.Storage.Common;
118

129
namespace Azure.Storage.DataMovement
1310
{
1411
internal class CommitChunkHandler : IDisposable
1512
{
16-
// Indicates whether the current thread is processing stage chunks.
17-
private static Task _processStageChunkEvents;
18-
1913
#region Delegate Definitions
2014
public delegate Task QueuePutBlockTaskInternal(long offset, long blockSize, long expectedLength, StorageResourceItemProperties properties);
2115
public delegate Task QueueCommitBlockTaskInternal(StorageResourceItemProperties sourceProperties);
@@ -36,29 +30,24 @@ public struct Behaviors
3630
public InvokeFailedEventHandlerInternal InvokeFailedHandler { get; set; }
3731
}
3832

39-
private event SyncAsyncEventHandler<StageChunkEventArgs> _commitBlockHandler;
40-
internal SyncAsyncEventHandler<StageChunkEventArgs> GetCommitBlockHandler() => _commitBlockHandler;
41-
4233
/// <summary>
43-
/// Create channel of <see cref="StageChunkEventArgs"/> to keep track of that are
34+
/// Create channel of <see cref="QueueStageChunkArgs"/> to keep track of the chunks that are
4435
/// waiting to update the bytesTransferred and other required operations.
4536
/// </summary>
46-
private readonly Channel<StageChunkEventArgs> _stageChunkChannel;
47-
private CancellationToken _cancellationToken;
37+
private readonly IProcessor<QueueStageChunkArgs> _stageChunkProcessor;
38+
private readonly CancellationToken _cancellationToken;
4839

4940
private long _bytesTransferred;
5041
private readonly long _expectedLength;
5142
private readonly long _blockSize;
5243
private readonly DataTransferOrder _transferOrder;
53-
private readonly ClientDiagnostics _clientDiagnostics;
5444
private readonly StorageResourceItemProperties _sourceProperties;
5545

5646
public CommitChunkHandler(
5747
long expectedLength,
5848
long blockSize,
5949
Behaviors behaviors,
6050
DataTransferOrder transferOrder,
61-
ClientDiagnostics clientDiagnostics,
6251
StorageResourceItemProperties sourceProperties,
6352
CancellationToken cancellationToken)
6453
{
@@ -67,7 +56,14 @@ public CommitChunkHandler(
6756
throw Errors.InvalidExpectedLength(expectedLength);
6857
}
6958
Argument.AssertNotNull(behaviors, nameof(behaviors));
70-
Argument.AssertNotNull(clientDiagnostics, nameof(clientDiagnostics));
59+
60+
_cancellationToken = cancellationToken;
61+
// Set bytes transferred to block size because we transferred the initial block
62+
_bytesTransferred = blockSize;
63+
_expectedLength = expectedLength;
64+
_blockSize = blockSize;
65+
_transferOrder = transferOrder;
66+
_sourceProperties = sourceProperties;
7167

7268
_queuePutBlockTask = behaviors.QueuePutBlockTask
7369
?? throw Errors.ArgumentNull(nameof(behaviors.QueuePutBlockTask));
@@ -78,152 +74,61 @@ public CommitChunkHandler(
7874
_invokeFailedEventHandler = behaviors.InvokeFailedHandler
7975
?? throw Errors.ArgumentNull(nameof(behaviors.InvokeFailedHandler));
8076

81-
// Set expected length to perform commit task
82-
_expectedLength = expectedLength;
83-
84-
// Create channel of finished Stage Chunk Args to update the bytesTransferred
85-
// and for ending tasks like commit block.
86-
// The size of the channel should never exceed 50k (limit on blocks in a block blob).
87-
// and that's in the worst case that we never read from the channel and had a maximum chunk blob.
88-
_stageChunkChannel = Channel.CreateUnbounded<StageChunkEventArgs>(
89-
new UnboundedChannelOptions()
90-
{
91-
// Single reader is required as we can only read and write to bytesTransferred value
92-
SingleReader = true,
93-
});
94-
_cancellationToken = cancellationToken;
95-
96-
// Set bytes transferred to block size because we transferred the initial block
97-
_bytesTransferred = blockSize;
98-
99-
_processStageChunkEvents = Task.Run(() => NotifyOfPendingStageChunkEvents());
100-
101-
_blockSize = blockSize;
102-
_transferOrder = transferOrder;
103-
if (_transferOrder == DataTransferOrder.Sequential)
104-
{
105-
_commitBlockHandler += SequentialBlockEvent;
106-
}
107-
_commitBlockHandler += ConcurrentBlockEvent;
108-
_clientDiagnostics = clientDiagnostics;
109-
_sourceProperties = sourceProperties;
77+
_stageChunkProcessor = ChannelProcessing.NewProcessor<QueueStageChunkArgs>(
78+
readers: 1,
79+
capacity: DataMovementConstants.Channels.StageChunkCapacity);
80+
_stageChunkProcessor.Process = ProcessCommitRange;
11081
}
11182

11283
public void Dispose()
11384
{
114-
// We no longer have to read from the channel. We are not expecting any more requests.
115-
_stageChunkChannel.Writer.TryComplete();
116-
DisposeHandlers();
85+
_stageChunkProcessor.TryComplete();
11786
}
11887

119-
private void DisposeHandlers()
88+
public async ValueTask QueueChunkAsync(QueueStageChunkArgs args)
12089
{
121-
if (_transferOrder == DataTransferOrder.Sequential)
122-
{
123-
_commitBlockHandler -= SequentialBlockEvent;
124-
}
125-
_commitBlockHandler -= ConcurrentBlockEvent;
90+
await _stageChunkProcessor.QueueAsync(args).ConfigureAwait(false);
12691
}
12792

128-
private async Task ConcurrentBlockEvent(StageChunkEventArgs args)
93+
private async Task ProcessCommitRange(QueueStageChunkArgs args, CancellationToken cancellationToken = default)
12994
{
13095
try
13196
{
132-
if (args.Success)
133-
{
134-
// Let's add to the channel, and our notifier will handle the chunks.
135-
_stageChunkChannel.Writer.TryWrite(args);
136-
}
137-
else
138-
{
139-
// Log an unexpected error since it came back unsuccessful
140-
throw args.Exception;
141-
}
142-
}
143-
catch (Exception ex)
144-
{
145-
// Log an unexpected error since it came back unsuccessful
146-
await _invokeFailedEventHandler(ex).ConfigureAwait(false);
147-
}
148-
}
97+
_bytesTransferred += args.BytesTransferred;
98+
_reportProgressInBytes(args.BytesTransferred);
14999

150-
private async Task NotifyOfPendingStageChunkEvents()
151-
{
152-
try
153-
{
154-
while (await _stageChunkChannel.Reader.WaitToReadAsync(_cancellationToken).ConfigureAwait(false))
100+
if (_bytesTransferred == _expectedLength)
155101
{
156-
// Read one event argument at a time.
157-
StageChunkEventArgs args = await _stageChunkChannel.Reader.ReadAsync(_cancellationToken).ConfigureAwait(false);
158-
159-
// don't need to use Interlocked.Add() as we are reading one event at a time
160-
// and _bytesTransferred is not being read/updated from any other thread
161-
_bytesTransferred += args.BytesTransferred;
162-
163-
// Report the incremental bytes transferred
164-
_reportProgressInBytes(args.BytesTransferred);
165-
166-
if (_bytesTransferred == _expectedLength)
167-
{
168-
// Add CommitBlockList task to the channel
169-
await _queueCommitBlockTask(_sourceProperties).ConfigureAwait(false);
170-
}
171-
else if (_bytesTransferred > _expectedLength)
172-
{
173-
throw Errors.MismatchLengthTransferred(
174-
expectedLength: _expectedLength,
175-
actualLength: _bytesTransferred);
176-
}
102+
// Add CommitBlockList task to the channel
103+
await _queueCommitBlockTask(_sourceProperties).ConfigureAwait(false);
177104
}
178-
}
179-
catch (Exception ex)
180-
{
181-
await _invokeFailedEventHandler(ex).ConfigureAwait(false);
182-
}
183-
}
184-
185-
private async Task SequentialBlockEvent(StageChunkEventArgs args)
186-
{
187-
try
188-
{
189-
if (args.Success)
105+
else if (_bytesTransferred < _expectedLength)
190106
{
191-
long oldOffset = args.Offset;
192-
long newOffset = oldOffset + _blockSize;
193-
if (newOffset < _expectedLength)
107+
// If this is a sequential transfer, we need to queue the next chunk
108+
if (_transferOrder == DataTransferOrder.Sequential)
194109
{
110+
long newOffset = args.Offset + _blockSize;
195111
long blockLength = (newOffset + _blockSize < _expectedLength) ?
196-
_blockSize :
197-
_expectedLength - newOffset;
198-
await _queuePutBlockTask(newOffset, blockLength, _expectedLength, _sourceProperties).ConfigureAwait(false);
112+
_blockSize :
113+
_expectedLength - newOffset;
114+
await _queuePutBlockTask(
115+
newOffset,
116+
blockLength,
117+
_expectedLength,
118+
_sourceProperties).ConfigureAwait(false);
199119
}
200120
}
201-
else
121+
else // _bytesTransferred > _expectedLength
202122
{
203-
// Log an unexpected error since it came back unsuccessful
204-
throw args.Exception;
123+
throw Errors.MismatchLengthTransferred(
124+
expectedLength: _expectedLength,
125+
actualLength: _bytesTransferred);
205126
}
206127
}
207128
catch (Exception ex)
208129
{
209-
// Log an unexpected error since it came back unsuccessful
210130
await _invokeFailedEventHandler(ex).ConfigureAwait(false);
211131
}
212132
}
213-
214-
public async Task InvokeEvent(StageChunkEventArgs args)
215-
{
216-
// There's a race condition where the event handler was disposed and an event
217-
// was already invoked, we should skip over this as the download chunk handler
218-
// was already disposed, and we should just ignore any more incoming events.
219-
if (_commitBlockHandler != null)
220-
{
221-
await _commitBlockHandler.RaiseAsync(
222-
args,
223-
nameof(CommitChunkHandler),
224-
nameof(_commitBlockHandler),
225-
_clientDiagnostics).ConfigureAwait(false);
226-
}
227-
}
228133
}
229134
}

sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs

Lines changed: 1 addition & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -504,26 +504,7 @@ internal static long ParseRangeTotalLength(string range)
504504
return long.Parse(range.Substring(lengthSeparator + 1), CultureInfo.InvariantCulture);
505505
}
506506

507-
internal static List<(long Offset, long Size)> GetRangeList(long blockSize, long fileLength)
508-
{
509-
// The list tracking blocks IDs we're going to commit
510-
List<(long Offset, long Size)> partitions = new List<(long, long)>();
511-
512-
// Partition the stream into individual blocks
513-
foreach ((long Offset, long Length) block in GetPartitionIndexes(fileLength, blockSize))
514-
{
515-
/* We need to do this first! Length is calculated on the fly based on stream buffer
516-
* contents; We need to record the partition data first before consuming the stream
517-
* asynchronously. */
518-
partitions.Add(block);
519-
}
520-
return partitions;
521-
}
522-
523-
/// <summary>
524-
/// Partition a stream into a series of blocks buffered as needed by an array pool.
525-
/// </summary>
526-
private static IEnumerable<(long Offset, long Length)> GetPartitionIndexes(
507+
protected static IEnumerable<(long Offset, long Length)> GetRanges(
527508
long streamLength, // StreamLength needed to divide before hand
528509
long blockSize)
529510
{
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
namespace Azure.Storage.DataMovement
5+
{
6+
/// <summary>
7+
/// This class is interchangeable for
8+
/// Stage Block (Put Block), Stage Block From Uri (Put Block From URL),
9+
/// Append Block (Append Block), Append Block From Uri (Append Block From URL),
10+
/// Upload Page (Put Page), Upload Pages From Uri (Put Pages From URL)
11+
///
12+
/// Basically any transfer operation that must end in a Commit Block List
13+
/// will end up using this internal event argument to track the success
14+
/// and the bytes transferred to ensure the correct amount of bytes are transferred.
15+
/// </summary>
16+
internal class QueueStageChunkArgs
17+
{
18+
public long Offset { get; }
19+
public long BytesTransferred { get; }
20+
21+
public QueueStageChunkArgs(long offset, long bytesTransferred)
22+
{
23+
Offset = offset;
24+
BytesTransferred = bytesTransferred;
25+
}
26+
}
27+
}

0 commit comments

Comments
 (0)