Skip to content

Commit a00de4b

Browse files
authored
[Storage] [DataMovement] Pause/Resume Channel Task Disposal Wait; Pause during Chunk creating (Azure#47480)
* WIP * Fixed flakiness in failure scenario * Fixed commit chunk handler to dispose channel task * Added fix where pause/stopping was not respected during chunk creation; Cleanup * Cleanup * Removing unnecessary fix for queued/enumeration pause/resume * Cleanup * Removed unnecessary check for inprogress before starting part * Update to tests to consider for queued complete transfer chunk * Cleanup * WIP * Remove semaphore on check update cancellation status * Remove semaphore for dispose handlers * Cleanup * More Cleanup * PR Comments * Cleanup * Turn completion source back to internal * Remove ChunkHandlerStatus
1 parent f02368b commit a00de4b

16 files changed

+289
-141
lines changed

sdk/storage/Azure.Storage.DataMovement/src/ChannelProcessing.cs

Lines changed: 44 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ namespace Azure.Storage.DataMovement;
1212

1313
internal delegate Task ProcessAsync<T>(T item, CancellationToken cancellationToken);
1414

15-
internal interface IProcessor<TItem> : IDisposable
15+
internal interface IProcessor<TItem> : IAsyncDisposable
1616
{
1717
ValueTask QueueAsync(TItem item, CancellationToken cancellationToken = default);
1818
bool TryComplete();
@@ -46,12 +46,13 @@ public static IProcessor<T> NewProcessor<T>(int readers, int? capacity = null)
4646
: new ParallelChannelProcessor<T>(channel, readers);
4747
}
4848

49-
private abstract class ChannelProcessor<TItem> : IProcessor<TItem>, IDisposable
49+
private abstract class ChannelProcessor<TItem> : IProcessor<TItem>, IAsyncDisposable
5050
{
5151
/// <summary>
5252
/// Async channel reader task. Loops for lifetime of object.
5353
/// </summary>
5454
private Task _processorTask;
55+
internal TaskCompletionSource<bool> _processorTaskCompletionSource;
5556

5657
/// <summary>
5758
/// Channel of items to process.
@@ -83,6 +84,9 @@ protected ChannelProcessor(Channel<TItem, TItem> channel)
8384
Argument.AssertNotNull(channel, nameof(channel));
8485
_channel = channel;
8586
_cancellationTokenSource = new();
87+
_processorTaskCompletionSource = new TaskCompletionSource<bool>(
88+
false,
89+
TaskCreationOptions.RunContinuationsAsynchronously);
8690
}
8791

8892
public async ValueTask QueueAsync(TItem item, CancellationToken cancellationToken = default)
@@ -94,12 +98,14 @@ public async ValueTask QueueAsync(TItem item, CancellationToken cancellationToke
9498

9599
protected abstract ValueTask NotifyOfPendingItemProcessing();
96100

97-
public void Dispose()
101+
public async ValueTask DisposeAsync()
98102
{
103+
_channel.Writer.TryComplete();
99104
if (!_cancellationTokenSource.IsCancellationRequested)
100105
{
101106
_cancellationTokenSource.Cancel();
102107
}
108+
await _processorTaskCompletionSource.Task.ConfigureAwait(false);
103109
GC.SuppressFinalize(this);
104110
}
105111
}
@@ -112,11 +118,21 @@ public SequentialChannelProcessor(Channel<TItem, TItem> channel)
112118

113119
protected override async ValueTask NotifyOfPendingItemProcessing()
114120
{
115-
// Process all available items in the queue.
116-
while (await _channel.Reader.WaitToReadAsync(_cancellationToken).ConfigureAwait(false))
121+
try
117122
{
118-
TItem item = await _channel.Reader.ReadAsync(_cancellationToken).ConfigureAwait(false);
119-
await Process(item, _cancellationToken).ConfigureAwait(false);
123+
// Process all available items in the queue.
124+
while (await _channel.Reader.WaitToReadAsync(_cancellationToken).ConfigureAwait(false))
125+
{
126+
TItem item = await _channel.Reader.ReadAsync(_cancellationToken).ConfigureAwait(false);
127+
await Process(item, _cancellationToken).ConfigureAwait(false);
128+
}
129+
}
130+
finally
131+
{
132+
// Since the channel has its own dedicated CancellationTokenSource (only called at Dispose)
133+
// we don't need a catch block to catch the exception since we know the cancellation either comes
134+
// from successful completion or another failure that has been already invoked.
135+
_processorTaskCompletionSource.TrySetResult(true);
120136
}
121137
}
122138
}
@@ -137,22 +153,32 @@ public ParallelChannelProcessor(
137153
protected override async ValueTask NotifyOfPendingItemProcessing()
138154
{
139155
List<Task> chunkRunners = new List<Task>(_maxConcurrentProcessing);
140-
while (await _channel.Reader.WaitToReadAsync(_cancellationToken).ConfigureAwait(false))
156+
try
141157
{
142-
TItem item = await _channel.Reader.ReadAsync(_cancellationToken).ConfigureAwait(false);
143-
if (chunkRunners.Count >= _maxConcurrentProcessing)
158+
while (await _channel.Reader.WaitToReadAsync(_cancellationToken).ConfigureAwait(false))
144159
{
145-
// Clear any completed blocks from the task list
146-
int removedRunners = chunkRunners.RemoveAll(x => x.IsCompleted || x.IsCanceled || x.IsFaulted);
147-
// If no runners have finished..
148-
if (removedRunners == 0)
160+
TItem item = await _channel.Reader.ReadAsync(_cancellationToken).ConfigureAwait(false);
161+
if (chunkRunners.Count >= _maxConcurrentProcessing)
149162
{
150-
// Wait for at least one runner to finish
151-
await Task.WhenAny(chunkRunners).ConfigureAwait(false);
152-
chunkRunners.RemoveAll(x => x.IsCompleted || x.IsCanceled || x.IsFaulted);
163+
// Clear any completed blocks from the task list
164+
int removedRunners = chunkRunners.RemoveAll(x => x.IsCompleted || x.IsCanceled || x.IsFaulted);
165+
// If no runners have finished..
166+
if (removedRunners == 0)
167+
{
168+
// Wait for at least one runner to finish
169+
await Task.WhenAny(chunkRunners).ConfigureAwait(false);
170+
chunkRunners.RemoveAll(x => x.IsCompleted || x.IsCanceled || x.IsFaulted);
171+
}
153172
}
173+
chunkRunners.Add(Task.Run(async () => await Process(item, _cancellationToken).ConfigureAwait(false)));
154174
}
155-
chunkRunners.Add(Task.Run(async () => await Process(item, _cancellationToken).ConfigureAwait(false)));
175+
}
176+
finally
177+
{
178+
// Since the channel has its own dedicated CancellationTokenSource (only called at Dispose)
179+
// we don't need a catch block to catch the exception since we know the cancellation either comes
180+
// from successful completion or another failure that has been already invoked.
181+
_processorTaskCompletionSource.TrySetResult(true);
156182
}
157183
}
158184
}

