Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.svc.IIdHelperService;
import ca.uhn.fhir.jpa.api.svc.ResolveIdentityMode;
import ca.uhn.fhir.jpa.bulk.export.api.IBulkExportProcessor;
import ca.uhn.fhir.jpa.bulk.export.model.ExpandPatientIdsParams;
import ca.uhn.fhir.jpa.bulk.export.model.ExportPIDIteratorParameters;
import ca.uhn.fhir.jpa.dao.IResultIterator;
import ca.uhn.fhir.jpa.dao.ISearchBuilder;
Expand All @@ -36,12 +38,15 @@
import ca.uhn.fhir.jpa.model.dao.JpaPid;
import ca.uhn.fhir.jpa.model.search.SearchBuilderLoadIncludesParameters;
import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails;
import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.mdm.svc.MdmExpandersHolder;
import ca.uhn.fhir.model.api.Include;
import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.api.server.bulk.BulkExportJobParameters;
import ca.uhn.fhir.rest.api.server.storage.BaseResourcePersistentId;
import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
import ca.uhn.fhir.rest.param.HasOrListParam;
import ca.uhn.fhir.rest.param.HasParam;
import ca.uhn.fhir.rest.param.ReferenceOrListParam;
Expand All @@ -53,12 +58,14 @@
import jakarta.annotation.Nonnull;
import jakarta.persistence.EntityManager;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
Expand All @@ -69,6 +76,7 @@

import static ca.uhn.fhir.rest.api.Constants.PARAM_HAS;
import static ca.uhn.fhir.rest.api.Constants.PARAM_ID;
import static org.apache.commons.lang3.StringUtils.isNotBlank;

