-
Notifications
You must be signed in to change notification settings - Fork 1
Feature/sscs 10722 migration tool performance improvements #9
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
dattipoeHMCTS
wants to merge
16
commits into
master
Choose a base branch
from
feature/sscs-10722-migration-tool-performance-improvements
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
16 commits
Select commit
Hold shift + click to select a range
c2d54b3
SSCS-10722 update to use elastic search and a daily dataset
chrisdavecm e11e7b2
SSCS-10722
wmarshid 4f2e626
SSCS-10722:
wmarshid a32c330
SSCS-10722:
wmarshid 91d92d1
SSCS-10722: improved pagination logic and added parallel processing c…
wmarshid f40d16b
SSCS-10722: counting total cases and improved logging
wmarshid 7d02a03
SSCS-10722: fixed logic for processing parallel streams
wmarshid 1016c1b
SSCS-10722: code improvements
wmarshid ecc67cb
SSCS-10722 Migration improvements
dattipoeHMCTS 9fab97e
SSCS-10722 Add tests, improve logging.
dattipoeHMCTS 0dd05e8
SSCS-10722 Specify loggers, use set for cases.
dattipoeHMCTS ab6703b
SSCS-10722 Pass case ID to migration steps.
dattipoeHMCTS a8aeba1
SSCS-10722 Ensure at least appellant or appointee postcode exits.
dattipoeHMCTS d3ef861
SSCS-10722 Ensure processing venue exists.
dattipoeHMCTS f43a4a4
SSCS-10722 Add logging for failed cases.
dattipoeHMCTS 64b85b4
SSCS-10722 Remove processing venue restriction
dattipoeHMCTS File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
177 changes: 151 additions & 26 deletions
177
modules/processor/src/main/java/uk/gov/hmcts/reform/migration/CaseMigrationProcessor.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,65 +1,190 @@ | ||
| package uk.gov.hmcts.reform.migration; | ||
|
|
||
| import java.time.LocalDate; | ||
| import java.util.HashSet; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Set; | ||
| import java.util.concurrent.ExecutorService; | ||
| import java.util.concurrent.Executors; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.stream.Collectors; | ||
|
|
||
| import lombok.Getter; | ||
| import lombok.extern.slf4j.Slf4j; | ||
| import org.springframework.beans.factory.annotation.Autowired; | ||
| import lombok.RequiredArgsConstructor; | ||
| import org.apache.commons.lang3.time.StopWatch; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
| import org.springframework.stereotype.Component; | ||
| import uk.gov.hmcts.reform.ccd.client.model.CaseDetails; | ||
| import uk.gov.hmcts.reform.migration.service.DataMigrationService; | ||
| import uk.gov.hmcts.reform.ccd.client.model.SearchResult; | ||
| import uk.gov.hmcts.reform.migration.ccd.CoreCaseDataService; | ||
| import uk.gov.hmcts.reform.migration.service.DataMigrationService; | ||
|
|
||
| import java.util.ArrayList; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import static uk.gov.hmcts.reform.migration.queries.CcdElasticSearchQueries.fetchAllUnsetCaseAccessManagementFieldsCasesQuery; | ||
|
|
||
| @Slf4j | ||
| @Component | ||
| @RequiredArgsConstructor | ||
| public class CaseMigrationProcessor { | ||
|
|
||
| private final Logger errorLogger = LoggerFactory.getLogger("ccd-migration-error"); | ||
|
|
||
| private final Logger infoLogger = LoggerFactory.getLogger("ccd-migration-info"); | ||
|
|
||
| private static final String EVENT_ID = "migrateCase"; | ||
| private static final String EVENT_SUMMARY = "Migrate Case"; | ||
| private static final String EVENT_DESCRIPTION = "Migrate Case"; | ||
|
|
||
| @Autowired | ||
| private CoreCaseDataService coreCaseDataService; | ||
| @Autowired | ||
| private DataMigrationService<?> dataMigrationService; | ||
| private final StopWatch totalTimer = new StopWatch(); | ||
|
|
||
| private final CoreCaseDataService coreCaseDataService; | ||
| private final DataMigrationService<?> dataMigrationService; | ||
|
|
||
| @Getter | ||
| private final Set<Long> migratedCases = new HashSet<>(); | ||
|
|
||
| @Getter | ||
| private List<Long> migratedCases = new ArrayList<>(); | ||
| private final Set<Long> failedCases = new HashSet<>(); | ||
|
|
||
| @Getter | ||
| private List<Long> failedCases = new ArrayList<>(); | ||
| private Long totalCases = 0L; | ||
|
|
||
| public void processSingleCase(String userToken, String caseId) { | ||
| public void processSingleCase(String userToken, String caseId, boolean dryRun) { | ||
| CaseDetails caseDetails; | ||
| try { | ||
| caseDetails = coreCaseDataService.fetchOne(userToken, caseId); | ||
| } catch (Exception ex) { | ||
| log.error("Case {} not found due to: {}", caseId, ex.getMessage()); | ||
| errorLogger.error("Case {} not found due to: {}", caseId, ex.getMessage()); | ||
| return; | ||
| } | ||
| if (dataMigrationService.accepts().test(caseDetails)) { | ||
| updateCase(userToken, caseDetails.getId(), caseDetails.getData()); | ||
| updateCase(userToken, caseDetails.getId(), caseDetails.getData(), dryRun); | ||
| } else { | ||
| infoLogger.info("Case {} already migrated", caseDetails.getId()); | ||
| } | ||
| } | ||
|
|
||
| public void fetchAndProcessCases(String userToken, boolean dryRun, int numThreads, MigrationPageParams pageParams) | ||
| throws InterruptedException { | ||
|
|
||
| SearchResult initialSearch = coreCaseDataService.searchCases(userToken, | ||
| fetchAllUnsetCaseAccessManagementFieldsCasesQuery()); | ||
|
|
||
| if (initialSearch.getTotal() <= 0) { | ||
| return; | ||
| } | ||
|
|
||
| totalTimer.start(); | ||
|
|
||
| int totalCasesToProcess = resolveTotalCasesToProcess(initialSearch, pageParams.getMaxCasesToProcess()); | ||
|
|
||
| Long searchFrom = handleFirstCase(userToken, dryRun, initialSearch); | ||
|
|
||
| ExecutorService executorService = Executors.newFixedThreadPool(numThreads); | ||
|
|
||
| fetchAndSubmitTasks(userToken, dryRun, totalCasesToProcess, pageParams.getPageSize(), searchFrom, | ||
| executorService); | ||
|
|
||
| executorService.shutdown(); | ||
| executorService.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS); | ||
| } | ||
|
|
||
| private int resolveTotalCasesToProcess(SearchResult initialSearch, int maxCasesToProcess) { | ||
| int totalCasesToProcess = 0; | ||
|
|
||
| if (maxCasesToProcess > 0) { | ||
| infoLogger.info("Manual case override in use, limiting to {} cases", maxCasesToProcess); | ||
| totalCasesToProcess = maxCasesToProcess; | ||
| } else { | ||
| log.info("Case {} already migrated", caseDetails.getId()); | ||
| infoLogger.info("No manual case override in use, fetching all: {} cases", initialSearch.getTotal()); | ||
| totalCasesToProcess = initialSearch.getTotal(); | ||
| } | ||
|
|
||
| return totalCasesToProcess; | ||
| } | ||
|
|
||
| private void fetchAndSubmitTasks(String userToken, boolean dryRun, int totalCasesToProcess, int pageSize, | ||
| Long searchFrom, ExecutorService executorService) { | ||
| int casesFetched = 1; | ||
| int numCasesToFetch = pageSize; | ||
|
|
||
| while (casesFetched < totalCasesToProcess) { | ||
| numCasesToFetch = resolvePageSize(totalCasesToProcess, casesFetched, numCasesToFetch, pageSize); | ||
|
|
||
| List<CaseDetails> caseDetails = | ||
| coreCaseDataService.fetchNCases(userToken, numCasesToFetch, searchFrom); | ||
|
|
||
| if (caseDetails.isEmpty()) { | ||
| break; | ||
| } | ||
|
|
||
| searchFrom = caseDetails.get(caseDetails.size() - 1).getId(); | ||
|
|
||
| executorService.execute(() -> caseDetails | ||
| .forEach(caseDetail -> | ||
| updateCase(userToken, caseDetail.getId(), caseDetail.getData(), dryRun))); | ||
|
|
||
| infoLogger.info("New task submitted"); | ||
|
|
||
| casesFetched += caseDetails.size(); | ||
|
|
||
| infoLogger.info("{} cases fetched out of {}", casesFetched, totalCasesToProcess); | ||
| } | ||
| } | ||
|
|
||
| public void processAllCases(String userToken, String userId) { | ||
| private int resolvePageSize(int totalCasesToProcess, int casesFetched, int numCasesToFetch, int pageSize) { | ||
| int remainingCases = totalCasesToProcess - casesFetched; | ||
| if (remainingCases < pageSize) { | ||
| numCasesToFetch = remainingCases; | ||
| } | ||
| return numCasesToFetch; | ||
| } | ||
|
|
||
| private Long handleFirstCase(String userToken, boolean dryRun, SearchResult initialSearch) { | ||
| infoLogger.info("Processing first case..."); | ||
| CaseDetails firstCase = initialSearch.getCases().get(0); | ||
| updateCase(userToken, firstCase.getId(), firstCase.getData(), dryRun); | ||
| return firstCase.getId(); | ||
| } | ||
|
|
||
| public void processAllCases(String userToken, String userId, boolean dryRun) { | ||
| coreCaseDataService.fetchAll(userToken, userId).stream() | ||
| .filter(dataMigrationService.accepts()) | ||
| .forEach(caseDetails -> updateCase(userToken, caseDetails.getId(), caseDetails.getData())); | ||
| .forEach(caseDetails -> updateCase(userToken, caseDetails.getId(), caseDetails.getData(), dryRun)); | ||
| } | ||
|
|
||
| private void updateCase(String authorisation, Long id, Map<String, Object> data) { | ||
| log.info("Updating case {}", id); | ||
| protected List<LocalDate> getListOfDates(LocalDate startDate, LocalDate endDate) { | ||
| return startDate | ||
| .datesUntil(endDate) | ||
| .collect(Collectors.toList()); | ||
| } | ||
|
|
||
| private void updateCase(String authorisation, Long id, Map<String, Object> data, boolean dryRun) { | ||
|
|
||
| totalCases++; | ||
|
|
||
| try { | ||
| log.debug("Case data: {}", data); | ||
| coreCaseDataService.update(authorisation, id.toString(), | ||
| EVENT_ID, EVENT_SUMMARY, EVENT_DESCRIPTION, dataMigrationService.migrate(data)); | ||
| log.info("Case {} successfully updated", id); | ||
| var migratedData = dataMigrationService.migrate(data, id); | ||
| if (!dryRun) { | ||
| coreCaseDataService.update( | ||
| authorisation, | ||
| id.toString(), | ||
| EVENT_ID, | ||
| EVENT_SUMMARY, | ||
| EVENT_DESCRIPTION, | ||
| migratedData); | ||
| infoLogger.info("Case {} successfully updated", id); | ||
| } | ||
| migratedCases.add(id); | ||
|
|
||
| } catch (Exception e) { | ||
| log.error("Case {} update failed due to: {}", id, e.getMessage()); | ||
| errorLogger.error("Case {} update failed due to: {}", id, e.getMessage()); | ||
| failedCases.add(id); | ||
| } | ||
|
|
||
| if (totalCases % 1000 == 0) { | ||
| infoLogger.info("----------{} cases migrated in {} minutes ({} seconds)----------", totalCases, | ||
| totalTimer.getTime(TimeUnit.MINUTES), totalTimer.getTime(TimeUnit.SECONDS)); | ||
| } | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
14 changes: 14 additions & 0 deletions
14
modules/processor/src/main/java/uk/gov/hmcts/reform/migration/MigrationPageParams.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,14 @@ | ||
| package uk.gov.hmcts.reform.migration; | ||
|
|
||
|
|
||
| import lombok.Getter; | ||
| import lombok.RequiredArgsConstructor; | ||
|
|
||
| @Getter | ||
| @RequiredArgsConstructor | ||
| public class MigrationPageParams { | ||
|
|
||
| private final int pageSize; | ||
|
|
||
| private final int maxCasesToProcess; | ||
| } |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thats a worryingly long time to wait. Should we not have a smaller timeout (or configurable) ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is just to make sure all the threads have finished before the executor service shuts them down, can reduce it, though
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After our conversation, I think this is fine.