sdk/storage/Azure.Storage.DataMovement/src/CommitChunkHandler.cs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
namespace Azure.Storage.DataMovement
1010
{
11-
internal class CommitChunkHandler : IDisposable
11+
internal class CommitChunkHandler : IAsyncDisposable
1212
{
1313
#region Delegate Definitions
1414
public delegate Task QueuePutBlockTaskInternal(long offset, long blockSize, long expectedLength, StorageResourceItemProperties properties);
@@ -43,6 +43,8 @@ public struct Behaviors
4343
private readonly DataTransferOrder _transferOrder;
4444
private readonly StorageResourceItemProperties _sourceProperties;
4545

46+
internal bool _isChunkHandlerRunning;
47+
4648
public CommitChunkHandler(
4749
long expectedLength,
4850
long blockSize,
@@ -78,11 +80,13 @@ public CommitChunkHandler(
7880
readers: 1,
7981
capacity: DataMovementConstants.Channels.StageChunkCapacity);
8082
_stageChunkProcessor.Process = ProcessCommitRange;
83+
_isChunkHandlerRunning = true;
8184
}
8285

83-
public void Dispose()
86+
public async ValueTask DisposeAsync()
8487
{
85-
_stageChunkProcessor.TryComplete();
88+
_isChunkHandlerRunning = false;
89+
await _stageChunkProcessor.DisposeAsync().ConfigureAwait(false);
8690
}
8791

8892
public async ValueTask QueueChunkAsync(QueueStageChunkArgs args)
@@ -127,7 +131,14 @@ await _queuePutBlockTask(
127131
}
128132
catch (Exception ex)
129133
{
130-
await _invokeFailedEventHandler(ex).ConfigureAwait(false);
134+
// If we are disposing, we don't want to invoke the failed event handler
135+
// because the error is likely due to the job part being disposed and was
136+
// invoked by another InvokeFailedEventHandler call.
137+
if (_isChunkHandlerRunning)
138+
{
139+
// This will trigger the job part to call DisposeAsync on this object
140+
_ = Task.Run(() => _invokeFailedEventHandler(ex));
141+
}
131142
}
132143
}
133144
}

