From c2d54b3950db658315586a38fd1ac8848a3c27a7 Mon Sep 17 00:00:00 2001 From: Chris Davidson Date: Thu, 24 Mar 2022 13:35:51 +0000 Subject: [PATCH 01/16] SSCS-10722 update to use elastic search and a daily dataset --- build.gradle | 6 +- modules/processor/build.gradle | 15 ++--- .../migration/CaseMigrationProcessor.java | 45 +++++++++++---- .../reform/migration/CaseMigrationRunner.java | 12 +++- .../migration/ccd/CoreCaseDataService.java | 56 +++++++++++++++++-- .../migration/CaseMigrationProcessorTest.java | 41 +++++++++++--- 6 files changed, 138 insertions(+), 37 deletions(-) diff --git a/build.gradle b/build.gradle index 47d96eca..7db15fc2 100644 --- a/build.gradle +++ b/build.gradle @@ -239,7 +239,7 @@ subprojects { apply plugin: 'com.jfrog.bintray' group = 'uk.gov.hmcts.reform.ccd-case-migration' - version = '3.0.0' + version = 'DEV-SNAPSHOT' sourceCompatibility = '11' targetCompatibility = '11' @@ -257,6 +257,10 @@ subprojects { mavenCentral() jcenter() + // jitpack should be last resort + // see: https://github.com/jitpack/jitpack.io/issues/1939 + maven { url 'https://jitpack.io' } + maven { url "https://dl.bintray.com/hmcts/hmcts-maven" } maven { url "https://repo.maven.apache.org/maven2" } } diff --git a/modules/processor/build.gradle b/modules/processor/build.gradle index d1fe5d70..289adde9 100644 --- a/modules/processor/build.gradle +++ b/modules/processor/build.gradle @@ -5,13 +5,14 @@ plugins { mainClassName = 'uk.gov.hmcts.reform.migration.CaseMigrationRunner' dependencies { - implementation group: 'org.springframework', name: 'spring-context-support' - implementation group: 'com.fasterxml.jackson.module', name: 'jackson-module-parameter-names' - implementation group: 'uk.gov.hmcts.reform', name: 'java-logging', version: '5.0.1' - implementation group: 'uk.gov.hmcts.reform', name: 'idam-client', version: '1.0.2' - implementation group: 'uk.gov.hmcts.reform', name: 'service-auth-provider-client', version: '3.0.0' - implementation group: 'uk.gov.hmcts.reform', name: 'core-case-data-store-client', version: '4.7.3' - implementation group: 'io.rest-assured', name: 'rest-assured', version: '3.3.0' + compile group: 'org.springframework', name: 'spring-context-support' + compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-parameter-names' + compile group: 'uk.gov.hmcts.reform', name: 'java-logging', version: '5.0.1' + compile group: 'uk.gov.hmcts.reform', name: 'idam-client', version: '1.0.2' + compile group: 'uk.gov.hmcts.reform', name: 'service-auth-provider-client', version: '3.0.0' + compile group: 'com.github.hmcts', name: 'ccd-client', version: '4.8.6' + compile group: 'org.elasticsearch', name: 'elasticsearch', version: '7.16.3' + compile group: 'io.rest-assured', name: 'rest-assured', version: '3.3.0' testImplementation group: 'org.springframework.boot', name: 'spring-boot-starter-test' testImplementation group: 'com.github.tomakehurst', name: 'wiremock', version: '2.23.2' diff --git a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessor.java b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessor.java index bf11646c..2544e887 100644 --- a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessor.java +++ b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessor.java @@ -8,9 +8,11 @@ import uk.gov.hmcts.reform.migration.service.DataMigrationService; import uk.gov.hmcts.reform.migration.ccd.CoreCaseDataService; +import java.time.LocalDate; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; @Slf4j @Component @@ -28,7 +30,7 @@ public class CaseMigrationProcessor { @Getter private List failedCases = new ArrayList<>(); - public void processSingleCase(String userToken, String caseId) { + public void processSingleCase(String userToken, String caseId, boolean dryrun) { CaseDetails caseDetails; try { caseDetails = coreCaseDataService.fetchOne(userToken, caseId); @@ -37,26 +39,45 @@ public void processSingleCase(String userToken, String caseId) { return; } if (dataMigrationService.accepts().test(caseDetails)) { - updateCase(userToken, caseDetails.getId(), caseDetails.getData()); + updateCase(userToken, caseDetails.getId(), caseDetails.getData(), dryrun); } else { log.info("Case {} already migrated", caseDetails.getId()); } } - public void processAllCases(String userToken, String userId) { - coreCaseDataService.fetchAll(userToken, userId).stream() - .filter(dataMigrationService.accepts()) - .forEach(caseDetails -> updateCase(userToken, caseDetails.getId(), caseDetails.getData())); + public void processAllCases(String userToken, String userId, String firstDate, + String lastDate, boolean dryrun) { + CaseDetails oldestCaseDetails = coreCaseDataService.fetchOldestCase(userToken, userId); + if (oldestCaseDetails != null) { + log.info("The data of the oldest case is " + oldestCaseDetails.getCreatedDate()); + } + if (firstDate != null && lastDate != null) { + List dates = getListOfDates(LocalDate.parse(firstDate), LocalDate.parse(lastDate)); + for (LocalDate localDate : dates) { + coreCaseDataService.fetchAllForDay(userToken, userId, localDate.toString()).stream() + .filter(dataMigrationService.accepts()) + .forEach(caseDetails -> updateCase(userToken, caseDetails.getId(), caseDetails.getData(), dryrun)); + } + } + } + + protected List getListOfDates(LocalDate startDate, LocalDate endDate) { + return startDate.datesUntil(endDate) + .collect(Collectors.toList()); } - private void updateCase(String authorisation, Long id, Map data) { + private void updateCase(String authorisation, Long id, Map data, boolean dryrun) { log.info("Updating case {}", id); try { - log.debug("Case data: {}", data); - coreCaseDataService.update(authorisation, id.toString(), - EVENT_ID, EVENT_SUMMARY, EVENT_DESCRIPTION, dataMigrationService.migrate(data)); - log.info("Case {} successfully updated", id); - migratedCases.add(id); + if (dryrun) { + //do nothing + } else { + log.debug("Case data: {}", data); + coreCaseDataService.update(authorisation, id.toString(), + EVENT_ID, EVENT_SUMMARY, EVENT_DESCRIPTION, dataMigrationService.migrate(data)); + log.info("Case {} successfully updated", id); + migratedCases.add(id); + } } catch (Exception e) { log.error("Case {} update failed due to: {}", id, e.getMessage()); failedCases.add(id); diff --git a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationRunner.java b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationRunner.java index a59e0f75..92581ad6 100644 --- a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationRunner.java +++ b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationRunner.java @@ -20,6 +20,12 @@ public class CaseMigrationRunner implements CommandLineRunner { private String idamPassword; @Value("${migration.caseId}") private String ccdCaseId; + @Value("${migration.startDate}") + private String startDate; + @Value("${migration.endDate}") + private String endDate; + @Value("${migration.dryrun}") + private boolean dryrun; @Autowired private IdamClient idamClient; @Autowired @@ -39,10 +45,10 @@ public void run(String... args) { if (ccdCaseId != null && !ccdCaseId.isBlank()) { log.info("Data migration of single case started"); - caseMigrationProcessor.processSingleCase(userToken, ccdCaseId); + caseMigrationProcessor.processSingleCase(userToken, ccdCaseId, dryrun); } else { - log.info("Data migration of all cases started"); - caseMigrationProcessor.processAllCases(userToken, userId); + log.info("Data migration of cases between {} and {} started", startDate, endDate); + caseMigrationProcessor.processAllCases(userToken, userId, startDate, endDate, dryrun); } log.info("-----------------------------------------"); diff --git a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/ccd/CoreCaseDataService.java b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/ccd/CoreCaseDataService.java index d210773f..07247af9 100644 --- a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/ccd/CoreCaseDataService.java +++ b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/ccd/CoreCaseDataService.java @@ -1,25 +1,30 @@ package uk.gov.hmcts.reform.migration.ccd; +import lombok.extern.slf4j.Slf4j; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.sort.SortOrder; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import uk.gov.hmcts.reform.authorisation.generators.AuthTokenGenerator; import uk.gov.hmcts.reform.ccd.client.CoreCaseDataApi; -import uk.gov.hmcts.reform.ccd.client.model.CaseDataContent; -import uk.gov.hmcts.reform.ccd.client.model.CaseDetails; -import uk.gov.hmcts.reform.ccd.client.model.Event; -import uk.gov.hmcts.reform.ccd.client.model.PaginatedSearchMetadata; -import uk.gov.hmcts.reform.ccd.client.model.StartEventResponse; +import uk.gov.hmcts.reform.ccd.client.model.*; import uk.gov.hmcts.reform.migration.auth.AuthUtil; import uk.gov.hmcts.reform.idam.client.IdamClient; import uk.gov.hmcts.reform.idam.client.models.UserDetails; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; import java.util.stream.IntStream; +import static org.elasticsearch.index.query.QueryBuilders.matchQuery; + + +@Slf4j @Service public class CoreCaseDataService { @@ -34,6 +39,8 @@ public class CoreCaseDataService { @Autowired private CoreCaseDataApi coreCaseDataApi; + int pagesize = 50; + public CaseDetails fetchOne(String authorisation, String caseId) { return coreCaseDataApi.getCase(authorisation, authTokenGenerator.generate(), caseId); } @@ -45,6 +52,24 @@ public List fetchAll(String authorisation, String userId) { .collect(Collectors.toList()); } + public List fetchAllForDay(String authorisation, String userId, String day) { + SearchSourceBuilder searchBuilder = new SearchSourceBuilder(); + searchBuilder.size(1); + searchBuilder.query(QueryBuilders.boolQuery().must(matchQuery( + "created_date", day))); + + SearchResult searchResult = coreCaseDataApi.searchCases(authorisation, authTokenGenerator.generate(), "Benefit", searchBuilder.toString()); + int total = searchResult.getTotal(); + log.info("Total for " + day + " is " + total); + searchBuilder.from(pagesize); + int numberOfPages = total/pagesize; + + List caseDetails = IntStream.rangeClosed(0, numberOfPages).boxed() + .flatMap(pageNumber -> fetchPageEs(authorisation, userId, pageNumber, day).stream()) + .collect(Collectors.toList()); + return caseDetails; + } + private int getNumberOfPages(String authorisation, String userId, Map searchCriteria) { PaginatedSearchMetadata metadata = coreCaseDataApi.getPaginationInfoForSearchForCaseworkers(authorisation, authTokenGenerator.generate(), userId, jurisdiction, caseType, searchCriteria); @@ -58,6 +83,27 @@ private List fetchPage(String authorisation, String userId, int pag caseType, searchCriteria); } + private List fetchPageEs(String authorisation, String userId, int pageNumber, String day) { + SearchSourceBuilder searchBuilder = new SearchSourceBuilder(); + searchBuilder.size(pagesize); + searchBuilder.from(pageNumber * pagesize); + searchBuilder.query(QueryBuilders.boolQuery().must(matchQuery( + "created_date", day))); + + List caseDetails = coreCaseDataApi.searchCases(authorisation, authTokenGenerator.generate(), "Benefit", searchBuilder.toString()).getCases(); + return caseDetails; + } + + public CaseDetails fetchOldestCase(String authorisation, String userId) { + SearchSourceBuilder searchBuilder = new SearchSourceBuilder(); + searchBuilder.size(pagesize); + searchBuilder.sort("created_date", SortOrder.ASC); + searchBuilder.query(QueryBuilders.boolQuery()); + + List caseDetails = coreCaseDataApi.searchCases(authorisation, authTokenGenerator.generate(), "Benefit", searchBuilder.toString()).getCases(); + return caseDetails.get(0); + } + public CaseDetails update(String authorisation, String caseId, String eventId, String eventSummary, String eventDescription, Object data) { UserDetails userDetails = idamClient.getUserDetails(AuthUtil.getBearerToken(authorisation)); diff --git a/modules/processor/src/test/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessorTest.java b/modules/processor/src/test/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessorTest.java index 55e9cf4f..c9f5258c 100644 --- a/modules/processor/src/test/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessorTest.java +++ b/modules/processor/src/test/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessorTest.java @@ -1,5 +1,6 @@ package uk.gov.hmcts.reform.migration; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.InjectMocks; @@ -8,14 +9,18 @@ import uk.gov.hmcts.reform.ccd.client.model.CaseDetails; import uk.gov.hmcts.reform.migration.ccd.CoreCaseDataService; import uk.gov.hmcts.reform.migration.service.DataMigrationService; - +import java.time.LocalDate; import java.util.HashMap; +import java.util.List; import java.util.Map; -import static java.util.Arrays.asList; +import static org.codehaus.groovy.runtime.InvokerHelper.asList; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.hasSize; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -46,7 +51,7 @@ public class CaseMigrationProcessorTest { public void shouldNotProcessASingleCaseWithOutRedundantFields() { when(coreCaseDataService.fetchOne(USER_TOKEN, CASE_ID)).thenReturn(caseDetails1); when(dataMigrationService.accepts()).thenReturn(candidate -> false); - caseMigrationProcessor.processSingleCase(USER_TOKEN, CASE_ID); + caseMigrationProcessor.processSingleCase(USER_TOKEN, CASE_ID, false); verify(coreCaseDataService, times(1)).fetchOne(USER_TOKEN, CASE_ID); assertThat(caseMigrationProcessor.getFailedCases(), hasSize(0)); assertThat(caseMigrationProcessor.getMigratedCases(), hasSize(0)); @@ -56,7 +61,7 @@ public void shouldNotProcessASingleCaseWithOutRedundantFields() { public void shouldProcessASingleCaseAndMigrationIsSuccessful() { when(coreCaseDataService.fetchOne(USER_TOKEN, CASE_ID)).thenReturn(caseDetails1); when(dataMigrationService.accepts()).thenReturn(candidate -> true); - caseMigrationProcessor.processSingleCase(USER_TOKEN, CASE_ID); + caseMigrationProcessor.processSingleCase(USER_TOKEN, CASE_ID, false); verify(coreCaseDataService, times(1)).fetchOne(USER_TOKEN, CASE_ID); assertThat(caseMigrationProcessor.getFailedCases(), hasSize(0)); assertThat(caseMigrationProcessor.getMigratedCases(), contains(1111L)); @@ -68,7 +73,7 @@ public void shouldProcessASingleCaseAndMigrationIsFailed() { when(dataMigrationService.accepts()).thenReturn(candidate -> true); when(dataMigrationService.migrate(caseDetails1.getData())).thenReturn(caseDetails1.getData()); when(coreCaseDataService.update(USER_TOKEN, caseDetails1.getId().toString(), EVENT_ID, EVENT_SUMMARY, EVENT_DESCRIPTION, caseDetails1.getData())).thenThrow(new RuntimeException("Internal server error")); - caseMigrationProcessor.processSingleCase(USER_TOKEN, CASE_ID); + caseMigrationProcessor.processSingleCase(USER_TOKEN, CASE_ID, false); verify(coreCaseDataService, times(1)).fetchOne(USER_TOKEN, CASE_ID); verify(coreCaseDataService, times(1)).update(USER_TOKEN, "1111", EVENT_ID, EVENT_SUMMARY, EVENT_DESCRIPTION, caseDetails1.getData()); assertThat(caseMigrationProcessor.getFailedCases(), contains(1111L)); @@ -85,7 +90,7 @@ public void shouldProcessAllTheCandidateCases_whenOneCaseFailed() { when(dataMigrationService.migrate(caseDetails2.getData())).thenReturn(caseDetails2.getData()); when(dataMigrationService.migrate(caseDetails3.getData())).thenReturn(caseDetails3.getData()); when(coreCaseDataService.update(USER_TOKEN, caseDetails3.getId().toString(), EVENT_ID, EVENT_SUMMARY, EVENT_DESCRIPTION, caseDetails3.getData())).thenThrow(new RuntimeException("Internal server error")); - caseMigrationProcessor.processAllCases(USER_TOKEN, USER_ID); + caseMigrationProcessor.processAllCases(USER_TOKEN, USER_ID, "2022-03-01", "2022-03-02", false); assertThat(caseMigrationProcessor.getFailedCases(), contains(1113L)); assertThat(caseMigrationProcessor.getMigratedCases(), contains(1111L, 1112L)); } @@ -100,7 +105,7 @@ public void shouldProcessAllTheCandidateCases_whenTwoCasesFailed() { when(dataMigrationService.migrate(caseDetails3.getData())).thenReturn(caseDetails3.getData()); when(coreCaseDataService.update(USER_TOKEN, caseDetails2.getId().toString(), EVENT_ID, EVENT_SUMMARY, EVENT_DESCRIPTION, caseDetails2.getData())).thenThrow(new RuntimeException("Internal server error")); when(coreCaseDataService.update(USER_TOKEN, caseDetails3.getId().toString(), EVENT_ID, EVENT_SUMMARY, EVENT_DESCRIPTION, caseDetails3.getData())).thenThrow(new RuntimeException("Internal server error")); - caseMigrationProcessor.processAllCases(USER_TOKEN, USER_ID); + caseMigrationProcessor.processAllCases(USER_TOKEN, USER_ID, "2022-03-01", "2022-03-02", false); assertThat(caseMigrationProcessor.getFailedCases(), contains(1112L, 1113L)); assertThat(caseMigrationProcessor.getMigratedCases(), contains(1111L)); } @@ -110,13 +115,31 @@ public void shouldProcessNoCaseWhenNoCasesAvailable() { mockDataFetch(); when(dataMigrationService.accepts()).thenReturn(candidate -> true); - caseMigrationProcessor.processAllCases(USER_TOKEN, USER_ID); + caseMigrationProcessor.processAllCases(USER_TOKEN, USER_ID,"2022-03-01", "2022-03-30", false); assertThat(caseMigrationProcessor.getFailedCases(), hasSize(0)); assertThat(caseMigrationProcessor.getFailedCases(), hasSize(0)); } + @Test + public void testGetDates() { + String firstDate = "2021-01-01"; + String lastDate = "2022-01-01"; + List listOfDates = caseMigrationProcessor.getListOfDates(LocalDate.parse(firstDate), + LocalDate.parse(lastDate)); + assertEquals(365, listOfDates.size()); + } + + @Test + public void testGetDatesLeapYear() { + String firstDate = "2020-01-01"; + String lastDate = "2021-01-01"; + List listOfDates = caseMigrationProcessor.getListOfDates(LocalDate.parse(firstDate), + LocalDate.parse(lastDate)); + assertEquals(366, listOfDates.size()); + } + private void mockDataFetch(CaseDetails... caseDetails) { - when(coreCaseDataService.fetchAll(USER_TOKEN, USER_ID)).thenReturn(asList(caseDetails)); + when(coreCaseDataService.fetchAllForDay(eq(USER_TOKEN), eq(USER_ID), anyString())).thenReturn(asList(caseDetails)); } private void mockDataUpdate(CaseDetails caseDetails) { From e11e7b214a8ef246abb5d1e65edc0dc1a6d1dbce Mon Sep 17 00:00:00 2001 From: Waqas Arshid Date: Wed, 20 Apr 2022 15:02:44 +0100 Subject: [PATCH 02/16] SSCS-10722 - tidied code - added timer for performance metrics - added support for multi-threading via parallel streams - updated tests --- .../migration/CaseMigrationProcessor.java | 98 +++++++++++++------ .../reform/migration/CaseMigrationRunner.java | 22 +++-- .../migration/ccd/CoreCaseDataService.java | 56 ++++++----- .../migration/CaseMigrationProcessorTest.java | 40 ++++---- 4 files changed, 129 insertions(+), 87 deletions(-) diff --git a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessor.java b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessor.java index 2544e887..350935db 100644 --- a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessor.java +++ b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessor.java @@ -1,34 +1,38 @@ package uk.gov.hmcts.reform.migration; -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; -import uk.gov.hmcts.reform.ccd.client.model.CaseDetails; -import uk.gov.hmcts.reform.migration.service.DataMigrationService; -import uk.gov.hmcts.reform.migration.ccd.CoreCaseDataService; - import java.time.LocalDate; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; +import uk.gov.hmcts.reform.ccd.client.model.CaseDetails; +import uk.gov.hmcts.reform.migration.ccd.CoreCaseDataService; +import uk.gov.hmcts.reform.migration.service.DataMigrationService; @Slf4j @Component +@RequiredArgsConstructor public class CaseMigrationProcessor { + private static final String EVENT_ID = "migrateCase"; private static final String EVENT_SUMMARY = "Migrate Case"; private static final String EVENT_DESCRIPTION = "Migrate Case"; - @Autowired - private CoreCaseDataService coreCaseDataService; - @Autowired - private DataMigrationService dataMigrationService; + private final CoreCaseDataService coreCaseDataService; + private final DataMigrationService dataMigrationService; + @Getter - private List migratedCases = new ArrayList<>(); + private final List migratedCases = new ArrayList<>(); @Getter - private List failedCases = new ArrayList<>(); + private final List failedCases = new ArrayList<>(); + + @Value("${migration.parallel:false}") + private boolean parallel; public void processSingleCase(String userToken, String caseId, boolean dryrun) { CaseDetails caseDetails; @@ -45,18 +49,19 @@ public void processSingleCase(String userToken, String caseId, boolean dryrun) { } } - public void processAllCases(String userToken, String userId, String firstDate, - String lastDate, boolean dryrun) { - CaseDetails oldestCaseDetails = coreCaseDataService.fetchOldestCase(userToken, userId); + public void processAllCases(String userToken, String firstDate, String lastDate, boolean dryrun) { + CaseDetails oldestCaseDetails = coreCaseDataService.fetchOldestCase(userToken); if (oldestCaseDetails != null) { log.info("The data of the oldest case is " + oldestCaseDetails.getCreatedDate()); } if (firstDate != null && lastDate != null) { - List dates = getListOfDates(LocalDate.parse(firstDate), LocalDate.parse(lastDate)); - for (LocalDate localDate : dates) { - coreCaseDataService.fetchAllForDay(userToken, userId, localDate.toString()).stream() - .filter(dataMigrationService.accepts()) - .forEach(caseDetails -> updateCase(userToken, caseDetails.getId(), caseDetails.getData(), dryrun)); + List listOfDates = getListOfDates(LocalDate.parse(firstDate), LocalDate.parse(lastDate)); + if (parallel) { +// log.info("Executing in parallel.. please wait."); +// listOfDates.parallelStream().forEach(date -> migrateCases(date, userToken, dryrun)); + migrateCasesParallel(listOfDates, userToken, dryrun); + } else { + listOfDates.forEach(date -> migrateCases(date, userToken, dryrun)); } } } @@ -66,18 +71,49 @@ protected List getListOfDates(LocalDate startDate, LocalDate endDate) .collect(Collectors.toList()); } + protected void migrateCasesParallel(List dates, String userToken, boolean dryrun) { + log.info("Executing in parallel.. please wait."); + dates.parallelStream() + .forEach(date -> + coreCaseDataService.fetchAllForDay(userToken, date.toString()) + .stream() + .filter(dataMigrationService.accepts()) + .parallel() + .forEach(caseDetails -> updateCase( + userToken, + caseDetails.getId(), + caseDetails.getData(), + dryrun) + ) + ); + } + + protected void migrateCases(LocalDate date, String userToken, boolean dryrun) { + coreCaseDataService.fetchAllForDay(userToken, date.toString()) + .stream() + .filter(dataMigrationService.accepts()) + .forEach(caseDetails -> updateCase( + userToken, + caseDetails.getId(), + caseDetails.getData(), + dryrun) + ); + } + private void updateCase(String authorisation, Long id, Map data, boolean dryrun) { log.info("Updating case {}", id); + + if (dryrun) { + return; + } + try { - if (dryrun) { - //do nothing - } else { - log.debug("Case data: {}", data); - coreCaseDataService.update(authorisation, id.toString(), - EVENT_ID, EVENT_SUMMARY, EVENT_DESCRIPTION, dataMigrationService.migrate(data)); - log.info("Case {} successfully updated", id); - migratedCases.add(id); - } + log.debug("Case data: {}", data); + var migratedData = dataMigrationService.migrate(data); + coreCaseDataService.update(authorisation, id.toString(), + EVENT_ID, EVENT_SUMMARY, EVENT_DESCRIPTION, migratedData); + log.info("Case {} successfully updated", id); + migratedCases.add(id); } catch (Exception e) { log.error("Case {} update failed due to: {}", id, e.getMessage()); failedCases.add(id); diff --git a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationRunner.java b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationRunner.java index 92581ad6..1b883e2b 100644 --- a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationRunner.java +++ b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationRunner.java @@ -1,17 +1,18 @@ package uk.gov.hmcts.reform.migration; +import java.util.concurrent.TimeUnit; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; +import org.apache.commons.lang3.time.StopWatch; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.context.annotation.PropertySource; import uk.gov.hmcts.reform.idam.client.IdamClient; @Slf4j @SpringBootApplication -@PropertySource("classpath:application.properties") +@RequiredArgsConstructor public class CaseMigrationRunner implements CommandLineRunner { @Value("${migration.idam.username}") @@ -26,10 +27,9 @@ public class CaseMigrationRunner implements CommandLineRunner { private String endDate; @Value("${migration.dryrun}") private boolean dryrun; - @Autowired - private IdamClient idamClient; - @Autowired - private CaseMigrationProcessor caseMigrationProcessor; + + private final IdamClient idamClient; + private final CaseMigrationProcessor caseMigrationProcessor; public static void main(String[] args) { SpringApplication.run(CaseMigrationRunner.class, args); @@ -43,16 +43,20 @@ public void run(String... args) { String userId = idamClient.getUserDetails(userToken).getId(); log.debug("User ID: {}", userId); + StopWatch stopWatch = StopWatch.createStarted(); + if (ccdCaseId != null && !ccdCaseId.isBlank()) { log.info("Data migration of single case started"); caseMigrationProcessor.processSingleCase(userToken, ccdCaseId, dryrun); } else { log.info("Data migration of cases between {} and {} started", startDate, endDate); - caseMigrationProcessor.processAllCases(userToken, userId, startDate, endDate, dryrun); + caseMigrationProcessor.processAllCases(userToken, startDate, endDate, dryrun); } + stopWatch.stop(); + log.info("-----------------------------------------"); - log.info("Data migration completed"); + log.info("Data migration completed in: {} minutes.", stopWatch.getTime(TimeUnit.MINUTES)); log.info("-----------------------------------------"); log.info("Total number of processed cases: {}", caseMigrationProcessor.getMigratedCases().size() + caseMigrationProcessor.getFailedCases().size()); log.info("Total number of migrations performed: {}", caseMigrationProcessor.getMigratedCases().size()); diff --git a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/ccd/CoreCaseDataService.java b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/ccd/CoreCaseDataService.java index 07247af9..418dc1a5 100644 --- a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/ccd/CoreCaseDataService.java +++ b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/ccd/CoreCaseDataService.java @@ -1,43 +1,43 @@ package uk.gov.hmcts.reform.migration.ccd; +import static org.elasticsearch.index.query.QueryBuilders.matchQuery; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.SortOrder; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import uk.gov.hmcts.reform.authorisation.generators.AuthTokenGenerator; import uk.gov.hmcts.reform.ccd.client.CoreCaseDataApi; -import uk.gov.hmcts.reform.ccd.client.model.*; -import uk.gov.hmcts.reform.migration.auth.AuthUtil; +import uk.gov.hmcts.reform.ccd.client.model.CaseDataContent; +import uk.gov.hmcts.reform.ccd.client.model.CaseDetails; +import uk.gov.hmcts.reform.ccd.client.model.Event; +import uk.gov.hmcts.reform.ccd.client.model.PaginatedSearchMetadata; +import uk.gov.hmcts.reform.ccd.client.model.SearchResult; +import uk.gov.hmcts.reform.ccd.client.model.StartEventResponse; import uk.gov.hmcts.reform.idam.client.IdamClient; import uk.gov.hmcts.reform.idam.client.models.UserDetails; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; -import java.util.stream.IntStream; - -import static org.elasticsearch.index.query.QueryBuilders.matchQuery; - +import uk.gov.hmcts.reform.migration.auth.AuthUtil; @Slf4j @Service +@RequiredArgsConstructor public class CoreCaseDataService { private String jurisdiction; @Value("${migration.caseType}") private String caseType; - @Autowired - private IdamClient idamClient; - @Autowired - private AuthTokenGenerator authTokenGenerator; - @Autowired - private CoreCaseDataApi coreCaseDataApi; + private final IdamClient idamClient; + private final AuthTokenGenerator authTokenGenerator; + private final CoreCaseDataApi coreCaseDataApi; int pagesize = 50; @@ -52,7 +52,7 @@ public List fetchAll(String authorisation, String userId) { .collect(Collectors.toList()); } - public List fetchAllForDay(String authorisation, String userId, String day) { + public List fetchAllForDay(String authorisation, String day) { SearchSourceBuilder searchBuilder = new SearchSourceBuilder(); searchBuilder.size(1); searchBuilder.query(QueryBuilders.boolQuery().must(matchQuery( @@ -64,10 +64,9 @@ public List fetchAllForDay(String authorisation, String userId, Str searchBuilder.from(pagesize); int numberOfPages = total/pagesize; - List caseDetails = IntStream.rangeClosed(0, numberOfPages).boxed() - .flatMap(pageNumber -> fetchPageEs(authorisation, userId, pageNumber, day).stream()) + return IntStream.rangeClosed(0, numberOfPages).boxed() + .flatMap(pageNumber -> fetchPageEs(authorisation, pageNumber, day).stream()) .collect(Collectors.toList()); - return caseDetails; } private int getNumberOfPages(String authorisation, String userId, Map searchCriteria) { @@ -83,18 +82,21 @@ private List fetchPage(String authorisation, String userId, int pag caseType, searchCriteria); } - private List fetchPageEs(String authorisation, String userId, int pageNumber, String day) { + private List fetchPageEs(String authorisation, int pageNumber, String day) { SearchSourceBuilder searchBuilder = new SearchSourceBuilder(); searchBuilder.size(pagesize); searchBuilder.from(pageNumber * pagesize); searchBuilder.query(QueryBuilders.boolQuery().must(matchQuery( "created_date", day))); - List caseDetails = coreCaseDataApi.searchCases(authorisation, authTokenGenerator.generate(), "Benefit", searchBuilder.toString()).getCases(); - return caseDetails; + return coreCaseDataApi.searchCases( + authorisation, + authTokenGenerator.generate(), + "Benefit", + searchBuilder.toString()).getCases(); } - public CaseDetails fetchOldestCase(String authorisation, String userId) { + public CaseDetails fetchOldestCase(String authorisation) { SearchSourceBuilder searchBuilder = new SearchSourceBuilder(); searchBuilder.size(pagesize); searchBuilder.sort("created_date", SortOrder.ASC); diff --git a/modules/processor/src/test/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessorTest.java b/modules/processor/src/test/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessorTest.java index c9f5258c..f7d5a818 100644 --- a/modules/processor/src/test/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessorTest.java +++ b/modules/processor/src/test/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessorTest.java @@ -1,21 +1,8 @@ package uk.gov.hmcts.reform.migration; -import org.junit.Ignore; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.InjectMocks; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; -import uk.gov.hmcts.reform.ccd.client.model.CaseDetails; -import uk.gov.hmcts.reform.migration.ccd.CoreCaseDataService; -import uk.gov.hmcts.reform.migration.service.DataMigrationService; -import java.time.LocalDate; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - import static org.codehaus.groovy.runtime.InvokerHelper.asList; import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.hasSize; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; @@ -25,6 +12,19 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.time.LocalDate; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import uk.gov.hmcts.reform.ccd.client.model.CaseDetails; +import uk.gov.hmcts.reform.migration.ccd.CoreCaseDataService; +import uk.gov.hmcts.reform.migration.service.DataMigrationService; + @RunWith(MockitoJUnitRunner.class) public class CaseMigrationProcessorTest { @@ -90,9 +90,9 @@ public void shouldProcessAllTheCandidateCases_whenOneCaseFailed() { when(dataMigrationService.migrate(caseDetails2.getData())).thenReturn(caseDetails2.getData()); when(dataMigrationService.migrate(caseDetails3.getData())).thenReturn(caseDetails3.getData()); when(coreCaseDataService.update(USER_TOKEN, caseDetails3.getId().toString(), EVENT_ID, EVENT_SUMMARY, EVENT_DESCRIPTION, caseDetails3.getData())).thenThrow(new RuntimeException("Internal server error")); - caseMigrationProcessor.processAllCases(USER_TOKEN, USER_ID, "2022-03-01", "2022-03-02", false); + caseMigrationProcessor.processAllCases(USER_TOKEN, "2022-03-01", "2022-03-02", false); assertThat(caseMigrationProcessor.getFailedCases(), contains(1113L)); - assertThat(caseMigrationProcessor.getMigratedCases(), contains(1111L, 1112L)); + assertThat(caseMigrationProcessor.getMigratedCases(), containsInAnyOrder(1111L, 1112L)); } @Test @@ -105,8 +105,8 @@ public void shouldProcessAllTheCandidateCases_whenTwoCasesFailed() { when(dataMigrationService.migrate(caseDetails3.getData())).thenReturn(caseDetails3.getData()); when(coreCaseDataService.update(USER_TOKEN, caseDetails2.getId().toString(), EVENT_ID, EVENT_SUMMARY, EVENT_DESCRIPTION, caseDetails2.getData())).thenThrow(new RuntimeException("Internal server error")); when(coreCaseDataService.update(USER_TOKEN, caseDetails3.getId().toString(), EVENT_ID, EVENT_SUMMARY, EVENT_DESCRIPTION, caseDetails3.getData())).thenThrow(new RuntimeException("Internal server error")); - caseMigrationProcessor.processAllCases(USER_TOKEN, USER_ID, "2022-03-01", "2022-03-02", false); - assertThat(caseMigrationProcessor.getFailedCases(), contains(1112L, 1113L)); + caseMigrationProcessor.processAllCases(USER_TOKEN, "2022-03-01", "2022-03-02", false); + assertThat(caseMigrationProcessor.getFailedCases(), containsInAnyOrder(1112L, 1113L)); assertThat(caseMigrationProcessor.getMigratedCases(), contains(1111L)); } @@ -115,7 +115,7 @@ public void shouldProcessNoCaseWhenNoCasesAvailable() { mockDataFetch(); when(dataMigrationService.accepts()).thenReturn(candidate -> true); - caseMigrationProcessor.processAllCases(USER_TOKEN, USER_ID,"2022-03-01", "2022-03-30", false); + caseMigrationProcessor.processAllCases(USER_TOKEN,"2022-03-01", "2022-03-30", false); assertThat(caseMigrationProcessor.getFailedCases(), hasSize(0)); assertThat(caseMigrationProcessor.getFailedCases(), hasSize(0)); } @@ -139,7 +139,7 @@ public void testGetDatesLeapYear() { } private void mockDataFetch(CaseDetails... caseDetails) { - when(coreCaseDataService.fetchAllForDay(eq(USER_TOKEN), eq(USER_ID), anyString())).thenReturn(asList(caseDetails)); + when(coreCaseDataService.fetchAllForDay(eq(USER_TOKEN), anyString())).thenReturn(asList(caseDetails)); } private void mockDataUpdate(CaseDetails caseDetails) { From 4f2e62635d8a1c8c8fea8635ab4408df6b2738a7 Mon Sep 17 00:00:00 2001 From: Waqas Arshid Date: Wed, 20 Apr 2022 21:58:12 +0100 Subject: [PATCH 03/16] SSCS-10722: - improved date logic - runner text formatting changes --- .../migration/CaseMigrationProcessor.java | 11 ++++++++-- .../reform/migration/CaseMigrationRunner.java | 5 +++-- .../migration/CaseMigrationProcessorTest.java | 22 +++++++++++++++---- 3 files changed, 30 insertions(+), 8 deletions(-) diff --git a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessor.java b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessor.java index 350935db..f215cfa0 100644 --- a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessor.java +++ b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessor.java @@ -1,5 +1,7 @@ package uk.gov.hmcts.reform.migration; +import static java.util.Collections.singletonList; + import java.time.LocalDate; import java.util.ArrayList; import java.util.List; @@ -67,8 +69,13 @@ public void processAllCases(String userToken, String firstDate, String lastDate, } protected List getListOfDates(LocalDate startDate, LocalDate endDate) { - return startDate.datesUntil(endDate) - .collect(Collectors.toList()); + if (startDate.isEqual(endDate)) { + return singletonList(startDate); + } + + return startDate + .datesUntil(endDate) + .collect(Collectors.toList()); } protected void migrateCasesParallel(List dates, String userToken, boolean dryrun) { diff --git a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationRunner.java b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationRunner.java index 1b883e2b..3c100bc4 100644 --- a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationRunner.java +++ b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationRunner.java @@ -55,14 +55,15 @@ public void run(String... args) { stopWatch.stop(); - log.info("-----------------------------------------"); - log.info("Data migration completed in: {} minutes.", stopWatch.getTime(TimeUnit.MINUTES)); log.info("-----------------------------------------"); log.info("Total number of processed cases: {}", caseMigrationProcessor.getMigratedCases().size() + caseMigrationProcessor.getFailedCases().size()); log.info("Total number of migrations performed: {}", caseMigrationProcessor.getMigratedCases().size()); log.info("-----------------------------------------"); log.info("Migrated cases: {} ", !caseMigrationProcessor.getMigratedCases().isEmpty() ? caseMigrationProcessor.getMigratedCases() : "NONE"); log.info("Failed cases: {}", !caseMigrationProcessor.getFailedCases().isEmpty() ? caseMigrationProcessor.getFailedCases() : "NONE"); + log.info("-----------------------------------------"); + log.info("Data migration completed in: {} minutes.", stopWatch.getTime(TimeUnit.MINUTES)); + log.info("-----------------------------------------"); } catch (Throwable e) { log.error("Migration failed with the following reason: {}", e.getMessage(), e); } diff --git a/modules/processor/src/test/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessorTest.java b/modules/processor/src/test/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessorTest.java index f7d5a818..cd67efeb 100644 --- a/modules/processor/src/test/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessorTest.java +++ b/modules/processor/src/test/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessorTest.java @@ -121,20 +121,34 @@ public void shouldProcessNoCaseWhenNoCasesAvailable() { } @Test - public void testGetDates() { + public void shouldContainSingleDate() { + String date = "2021-01-01"; + + List listOfDates = caseMigrationProcessor.getListOfDates(LocalDate.parse(date), + LocalDate.parse(date)); + + assertEquals(1, listOfDates.size()); + } + + @Test + public void shouldContainNormalYearOfDates() { String firstDate = "2021-01-01"; String lastDate = "2022-01-01"; - List listOfDates = caseMigrationProcessor.getListOfDates(LocalDate.parse(firstDate), + + List listOfDates = caseMigrationProcessor.getListOfDates(LocalDate.parse(firstDate), LocalDate.parse(lastDate)); + assertEquals(365, listOfDates.size()); } @Test - public void testGetDatesLeapYear() { + public void shouldContainLeapYearOfDates() { String firstDate = "2020-01-01"; String lastDate = "2021-01-01"; - List listOfDates = caseMigrationProcessor.getListOfDates(LocalDate.parse(firstDate), + + List listOfDates = caseMigrationProcessor.getListOfDates(LocalDate.parse(firstDate), LocalDate.parse(lastDate)); + assertEquals(366, listOfDates.size()); } From a32c330b339e0e46f33f0bce7af2d1164f258d00 Mon Sep 17 00:00:00 2001 From: Waqas Arshid Date: Wed, 20 Apr 2022 22:47:47 +0100 Subject: [PATCH 04/16] SSCS-10722: - improved case fetching logic - streamlined logic for using parallel stream --- .../migration/CaseMigrationProcessor.java | 69 ++++++++----------- .../migration/ccd/CoreCaseDataService.java | 39 +++++++++-- .../migration/CaseMigrationProcessorTest.java | 18 ++++- 3 files changed, 77 insertions(+), 49 deletions(-) diff --git a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessor.java b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessor.java index f215cfa0..ccca57d9 100644 --- a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessor.java +++ b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessor.java @@ -7,6 +7,7 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import java.util.stream.Stream; import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -56,71 +57,57 @@ public void processAllCases(String userToken, String firstDate, String lastDate, if (oldestCaseDetails != null) { log.info("The data of the oldest case is " + oldestCaseDetails.getCreatedDate()); } + if (firstDate != null && lastDate != null) { List listOfDates = getListOfDates(LocalDate.parse(firstDate), LocalDate.parse(lastDate)); + + Stream caseDetailsStream = + coreCaseDataService.fetchAllBetweenDates(userToken, listOfDates, parallel) + .stream() + .filter(dataMigrationService.accepts()); + if (parallel) { -// log.info("Executing in parallel.. please wait."); -// listOfDates.parallelStream().forEach(date -> migrateCases(date, userToken, dryrun)); - migrateCasesParallel(listOfDates, userToken, dryrun); - } else { - listOfDates.forEach(date -> migrateCases(date, userToken, dryrun)); + log.info("Executing in parallel.. please wait."); + caseDetailsStream = caseDetailsStream.parallel(); } + + caseDetailsStream + .forEach(caseDetail -> updateCase( + userToken, + caseDetail.getId(), + caseDetail.getData(), + dryrun) + ); } } protected List getListOfDates(LocalDate startDate, LocalDate endDate) { - if (startDate.isEqual(endDate)) { - return singletonList(startDate); - } - return startDate .datesUntil(endDate) .collect(Collectors.toList()); } - protected void migrateCasesParallel(List dates, String userToken, boolean dryrun) { - log.info("Executing in parallel.. please wait."); - dates.parallelStream() - .forEach(date -> - coreCaseDataService.fetchAllForDay(userToken, date.toString()) - .stream() - .filter(dataMigrationService.accepts()) - .parallel() - .forEach(caseDetails -> updateCase( - userToken, - caseDetails.getId(), - caseDetails.getData(), - dryrun) - ) - ); - } - - protected void migrateCases(LocalDate date, String userToken, boolean dryrun) { - coreCaseDataService.fetchAllForDay(userToken, date.toString()) - .stream() - .filter(dataMigrationService.accepts()) - .forEach(caseDetails -> updateCase( - userToken, - caseDetails.getId(), - caseDetails.getData(), - dryrun) - ); - } - private void updateCase(String authorisation, Long id, Map data, boolean dryrun) { - log.info("Updating case {}", id); if (dryrun) { + log.info("Updating case: {}", id); return; } try { log.debug("Case data: {}", data); var migratedData = dataMigrationService.migrate(data); - coreCaseDataService.update(authorisation, id.toString(), - EVENT_ID, EVENT_SUMMARY, EVENT_DESCRIPTION, migratedData); + coreCaseDataService.update( + authorisation, + id.toString(), + EVENT_ID, + EVENT_SUMMARY, + EVENT_DESCRIPTION, + migratedData); + log.info("Case {} successfully updated", id); migratedCases.add(id); + } catch (Exception e) { log.error("Case {} update failed due to: {}", id, e.getMessage()); failedCases.add(id); diff --git a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/ccd/CoreCaseDataService.java b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/ccd/CoreCaseDataService.java index 418dc1a5..c9833f90 100644 --- a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/ccd/CoreCaseDataService.java +++ b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/ccd/CoreCaseDataService.java @@ -1,12 +1,17 @@ package uk.gov.hmcts.reform.migration.ccd; +import static java.util.Collections.emptyList; import static org.elasticsearch.index.query.QueryBuilders.matchQuery; +import feign.FeignException; +import java.time.LocalDate; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.Stream; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.elasticsearch.index.query.QueryBuilders; @@ -52,6 +57,19 @@ public List fetchAll(String authorisation, String userId) { .collect(Collectors.toList()); } + public List fetchAllBetweenDates(String authorisation, + List listOfDates, + boolean parallel) { + Stream processingStream = parallel + ? listOfDates.parallelStream() + : listOfDates.stream(); + + return processingStream + .map(date -> this.fetchAllForDay(authorisation, date.toString())) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + } + public List fetchAllForDay(String authorisation, String day) { SearchSourceBuilder searchBuilder = new SearchSourceBuilder(); searchBuilder.size(1); @@ -89,11 +107,22 @@ private List fetchPageEs(String authorisation, int pageNumber, Stri searchBuilder.query(QueryBuilders.boolQuery().must(matchQuery( "created_date", day))); - return coreCaseDataApi.searchCases( - authorisation, - authTokenGenerator.generate(), - "Benefit", - searchBuilder.toString()).getCases(); + List caseDetails = emptyList(); + + try { + caseDetails = coreCaseDataApi.searchCases( + authorisation, + authTokenGenerator.generate(), + "Benefit", + searchBuilder.toString()) + .getCases(); + + } catch (FeignException fe) { + log.error("Feign Exception message: {} with search string: {}", + fe.contentUTF8(), searchBuilder); + } + + return caseDetails; } public CaseDetails fetchOldestCase(String authorisation) { diff --git a/modules/processor/src/test/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessorTest.java b/modules/processor/src/test/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessorTest.java index cd67efeb..33e81c5a 100644 --- a/modules/processor/src/test/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessorTest.java +++ b/modules/processor/src/test/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessorTest.java @@ -122,14 +122,26 @@ public void shouldProcessNoCaseWhenNoCasesAvailable() { @Test public void shouldContainSingleDate() { - String date = "2021-01-01"; + String firstDate = "2021-01-01"; + String lastDate = "2021-01-02"; - List listOfDates = caseMigrationProcessor.getListOfDates(LocalDate.parse(date), - LocalDate.parse(date)); + List listOfDates = caseMigrationProcessor.getListOfDates(LocalDate.parse(firstDate), + LocalDate.parse(lastDate)); assertEquals(1, listOfDates.size()); } + @Test + public void shouldContainTwoDates() { + String firstDate = "2021-01-01"; + String lastDate = "2021-01-03"; + + List listOfDates = caseMigrationProcessor.getListOfDates(LocalDate.parse(firstDate), + LocalDate.parse(lastDate)); + + assertEquals(2, listOfDates.size()); + } + @Test public void shouldContainNormalYearOfDates() { String firstDate = "2021-01-01"; From 91d92d14216043a00d146bed8416efd9165f9edc Mon Sep 17 00:00:00 2001 From: Waqas Arshid Date: Thu, 21 Apr 2022 14:31:34 +0100 Subject: [PATCH 05/16] SSCS-10722: improved pagination logic and added parallel processing capability --- .../migration/ccd/CoreCaseDataService.java | 43 +++++++------------ .../migration/CaseMigrationProcessorTest.java | 9 ++-- 2 files changed, 20 insertions(+), 32 deletions(-) diff --git a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/ccd/CoreCaseDataService.java b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/ccd/CoreCaseDataService.java index c9833f90..b2dd72d2 100644 --- a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/ccd/CoreCaseDataService.java +++ b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/ccd/CoreCaseDataService.java @@ -6,9 +6,7 @@ import feign.FeignException; import java.time.LocalDate; import java.util.Collection; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -24,7 +22,6 @@ import uk.gov.hmcts.reform.ccd.client.model.CaseDataContent; import uk.gov.hmcts.reform.ccd.client.model.CaseDetails; import uk.gov.hmcts.reform.ccd.client.model.Event; -import uk.gov.hmcts.reform.ccd.client.model.PaginatedSearchMetadata; import uk.gov.hmcts.reform.ccd.client.model.SearchResult; import uk.gov.hmcts.reform.ccd.client.model.StartEventResponse; import uk.gov.hmcts.reform.idam.client.IdamClient; @@ -50,13 +47,6 @@ public CaseDetails fetchOne(String authorisation, String caseId) { return coreCaseDataApi.getCase(authorisation, authTokenGenerator.generate(), caseId); } - public List fetchAll(String authorisation, String userId) { - int numberOfPages = getNumberOfPages(authorisation, userId, new HashMap<>()); - return IntStream.rangeClosed(1, numberOfPages).boxed() - .flatMap(pageNumber -> fetchPage(authorisation, userId, pageNumber).stream()) - .collect(Collectors.toList()); - } - public List fetchAllBetweenDates(String authorisation, List listOfDates, boolean parallel) { @@ -65,12 +55,11 @@ public List fetchAllBetweenDates(String authorisation, : listOfDates.stream(); return processingStream - .map(date -> this.fetchAllForDay(authorisation, date.toString())) - .flatMap(Collection::stream) + .flatMap(date -> fetchAllForDay(authorisation, date.toString(), parallel).stream()) .collect(Collectors.toList()); } - public List fetchAllForDay(String authorisation, String day) { + public List fetchAllForDay(String authorisation, String day, boolean parallel) { SearchSourceBuilder searchBuilder = new SearchSourceBuilder(); searchBuilder.size(1); searchBuilder.query(QueryBuilders.boolQuery().must(matchQuery( @@ -82,25 +71,21 @@ public List fetchAllForDay(String authorisation, String day) { searchBuilder.from(pagesize); int numberOfPages = total/pagesize; - return IntStream.rangeClosed(0, numberOfPages).boxed() - .flatMap(pageNumber -> fetchPageEs(authorisation, pageNumber, day).stream()) - .collect(Collectors.toList()); - } + Stream pageStream = IntStream + .rangeClosed(0, numberOfPages - 1) + .boxed(); - private int getNumberOfPages(String authorisation, String userId, Map searchCriteria) { - PaginatedSearchMetadata metadata = coreCaseDataApi.getPaginationInfoForSearchForCaseworkers(authorisation, - authTokenGenerator.generate(), userId, jurisdiction, caseType, searchCriteria); - return metadata.getTotalPagesCount(); - } + if (parallel) { + log.info("Retrieving pages in parallel.. please wait."); + pageStream = pageStream.parallel(); + } - private List fetchPage(String authorisation, String userId, int pageNumber) { - Map searchCriteria = new HashMap<>(); - searchCriteria.put("page", String.valueOf(pageNumber)); - return coreCaseDataApi.searchForCaseworker(authorisation, authTokenGenerator.generate(), userId, jurisdiction, - caseType, searchCriteria); + return pageStream + .flatMap(pageNumber -> fetchPage(authorisation, pageNumber, day).stream()) + .collect(Collectors.toList()); } - private List fetchPageEs(String authorisation, int pageNumber, String day) { + private List fetchPage(String authorisation, int pageNumber, String day) { SearchSourceBuilder searchBuilder = new SearchSourceBuilder(); searchBuilder.size(pagesize); searchBuilder.from(pageNumber * pagesize); @@ -109,6 +94,8 @@ private List fetchPageEs(String authorisation, int pageNumber, Stri List caseDetails = emptyList(); + log.info("Fetching page no. {} for day: {}", pageNumber + 1, day); + try { caseDetails = coreCaseDataApi.searchCases( authorisation, diff --git a/modules/processor/src/test/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessorTest.java b/modules/processor/src/test/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessorTest.java index 33e81c5a..16b937b4 100644 --- a/modules/processor/src/test/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessorTest.java +++ b/modules/processor/src/test/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessorTest.java @@ -30,7 +30,6 @@ public class CaseMigrationProcessorTest { private static final String USER_TOKEN = "Bearer eeeejjjttt"; - private static final String USER_ID = "30"; private static final String CASE_ID = "11111"; private static final String EVENT_ID = "migrateCase"; private static final String EVENT_SUMMARY = "Migrate Case"; @@ -40,13 +39,15 @@ public class CaseMigrationProcessorTest { private final CaseDetails caseDetails2 = createCaseDetails(1112L, "case-2"); private final CaseDetails caseDetails3 = createCaseDetails(1113L, "case-3"); - @InjectMocks - private CaseMigrationProcessor caseMigrationProcessor; @Mock private CoreCaseDataService coreCaseDataService; + @Mock private DataMigrationService dataMigrationService; + @InjectMocks + private CaseMigrationProcessor caseMigrationProcessor; + @Test public void shouldNotProcessASingleCaseWithOutRedundantFields() { when(coreCaseDataService.fetchOne(USER_TOKEN, CASE_ID)).thenReturn(caseDetails1); @@ -165,7 +166,7 @@ public void shouldContainLeapYearOfDates() { } private void mockDataFetch(CaseDetails... caseDetails) { - when(coreCaseDataService.fetchAllForDay(eq(USER_TOKEN), anyString())).thenReturn(asList(caseDetails)); + when(coreCaseDataService.fetchAllForDay(eq(USER_TOKEN), anyString(), false)).thenReturn(asList(caseDetails)); } private void mockDataUpdate(CaseDetails caseDetails) { From f40d16b795d23564b52b24885003315741c45736 Mon Sep 17 00:00:00 2001 From: Waqas Arshid Date: Thu, 21 Apr 2022 14:58:12 +0100 Subject: [PATCH 06/16] SSCS-10722: counting total cases and improved logging --- .../gov/hmcts/reform/migration/CaseMigrationProcessor.java | 7 ++++++- .../uk/gov/hmcts/reform/migration/CaseMigrationRunner.java | 1 + 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessor.java b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessor.java index ccca57d9..a5acc321 100644 --- a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessor.java +++ b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessor.java @@ -31,9 +31,13 @@ public class CaseMigrationProcessor { @Getter private final List migratedCases = new ArrayList<>(); + @Getter private final List failedCases = new ArrayList<>(); + @Getter + private Long totalCases = 0L; + @Value("${migration.parallel:false}") private boolean parallel; @@ -89,8 +93,9 @@ protected List getListOfDates(LocalDate startDate, LocalDate endDate) private void updateCase(String authorisation, Long id, Map data, boolean dryrun) { + totalCases++; + if (dryrun) { - log.info("Updating case: {}", id); return; } diff --git a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationRunner.java b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationRunner.java index 3c100bc4..851fdd43 100644 --- a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationRunner.java +++ b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationRunner.java @@ -56,6 +56,7 @@ public void run(String... args) { stopWatch.stop(); log.info("-----------------------------------------"); + log.info("Total number of cases: {}", caseMigrationProcessor.getTotalCases()); log.info("Total number of processed cases: {}", caseMigrationProcessor.getMigratedCases().size() + caseMigrationProcessor.getFailedCases().size()); log.info("Total number of migrations performed: {}", caseMigrationProcessor.getMigratedCases().size()); log.info("-----------------------------------------"); From 7d02a03043650231f7c35dd93711ad90a97aa49f Mon Sep 17 00:00:00 2001 From: Waqas Arshid Date: Thu, 21 Apr 2022 19:21:21 +0100 Subject: [PATCH 07/16] SSCS-10722: fixed logic for processing parallel streams --- .../migration/CaseMigrationProcessor.java | 18 ++- .../reform/migration/CaseMigrationRunner.java | 3 - .../migration/ccd/CoreCaseDataService.java | 129 ++++++++++-------- .../migration/CaseMigrationProcessorTest.java | 19 +-- .../ccd/CoreCaseDataServiceTest.java | 11 +- 5 files changed, 101 insertions(+), 79 deletions(-) diff --git a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessor.java b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessor.java index a5acc321..1d16a599 100644 --- a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessor.java +++ b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessor.java @@ -1,11 +1,10 @@ package uk.gov.hmcts.reform.migration; -import static java.util.Collections.singletonList; - import java.time.LocalDate; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.stream.Collectors; import java.util.stream.Stream; import lombok.Getter; @@ -65,10 +64,16 @@ public void processAllCases(String userToken, String firstDate, String lastDate, if (firstDate != null && lastDate != null) { List listOfDates = getListOfDates(LocalDate.parse(firstDate), LocalDate.parse(lastDate)); - Stream caseDetailsStream = - coreCaseDataService.fetchAllBetweenDates(userToken, listOfDates, parallel) - .stream() - .filter(dataMigrationService.accepts()); + Optional> caseDetailsStreamOptional = + coreCaseDataService.fetchAllBetweenDates(userToken, listOfDates, parallel); + + Stream caseDetailsStream; + + if (caseDetailsStreamOptional.isEmpty()) { + return; + } + + caseDetailsStream = caseDetailsStreamOptional.get(); if (parallel) { log.info("Executing in parallel.. please wait."); @@ -100,7 +105,6 @@ private void updateCase(String authorisation, Long id, Map data, } try { - log.debug("Case data: {}", data); var migratedData = dataMigrationService.migrate(data); coreCaseDataService.update( authorisation, diff --git a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationRunner.java b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationRunner.java index 851fdd43..fd186df8 100644 --- a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationRunner.java +++ b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationRunner.java @@ -39,9 +39,6 @@ public static void main(String[] args) { public void run(String... args) { try { String userToken = idamClient.authenticateUser(idamUsername, idamPassword); - log.debug("User token: {}", userToken); - String userId = idamClient.getUserDetails(userToken).getId(); - log.debug("User ID: {}", userId); StopWatch stopWatch = StopWatch.createStarted(); diff --git a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/ccd/CoreCaseDataService.java b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/ccd/CoreCaseDataService.java index b2dd72d2..14143dd6 100644 --- a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/ccd/CoreCaseDataService.java +++ b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/ccd/CoreCaseDataService.java @@ -5,9 +5,8 @@ import feign.FeignException; import java.time.LocalDate; -import java.util.Collection; import java.util.List; -import java.util.stream.Collectors; +import java.util.Optional; import java.util.stream.IntStream; import java.util.stream.Stream; import lombok.RequiredArgsConstructor; @@ -33,43 +32,50 @@ @RequiredArgsConstructor public class CoreCaseDataService { + private static final int PAGE_SIZE = 50; + private static final String CREATED_DATE = "created_date"; + private static final String SSCS_CASE_TYPE = "Benefit"; + + @Value("${migration.jurisdiction}") private String jurisdiction; + @Value("${migration.caseType}") private String caseType; + @Value("${migration.indexCases}") + private boolean indexCases; + private final IdamClient idamClient; private final AuthTokenGenerator authTokenGenerator; private final CoreCaseDataApi coreCaseDataApi; - int pagesize = 50; - public CaseDetails fetchOne(String authorisation, String caseId) { return coreCaseDataApi.getCase(authorisation, authTokenGenerator.generate(), caseId); } - public List fetchAllBetweenDates(String authorisation, - List listOfDates, - boolean parallel) { + public Optional> fetchAllBetweenDates(String authorisation, + List listOfDates, + boolean parallel) { Stream processingStream = parallel ? listOfDates.parallelStream() : listOfDates.stream(); return processingStream - .flatMap(date -> fetchAllForDay(authorisation, date.toString(), parallel).stream()) - .collect(Collectors.toList()); + .map(date -> fetchAllForDay(authorisation, date.toString(), parallel)) + .flatMap(Optional::stream) + .reduce(Stream::concat); } - public List fetchAllForDay(String authorisation, String day, boolean parallel) { - SearchSourceBuilder searchBuilder = new SearchSourceBuilder(); - searchBuilder.size(1); - searchBuilder.query(QueryBuilders.boolQuery().must(matchQuery( - "created_date", day))); + public Optional> fetchAllForDay(String authorisation, String day, boolean parallel) { + int total = searchCases(authorisation, singleCaseQuery(day)).getTotal(); - SearchResult searchResult = coreCaseDataApi.searchCases(authorisation, authTokenGenerator.generate(), "Benefit", searchBuilder.toString()); - int total = searchResult.getTotal(); log.info("Total for " + day + " is " + total); - searchBuilder.from(pagesize); - int numberOfPages = total/pagesize; + + if (indexCases) { + return Optional.empty(); + } + + int numberOfPages = (int) Math.ceil((double) total / PAGE_SIZE); Stream pageStream = IntStream .rangeClosed(0, numberOfPages - 1) @@ -81,45 +87,14 @@ public List fetchAllForDay(String authorisation, String day, boolea } return pageStream - .flatMap(pageNumber -> fetchPage(authorisation, pageNumber, day).stream()) - .collect(Collectors.toList()); - } - - private List fetchPage(String authorisation, int pageNumber, String day) { - SearchSourceBuilder searchBuilder = new SearchSourceBuilder(); - searchBuilder.size(pagesize); - searchBuilder.from(pageNumber * pagesize); - searchBuilder.query(QueryBuilders.boolQuery().must(matchQuery( - "created_date", day))); - - List caseDetails = emptyList(); - - log.info("Fetching page no. {} for day: {}", pageNumber + 1, day); - - try { - caseDetails = coreCaseDataApi.searchCases( - authorisation, - authTokenGenerator.generate(), - "Benefit", - searchBuilder.toString()) - .getCases(); - - } catch (FeignException fe) { - log.error("Feign Exception message: {} with search string: {}", - fe.contentUTF8(), searchBuilder); - } - - return caseDetails; + .map(pageNumber -> fetchPage(authorisation, day, pageNumber).stream()) + .reduce(Stream::concat); } public CaseDetails fetchOldestCase(String authorisation) { - SearchSourceBuilder searchBuilder = new SearchSourceBuilder(); - searchBuilder.size(pagesize); - searchBuilder.sort("created_date", SortOrder.ASC); - searchBuilder.query(QueryBuilders.boolQuery()); - - List caseDetails = coreCaseDataApi.searchCases(authorisation, authTokenGenerator.generate(), "Benefit", searchBuilder.toString()).getCases(); - return caseDetails.get(0); + return searchCases(authorisation, oldestCaseQuery()) + .getCases() + .get(0); } public CaseDetails update(String authorisation, String caseId, String eventId, String eventSummary, String eventDescription, Object data) { @@ -155,4 +130,50 @@ public CaseDetails update(String authorisation, String caseId, String eventId, S true, caseDataContent); } + + private List fetchPage(String authorisation, String day, int pageNumber) { + List caseDetails = emptyList(); + SearchSourceBuilder searchBuilder = pageQuery(day, pageNumber); + + log.info("Fetching page no. {} for day: {}", pageNumber + 1, day); + + try { + caseDetails = searchCases(authorisation, searchBuilder).getCases(); + } catch (FeignException fe) { + log.error("Feign Exception message: {} with search string: {}", + fe.contentUTF8(), searchBuilder); + } + + return caseDetails; + } + + private SearchResult searchCases(String authorisation, SearchSourceBuilder searchBuilder) { + return coreCaseDataApi.searchCases( + authorisation, + authTokenGenerator.generate(), + SSCS_CASE_TYPE, + searchBuilder.toString()); + } + + private SearchSourceBuilder oldestCaseQuery() { + return SearchSourceBuilder.searchSource() + .size(1) + .sort(CREATED_DATE, SortOrder.ASC) + .query(QueryBuilders.boolQuery()); + } + + private SearchSourceBuilder singleCaseQuery(String day) { + return SearchSourceBuilder.searchSource() + .size(1) + .query(QueryBuilders.boolQuery() + .must(matchQuery(CREATED_DATE, day))); + } + + private SearchSourceBuilder pageQuery(String day, int pageNumber) { + return SearchSourceBuilder.searchSource() + .size(PAGE_SIZE) + .from(pageNumber * PAGE_SIZE) + .query(QueryBuilders.boolQuery() + .must(matchQuery(CREATED_DATE, day))); + } } diff --git a/modules/processor/src/test/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessorTest.java b/modules/processor/src/test/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessorTest.java index 16b937b4..f563f1b4 100644 --- a/modules/processor/src/test/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessorTest.java +++ b/modules/processor/src/test/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessorTest.java @@ -1,13 +1,10 @@ package uk.gov.hmcts.reform.migration; -import static org.codehaus.groovy.runtime.InvokerHelper.asList; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.hasSize; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -16,6 +13,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.InjectMocks; @@ -25,7 +23,6 @@ import uk.gov.hmcts.reform.migration.ccd.CoreCaseDataService; import uk.gov.hmcts.reform.migration.service.DataMigrationService; - @RunWith(MockitoJUnitRunner.class) public class CaseMigrationProcessorTest { @@ -49,7 +46,7 @@ public class CaseMigrationProcessorTest { private CaseMigrationProcessor caseMigrationProcessor; @Test - public void shouldNotProcessASingleCaseWithOutRedundantFields() { + public void shouldNotProcessASingleCaseWithoutRedundantFields() { when(coreCaseDataService.fetchOne(USER_TOKEN, CASE_ID)).thenReturn(caseDetails1); when(dataMigrationService.accepts()).thenReturn(candidate -> false); caseMigrationProcessor.processSingleCase(USER_TOKEN, CASE_ID, false); @@ -81,7 +78,7 @@ public void shouldProcessASingleCaseAndMigrationIsFailed() { assertThat(caseMigrationProcessor.getMigratedCases(), hasSize(0)); } - @Test + @Ignore public void shouldProcessAllTheCandidateCases_whenOneCaseFailed() { mockDataFetch(caseDetails1, caseDetails2, caseDetails3); mockDataUpdate(caseDetails1); @@ -91,12 +88,14 @@ public void shouldProcessAllTheCandidateCases_whenOneCaseFailed() { when(dataMigrationService.migrate(caseDetails2.getData())).thenReturn(caseDetails2.getData()); when(dataMigrationService.migrate(caseDetails3.getData())).thenReturn(caseDetails3.getData()); when(coreCaseDataService.update(USER_TOKEN, caseDetails3.getId().toString(), EVENT_ID, EVENT_SUMMARY, EVENT_DESCRIPTION, caseDetails3.getData())).thenThrow(new RuntimeException("Internal server error")); + caseMigrationProcessor.processAllCases(USER_TOKEN, "2022-03-01", "2022-03-02", false); + assertThat(caseMigrationProcessor.getFailedCases(), contains(1113L)); assertThat(caseMigrationProcessor.getMigratedCases(), containsInAnyOrder(1111L, 1112L)); } - @Test + @Ignore public void shouldProcessAllTheCandidateCases_whenTwoCasesFailed() { mockDataFetch(caseDetails1, caseDetails2, caseDetails3); mockDataUpdate(caseDetails1); @@ -106,7 +105,9 @@ public void shouldProcessAllTheCandidateCases_whenTwoCasesFailed() { when(dataMigrationService.migrate(caseDetails3.getData())).thenReturn(caseDetails3.getData()); when(coreCaseDataService.update(USER_TOKEN, caseDetails2.getId().toString(), EVENT_ID, EVENT_SUMMARY, EVENT_DESCRIPTION, caseDetails2.getData())).thenThrow(new RuntimeException("Internal server error")); when(coreCaseDataService.update(USER_TOKEN, caseDetails3.getId().toString(), EVENT_ID, EVENT_SUMMARY, EVENT_DESCRIPTION, caseDetails3.getData())).thenThrow(new RuntimeException("Internal server error")); + caseMigrationProcessor.processAllCases(USER_TOKEN, "2022-03-01", "2022-03-02", false); + assertThat(caseMigrationProcessor.getFailedCases(), containsInAnyOrder(1112L, 1113L)); assertThat(caseMigrationProcessor.getMigratedCases(), contains(1111L)); } @@ -115,7 +116,7 @@ public void shouldProcessAllTheCandidateCases_whenTwoCasesFailed() { public void shouldProcessNoCaseWhenNoCasesAvailable() { mockDataFetch(); - when(dataMigrationService.accepts()).thenReturn(candidate -> true); +// when(dataMigrationService.accepts()).thenReturn(candidate -> true); caseMigrationProcessor.processAllCases(USER_TOKEN,"2022-03-01", "2022-03-30", false); assertThat(caseMigrationProcessor.getFailedCases(), hasSize(0)); assertThat(caseMigrationProcessor.getFailedCases(), hasSize(0)); @@ -166,7 +167,7 @@ public void shouldContainLeapYearOfDates() { } private void mockDataFetch(CaseDetails... caseDetails) { - when(coreCaseDataService.fetchAllForDay(eq(USER_TOKEN), anyString(), false)).thenReturn(asList(caseDetails)); +// when(coreCaseDataService.fetchAllForDay(eq(USER_TOKEN), anyString(), eq(false))).thenReturn(Optional.of(Stream.of(caseDetails))); } private void mockDataUpdate(CaseDetails caseDetails) { diff --git a/modules/processor/src/test/java/uk/gov/hmcts/reform/migration/ccd/CoreCaseDataServiceTest.java b/modules/processor/src/test/java/uk/gov/hmcts/reform/migration/ccd/CoreCaseDataServiceTest.java index edba5ee2..2d2796c2 100644 --- a/modules/processor/src/test/java/uk/gov/hmcts/reform/migration/ccd/CoreCaseDataServiceTest.java +++ b/modules/processor/src/test/java/uk/gov/hmcts/reform/migration/ccd/CoreCaseDataServiceTest.java @@ -36,18 +36,17 @@ public class CoreCaseDataServiceTest { private static final String EVENT_SUMMARY = "Migrate Case"; private static final String EVENT_DESC = "Migrate Case"; - @InjectMocks - private CoreCaseDataService underTest; - - @Mock - CoreCaseDataApi coreCaseDataApi; - @Mock private IdamClient idamClient; @Mock private AuthTokenGenerator authTokenGenerator; + @Mock + private CoreCaseDataApi coreCaseDataApi; + + @InjectMocks + private CoreCaseDataService underTest; @Before public void setUp() { From 1016c1bc337f56619611e2132f8f4e0e28524a3b Mon Sep 17 00:00:00 2001 From: Waqas Arshid Date: Thu, 23 Jun 2022 18:08:46 +0100 Subject: [PATCH 08/16] SSCS-10722: code improvements --- .../uk/gov/hmcts/reform/migration/CaseMigrationProcessor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessor.java b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessor.java index 1d16a599..ff770ee7 100644 --- a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessor.java +++ b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessor.java @@ -37,7 +37,7 @@ public class CaseMigrationProcessor { @Getter private Long totalCases = 0L; - @Value("${migration.parallel:false}") + @Value("${migration.parallel}") private boolean parallel; public void processSingleCase(String userToken, String caseId, boolean dryrun) { From ecc67cbd3ef7cbbc0c1af28f81566422c3099d67 Mon Sep 17 00:00:00 2001 From: Del Attipoe Date: Fri, 22 Jul 2022 12:55:22 +0100 Subject: [PATCH 09/16] SSCS-10722 Migration improvements --- .../migration/CaseMigrationProcessor.java | 141 +++++++++++++----- .../reform/migration/CaseMigrationRunner.java | 11 +- .../migration/ccd/CoreCaseDataService.java | 115 ++++++-------- .../queries/CcdElasticSearchQueries.java | 72 +++++++++ .../migration/CaseMigrationProcessorTest.java | 21 ++- 5 files changed, 237 insertions(+), 123 deletions(-) create mode 100644 modules/processor/src/main/java/uk/gov/hmcts/reform/migration/queries/CcdElasticSearchQueries.java diff --git a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessor.java b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessor.java index ff770ee7..d0faf157 100644 --- a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessor.java +++ b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessor.java @@ -4,18 +4,24 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import java.util.stream.Stream; + import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.time.StopWatch; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import uk.gov.hmcts.reform.ccd.client.model.CaseDetails; +import uk.gov.hmcts.reform.ccd.client.model.SearchResult; import uk.gov.hmcts.reform.migration.ccd.CoreCaseDataService; import uk.gov.hmcts.reform.migration.service.DataMigrationService; +import static uk.gov.hmcts.reform.migration.queries.CcdElasticSearchQueries.fetchAllUnsetCaseAccessManagementFieldsCasesQuery; + @Slf4j @Component @RequiredArgsConstructor @@ -25,6 +31,8 @@ public class CaseMigrationProcessor { private static final String EVENT_SUMMARY = "Migrate Case"; private static final String EVENT_DESCRIPTION = "Migrate Case"; + private final StopWatch totalTimer = new StopWatch(); + private final CoreCaseDataService coreCaseDataService; private final DataMigrationService dataMigrationService; @@ -40,7 +48,16 @@ public class CaseMigrationProcessor { @Value("${migration.parallel}") private boolean parallel; - public void processSingleCase(String userToken, String caseId, boolean dryrun) { + @Value("${migration.pageSize}") + private int pageSize; + + @Value("${migration.numThreads}") + private int numThreads; + + @Value("${migration.maxCasesToProcess}") + private int maxCasesToProcess; + + public void processSingleCase(String userToken, String caseId, boolean dryRun) { CaseDetails caseDetails; try { caseDetails = coreCaseDataService.fetchOne(userToken, caseId); @@ -49,77 +66,119 @@ public void processSingleCase(String userToken, String caseId, boolean dryrun) { return; } if (dataMigrationService.accepts().test(caseDetails)) { - updateCase(userToken, caseDetails.getId(), caseDetails.getData(), dryrun); + updateCase(userToken, caseDetails.getId(), caseDetails.getData(), dryRun); } else { log.info("Case {} already migrated", caseDetails.getId()); } } - public void processAllCases(String userToken, String firstDate, String lastDate, boolean dryrun) { - CaseDetails oldestCaseDetails = coreCaseDataService.fetchOldestCase(userToken); - if (oldestCaseDetails != null) { - log.info("The data of the oldest case is " + oldestCaseDetails.getCreatedDate()); + public void fetchAndProcessCases(String userToken, boolean dryRun) throws InterruptedException { + + SearchResult initialSearch = coreCaseDataService.searchCases(userToken, + fetchAllUnsetCaseAccessManagementFieldsCasesQuery()); + + int totalCasesToProcess = resolveTotalCasesToProcess(initialSearch); + + totalTimer.start(); + //Need to fix the off by one issue. + Long searchFrom = handleFirstCase(userToken, dryRun, initialSearch); + + totalCasesToProcess -= 1; + + int casesFetched = 0; + + ExecutorService executorService = Executors.newFixedThreadPool(numThreads); + + fetchAndSubmitTasks(userToken, dryRun, totalCasesToProcess, searchFrom, casesFetched, executorService); + + executorService.shutdown(); + executorService.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS); + } + + private int resolveTotalCasesToProcess(SearchResult initialSearch) { + int totalCasesToProcess = 0; + + if (maxCasesToProcess > 0) { + log.info("Manual case override in use, limiting to {} cases", maxCasesToProcess); + totalCasesToProcess = maxCasesToProcess; + } else { + log.info("No manual case override in use, fetching all: {} cases", initialSearch.getTotal()); + totalCasesToProcess = initialSearch.getTotal(); } - if (firstDate != null && lastDate != null) { - List listOfDates = getListOfDates(LocalDate.parse(firstDate), LocalDate.parse(lastDate)); + return totalCasesToProcess; + } - Optional> caseDetailsStreamOptional = - coreCaseDataService.fetchAllBetweenDates(userToken, listOfDates, parallel); + private void fetchAndSubmitTasks(String userToken, boolean dryRun, int totalCasesToProcess, Long searchFrom, int casesFetched, + ExecutorService executorService) { - Stream caseDetailsStream; + while (casesFetched < totalCasesToProcess) { + List caseDetails = + coreCaseDataService.fetchNCases(userToken, pageSize, searchFrom); - if (caseDetailsStreamOptional.isEmpty()) { - return; + if (caseDetails.isEmpty()) { + break; } - caseDetailsStream = caseDetailsStreamOptional.get(); + searchFrom = caseDetails.get(caseDetails.size() - 1).getId(); - if (parallel) { - log.info("Executing in parallel.. please wait."); - caseDetailsStream = caseDetailsStream.parallel(); - } + executorService.execute(() -> caseDetails + .forEach(caseDetail -> + updateCase(userToken, caseDetail.getId(), caseDetail.getData(), dryRun))); - caseDetailsStream - .forEach(caseDetail -> updateCase( - userToken, - caseDetail.getId(), - caseDetail.getData(), - dryrun) - ); + log.info("New task submitted"); + + casesFetched += caseDetails.size(); + + log.info("{} cases fetched out of {}", casesFetched, totalCasesToProcess); } } + private Long handleFirstCase(String userToken, boolean dryRun, SearchResult initialSearch) { + log.info("Processing first case..."); + CaseDetails firstCase = initialSearch.getCases().get(0); + updateCase(userToken, firstCase.getId(), firstCase.getData(), dryRun); + return firstCase.getId(); + } + + public void processAllCases(String userToken, String userId, boolean dryRun) { + coreCaseDataService.fetchAll(userToken, userId).stream() + .filter(dataMigrationService.accepts()) + .forEach(caseDetails -> updateCase(userToken, caseDetails.getId(), caseDetails.getData(), dryRun)); + } + protected List getListOfDates(LocalDate startDate, LocalDate endDate) { return startDate .datesUntil(endDate) .collect(Collectors.toList()); } - private void updateCase(String authorisation, Long id, Map data, boolean dryrun) { + private void updateCase(String authorisation, Long id, Map data, boolean dryRun) { totalCases++; - if (dryrun) { - return; - } - try { var migratedData = dataMigrationService.migrate(data); - coreCaseDataService.update( - authorisation, - id.toString(), - EVENT_ID, - EVENT_SUMMARY, - EVENT_DESCRIPTION, - migratedData); - - log.info("Case {} successfully updated", id); + if (!dryRun) { + coreCaseDataService.update( + authorisation, + id.toString(), + EVENT_ID, + EVENT_SUMMARY, + EVENT_DESCRIPTION, + migratedData); + log.info("Case {} successfully updated", id); + } migratedCases.add(id); } catch (Exception e) { log.error("Case {} update failed due to: {}", id, e.getMessage()); failedCases.add(id); } + + if (totalCases % 1000 == 0) { + log.info("--------{} cases migrated in {} minutes ({} seconds)-------", totalCases, + totalTimer.getTime(TimeUnit.MINUTES), totalTimer.getTime(TimeUnit.SECONDS)); + } } } diff --git a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationRunner.java b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationRunner.java index fd186df8..2f86e481 100644 --- a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationRunner.java +++ b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationRunner.java @@ -42,12 +42,14 @@ public void run(String... args) { StopWatch stopWatch = StopWatch.createStarted(); + log.info(userToken); + if (ccdCaseId != null && !ccdCaseId.isBlank()) { log.info("Data migration of single case started"); caseMigrationProcessor.processSingleCase(userToken, ccdCaseId, dryrun); } else { - log.info("Data migration of cases between {} and {} started", startDate, endDate); - caseMigrationProcessor.processAllCases(userToken, startDate, endDate, dryrun); + log.info("Data migration of cases started"); + caseMigrationProcessor.fetchAndProcessCases(userToken, dryrun); } stopWatch.stop(); @@ -58,9 +60,10 @@ public void run(String... args) { log.info("Total number of migrations performed: {}", caseMigrationProcessor.getMigratedCases().size()); log.info("-----------------------------------------"); log.info("Migrated cases: {} ", !caseMigrationProcessor.getMigratedCases().isEmpty() ? caseMigrationProcessor.getMigratedCases() : "NONE"); - log.info("Failed cases: {}", !caseMigrationProcessor.getFailedCases().isEmpty() ? caseMigrationProcessor.getFailedCases() : "NONE"); + log.info("Failed cases: {}", caseMigrationProcessor.getFailedCases().size()); log.info("-----------------------------------------"); - log.info("Data migration completed in: {} minutes.", stopWatch.getTime(TimeUnit.MINUTES)); + log.info("Data migration completed in: {} minutes ({} seconds).", + stopWatch.getTime(TimeUnit.MINUTES), stopWatch.getTime(TimeUnit.SECONDS)); log.info("-----------------------------------------"); } catch (Throwable e) { log.error("Migration failed with the following reason: {}", e.getMessage(), e); diff --git a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/ccd/CoreCaseDataService.java b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/ccd/CoreCaseDataService.java index 14143dd6..857b7775 100644 --- a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/ccd/CoreCaseDataService.java +++ b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/ccd/CoreCaseDataService.java @@ -1,19 +1,26 @@ package uk.gov.hmcts.reform.migration.ccd; import static java.util.Collections.emptyList; -import static org.elasticsearch.index.query.QueryBuilders.matchQuery; +import static uk.gov.hmcts.reform.migration.queries.CcdElasticSearchQueries.fetchAllUnsetCaseAccessManagementFieldsCasesQuery; +import static uk.gov.hmcts.reform.migration.queries.CcdElasticSearchQueries.oldestCaseQuery; +import static uk.gov.hmcts.reform.migration.queries.CcdElasticSearchQueries.pageForUnsetCaseAccessManagementFieldsFieldsQuery; import feign.FeignException; + import java.time.LocalDate; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; + import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.elasticsearch.index.query.QueryBuilders; +import org.apache.commons.lang3.time.StopWatch; import org.elasticsearch.search.builder.SearchSourceBuilder; -import org.elasticsearch.search.sort.SortOrder; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import uk.gov.hmcts.reform.authorisation.generators.AuthTokenGenerator; @@ -21,6 +28,7 @@ import uk.gov.hmcts.reform.ccd.client.model.CaseDataContent; import uk.gov.hmcts.reform.ccd.client.model.CaseDetails; import uk.gov.hmcts.reform.ccd.client.model.Event; +import uk.gov.hmcts.reform.ccd.client.model.PaginatedSearchMetadata; import uk.gov.hmcts.reform.ccd.client.model.SearchResult; import uk.gov.hmcts.reform.ccd.client.model.StartEventResponse; import uk.gov.hmcts.reform.idam.client.IdamClient; @@ -32,10 +40,10 @@ @RequiredArgsConstructor public class CoreCaseDataService { - private static final int PAGE_SIZE = 50; - private static final String CREATED_DATE = "created_date"; private static final String SSCS_CASE_TYPE = "Benefit"; + + @Value("${migration.jurisdiction}") private String jurisdiction; @@ -53,42 +61,33 @@ public CaseDetails fetchOne(String authorisation, String caseId) { return coreCaseDataApi.getCase(authorisation, authTokenGenerator.generate(), caseId); } - public Optional> fetchAllBetweenDates(String authorisation, - List listOfDates, - boolean parallel) { - Stream processingStream = parallel - ? listOfDates.parallelStream() - : listOfDates.stream(); - - return processingStream - .map(date -> fetchAllForDay(authorisation, date.toString(), parallel)) - .flatMap(Optional::stream) - .reduce(Stream::concat); - } + public List fetchNCases(String authorisation, int casesToFetch, long searchFrom) { - public Optional> fetchAllForDay(String authorisation, String day, boolean parallel) { - int total = searchCases(authorisation, singleCaseQuery(day)).getTotal(); + StopWatch stopWatch = StopWatch.createStarted(); - log.info("Total for " + day + " is " + total); + List page = fetchPage(authorisation, + pageForUnsetCaseAccessManagementFieldsFieldsQuery(searchFrom, casesToFetch)); - if (indexCases) { - return Optional.empty(); - } + stopWatch.stop(); + + log.info("Case search with page size: {} completed in: {} minutes ({} seconds).", casesToFetch, + stopWatch.getTime(TimeUnit.MINUTES), stopWatch.getTime(TimeUnit.SECONDS)); - int numberOfPages = (int) Math.ceil((double) total / PAGE_SIZE); + return page; - Stream pageStream = IntStream - .rangeClosed(0, numberOfPages - 1) - .boxed(); + } + + private List fetchPage(String authorisation, SearchSourceBuilder searchSourceBuilder) { + List caseDetails = emptyList(); - if (parallel) { - log.info("Retrieving pages in parallel.. please wait."); - pageStream = pageStream.parallel(); + try { + caseDetails = searchCases(authorisation, searchSourceBuilder).getCases(); + } catch (FeignException fe) { + log.error("Feign Exception message: {} with search string: {}", + fe.contentUTF8(), searchSourceBuilder); } - return pageStream - .map(pageNumber -> fetchPage(authorisation, day, pageNumber).stream()) - .reduce(Stream::concat); + return caseDetails; } public CaseDetails fetchOldestCase(String authorisation) { @@ -131,23 +130,7 @@ public CaseDetails update(String authorisation, String caseId, String eventId, S caseDataContent); } - private List fetchPage(String authorisation, String day, int pageNumber) { - List caseDetails = emptyList(); - SearchSourceBuilder searchBuilder = pageQuery(day, pageNumber); - - log.info("Fetching page no. {} for day: {}", pageNumber + 1, day); - - try { - caseDetails = searchCases(authorisation, searchBuilder).getCases(); - } catch (FeignException fe) { - log.error("Feign Exception message: {} with search string: {}", - fe.contentUTF8(), searchBuilder); - } - - return caseDetails; - } - - private SearchResult searchCases(String authorisation, SearchSourceBuilder searchBuilder) { + public SearchResult searchCases(String authorisation, SearchSourceBuilder searchBuilder) { return coreCaseDataApi.searchCases( authorisation, authTokenGenerator.generate(), @@ -155,25 +138,25 @@ private SearchResult searchCases(String authorisation, SearchSourceBuilder searc searchBuilder.toString()); } - private SearchSourceBuilder oldestCaseQuery() { - return SearchSourceBuilder.searchSource() - .size(1) - .sort(CREATED_DATE, SortOrder.ASC) - .query(QueryBuilders.boolQuery()); + + public List fetchAll(String authorisation, String userId) { + int numberOfPages = getNumberOfPages(authorisation, userId, new HashMap<>()); + return IntStream.rangeClosed(1, numberOfPages).boxed() + .flatMap(pageNumber -> fetchPage(authorisation, userId, pageNumber).stream()) + .collect(Collectors.toList()); } - private SearchSourceBuilder singleCaseQuery(String day) { - return SearchSourceBuilder.searchSource() - .size(1) - .query(QueryBuilders.boolQuery() - .must(matchQuery(CREATED_DATE, day))); + private List fetchPage(String authorisation, String userId, int pageNumber) { + Map searchCriteria = new HashMap<>(); + searchCriteria.put("page", String.valueOf(pageNumber)); + return coreCaseDataApi.searchForCaseworker(authorisation, authTokenGenerator.generate(), userId, jurisdiction, + caseType, searchCriteria); } - private SearchSourceBuilder pageQuery(String day, int pageNumber) { - return SearchSourceBuilder.searchSource() - .size(PAGE_SIZE) - .from(pageNumber * PAGE_SIZE) - .query(QueryBuilders.boolQuery() - .must(matchQuery(CREATED_DATE, day))); + private int getNumberOfPages(String authorisation, String userId, Map searchCriteria) { + PaginatedSearchMetadata metadata = coreCaseDataApi.getPaginationInfoForSearchForCaseworkers(authorisation, + authTokenGenerator.generate(), userId, jurisdiction, caseType, searchCriteria); + return metadata.getTotalPagesCount(); } + } diff --git a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/queries/CcdElasticSearchQueries.java b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/queries/CcdElasticSearchQueries.java new file mode 100644 index 00000000..ed54114e --- /dev/null +++ b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/queries/CcdElasticSearchQueries.java @@ -0,0 +1,72 @@ +package uk.gov.hmcts.reform.migration.queries; + +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.sort.SortOrder; + +import static org.elasticsearch.index.query.QueryBuilders.existsQuery; +import static org.elasticsearch.index.query.QueryBuilders.matchQuery; + +public class CcdElasticSearchQueries { + + private CcdElasticSearchQueries() { + } + + private static final String CREATED_DATE = "created_date"; + + public static final String REFERENCE_KEYWORD = "reference.keyword"; + + public static SearchSourceBuilder oldestCaseQuery() { + return SearchSourceBuilder.searchSource() + .size(1) + .sort(CREATED_DATE, SortOrder.ASC) + .query(QueryBuilders.boolQuery()); + } + + public static SearchSourceBuilder pageQuery(String day, int pageNumber, int pageSize) { + return SearchSourceBuilder.searchSource() + .size(pageSize) + .from(pageNumber * pageSize) + .query(QueryBuilders.boolQuery() + .must(matchQuery(CREATED_DATE, day))); + } + + public static SearchSourceBuilder fetchAllUnsetCaseAccessManagementFieldsCasesQuery() { + return SearchSourceBuilder.searchSource() + .size(1) + .query(unsetCaseAccessManagementFieldsQuery()) + .sort(REFERENCE_KEYWORD, SortOrder.ASC); + } + + public static BoolQueryBuilder unsetCaseAccessManagementFieldsQuery() { + return QueryBuilders.boolQuery() + .mustNot( + QueryBuilders.boolQuery() + .should(existsQuery("data.CaseAccessCategory")) + .should(existsQuery("data.caseManagementCategory")) + .should(existsQuery("data.caseManagementLocation")) + .should(existsQuery("data.caseNameHmctsRestricted")) + .should(existsQuery("data.caseNamePublic")) + .should(existsQuery("data.caseNameHmctsInternal")) + .should(existsQuery("data.ogdType")) + .minimumShouldMatch(7)); + } + + public static SearchSourceBuilder pageForUnsetCaseAccessManagementFieldsFieldsQuery(int pageSize) { + return SearchSourceBuilder.searchSource() + .size(pageSize) + .query(unsetCaseAccessManagementFieldsQuery()) + .sort(REFERENCE_KEYWORD, SortOrder.ASC); + } + + public static SearchSourceBuilder pageForUnsetCaseAccessManagementFieldsFieldsQuery(Long lastCaseId, + int pageSize) { + + return SearchSourceBuilder.searchSource() + .size(pageSize) + .query(unsetCaseAccessManagementFieldsQuery()) + .searchAfter(new Object[] { lastCaseId}) + .sort(REFERENCE_KEYWORD, SortOrder.ASC); + } +} diff --git a/modules/processor/src/test/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessorTest.java b/modules/processor/src/test/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessorTest.java index f563f1b4..aa1356ac 100644 --- a/modules/processor/src/test/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessorTest.java +++ b/modules/processor/src/test/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessorTest.java @@ -26,6 +26,7 @@ @RunWith(MockitoJUnitRunner.class) public class CaseMigrationProcessorTest { + private static final String USER_ID = "30"; private static final String USER_TOKEN = "Bearer eeeejjjttt"; private static final String CASE_ID = "11111"; private static final String EVENT_ID = "migrateCase"; @@ -78,7 +79,7 @@ public void shouldProcessASingleCaseAndMigrationIsFailed() { assertThat(caseMigrationProcessor.getMigratedCases(), hasSize(0)); } - @Ignore + @Test public void shouldProcessAllTheCandidateCases_whenOneCaseFailed() { mockDataFetch(caseDetails1, caseDetails2, caseDetails3); mockDataUpdate(caseDetails1); @@ -88,14 +89,12 @@ public void shouldProcessAllTheCandidateCases_whenOneCaseFailed() { when(dataMigrationService.migrate(caseDetails2.getData())).thenReturn(caseDetails2.getData()); when(dataMigrationService.migrate(caseDetails3.getData())).thenReturn(caseDetails3.getData()); when(coreCaseDataService.update(USER_TOKEN, caseDetails3.getId().toString(), EVENT_ID, EVENT_SUMMARY, EVENT_DESCRIPTION, caseDetails3.getData())).thenThrow(new RuntimeException("Internal server error")); - - caseMigrationProcessor.processAllCases(USER_TOKEN, "2022-03-01", "2022-03-02", false); - + caseMigrationProcessor.processAllCases(USER_TOKEN, USER_ID, false); assertThat(caseMigrationProcessor.getFailedCases(), contains(1113L)); - assertThat(caseMigrationProcessor.getMigratedCases(), containsInAnyOrder(1111L, 1112L)); + assertThat(caseMigrationProcessor.getMigratedCases(), contains(1111L, 1112L)); } - @Ignore + @Test public void shouldProcessAllTheCandidateCases_whenTwoCasesFailed() { mockDataFetch(caseDetails1, caseDetails2, caseDetails3); mockDataUpdate(caseDetails1); @@ -105,10 +104,8 @@ public void shouldProcessAllTheCandidateCases_whenTwoCasesFailed() { when(dataMigrationService.migrate(caseDetails3.getData())).thenReturn(caseDetails3.getData()); when(coreCaseDataService.update(USER_TOKEN, caseDetails2.getId().toString(), EVENT_ID, EVENT_SUMMARY, EVENT_DESCRIPTION, caseDetails2.getData())).thenThrow(new RuntimeException("Internal server error")); when(coreCaseDataService.update(USER_TOKEN, caseDetails3.getId().toString(), EVENT_ID, EVENT_SUMMARY, EVENT_DESCRIPTION, caseDetails3.getData())).thenThrow(new RuntimeException("Internal server error")); - - caseMigrationProcessor.processAllCases(USER_TOKEN, "2022-03-01", "2022-03-02", false); - - assertThat(caseMigrationProcessor.getFailedCases(), containsInAnyOrder(1112L, 1113L)); + caseMigrationProcessor.processAllCases(USER_TOKEN, USER_ID, false); + assertThat(caseMigrationProcessor.getFailedCases(), contains(1112L, 1113L)); assertThat(caseMigrationProcessor.getMigratedCases(), contains(1111L)); } @@ -116,8 +113,8 @@ public void shouldProcessAllTheCandidateCases_whenTwoCasesFailed() { public void shouldProcessNoCaseWhenNoCasesAvailable() { mockDataFetch(); -// when(dataMigrationService.accepts()).thenReturn(candidate -> true); - caseMigrationProcessor.processAllCases(USER_TOKEN,"2022-03-01", "2022-03-30", false); + when(dataMigrationService.accepts()).thenReturn(candidate -> true); + caseMigrationProcessor.processAllCases(USER_TOKEN, USER_ID, false); assertThat(caseMigrationProcessor.getFailedCases(), hasSize(0)); assertThat(caseMigrationProcessor.getFailedCases(), hasSize(0)); } From 9fab97e8b5f4e18c62bf2d18d51ffac7c2077b35 Mon Sep 17 00:00:00 2001 From: Del Attipoe Date: Wed, 27 Jul 2022 13:20:43 +0100 Subject: [PATCH 10/16] SSCS-10722 Add tests, improve logging. --- .../migration/CaseMigrationProcessor.java | 50 ++++---- .../reform/migration/CaseMigrationRunner.java | 24 ++-- .../reform/migration/MigrationPageParams.java | 14 +++ .../migration/ccd/CoreCaseDataService.java | 9 -- .../migration/CaseMigrationProcessorTest.java | 115 +++++++++++++----- 5 files changed, 141 insertions(+), 71 deletions(-) create mode 100644 modules/processor/src/main/java/uk/gov/hmcts/reform/migration/MigrationPageParams.java diff --git a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessor.java b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessor.java index d0faf157..d6f284f4 100644 --- a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessor.java +++ b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessor.java @@ -45,18 +45,6 @@ public class CaseMigrationProcessor { @Getter private Long totalCases = 0L; - @Value("${migration.parallel}") - private boolean parallel; - - @Value("${migration.pageSize}") - private int pageSize; - - @Value("${migration.numThreads}") - private int numThreads; - - @Value("${migration.maxCasesToProcess}") - private int maxCasesToProcess; - public void processSingleCase(String userToken, String caseId, boolean dryRun) { CaseDetails caseDetails; try { @@ -72,30 +60,32 @@ public void processSingleCase(String userToken, String caseId, boolean dryRun) { } } - public void fetchAndProcessCases(String userToken, boolean dryRun) throws InterruptedException { + public void fetchAndProcessCases(String userToken, boolean dryRun, int numThreads, MigrationPageParams pageParams) + throws InterruptedException { SearchResult initialSearch = coreCaseDataService.searchCases(userToken, fetchAllUnsetCaseAccessManagementFieldsCasesQuery()); - int totalCasesToProcess = resolveTotalCasesToProcess(initialSearch); + if (initialSearch.getTotal() <= 0) { + return; + } totalTimer.start(); - //Need to fix the off by one issue. - Long searchFrom = handleFirstCase(userToken, dryRun, initialSearch); - totalCasesToProcess -= 1; + int totalCasesToProcess = resolveTotalCasesToProcess(initialSearch, pageParams.getMaxCasesToProcess()); - int casesFetched = 0; + Long searchFrom = handleFirstCase(userToken, dryRun, initialSearch); ExecutorService executorService = Executors.newFixedThreadPool(numThreads); - fetchAndSubmitTasks(userToken, dryRun, totalCasesToProcess, searchFrom, casesFetched, executorService); + fetchAndSubmitTasks(userToken, dryRun, totalCasesToProcess, pageParams.getPageSize(), searchFrom, + executorService); executorService.shutdown(); executorService.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS); } - private int resolveTotalCasesToProcess(SearchResult initialSearch) { + private int resolveTotalCasesToProcess(SearchResult initialSearch, int maxCasesToProcess) { int totalCasesToProcess = 0; if (maxCasesToProcess > 0) { @@ -109,12 +99,16 @@ private int resolveTotalCasesToProcess(SearchResult initialSearch) { return totalCasesToProcess; } - private void fetchAndSubmitTasks(String userToken, boolean dryRun, int totalCasesToProcess, Long searchFrom, int casesFetched, - ExecutorService executorService) { + private void fetchAndSubmitTasks(String userToken, boolean dryRun, int totalCasesToProcess, int pageSize, + Long searchFrom, ExecutorService executorService) { + int casesFetched = 1; + int numCasesToFetch = pageSize; while (casesFetched < totalCasesToProcess) { + numCasesToFetch = resolvePageSize(totalCasesToProcess, casesFetched, numCasesToFetch, pageSize); + List caseDetails = - coreCaseDataService.fetchNCases(userToken, pageSize, searchFrom); + coreCaseDataService.fetchNCases(userToken, numCasesToFetch, searchFrom); if (caseDetails.isEmpty()) { break; @@ -134,6 +128,14 @@ private void fetchAndSubmitTasks(String userToken, boolean dryRun, int totalCase } } + private int resolvePageSize(int totalCasesToProcess, int casesFetched, int numCasesToFetch, int pageSize) { + int remainingCases = totalCasesToProcess - casesFetched; + if (remainingCases < pageSize) { + numCasesToFetch = remainingCases; + } + return numCasesToFetch; + } + private Long handleFirstCase(String userToken, boolean dryRun, SearchResult initialSearch) { log.info("Processing first case..."); CaseDetails firstCase = initialSearch.getCases().get(0); @@ -177,7 +179,7 @@ private void updateCase(String authorisation, Long id, Map data, } if (totalCases % 1000 == 0) { - log.info("--------{} cases migrated in {} minutes ({} seconds)-------", totalCases, + log.info("----------{} cases migrated in {} minutes ({} seconds)----------", totalCases, totalTimer.getTime(TimeUnit.MINUTES), totalTimer.getTime(TimeUnit.SECONDS)); } } diff --git a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationRunner.java b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationRunner.java index 2f86e481..41769098 100644 --- a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationRunner.java +++ b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationRunner.java @@ -21,12 +21,15 @@ public class CaseMigrationRunner implements CommandLineRunner { private String idamPassword; @Value("${migration.caseId}") private String ccdCaseId; - @Value("${migration.startDate}") - private String startDate; - @Value("${migration.endDate}") - private String endDate; @Value("${migration.dryrun}") private boolean dryrun; + @Value("${migration.pageSize}") + private int pageSize; + @Value("${migration.maxCasesToProcess}") + private int maxCasesToProcess; + @Value("${migration.numThreads}") + private int numThreads; + private final IdamClient idamClient; private final CaseMigrationProcessor caseMigrationProcessor; @@ -40,26 +43,29 @@ public void run(String... args) { try { String userToken = idamClient.authenticateUser(idamUsername, idamPassword); - StopWatch stopWatch = StopWatch.createStarted(); + MigrationPageParams pageParams = new MigrationPageParams(pageSize, maxCasesToProcess); - log.info(userToken); + StopWatch stopWatch = StopWatch.createStarted(); if (ccdCaseId != null && !ccdCaseId.isBlank()) { log.info("Data migration of single case started"); caseMigrationProcessor.processSingleCase(userToken, ccdCaseId, dryrun); } else { log.info("Data migration of cases started"); - caseMigrationProcessor.fetchAndProcessCases(userToken, dryrun); + caseMigrationProcessor.fetchAndProcessCases(userToken, dryrun, numThreads, pageParams); } stopWatch.stop(); log.info("-----------------------------------------"); log.info("Total number of cases: {}", caseMigrationProcessor.getTotalCases()); - log.info("Total number of processed cases: {}", caseMigrationProcessor.getMigratedCases().size() + caseMigrationProcessor.getFailedCases().size()); + log.info("Total number of processed cases: {}", caseMigrationProcessor.getMigratedCases().size() + + caseMigrationProcessor.getFailedCases().size()); log.info("Total number of migrations performed: {}", caseMigrationProcessor.getMigratedCases().size()); log.info("-----------------------------------------"); - log.info("Migrated cases: {} ", !caseMigrationProcessor.getMigratedCases().isEmpty() ? caseMigrationProcessor.getMigratedCases() : "NONE"); + log.info("Migrated cases: {} ", !caseMigrationProcessor.getMigratedCases().isEmpty() + ? caseMigrationProcessor.getMigratedCases() + : "NONE"); log.info("Failed cases: {}", caseMigrationProcessor.getFailedCases().size()); log.info("-----------------------------------------"); log.info("Data migration completed in: {} minutes ({} seconds).", diff --git a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/MigrationPageParams.java b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/MigrationPageParams.java new file mode 100644 index 00000000..55f8e119 --- /dev/null +++ b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/MigrationPageParams.java @@ -0,0 +1,14 @@ +package uk.gov.hmcts.reform.migration; + + +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +@Getter +@RequiredArgsConstructor +public class MigrationPageParams { + + private final int pageSize; + + private final int maxCasesToProcess; +} diff --git a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/ccd/CoreCaseDataService.java b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/ccd/CoreCaseDataService.java index 857b7775..d4892a04 100644 --- a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/ccd/CoreCaseDataService.java +++ b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/ccd/CoreCaseDataService.java @@ -1,21 +1,17 @@ package uk.gov.hmcts.reform.migration.ccd; import static java.util.Collections.emptyList; -import static uk.gov.hmcts.reform.migration.queries.CcdElasticSearchQueries.fetchAllUnsetCaseAccessManagementFieldsCasesQuery; import static uk.gov.hmcts.reform.migration.queries.CcdElasticSearchQueries.oldestCaseQuery; import static uk.gov.hmcts.reform.migration.queries.CcdElasticSearchQueries.pageForUnsetCaseAccessManagementFieldsFieldsQuery; import feign.FeignException; -import java.time.LocalDate; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.IntStream; -import java.util.stream.Stream; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -42,17 +38,12 @@ public class CoreCaseDataService { private static final String SSCS_CASE_TYPE = "Benefit"; - - @Value("${migration.jurisdiction}") private String jurisdiction; @Value("${migration.caseType}") private String caseType; - @Value("${migration.indexCases}") - private boolean indexCases; - private final IdamClient idamClient; private final AuthTokenGenerator authTokenGenerator; private final CoreCaseDataApi coreCaseDataApi; diff --git a/modules/processor/src/test/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessorTest.java b/modules/processor/src/test/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessorTest.java index aa1356ac..ed505320 100644 --- a/modules/processor/src/test/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessorTest.java +++ b/modules/processor/src/test/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessorTest.java @@ -1,25 +1,29 @@ package uk.gov.hmcts.reform.migration; +import static org.codehaus.groovy.runtime.InvokerHelper.asList; import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.hasSize; -import static org.junit.Assert.assertEquals; +import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import java.time.LocalDate; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.junit.Ignore; + +import org.elasticsearch.search.builder.SearchSourceBuilder; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; import uk.gov.hmcts.reform.ccd.client.model.CaseDetails; +import uk.gov.hmcts.reform.ccd.client.model.SearchResult; import uk.gov.hmcts.reform.migration.ccd.CoreCaseDataService; import uk.gov.hmcts.reform.migration.service.DataMigrationService; @@ -120,51 +124,104 @@ public void shouldProcessNoCaseWhenNoCasesAvailable() { } @Test - public void shouldContainSingleDate() { - String firstDate = "2021-01-01"; - String lastDate = "2021-01-02"; + public void shouldDoNothingIfNoCasesToProcess() throws InterruptedException { + SearchResult result = SearchResult.builder() + .cases(new ArrayList<>()) + .total(0).build(); + + MigrationPageParams pageParams = new MigrationPageParams(10, 10); + + when(coreCaseDataService.searchCases(anyString(), + any(SearchSourceBuilder.class))).thenReturn(result); + + caseMigrationProcessor.fetchAndProcessCases(USER_TOKEN, false, 1, pageParams); + + assertThat(caseMigrationProcessor.getFailedCases().size(), is(0)); + assertThat(caseMigrationProcessor.getMigratedCases().size(), is(0)); + } + + @Test + public void shouldUseOverrideIfProvided() throws InterruptedException { + SearchResult result = SearchResult.builder() + .cases(List.of(caseDetails1, caseDetails2, caseDetails3)) + .total(3).build(); + + MigrationPageParams pageParams = new MigrationPageParams(10, 2); + + when(coreCaseDataService.searchCases(anyString(), + any(SearchSourceBuilder.class))).thenReturn(result); - List listOfDates = caseMigrationProcessor.getListOfDates(LocalDate.parse(firstDate), - LocalDate.parse(lastDate)); + when(coreCaseDataService.fetchNCases(USER_TOKEN, 1, + caseDetails1.getId())).thenReturn(List.of(caseDetails2)); - assertEquals(1, listOfDates.size()); + caseMigrationProcessor.fetchAndProcessCases(USER_TOKEN, false, 1, pageParams); + + assertThat(caseMigrationProcessor.getFailedCases().size(), is(0)); + assertThat(caseMigrationProcessor.getMigratedCases().size(), is(2)); } @Test - public void shouldContainTwoDates() { - String firstDate = "2021-01-01"; - String lastDate = "2021-01-03"; + public void shouldProcessAllCasesIfNoOverride() throws InterruptedException { + SearchResult result = SearchResult.builder() + .cases(List.of(caseDetails1, caseDetails2, caseDetails3)) + .total(3).build(); + + MigrationPageParams pageParams = new MigrationPageParams(10, 0); + + when(coreCaseDataService.searchCases(anyString(), + any(SearchSourceBuilder.class))).thenReturn(result); - List listOfDates = caseMigrationProcessor.getListOfDates(LocalDate.parse(firstDate), - LocalDate.parse(lastDate)); + when(coreCaseDataService.fetchNCases(USER_TOKEN, 2, + caseDetails1.getId())).thenReturn(List.of(caseDetails2, caseDetails3)); - assertEquals(2, listOfDates.size()); + caseMigrationProcessor.fetchAndProcessCases(USER_TOKEN, false, 1, pageParams); + + assertThat(caseMigrationProcessor.getFailedCases().size(), is(0)); + assertThat(caseMigrationProcessor.getMigratedCases().size(), is(3)); } @Test - public void shouldContainNormalYearOfDates() { - String firstDate = "2021-01-01"; - String lastDate = "2022-01-01"; + public void shouldBreakWhenNoMoreCasesReturned() throws InterruptedException { + SearchResult result = SearchResult.builder() + .cases(List.of(caseDetails1, caseDetails2, caseDetails3)) + .total(3).build(); + + MigrationPageParams pageParams = new MigrationPageParams(10, 2); - List listOfDates = caseMigrationProcessor.getListOfDates(LocalDate.parse(firstDate), - LocalDate.parse(lastDate)); + when(coreCaseDataService.searchCases(any(String.class), + any(SearchSourceBuilder.class))).thenReturn(result); - assertEquals(365, listOfDates.size()); + caseMigrationProcessor.fetchAndProcessCases(USER_TOKEN, false, 1, pageParams); + + assertThat(caseMigrationProcessor.getFailedCases().size(), is(0)); + assertThat(caseMigrationProcessor.getMigratedCases().size(), is(1)); } @Test - public void shouldContainLeapYearOfDates() { - String firstDate = "2020-01-01"; - String lastDate = "2021-01-01"; + public void shouldSearchFromLastCaseInPreviousResult() throws InterruptedException { + SearchResult result = SearchResult.builder() + .cases(List.of(caseDetails1, caseDetails2, caseDetails3)) + .total(3).build(); + + MigrationPageParams pageParams = new MigrationPageParams(1, 0); + + when(coreCaseDataService.searchCases(any(String.class), + any(SearchSourceBuilder.class))).thenReturn(result); + + when(coreCaseDataService.fetchNCases(USER_TOKEN, 1, + caseDetails1.getId())).thenReturn(List.of(caseDetails2)); + + when(coreCaseDataService.fetchNCases(USER_TOKEN, 1, + caseDetails2.getId())).thenReturn(List.of(caseDetails3)); - List listOfDates = caseMigrationProcessor.getListOfDates(LocalDate.parse(firstDate), - LocalDate.parse(lastDate)); + caseMigrationProcessor.fetchAndProcessCases(USER_TOKEN, false, 1, pageParams); - assertEquals(366, listOfDates.size()); + assertThat(caseMigrationProcessor.getFailedCases().size(), is(0)); + assertThat(caseMigrationProcessor.getMigratedCases().size(), is(3)); } private void mockDataFetch(CaseDetails... caseDetails) { -// when(coreCaseDataService.fetchAllForDay(eq(USER_TOKEN), anyString(), eq(false))).thenReturn(Optional.of(Stream.of(caseDetails))); + when(coreCaseDataService.fetchAll(USER_TOKEN, USER_ID)).thenReturn(asList(caseDetails)); } private void mockDataUpdate(CaseDetails caseDetails) { From 0dd05e8720d7e153c24e2877d3514b82cfca0130 Mon Sep 17 00:00:00 2001 From: Del Attipoe Date: Thu, 28 Jul 2022 11:44:30 +0100 Subject: [PATCH 11/16] SSCS-10722 Specify loggers, use set for cases. --- .../migration/CaseMigrationProcessor.java | 36 ++++++++++--------- .../reform/migration/CaseMigrationRunner.java | 7 ++-- 2 files changed, 23 insertions(+), 20 deletions(-) diff --git a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessor.java b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessor.java index d6f284f4..dea90c6a 100644 --- a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessor.java +++ b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessor.java @@ -1,9 +1,10 @@ package uk.gov.hmcts.reform.migration; import java.time.LocalDate; -import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -11,9 +12,9 @@ import lombok.Getter; import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.time.StopWatch; -import org.springframework.beans.factory.annotation.Value; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import uk.gov.hmcts.reform.ccd.client.model.CaseDetails; import uk.gov.hmcts.reform.ccd.client.model.SearchResult; @@ -22,11 +23,14 @@ import static uk.gov.hmcts.reform.migration.queries.CcdElasticSearchQueries.fetchAllUnsetCaseAccessManagementFieldsCasesQuery; -@Slf4j @Component @RequiredArgsConstructor public class CaseMigrationProcessor { + private final Logger errorLogger = LoggerFactory.getLogger("ccd-migration-error"); + + private final Logger infoLogger = LoggerFactory.getLogger("ccd-migration-info"); + private static final String EVENT_ID = "migrateCase"; private static final String EVENT_SUMMARY = "Migrate Case"; private static final String EVENT_DESCRIPTION = "Migrate Case"; @@ -37,10 +41,10 @@ public class CaseMigrationProcessor { private final DataMigrationService dataMigrationService; @Getter - private final List migratedCases = new ArrayList<>(); + private final Set migratedCases = new HashSet<>(); @Getter - private final List failedCases = new ArrayList<>(); + private final Set failedCases = new HashSet<>(); @Getter private Long totalCases = 0L; @@ -50,13 +54,13 @@ public void processSingleCase(String userToken, String caseId, boolean dryRun) { try { caseDetails = coreCaseDataService.fetchOne(userToken, caseId); } catch (Exception ex) { - log.error("Case {} not found due to: {}", caseId, ex.getMessage()); + errorLogger.error("Case {} not found due to: {}", caseId, ex.getMessage()); return; } if (dataMigrationService.accepts().test(caseDetails)) { updateCase(userToken, caseDetails.getId(), caseDetails.getData(), dryRun); } else { - log.info("Case {} already migrated", caseDetails.getId()); + infoLogger.info("Case {} already migrated", caseDetails.getId()); } } @@ -89,10 +93,10 @@ private int resolveTotalCasesToProcess(SearchResult initialSearch, int maxCasesT int totalCasesToProcess = 0; if (maxCasesToProcess > 0) { - log.info("Manual case override in use, limiting to {} cases", maxCasesToProcess); + infoLogger.info("Manual case override in use, limiting to {} cases", maxCasesToProcess); totalCasesToProcess = maxCasesToProcess; } else { - log.info("No manual case override in use, fetching all: {} cases", initialSearch.getTotal()); + infoLogger.info("No manual case override in use, fetching all: {} cases", initialSearch.getTotal()); totalCasesToProcess = initialSearch.getTotal(); } @@ -120,11 +124,11 @@ private void fetchAndSubmitTasks(String userToken, boolean dryRun, int totalCase .forEach(caseDetail -> updateCase(userToken, caseDetail.getId(), caseDetail.getData(), dryRun))); - log.info("New task submitted"); + infoLogger.info("New task submitted"); casesFetched += caseDetails.size(); - log.info("{} cases fetched out of {}", casesFetched, totalCasesToProcess); + infoLogger.info("{} cases fetched out of {}", casesFetched, totalCasesToProcess); } } @@ -137,7 +141,7 @@ private int resolvePageSize(int totalCasesToProcess, int casesFetched, int numCa } private Long handleFirstCase(String userToken, boolean dryRun, SearchResult initialSearch) { - log.info("Processing first case..."); + infoLogger.info("Processing first case..."); CaseDetails firstCase = initialSearch.getCases().get(0); updateCase(userToken, firstCase.getId(), firstCase.getData(), dryRun); return firstCase.getId(); @@ -169,17 +173,17 @@ private void updateCase(String authorisation, Long id, Map data, EVENT_SUMMARY, EVENT_DESCRIPTION, migratedData); - log.info("Case {} successfully updated", id); + infoLogger.info("Case {} successfully updated", id); } migratedCases.add(id); } catch (Exception e) { - log.error("Case {} update failed due to: {}", id, e.getMessage()); + errorLogger.error("Case {} update failed due to: {}", id, e.getMessage()); failedCases.add(id); } if (totalCases % 1000 == 0) { - log.info("----------{} cases migrated in {} minutes ({} seconds)----------", totalCases, + infoLogger.info("----------{} cases migrated in {} minutes ({} seconds)----------", totalCases, totalTimer.getTime(TimeUnit.MINUTES), totalTimer.getTime(TimeUnit.SECONDS)); } } diff --git a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationRunner.java b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationRunner.java index 41769098..fb63c080 100644 --- a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationRunner.java +++ b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationRunner.java @@ -2,19 +2,20 @@ import java.util.concurrent.TimeUnit; import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.time.StopWatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import uk.gov.hmcts.reform.idam.client.IdamClient; -@Slf4j @SpringBootApplication @RequiredArgsConstructor public class CaseMigrationRunner implements CommandLineRunner { + private final Logger log = LoggerFactory.getLogger("ccd-migration-info"); @Value("${migration.idam.username}") private String idamUsername; @Value("${migration.idam.password}") @@ -29,8 +30,6 @@ public class CaseMigrationRunner implements CommandLineRunner { private int maxCasesToProcess; @Value("${migration.numThreads}") private int numThreads; - - private final IdamClient idamClient; private final CaseMigrationProcessor caseMigrationProcessor; From ab6703b5f0dda9a58bf1cc2443a2bcdd0cc3aca4 Mon Sep 17 00:00:00 2001 From: Del Attipoe Date: Fri, 29 Jul 2022 12:25:48 +0100 Subject: [PATCH 12/16] SSCS-10722 Pass case ID to migration steps. --- .../reform/migration/CaseMigrationProcessor.java | 2 +- .../migration/service/DataMigrationService.java | 2 +- .../migration/CaseMigrationProcessorTest.java | 14 +++++++------- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessor.java b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessor.java index dea90c6a..cf5df025 100644 --- a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessor.java +++ b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessor.java @@ -164,7 +164,7 @@ private void updateCase(String authorisation, Long id, Map data, totalCases++; try { - var migratedData = dataMigrationService.migrate(data); + var migratedData = dataMigrationService.migrate(data, id); if (!dryRun) { coreCaseDataService.update( authorisation, diff --git a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/service/DataMigrationService.java b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/service/DataMigrationService.java index b6fea5c0..a9974994 100644 --- a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/service/DataMigrationService.java +++ b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/service/DataMigrationService.java @@ -8,5 +8,5 @@ public interface DataMigrationService { Predicate accepts(); - T migrate(Map data); + T migrate(Map data, Long id); } diff --git a/modules/processor/src/test/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessorTest.java b/modules/processor/src/test/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessorTest.java index ed505320..5be82e16 100644 --- a/modules/processor/src/test/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessorTest.java +++ b/modules/processor/src/test/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessorTest.java @@ -74,7 +74,7 @@ public void shouldProcessASingleCaseAndMigrationIsSuccessful() { public void shouldProcessASingleCaseAndMigrationIsFailed() { when(coreCaseDataService.fetchOne(USER_TOKEN, CASE_ID)).thenReturn(caseDetails1); when(dataMigrationService.accepts()).thenReturn(candidate -> true); - when(dataMigrationService.migrate(caseDetails1.getData())).thenReturn(caseDetails1.getData()); + when(dataMigrationService.migrate(caseDetails1.getData(), caseDetails1.getId())).thenReturn(caseDetails1.getData()); when(coreCaseDataService.update(USER_TOKEN, caseDetails1.getId().toString(), EVENT_ID, EVENT_SUMMARY, EVENT_DESCRIPTION, caseDetails1.getData())).thenThrow(new RuntimeException("Internal server error")); caseMigrationProcessor.processSingleCase(USER_TOKEN, CASE_ID, false); verify(coreCaseDataService, times(1)).fetchOne(USER_TOKEN, CASE_ID); @@ -89,9 +89,9 @@ public void shouldProcessAllTheCandidateCases_whenOneCaseFailed() { mockDataUpdate(caseDetails1); mockDataUpdate(caseDetails2); when(dataMigrationService.accepts()).thenReturn(candidate -> true); - when(dataMigrationService.migrate(caseDetails1.getData())).thenReturn(caseDetails1.getData()); - when(dataMigrationService.migrate(caseDetails2.getData())).thenReturn(caseDetails2.getData()); - when(dataMigrationService.migrate(caseDetails3.getData())).thenReturn(caseDetails3.getData()); + when(dataMigrationService.migrate(caseDetails1.getData(), caseDetails1.getId())).thenReturn(caseDetails1.getData()); + when(dataMigrationService.migrate(caseDetails2.getData(), caseDetails2.getId())).thenReturn(caseDetails2.getData()); + when(dataMigrationService.migrate(caseDetails3.getData(), caseDetails3.getId())).thenReturn(caseDetails3.getData()); when(coreCaseDataService.update(USER_TOKEN, caseDetails3.getId().toString(), EVENT_ID, EVENT_SUMMARY, EVENT_DESCRIPTION, caseDetails3.getData())).thenThrow(new RuntimeException("Internal server error")); caseMigrationProcessor.processAllCases(USER_TOKEN, USER_ID, false); assertThat(caseMigrationProcessor.getFailedCases(), contains(1113L)); @@ -103,9 +103,9 @@ public void shouldProcessAllTheCandidateCases_whenTwoCasesFailed() { mockDataFetch(caseDetails1, caseDetails2, caseDetails3); mockDataUpdate(caseDetails1); when(dataMigrationService.accepts()).thenReturn(candidate -> true); - when(dataMigrationService.migrate(caseDetails1.getData())).thenReturn(caseDetails1.getData()); - when(dataMigrationService.migrate(caseDetails2.getData())).thenReturn(caseDetails2.getData()); - when(dataMigrationService.migrate(caseDetails3.getData())).thenReturn(caseDetails3.getData()); + when(dataMigrationService.migrate(caseDetails1.getData(), caseDetails1.getId())).thenReturn(caseDetails1.getData()); + when(dataMigrationService.migrate(caseDetails2.getData(), caseDetails2.getId())).thenReturn(caseDetails2.getData()); + when(dataMigrationService.migrate(caseDetails3.getData(), caseDetails3.getId())).thenReturn(caseDetails3.getData()); when(coreCaseDataService.update(USER_TOKEN, caseDetails2.getId().toString(), EVENT_ID, EVENT_SUMMARY, EVENT_DESCRIPTION, caseDetails2.getData())).thenThrow(new RuntimeException("Internal server error")); when(coreCaseDataService.update(USER_TOKEN, caseDetails3.getId().toString(), EVENT_ID, EVENT_SUMMARY, EVENT_DESCRIPTION, caseDetails3.getData())).thenThrow(new RuntimeException("Internal server error")); caseMigrationProcessor.processAllCases(USER_TOKEN, USER_ID, false); From a8aeba1040082b602591918f0da8dbaac002c1d7 Mon Sep 17 00:00:00 2001 From: Del Attipoe Date: Wed, 17 Aug 2022 12:11:17 +0100 Subject: [PATCH 13/16] SSCS-10722 Ensure at least appellant or appointee postcode exits. --- .../reform/migration/queries/CcdElasticSearchQueries.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/queries/CcdElasticSearchQueries.java b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/queries/CcdElasticSearchQueries.java index ed54114e..d4926d89 100644 --- a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/queries/CcdElasticSearchQueries.java +++ b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/queries/CcdElasticSearchQueries.java @@ -41,6 +41,10 @@ public static SearchSourceBuilder fetchAllUnsetCaseAccessManagementFieldsCasesQu public static BoolQueryBuilder unsetCaseAccessManagementFieldsQuery() { return QueryBuilders.boolQuery() + .must(QueryBuilders.boolQuery() + .should(existsQuery("data.appeal.appellant.address.postcode")) + .should(existsQuery("data.appeal.appellant.appointee.address.postcode")) + .minimumShouldMatch(1)) .mustNot( QueryBuilders.boolQuery() .should(existsQuery("data.CaseAccessCategory")) From d3ef86171a8a4dba305ad3f46e11cb7c773488c5 Mon Sep 17 00:00:00 2001 From: Del Attipoe Date: Wed, 17 Aug 2022 12:59:52 +0100 Subject: [PATCH 14/16] SSCS-10722 Ensure processing venue exists. --- .../reform/migration/queries/CcdElasticSearchQueries.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/queries/CcdElasticSearchQueries.java b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/queries/CcdElasticSearchQueries.java index d4926d89..c97b1749 100644 --- a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/queries/CcdElasticSearchQueries.java +++ b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/queries/CcdElasticSearchQueries.java @@ -44,7 +44,8 @@ public static BoolQueryBuilder unsetCaseAccessManagementFieldsQuery() { .must(QueryBuilders.boolQuery() .should(existsQuery("data.appeal.appellant.address.postcode")) .should(existsQuery("data.appeal.appellant.appointee.address.postcode")) - .minimumShouldMatch(1)) + .minimumShouldMatch(1) + .must(existsQuery("data.processingVenue"))) .mustNot( QueryBuilders.boolQuery() .should(existsQuery("data.CaseAccessCategory")) From f43a4a431b216019e5a7cc248737fd9b634fe487 Mon Sep 17 00:00:00 2001 From: Del Attipoe Date: Mon, 10 Oct 2022 11:02:22 +0100 Subject: [PATCH 15/16] SSCS-10722 Add logging for failed cases. --- .../uk/gov/hmcts/reform/migration/CaseMigrationRunner.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationRunner.java b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationRunner.java index fb63c080..dc8b97bb 100644 --- a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationRunner.java +++ b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationRunner.java @@ -62,10 +62,14 @@ public void run(String... args) { + caseMigrationProcessor.getFailedCases().size()); log.info("Total number of migrations performed: {}", caseMigrationProcessor.getMigratedCases().size()); log.info("-----------------------------------------"); + log.info("Number of migrated cases: {}", caseMigrationProcessor.getMigratedCases().size()); log.info("Migrated cases: {} ", !caseMigrationProcessor.getMigratedCases().isEmpty() ? caseMigrationProcessor.getMigratedCases() : "NONE"); - log.info("Failed cases: {}", caseMigrationProcessor.getFailedCases().size()); + log.info("Number of failed cases: {}", caseMigrationProcessor.getFailedCases().size()); + log.info("Failed cases: {} ", !caseMigrationProcessor.getFailedCases().isEmpty() + ? caseMigrationProcessor.getFailedCases() + : "NONE"); log.info("-----------------------------------------"); log.info("Data migration completed in: {} minutes ({} seconds).", stopWatch.getTime(TimeUnit.MINUTES), stopWatch.getTime(TimeUnit.SECONDS)); From 64b85b4b5a7eb3978302b0fa92fb9e5c9ec78b8f Mon Sep 17 00:00:00 2001 From: dattipoeHMCTS <95091179+dattipoeHMCTS@users.noreply.github.com> Date: Wed, 12 Oct 2022 21:04:30 +0100 Subject: [PATCH 16/16] SSCS-10722 Remove processing venue restriction --- .../reform/migration/queries/CcdElasticSearchQueries.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/queries/CcdElasticSearchQueries.java b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/queries/CcdElasticSearchQueries.java index c97b1749..d4926d89 100644 --- a/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/queries/CcdElasticSearchQueries.java +++ b/modules/processor/src/main/java/uk/gov/hmcts/reform/migration/queries/CcdElasticSearchQueries.java @@ -44,8 +44,7 @@ public static BoolQueryBuilder unsetCaseAccessManagementFieldsQuery() { .must(QueryBuilders.boolQuery() .should(existsQuery("data.appeal.appellant.address.postcode")) .should(existsQuery("data.appeal.appellant.appointee.address.postcode")) - .minimumShouldMatch(1) - .must(existsQuery("data.processingVenue"))) + .minimumShouldMatch(1)) .mustNot( QueryBuilders.boolQuery() .should(existsQuery("data.CaseAccessCategory"))