Skip to content

Commit c41b9eb

Browse files
authored
[Storage] [DataMovement] Mended gaps in usage of CancellationTokens (Azure#46712)
* Initial commit * Some leftover merge conflict resolution * Exported API * Add link * addressed failed tests * minor spacing * changed method signature * export api * Removed params in GetTransfersAsync() * addressed feedback * removed * removed unused import * removed unneccessary cancellatyionToken link * resolve merge conflicts
1 parent 5ec91d3 commit c41b9eb

File tree

10 files changed

+64
-46
lines changed

10 files changed

+64
-46
lines changed

sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.net6.0.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -251,8 +251,8 @@ public partial class TransferManager : System.IAsyncDisposable
251251
{
252252
protected TransferManager() { }
253253
public TransferManager(Azure.Storage.DataMovement.TransferManagerOptions options = null) { }
254-
public virtual System.Collections.Generic.IAsyncEnumerable<Azure.Storage.DataMovement.DataTransferProperties> GetResumableTransfersAsync() { throw null; }
255-
public virtual System.Collections.Generic.IAsyncEnumerable<Azure.Storage.DataMovement.DataTransfer> GetTransfersAsync(params Azure.Storage.DataMovement.DataTransferStatus[] filterByStatus) { throw null; }
254+
public virtual System.Collections.Generic.IAsyncEnumerable<Azure.Storage.DataMovement.DataTransferProperties> GetResumableTransfersAsync([System.Runtime.CompilerServices.EnumeratorCancellationAttribute] System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
255+
public virtual System.Collections.Generic.IAsyncEnumerable<Azure.Storage.DataMovement.DataTransfer> GetTransfersAsync(System.Collections.Generic.ICollection<Azure.Storage.DataMovement.DataTransferStatus> filterByStatus = null, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute] System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
256256
public virtual System.Threading.Tasks.Task PauseTransferIfRunningAsync(string transferId, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
257257
public virtual System.Threading.Tasks.Task<System.Collections.Generic.List<Azure.Storage.DataMovement.DataTransfer>> ResumeAllTransfersAsync(Azure.Storage.DataMovement.DataTransferOptions transferOptions = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
258258
public virtual System.Threading.Tasks.Task<Azure.Storage.DataMovement.DataTransfer> ResumeTransferAsync(string transferId, Azure.Storage.DataMovement.DataTransferOptions transferOptions = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }

sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.netstandard2.0.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -251,8 +251,8 @@ public partial class TransferManager : System.IAsyncDisposable
251251
{
252252
protected TransferManager() { }
253253
public TransferManager(Azure.Storage.DataMovement.TransferManagerOptions options = null) { }
254-
public virtual System.Collections.Generic.IAsyncEnumerable<Azure.Storage.DataMovement.DataTransferProperties> GetResumableTransfersAsync() { throw null; }
255-
public virtual System.Collections.Generic.IAsyncEnumerable<Azure.Storage.DataMovement.DataTransfer> GetTransfersAsync(params Azure.Storage.DataMovement.DataTransferStatus[] filterByStatus) { throw null; }
254+
public virtual System.Collections.Generic.IAsyncEnumerable<Azure.Storage.DataMovement.DataTransferProperties> GetResumableTransfersAsync([System.Runtime.CompilerServices.EnumeratorCancellationAttribute] System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
255+
public virtual System.Collections.Generic.IAsyncEnumerable<Azure.Storage.DataMovement.DataTransfer> GetTransfersAsync(System.Collections.Generic.ICollection<Azure.Storage.DataMovement.DataTransferStatus> filterByStatus = null, [System.Runtime.CompilerServices.EnumeratorCancellationAttribute] System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
256256
public virtual System.Threading.Tasks.Task PauseTransferIfRunningAsync(string transferId, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
257257
public virtual System.Threading.Tasks.Task<System.Collections.Generic.List<Azure.Storage.DataMovement.DataTransfer>> ResumeAllTransfersAsync(Azure.Storage.DataMovement.DataTransferOptions transferOptions = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
258258
public virtual System.Threading.Tasks.Task<Azure.Storage.DataMovement.DataTransfer> ResumeTransferAsync(string transferId, Azure.Storage.DataMovement.DataTransferOptions transferOptions = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }

sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ await QueueChunk(
221221
{
222222
try
223223
{
224-
await Task.Run(chunkTask).ConfigureAwait(false);
224+
await Task.Run(chunkTask, _cancellationToken).ConfigureAwait(false);
225225
}
226226
catch (Exception ex)
227227
{
@@ -464,7 +464,7 @@ public async virtual Task CleanupAbortedJobPartAsync()
464464
// If the job part is paused or ended with failures
465465
// delete the destination resource because it could be unfinished or corrupted
466466
// If we resume we would have to start from the beginning anyways.
467-
await _destinationResource.DeleteIfExistsAsync().ConfigureAwait(false);
467+
await _destinationResource.DeleteIfExistsAsync(_cancellationToken).ConfigureAwait(false);
468468
}
469469
}
470470

sdk/storage/Azure.Storage.DataMovement/src/JobPlan/JobPartPlanFile.cs

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ internal class JobPartPlanFile : IDisposable
2929
/// </summary>
3030
public readonly SemaphoreSlim WriteLock;
3131

32+
private const int DefaultBufferSize = 81920;
33+
3234
private JobPartPlanFile()
3335
{
3436
WriteLock = new SemaphoreSlim(1);
@@ -38,28 +40,22 @@ public static async Task<JobPartPlanFile> CreateJobPartPlanFileAsync(
3840
string checkpointerPath,
3941
string id,
4042
int jobPart,
41-
Stream headerStream)
43+
Stream headerStream,
44+
CancellationToken cancellationToken = default)
4245
{
4346
Argument.AssertNotNullOrEmpty(checkpointerPath, nameof(checkpointerPath));
4447
Argument.AssertNotNullOrEmpty(id, nameof(id));
4548
Argument.AssertNotNull(jobPart, nameof(jobPart));
4649
Argument.AssertNotNull(headerStream, nameof(headerStream));
4750

4851
JobPartPlanFileName fileName = new JobPartPlanFileName(checkpointerPath: checkpointerPath, id: id, jobPartNumber: jobPart);
49-
JobPartPlanFile result = new JobPartPlanFile()
50-
{
51-
FileName = fileName
52-
};
53-
using (FileStream fileStream = File.Create(result.FileName.ToString()))
54-
{
55-
await headerStream.CopyToAsync(fileStream).ConfigureAwait(false);
56-
}
57-
return result;
52+
return await CreateJobPartPlanFileAsync(fileName, headerStream, cancellationToken).ConfigureAwait(false);
5853
}
5954

6055
public static async Task<JobPartPlanFile> CreateJobPartPlanFileAsync(
6156
JobPartPlanFileName fileName,
62-
Stream headerStream)
57+
Stream headerStream,
58+
CancellationToken cancellationToken = default)
6359
{
6460
JobPartPlanFile result = new JobPartPlanFile()
6561
{
@@ -68,7 +64,7 @@ public static async Task<JobPartPlanFile> CreateJobPartPlanFileAsync(
6864

6965
using (FileStream fileStream = File.Create(result.FileName.ToString()))
7066
{
71-
await headerStream.CopyToAsync(fileStream).ConfigureAwait(false);
67+
await headerStream.CopyToAsync(fileStream, DefaultBufferSize, cancellationToken).ConfigureAwait(false);
7268
}
7369
return result;
7470
}

sdk/storage/Azure.Storage.DataMovement/src/JobPlan/JobPlanFile.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ internal class JobPlanFile : IDisposable
3434
/// </summary>
3535
public readonly SemaphoreSlim WriteLock;
3636

37+
private const int DefaultBufferSize = 81920;
38+
3739
private JobPlanFile(string id, string filePath)
3840
{
3941
Id = id;
@@ -45,7 +47,8 @@ private JobPlanFile(string id, string filePath)
4547
public static async Task<JobPlanFile> CreateJobPlanFileAsync(
4648
string checkpointerPath,
4749
string id,
48-
Stream headerStream)
50+
Stream headerStream,
51+
CancellationToken cancellationToken = default)
4952
{
5053
Argument.AssertNotNullOrEmpty(checkpointerPath, nameof(checkpointerPath));
5154
Argument.AssertNotNullOrEmpty(id, nameof(id));
@@ -57,7 +60,7 @@ public static async Task<JobPlanFile> CreateJobPlanFileAsync(
5760
JobPlanFile jobPlanFile = new(id, filePath);
5861
using (FileStream fileStream = File.Create(jobPlanFile.FilePath))
5962
{
60-
await headerStream.CopyToAsync(fileStream).ConfigureAwait(false);
63+
await headerStream.CopyToAsync(fileStream, DefaultBufferSize, cancellationToken).ConfigureAwait(false);
6164
}
6265

6366
return jobPlanFile;

sdk/storage/Azure.Storage.DataMovement/src/LocalFileStorageResource.cs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -94,18 +94,19 @@ protected internal override Task<StorageResourceReadStreamResult> ReadStreamAsyn
9494
/// Creates the local file.
9595
/// </summary>
9696
/// <param name="overwrite"></param>
97-
/// <returns></returns>
98-
internal Task CreateAsync(bool overwrite)
97+
internal void Create(bool overwrite)
9998
{
10099
if (overwrite || !File.Exists(_uri.LocalPath))
101100
{
102101
Directory.CreateDirectory(System.IO.Path.GetDirectoryName(_uri.LocalPath));
103102
File.Create(_uri.LocalPath).Close();
104103
FileAttributes attributes = File.GetAttributes(_uri.LocalPath);
105104
File.SetAttributes(_uri.LocalPath, attributes | FileAttributes.Temporary);
106-
return Task.CompletedTask;
107105
}
108-
throw Errors.LocalFileAlreadyExists(_uri.LocalPath);
106+
else
107+
{
108+
throw Errors.LocalFileAlreadyExists(_uri.LocalPath);
109+
}
109110
}
110111

111112
/// <summary>
@@ -137,7 +138,7 @@ protected internal override async Task CopyFromStreamAsync(
137138
long position = options?.Position != default ? options.Position.Value : 0;
138139
if (position == 0)
139140
{
140-
await CreateAsync(overwrite).ConfigureAwait(false);
141+
Create(overwrite);
141142
}
142143
if (streamLength > 0)
143144
{

sdk/storage/Azure.Storage.DataMovement/src/LocalTransferCheckpointer.cs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ public override async Task AddNewJobAsync(
6565
Argument.AssertNotNullOrEmpty(transferId, nameof(transferId));
6666
Argument.AssertNotNull(source, nameof(source));
6767
Argument.AssertNotNull(destination, nameof(destination));
68+
CancellationHelper.ThrowIfCancellationRequested(cancellationToken);
6869

6970
if (_transferStates.ContainsKey(transferId))
7071
{
@@ -94,7 +95,8 @@ public override async Task AddNewJobAsync(
9495
JobPlanFile jobPlanFile = await JobPlanFile.CreateJobPlanFileAsync(
9596
_pathToCheckpointer,
9697
transferId,
97-
headerStream).ConfigureAwait(false);
98+
headerStream,
99+
cancellationToken).ConfigureAwait(false);
98100
_transferStates.Add(transferId, jobPlanFile);
99101
}
100102
}
@@ -108,13 +110,15 @@ public override async Task AddNewJobPartAsync(
108110
Argument.AssertNotNullOrEmpty(transferId, nameof(transferId));
109111
Argument.AssertNotNull(partNumber, nameof(partNumber));
110112
Argument.AssertNotNull(headerStream, nameof(headerStream));
113+
CancellationHelper.ThrowIfCancellationRequested(cancellationToken);
111114
headerStream.Position = 0;
112115

113116
JobPartPlanFile mappedFile = await JobPartPlanFile.CreateJobPartPlanFileAsync(
114117
_pathToCheckpointer,
115118
transferId,
116119
partNumber,
117-
headerStream).ConfigureAwait(false);
120+
headerStream,
121+
cancellationToken).ConfigureAwait(false);
118122

119123
// Add the job part into the current state
120124
if (_transferStates.ContainsKey(transferId))
@@ -153,7 +157,7 @@ public override async Task<Stream> ReadJobPlanFileAsync(
153157
CancellationHelper.ThrowIfCancellationRequested(cancellationToken);
154158
if (_transferStates.TryGetValue(transferId, out JobPlanFile jobPlanFile))
155159
{
156-
await jobPlanFile.WriteLock.WaitAsync().ConfigureAwait(false);
160+
await jobPlanFile.WriteLock.WaitAsync(cancellationToken).ConfigureAwait(false);
157161
try
158162
{
159163
using (MemoryMappedFile mmf = MemoryMappedFile.CreateFromFile(jobPlanFile.FilePath))
@@ -183,6 +187,7 @@ public override async Task<Stream> ReadJobPartPlanFileAsync(
183187
int length,
184188
CancellationToken cancellationToken = default)
185189
{
190+
CancellationHelper.ThrowIfCancellationRequested(cancellationToken);
186191
if (_transferStates.TryGetValue(transferId, out JobPlanFile jobPlanFile))
187192
{
188193
if (jobPlanFile.JobParts.TryGetValue(partNumber, out JobPartPlanFile jobPartPlanFile))
@@ -229,7 +234,7 @@ public override async Task WriteToJobPlanFileAsync(
229234
CancellationHelper.ThrowIfCancellationRequested(cancellationToken);
230235
if (_transferStates.TryGetValue(transferId, out JobPlanFile jobPlanFile))
231236
{
232-
await jobPlanFile.WriteLock.WaitAsync().ConfigureAwait(false);
237+
await jobPlanFile.WriteLock.WaitAsync(cancellationToken).ConfigureAwait(false);
233238
try
234239
{
235240
using (MemoryMappedFile mmf = MemoryMappedFile.CreateFromFile(jobPlanFile.FilePath, FileMode.Open))

sdk/storage/Azure.Storage.DataMovement/src/TransferJobInternal.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -636,7 +636,7 @@ protected async Task OnEnumerationComplete()
636636
/// </summary>
637637
protected async Task OnAllResourcesEnumerated()
638638
{
639-
await _checkpointer.SetEnumerationCompleteAsync(_dataTransfer.Id).ConfigureAwait(false);
639+
await _checkpointer.SetEnumerationCompleteAsync(_dataTransfer.Id, _cancellationToken).ConfigureAwait(false);
640640
}
641641

642642
internal async Task CheckAndUpdateStatusAsync()

sdk/storage/Azure.Storage.DataMovement/src/TransferManager.cs

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using System.Buffers;
66
using System.Collections.Generic;
77
using System.Linq;
8+
using System.Runtime.CompilerServices;
89
using System.Threading;
910
using System.Threading.Tasks;
1011
using Azure.Core;
@@ -99,15 +100,15 @@ private void ConfigureProcessorCallbacks()
99100
_chunksProcessor.Process = Task.Run;
100101
}
101102

102-
private async Task ProcessJobAsync(TransferJobInternal job, CancellationToken _)
103+
private async Task ProcessJobAsync(TransferJobInternal job, CancellationToken cancellationToken = default)
103104
{
104105
await foreach (JobPartInternal partItem in job.ProcessJobToJobPartAsync().ConfigureAwait(false))
105106
{
106107
job.IncrementJobParts();
107-
await _partsProcessor.QueueAsync(partItem).ConfigureAwait(false);
108+
await _partsProcessor.QueueAsync(partItem, cancellationToken).ConfigureAwait(false);
108109
}
109110
}
110-
private async Task ProcessPartAsync(JobPartInternal part, CancellationToken _)
111+
private async Task ProcessPartAsync(JobPartInternal part, CancellationToken cancellationToken = default)
111112
{
112113
part.SetQueueChunkDelegate(_chunksProcessor.QueueAsync);
113114
await part.ProcessPartToChunkAsync().ConfigureAwait(false);
@@ -147,13 +148,19 @@ public virtual async Task PauseTransferIfRunningAsync(string transferId, Cancell
147148
/// If not specified or specified to <see cref="DataTransferState.None"/>,
148149
/// all transfers will be returned regardless of status.
149150
/// </param>
151+
/// <param name="cancellationToken">
152+
/// Optional <see cref="CancellationToken"/> to propagate
153+
/// notifications that the operation should be cancelled.
154+
/// </param>
150155
/// <returns></returns>
151156
public virtual async IAsyncEnumerable<DataTransfer> GetTransfersAsync(
152-
params DataTransferStatus[] filterByStatus)
157+
ICollection<DataTransferStatus> filterByStatus = default,
158+
[EnumeratorCancellation] CancellationToken cancellationToken = default)
153159
{
154-
await SetDataTransfers().ConfigureAwait(false);
160+
cancellationToken = LinkCancellation(cancellationToken);
161+
await SetDataTransfers(cancellationToken).ConfigureAwait(false);
155162
IEnumerable<DataTransfer> totalTransfers;
156-
if (filterByStatus == default || filterByStatus.Length == 0)
163+
if (filterByStatus == default || filterByStatus.Count == 0)
157164
{
158165
totalTransfers = _dataTransfers.Select(d => d.Value);
159166
}
@@ -172,22 +179,28 @@ public virtual async IAsyncEnumerable<DataTransfer> GetTransfersAsync(
172179
/// <summary>
173180
/// Lists all the transfers stored in the checkpointer that can be resumed.
174181
/// </summary>
182+
/// <param name="cancellationToken">
183+
/// Optional <see cref="CancellationToken"/> to propagate
184+
/// notifications that the operation should be cancelled.
185+
/// </param>
175186
/// <returns>
176187
/// List of <see cref="DataTransferProperties"/> objects that can be used to rebuild resources
177188
/// to resume with.
178189
/// </returns>
179-
public virtual async IAsyncEnumerable<DataTransferProperties> GetResumableTransfersAsync()
190+
public virtual async IAsyncEnumerable<DataTransferProperties> GetResumableTransfersAsync(
191+
[EnumeratorCancellation] CancellationToken cancellationToken = default)
180192
{
181-
List<string> storedTransfers = await _checkpointer.GetStoredTransfersAsync().ConfigureAwait(false);
193+
cancellationToken = LinkCancellation(cancellationToken);
194+
List<string> storedTransfers = await _checkpointer.GetStoredTransfersAsync(cancellationToken).ConfigureAwait(false);
182195
foreach (string transferId in storedTransfers)
183196
{
184-
if (!await _checkpointer.IsResumableAsync(transferId, _cancellationToken).ConfigureAwait(false))
197+
if (!await _checkpointer.IsResumableAsync(transferId, cancellationToken).ConfigureAwait(false))
185198
{
186199
continue;
187200
}
188201

189202
DataTransferProperties properties = await _checkpointer.GetDataTransferPropertiesAsync(
190-
transferId, _cancellationToken).ConfigureAwait(false);
203+
transferId, cancellationToken).ConfigureAwait(false);
191204
yield return properties;
192205
}
193206
}
@@ -214,7 +227,7 @@ public virtual async Task<List<DataTransfer>> ResumeAllTransfersAsync(
214227
}
215228

216229
List<DataTransfer> transfers = new();
217-
await foreach (DataTransferProperties properties in GetResumableTransfersAsync().ConfigureAwait(false))
230+
await foreach (DataTransferProperties properties in GetResumableTransfersAsync(cancellationToken).ConfigureAwait(false))
218231
{
219232
transfers.Add(await ResumeTransferAsync(properties, transferOptions, cancellationToken).ConfigureAwait(false));
220233
}
@@ -403,14 +416,14 @@ private async Task<DataTransfer> BuildAndAddTransferJobAsync(
403416
}
404417
#endregion
405418

406-
private async Task SetDataTransfers()
419+
private async Task SetDataTransfers(CancellationToken cancellationToken = default)
407420
{
408421
_dataTransfers.Clear();
409422

410-
List<string> storedTransfers = await _checkpointer.GetStoredTransfersAsync().ConfigureAwait(false);
423+
List<string> storedTransfers = await _checkpointer.GetStoredTransfersAsync(cancellationToken).ConfigureAwait(false);
411424
foreach (string transferId in storedTransfers)
412425
{
413-
DataTransferStatus jobStatus = await _checkpointer.GetJobStatusAsync(transferId).ConfigureAwait(false);
426+
DataTransferStatus jobStatus = await _checkpointer.GetJobStatusAsync(transferId, cancellationToken).ConfigureAwait(false);
414427
_dataTransfers.Add(transferId, new DataTransfer(
415428
id: transferId,
416429
status: jobStatus)

0 commit comments

Comments
 (0)