sdk/storage/Azure.Storage.DataMovement/src/DataMovementEventSource.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,17 +37,17 @@ public void TransferQueued(string transferId, StorageResource source, StorageRes
3737
}
3838

3939
[Event(TransferCompletedEvent, Level = EventLevel.Informational, Message = "Transfer [{0}] Transfer completed: HasFailed={1}, HasKsipped={2}")]
40-
public void TrasnferCompleted(string trasnferId, bool hasFailed, bool hasSkipped)
40+
public void TransferCompleted(string transferId, bool hasFailed, bool hasSkipped)
4141
{
42-
WriteEvent(TransferCompletedEvent, trasnferId, hasFailed, hasSkipped);
42+
WriteEvent(TransferCompletedEvent, transferId, hasFailed, hasSkipped);
4343
}
4444

4545
[NonEvent]
46-
public void TrasnferCompleted(string trasnferId, DataTransferStatus status)
46+
public void TransferCompleted(string transferId, DataTransferStatus status)
4747
{
4848
if (IsEnabled(EventLevel.Informational, EventKeywords.None))
4949
{
50-
TrasnferCompleted(trasnferId, status.HasFailedItems, status.HasSkippedItems);
50+
TransferCompleted(transferId, status.HasFailedItems, status.HasSkippedItems);
5151
}
5252
}
5353

