Skip to content

Commit 75cabf7

Browse files
[Storage][DataMovement] Fix progress reports appearing out of order (Azure#47539)
1 parent 4547424 commit 75cabf7

16 files changed

+143
-112
lines changed

sdk/storage/Azure.Storage.DataMovement.Blobs/tests/ProgressHandlerTests.cs

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
using System.Linq;
99
using System.Threading;
1010
using System.Threading.Tasks;
11+
using Azure.Core.TestFramework;
1112
using Azure.Storage.DataMovement.Tests;
1213
using BaseBlobs::Azure.Storage.Blobs;
1314
using DMBlobs::Azure.Storage.DataMovement.Blobs;
@@ -298,7 +299,7 @@ await TransferAndAssertProgress(
298299
waitTime: 30);
299300
}
300301

301-
[Ignore("https://github.com/Azure/azure-sdk-for-net/issues/35558")]
302+
[LiveOnly]
302303
[Test]
303304
[TestCase(0)]
304305
[TestCase(150)]
@@ -310,22 +311,27 @@ public async Task ProgressHandler_PauseResume(int delayInMs)
310311

311312
await PopulateTestContainer(source.Container);
312313

313-
StorageResourceContainer sourceResource =
314-
new BlobStorageResourceContainer(source.Container);
315-
StorageResourceContainer destinationResource =
316-
new LocalDirectoryStorageResourceContainer(destination.DirectoryPath);
314+
BlobsStorageResourceProvider blobProvider = new(TestEnvironment.Credential);
315+
LocalFilesStorageResourceProvider localProvider = new();
317316

318-
TransferManager transferManager = new TransferManager();
317+
TransferManagerOptions transferManagerOptions = new()
318+
{
319+
ResumeProviders = [blobProvider, localProvider]
320+
};
321+
TransferManager transferManager = new(transferManagerOptions);
319322

320-
TestProgressHandler progressHandler = new TestProgressHandler();
321-
DataTransferOptions transferOptions = new DataTransferOptions()
323+
TestProgressHandler progressHandler = new();
324+
DataTransferOptions transferOptions = new()
322325
{
323326
ProgressHandlerOptions = new ProgressHandlerOptions(progressHandler, true)
324327
};
325-
TestEventsRaised testEventsRaised = new TestEventsRaised(transferOptions);
328+
TestEventsRaised testEventsRaised = new(transferOptions);
326329

327330
// Act - Start transfer
328-
DataTransfer transfer = await transferManager.StartTransferAsync(sourceResource, destinationResource, transferOptions);
331+
DataTransfer transfer = await transferManager.StartTransferAsync(
332+
blobProvider.FromContainer(source.Container.Uri),
333+
localProvider.FromDirectory(destination.DirectoryPath),
334+
transferOptions);
329335

330336
// TODO: This can likely be replaced with something better once mocking is in place
331337
// Wait for the transfer to start happening
@@ -334,7 +340,7 @@ public async Task ProgressHandler_PauseResume(int delayInMs)
334340
// Pause transfer
335341
CancellationTokenSource tokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30));
336342
await transferManager.PauseTransferIfRunningAsync(transfer.Id, tokenSource.Token);
337-
Assert.AreEqual(DataTransferState.Paused, transfer.TransferStatus);
343+
Assert.AreEqual(DataTransferState.Paused, transfer.TransferStatus.State);
338344

339345
// Record the current number of progress updates to use during assertions
340346
int pause = progressHandler.Updates.Count;
@@ -348,7 +354,7 @@ public async Task ProgressHandler_PauseResume(int delayInMs)
348354
await resumeTransfer.WaitForCompletionAsync(tokenSource.Token);
349355

350356
// Assert
351-
Assert.AreEqual(DataTransferState.Completed, resumeTransfer.TransferStatus);
357+
Assert.AreEqual(DataTransferState.Completed, resumeTransfer.TransferStatus.State);
352358
ProgressHandlerAsserts.AssertFileProgress(progressHandler.Updates, 5, pauseIndexes: pause);
353359
ProgressHandlerAsserts.AssertBytesTransferred(progressHandler.Updates, _expectedBytesTransferred);
354360
}

sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.net6.0.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public enum DataTransferOrder
5151
}
5252
public partial class DataTransferProgress
5353
{
54-
protected internal DataTransferProgress() { }
54+
internal DataTransferProgress() { }
5555
public long? BytesTransferred { get { throw null; } }
5656
public long CompletedCount { get { throw null; } }
5757
public long FailedCount { get { throw null; } }

sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.net8.0.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public enum DataTransferOrder
5151
}
5252
public partial class DataTransferProgress
5353
{
54-
protected internal DataTransferProgress() { }
54+
internal DataTransferProgress() { }
5555
public long? BytesTransferred { get { throw null; } }
5656
public long CompletedCount { get { throw null; } }
5757
public long FailedCount { get { throw null; } }

sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.netstandard2.0.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public enum DataTransferOrder
5151
}
5252
public partial class DataTransferProgress
5353
{
54-
protected internal DataTransferProgress() { }
54+
internal DataTransferProgress() { }
5555
public long? BytesTransferred { get { throw null; } }
5656
public long CompletedCount { get { throw null; } }
5757
public long FailedCount { get { throw null; } }

sdk/storage/Azure.Storage.DataMovement/src/CommitChunkHandler.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ internal class CommitChunkHandler : IAsyncDisposable
1313
#region Delegate Definitions
1414
public delegate Task QueuePutBlockTaskInternal(long offset, long blockSize, long expectedLength, StorageResourceItemProperties properties);
1515
public delegate Task QueueCommitBlockTaskInternal(StorageResourceItemProperties sourceProperties);
16-
public delegate void ReportProgressInBytes(long bytesWritten);
16+
public delegate ValueTask ReportProgressInBytes(long bytesWritten);
1717
public delegate Task InvokeFailedEventHandlerInternal(Exception ex);
1818
#endregion Delegate Definitions
1919

@@ -99,7 +99,7 @@ private async Task ProcessCommitRange(QueueStageChunkArgs args, CancellationToke
9999
try
100100
{
101101
_bytesTransferred += args.BytesTransferred;
102-
_reportProgressInBytes(args.BytesTransferred);
102+
await _reportProgressInBytes(args.BytesTransferred).ConfigureAwait(false);
103103

104104
if (_bytesTransferred == _expectedLength)
105105
{

sdk/storage/Azure.Storage.DataMovement/src/DataTransferProgress.cs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,7 @@ namespace Azure.Storage.DataMovement
88
/// </summary>
99
public class DataTransferProgress
1010
{
11-
/// <summary>
12-
/// Constructor for mocking.
13-
/// </summary>
14-
protected internal DataTransferProgress() { }
11+
internal DataTransferProgress() { }
1512

1613
/// <summary>
1714
/// Number of files that were transferred successfully.

sdk/storage/Azure.Storage.DataMovement/src/DownloadChunkHandler.cs

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ internal class DownloadChunkHandler : IAsyncDisposable
1313
{
1414
#region Delegate Definitions
1515
public delegate Task CopyToDestinationFileInternal(long offset, long length, Stream stream, long expectedLength, bool initial);
16-
public delegate void ReportProgressInBytes(long bytesWritten);
16+
public delegate ValueTask ReportProgressInBytes(long bytesWritten);
1717
public delegate Task QueueCompleteFileDownloadInternal();
1818
public delegate Task InvokeFailedEventHandlerInternal(Exception ex);
1919
#endregion Delegate Definitions
@@ -119,7 +119,8 @@ await _copyToDestinationFile(
119119
_expectedLength,
120120
initial: _bytesTransferred == 0).ConfigureAwait(false);
121121
}
122-
UpdateBytesAndRange(args.Length);
122+
_bytesTransferred += args.Length;
123+
await _reportProgressInBytes(args.Length).ConfigureAwait(false);
123124

124125
// Check if we finished downloading the blob
125126
if (_bytesTransferred == _expectedLength)
@@ -139,11 +140,5 @@ await _copyToDestinationFile(
139140
}
140141
}
141142
}
142-
143-
private void UpdateBytesAndRange(long bytesDownloaded)
144-
{
145-
_bytesTransferred += bytesDownloaded;
146-
_reportProgressInBytes(bytesDownloaded);
147-
}
148143
}
149144
}

sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -284,11 +284,11 @@ internal async Task OnTransferStateChangedAsync(DataTransferState transferState)
284284
// Progress tracking, do before invoking the event below
285285
if (transferState == DataTransferState.InProgress)
286286
{
287-
_progressTracker.IncrementInProgressFiles();
287+
await _progressTracker.IncrementInProgressFilesAsync(_cancellationToken).ConfigureAwait(false);
288288
}
289289
else if (JobPartStatus.HasCompletedSuccessfully)
290290
{
291-
_progressTracker.IncrementCompletedFiles();
291+
await _progressTracker.IncrementCompletedFilesAsync(_cancellationToken).ConfigureAwait(false);
292292
await InvokeSingleCompletedArgAsync().ConfigureAwait(false);
293293
}
294294

@@ -309,13 +309,9 @@ await PartTransferStatusEventHandler.RaiseAsync(
309309
}
310310
}
311311

