-
Notifications
You must be signed in to change notification settings - Fork 1.4k
7281 mdm bulk export expansion #7283
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 9 commits
2462ec9
56d44c0
9ebe21b
c11a7b7
c7b3aff
e9cbe01
c0b657e
95a899c
fd73b19
dc54459
4c13083
a9e4dd9
6a42ecb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
--- | ||
type: add | ||
issue: 7281 | ||
title: "Added MDM expansion support to bulk export jobs. | ||
The version of the bulk export job has been updated | ||
to accommodate this. | ||
" |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,7 +27,9 @@ | |
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; | ||
import ca.uhn.fhir.jpa.api.dao.DaoRegistry; | ||
import ca.uhn.fhir.jpa.api.svc.IIdHelperService; | ||
import ca.uhn.fhir.jpa.api.svc.ResolveIdentityMode; | ||
import ca.uhn.fhir.jpa.bulk.export.api.IBulkExportProcessor; | ||
import ca.uhn.fhir.jpa.bulk.export.model.ExpandPatientIdsParams; | ||
import ca.uhn.fhir.jpa.bulk.export.model.ExportPIDIteratorParameters; | ||
import ca.uhn.fhir.jpa.dao.IResultIterator; | ||
import ca.uhn.fhir.jpa.dao.ISearchBuilder; | ||
|
@@ -36,12 +38,15 @@ | |
import ca.uhn.fhir.jpa.model.dao.JpaPid; | ||
import ca.uhn.fhir.jpa.model.search.SearchBuilderLoadIncludesParameters; | ||
import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails; | ||
import ca.uhn.fhir.jpa.searchparam.MatchUrlService; | ||
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; | ||
import ca.uhn.fhir.mdm.svc.MdmExpandersHolder; | ||
import ca.uhn.fhir.model.api.Include; | ||
import ca.uhn.fhir.model.primitive.IdDt; | ||
import ca.uhn.fhir.rest.api.server.SystemRequestDetails; | ||
import ca.uhn.fhir.rest.api.server.bulk.BulkExportJobParameters; | ||
import ca.uhn.fhir.rest.api.server.storage.BaseResourcePersistentId; | ||
import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId; | ||
import ca.uhn.fhir.rest.param.HasOrListParam; | ||
import ca.uhn.fhir.rest.param.HasParam; | ||
import ca.uhn.fhir.rest.param.ReferenceOrListParam; | ||
|
@@ -53,12 +58,14 @@ | |
import jakarta.annotation.Nonnull; | ||
import jakarta.persistence.EntityManager; | ||
import org.hl7.fhir.instance.model.api.IBaseResource; | ||
import org.hl7.fhir.instance.model.api.IIdType; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
import org.springframework.beans.factory.annotation.Autowired; | ||
|
||
import java.io.IOException; | ||
import java.util.ArrayList; | ||
import java.util.Collection; | ||
import java.util.HashSet; | ||
import java.util.Iterator; | ||
import java.util.LinkedHashSet; | ||
|
@@ -69,6 +76,7 @@ | |
|
||
import static ca.uhn.fhir.rest.api.Constants.PARAM_HAS; | ||
import static ca.uhn.fhir.rest.api.Constants.PARAM_ID; | ||
import static org.apache.commons.lang3.StringUtils.isNotBlank; | ||
|
||
public class JpaBulkExportProcessor implements IBulkExportProcessor<JpaPid> { | ||
private static final Logger ourLog = LoggerFactory.getLogger(JpaBulkExportProcessor.class); | ||
|
@@ -87,6 +95,7 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor<JpaPid> { | |
private IHapiTransactionService myHapiTransactionService; | ||
private ISearchParamRegistry mySearchParamRegistry; | ||
private MdmExpandersHolder myMdmExpandersHolder; | ||
private MatchUrlService myMatchUrlService; | ||
|
||
@Autowired | ||
public JpaBulkExportProcessor( | ||
|
@@ -99,6 +108,7 @@ public JpaBulkExportProcessor( | |
EntityManager theEntityManager, | ||
IHapiTransactionService theHapiTransactionService, | ||
ISearchParamRegistry theSearchParamRegistry, | ||
MatchUrlService theMatchUrlService, | ||
MdmExpandersHolder theMdmExpandersHolder) { | ||
myContext = theContext; | ||
myBulkExportHelperSvc = theBulkExportHelperSvc; | ||
|
@@ -109,6 +119,7 @@ public JpaBulkExportProcessor( | |
myEntityManager = theEntityManager; | ||
myHapiTransactionService = theHapiTransactionService; | ||
mySearchParamRegistry = theSearchParamRegistry; | ||
myMatchUrlService = theMatchUrlService; | ||
myMdmExpandersHolder = theMdmExpandersHolder; | ||
} | ||
|
||
|
@@ -138,6 +149,108 @@ public Iterator<JpaPid> getResourcePidIterator(ExportPIDIteratorParameters thePa | |
}); | ||
} | ||
|
||
@Nonnull | ||
@Override | ||
public Set<JpaPid> expandPatientIdList(ExpandPatientIdsParams theParams) { | ||
return myHapiTransactionService | ||
.withSystemRequest() | ||
.withRequestPartitionId(theParams.getRequestPartitionId()) | ||
.readOnly() | ||
.execute(() -> { | ||
Set<JpaPid> patientPids = new HashSet<>(); | ||
switch (theParams.getExportStyle()) { | ||
case GROUP -> { | ||
populateListOfPatientIdsForGroupExport(theParams, patientPids); | ||
} | ||
case PATIENT -> { | ||
populateListOfPatientIdsForPatientExport(theParams, patientPids); | ||
} | ||
default -> { | ||
// nothing to do (patients parameter is not supported for system level exports) | ||
} | ||
} | ||
|
||
return patientPids; | ||
}); | ||
} | ||
|
||
private void populateListOfPatientIdsForPatientExport(ExpandPatientIdsParams theParams, Set<JpaPid> theJpaPids) | ||
throws IOException { | ||
RequestPartitionId partitionId = theParams.getRequestPartitionId(); | ||
List<SearchParameterMap> maps = makeSearchParameterMapsForPatientExport(theParams); | ||
|
||
Set<JpaPid> pids = | ||
new HashSet<>(getPatientPidsUsingSearchMaps(maps, null, null, theParams.getRequestPartitionId())); | ||
|
||
/* | ||
* the pids do not necessarily have the associated IIdType attached ot them. | ||
* But we need them, so we'll expand them out here. | ||
* This will be fast if the pid is still in the cache (which it should be) | ||
*/ | ||
myIdHelperService.fillOutPids(pids, myContext); | ||
|
||
if (theParams.isToDoMdmExpansion()) { | ||
Collection<IIdType> patientIds = pids.stream() | ||
.map(IResourcePersistentId::getAssociatedResourceId) | ||
.collect(Collectors.toList()); | ||
|
||
// MDM expansion requested -> we'll have MdmExpansionHolder do the work | ||
// of fetching mdm linked patients as well as converting all of them to | ||
// JpaPid | ||
Set<JpaPid> resolvedAndMdmExpanded = myMdmExpandersHolder | ||
.getBulkExportMDMResourceExpanderInstance() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. question: how is bulk export MDM pid expansion different than normal mdm pid expansion? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is related to the issue Luis found. Depending on whose code merges first, it'll likely be updated there. Internally, it's doing some check it shouldn't do. But this code here was just refactored from another place so i didn't delve into what it was doing internally. |
||
.expandPatients(patientIds, partitionId); | ||
theJpaPids.addAll(resolvedAndMdmExpanded); | ||
} else { | ||
theJpaPids.addAll(pids); | ||
} | ||
} | ||
|
||
private void populateListOfPatientIdsForGroupExport(ExpandPatientIdsParams theParams, Set<JpaPid> thePatientPids) | ||
throws IOException { | ||
RequestPartitionId partitionId = theParams.getRequestPartitionId(); | ||
|
||
// we have to apply the parameters to filter | ||
// the patients we want (ie, not all the members of the group necessarily fit the filters) | ||
// so first we get a set of SP maps | ||
RuntimeResourceDefinition def = myContext.getResourceDefinition("Patient"); | ||
List<SearchParameterMap> maps = myBulkExportHelperSvc.createSearchParameterMapsForResourcetype( | ||
def, theParams.getFilters(), theParams.getStartDate(), theParams.getEndDate(), true); | ||
|
||
// use those maps to get the patient ids we care about | ||
List<JpaPid> pids = | ||
getPatientPidsUsingSearchMaps(maps, theParams.getGroupId(), null, theParams.getRequestPartitionId()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. thought: I'm confused here. A group export exports for all patients in the group. Why would not all patients in the group be MDM-expanded? Don't we care about all the patient IDs? |
||
|
||
/* | ||
* and fill them out. | ||
* | ||
* Like with patient export above, the JpaPids here might not have | ||
* their associated resource id populated. | ||
* But since we need it, we'll "fill them out" here. | ||
* (should be fast, because the ids should be in the cache) | ||
*/ | ||
Set<JpaPid> pidsSet = new HashSet<>(pids); | ||
myIdHelperService.fillOutPids(pidsSet, myContext); | ||
|
||
Set<IIdType> patientIds = pidsSet.stream() | ||
.map(BaseResourcePersistentId::getAssociatedResourceId) | ||
.collect(Collectors.toSet()); | ||
|
||
if (theParams.isToDoMdmExpansion()) { | ||
// expand them out and add them to our list | ||
Set<JpaPid> jpaPids = myMdmExpandersHolder | ||
.getBulkExportMDMResourceExpanderInstance() | ||
.expandPatients(patientIds, partitionId); | ||
thePatientPids.addAll(jpaPids); | ||
} else { | ||
// no mdm expansion; just add them to the list | ||
myIdHelperService.resolveResourcePids( | ||
partitionId, | ||
patientIds, | ||
ResolveIdentityMode.excludeDeleted().cacheOk()); | ||
} | ||
} | ||
|
||
@SuppressWarnings("unchecked") | ||
private LinkedHashSet<JpaPid> getPidsForPatientStyleExport( | ||
ExportPIDIteratorParameters theParams, | ||
|
@@ -147,6 +260,7 @@ private LinkedHashSet<JpaPid> getPidsForPatientStyleExport( | |
RuntimeResourceDefinition def) | ||
throws IOException { | ||
LinkedHashSet<JpaPid> pids = new LinkedHashSet<>(); | ||
|
||
// Patient | ||
if (myStorageSettings.getIndexMissingFields() == JpaStorageSettings.IndexEnabledEnum.DISABLED) { | ||
String errorMessage = | ||
|
@@ -194,6 +308,7 @@ map, searchRuntime, new SystemRequestDetails(), theParams.getPartitionIdOrAllPar | |
} | ||
} | ||
} | ||
|
||
return pids; | ||
} | ||
|
||
|
@@ -203,12 +318,32 @@ private void filterBySpecificPatient( | |
String patientSearchParam, | ||
SearchParameterMap map) { | ||
if (resourceType.equalsIgnoreCase("Patient")) { | ||
if (theParams.getPatientIds() != null) { | ||
if (theParams.getExpandedPatientIds() != null) { | ||
ReferenceOrListParam referenceOrListParam = | ||
makeReferenceOrListParam(theParams.getExpandedPatientIds().stream() | ||
.map(f -> { | ||
return f.getAssociatedResourceId() | ||
.toUnqualifiedVersionless() | ||
.getValue(); | ||
}) | ||
.collect(Collectors.toList())); | ||
map.add(PARAM_ID, referenceOrListParam); | ||
} else if (theParams.getPatientIds() != null) { | ||
ReferenceOrListParam referenceOrListParam = makeReferenceOrListParam(theParams.getPatientIds()); | ||
map.add(PARAM_ID, referenceOrListParam); | ||
} | ||
} else { | ||
if (theParams.getPatientIds() != null) { | ||
if (theParams.getExpandedPatientIds() != null) { | ||
ReferenceOrListParam referenceOrListParam = | ||
makeReferenceOrListParam(theParams.getExpandedPatientIds().stream() | ||
.map(f -> { | ||
return f.getAssociatedResourceId() | ||
.toUnqualifiedVersionless() | ||
.getValue(); | ||
}) | ||
.collect(Collectors.toList())); | ||
map.add(patientSearchParam, referenceOrListParam); | ||
} else if (theParams.getPatientIds() != null) { | ||
TipzCM marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
ReferenceOrListParam referenceOrListParam = makeReferenceOrListParam(theParams.getPatientIds()); | ||
map.add(patientSearchParam, referenceOrListParam); | ||
} else { | ||
|
@@ -395,6 +530,14 @@ private void validateSearchParametersForGroup(SearchParameterMap expandedSpMap, | |
*/ | ||
private LinkedHashSet<JpaPid> getExpandedPatientList( | ||
ExportPIDIteratorParameters theParameters, boolean theConsiderDateRange) throws IOException { | ||
|
||
if (theParameters.getExpandedPatientIds() != null) { | ||
List<JpaPid> existingMembers = theParameters.getExpandedPatientIds().stream() | ||
.map(pid -> (JpaPid) pid) | ||
.toList(); | ||
return new LinkedHashSet<>(existingMembers); | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. thought: I don't understand the purpose of this method. THe first thing it does is check if its already expanded? Why do it at all? The caller should know if expansion has occurred or not. Are we conflating mdm expansion with group expansion? May be worthwhile to once-over the var names to disambiguate the word. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is kinda where we get into trouble with having shared logic. the "expanded patient ids" are for v3 V2 will get here and not have this. so they will continue to do the expansion just as before (per resource). This is because "FetchIds" happens in differnet places. The only other solution would be to manually put a parameter in that states what version it is and use that or just copy paste the code all over htep lace. I'm ok with eitehr ,if you'd prefer There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If there are different versions of the job, we should fork the code. It is unsafe for us to think we can be backwards compatible by being tricky like this. |
||
|
||
List<JpaPid> members = getMembersFromGroupWithFilter(theParameters, theConsiderDateRange); | ||
ourLog.info( | ||
"Group with ID [{}] has been expanded to {} members, member JpaIds: {}", | ||
|
@@ -417,23 +560,36 @@ private LinkedHashSet<JpaPid> getExpandedPatientList( | |
* | ||
* @return A list of strings representing the Patient IDs of the members (e.g. ["P1", "P2", "P3"] | ||
*/ | ||
@SuppressWarnings("unchecked") | ||
private List<JpaPid> getMembersFromGroupWithFilter( | ||
ExportPIDIteratorParameters theParameters, boolean theConsiderDateRange) throws IOException { | ||
final List<SearchParameterMap> maps = makeSearchParameterMaps(theParameters, theConsiderDateRange); | ||
final List<SearchParameterMap> maps = | ||
makeSearchParameterMapsForGroupExport(theParameters, theConsiderDateRange); | ||
|
||
return getPatientPidsUsingSearchMaps( | ||
maps, | ||
theParameters.getGroupId(), | ||
theParameters.getInstanceId(), | ||
theParameters.getPartitionIdOrAllPartitions()); | ||
} | ||
|
||
private List<JpaPid> getPatientPidsUsingSearchMaps( | ||
List<SearchParameterMap> maps, | ||
String theGroupId, | ||
String theInstanceId, | ||
RequestPartitionId theRequestPartitionId) | ||
throws IOException { | ||
final List<JpaPid> resPids = new ArrayList<>(); | ||
for (SearchParameterMap map : maps) { | ||
ISearchBuilder<JpaPid> searchBuilder = getSearchBuilderForResourceType("Patient"); | ||
ourLog.debug( | ||
"Searching for members of group {} with job instance {} with map {}", | ||
theParameters.getGroupId(), | ||
theParameters.getInstanceId(), | ||
map); | ||
if (isNotBlank(theGroupId)) { | ||
ourLog.debug( | ||
"Searching for members of group {} with job instance {} with map {}", | ||
theGroupId, | ||
theInstanceId, | ||
map); | ||
} | ||
try (IResultIterator<JpaPid> resultIterator = searchBuilder.createQuery( | ||
map, | ||
new SearchRuntimeDetails(null, theParameters.getInstanceId()), | ||
null, | ||
theParameters.getPartitionIdOrAllPartitions())) { | ||
map, new SearchRuntimeDetails(null, theInstanceId), null, theRequestPartitionId)) { | ||
|
||
while (resultIterator.hasNext()) { | ||
resPids.add(resultIterator.next()); | ||
|
@@ -444,7 +600,7 @@ private List<JpaPid> getMembersFromGroupWithFilter( | |
} | ||
|
||
@Nonnull | ||
private List<SearchParameterMap> makeSearchParameterMaps( | ||
private List<SearchParameterMap> makeSearchParameterMapsForGroupExport( | ||
@Nonnull ExportPIDIteratorParameters theParameters, boolean theConsiderDateRange) { | ||
final RuntimeResourceDefinition def = myContext.getResourceDefinition("Patient"); | ||
final List<SearchParameterMap> maps = myBulkExportHelperSvc.createSearchParameterMapsForResourceType( | ||
|
@@ -459,6 +615,21 @@ private List<SearchParameterMap> makeSearchParameterMaps( | |
return maps; | ||
} | ||
|
||
private List<SearchParameterMap> makeSearchParameterMapsForPatientExport(ExpandPatientIdsParams theParams) { | ||
final RuntimeResourceDefinition def = myContext.getResourceDefinition("Patient"); | ||
final List<SearchParameterMap> maps = myBulkExportHelperSvc.createSearchParameterMapsForResourcetype( | ||
def, theParams.getFilters(), theParams.getStartDate(), theParams.getEndDate(), true); | ||
|
||
if (!theParams.getPatientIds().isEmpty()) { | ||
// Patient Instance Export | ||
maps.forEach(map -> { | ||
map.add(PARAM_ID, makeReferenceOrListParam(theParams.getPatientIds())); | ||
}); | ||
} | ||
|
||
return maps; | ||
} | ||
|
||
@Nonnull | ||
private HasOrListParam makeGroupMemberHasOrListParam(@Nonnull String theGroupId) { | ||
final HasParam hasParam = new HasParam("Group", "member", "_id", theGroupId); | ||
|
Uh oh!
There was an error while loading. Please reload this page.