sdk/storage/Azure.Storage.DataMovement/src/DataTransferInternalState.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public bool SetTransferState(DataTransferState state)
9494
if (DataTransferState.Completed == _status.State ||
9595
DataTransferState.Paused == _status.State)
9696
{
97-
DataMovementEventSource.Singleton.TrasnferCompleted(Id, _status);
97+
DataMovementEventSource.Singleton.TransferCompleted(Id, _status);
9898
// If the _completionSource has been cancelled or the exception
9999
// has been set, we don't need to check if TrySetResult returns false
100100
// because it's acceptable to cancel or have an error occur before then.

sdk/storage/Azure.Storage.DataMovement/src/DownloadChunkHandler.cs

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
namespace Azure.Storage.DataMovement
1111
{
12-
internal class DownloadChunkHandler : IDisposable
12+
internal class DownloadChunkHandler : IAsyncDisposable
1313
{
1414
#region Delegate Definitions
1515
public delegate Task CopyToDestinationFileInternal(long offset, long length, Stream stream, long expectedLength, bool initial);
@@ -41,6 +41,8 @@ public struct Behaviors
4141
private long _bytesTransferred;
4242
private readonly long _expectedLength;
4343

44+
internal bool _isChunkHandlerRunning;
45+
4446
/// <summary>
4547
/// The controller for downloading the chunks to each file.
4648
/// </summary>
@@ -89,11 +91,13 @@ public DownloadChunkHandler(
8991
readers: 1,
9092
capacity: DataMovementConstants.Channels.DownloadChunkCapacity);
9193
_downloadRangeProcessor.Process = ProcessDownloadRange;
94+
_isChunkHandlerRunning = true;
9295
}
9396

94-
public void Dispose()
97+
public async ValueTask DisposeAsync()
9598
{
96-
_downloadRangeProcessor.TryComplete();
99+
_isChunkHandlerRunning = false;
100+
await _downloadRangeProcessor.DisposeAsync().ConfigureAwait(false);
97101
}
98102

99103
public async ValueTask QueueChunkAsync(QueueDownloadChunkArgs args)
@@ -125,8 +129,14 @@ await _copyToDestinationFile(
125129
}
126130
catch (Exception ex)
127131
{
128-
// This will trigger the job part to call Dispose on this object
129-
await _invokeFailedEventHandler(ex).ConfigureAwait(false);
132+
// If we are disposing, we don't want to invoke the failed event handler
133+
// because the error is likely due to the job part being disposed and was
134+
// invoked by another InvokeFailedEventHandler call.
135+
if (_isChunkHandlerRunning)
136+
{
137+
// This will trigger the job part to call DisposeAsync on this object
138+
_ = Task.Run(() => _invokeFailedEventHandler(ex));
139+
}
130140
}
131141
}
132142

sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@
44
using System.Buffers;
55
using System.Collections.Generic;
66
using System.Globalization;
7-
using System.IO;
8-
using System.Linq;
97
using System.Threading;
108
using System.Threading.Channels;
119
using System.Threading.Tasks;
@@ -240,6 +238,11 @@ await QueueChunk(
240238
/// <returns>The task that's queueing up the chunks</returns>
241239
public abstract Task ProcessPartToChunkAsync();
242240

241+
/// <summary>
242+
/// Disposes of the chunk handler.
243+
/// </summary>
244+
public abstract Task DisposeHandlersAsync();
245+
243246
/// <summary>
244247
/// Triggers the cancellation for the Job Part.
245248
///
@@ -372,6 +375,7 @@ await PartTransferStatusEventHandler.RaiseAsync(
372375
.ConfigureAwait(false);
373376
}
374377
//TODO: figure out why we set the Completed state here and not just wait for all the chunks to finish
378+
await DisposeHandlersAsync().ConfigureAwait(false);
375379
await OnTransferStateChangedAsync(DataTransferState.Completed).ConfigureAwait(false);
376380
}
377381

@@ -432,7 +436,11 @@ await PartTransferStatusEventHandler.RaiseAsync(
432436
try
433437
{
434438
// Trigger job cancellation if the failed handler is enabled
435-
await TriggerCancellationAsync().ConfigureAwait(false);
439+
if (JobPartStatus.State != DataTransferState.Pausing &&
440+
JobPartStatus.State != DataTransferState.Stopping)
441+
{
442+
await TriggerCancellationAsync().ConfigureAwait(false);
443+
}
436444
await CheckAndUpdateCancellationStateAsync().ConfigureAwait(false);
437445
}
438446
catch (Exception cancellationException)
@@ -555,6 +563,7 @@ internal async Task CheckAndUpdateCancellationStateAsync()
555563
DataTransferState newState = JobPartStatus.State == DataTransferState.Pausing ?
556564
DataTransferState.Paused :
557565
DataTransferState.Completed;
566+
await DisposeHandlersAsync().ConfigureAwait(false);
558567
await OnTransferStateChangedAsync(newState).ConfigureAwait(false);
559568
}
560569
}
@@ -587,5 +596,21 @@ private void SetFailureType(string exceptionMessage)
587596
}
588597
}
589598
}
599+
600+
internal async Task<bool> CheckTransferStateBeforeRunning()
601+
{
602+
// If the main transfer has been stopped, do not process this part.
603+
if (_dataTransfer.TransferStatus.State == DataTransferState.Pausing)
604+
{
605+
await OnTransferStateChangedAsync(DataTransferState.Paused).ConfigureAwait(false);
606+
return false;
607+
}
608+
else if (_dataTransfer.TransferStatus.State == DataTransferState.Stopping)
609+
{
610+
await OnTransferStateChangedAsync(DataTransferState.Completed).ConfigureAwait(false);
611+
return false;
612+
}
613+
return true;
614+
}
590615
}
591616
}

0 commit comments

Comments
 (0)