5
5
using System . Collections . Generic ;
6
6
using System . Globalization ;
7
7
using System . IO ;
8
- using System . Linq ;
9
8
using System . Threading ;
10
9
using System . Threading . Tasks ;
11
- using System . Xml . Linq ;
12
10
using Azure . Core ;
13
11
using Azure . Storage . DataMovement . Models ;
12
+ using Azure . Storage . DataMovement . JobPlanModels ;
13
+ using System . Linq ;
14
14
15
15
namespace Azure . Storage . DataMovement
16
16
{
@@ -36,7 +36,7 @@ internal abstract class JobPartInternal
36
36
///
37
37
/// Will be disposed of once all tasks of the job have completed or have been cancelled.
38
38
/// </summary>
39
- internal CancellationTokenSource _cancellationTokenSource { get ; set ; }
39
+ internal CancellationToken _cancellationToken { get ; set ; }
40
40
41
41
/// <summary>
42
42
/// Plan file writer for the respective job.
@@ -119,6 +119,9 @@ internal abstract class JobPartInternal
119
119
/// </summary>
120
120
public SyncAsyncEventHandler < SingleTransferCompletedEventArgs > SingleTransferCompletedEventHandler { get ; internal set ; }
121
121
122
+ private List < Task < bool > > _chunkTasks ;
123
+ private List < TaskCompletionSource < bool > > _chunkTaskSources ;
124
+
122
125
/// <summary>
123
126
/// Array pools for reading from streams to upload
124
127
/// </summary>
@@ -143,18 +146,19 @@ internal JobPartInternal(
143
146
SyncAsyncEventHandler < TransferFailedEventArgs > failedEventHandler ,
144
147
SyncAsyncEventHandler < TransferSkippedEventArgs > skippedEventHandler ,
145
148
SyncAsyncEventHandler < SingleTransferCompletedEventArgs > singleTransferEventHandler ,
146
- CancellationTokenSource cancellationTokenSource ,
149
+ CancellationToken cancellationToken ,
150
+ StorageTransferStatus jobPartStatus = StorageTransferStatus . Queued ,
147
151
long ? length = default )
148
152
{
149
- JobPartStatus = StorageTransferStatus . Queued ;
153
+ JobPartStatus = jobPartStatus ;
150
154
PartNumber = partNumber ;
151
155
_dataTransfer = dataTransfer ;
152
156
_sourceResource = sourceResource ;
153
157
_destinationResource = destinationResource ;
154
158
_errorHandling = errorHandling ;
155
159
_createMode = createMode ;
156
160
_checkpointer = checkpointer ;
157
- _cancellationTokenSource = cancellationTokenSource ;
161
+ _cancellationToken = cancellationToken ;
158
162
_arrayPool = arrayPool ;
159
163
PartTransferStatusEventHandler = jobPartEventHandler ;
160
164
TransferStatusEventHandler = statusEventHandler ;
@@ -178,26 +182,75 @@ internal JobPartInternal(
178
182
}
179
183
180
184
Length = length ;
185
+ _chunkTasks = new List < Task < bool > > ( ) ;
186
+ _chunkTaskSources = new List < TaskCompletionSource < bool > > ( ) ;
181
187
}
182
188
183
189
public void SetQueueChunkDelegate ( QueueChunkDelegate chunkDelegate )
184
190
{
185
191
QueueChunk = chunkDelegate ;
186
192
}
187
193
194
+ /// <summary>
195
+ /// Queues the task to the main chunk channel and also appends the tracking
196
+ /// completion source to the task. So we know the state of each chunk especially
197
+ /// when we're looking to stop/pause the job part.
198
+ /// </summary>
199
+ /// <returns></returns>
200
+ public async Task QueueChunkToChannelAsync ( Task chunkTask )
201
+ {
202
+ // Attach TaskCompletionSource
203
+ TaskCompletionSource < bool > chunkCompleted = new TaskCompletionSource < bool > (
204
+ false ,
205
+ TaskCreationOptions . RunContinuationsAsynchronously ) ;
206
+ _chunkTaskSources . Add ( chunkCompleted ) ;
207
+ _chunkTasks . Add ( chunkCompleted . Task ) ;
208
+
209
+ await QueueChunk (
210
+ async ( ) =>
211
+ {
212
+ await chunkTask . ConfigureAwait ( false ) ;
213
+ chunkCompleted . SetResult ( true ) ;
214
+ await CheckAndUpdateCancellationStatusAsync ( ) . ConfigureAwait ( false ) ;
215
+ } ) . ConfigureAwait ( false ) ;
216
+ }
217
+
188
218
/// <summary>
189
219
/// Processes the job to job parts
190
220
/// </summary>
191
221
/// <returns>An IEnumerable that contains the job chunks</returns>
192
222
public abstract Task ProcessPartToChunkAsync ( ) ;
193
223
194
- internal async Task TriggerCancellation ( StorageTransferStatus status )
224
+ /// <summary>
225
+ /// Triggers the cancellation for the Job Part.
226
+ ///
227
+ /// If the status is set to <see cref="StorageTransferStatus.Paused"/>
228
+ /// and any chunks is still processing to be cancelled is will be set to <see cref="StorageTransferStatus.PauseInProgress"/>
229
+ /// until the chunks finish then it will be set to <see cref="StorageTransferStatus.Paused"/>.
230
+ ///
231
+ /// If the status is set to <see cref="StorageTransferStatus.CompletedWithFailedTransfers"/>
232
+ /// and any chunks is still processing to be cancelled is will be set to <see cref="StorageTransferStatus.CancellationInProgress"/>
233
+ /// until the chunks finish then it will be set to <see cref="StorageTransferStatus.CompletedWithFailedTransfers"/>.
234
+ /// </summary>
235
+ /// <returns></returns>
236
+ internal async Task TriggerCancellationAsync ( )
195
237
{
196
- if ( ! _cancellationTokenSource . IsCancellationRequested )
238
+ if ( ! _cancellationToken . IsCancellationRequested )
197
239
{
198
- _cancellationTokenSource . Cancel ( ) ;
240
+ _dataTransfer . _state . TriggerCancellation ( ) ;
241
+ }
242
+ // Set the status to Pause/CancellationInProgress
243
+ if ( StorageTransferStatus . PauseInProgress == _dataTransfer . TransferStatus )
244
+ {
245
+ // It's possible that the status hasn't propagated down to the job part
246
+ // status yet here since we pause from the data transfer object.
247
+ await OnTransferStatusChanged ( StorageTransferStatus . PauseInProgress ) . ConfigureAwait ( false ) ;
248
+ }
249
+ else
250
+ {
251
+ // It's a cancellation if a pause wasn't called.
252
+ await OnTransferStatusChanged ( StorageTransferStatus . CancellationInProgress ) . ConfigureAwait ( false ) ;
199
253
}
200
- await OnTransferStatusChanged ( status ) . ConfigureAwait ( false ) ;
201
254
_dataTransfer . _state . ResetTransferredBytes ( ) ;
202
255
}
203
256
@@ -223,12 +276,15 @@ internal async Task OnTransferStatusChanged(StorageTransferStatus transferStatus
223
276
{
224
277
await InvokeSingleCompletedArg ( ) . ConfigureAwait ( false ) ;
225
278
}
279
+ // Set the status in the checkpointer
280
+ await SetCheckpointerStatus ( transferStatus ) . ConfigureAwait ( false ) ;
281
+
226
282
// TODO: change to RaiseAsync
227
283
await PartTransferStatusEventHandler . Invoke ( new TransferStatusEventArgs (
228
284
_dataTransfer . Id ,
229
285
transferStatus ,
230
286
false ,
231
- _cancellationTokenSource . Token ) ) . ConfigureAwait ( false ) ;
287
+ _cancellationToken ) ) . ConfigureAwait ( false ) ;
232
288
}
233
289
}
234
290
@@ -251,12 +307,12 @@ await SingleTransferCompletedEventHandler.Invoke(
251
307
_sourceResource ,
252
308
_destinationResource ,
253
309
false ,
254
- _cancellationTokenSource . Token ) ) . ConfigureAwait ( false ) ;
310
+ _cancellationToken ) ) . ConfigureAwait ( false ) ;
255
311
}
256
312
}
257
313
258
314
/// <summary>
259
- /// Invokes Failed Argument
315
+ /// Invokes Skipped Argument Event.
260
316
/// </summary>
261
317
public async virtual Task InvokeSkippedArg ( )
262
318
{
@@ -268,13 +324,13 @@ await TransferSkippedEventHandler.Invoke(new TransferSkippedEventArgs(
268
324
_sourceResource ,
269
325
_destinationResource ,
270
326
false ,
271
- _cancellationTokenSource . Token ) ) . ConfigureAwait ( false ) ;
327
+ _cancellationToken ) ) . ConfigureAwait ( false ) ;
272
328
}
273
329
await OnTransferStatusChanged ( StorageTransferStatus . CompletedWithSkippedTransfers ) . ConfigureAwait ( false ) ;
274
330
}
275
331
276
332
/// <summary>
277
- /// Invokes Failed Argument
333
+ /// Invokes Failed Argument Event.
278
334
/// </summary>
279
335
public async virtual Task InvokeFailedArg ( Exception ex )
280
336
{
@@ -287,10 +343,34 @@ await TransferFailedEventHandler.Invoke(new TransferFailedEventArgs(
287
343
_destinationResource ,
288
344
ex ,
289
345
false ,
290
- _cancellationTokenSource . Token ) ) . ConfigureAwait ( false ) ;
346
+ _cancellationToken ) ) . ConfigureAwait ( false ) ;
291
347
}
292
348
// Trigger job cancellation if the failed handler is enabled
293
- await TriggerCancellation ( StorageTransferStatus . CompletedWithFailedTransfers ) . ConfigureAwait ( false ) ;
349
+ await TriggerCancellationAsync ( ) . ConfigureAwait ( false ) ;
350
+ await CheckAndUpdateCancellationStatusAsync ( ) . ConfigureAwait ( false ) ;
351
+ }
352
+
353
+ public async virtual Task AddJobPartToCheckpointer ( int chunksTotal )
354
+ {
355
+ JobPartPlanHeader header = this . ToJobPartPlanHeader ( StorageTransferStatus . InProgress ) ;
356
+ using ( Stream stream = new MemoryStream ( ) )
357
+ {
358
+ header . Serialize ( stream ) ;
359
+ await _checkpointer . AddNewJobPartAsync (
360
+ transferId : _dataTransfer . Id ,
361
+ partNumber : PartNumber ,
362
+ chunksTotal : chunksTotal ,
363
+ headerStream : stream ,
364
+ cancellationToken : _cancellationToken ) . ConfigureAwait ( false ) ;
365
+ }
366
+ }
367
+
368
+ internal async virtual Task SetCheckpointerStatus ( StorageTransferStatus status )
369
+ {
370
+ await _checkpointer . SetJobPartTransferStatusAsync (
371
+ transferId : _dataTransfer . Id ,
372
+ partNumber : PartNumber ,
373
+ status : status ) . ConfigureAwait ( false ) ;
294
374
}
295
375
296
376
internal long CalculateBlockSize ( long length )
@@ -381,5 +461,20 @@ internal static long ParseRangeTotalLength(string range)
381
461
absolutePosition += blockLength ;
382
462
}
383
463
}
464
+
465
+ internal async Task CheckAndUpdateCancellationStatusAsync ( )
466
+ {
467
+ if ( _chunkTasks . All ( ( Task task ) => ( task . IsCompleted ) ) )
468
+ {
469
+ if ( _dataTransfer . TransferStatus == StorageTransferStatus . PauseInProgress )
470
+ {
471
+ await OnTransferStatusChanged ( StorageTransferStatus . Paused ) . ConfigureAwait ( false ) ;
472
+ }
473
+ else if ( _dataTransfer . TransferStatus == StorageTransferStatus . CancellationInProgress )
474
+ {
475
+ await OnTransferStatusChanged ( StorageTransferStatus . CompletedWithFailedTransfers ) . ConfigureAwait ( false ) ;
476
+ }
477
+ }
478
+ }
384
479
}
385
480
}
0 commit comments