44import  java .util .ArrayList ;
55import  java .util .List ;
66import  java .util .Map ;
7- import  java .util .Optional ;
7+ import  java .util .concurrent .ExecutorService ;
8+ import  java .util .concurrent .Executors ;
9+ import  java .util .concurrent .TimeUnit ;
810import  java .util .stream .Collectors ;
9- import   java . util . stream . Stream ; 
11+ 
1012import  lombok .Getter ;
1113import  lombok .RequiredArgsConstructor ;
1214import  lombok .extern .slf4j .Slf4j ;
15+ import  org .apache .commons .lang3 .time .StopWatch ;
1316import  org .springframework .beans .factory .annotation .Value ;
1417import  org .springframework .stereotype .Component ;
1518import  uk .gov .hmcts .reform .ccd .client .model .CaseDetails ;
19+ import  uk .gov .hmcts .reform .ccd .client .model .SearchResult ;
1620import  uk .gov .hmcts .reform .migration .ccd .CoreCaseDataService ;
1721import  uk .gov .hmcts .reform .migration .service .DataMigrationService ;
1822
23+ import  static  uk .gov .hmcts .reform .migration .queries .CcdElasticSearchQueries .fetchAllUnsetCaseAccessManagementFieldsCasesQuery ;
24+ 
1925@ Slf4j 
2026@ Component 
2127@ RequiredArgsConstructor 
@@ -25,6 +31,8 @@ public class CaseMigrationProcessor {
2531    private  static  final  String  EVENT_SUMMARY  = "Migrate Case" ;
2632    private  static  final  String  EVENT_DESCRIPTION  = "Migrate Case" ;
2733
34+     private  final  StopWatch  totalTimer  = new  StopWatch ();
35+ 
2836    private  final  CoreCaseDataService  coreCaseDataService ;
2937    private  final  DataMigrationService <?> dataMigrationService ;
3038
@@ -40,7 +48,16 @@ public class CaseMigrationProcessor {
4048    @ Value ("${migration.parallel}" )
4149    private  boolean  parallel ;
4250
43-     public  void  processSingleCase (String  userToken , String  caseId , boolean  dryrun ) {
51+     @ Value ("${migration.pageSize}" )
52+     private  int  pageSize ;
53+ 
54+     @ Value ("${migration.numThreads}" )
55+     private  int  numThreads ;
56+ 
57+     @ Value ("${migration.maxCasesToProcess}" )
58+     private  int  maxCasesToProcess ;
59+ 
60+     public  void  processSingleCase (String  userToken , String  caseId , boolean  dryRun ) {
4461        CaseDetails  caseDetails ;
4562        try  {
4663            caseDetails  = coreCaseDataService .fetchOne (userToken , caseId );
@@ -49,77 +66,119 @@ public void processSingleCase(String userToken, String caseId, boolean dryrun) {
4966            return ;
5067        }
5168        if  (dataMigrationService .accepts ().test (caseDetails )) {
52-             updateCase (userToken , caseDetails .getId (), caseDetails .getData (), dryrun );
69+             updateCase (userToken , caseDetails .getId (), caseDetails .getData (), dryRun );
5370        } else  {
5471            log .info ("Case {} already migrated" , caseDetails .getId ());
5572        }
5673    }
5774
58-     public  void  processAllCases (String  userToken , String  firstDate , String  lastDate , boolean  dryrun ) {
59-         CaseDetails  oldestCaseDetails  = coreCaseDataService .fetchOldestCase (userToken );
60-         if  (oldestCaseDetails  != null ) {
61-             log .info ("The data of the oldest case is "  + oldestCaseDetails .getCreatedDate ());
75+     public  void  fetchAndProcessCases (String  userToken , boolean  dryRun ) throws  InterruptedException  {
76+ 
77+         SearchResult  initialSearch  = coreCaseDataService .searchCases (userToken ,
78+             fetchAllUnsetCaseAccessManagementFieldsCasesQuery ());
79+ 
80+         int  totalCasesToProcess  = resolveTotalCasesToProcess (initialSearch );
81+ 
82+         totalTimer .start ();
83+         //Need to fix the off by one issue. 
84+         Long  searchFrom  = handleFirstCase (userToken , dryRun , initialSearch );
85+ 
86+         totalCasesToProcess  -= 1 ;
87+ 
88+         int  casesFetched  = 0 ;
89+ 
90+         ExecutorService  executorService  = Executors .newFixedThreadPool (numThreads );
91+ 
92+         fetchAndSubmitTasks (userToken , dryRun , totalCasesToProcess , searchFrom , casesFetched , executorService );
93+ 
94+         executorService .shutdown ();
95+         executorService .awaitTermination (Integer .MAX_VALUE , TimeUnit .DAYS );
96+     }
97+ 
98+     private  int  resolveTotalCasesToProcess (SearchResult  initialSearch ) {
99+         int  totalCasesToProcess  = 0 ;
100+ 
101+         if  (maxCasesToProcess  > 0 ) {
102+             log .info ("Manual case override in use, limiting to {} cases" , maxCasesToProcess );
103+             totalCasesToProcess  = maxCasesToProcess ;
104+         } else  {
105+             log .info ("No manual case override in use, fetching all: {} cases" , initialSearch .getTotal ());
106+             totalCasesToProcess  = initialSearch .getTotal ();
62107        }
63108
64-         if  ( firstDate  !=  null  &&  lastDate  !=  null ) { 
65-              List < LocalDate >  listOfDates  =  getListOfDates ( LocalDate . parse ( firstDate ),  LocalDate . parse ( lastDate )); 
109+         return   totalCasesToProcess ; 
110+     } 
66111
67-              Optional < Stream < CaseDetails >>  caseDetailsStreamOptional  = 
68-                 coreCaseDataService . fetchAllBetweenDates ( userToken ,  listOfDates ,  parallel ); 
112+     private   void   fetchAndSubmitTasks ( String   userToken ,  boolean   dryRun ,  int   totalCasesToProcess ,  Long   searchFrom ,  int   casesFetched , 
113+                             ExecutorService   executorService ) { 
69114
70-             Stream <CaseDetails > caseDetailsStream ;
115+         while  (casesFetched  < totalCasesToProcess ) {
116+             List <CaseDetails > caseDetails  =
117+                 coreCaseDataService .fetchNCases (userToken , pageSize , searchFrom );
71118
72-             if  (caseDetailsStreamOptional .isEmpty ()) {
73-                 return ;
119+             if  (caseDetails .isEmpty ()) {
120+                 break ;
74121            }
75122
76-             caseDetailsStream  = caseDetailsStreamOptional .get ();
123+             searchFrom  = caseDetails .get ( caseDetails . size () -  1 ). getId ();
77124
78-             if  (parallel ) {
79-                 log .info ("Executing in parallel.. please wait." );
80-                 caseDetailsStream  = caseDetailsStream .parallel ();
81-             }
125+             executorService .execute (() -> caseDetails 
126+                 .forEach (caseDetail  ->
127+                     updateCase (userToken , caseDetail .getId (), caseDetail .getData (), dryRun )));
82128
83-             caseDetailsStream 
84-                 .forEach (caseDetail  -> updateCase (
85-                     userToken ,
86-                     caseDetail .getId (),
87-                     caseDetail .getData (),
88-                     dryrun )
89-                 );
129+             log .info ("New task submitted" );
130+ 
131+             casesFetched  += caseDetails .size ();
132+ 
133+             log .info ("{} cases fetched out of {}" , casesFetched , totalCasesToProcess );
90134        }
91135    }
92136
137+     private  Long  handleFirstCase (String  userToken , boolean  dryRun , SearchResult  initialSearch ) {
138+         log .info ("Processing first case..." );
139+         CaseDetails  firstCase  = initialSearch .getCases ().get (0 );
140+         updateCase (userToken , firstCase .getId (), firstCase .getData (), dryRun );
141+         return  firstCase .getId ();
142+     }
143+ 
144+     public  void  processAllCases (String  userToken , String  userId , boolean  dryRun ) {
145+         coreCaseDataService .fetchAll (userToken , userId ).stream ()
146+             .filter (dataMigrationService .accepts ())
147+             .forEach (caseDetails  -> updateCase (userToken , caseDetails .getId (), caseDetails .getData (), dryRun ));
148+     }
149+ 
93150    protected  List <LocalDate > getListOfDates (LocalDate  startDate , LocalDate  endDate ) {
94151        return  startDate 
95152            .datesUntil (endDate )
96153            .collect (Collectors .toList ());
97154    }
98155
99-     private  void  updateCase (String  authorisation , Long  id , Map <String , Object > data , boolean  dryrun ) {
156+     private  void  updateCase (String  authorisation , Long  id , Map <String , Object > data , boolean  dryRun ) {
100157
101158        totalCases ++;
102159
103-         if  (dryrun ) {
104-             return ;
105-         }
106- 
107160        try  {
108161            var  migratedData  = dataMigrationService .migrate (data );
109-             coreCaseDataService .update (
110-                 authorisation ,
111-                 id .toString (),
112-                 EVENT_ID ,
113-                 EVENT_SUMMARY ,
114-                 EVENT_DESCRIPTION ,
115-                 migratedData );
116- 
117-             log .info ("Case {} successfully updated" , id );
162+             if  (!dryRun ) {
163+                 coreCaseDataService .update (
164+                     authorisation ,
165+                     id .toString (),
166+                     EVENT_ID ,
167+                     EVENT_SUMMARY ,
168+                     EVENT_DESCRIPTION ,
169+                     migratedData );
170+                 log .info ("Case {} successfully updated" , id );
171+             }
118172            migratedCases .add (id );
119173
120174        } catch  (Exception  e ) {
121175            log .error ("Case {} update failed due to: {}" , id , e .getMessage ());
122176            failedCases .add (id );
123177        }
178+ 
179+         if  (totalCases  % 1000  == 0 ) {
180+             log .info ("--------{} cases migrated in {} minutes ({} seconds)-------" , totalCases ,
181+                 totalTimer .getTime (TimeUnit .MINUTES ), totalTimer .getTime (TimeUnit .SECONDS ));
182+         }
124183    }
125184}
0 commit comments