312-
/// <summary>
313-
/// To change all transfer statues at the same time
314-
/// </summary>
315-
/// <param name="bytesTransferred"></param>
316-
internal void ReportBytesWritten(long bytesTransferred)
312+
protected async ValueTask ReportBytesWrittenAsync(long bytesTransferred)
317313
{
318-
_progressTracker.IncrementBytesTransferred(bytesTransferred);
314+
await _progressTracker.IncrementBytesTransferred(bytesTransferred, _cancellationToken).ConfigureAwait(false);
319315
}
320316

321317
public async virtual Task InvokeSingleCompletedArgAsync()
@@ -356,7 +352,7 @@ await TransferSkippedEventHandler.RaiseAsync(
356352
ClientDiagnostics)
357353
.ConfigureAwait(false);
358354
}
359-
_progressTracker.IncrementSkippedFiles();
355+
await _progressTracker.IncrementSkippedFilesAsync(_cancellationToken).ConfigureAwait(false);
360356

361357
// Update the JobPartStatus. If was already updated (e.g. there was a failed item before)
362358
// then don't raise the PartTransferStatusEventHandler
@@ -413,7 +409,7 @@ await TransferFailedEventHandler.RaiseAsync(
413409
ClientDiagnostics)
414410
.ConfigureAwait(false);
415411
}
416-
_progressTracker.IncrementFailedFiles();
412+
await _progressTracker.IncrementFailedFilesAsync(_cancellationToken).ConfigureAwait(false);
417413

418414
// Update the JobPartStatus. If was already updated (e.g. there was a failed item before)
419415
// then don't raise the PartTransferStatusEventHandler

sdk/storage/Azure.Storage.DataMovement/src/ServiceToServiceJobPart.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ await _destinationResource.CopyFromUriAsync(
275275
options: options,
276276
cancellationToken: _cancellationToken).ConfigureAwait(false);
277277

278-
ReportBytesWritten(completeLength);
278+
await ReportBytesWrittenAsync(completeLength).ConfigureAwait(false);
279279
await OnTransferStateChangedAsync(DataTransferState.Completed).ConfigureAwait(false);
280280
}
281281
catch (RequestFailedException exception)
@@ -316,7 +316,7 @@ await _destinationResource.CopyBlockFromUriAsync(
316316
cancellationToken: _cancellationToken).ConfigureAwait(false);
317317

318318
// Report first chunk written to progress tracker
319-
ReportBytesWritten(blockSize);
319+
await ReportBytesWrittenAsync(blockSize).ConfigureAwait(false);
320320

321321
if (blockSize == length)
322322
{
@@ -346,7 +346,7 @@ internal static CommitChunkHandler.Behaviors GetBlockListCommitHandlerBehaviors(
346346
{
347347
QueuePutBlockTask = jobPart.QueueStageBlockRequest,
348348
QueueCommitBlockTask = jobPart.QueueCompleteTransferAsync,
349-
ReportProgressInBytes = jobPart.ReportBytesWritten,
349+
ReportProgressInBytes = jobPart.ReportBytesWrittenAsync,
350350
InvokeFailedHandler = jobPart.InvokeFailedArgAsync,
351351
};
352352
}

sdk/storage/Azure.Storage.DataMovement/src/StreamToUriJobPart.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -328,7 +328,7 @@ await _destinationResource.CopyFromStreamAsync(
328328
}
329329

330330
// Report bytes written before completion
331-
ReportBytesWritten(blockSize);
331+
await ReportBytesWrittenAsync(blockSize).ConfigureAwait(false);
332332

333333
// Set completion status to completed
334334
await OnTransferStateChangedAsync(DataTransferState.Completed).ConfigureAwait(false);
@@ -360,7 +360,7 @@ await _destinationResource.CopyFromStreamAsync(
360360
cancellationToken: _cancellationToken).ConfigureAwait(false);
361361
}
362362

363-
ReportBytesWritten(blockSize);
363+
await ReportBytesWrittenAsync(blockSize).ConfigureAwait(false);
364364
}
365365
}
366366

@@ -372,7 +372,7 @@ internal static CommitChunkHandler.Behaviors GetBlockListCommitHandlerBehaviors(
372372
{
373373
QueuePutBlockTask = jobPart.QueueStageBlockRequest,
374374
QueueCommitBlockTask = jobPart.QueueCompleteTransferAsync,
375-
ReportProgressInBytes = jobPart.ReportBytesWritten,
375+
ReportProgressInBytes = jobPart.ReportBytesWrittenAsync,
376376
InvokeFailedHandler = jobPart.InvokeFailedArgAsync,
377377
};
378378
}

0 commit comments

Comments
 (0)