@@ -79,6 +79,42 @@ private void AssertAllJobsAndPartsCompleted(int numJobs, int numJobParts, List<D
7979 }
8080 }
8181
82+ private async Task AssertResumeTransfer (
83+ int numJobs ,
84+ int numJobParts ,
85+ int numChunks ,
86+ List < DataTransfer > resumedTransfers ,
87+ MemoryTransferCheckpointer checkpointer ,
88+ StepProcessor < TransferJobInternal > jobsProcessor ,
89+ StepProcessor < JobPartInternal > partsProcessor ,
90+ StepProcessor < Func < Task > > chunksProcessor )
91+ {
92+ await Task . Delay ( 50 ) ;
93+ int pausedJobsCount = GetJobsStateCount ( resumedTransfers , checkpointer ) [ DataTransferState . Paused ] ;
94+ Assert . That ( pausedJobsCount , Is . EqualTo ( numJobs ) ) ;
95+
96+ // process jobs on resume
97+ Assert . That ( await jobsProcessor . StepAll ( ) , Is . EqualTo ( numJobs ) , "Error job processing on resume" ) ;
98+
99+ await Task . Delay ( 50 ) ;
100+ int inProgressJobsCount = GetJobsStateCount ( resumedTransfers , checkpointer ) [ DataTransferState . InProgress ] ;
101+ int enumerationCompleteCount = GetEnumerationCompleteCount ( resumedTransfers , checkpointer ) ;
102+ Assert . That ( enumerationCompleteCount , Is . EqualTo ( numJobs ) , "Error: all jobs should have finished enumerating" ) ;
103+ Assert . That ( inProgressJobsCount , Is . EqualTo ( numJobs ) , "Error: all jobs should be in InProgress state after Job Processing on resume" ) ;
104+
105+ // process job parts on resume
106+ Assert . That ( await partsProcessor . StepAll ( ) , Is . EqualTo ( numJobParts ) , "Error job part processing on resume" ) ;
107+
108+ await Task . Delay ( 50 ) ;
109+ int inProgressJobPartsCount = GetJobPartsStateCount ( resumedTransfers , checkpointer ) [ DataTransferState . InProgress ] ;
110+ Assert . That ( inProgressJobPartsCount , Is . EqualTo ( numJobParts ) , "Error: all job parts should be in InProgress state after Part Processing on resume" ) ;
111+
112+ // process chunks on resume
113+ Assert . That ( await chunksProcessor . StepAll ( ) , Is . EqualTo ( numChunks ) , "Error chunk processing on resume" ) ;
114+ await Task . Delay ( 50 ) ;
115+ AssertAllJobsAndPartsCompleted ( numJobs , numJobParts , resumedTransfers , checkpointer ) ;
116+ }
117+
82118 [ Test ]
83119 [ Combinatorial ]
84120 public async Task PauseResumeDuringJobProcessing_ItemTransfer (
@@ -211,31 +247,7 @@ public async Task PauseResumeDuringJobProcessing_ItemTransfer(
211247 MaximumTransferChunkSize = chunkSize ,
212248 } ) ;
213249
214- await Task . Delay ( 50 ) ;
215- int pausedJobsCount_resume = GetJobsStateCount ( resumedTransfers , checkpointer ) [ DataTransferState . Paused ] ;
216- Assert . That ( pausedJobsCount_resume , Is . EqualTo ( items ) ) ;
217-
218- // process jobs on resume
219- Assert . That ( await jobsProcessor . StepAll ( ) , Is . EqualTo ( items ) , "Error job processing on resume" ) ;
220-
221- await Task . Delay ( 50 ) ;
222- int inProgressJobsCount = GetJobsStateCount ( resumedTransfers , checkpointer ) [ DataTransferState . InProgress ] ;
223- int enumerationCompleteCount2 = GetEnumerationCompleteCount ( transfers , checkpointer ) ;
224- Assert . That ( enumerationCompleteCount2 , Is . EqualTo ( items ) , "Error: all jobs should have finished enumerating" ) ;
225- Assert . That ( inProgressJobsCount , Is . EqualTo ( items ) , "Error: all jobs should be in InProgress state after Job Processing on resume" ) ;
226-
227- // process job parts on resume
228- Assert . That ( await partsProcessor . StepAll ( ) , Is . EqualTo ( items ) , "Error job part processing on resume" ) ;
229-
230- await Task . Delay ( 50 ) ;
231- int inProgressJobPartsCount = GetJobPartsStateCount ( resumedTransfers , checkpointer ) [ DataTransferState . InProgress ] ;
232- Assert . That ( inProgressJobPartsCount , Is . EqualTo ( items ) , "Error: all job parts should be in InProgress state after Job Processing on resume" ) ;
233-
234- // process chunks on resume
235- Assert . That ( await chunksProcessor . StepAll ( ) , Is . EqualTo ( numChunks ) , "Error chunk processing on resume" ) ;
236-
237- await Task . Delay ( 50 ) ;
238- AssertAllJobsAndPartsCompleted ( items , items , transfers , checkpointer ) ;
250+ await AssertResumeTransfer ( items , items , numChunks , resumedTransfers , checkpointer , jobsProcessor , partsProcessor , chunksProcessor ) ;
239251 }
240252
241253 [ Test ]
@@ -371,33 +383,7 @@ public async Task PauseResumeDuringPartProcessing_ItemTransfer(
371383 // START RESUME TRANSFERS
372384 List < DataTransfer > resumedTransfers = await transferManager . ResumeAllTransfersAsync ( ) ;
373385
374- await Task . Delay ( 50 ) ;
375- int pausedJobsCount_resume = GetJobsStateCount ( resumedTransfers , checkpointer ) [ DataTransferState . Paused ] ;
376- Assert . That ( pausedJobsCount_resume , Is . EqualTo ( items ) ) ;
377-
378- // process jobs on resume
379- Assert . That ( await jobsProcessor . StepAll ( ) , Is . EqualTo ( items ) , "Error job processing on resume" ) ;
380-
381- await Task . Delay ( 50 ) ;
382- int inProgressJobsCount = GetJobsStateCount ( resumedTransfers , checkpointer ) [ DataTransferState . InProgress ] ;
383- int pausedJobPartsCount = GetJobPartsStateCount ( resumedTransfers , checkpointer ) [ DataTransferState . Paused ] ;
384- int enumerationCompleteCount2 = GetEnumerationCompleteCount ( transfers , checkpointer ) ;
385- Assert . That ( enumerationCompleteCount2 , Is . EqualTo ( items ) , "Error: all jobs should have finished enumerating" ) ;
386- Assert . That ( inProgressJobsCount , Is . EqualTo ( items ) , "Error: all jobs should be in InProgress state after Job Processing on resume" ) ;
387- Assert . That ( pausedJobPartsCount , Is . EqualTo ( items ) ) ;
388-
389- // process job parts on resume
390- Assert . That ( await partsProcessor . StepAll ( ) , Is . EqualTo ( items ) , "Error job part processing on resume" ) ;
391-
392- await Task . Delay ( 50 ) ;
393- int inProgressJobPartsCount = GetJobPartsStateCount ( resumedTransfers , checkpointer ) [ DataTransferState . InProgress ] ;
394- Assert . That ( inProgressJobPartsCount , Is . EqualTo ( items ) , "Error: all job parts should be in InProgress state after Part Processing on resume" ) ;
395-
396- // process chunks on resume
397- Assert . That ( await chunksProcessor . StepAll ( ) , Is . EqualTo ( numChunks ) , "Error chunk processing on resume" ) ;
398-
399- await Task . Delay ( 50 ) ;
400- AssertAllJobsAndPartsCompleted ( items , items , transfers , checkpointer ) ;
386+ await AssertResumeTransfer ( items , items , numChunks , resumedTransfers , checkpointer , jobsProcessor , partsProcessor , chunksProcessor ) ;
401387 }
402388
403389 [ Test ]
@@ -548,63 +534,14 @@ public async Task PauseResumeDuringChunkProcessing_ItemTransfer(
548534 // START RESUME TRANSFERS
549535 List < DataTransfer > resumedTransfers = await transferManager . ResumeAllTransfersAsync ( ) ;
550536
551- await Task . Delay ( 50 ) ;
552- int pausedJobsCount_resume = GetJobsStateCount ( resumedTransfers , checkpointer ) [ DataTransferState . Paused ] ;
553537 if ( pauseLocation == PauseLocation . PauseProcessHalfway )
554538 {
555- Assert . That ( pausedJobsCount_resume , Is . EqualTo ( items / 2 ) ) ;
556- Assert . That ( jobsProcessor . ItemsInQueue , Is . EqualTo ( items / 2 ) , "Error in job processor on resume" ) ;
539+ await AssertResumeTransfer ( items / 2 , items / 2 , numChunks / 2 , resumedTransfers , checkpointer , jobsProcessor , partsProcessor , chunksProcessor ) ;
557540 }
558541 else
559542 {
560- Assert . That ( pausedJobsCount_resume , Is . EqualTo ( items ) ) ;
561- Assert . That ( jobsProcessor . ItemsInQueue , Is . EqualTo ( items ) , "Error in job processor on resume" ) ;
543+ await AssertResumeTransfer ( items , items , numChunks , resumedTransfers , checkpointer , jobsProcessor , partsProcessor , chunksProcessor ) ;
562544 }
563-
564- // process jobs on resume
565- await jobsProcessor . StepAll ( ) ;
566-
567- await Task . Delay ( 50 ) ;
568- int inProgressJobsCount = GetJobsStateCount ( resumedTransfers , checkpointer ) [ DataTransferState . InProgress ] ;
569- int pausedJobPartsCount = GetJobPartsStateCount ( resumedTransfers , checkpointer ) [ DataTransferState . Paused ] ;
570- int enumerationCompleteCount2 = GetEnumerationCompleteCount ( transfers , checkpointer ) ;
571- Assert . That ( enumerationCompleteCount2 , Is . EqualTo ( items ) , "Error: all jobs should have finished enumerating" ) ;
572- if ( pauseLocation == PauseLocation . PauseProcessHalfway )
573- {
574- // the count for all jobs for PauseProcessHalfway is (items / 2) since half have already completed transfer
575- Assert . That ( inProgressJobsCount , Is . EqualTo ( items / 2 ) , "Error: all jobs should be in InProgress state after Job Processing on resume" ) ;
576- Assert . That ( pausedJobPartsCount , Is . EqualTo ( items / 2 ) ) ;
577- Assert . That ( partsProcessor . ItemsInQueue , Is . EqualTo ( items / 2 ) , "Error in job part processor on resume" ) ;
578- }
579- else
580- {
581- Assert . That ( inProgressJobsCount , Is . EqualTo ( items ) , "Error: all jobs should be in InProgress state after Job Processing on resume" ) ;
582- Assert . That ( pausedJobPartsCount , Is . EqualTo ( items ) ) ;
583- Assert . That ( partsProcessor . ItemsInQueue , Is . EqualTo ( items ) , "Error in job part processor on resume" ) ;
584- }
585-
586- // process job parts on resume
587- await partsProcessor . StepAll ( ) ;
588-
589- await Task . Delay ( 50 ) ;
590- int inProgressJobPartsCount = GetJobPartsStateCount ( resumedTransfers , checkpointer ) [ DataTransferState . InProgress ] ;
591- if ( pauseLocation == PauseLocation . PauseProcessHalfway )
592- {
593- // the count for all job parts for PauseProcessHalfway is (items / 2) since half have already completed transfer
594- Assert . That ( inProgressJobPartsCount , Is . EqualTo ( items / 2 ) , "Error: all job parts should be in InProgress state after Part Processing on resume" ) ;
595- Assert . That ( chunksProcessor . ItemsInQueue , Is . EqualTo ( numChunks / 2 ) , "Error in chunk processor on resume" ) ;
596- }
597- else
598- {
599- Assert . That ( inProgressJobPartsCount , Is . EqualTo ( items ) , "Error: all job parts should be in InProgress state after Part Processing on resume" ) ;
600- Assert . That ( chunksProcessor . ItemsInQueue , Is . EqualTo ( numChunks ) , "Error in chunk processor on resume" ) ;
601- }
602-
603- // process chunks on resume
604- await chunksProcessor . StepAll ( ) ;
605-
606- await Task . Delay ( 50 ) ;
607- AssertAllJobsAndPartsCompleted ( items , items , transfers , checkpointer ) ;
608545 }
609546
610547 [ Test ]
@@ -740,31 +677,7 @@ public async Task PauseResumeDuringJobProcessing_ContainerTransfer(
740677 MaximumTransferChunkSize = chunkSize ,
741678 } ) ;
742679
743- await Task . Delay ( 50 ) ;
744- int pausedJobsCount_resume = GetJobsStateCount ( resumedTransfers , checkpointer ) [ DataTransferState . Paused ] ;
745- Assert . That ( pausedJobsCount_resume , Is . EqualTo ( numJobs ) ) ;
746-
747- // process jobs on resume
748- Assert . That ( await jobsProcessor . StepAll ( ) , Is . EqualTo ( numJobs ) , "Error job processing on resume" ) ;
749-
750- await Task . Delay ( 50 ) ;
751- int inProgressJobsCount = GetJobsStateCount ( resumedTransfers , checkpointer ) [ DataTransferState . InProgress ] ;
752- int enumerationCompleteCount2 = GetEnumerationCompleteCount ( transfers , checkpointer ) ;
753- Assert . That ( enumerationCompleteCount2 , Is . EqualTo ( numJobs ) , "Error: all jobs should have finished enumerating" ) ;
754- Assert . That ( inProgressJobsCount , Is . EqualTo ( numJobs ) , "Error: all jobs should be in InProgress state after Job Processing on resume" ) ;
755-
756- // process job parts on resume
757- Assert . That ( await partsProcessor . StepAll ( ) , Is . EqualTo ( numJobParts ) , "Error job part processing on resume" ) ;
758-
759- await Task . Delay ( 50 ) ;
760- int inProgressJobPartsCount = GetJobPartsStateCount ( resumedTransfers , checkpointer ) [ DataTransferState . InProgress ] ;
761- Assert . That ( inProgressJobPartsCount , Is . EqualTo ( numJobParts ) , "Error: all job parts should be in InProgress state after Part Processing on resume" ) ;
762-
763- // process chunks on resume
764- Assert . That ( await chunksProcessor . StepAll ( ) , Is . EqualTo ( numChunks ) , "Error chunk processing on resume" ) ;
765-
766- await Task . Delay ( 50 ) ;
767- AssertAllJobsAndPartsCompleted ( numJobs , numJobParts , transfers , checkpointer ) ;
680+ await AssertResumeTransfer ( numJobs , numJobParts , numChunks , resumedTransfers , checkpointer , jobsProcessor , partsProcessor , chunksProcessor ) ;
768681 }
769682
770683 [ Test ]
@@ -900,33 +813,7 @@ public async Task PauseResumeDuringPartProcessing_ContainerTransfer(
900813 // START RESUME TRANSFERS
901814 List < DataTransfer > resumedTransfers = await transferManager . ResumeAllTransfersAsync ( ) ;
902815
903- await Task . Delay ( 50 ) ;
904- int pausedJobsCount_resume = GetJobsStateCount ( resumedTransfers , checkpointer ) [ DataTransferState . Paused ] ;
905- Assert . That ( pausedJobsCount_resume , Is . EqualTo ( numJobs ) ) ;
906-
907- // process jobs on resume
908- Assert . That ( await jobsProcessor . StepAll ( ) , Is . EqualTo ( numJobs ) , "Error job processing on resume" ) ;
909-
910- await Task . Delay ( 50 ) ;
911- int inProgressJobsCount = GetJobsStateCount ( resumedTransfers , checkpointer ) [ DataTransferState . InProgress ] ;
912- int pausedJobPartsCount = GetJobPartsStateCount ( resumedTransfers , checkpointer ) [ DataTransferState . Paused ] ;
913- int enumerationCompleteCount2 = GetEnumerationCompleteCount ( transfers , checkpointer ) ;
914- Assert . That ( enumerationCompleteCount2 , Is . EqualTo ( numJobs ) , "Error: all jobs should have finished enumerating" ) ;
915- Assert . That ( inProgressJobsCount , Is . EqualTo ( numJobs ) , "Error: all jobs should be in InProgress state after Job Processing on resume" ) ;
916- Assert . That ( pausedJobPartsCount , Is . EqualTo ( numJobParts ) ) ;
917-
918- // process job parts on resume
919- Assert . That ( await partsProcessor . StepAll ( ) , Is . EqualTo ( numJobParts ) , "Error job part processing on resume" ) ;
920-
921- await Task . Delay ( 50 ) ;
922- int inProgressJobPartsCount = GetJobPartsStateCount ( resumedTransfers , checkpointer ) [ DataTransferState . InProgress ] ;
923- Assert . That ( inProgressJobPartsCount , Is . EqualTo ( numJobParts ) , "Error: all job parts should be in InProgress state after Part Processing on resume" ) ;
924-
925- // process chunks on resume
926- Assert . That ( await chunksProcessor . StepAll ( ) , Is . EqualTo ( numChunks ) , "Error chunk processing on resume" ) ;
927-
928- await Task . Delay ( 50 ) ;
929- AssertAllJobsAndPartsCompleted ( numJobs , numJobParts , transfers , checkpointer ) ;
816+ await AssertResumeTransfer ( numJobs , numJobParts , numChunks , resumedTransfers , checkpointer , jobsProcessor , partsProcessor , chunksProcessor ) ;
930817 }
931818
932819 [ Test ]
@@ -1091,58 +978,17 @@ public async Task PauseResumeDuringChunkProcessing_ContainerTransfer(
1091978 // START RESUME TRANSFERS
1092979 List < DataTransfer > resumedTransfers = await transferManager . ResumeAllTransfersAsync ( ) ;
1093980
1094- await Task . Delay ( 50 ) ;
1095- int expectedJobsCount_half = numJobs - expectedAlreadyCompletedJobsCount_half ;
1096- if ( pauseLocation == PauseLocation . PauseProcessHalfway )
1097- {
1098- Assert . That ( jobsProcessor . ItemsInQueue , Is . EqualTo ( expectedJobsCount_half ) , "Error in job processor on resume" ) ;
1099- }
1100- else
1101- {
1102- Assert . That ( jobsProcessor . ItemsInQueue , Is . EqualTo ( numJobs ) , "Error in job processor on resume" ) ;
1103- }
1104-
1105- // process jobs on resume
1106- await jobsProcessor . StepAll ( ) ;
1107-
1108- await Task . Delay ( 50 ) ;
1109- int inProgressJobsCount = GetJobsStateCount ( resumedTransfers , checkpointer ) [ DataTransferState . InProgress ] ;
1110- int enumerationCompleteCount2 = GetEnumerationCompleteCount ( transfers , checkpointer ) ;
1111- Assert . That ( enumerationCompleteCount2 , Is . EqualTo ( numJobs ) , "Error: all jobs should have finished enumerating" ) ;
1112- int expectedPartsCount_half = Enumerable . Range ( numJobs + 1 - expectedJobsCount_half , expectedJobsCount_half )
1113- . Sum ( i => i * 2 ) ;
1114981 if ( pauseLocation == PauseLocation . PauseProcessHalfway )
1115982 {
1116- Assert . That ( inProgressJobsCount , Is . EqualTo ( expectedJobsCount_half ) , "Error: all remaining jobs should be in InProgress state after Job Processing on resume" ) ;
1117- Assert . That ( partsProcessor . ItemsInQueue , Is . EqualTo ( expectedPartsCount_half ) , "Error in parts processor on resume" ) ;
983+ int expectedJobsCount = numJobs - expectedAlreadyCompletedJobsCount_half ;
984+ int expectedPartsCount = Enumerable . Range ( numJobs + 1 - expectedJobsCount , expectedJobsCount )
985+ . Sum ( i => i * 2 ) ;
986+ await AssertResumeTransfer ( expectedJobsCount , expectedPartsCount , expectedPartsCount , resumedTransfers , checkpointer , jobsProcessor , partsProcessor , chunksProcessor ) ;
1118987 }
1119988 else
1120989 {
1121- Assert . That ( inProgressJobsCount , Is . EqualTo ( numJobs ) , "Error: all jobs should be in InProgress state after Job Processing on resume" ) ;
1122- Assert . That ( partsProcessor . ItemsInQueue , Is . EqualTo ( numJobParts ) , "Error in parts processor on resume" ) ;
990+ await AssertResumeTransfer ( numJobs , numJobParts , numChunks , resumedTransfers , checkpointer , jobsProcessor , partsProcessor , chunksProcessor ) ;
1123991 }
1124-
1125- // process job parts on resume
1126- await partsProcessor . StepAll ( ) ;
1127-
1128- await Task . Delay ( 50 ) ;
1129- int inProgressJobPartsCount = GetJobPartsStateCount ( resumedTransfers , checkpointer ) [ DataTransferState . InProgress ] ;
1130- if ( pauseLocation == PauseLocation . PauseProcessHalfway )
1131- {
1132- Assert . That ( inProgressJobPartsCount , Is . EqualTo ( expectedPartsCount_half ) , "Error: all remaining job parts should be in InProgress state after Part Processing on resume" ) ;
1133- Assert . That ( chunksProcessor . ItemsInQueue , Is . EqualTo ( expectedPartsCount_half ) , "Error in chunks processor on resume" ) ; // For this test, job parts is 1:1 with job chunks
1134- }
1135- else
1136- {
1137- Assert . That ( inProgressJobPartsCount , Is . EqualTo ( numJobParts ) , "Error: all job parts should be in InProgress state after Part Processing on resume" ) ;
1138- Assert . That ( chunksProcessor . ItemsInQueue , Is . EqualTo ( numChunks ) , "Error in chunks processor on resume" ) ;
1139- }
1140-
1141- // process chunks on resume
1142- await chunksProcessor . StepAll ( ) ;
1143-
1144- await Task . Delay ( 50 ) ;
1145- AssertAllJobsAndPartsCompleted ( numJobs , numJobParts , transfers , checkpointer ) ;
1146992 }
1147993
1148994 public enum PauseLocation
0 commit comments