public class JpaBulkExportProcessor implements IBulkExportProcessor<JpaPid> {
private static final Logger ourLog = LoggerFactory.getLogger(JpaBulkExportProcessor.class);
Expand All @@ -87,6 +95,7 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor<JpaPid> {
private IHapiTransactionService myHapiTransactionService;
private ISearchParamRegistry mySearchParamRegistry;
private MdmExpandersHolder myMdmExpandersHolder;
private MatchUrlService myMatchUrlService;

@Autowired
public JpaBulkExportProcessor(
Expand All @@ -99,6 +108,7 @@ public JpaBulkExportProcessor(
EntityManager theEntityManager,
IHapiTransactionService theHapiTransactionService,
ISearchParamRegistry theSearchParamRegistry,
MatchUrlService theMatchUrlService,
MdmExpandersHolder theMdmExpandersHolder) {
myContext = theContext;
myBulkExportHelperSvc = theBulkExportHelperSvc;
Expand All @@ -109,6 +119,7 @@ public JpaBulkExportProcessor(
myEntityManager = theEntityManager;
myHapiTransactionService = theHapiTransactionService;
mySearchParamRegistry = theSearchParamRegistry;
myMatchUrlService = theMatchUrlService;
myMdmExpandersHolder = theMdmExpandersHolder;
}

Expand Down Expand Up @@ -138,6 +149,108 @@ public Iterator<JpaPid> getResourcePidIterator(ExportPIDIteratorParameters thePa
});
}

@Nonnull
@Override
public Set<JpaPid> expandPatientIdList(ExpandPatientIdsParams theParams) {
return myHapiTransactionService
.withSystemRequest()
.withRequestPartitionId(theParams.getRequestPartitionId())
.readOnly()
.execute(() -> {
Set<JpaPid> patientPids = new HashSet<>();
switch (theParams.getExportStyle()) {
case GROUP -> {
populateListOfPatientIdsForGroupExport(theParams, patientPids);
}
case PATIENT -> {
populateListOfPatientIdsForPatientExport(theParams, patientPids);
}
default -> {
// nothing to do (patients parameter is not supported for system level exports)
}
}

return patientPids;
});
}

private void populateListOfPatientIdsForPatientExport(ExpandPatientIdsParams theParams, Set<JpaPid> theJpaPids)
throws IOException {
RequestPartitionId partitionId = theParams.getRequestPartitionId();
List<SearchParameterMap> maps = makeSearchParameterMapsForPatientExport(theParams);

Set<JpaPid> pids =
new HashSet<>(getPatientPidsUsingSearchMaps(maps, null, null, theParams.getRequestPartitionId()));

/*
* the pids do not necessarily have the associated IIdType attached ot them.
* But we need them, so we'll expand them out here.
* This will be fast if the pid is still in the cache (which it should be)
*/
myIdHelperService.fillOutPids(pids, myContext);

if (theParams.isToDoMdmExpansion()) {
Collection<IIdType> patientIds = pids.stream()
.map(IResourcePersistentId::getAssociatedResourceId)
.collect(Collectors.toList());

// MDM expansion requested -> we'll have MdmExpansionHolder do the work
// of fetching mdm linked patients as well as converting all of them to
// JpaPid
Set<JpaPid> resolvedAndMdmExpanded = myMdmExpandersHolder
.getBulkExportMDMResourceExpanderInstance()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: how is bulk export MDM pid expansion different than normal mdm pid expansion?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is related to the issue Luis found.

Depending on whose code merges first, it'll likely be updated there.

Internally, it's doing some check it shouldn't do. But this code here was just refactored from another place so i didn't delve into what it was doing internally.

.expandPatients(patientIds, partitionId);
theJpaPids.addAll(resolvedAndMdmExpanded);
} else {
theJpaPids.addAll(pids);
}
}

private void populateListOfPatientIdsForGroupExport(ExpandPatientIdsParams theParams, Set<JpaPid> thePatientPids)
throws IOException {
RequestPartitionId partitionId = theParams.getRequestPartitionId();

// we have to apply the parameters to filter
// the patients we want (ie, not all the members of the group necessarily fit the filters)
// so first we get a set of SP maps
RuntimeResourceDefinition def = myContext.getResourceDefinition("Patient");
List<SearchParameterMap> maps = myBulkExportHelperSvc.createSearchParameterMapsForResourcetype(
def, theParams.getFilters(), theParams.getStartDate(), theParams.getEndDate(), true);

// use those maps to get the patient ids we care about
List<JpaPid> pids =
getPatientPidsUsingSearchMaps(maps, theParams.getGroupId(), null, theParams.getRequestPartitionId());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thought: I'm confused here. A group export exports for all patients in the group. Why would not all patients in the group be MDM-expanded? Don't we care about all the patient IDs?


/*
* and fill them out.
*
* Like with patient export above, the JpaPids here might not have
* their associated resource id populated.
* But since we need it, we'll "fill them out" here.
* (should be fast, because the ids should be in the cache)
*/
Set<JpaPid> pidsSet = new HashSet<>(pids);
myIdHelperService.fillOutPids(pidsSet, myContext);

Set<IIdType> patientIds = pidsSet.stream()
.map(BaseResourcePersistentId::getAssociatedResourceId)
.collect(Collectors.toSet());

if (theParams.isToDoMdmExpansion()) {
// expand them out and add them to our list
Set<JpaPid> jpaPids = myMdmExpandersHolder
.getBulkExportMDMResourceExpanderInstance()
.expandPatients(patientIds, partitionId);
thePatientPids.addAll(jpaPids);
} else {
// no mdm expansion; just add them to the list
myIdHelperService.resolveResourcePids(
partitionId,
patientIds,
ResolveIdentityMode.excludeDeleted().cacheOk());
}
}

@SuppressWarnings("unchecked")
private LinkedHashSet<JpaPid> getPidsForPatientStyleExport(
ExportPIDIteratorParameters theParams,
Expand All @@ -147,6 +260,7 @@ private LinkedHashSet<JpaPid> getPidsForPatientStyleExport(
RuntimeResourceDefinition def)
throws IOException {
LinkedHashSet<JpaPid> pids = new LinkedHashSet<>();

// Patient
if (myStorageSettings.getIndexMissingFields() == JpaStorageSettings.IndexEnabledEnum.DISABLED) {
String errorMessage =
Expand Down Expand Up @@ -194,6 +308,7 @@ map, searchRuntime, new SystemRequestDetails(), theParams.getPartitionIdOrAllPar
}
}
}

return pids;
}

Expand All @@ -203,12 +318,32 @@ private void filterBySpecificPatient(
String patientSearchParam,
SearchParameterMap map) {
if (resourceType.equalsIgnoreCase("Patient")) {
if (theParams.getPatientIds() != null) {
if (theParams.getExpandedPatientIds() != null) {
ReferenceOrListParam referenceOrListParam =
makeReferenceOrListParam(theParams.getExpandedPatientIds().stream()
.map(f -> {
return f.getAssociatedResourceId()
.toUnqualifiedVersionless()
.getValue();
})
.collect(Collectors.toList()));
map.add(PARAM_ID, referenceOrListParam);
} else if (theParams.getPatientIds() != null) {
ReferenceOrListParam referenceOrListParam = makeReferenceOrListParam(theParams.getPatientIds());
map.add(PARAM_ID, referenceOrListParam);
}
} else {
if (theParams.getPatientIds() != null) {
if (theParams.getExpandedPatientIds() != null) {
ReferenceOrListParam referenceOrListParam =
makeReferenceOrListParam(theParams.getExpandedPatientIds().stream()
.map(f -> {
return f.getAssociatedResourceId()
.toUnqualifiedVersionless()
.getValue();
})
.collect(Collectors.toList()));
map.add(patientSearchParam, referenceOrListParam);
} else if (theParams.getPatientIds() != null) {
ReferenceOrListParam referenceOrListParam = makeReferenceOrListParam(theParams.getPatientIds());
map.add(patientSearchParam, referenceOrListParam);
} else {
Expand Down Expand Up @@ -395,6 +530,14 @@ private void validateSearchParametersForGroup(SearchParameterMap expandedSpMap,
*/
private LinkedHashSet<JpaPid> getExpandedPatientList(
ExportPIDIteratorParameters theParameters, boolean theConsiderDateRange) throws IOException {

if (theParameters.getExpandedPatientIds() != null) {
List<JpaPid> existingMembers = theParameters.getExpandedPatientIds().stream()
.map(pid -> (JpaPid) pid)
.toList();
return new LinkedHashSet<>(existingMembers);
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thought: I don't understand the purpose of this method. THe first thing it does is check if its already expanded? Why do it at all? The caller should know if expansion has occurred or not. Are we conflating mdm expansion with group expansion? May be worthwhile to once-over the var names to disambiguate the word.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is kinda where we get into trouble with having shared logic.

the "expanded patient ids" are for v3

V2 will get here and not have this. so they will continue to do the expansion just as before (per resource).

This is because "FetchIds" happens in differnet places.

The only other solution would be to manually put a parameter in that states what version it is and use that or just copy paste the code all over htep lace.

I'm ok with eitehr ,if you'd prefer

Copy link
Contributor

@michaelabuckley michaelabuckley Oct 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there are different versions of the job, we should fork the code. It is unsafe for us to think we can be backwards compatible by being tricky like this.


List<JpaPid> members = getMembersFromGroupWithFilter(theParameters, theConsiderDateRange);
ourLog.info(
"Group with ID [{}] has been expanded to {} members, member JpaIds: {}",
Expand All @@ -417,23 +560,36 @@ private LinkedHashSet<JpaPid> getExpandedPatientList(
*
* @return A list of strings representing the Patient IDs of the members (e.g. ["P1", "P2", "P3"]
*/
@SuppressWarnings("unchecked")
private List<JpaPid> getMembersFromGroupWithFilter(
ExportPIDIteratorParameters theParameters, boolean theConsiderDateRange) throws IOException {
final List<SearchParameterMap> maps = makeSearchParameterMaps(theParameters, theConsiderDateRange);
final List<SearchParameterMap> maps =
makeSearchParameterMapsForGroupExport(theParameters, theConsiderDateRange);

return getPatientPidsUsingSearchMaps(
maps,
theParameters.getGroupId(),
theParameters.getInstanceId(),
theParameters.getPartitionIdOrAllPartitions());
}

private List<JpaPid> getPatientPidsUsingSearchMaps(
List<SearchParameterMap> maps,
String theGroupId,
String theInstanceId,
RequestPartitionId theRequestPartitionId)
throws IOException {
final List<JpaPid> resPids = new ArrayList<>();
for (SearchParameterMap map : maps) {
ISearchBuilder<JpaPid> searchBuilder = getSearchBuilderForResourceType("Patient");
ourLog.debug(
"Searching for members of group {} with job instance {} with map {}",
theParameters.getGroupId(),
theParameters.getInstanceId(),
map);
if (isNotBlank(theGroupId)) {
ourLog.debug(
"Searching for members of group {} with job instance {} with map {}",
theGroupId,
theInstanceId,
map);
}
try (IResultIterator<JpaPid> resultIterator = searchBuilder.createQuery(
map,
new SearchRuntimeDetails(null, theParameters.getInstanceId()),
null,
theParameters.getPartitionIdOrAllPartitions())) {
map, new SearchRuntimeDetails(null, theInstanceId), null, theRequestPartitionId)) {

while (resultIterator.hasNext()) {
resPids.add(resultIterator.next());
Expand All @@ -444,7 +600,7 @@ private List<JpaPid> getMembersFromGroupWithFilter(
}

@Nonnull
private List<SearchParameterMap> makeSearchParameterMaps(
private List<SearchParameterMap> makeSearchParameterMapsForGroupExport(
@Nonnull ExportPIDIteratorParameters theParameters, boolean theConsiderDateRange) {
final RuntimeResourceDefinition def = myContext.getResourceDefinition("Patient");
final List<SearchParameterMap> maps = myBulkExportHelperSvc.createSearchParameterMapsForResourceType(
Expand All @@ -459,6 +615,21 @@ private List<SearchParameterMap> makeSearchParameterMaps(
return maps;
}

private List<SearchParameterMap> makeSearchParameterMapsForPatientExport(ExpandPatientIdsParams theParams) {
final RuntimeResourceDefinition def = myContext.getResourceDefinition("Patient");
final List<SearchParameterMap> maps = myBulkExportHelperSvc.createSearchParameterMapsForResourcetype(
def, theParams.getFilters(), theParams.getStartDate(), theParams.getEndDate(), true);

if (!theParams.getPatientIds().isEmpty()) {
// Patient Instance Export
maps.forEach(map -> {
map.add(PARAM_ID, makeReferenceOrListParam(theParams.getPatientIds()));
});
}

return maps;
}

@Nonnull
private HasOrListParam makeGroupMemberHasOrListParam(@Nonnull String theGroupId) {
final HasParam hasParam = new HasParam("Group", "member", "_id", theGroupId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.model.dao.JpaPid;
import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
import ca.uhn.fhir.mdm.svc.MdmExpandersHolder;
import ca.uhn.fhir.rest.server.util.ISearchParamRegistry;
import jakarta.persistence.EntityManager;
Expand All @@ -48,6 +49,7 @@ public IBulkExportProcessor<JpaPid> jpaBulkExportProcessor(
EntityManager theEntityManager,
IHapiTransactionService theHapiTransactionService,
ISearchParamRegistry theSearchParamRegistry,
MatchUrlService theMatchUrlService,
MdmExpandersHolder theMdmExpandersHolder) {
return new JpaBulkExportProcessor(
theFhirContext,
Expand All @@ -59,6 +61,7 @@ public IBulkExportProcessor<JpaPid> jpaBulkExportProcessor(
theEntityManager,
theHapiTransactionService,
theSearchParamRegistry,
theMatchUrlService,
theMdmExpandersHolder);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2299,12 +2299,13 @@ private <V> List<V> searchForTransformedIds(
.searchList(() -> {
ISearchBuilder<JpaPid> builder =
mySearchBuilderFactory.newSearchBuilder(getResourceName(), getResourceType());
Stream<JpaPid> pidStream =
builder.createQueryStream(theParams, searchRuntimeDetails, theRequest, requestPartitionId);
try (Stream<JpaPid> pidStream = builder.createQueryStream(
theParams, searchRuntimeDetails, theRequest, requestPartitionId)) {

Stream<V> transformedStream = transform.apply(theRequest, pidStream, requestPartitionId);

return transformedStream.collect(Collectors.toList());
try (Stream<V> transformedStream = transform.apply(theRequest, pidStream, requestPartitionId)) {
return transformedStream.collect(Collectors.toList());
}
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,6 @@ public void getResourcePidIterator_groupExportStyleWithPatientResource_returnsIt
pids.add(JpaPid.fromId(type.getIdPartAsLong()));
}



parameters.setExpandMdm(theMdm); // set mdm expansion
parameters.setPartitionId(getPartitionIdFromParams(thePartitioned));

Expand Down Expand Up @@ -327,7 +325,6 @@ private void validatePartitionId(boolean thePartitioned, RequestPartitionId theP
} else {
assertEquals(RequestPartitionId.allPartitions(), thePartitionId);
}

}

// source is: "isExpandMdm,(whether or not to test on a specific partition)
Expand Down
Loading
Loading