diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/8_6_0/7281-add-mdm-expansion-to-patient-exports.yaml b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/8_6_0/7281-add-mdm-expansion-to-patient-exports.yaml new file mode 100644 index 000000000000..68546867d9ce --- /dev/null +++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/8_6_0/7281-add-mdm-expansion-to-patient-exports.yaml @@ -0,0 +1,7 @@ +--- +type: add +issue: 7281 +title: "Added MDM expansion support to bulk export jobs. + The version of the bulk export job has been updated + to accommodate this. +" diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/svc/JpaBulkExportProcessor.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/svc/JpaBulkExportProcessor.java index 79485f5d3d90..9116c988e7c2 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/svc/JpaBulkExportProcessor.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/svc/JpaBulkExportProcessor.java @@ -27,7 +27,9 @@ import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; import ca.uhn.fhir.jpa.api.dao.DaoRegistry; import ca.uhn.fhir.jpa.api.svc.IIdHelperService; +import ca.uhn.fhir.jpa.api.svc.ResolveIdentityMode; import ca.uhn.fhir.jpa.bulk.export.api.IBulkExportProcessor; +import ca.uhn.fhir.jpa.bulk.export.model.ExpandPatientIdsParams; import ca.uhn.fhir.jpa.bulk.export.model.ExportPIDIteratorParameters; import ca.uhn.fhir.jpa.dao.IResultIterator; import ca.uhn.fhir.jpa.dao.ISearchBuilder; @@ -36,12 +38,16 @@ import ca.uhn.fhir.jpa.model.dao.JpaPid; import ca.uhn.fhir.jpa.model.search.SearchBuilderLoadIncludesParameters; import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails; +import ca.uhn.fhir.jpa.searchparam.MatchUrlService; import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; import ca.uhn.fhir.mdm.svc.MdmExpandersHolder; +import ca.uhn.fhir.model.api.IQueryParameterType; import ca.uhn.fhir.model.api.Include; import ca.uhn.fhir.model.primitive.IdDt; import ca.uhn.fhir.rest.api.server.SystemRequestDetails; import ca.uhn.fhir.rest.api.server.bulk.BulkExportJobParameters; +import ca.uhn.fhir.rest.api.server.storage.BaseResourcePersistentId; +import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId; import ca.uhn.fhir.rest.param.HasOrListParam; import ca.uhn.fhir.rest.param.HasParam; import ca.uhn.fhir.rest.param.ReferenceOrListParam; @@ -53,12 +59,14 @@ import jakarta.annotation.Nonnull; import jakarta.persistence.EntityManager; import org.hl7.fhir.instance.model.api.IBaseResource; +import org.hl7.fhir.instance.model.api.IIdType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashSet; @@ -69,6 +77,7 @@ import static ca.uhn.fhir.rest.api.Constants.PARAM_HAS; import static ca.uhn.fhir.rest.api.Constants.PARAM_ID; +import static org.apache.commons.lang3.StringUtils.isNotBlank; public class JpaBulkExportProcessor implements IBulkExportProcessor { private static final Logger ourLog = LoggerFactory.getLogger(JpaBulkExportProcessor.class); @@ -87,6 +96,7 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor { private IHapiTransactionService myHapiTransactionService; private ISearchParamRegistry mySearchParamRegistry; private MdmExpandersHolder myMdmExpandersHolder; + 
private MatchUrlService myMatchUrlService; @Autowired public JpaBulkExportProcessor( @@ -99,6 +109,7 @@ public JpaBulkExportProcessor( EntityManager theEntityManager, IHapiTransactionService theHapiTransactionService, ISearchParamRegistry theSearchParamRegistry, + MatchUrlService theMatchUrlService, MdmExpandersHolder theMdmExpandersHolder) { myContext = theContext; myBulkExportHelperSvc = theBulkExportHelperSvc; @@ -109,6 +120,7 @@ public JpaBulkExportProcessor( myEntityManager = theEntityManager; myHapiTransactionService = theHapiTransactionService; mySearchParamRegistry = theSearchParamRegistry; + myMatchUrlService = theMatchUrlService; myMdmExpandersHolder = theMdmExpandersHolder; } @@ -138,6 +150,108 @@ public Iterator getResourcePidIterator(ExportPIDIteratorParameters thePa }); } + @Nonnull + @Override + public Set expandPatientIdList(ExpandPatientIdsParams theParams) { + return myHapiTransactionService + .withSystemRequest() + .withRequestPartitionId(theParams.getRequestPartitionId()) + .readOnly() + .execute(() -> { + Set patientPids = new HashSet<>(); + switch (theParams.getExportStyle()) { + case GROUP -> { + populateListOfPatientIdsForGroupExport(theParams, patientPids); + } + case PATIENT -> { + populateListOfPatientIdsForPatientExport(theParams, patientPids); + } + default -> { + // nothing to do (patients parameter is not supported for system level exports) + } + } + + return patientPids; + }); + } + + private void populateListOfPatientIdsForPatientExport(ExpandPatientIdsParams theParams, Set theJpaPids) + throws IOException { + RequestPartitionId partitionId = theParams.getRequestPartitionId(); + List maps = makeSearchParameterMapsForPatientExport(theParams); + + Set pids = + new HashSet<>(getPatientPidsUsingSearchMaps(maps, null, null, theParams.getRequestPartitionId())); + + /* + * the pids do not necessarily have the associated IIdType attached to them. + * But we need them, so we'll expand them out here. + * This will be fast if the pid is still in the cache (which it should be) + */ + myIdHelperService.fillOutPids(pids, myContext); + + if (theParams.isShouldDoMdmExpansion()) { + Collection patientIds = pids.stream() + .map(IResourcePersistentId::getAssociatedResourceId) + .collect(Collectors.toList()); + + // MDM expansion requested -> we'll have MdmExpandersHolder do the work + // of fetching mdm linked patients as well as converting all of them to + // JpaPid + Set resolvedAndMdmExpanded = myMdmExpandersHolder + .getBulkExportMDMResourceExpanderInstance() + .expandPatients(patientIds, partitionId); + theJpaPids.addAll(resolvedAndMdmExpanded); + } else { + theJpaPids.addAll(pids); + } + } + + private void populateListOfPatientIdsForGroupExport(ExpandPatientIdsParams theParams, Set thePatientPids) + throws IOException { + RequestPartitionId partitionId = theParams.getRequestPartitionId(); + + // we have to apply the parameters to filter + // the patients we want (ie, not all the members of the group necessarily fit the filters) + // so first we get a set of SP maps + RuntimeResourceDefinition def = myContext.getResourceDefinition("Patient"); + List maps = myBulkExportHelperSvc.createSearchParameterMapsForResourcetype( + def, theParams.getFilters(), theParams.getStartDate(), theParams.getEndDate(), true); + + // use those maps to get the patient ids we care about + List pids = + getPatientPidsUsingSearchMaps(maps, theParams.getGroupId(), null, theParams.getRequestPartitionId()); + + /* + * and fill them out.
+ * + * Like with patient export above, + * the JpaPids here might not have + * their associated resource id populated. + * But since we need it, we'll "fill them out" here. + * (should be fast, because the ids should be in the cache) + */ + Set pidsSet = new HashSet<>(pids); + myIdHelperService.fillOutPids(pidsSet, myContext); + + Set patientIds = pidsSet.stream() + .map(BaseResourcePersistentId::getAssociatedResourceId) + .collect(Collectors.toSet()); + + if (theParams.isShouldDoMdmExpansion()) { + // expand them out and add them to our list + Set jpaPids = myMdmExpandersHolder + .getBulkExportMDMResourceExpanderInstance() + .expandPatients(patientIds, partitionId); + thePatientPids.addAll(jpaPids); + } else { + // no mdm expansion; resolve the ids and add them to the list + thePatientPids.addAll(myIdHelperService.resolveResourcePids( + partitionId, + patientIds, + ResolveIdentityMode.excludeDeleted().cacheOk())); + } + } + @SuppressWarnings("unchecked") private LinkedHashSet getPidsForPatientStyleExport( ExportPIDIteratorParameters theParams, @@ -147,6 +261,7 @@ private LinkedHashSet getPidsForPatientStyleExport( RuntimeResourceDefinition def) throws IOException { LinkedHashSet pids = new LinkedHashSet<>(); + // Patient if (myStorageSettings.getIndexMissingFields() == JpaStorageSettings.IndexEnabledEnum.DISABLED) { String errorMessage = @@ -194,6 +309,7 @@ map, searchRuntime, new SystemRequestDetails(), theParams.getPartitionIdOrAllPar } } } + return pids; } @@ -202,18 +318,31 @@ private void filterBySpecificPatient( String resourceType, String patientSearchParam, SearchParameterMap map) { - if (resourceType.equalsIgnoreCase("Patient")) { - if (theParams.getPatientIds() != null) { - ReferenceOrListParam referenceOrListParam = makeReferenceOrListParam(theParams.getPatientIds()); - map.add(PARAM_ID, referenceOrListParam); - } - } else { - if (theParams.getPatientIds() != null) { - ReferenceOrListParam referenceOrListParam = makeReferenceOrListParam(theParams.getPatientIds()); - map.add(patientSearchParam, referenceOrListParam); - } else { - map.add(patientSearchParam, new ReferenceParam().setMissing(false)); - } + boolean isPatientResource = resourceType.equalsIgnoreCase("Patient"); + + // construct our patient ids params, depending on if they are expanded or not + ReferenceOrListParam patientIdsParams = null; + if (theParams.getExpandedPatientIds() != null) { + patientIdsParams = + makeReferenceOrListParam(theParams.getExpandedPatientIds().stream() + .map(f -> { + return f.getAssociatedResourceId() + .toUnqualifiedVersionless() + .getValue(); + }) + .collect(Collectors.toList())); + } else if (theParams.getPatientIds() != null) { + patientIdsParams = makeReferenceOrListParam(theParams.getPatientIds()); + } + + if (patientIdsParams != null) { + // add the patient id filtering + String paramName = isPatientResource ?
PARAM_ID : patientSearchParam; + map.add(paramName, patientIdsParams); + } else if (!isPatientResource) { + // we only search for resources missing patient references + // if we're not searching for patient resources + map.add(patientSearchParam, new ReferenceParam().setMissing(true)); } } @@ -395,6 +524,14 @@ private void validateSearchParametersForGroup(SearchParameterMap expandedSpMap, */ private LinkedHashSet getExpandedPatientList( ExportPIDIteratorParameters theParameters, boolean theConsiderDateRange) throws IOException { + + if (theParameters.getExpandedPatientIds() != null) { + List existingMembers = theParameters.getExpandedPatientIds().stream() + .map(pid -> (JpaPid) pid) + .toList(); + return new LinkedHashSet<>(existingMembers); + } + List members = getMembersFromGroupWithFilter(theParameters, theConsiderDateRange); ourLog.info( "Group with ID [{}] has been expanded to {} members, member JpaIds: {}", @@ -417,23 +554,36 @@ private LinkedHashSet getExpandedPatientList( * * @return A list of strings representing the Patient IDs of the members (e.g. ["P1", "P2", "P3"] */ - @SuppressWarnings("unchecked") private List getMembersFromGroupWithFilter( ExportPIDIteratorParameters theParameters, boolean theConsiderDateRange) throws IOException { - final List maps = makeSearchParameterMaps(theParameters, theConsiderDateRange); + final List maps = + makeSearchParameterMapsForGroupExport(theParameters, theConsiderDateRange); + + return getPatientPidsUsingSearchMaps( + maps, + theParameters.getGroupId(), + theParameters.getInstanceId(), + theParameters.getPartitionIdOrAllPartitions()); + } + + private List getPatientPidsUsingSearchMaps( + List maps, + String theGroupId, + String theInstanceId, + RequestPartitionId theRequestPartitionId) + throws IOException { final List resPids = new ArrayList<>(); for (SearchParameterMap map : maps) { ISearchBuilder searchBuilder = getSearchBuilderForResourceType("Patient"); - ourLog.debug( - "Searching for members of group {} with job instance {} with map {}", - theParameters.getGroupId(), - theParameters.getInstanceId(), - map); + if (isNotBlank(theGroupId)) { + ourLog.debug( + "Searching for members of group {} with job instance {} with map {}", + theGroupId, + theInstanceId, + map); + } try (IResultIterator resultIterator = searchBuilder.createQuery( - map, - new SearchRuntimeDetails(null, theParameters.getInstanceId()), - null, - theParameters.getPartitionIdOrAllPartitions())) { + map, new SearchRuntimeDetails(null, theInstanceId), null, theRequestPartitionId)) { while (resultIterator.hasNext()) { resPids.add(resultIterator.next()); @@ -444,7 +594,7 @@ private List getMembersFromGroupWithFilter( } @Nonnull - private List makeSearchParameterMaps( + private List makeSearchParameterMapsForGroupExport( @Nonnull ExportPIDIteratorParameters theParameters, boolean theConsiderDateRange) { final RuntimeResourceDefinition def = myContext.getResourceDefinition("Patient"); final List maps = myBulkExportHelperSvc.createSearchParameterMapsForResourceType( @@ -459,6 +609,21 @@ private List makeSearchParameterMaps( return maps; } + private List makeSearchParameterMapsForPatientExport(ExpandPatientIdsParams theParams) { + final RuntimeResourceDefinition def = myContext.getResourceDefinition("Patient"); + final List maps = myBulkExportHelperSvc.createSearchParameterMapsForResourcetype( + def, theParams.getFilters(), theParams.getStartDate(), theParams.getEndDate(), true); + + if (!theParams.getPatientIds().isEmpty()) { + // Patient Instance Export + maps.forEach(map -> 
{ + map.add(PARAM_ID, makeReferenceOrListParam(theParams.getPatientIds())); + }); + } + + return maps; + } + @Nonnull private HasOrListParam makeGroupMemberHasOrListParam(@Nonnull String theGroupId) { final HasParam hasParam = new HasParam("Group", "member", "_id", theGroupId); diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/JpaBulkExportConfig.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/JpaBulkExportConfig.java index f08fffc689b6..24ccaa541f1c 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/JpaBulkExportConfig.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/JpaBulkExportConfig.java @@ -29,6 +29,7 @@ import ca.uhn.fhir.jpa.dao.SearchBuilderFactory; import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService; import ca.uhn.fhir.jpa.model.dao.JpaPid; +import ca.uhn.fhir.jpa.searchparam.MatchUrlService; import ca.uhn.fhir.mdm.svc.MdmExpandersHolder; import ca.uhn.fhir.rest.server.util.ISearchParamRegistry; import jakarta.persistence.EntityManager; @@ -48,6 +49,7 @@ public IBulkExportProcessor jpaBulkExportProcessor( EntityManager theEntityManager, IHapiTransactionService theHapiTransactionService, ISearchParamRegistry theSearchParamRegistry, + MatchUrlService theMatchUrlService, MdmExpandersHolder theMdmExpandersHolder) { return new JpaBulkExportProcessor( theFhirContext, @@ -59,6 +61,7 @@ public IBulkExportProcessor jpaBulkExportProcessor( theEntityManager, theHapiTransactionService, theSearchParamRegistry, + theMatchUrlService, theMdmExpandersHolder); } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirResourceDao.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirResourceDao.java index 95e19b8afd39..6485efc4737c 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirResourceDao.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirResourceDao.java @@ -2316,12 +2316,13 @@ private List searchForTransformedIds( .searchList(() -> { ISearchBuilder builder = mySearchBuilderFactory.newSearchBuilder(getResourceName(), getResourceType()); - Stream pidStream = - builder.createQueryStream(theParams, searchRuntimeDetails, theRequest, requestPartitionId); + try (Stream pidStream = builder.createQueryStream( + theParams, searchRuntimeDetails, theRequest, requestPartitionId)) { - Stream transformedStream = transform.apply(theRequest, pidStream, requestPartitionId); - - return transformedStream.collect(Collectors.toList()); + try (Stream transformedStream = transform.apply(theRequest, pidStream, requestPartitionId)) { + return transformedStream.collect(Collectors.toList()); + } + } }); } diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/export/svc/JpaBulkExportProcessorTest.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/export/svc/JpaBulkExportProcessorTest.java index 3a1adefeffc1..4a5a95054d6f 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/export/svc/JpaBulkExportProcessorTest.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/bulk/export/svc/JpaBulkExportProcessorTest.java @@ -273,8 +273,6 @@ public void getResourcePidIterator_groupExportStyleWithPatientResource_returnsIt pids.add(JpaPid.fromId(type.getIdPartAsLong())); } - - parameters.setExpandMdm(theMdm); // set mdm expansion parameters.setPartitionId(getPartitionIdFromParams(thePartitioned)); @@ -327,7 +325,6 @@ private void validatePartitionId(boolean 
thePartitioned, RequestPartitionId theP } else { assertEquals(RequestPartitionId.allPartitions(), thePartitionId); } - } // source is: "isExpandMdm,(whether or not to test on a specific partition) diff --git a/hapi-fhir-jpaserver-elastic-test-utilities/src/test/java/ca/uhn/fhir/jpa/bulk/BulkGroupExportWithIndexedSearchParametersTest.java b/hapi-fhir-jpaserver-elastic-test-utilities/src/test/java/ca/uhn/fhir/jpa/bulk/BulkGroupExportWithIndexedSearchParametersTest.java index 4fa7f2d4a23b..42231e0b6388 100644 --- a/hapi-fhir-jpaserver-elastic-test-utilities/src/test/java/ca/uhn/fhir/jpa/bulk/BulkGroupExportWithIndexedSearchParametersTest.java +++ b/hapi-fhir-jpaserver-elastic-test-utilities/src/test/java/ca/uhn/fhir/jpa/bulk/BulkGroupExportWithIndexedSearchParametersTest.java @@ -2,7 +2,6 @@ import ca.uhn.fhir.batch2.api.IJobCoordinator; import ca.uhn.fhir.batch2.model.JobInstanceStartRequest; -import ca.uhn.fhir.cache.BaseResourceCacheSynchronizer; import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao; import ca.uhn.fhir.jpa.api.model.BulkExportJobResults; @@ -12,7 +11,6 @@ import ca.uhn.fhir.jpa.test.config.TestR4Config; import ca.uhn.fhir.rest.api.Constants; import ca.uhn.fhir.rest.api.server.bulk.BulkExportJobParameters; -import ca.uhn.fhir.rest.server.util.ISearchParamRegistry; import ca.uhn.fhir.util.Batch2JobDefinitionConstants; import ca.uhn.fhir.util.JsonUtil; import org.hl7.fhir.r4.model.Bundle; diff --git a/hapi-fhir-jpaserver-mdm/src/test/java/ca/uhn/fhir/jpa/mdm/svc/BulkExportMdmEidMatchOnlyResourceExpanderIT.java b/hapi-fhir-jpaserver-mdm/src/test/java/ca/uhn/fhir/jpa/mdm/svc/BulkExportMdmEidMatchOnlyResourceExpanderIT.java new file mode 100644 index 000000000000..17febc48eb6d --- /dev/null +++ b/hapi-fhir-jpaserver-mdm/src/test/java/ca/uhn/fhir/jpa/mdm/svc/BulkExportMdmEidMatchOnlyResourceExpanderIT.java @@ -0,0 +1,153 @@ +package ca.uhn.fhir.jpa.mdm.svc; + +import ca.uhn.fhir.interceptor.model.RequestPartitionId; +import ca.uhn.fhir.jpa.api.model.DaoMethodOutcome; +import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService; +import ca.uhn.fhir.jpa.entity.MdmLink; +import ca.uhn.fhir.jpa.mdm.BaseMdmR4Test; +import ca.uhn.fhir.jpa.mdm.helper.MdmLinkHelper; +import ca.uhn.fhir.jpa.mdm.helper.testmodels.MDMLinkResults; +import ca.uhn.fhir.jpa.mdm.helper.testmodels.MDMState; +import ca.uhn.fhir.jpa.model.dao.JpaPid; +import ca.uhn.fhir.mdm.svc.BulkExportMdmEidMatchOnlyResourceExpander; +import ca.uhn.fhir.mdm.svc.MdmEidMatchOnlyExpandSvc; +import ca.uhn.fhir.mdm.util.EIDHelper; +import ca.uhn.fhir.mdm.util.MdmResourceUtil; +import ca.uhn.fhir.rest.api.server.SystemRequestDetails; +import org.hl7.fhir.instance.model.api.IIdType; +import org.hl7.fhir.r4.model.IdType; +import org.hl7.fhir.r4.model.Patient; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +public class BulkExportMdmEidMatchOnlyResourceExpanderIT extends BaseMdmR4Test { + + @Autowired + private BulkExportMdmEidMatchOnlyResourceExpander myResourceExpander; + + @Autowired + private IHapiTransactionService myHapiTransactionService; + + @Autowired + private EIDHelper myEIDHelper; + + @Autowired + private MdmEidMatchOnlyExpandSvc 
myMdmEidMatchOnlyExpandSvc; + + @Autowired + private MdmLinkHelper myLinkHelper; + + @BeforeEach + public void before() throws Exception { + super.before(); + + myMdmEidMatchOnlyExpandSvc.setMyEidHelper(myEIDHelper); + } + + @Test + public void expandPatients_bulkExport_returnsLinkedIds() { + // setup + SystemRequestDetails requestDetails = new SystemRequestDetails(); + String eidValue = "eid-value"; + String patientEidSystem = myMdmSettings.getMdmRules() + .getEnterpriseEIDSystemForResourceType("Patient"); + + Map mapping = new HashMap<>(); + for (String id : new String[] { "P1", "P2", "P3", "GP1" }) { + Patient patient = new Patient(); + patient.setActive(true); + patient.addIdentifier() + .setSystem(patientEidSystem) + .setValue(eidValue); + patient.addName() + .setFamily("Simpson"); + patient.setId(new IdType("Patient/" + id)); + + MdmResourceUtil.setMdmManaged(patient); + if (id.equalsIgnoreCase("GP1")) { + MdmResourceUtil.setGoldenResource(patient); + } + + Patient createdPatient = (Patient) myPatientDao.create(patient, requestDetails) + .getResource(); + + mapping.put(id, patient); + } + + // Just for setup; we won't be doing any link changing here + String inputState = """ + GP1, AUTO, MATCH, P1 + GP1, AUTO, MATCH, P2 + GP1, AUTO, MATCH, P3 + """; + MDMState state = new MDMState<>(); + state.setInputState(inputState); + state.setParameterToValue(mapping); + MDMLinkResults createdState = myLinkHelper.setup(state); + myLinkHelper.logMdmLinks(); + + MdmLink firstLink = createdState.getResults().get(0); + + // test + Set pids = withTransaction(() -> { + return myResourceExpander.expandPatients(List.of(new IdType("Patient/P1")), RequestPartitionId.allPartitions()); + }); + + assertNotNull(pids); + assertEquals(4, pids.size()); + } + + @Test + public void expandMdmBySourceResourceIdsForSingleResourceType_largeNumber_works() { + // setup + int count = 10000; + RequestPartitionId requestPartitionId = RequestPartitionId.allPartitions(); + String eidValue = "eid-value"; + String patientEidSystem = myMdmSettings.getMdmRules() + .getEnterpriseEIDSystemForResourceType("Patient"); + SystemRequestDetails requestDetails = new SystemRequestDetails(); + DaoMethodOutcome outcome; + + IIdType id = null; + for (int i = 0; i < count; i++) { + Patient patient = new Patient(); + patient.setId("Patient/pat-" + i); + patient.setActive(true); + patient.addName() + .setFamily("Simpsons"); + patient.addIdentifier() + .setSystem(patientEidSystem) + .setValue(eidValue); + + outcome = myPatientDao.update(patient, requestDetails); + if (id == null) { + id = outcome.getId(); + } + } + + // test + Set expanded = myMdmEidMatchOnlyExpandSvc.expandMdmBySourceResourceIdsForSingleResourceType(requestPartitionId, + Set.of(id)); + + // validate + assertEquals(count, expanded.size()); + } + + private T withTransaction(Callable theCallable) { + return myHapiTransactionService + .withSystemRequest() + .withRequestPartitionId(RequestPartitionId.allPartitions()) + .readOnly() + .execute(theCallable); + } +} diff --git a/hapi-fhir-jpaserver-mdm/src/test/java/ca/uhn/fhir/jpa/mdm/svc/MdmBatchSvcImplIT.java b/hapi-fhir-jpaserver-mdm/src/test/java/ca/uhn/fhir/jpa/mdm/svc/MdmBatchSvcImplIT.java index e64d1510a9b6..f4ee580d7fb1 100644 --- a/hapi-fhir-jpaserver-mdm/src/test/java/ca/uhn/fhir/jpa/mdm/svc/MdmBatchSvcImplIT.java +++ b/hapi-fhir-jpaserver-mdm/src/test/java/ca/uhn/fhir/jpa/mdm/svc/MdmBatchSvcImplIT.java @@ -30,6 +30,7 @@ class MdmBatchSvcImplIT extends BaseMdmR4Test { public void before() { 
myInterceptorService.registerAnonymousInterceptor(Pointcut.MDM_AFTER_PERSISTED_RESOURCE_CHECKED, afterMdmLatch); } + @Override @AfterEach public void after() throws IOException { @@ -41,7 +42,6 @@ public void after() throws IOException { @Test public void testMdmBatchRunWorksOverMultipleTargetTypes() throws InterruptedException { - for (int i =0; i < 10; i++) { createPatient(buildJanePatient()); } diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/bulk/BulkExportUseCaseTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/bulk/BulkExportUseCaseTest.java index 986796df5535..d92bfdab534c 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/bulk/BulkExportUseCaseTest.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/bulk/BulkExportUseCaseTest.java @@ -46,6 +46,7 @@ import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; import org.assertj.core.api.AssertionsForClassTypes; +import org.hl7.fhir.instance.model.api.IBaseBundle; import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.r4.model.Binary; @@ -72,6 +73,8 @@ import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -103,6 +106,7 @@ import static org.junit.jupiter.api.Assertions.fail; + public class BulkExportUseCaseTest extends BaseResourceProviderR4Test { private static final Logger ourLog = LoggerFactory.getLogger(BulkExportUseCaseTest.class); @@ -130,7 +134,6 @@ public void beforeEach() { myStorageSettings.setJobFastTrackingEnabled(false); } - @Nested public class SpecConformanceTests { @@ -569,6 +572,103 @@ public void after() { myStorageSettings.setBulkExportFileMaximumCapacity(JpaStorageSettings.DEFAULT_BULK_EXPORT_FILE_MAXIMUM_CAPACITY); } + @ParameterizedTest + @ValueSource(booleans = { true, false }) + public void patientExport_withMdmEnabledAndEIDMatch_returnsLinkedResources( + boolean theTypeExport + ) { + // setup + PatientExportMdmExpansionSetup info = setupForPatientMdmLinkedTests(); + + // an unmatched patient to verify it isn't included + Patient unmatched = new Patient(); + unmatched.setActive(true); + unmatched.setId("pat-unmatched"); + Patient created = (Patient) myPatientDao.update(unmatched, mySrd) + .getResource(); + + Observation unmatchedObs = new Observation(); + unmatchedObs.setSubject(new Reference(created.getId())); + unmatchedObs.setId("obs-unmatched"); + Observation createdObs = (Observation) myObservationDao.update(unmatchedObs, mySrd) + .getResource(); + + // test + BulkExportJobResults results = null; + if (theTypeExport) { + // for type we'll use an id filter for the patient to make 'similar' behaviour + results = startBulkExportJobAndAwaitCompletion( + BulkExportJobParameters.ExportStyle.PATIENT, + new HashSet<>(), + Set.of("_id=" + info.PrimaryPatient.getId()), + null, + true + ); + } else { + results = startBulkExportJobAndAwaitCompletion( + BulkExportJobParameters.ExportStyle.PATIENT, + new HashSet<>(), + new HashSet<>(), + info.PrimaryPatient.getId(), + true + ); + } + + // validate + assertNotNull(results); + // Patient && Observation + assertEquals(2, results.getResourceTypeToBinaryIds().size()); + Map> exportedResourcesMap = 
convertJobResultsToResources(results); + + assertEquals(2, exportedResourcesMap.size()); + assertEquals(2, exportedResourcesMap.get("Patient").size()); + // ensure our unmatched resource isn't included + assertFalse(exportedResourcesMap.get("Patient") + .stream().anyMatch(p -> p.getIdElement().toUnqualifiedVersionless().getValue().equalsIgnoreCase(created.getIdElement().toUnqualifiedVersionless().getId()))); + assertEquals(2, exportedResourcesMap.get("Observation").size()); + assertFalse(exportedResourcesMap.get("Observation") + .stream().anyMatch(o -> o.getIdElement().toUnqualifiedVersionless().getValue().equalsIgnoreCase(createdObs.getIdElement().toUnqualifiedVersionless().getId()))); + } + + /** + * This does setup for + * batch export tests that require mdm linking on patient eid. + * It will create a series of patients and observations that will + * be linked. + */ + private PatientExportMdmExpansionSetup setupForPatientMdmLinkedTests() { + createAndSetMdmSettingsForEidMatchOnly(); + + BundleBuilder bb = new BundleBuilder(myFhirContext); + Patient pat1 = new Patient(); + pat1.setId("pat-1"); + pat1.addIdentifier(new Identifier().setSystem(TEST_PATIENT_EID_SYS).setValue("the-patient-eid-value")); + bb.addTransactionUpdateEntry(pat1); + + Observation obs1 = new Observation(); + obs1.setId("obs-1"); + obs1.setSubject(new Reference("Patient/pat-1")); + bb.addTransactionUpdateEntry(obs1); + + Patient pat2 = new Patient(); + pat2.setId("pat-2"); + pat2.addIdentifier(new Identifier().setSystem(TEST_PATIENT_EID_SYS).setValue("the-patient-eid-value")); + bb.addTransactionUpdateEntry(pat2); + + Observation obs2 = new Observation(); + obs2.setId("obs-2"); + obs2.setSubject(new Reference("Patient/pat-2")); + bb.addTransactionUpdateEntry(obs2); + + IBaseBundle createdBundle = myClient.transaction().withBundle(bb.getBundle()).execute(); + + return new PatientExportMdmExpansionSetup(pat1, (Bundle) createdBundle); + } + + public record PatientExportMdmExpansionSetup(Patient PrimaryPatient, Bundle BundleOResources) { + } + + // TODO reenable 4637 // Reenable when bulk exports that return no results work as expected @Disabled @@ -691,6 +791,10 @@ void tearDown() { restoreMdmSettingsToDefault(); } + private void restoreMdmSettingsToDefault() { + myMdmExpandersHolder.setMdmSettings(new MdmSettings(myMdmRulesValidator)); + } + @Test public void testGroupExportSuccessfulyExportsPatientForwardReferences() { BundleBuilder bb = new BundleBuilder(myFhirContext); @@ -1540,6 +1644,49 @@ public void testSystemBulkExport() { assertThat(exportedPatientVersionsMap).isEqualTo(patientVersionsMap); } + @Test + public void groupExport_withMdmEnabled_returnsLinkedResources() { + // setup + createAndSetMdmSettingsForEidMatchOnly(); + GroupExportSetup info = setupForMdmGroupExport(); + + Patient excluded = new Patient(); + excluded.setActive(true); + excluded.setId("Patient/patex"); + excluded.addIdentifier() + .setSystem(TEST_PATIENT_EID_SYS) + .setValue("different-value"); // don't want it pulled in by mdm + excluded.addName() + .setFamily("Bouvier"); + DaoMethodOutcome result = myPatientDao.update(excluded, mySrd); + + // add to the group + Group updated = info.Group; + updated.addMember() + .getEntity() + .setReference(result.getId().toUnqualifiedVersionless().getValue()); + myGroupDao.update(updated, mySrd); + + BulkExportJobResults bulkExportJobResults = startBulkExportJobAndAwaitCompletion( + BulkExportJobParameters.ExportStyle.GROUP, + new HashSet<>(), + Set.of("Patient?name=Simpson"), // filters + "mdm-group", + 
true, + false); + Map> exportedResourcesMap = convertJobResultsToResources(bulkExportJobResults); + + assertThat(exportedResourcesMap.keySet()).hasSize(3); + List exportedGroups = exportedResourcesMap.get("Group"); + assertResourcesIds(exportedGroups, "Group/mdm-group"); + + List exportedPatients = exportedResourcesMap.get("Patient"); + assertResourcesIds(exportedPatients, "Patient/pat-1", "Patient/pat-2"); + + List exportedObservations = exportedResourcesMap.get("Observation"); + assertResourcesIds(exportedObservations, "Observation/obs-1", "Observation/obs-2"); + } + @Test public void testSystemBulkExport_withResourcesExceedingPageSizes() { // given @@ -1868,8 +2015,34 @@ void testGroupExportWithMdmEnabled_EidMatchOnly() { createAndSetMdmSettingsForEidMatchOnly(); BundleBuilder bb = new BundleBuilder(myFhirContext); + createAndSetMdmSettingsForEidMatchOnly(); + setupForMdmGroupExport(); + + BulkExportJobResults bulkExportJobResults = startBulkExportJobAndAwaitCompletion( + BulkExportJobParameters.ExportStyle.GROUP, + new HashSet<>(), new HashSet<>(), + "mdm-group", + true, + false); + Map> exportedResourcesMap = convertJobResultsToResources(bulkExportJobResults); + + assertThat(exportedResourcesMap.keySet()).hasSize(3); + List exportedGroups = exportedResourcesMap.get("Group"); + assertResourcesIds(exportedGroups, "Group/mdm-group"); + + List exportedPatients = exportedResourcesMap.get("Patient"); + assertResourcesIds(exportedPatients, "Patient/pat-1", "Patient/pat-2"); + + List exportedObservations = exportedResourcesMap.get("Observation"); + assertResourcesIds(exportedObservations, "Observation/obs-1", "Observation/obs-2"); + + } + - //In this test, we create two patients with the same Eid value for the eid system specified in mdm rules + private GroupExportSetup setupForMdmGroupExport() { + BundleBuilder bb = new BundleBuilder(myFhirContext); + + //we create two patients with the same Eid value for the eid system specified in mdm rules //and 2 observations referencing one of each of these patients //Create a group that contains one of the patients. 
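//(note: the second patient is never added to the group; mdm expansion is expected to pull it in via the shared eid value)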
//When we export the group, we should get both patients and the 2 observations @@ -1877,6 +2050,9 @@ void testGroupExportWithMdmEnabled_EidMatchOnly() { //based on having the same eid value Patient pat1 = new Patient(); pat1.setId("pat-1"); + pat1.setActive(true); + pat1.addName() + .setFamily("Simpson"); pat1.addIdentifier(new Identifier().setSystem(TEST_PATIENT_EID_SYS).setValue("the-patient-eid-value")); bb.addTransactionUpdateEntry(pat1); @@ -1887,6 +2063,9 @@ void testGroupExportWithMdmEnabled_EidMatchOnly() { Patient pat2 = new Patient(); pat2.setId("pat-2"); + pat2.setActive(true); + pat2.addName() + .setFamily("Simpson"); pat2.addIdentifier(new Identifier().setSystem(TEST_PATIENT_EID_SYS).setValue("the-patient-eid-value")); bb.addTransactionUpdateEntry(pat2); @@ -1901,23 +2080,13 @@ void testGroupExportWithMdmEnabled_EidMatchOnly() { group.addMember().getEntity().setReference("Patient/pat-1"); bb.addTransactionUpdateEntry(group); - myClient.transaction().withBundle(bb.getBundle()).execute(); - - BulkExportJobResults bulkExportJobResults = startGroupBulkExportJobAndAwaitCompletionForMdmExpand(new HashSet<>(), new HashSet<>(), "mdm-group", true); - Map> exportedResourcesMap = convertJobResultsToResources(bulkExportJobResults); - - assertThat(exportedResourcesMap.keySet()).hasSize(3); - List exportedGroups = exportedResourcesMap.get("Group"); - assertResourcesIds(exportedGroups, "Group/mdm-group"); - - List exportedPatients = exportedResourcesMap.get("Patient"); - assertResourcesIds(exportedPatients, "Patient/pat-1", "Patient/pat-2"); - - List exportedObservations = exportedResourcesMap.get("Observation"); - assertResourcesIds(exportedObservations, "Observation/obs-1", "Observation/obs-2"); + Bundle b = (Bundle) bb.getBundle(); + myClient.transaction().withBundle(b).execute(); + return new GroupExportSetup(group, b); } + public record GroupExportSetup(Group Group, Bundle BundleOfResources) {} private void createAndSetMdmSettingsForEidMatchOnly() { MdmSettings mdmSettings = new MdmSettings(myMdmRulesValidator); @@ -2082,17 +2251,27 @@ BulkExportJobResults startBulkExportJobAndAwaitCompletion( .returnMethodOutcome() .withAdditionalHeader(Constants.HEADER_PREFER, Constants.HEADER_PREFER_RESPOND_ASYNC) .execute(); - } else if (theExportStyle == BulkExportJobParameters.ExportStyle.PATIENT && theGroupOrPatientId != null) { + } else if (theExportStyle == BulkExportJobParameters.ExportStyle.PATIENT) { //TODO add support for this actual processor.
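For context, once the BulkDataExportProvider changes further down land, a patient-level kickoff with MDM expansion is a plain $export invocation carrying the new _mdm flag. The following is a minimal client-side sketch rather than code from this change: the base URL is an assumption, the literal parameter names mirror JpaConstants.PARAM_EXPORT_MDM and JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT, and the fluent calls are the same ones this test helper uses.

```java
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.MethodOutcome;
import ca.uhn.fhir.rest.client.api.IGenericClient;
import org.hl7.fhir.r4.model.BooleanType;
import org.hl7.fhir.r4.model.Parameters;
import org.hl7.fhir.r4.model.StringType;

public class PatientMdmExportKickoffSketch {
	public static void main(String[] args) {
		FhirContext ctx = FhirContext.forR4();
		// assumption: a HAPI FHIR JPA server with MDM configured at this base URL
		IGenericClient client = ctx.newRestfulGenericClient("http://localhost:8080/fhir");

		Parameters inParams = new Parameters();
		// "_outputFormat" and "_mdm" are the wire names behind JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT
		// and JpaConstants.PARAM_EXPORT_MDM (the parameter this change adds to patient-level export)
		inParams.addParameter().setName("_outputFormat").setValue(new StringType(Constants.CT_FHIR_NDJSON));
		inParams.addParameter().setName("_mdm").setValue(new BooleanType(true));

		// type-level kickoff (POST [base]/Patient/$export); Prefer: respond-async per the bulk data spec
		MethodOutcome outcome = client.operation()
				.onType("Patient")
				.named("$export")
				.withParameters(inParams)
				.returnMethodOutcome()
				.withAdditionalHeader(Constants.HEADER_PREFER, Constants.HEADER_PREFER_RESPOND_ASYNC)
				.execute();

		// the job polling URL comes back in the Content-Location response header
		System.out.println(outcome.getResponseHeaders());
	}
}
```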
- fail("Bulk Exports that return no data do not return"); - outcome = myClient - .operation() - .onInstance("Patient/" + theGroupOrPatientId) - .named(ProviderConstants.OPERATION_EXPORT) - .withParameters(parameters) - .returnMethodOutcome() - .withAdditionalHeader(Constants.HEADER_PREFER, Constants.HEADER_PREFER_RESPOND_ASYNC) - .execute(); + if (theGroupOrPatientId != null) { + outcome = myClient + .operation() + .onInstance("Patient/" + theGroupOrPatientId) + .named(ProviderConstants.OPERATION_EXPORT) + .withParameters(parameters) + .returnMethodOutcome() + .withAdditionalHeader(Constants.HEADER_PREFER, Constants.HEADER_PREFER_RESPOND_ASYNC) + .execute(); + } else { + outcome = myClient + .operation() + .onType("Patient") + .named(ProviderConstants.OPERATION_EXPORT) + .withParameters(parameters) + .returnMethodOutcome() + .withAdditionalHeader(Constants.HEADER_PREFER, Constants.HEADER_PREFER_RESPOND_ASYNC) + .execute(); + } } else { // system request outcome = myClient diff --git a/hapi-fhir-server-mdm/src/main/java/ca/uhn/fhir/mdm/api/IMdmLinkExpandSvc.java b/hapi-fhir-server-mdm/src/main/java/ca/uhn/fhir/mdm/api/IMdmLinkExpandSvc.java index bd52509dc8f3..5cb45bd1e38c 100644 --- a/hapi-fhir-server-mdm/src/main/java/ca/uhn/fhir/mdm/api/IMdmLinkExpandSvc.java +++ b/hapi-fhir-server-mdm/src/main/java/ca/uhn/fhir/mdm/api/IMdmLinkExpandSvc.java @@ -24,6 +24,7 @@ import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IIdType; +import java.util.Collection; import java.util.Set; public interface IMdmLinkExpandSvc { @@ -31,6 +32,15 @@ public interface IMdmLinkExpandSvc { Set expandMdmBySourceResourceId(RequestPartitionId theRequestPartitionId, IIdType theId); + /** + * Does the mdm expansion of a list of ids for a single resource type + * @param theRequestPartitionId the request partition to use + * @param theIds the list of patient ids to expand + * @return the mdm expanded set of patient ids (should include the original set as well as any linked patient ids) + */ + Set expandMdmBySourceResourceIdsForSingleResourceType( + RequestPartitionId theRequestPartitionId, Collection theIds); + Set expandMdmBySourceResourcePid( RequestPartitionId theRequestPartitionId, IResourcePersistentId theSourceResourcePid); diff --git a/hapi-fhir-server-mdm/src/main/java/ca/uhn/fhir/mdm/svc/BulkExportMdmEidMatchOnlyResourceExpander.java b/hapi-fhir-server-mdm/src/main/java/ca/uhn/fhir/mdm/svc/BulkExportMdmEidMatchOnlyResourceExpander.java index 2239db8e6412..eea61299025c 100644 --- a/hapi-fhir-server-mdm/src/main/java/ca/uhn/fhir/mdm/svc/BulkExportMdmEidMatchOnlyResourceExpander.java +++ b/hapi-fhir-server-mdm/src/main/java/ca/uhn/fhir/mdm/svc/BulkExportMdmEidMatchOnlyResourceExpander.java @@ -32,6 +32,7 @@ import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IIdType; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -115,6 +116,18 @@ public Set expandGroup(String groupResourceId, RequestPartitionId reques return new HashSet<>(pidList); } + @Override + public Set expandPatients(Collection thePatientIds, RequestPartitionId theRequestPartitionId) { + Set ids = myMdmEidMatchOnlyLinkExpandSvc.expandMdmBySourceResourceIdsForSingleResourceType( + theRequestPartitionId, thePatientIds); + + List pids = myIdHelperService.resolveResourcePids( + theRequestPartitionId, + ids.stream().map(id -> myFhirContext.getVersion().newIdType(id)).collect(Collectors.toList()), + 
ResolveIdentityMode.excludeDeleted().cacheOk()); + return new HashSet<>(pids); + } + @Override public void annotateResource(IBaseResource resource) { // This function is normally used to add golden resource id to the exported resources, diff --git a/hapi-fhir-server-mdm/src/main/java/ca/uhn/fhir/mdm/svc/BulkExportMdmResourceExpander.java b/hapi-fhir-server-mdm/src/main/java/ca/uhn/fhir/mdm/svc/BulkExportMdmResourceExpander.java index 06b12dac3a71..89b1a3e41ca9 100644 --- a/hapi-fhir-server-mdm/src/main/java/ca/uhn/fhir/mdm/svc/BulkExportMdmResourceExpander.java +++ b/hapi-fhir-server-mdm/src/main/java/ca/uhn/fhir/mdm/svc/BulkExportMdmResourceExpander.java @@ -27,6 +27,7 @@ import ca.uhn.fhir.jpa.api.dao.DaoRegistry; import ca.uhn.fhir.jpa.api.model.PersistentIdToForcedIdMap; import ca.uhn.fhir.jpa.api.svc.IIdHelperService; +import ca.uhn.fhir.jpa.api.svc.ResolveIdentityMode; import ca.uhn.fhir.jpa.model.dao.JpaPid; import ca.uhn.fhir.mdm.api.MdmMatchResultEnum; import ca.uhn.fhir.mdm.dao.IMdmLinkDao; @@ -40,10 +41,18 @@ import org.hl7.fhir.instance.model.api.IBaseExtension; import org.hl7.fhir.instance.model.api.IBaseReference; import org.hl7.fhir.instance.model.api.IBaseResource; +import org.hl7.fhir.instance.model.api.IIdType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; /** * Implementation of MDM resource expansion for bulk export operations. @@ -83,7 +92,23 @@ public Set expandGroup(String theGroupResourceId, RequestPartitionId the return performMembershipExpansionViaMdmTable(pidOrNull); } - @SuppressWarnings({"rawtypes", "unchecked"}) + @Override + public Set expandPatients(Collection thePatientIds, RequestPartitionId theRequestPartitionId) { + List resolvedPids = myIdHelperService.resolveResourcePids( + theRequestPartitionId, + thePatientIds.stream().map(IdDt::new).collect(Collectors.toList()), + ResolveIdentityMode.excludeDeleted().cacheOk()); + Set pids = new HashSet<>(); + + Collection> matchedGoldenAndSourceIds = myMdmLinkDao.resolveGoldenResources(resolvedPids); + matchedGoldenAndSourceIds.forEach(set -> { + pids.add(set.getGoldenPid()); + pids.add(set.getSourcePid()); + }); + return pids; + } + + @SuppressWarnings({"unchecked"}) private Set performMembershipExpansionViaMdmTable(JpaPid pidOrNull) { List> goldenPidTargetPidTuples = myMdmLinkDao.expandPidsFromGroupPidGivenMatchResult(pidOrNull, MdmMatchResultEnum.MATCH); diff --git a/hapi-fhir-server-mdm/src/main/java/ca/uhn/fhir/mdm/svc/IBulkExportMdmResourceExpander.java b/hapi-fhir-server-mdm/src/main/java/ca/uhn/fhir/mdm/svc/IBulkExportMdmResourceExpander.java index 22b83a6cb4b2..0d060480b141 100644 --- a/hapi-fhir-server-mdm/src/main/java/ca/uhn/fhir/mdm/svc/IBulkExportMdmResourceExpander.java +++ b/hapi-fhir-server-mdm/src/main/java/ca/uhn/fhir/mdm/svc/IBulkExportMdmResourceExpander.java @@ -22,7 +22,9 @@ import ca.uhn.fhir.interceptor.model.RequestPartitionId; import ca.uhn.fhir.jpa.model.dao.JpaPid; import org.hl7.fhir.instance.model.api.IBaseResource; +import org.hl7.fhir.instance.model.api.IIdType; +import java.util.Collection; import java.util.Set; /** @@ -36,6 +38,15 @@ public interface IBulkExportMdmResourceExpander { */ Set expandGroup(String groupResourceId, RequestPartitionId requestPartitionId); + /** + * For the provided list of patient ids, return the entire set 
of PIDs (including the linked + * resource ids) + * @param thePatientIds + * @param theRequestPartitionId + * @return + */ + Set expandPatients(Collection thePatientIds, RequestPartitionId theRequestPartitionId); + /** * annotates the given resource to be exported with the implementation specific extra information if applicable */ diff --git a/hapi-fhir-server-mdm/src/main/java/ca/uhn/fhir/mdm/svc/MdmEidMatchOnlyExpandSvc.java b/hapi-fhir-server-mdm/src/main/java/ca/uhn/fhir/mdm/svc/MdmEidMatchOnlyExpandSvc.java index 3422455e9fde..92bf71ca0aa0 100644 --- a/hapi-fhir-server-mdm/src/main/java/ca/uhn/fhir/mdm/svc/MdmEidMatchOnlyExpandSvc.java +++ b/hapi-fhir-server-mdm/src/main/java/ca/uhn/fhir/mdm/svc/MdmEidMatchOnlyExpandSvc.java @@ -27,14 +27,23 @@ import ca.uhn.fhir.mdm.api.IMdmLinkExpandSvc; import ca.uhn.fhir.mdm.model.CanonicalEID; import ca.uhn.fhir.mdm.util.EIDHelper; +import ca.uhn.fhir.rest.api.server.IBundleProvider; import ca.uhn.fhir.rest.api.server.SystemRequestDetails; import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId; +import ca.uhn.fhir.rest.param.ReferenceOrListParam; +import ca.uhn.fhir.rest.param.ReferenceParam; import ca.uhn.fhir.rest.param.TokenOrListParam; import ca.uhn.fhir.rest.param.TokenParam; +import org.apache.commons.lang3.Validate; import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IIdType; -import java.util.*; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; /** * MDM link expansion service that is used when MDM mode is Match-Only and eid systems are defined in Mdm rules. @@ -84,6 +93,59 @@ public Set expandMdmBySourceResourceId(RequestPartitionId theRequestPart return result; } + @Override + public Set expandMdmBySourceResourceIdsForSingleResourceType( + RequestPartitionId theRequestPartitionId, Collection theIds) { + Set resourceTypes = + theIds.stream().map(IIdType::getResourceType).collect(Collectors.toSet()); + Validate.isTrue( + resourceTypes.size() == 1, + "Expected only single resource type; found " + resourceTypes.size() + "." + + (resourceTypes.isEmpty() + ? 
"" + : " Found resource Types: " + String.join(", ", resourceTypes))); + + @SuppressWarnings("OptionalGetWithoutIsPresent") + String resourceType = resourceTypes.stream().findFirst().get(); + SystemRequestDetails srd = SystemRequestDetails.forRequestPartitionId(theRequestPartitionId); + + @SuppressWarnings("unchecked") + IFhirResourceDao resourceDao = myDaoRegistry.getResourceDao(resourceType); + + SearchParameterMap map; + { + map = new SearchParameterMap(); + map.setLoadSynchronous(true); + if (!theIds.isEmpty()) { + ReferenceOrListParam idsParam = new ReferenceOrListParam(); + theIds.forEach(id -> idsParam.add(new ReferenceParam(id))); + map.add("_id", idsParam); + } + } + + IBundleProvider bundleProvider = resourceDao.search(map, srd); + + Set eids = new HashSet<>(); + for (IBaseResource resource : bundleProvider.getAllResources()) { + eids.addAll(myEidHelper.getExternalEid(resource)); + } + + // construct a new search for the eids + { + map = new SearchParameterMap(); + final TokenOrListParam tokenOrListParam = new TokenOrListParam(); + eids.forEach(eid -> tokenOrListParam.addOr(new TokenParam(eid.getSystem(), eid.getValue()))); + map.add("identifier", tokenOrListParam); + } + List ids = resourceDao.searchForResourceIds(map, srd); + + Set result = new HashSet<>(); + for (IIdType id : ids) { + result.add(id.toUnqualifiedVersionless().getValue()); + } + return result; + } + @Override public Set expandMdmBySourceResource(RequestPartitionId theRequestPartitionId, IBaseResource theResource) { return expandMdmBySourceResourceId(theRequestPartitionId, theResource.getIdElement()); @@ -100,7 +162,7 @@ public Set expandMdmBySourceResourcePid( public Set expandMdmByGoldenResourceId( RequestPartitionId theRequestPartitionId, IResourcePersistentId theGoldenResourcePid) { // This operation is not applicable when using MDM in MATCH_ONLY mode, - // return an emtpy set to rather than an exception to not affect existing code + // return an empty set to rather than an exception to not affect existing code return Collections.emptySet(); } diff --git a/hapi-fhir-server-mdm/src/main/java/ca/uhn/fhir/mdm/svc/MdmLinkExpandSvc.java b/hapi-fhir-server-mdm/src/main/java/ca/uhn/fhir/mdm/svc/MdmLinkExpandSvc.java index 82fb144b9282..300dcb908e16 100644 --- a/hapi-fhir-server-mdm/src/main/java/ca/uhn/fhir/mdm/svc/MdmLinkExpandSvc.java +++ b/hapi-fhir-server-mdm/src/main/java/ca/uhn/fhir/mdm/svc/MdmLinkExpandSvc.java @@ -28,6 +28,7 @@ import ca.uhn.fhir.mdm.model.MdmPidTuple; import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId; import jakarta.annotation.Nonnull; +import org.apache.commons.lang3.Validate; import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IIdType; import org.slf4j.Logger; @@ -35,8 +36,10 @@ import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -83,6 +86,30 @@ public Set expandMdmBySourceResourceId(RequestPartitionId theRequestPart myIdHelperService.getPidOrThrowException(RequestPartitionId.allPartitions(), theId)); } + @Override + public Set expandMdmBySourceResourceIdsForSingleResourceType( + RequestPartitionId theRequestPartitionId, Collection theIds) { + Set resourceTypes = + theIds.stream().map(IIdType::getResourceType).collect(Collectors.toSet()); + Validate.isTrue(resourceTypes.size() == 1, 
"Expected only 1 resource type, found " + resourceTypes.size()); + + Collection> response = myMdmLinkDao.resolveGoldenResources(Arrays.asList(theIds.toArray())); + + Set expandedIds = new HashSet<>(); + response.stream().forEach(tuple -> { + expandedIds.add(tuple.getSourcePid() + .getAssociatedResourceId() + .toUnqualifiedVersionless() + .getIdPart()); + expandedIds.add(tuple.getGoldenPid() + .getAssociatedResourceId() + .toUnqualifiedVersionless() + .getIdPart()); + }); + + return expandedIds; + } + /** * Given a partition ID and a PID of a source resource, perform MDM expansion and return all the resource IDs of all resources that are * MDM-Matched to this resource. diff --git a/hapi-fhir-server-mdm/src/main/java/ca/uhn/fhir/mdm/util/EIDHelper.java b/hapi-fhir-server-mdm/src/main/java/ca/uhn/fhir/mdm/util/EIDHelper.java index 9930cc7690e2..b1138050d704 100644 --- a/hapi-fhir-server-mdm/src/main/java/ca/uhn/fhir/mdm/util/EIDHelper.java +++ b/hapi-fhir-server-mdm/src/main/java/ca/uhn/fhir/mdm/util/EIDHelper.java @@ -45,6 +45,10 @@ public EIDHelper(FhirContext theFhirContext, IMdmSettings theMdmSettings) { myMdmSettings = theMdmSettings; } + public IMdmSettings getMdmSettings() { + return myMdmSettings; + } + public CanonicalEID createHapiEid() { return new CanonicalEID( MdmConstants.HAPI_ENTERPRISE_IDENTIFIER_SYSTEM, diff --git a/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/api/server/storage/BaseResourcePersistentId.java b/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/api/server/storage/BaseResourcePersistentId.java index 872acd01f715..674e6e764723 100644 --- a/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/api/server/storage/BaseResourcePersistentId.java +++ b/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/api/server/storage/BaseResourcePersistentId.java @@ -81,6 +81,9 @@ public void setVersion(Long theVersion) { @Override public String getResourceType() { + if (myResourceType == null && myAssociatedResourceId != null && myAssociatedResourceId.hasResourceType()) { + myResourceType = myAssociatedResourceId.getResourceType(); + } return myResourceType; } diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/BulkDataExportProvider.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/BulkDataExportProvider.java index c9b6faa54508..db86573671ef 100644 --- a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/BulkDataExportProvider.java +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/BulkDataExportProvider.java @@ -262,6 +262,8 @@ public void patientExport( List thePatient, @OperationParam(name = JpaConstants.PARAM_EXPORT_IDENTIFIER, min = 0, max = 1, typeName = "string") IPrimitiveType theExportIdentifier, + @OperationParam(name = JpaConstants.PARAM_EXPORT_MDM, min = 0, max = 1, typeName = "boolean") + IPrimitiveType theMdm, @OperationParam(name = JpaConstants.PARAM_EXPORT_INCLUDE_HISTORY, min = 0, max = 1, typeName = "boolean") IPrimitiveType theIncludeHistory, ServletRequestDetails theRequestDetails) { @@ -279,6 +281,7 @@ public void patientExport( theTypeFilter, theTypePostFetchFilterUrl, patientIds, + theMdm, theIncludeHistory); } @@ -314,6 +317,8 @@ public void patientInstanceExport( List> theTypePostFetchFilterUrl, @OperationParam(name = JpaConstants.PARAM_EXPORT_IDENTIFIER, min = 0, max = 1, typeName = "string") IPrimitiveType theExportIdentifier, + @OperationParam(name = JpaConstants.PARAM_EXPORT_MDM, min = 0, max = 1, typeName = "boolean") + IPrimitiveType 
theMdm, @OperationParam(name = JpaConstants.PARAM_EXPORT_INCLUDE_HISTORY, min = 0, max = 1, typeName = "boolean") IPrimitiveType theIncludeHistory, ServletRequestDetails theRequestDetails) { @@ -328,6 +333,7 @@ public void patientInstanceExport( theTypePostFetchFilterUrl, List.of(theIdParam), theExportIdentifier, + theMdm, theIncludeHistory, theRequestDetails); } @@ -371,6 +377,7 @@ private void doPatientExport( List> theTypeFilter, List> theTypePostFetchFilterUrl, List> thePatientIds, + IPrimitiveType theMdmExpansion, IPrimitiveType theIncludeHistory) { ServletRequestUtil.validatePreferAsyncHeader(theRequestDetails, ProviderConstants.OPERATION_EXPORT); @@ -398,6 +405,7 @@ private void doPatientExport( .exportStyle(ExportStyle.PATIENT) .postFetchFilterUrl(theTypePostFetchFilterUrl) .patientIds(thePatientIds) + .expandMdm(theMdmExpansion) .includeHistory(theIncludeHistory) .build(); diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/BulkExportAppCtx.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/BulkExportAppCtx.java index 2d200d8c9c89..a2fbafb5eb19 100644 --- a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/BulkExportAppCtx.java +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/BulkExportAppCtx.java @@ -23,6 +23,8 @@ import ca.uhn.fhir.batch2.jobs.export.models.BulkExportBinaryFileId; import ca.uhn.fhir.batch2.jobs.export.models.ExpandedResourcesList; import ca.uhn.fhir.batch2.jobs.export.models.ResourceIdList; +import ca.uhn.fhir.batch2.jobs.export.svc.BulkExportIdFetchingSvc; +import ca.uhn.fhir.batch2.jobs.export.v3.BulkExportV3Config; import ca.uhn.fhir.batch2.model.JobDefinition; import ca.uhn.fhir.jpa.api.model.BulkExportJobResults; import ca.uhn.fhir.model.api.IModelJson; @@ -30,8 +32,10 @@ import ca.uhn.fhir.util.Batch2JobDefinitionConstants; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; -@Configuration +@Configuration() +@Import({BulkExportV3Config.class}) public class BulkExportAppCtx { public static final String WRITE_TO_BINARIES = "write-to-binaries"; @@ -147,4 +151,11 @@ public ExpandResourceAndWriteBinaryStep expandResourceAndWriteBinaryStep() { public BulkExportCreateReportStep createReportStep() { return new BulkExportCreateReportStep(); } + + // shared + + @Bean + public BulkExportIdFetchingSvc resourceIdFetchingSvc() { + return new BulkExportIdFetchingSvc(); + } } diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/FetchResourceIdsStep.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/FetchResourceIdsStep.java index 32361f1791eb..02caa73d49cd 100644 --- a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/FetchResourceIdsStep.java +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/FetchResourceIdsStep.java @@ -25,39 +25,20 @@ import ca.uhn.fhir.batch2.api.RunOutcome; import ca.uhn.fhir.batch2.api.StepExecutionDetails; import ca.uhn.fhir.batch2.api.VoidModel; -import ca.uhn.fhir.batch2.jobs.chunk.TypedPidJson; import ca.uhn.fhir.batch2.jobs.export.models.ResourceIdList; -import ca.uhn.fhir.i18n.Msg; -import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; -import ca.uhn.fhir.jpa.bulk.export.api.IBulkExportProcessor; +import ca.uhn.fhir.batch2.jobs.export.svc.BulkExportIdFetchingSvc; import 
ca.uhn.fhir.jpa.bulk.export.model.ExportPIDIteratorParameters; -import ca.uhn.fhir.rest.api.IResourceSupportedSvc; import ca.uhn.fhir.rest.api.server.bulk.BulkExportJobParameters; -import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId; -import ca.uhn.fhir.util.SearchParameterUtil; -import com.google.common.annotations.VisibleForTesting; import jakarta.annotation.Nonnull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Set; - public class FetchResourceIdsStep implements IFirstJobStepWorker { private static final Logger ourLog = LoggerFactory.getLogger(FetchResourceIdsStep.class); @Autowired - private IBulkExportProcessor myBulkExportProcessor; - - @Autowired - private JpaStorageSettings myStorageSettings; - - @Autowired - private IResourceSupportedSvc myResourceSupportedSvc; + private BulkExportIdFetchingSvc myBulkExportProcessor; @Nonnull @Override @@ -93,108 +74,14 @@ public RunOutcome run( int submissionCount = 0; try { - Set submittedBatchResourceIds = new HashSet<>(); - - /* - * NB: patient-compartment limitation - * We know that Group and List are part of patient compartment. - * But allowing export of them seems like a security flaw. - * So we'll exclude them. - */ - Set resourceTypesToOmit = - theStepExecutionDetails.getParameters().getExportStyle() - == BulkExportJobParameters.ExportStyle.PATIENT - ? new HashSet<>( - SearchParameterUtil.RESOURCE_TYPES_TO_SP_TO_OMIT_FROM_PATIENT_COMPARTMENT.keySet()) - : Set.of(); - - /* - * We will fetch ids for each resource type in the ResourceTypes (_type filter). - */ - for (String resourceType : params.getResourceTypes()) { - if (resourceTypesToOmit.contains(resourceType) || !myResourceSupportedSvc.isSupported(resourceType)) { - continue; - } - providerParams.setResourceType(resourceType); - - // filters are the filters for searching - ourLog.info( - "Running FetchResourceIdsStep for resource type: {} with params: {}", - resourceType, - providerParams); - @SuppressWarnings("unchecked") - Iterator> pidIterator = (Iterator>) - myBulkExportProcessor.getResourcePidIterator(providerParams); - List idsToSubmit = new ArrayList<>(); - - int estimatedChunkSize = 0; - - if (!pidIterator.hasNext()) { - ourLog.debug("Bulk Export generated an iterator with no results!"); - } - while (pidIterator.hasNext()) { - IResourcePersistentId pid = pidIterator.next(); - - TypedPidJson batchResourceId; - if (pid.getResourceType() != null) { - batchResourceId = new TypedPidJson(pid.getResourceType(), pid); - } else { - batchResourceId = new TypedPidJson(resourceType, pid); - } - - if (!submittedBatchResourceIds.add(batchResourceId)) { - continue; - } - - idsToSubmit.add(batchResourceId); - - if (estimatedChunkSize > 0) { - // Account for comma between array entries - estimatedChunkSize++; - } - estimatedChunkSize += batchResourceId.estimateSerializedSize(); - - // Make sure resources stored in each batch does not go over the max capacity - if (idsToSubmit.size() >= myStorageSettings.getBulkExportFileMaximumCapacity() - || estimatedChunkSize >= myStorageSettings.getBulkExportFileMaximumSize()) { - submitWorkChunk(idsToSubmit, resourceType, theDataSink); - submissionCount++; - idsToSubmit = new ArrayList<>(); - estimatedChunkSize = 0; - } - } - - // if we have any other Ids left, submit them now - if (!idsToSubmit.isEmpty()) { - submitWorkChunk(idsToSubmit, 
resourceType, theDataSink);
-				submissionCount++;
-			}
-		}
-		} catch (Exception ex) {
-			ourLog.error(ex.getMessage(), ex);
-
+			submissionCount = myBulkExportProcessor.fetchIds(providerParams, theDataSink::accept);
+		} catch (JobExecutionFailedException ex) {
 			theDataSink.recoveredError(ex.getMessage());
-
-			throw new JobExecutionFailedException(Msg.code(2239) + " : " + ex.getMessage());
+			// rethrow so it can be properly handled
+			throw ex;
 		}
 
 		ourLog.info("Submitted {} groups of ids for processing", submissionCount);
 		return RunOutcome.SUCCESS;
 	}
-
-	private void submitWorkChunk(
-			List<TypedPidJson> theBatchResourceIds, String theResourceType, IJobDataSink<ResourceIdList> theDataSink) {
-		ResourceIdList idList = new ResourceIdList();
-
-		idList.setIds(theBatchResourceIds);
-
-		idList.setResourceType(theResourceType);
-
-		theDataSink.accept(idList);
-	}
-
-	@VisibleForTesting
-	public void setBulkExportProcessorForUnitTest(IBulkExportProcessor theBulkExportProcessor) {
-		myBulkExportProcessor = theBulkExportProcessor;
-	}
 }
diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/models/MdmExpandedPatientIds.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/models/MdmExpandedPatientIds.java
new file mode 100644
index 000000000000..54526e644a16
--- /dev/null
+++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/models/MdmExpandedPatientIds.java
@@ -0,0 +1,31 @@
+package ca.uhn.fhir.batch2.jobs.export.models;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class MdmExpandedPatientIds extends BulkExportJobBase {
+
+	/**
+	 * List of ids for serialization.
+	 * Entries are TypedPidJson objects whose resource type is always Patient.
+	 */
+	@JsonProperty("patientIds")
+	private List<PatientIdAndPidJson> myExpandedPatientIds;
+
+	public List<PatientIdAndPidJson> getExpandedPatientIds() {
+		if (myExpandedPatientIds == null) {
+			myExpandedPatientIds = new ArrayList<>();
+		}
+		return myExpandedPatientIds;
+	}
+
+	public void setExpandedPatientIds(List<PatientIdAndPidJson> theExpandedPatientIds) {
+		myExpandedPatientIds = theExpandedPatientIds;
+	}
+
+	public void addExpandedPatientId(PatientIdAndPidJson theId) {
+		getExpandedPatientIds().add(theId);
+	}
+}
diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/models/PatientIdAndPidJson.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/models/PatientIdAndPidJson.java
new file mode 100644
index 000000000000..e620d11826f2
--- /dev/null
+++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/models/PatientIdAndPidJson.java
@@ -0,0 +1,53 @@
+package ca.uhn.fhir.batch2.jobs.export.models;
+
+import ca.uhn.fhir.batch2.jobs.chunk.TypedPidJson;
+import ca.uhn.fhir.context.FhirContext;
+import ca.uhn.fhir.jpa.api.svc.IIdHelperService;
+import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.hl7.fhir.instance.model.api.IIdType;
+
+public class PatientIdAndPidJson extends TypedPidJson {
+
+	/**
+	 * This is the actual resource id (server or forced).
+	 * E.g. Patient/123 or Patient/RED
+	 */
+	@JsonProperty("resourceId")
+	private String myResourceId;
+
+	/**
+	 * Empty constructor for serialization
+	 */
+	public PatientIdAndPidJson() {}
+
+	public PatientIdAndPidJson(IResourcePersistentId theResourcePersistentId) {
+		super(
+			theResourcePersistentId.getResourceType(),
+			theResourcePersistentId.getPartitionId(),
+			theResourcePersistentId.getId().toString());
+
+		setResourceId(theResourcePersistentId.getAssociatedResourceId());
+	}
+
+	public String getResourceId() {
+		return myResourceId;
+	}
+
+	public void setResourceId(IIdType theIdType) {
+		this.setResourceId(theIdType.toUnqualifiedVersionless().getValue());
+	}
+
+	public void setResourceId(String theResourceId) {
+		myResourceId = theResourceId;
+	}
+
+	public <T extends IResourcePersistentId<?>> T toPersistentId(
+			IIdHelperService<T> theIdHelperService, FhirContext theFhirContext) {
+		T pid = super.toPersistentId(theIdHelperService);
+
+		// attach the actual resource id (e.g. Patient/123) to the resolved persistent id
+		pid.setAssociatedResourceId(theFhirContext.getVersion().newIdType(getResourceId()));
+		return pid;
+	}
+}
diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/models/ResourceIdList.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/models/ResourceIdList.java
index 77267e6e5c03..f548cdc9ee40 100644
--- a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/models/ResourceIdList.java
+++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/models/ResourceIdList.java
@@ -22,6 +22,7 @@
 import ca.uhn.fhir.batch2.jobs.chunk.TypedPidJson;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
+import java.util.ArrayList;
 import java.util.List;
 
 public class ResourceIdList extends BulkExportJobBase {
@@ -36,6 +37,9 @@ public class ResourceIdList extends BulkExportJobBase {
 	private String myResourceType;
 
 	public List<TypedPidJson> getIds() {
+		if (myBatchResourceIds == null) {
+			myBatchResourceIds = new ArrayList<>();
+		}
 		return myBatchResourceIds;
 	}
 
@@ -43,6 +47,10 @@ public void setIds(List<TypedPidJson> theBatchResourceIds) {
 		myBatchResourceIds = theBatchResourceIds;
 	}
 
+	public void addId(TypedPidJson theTypedPidJson) {
+		getIds().add(theTypedPidJson);
+	}
+
 	public String getResourceType() {
 		return myResourceType;
 	}
diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/svc/BulkExportIdFetchingSvc.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/svc/BulkExportIdFetchingSvc.java
new file mode 100644
index 000000000000..fef393dbce00
--- /dev/null
+++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/svc/BulkExportIdFetchingSvc.java
@@ -0,0 +1,136 @@
+package ca.uhn.fhir.batch2.jobs.export.svc;
+
+import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
+import ca.uhn.fhir.batch2.jobs.chunk.TypedPidJson;
+import ca.uhn.fhir.batch2.jobs.export.models.ResourceIdList;
+import ca.uhn.fhir.i18n.Msg;
+import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
+import ca.uhn.fhir.jpa.bulk.export.api.IBulkExportProcessor;
+import ca.uhn.fhir.jpa.bulk.export.model.ExportPIDIteratorParameters;
+import ca.uhn.fhir.rest.api.IResourceSupportedSvc;
+import ca.uhn.fhir.rest.api.server.bulk.BulkExportJobParameters;
+import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
+import ca.uhn.fhir.util.SearchParameterUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Consumer;
+
+public class BulkExportIdFetchingSvc {
+
+	private static final Logger ourLog = LoggerFactory.getLogger(BulkExportIdFetchingSvc.class);
+
+	@SuppressWarnings("rawtypes")
+	@Autowired
+	private IBulkExportProcessor myBulkExportProcessor;
+
+	@Autowired
+	private JpaStorageSettings myStorageSettings;
+
+	@Autowired
+	private IResourceSupportedSvc myResourceSupportedSvc;
+
+	public int fetchIds(ExportPIDIteratorParameters theProviderParameters, Consumer<ResourceIdList> theConsumer) {
+		BulkExportJobParameters.ExportStyle exportStyle = theProviderParameters.getExportStyle();
+		List<String> resourceTypes = theProviderParameters.getRequestedResourceTypes();
+
+		int submissionCount = 0;
+
+		try {
+			Set<TypedPidJson> submittedBatchResourceIds = new HashSet<>();
+
+			/*
+			 * NB: patient-compartment limitation
+			 * We know that Group and List are part of patient compartment.
+			 * But allowing export of them seems like a security flaw.
+			 * So we'll exclude them.
+			 */
+			Set<String> resourceTypesToOmit = exportStyle == BulkExportJobParameters.ExportStyle.PATIENT
+				? new HashSet<>(SearchParameterUtil.RESOURCE_TYPES_TO_SP_TO_OMIT_FROM_PATIENT_COMPARTMENT.keySet())
+				: Set.of();
+
+			/*
+			 * We will fetch ids for each resource type in the ResourceTypes (_type filter).
+			 */
+			for (String resourceType : resourceTypes) {
+				if (resourceTypesToOmit.contains(resourceType) || !myResourceSupportedSvc.isSupported(resourceType)) {
+					continue;
+				}
+				// clone the parameters, because the resource type is changed on each iteration
+				ExportPIDIteratorParameters providerParams = new ExportPIDIteratorParameters(theProviderParameters);
+				providerParams.setResourceType(resourceType);
+
+				// the cloned parameters carry the job's search filters for this resource type
+				ourLog.info("Running FetchIds for resource type: {} with params: {}", resourceType, providerParams);
+
+				@SuppressWarnings({"rawtypes", "unchecked"})
+				Iterator<IResourcePersistentId> pidIterator =
+					myBulkExportProcessor.getResourcePidIterator(providerParams);
+				List<TypedPidJson> idsToSubmit = new ArrayList<>();
+
+				int estimatedChunkSize = 0;
+
+				if (!pidIterator.hasNext()) {
+					ourLog.debug("Bulk Export generated an iterator with no results!");
+				}
+
+				while (pidIterator.hasNext()) {
+					IResourcePersistentId pid = pidIterator.next();
+
+					TypedPidJson batchResourceId;
+					if (pid.getResourceType() != null) {
+						batchResourceId = new TypedPidJson(pid.getResourceType(), pid);
+					} else {
+						batchResourceId = new TypedPidJson(resourceType, pid);
+					}
+
+					if (!submittedBatchResourceIds.add(batchResourceId)) {
+						continue;
+					}
+
+					idsToSubmit.add(batchResourceId);
+
+					if (estimatedChunkSize > 0) {
+						// Account for comma between array entries
+						estimatedChunkSize++;
+					}
+					estimatedChunkSize += batchResourceId.estimateSerializedSize();
+
+					// Make sure the resources stored in each batch do not exceed the max capacity
+					if (idsToSubmit.size() >= myStorageSettings.getBulkExportFileMaximumCapacity()
+						|| estimatedChunkSize >= myStorageSettings.getBulkExportFileMaximumSize()) {
+						ResourceIdList list = new ResourceIdList();
+						list.setIds(idsToSubmit);
+						list.setResourceType(resourceType);
+						theConsumer.accept(list);
+						submissionCount++;
+						idsToSubmit = new ArrayList<>();
+						estimatedChunkSize = 0;
+					}
+				}
+
+				// if any ids are left over, submit them now
+				if (!idsToSubmit.isEmpty()) {
+					ResourceIdList list = new ResourceIdList();
+					list.setIds(idsToSubmit);
+					list.setResourceType(resourceType);
+					theConsumer.accept(list);
+					submissionCount++;
+				}
+			}
+		} catch (Exception ex) {
+			// log the failure and rewrap it as a JobExecutionFailedException so batch2 can fail the step cleanly
+			ourLog.error(ex.getMessage(), ex);
+
+			throw new JobExecutionFailedException(Msg.code(2239) + " : " + ex.getMessage(), ex);
+		}
+
+		return submissionCount;
+	}
+}
diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/v3/BulkExportV3Config.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/v3/BulkExportV3Config.java
new file mode 100644
index 000000000000..45bc7ae5eb0e
--- /dev/null
+++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/v3/BulkExportV3Config.java
@@ -0,0 +1,111 @@
+package ca.uhn.fhir.batch2.jobs.export.v3;
+
+import ca.uhn.fhir.batch2.api.VoidModel;
+import ca.uhn.fhir.batch2.jobs.export.BulkExportCreateReportStep;
+import ca.uhn.fhir.batch2.jobs.export.BulkExportJobParametersValidator;
+import ca.uhn.fhir.batch2.jobs.export.ExpandResourceAndWriteBinaryStep;
+import ca.uhn.fhir.batch2.jobs.export.ExpandResourcesStep;
+import ca.uhn.fhir.batch2.jobs.export.WriteBinaryStep;
+import ca.uhn.fhir.batch2.jobs.export.models.BulkExportBinaryFileId;
+import ca.uhn.fhir.batch2.jobs.export.models.MdmExpandedPatientIds;
+import ca.uhn.fhir.batch2.jobs.export.models.ResourceIdList;
+import ca.uhn.fhir.batch2.model.JobDefinition;
+import ca.uhn.fhir.jpa.api.model.BulkExportJobResults;
+import ca.uhn.fhir.model.api.IModelJson;
+import ca.uhn.fhir.rest.api.server.bulk.BulkExportJobParameters;
+import ca.uhn.fhir.util.Batch2JobDefinitionConstants;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import static ca.uhn.fhir.batch2.jobs.export.BulkExportAppCtx.WRITE_TO_BINARIES;
+
+@Configuration
+public class BulkExportV3Config {
+
+	@Bean
+	public JobDefinition bulkExportJobV3Definition() {
+		JobDefinition.Builder builder = JobDefinition.newBuilder();
+		builder.setJobDefinitionId(Batch2JobDefinitionConstants.BULK_EXPORT);
+		builder.setJobDescription("FHIR Bulk Export");
+		builder.setJobDefinitionVersion(3);
+
+		JobDefinition def = builder.setParametersType(BulkExportJobParameters.class)
+			// validator
+			.setParametersValidator(bulkExportJobParametersValidator())
+			.gatedExecution()
+			.addFirstStep(
+				"mdm-expand-if-necessary",
+				"Expand out patient ids if necessary",
+				MdmExpandedPatientIds.class,
+				mdmExpansionStep())
+			// load in (all) ids and create id chunks (chunk size is bounded by the storage settings)
+			.addIntermediateStep(
+				"fetch-resources",
+				"Fetches resource PIDs for exporting",
+				ResourceIdList.class,
+				fetchResourceIdsV3Step())
+			// expand out - fetch resources
+			// and write binaries and save to db
+			.addIntermediateStep(
+				WRITE_TO_BINARIES,
+				"Writes the expanded resources to the binaries and saves",
+				BulkExportBinaryFileId.class,
+				expandResourceAndWriteBinaryStep())
+			// finalize the job (set to complete)
+			.addFinalReducerStep(
+				"create-report-step",
+				"Creates the output report from a bulk export job",
+				BulkExportJobResults.class,
+				createReportStep())
+			.build();
+
+		return def;
+	}
+
+	@Bean
+	public BulkExportJobParametersValidator bulkExportJobParametersValidator() {
+		return new BulkExportJobParametersValidator();
+	}
+
+	@Bean
+	public FetchIdsV3Step fetchResourceIdsV3Step() {
+		return new FetchIdsV3Step();
+	}
+
+	/**
+	 * Pre-expands the patient id list (if necessary)
+	 */
+	@Bean
+	public MdmExpansionStep mdmExpansionStep() {
+		return new MdmExpansionStep();
+	}
+
+	/**
+	 * Note, this bean is only used for version 1 of the bulk export job definition
+	 */
+	@Bean
+	public ExpandResourcesStep expandResourcesStep() {
+		return new
ExpandResourcesStep();
+	}
+
+	/**
+	 * Note, this bean is only used for version 1 of the bulk export job definition
+	 */
+	@Bean
+	public WriteBinaryStep writeBinaryStep() {
+		return new WriteBinaryStep();
+	}
+
+	/**
+	 * Note, this bean is only used for version 2 of the bulk export job definition
+	 */
+	@Bean
+	public ExpandResourceAndWriteBinaryStep expandResourceAndWriteBinaryStep() {
+		return new ExpandResourceAndWriteBinaryStep();
+	}
+
+	@Bean
+	public BulkExportCreateReportStep createReportStep() {
+		return new BulkExportCreateReportStep();
+	}
+}
diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/v3/FetchIdsV3Step.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/v3/FetchIdsV3Step.java
new file mode 100644
index 000000000000..345d49e13fff
--- /dev/null
+++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/v3/FetchIdsV3Step.java
@@ -0,0 +1,84 @@
+package ca.uhn.fhir.batch2.jobs.export.v3;
+
+import ca.uhn.fhir.batch2.api.IJobDataSink;
+import ca.uhn.fhir.batch2.api.IJobStepWorker;
+import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
+import ca.uhn.fhir.batch2.api.RunOutcome;
+import ca.uhn.fhir.batch2.api.StepExecutionDetails;
+import ca.uhn.fhir.batch2.jobs.export.models.MdmExpandedPatientIds;
+import ca.uhn.fhir.batch2.jobs.export.models.PatientIdAndPidJson;
+import ca.uhn.fhir.batch2.jobs.export.models.ResourceIdList;
+import ca.uhn.fhir.batch2.jobs.export.svc.BulkExportIdFetchingSvc;
+import ca.uhn.fhir.context.FhirContext;
+import ca.uhn.fhir.jpa.api.svc.IIdHelperService;
+import ca.uhn.fhir.jpa.bulk.export.model.ExportPIDIteratorParameters;
+import ca.uhn.fhir.jpa.model.dao.JpaPid;
+import ca.uhn.fhir.rest.api.server.bulk.BulkExportJobParameters;
+import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
+import jakarta.annotation.Nonnull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class FetchIdsV3Step implements IJobStepWorker<BulkExportJobParameters, MdmExpandedPatientIds, ResourceIdList> {
+	private static final Logger ourLog = LoggerFactory.getLogger(FetchIdsV3Step.class);
+
+	@Autowired
+	private BulkExportIdFetchingSvc myBulkExportIdFetchingSvc;
+
+	@Autowired
+	private IIdHelperService<JpaPid> myIdHelperService;
+
+	@Autowired
+	private FhirContext myFhirContext;
+
+	@Override
+	public RunOutcome run(
+			@Nonnull StepExecutionDetails<BulkExportJobParameters, MdmExpandedPatientIds> theStepExecutionDetails,
+			@Nonnull IJobDataSink<ResourceIdList> theDataSink)
+			throws JobExecutionFailedException {
+		BulkExportJobParameters params = theStepExecutionDetails.getParameters();
+		MdmExpandedPatientIds expandedPatientIds = theStepExecutionDetails.getData();
+		ourLog.info(
+			"Fetching resource IDs for bulk export job instance [{}]",
+			theStepExecutionDetails.getInstance().getInstanceId());
+
+		ExportPIDIteratorParameters providerParams = new ExportPIDIteratorParameters();
+		providerParams.setInstanceId(theStepExecutionDetails.getInstance().getInstanceId());
+		providerParams.setChunkId(theStepExecutionDetails.getChunkId());
+		providerParams.setFilters(params.getFilters());
+		providerParams.setStartDate(params.getSince());
+		providerParams.setEndDate(params.getUntil());
+		providerParams.setExportStyle(params.getExportStyle());
+		providerParams.setGroupId(params.getGroupId());
+		providerParams.setPatientIds(params.getPatientIds());
+		providerParams.setExpandMdm(params.isExpandMdm());
+		providerParams.setPartitionId(params.getPartitionId());
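+		// the requested _type list drives which per-type searches the shared id-fetching service will run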
+		providerParams.setRequestedResourceTypes(params.getResourceTypes());
+
+		if (params.isExpandMdm()) {
+			providerParams.setExpandedPatientIds(convertExpandedPatientIds(expandedPatientIds.getExpandedPatientIds()));
+		}
+
+		int submissionCount = 0;
+		try {
+			submissionCount = myBulkExportIdFetchingSvc.fetchIds(providerParams, theDataSink::accept);
+		} catch (JobExecutionFailedException ex) {
+			theDataSink.recoveredError(ex.getMessage());
+			throw ex;
+		}
+
+		ourLog.info("Submitted {} groups of ids for processing", submissionCount);
+		return RunOutcome.SUCCESS;
+	}
+
+	private List<IResourcePersistentId<?>> convertExpandedPatientIds(List<PatientIdAndPidJson> theExpanded) {
+		return theExpanded.stream()
+			.map(id -> id.toPersistentId(myIdHelperService, myFhirContext))
+			.collect(Collectors.toList());
+	}
+}
diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/v3/MdmExpansionStep.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/v3/MdmExpansionStep.java
new file mode 100644
index 000000000000..4a1634c64347
--- /dev/null
+++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/v3/MdmExpansionStep.java
@@ -0,0 +1,70 @@
+package ca.uhn.fhir.batch2.jobs.export.v3;
+
+import ca.uhn.fhir.batch2.api.IFirstJobStepWorker;
+import ca.uhn.fhir.batch2.api.IJobDataSink;
+import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
+import ca.uhn.fhir.batch2.api.RunOutcome;
+import ca.uhn.fhir.batch2.api.StepExecutionDetails;
+import ca.uhn.fhir.batch2.api.VoidModel;
+import ca.uhn.fhir.batch2.jobs.export.models.MdmExpandedPatientIds;
+import ca.uhn.fhir.batch2.jobs.export.models.PatientIdAndPidJson;
+import ca.uhn.fhir.jpa.bulk.export.api.IBulkExportProcessor;
+import ca.uhn.fhir.jpa.bulk.export.model.ExpandPatientIdsParams;
+import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
+import ca.uhn.fhir.rest.api.server.bulk.BulkExportJobParameters;
+import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
+import jakarta.annotation.Nonnull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class MdmExpansionStep implements IFirstJobStepWorker<BulkExportJobParameters, MdmExpandedPatientIds> {
+	private static final Logger ourLog = LoggerFactory.getLogger(MdmExpansionStep.class);
+
+	@SuppressWarnings("rawtypes")
+	@Autowired
+	private IBulkExportProcessor myBulkExportProcessor;
+
+	@Autowired
+	private IRequestPartitionHelperSvc myRequestPartitionHelperSvc;
+
+	@Override
+	public RunOutcome run(
+			@Nonnull StepExecutionDetails<BulkExportJobParameters, VoidModel> theStepExecutionDetails,
+			@Nonnull IJobDataSink<MdmExpandedPatientIds> theDataSink)
+			throws JobExecutionFailedException {
+		BulkExportJobParameters jobParameters = theStepExecutionDetails.getParameters();
+
+		ourLog.info(
+			"Doing MDM expansion for bulk export job instance [{}]",
+			theStepExecutionDetails.getInstance().getInstanceId());
+
+		ExpandPatientIdsParams params = new ExpandPatientIdsParams(jobParameters.getExportStyle());
+		params.setShouldDoMdmExpansion(jobParameters.isExpandMdm());
+		params.setGroupId(jobParameters.getGroupId());
+		params.setRequestPartitionId(jobParameters.getPartitionId());
+		params.setFilters(jobParameters.getFilters());
+		List<String> patientIds = jobParameters.getPatientIds();
+
+		params.setPatientIds(patientIds);
+
+		@SuppressWarnings("unchecked")
+		Set<IResourcePersistentId<?>> resourcePersistentIdSet = myBulkExportProcessor.expandPatientIdList(params);
+
+		MdmExpandedPatientIds expandedPatientIds = new MdmExpandedPatientIds();
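+		// repackage the expanded pids as serializable patient ids for the following fetch step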
+		expandedPatientIds.setExpandedPatientIds(
+				resourcePersistentIdSet.stream().map(PatientIdAndPidJson::new).collect(Collectors.toList()));
+		theDataSink.accept(expandedPatientIds);
+
+		ourLog.info(
+			"MDM expansion performed generating {} ids",
+			expandedPatientIds.getExpandedPatientIds().size());
+
+		return RunOutcome.SUCCESS;
+	}
+}
diff --git a/hapi-fhir-storage-batch2-jobs/src/test/java/ca/uhn/fhir/batch2/jobs/export/FetchResourceIdsStepTest.java b/hapi-fhir-storage-batch2-jobs/src/test/java/ca/uhn/fhir/batch2/jobs/export/FetchResourceIdsStepTest.java
index 3fb9e8610fb0..1b5f265c71b2 100644
--- a/hapi-fhir-storage-batch2-jobs/src/test/java/ca/uhn/fhir/batch2/jobs/export/FetchResourceIdsStepTest.java
+++ b/hapi-fhir-storage-batch2-jobs/src/test/java/ca/uhn/fhir/batch2/jobs/export/FetchResourceIdsStepTest.java
@@ -6,14 +6,11 @@
 import ca.uhn.fhir.batch2.api.VoidModel;
 import ca.uhn.fhir.batch2.jobs.chunk.TypedPidJson;
 import ca.uhn.fhir.batch2.jobs.export.models.ResourceIdList;
+import ca.uhn.fhir.batch2.jobs.export.svc.BulkExportIdFetchingSvc;
 import ca.uhn.fhir.batch2.model.JobInstance;
 import ca.uhn.fhir.batch2.model.WorkChunk;
 import ca.uhn.fhir.interceptor.model.RequestPartitionId;
-import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
-import ca.uhn.fhir.jpa.bulk.export.api.IBulkExportProcessor;
 import ca.uhn.fhir.jpa.bulk.export.model.ExportPIDIteratorParameters;
-import ca.uhn.fhir.jpa.model.dao.JpaPid;
-import ca.uhn.fhir.rest.api.IResourceSupportedSvc;
 import ca.uhn.fhir.rest.api.server.bulk.BulkExportJobParameters;
 import ch.qos.logback.classic.Level;
 import ch.qos.logback.classic.Logger;
@@ -36,13 +33,12 @@
 import java.util.Collections;
 import java.util.Date;
 import java.util.List;
+import java.util.function.Consumer;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -57,20 +53,14 @@ public class FetchResourceIdsStepTest {
 	private ListAppender<ILoggingEvent> myAppender;
 
 	@Mock
-	private IBulkExportProcessor myBulkExportProcessor;
+	private BulkExportIdFetchingSvc myBulkExportIdFetchingSvc;
 
 	@InjectMocks
 	private FetchResourceIdsStep myFirstStep;
-
-	@Mock
-	private JpaStorageSettings myStorageSettings;
-
-	@Mock
-	private IResourceSupportedSvc myResourceSupportedSvc;
 
 	@BeforeEach
 	public void init() {
 		ourLog.addAppender(myAppender);
-		myFirstStep.setBulkExportProcessorForUnitTest(myBulkExportProcessor);
 	}
 
 	@AfterEach
@@ -112,31 +102,30 @@ public void run_withValidInputs_succeeds(boolean thePartitioned) {
 		instance.setInstanceId("1");
 		StepExecutionDetails<BulkExportJobParameters, VoidModel> input = createInput(parameters, instance);
 		ourLog.setLevel(Level.INFO);
-		List<JpaPid> patientIds = new ArrayList<>();
-		List<JpaPid> observationIds = new ArrayList<>();
+		ResourceIdList patientIds = new ResourceIdList();
+		patientIds.setResourceType("Patient");
+		ResourceIdList observationIds = new ResourceIdList();
+		observationIds.setResourceType("Observation");
 		{
-			JpaPid id1 = JpaPid.fromId(123L);
-			JpaPid id2 = JpaPid.fromId(234L);
-			patientIds.add(id1);
-			patientIds.add(id2);
+			List<TypedPidJson> pids = new ArrayList<>();
+			pids.add(new TypedPidJson("Patient", null, "123"));
+			pids.add(new TypedPidJson("Patient", null, "234"));
+			patientIds.setIds(pids);
 		}
 		{
-			JpaPid id1 = JpaPid.fromId(345L);
-			JpaPid id2 = JpaPid.fromId(456L);
-			observationIds.add(id1);
-			observationIds.add(id2);
+			observationIds.addId(new TypedPidJson("Observation", null, "345"));
+			observationIds.addId(new TypedPidJson("Observation", null, "456"));
 		}
 
 		// when
-		when(myResourceSupportedSvc.isSupported(anyString())).thenReturn(true);
-		when(myBulkExportProcessor.getResourcePidIterator(
-			any(ExportPIDIteratorParameters.class)
-		)).thenReturn(patientIds.iterator())
-			.thenReturn(observationIds.iterator());
+		when(myBulkExportIdFetchingSvc.fetchIds(any(ExportPIDIteratorParameters.class), any(Consumer.class)))
+			.thenAnswer(args -> {
+				Consumer<ResourceIdList> consumer = args.getArgument(1);
+				consumer.accept(patientIds);
+				consumer.accept(observationIds);
+				return parameters.getResourceTypes().size();
+			});
 
-		int maxFileCapacity = 1000;
-		when(myStorageSettings.getBulkExportFileMaximumCapacity()).thenReturn(maxFileCapacity);
-		when(myStorageSettings.getBulkExportFileMaximumSize()).thenReturn(10000L);
 
 		// test
 		RunOutcome outcome = myFirstStep.run(input, sink);
@@ -154,10 +161,10 @@
 			assertThat(parameters.getResourceTypes()).contains(resourceType);
 
 			if (resourceType.equals("Patient")) {
-				assertThat(idList.getIds()).hasSize(patientIds.size());
+				assertThat(idList.getIds()).hasSize(patientIds.getIds().size());
 			}
 			else if (resourceType.equals("Observation")) {
-				assertThat(idList.getIds()).hasSize(observationIds.size());
+				assertThat(idList.getIds()).hasSize(observationIds.getIds().size());
 			}
 			else {
 				// we shouldn't have others
@@ -169,17 +176,9 @@ else if (resourceType.equals("Observation")) {
 		verify(myAppender, atLeastOnce()).doAppend(logCaptor.capture());
 		List<ILoggingEvent> events = logCaptor.getAllValues();
 		assertThat(events.get(0).getMessage()).contains("Fetching resource IDs for bulk export job instance");
-		assertThat(events.get(1).getMessage()).contains("Running FetchResource");
-		assertThat(events.get(2).getMessage()).contains("Running FetchResource");
-		assertThat(events.get(3).getFormattedMessage()).contains("Submitted "
+		assertThat(events.get(1).getFormattedMessage()).contains("Submitted "
			+ parameters.getResourceTypes().size()
			+ " groups of ids for processing");
-
-		ArgumentCaptor<ExportPIDIteratorParameters> mapppedParamsCaptor = ArgumentCaptor.forClass(ExportPIDIteratorParameters.class);
-		verify(myBulkExportProcessor, times(2)).getResourcePidIterator(mapppedParamsCaptor.capture());
-		List<ExportPIDIteratorParameters> capturedParameters = mapppedParamsCaptor.getAllValues();
-		assertEquals(parameters.getPartitionId(), capturedParameters.get(0).getPartitionIdOrAllPartitions());
-		assertEquals(parameters.getPartitionId(), capturedParameters.get(1).getPartitionIdOrAllPartitions());
 	}
 
 	@Test
@@ -192,24 +191,25 @@ public void
run_moreThanTheMaxFileCapacityPatients_hasAtLeastTwoJobs() {
 		parameters.setResourceTypes(Collections.singletonList("Patient"));
 		StepExecutionDetails<BulkExportJobParameters, VoidModel> input = createInput(parameters, instance);
 		ourLog.setLevel(Level.INFO);
-		List<JpaPid> patientIds = new ArrayList<>();
+		List<TypedPidJson> patientIds = new ArrayList<>();
 
 		// when
 		int maxFileCapacity = 5;
-		when(myStorageSettings.getBulkExportFileMaximumCapacity()).thenReturn(maxFileCapacity);
-		when(myStorageSettings.getBulkExportFileMaximumSize()).thenReturn(10000L);
-		when(myResourceSupportedSvc.isSupported(anyString())).thenReturn(true);
-
+		ResourceIdList list = new ResourceIdList();
+		list.setResourceType("Patient");
 		for (int i = 0; i <= maxFileCapacity; i++) {
-			JpaPid id = JpaPid.fromId((long) i);
-			patientIds.add(id);
+			patientIds.add(new TypedPidJson("Patient", null, Long.toString(i)));
 		}
+		list.setIds(patientIds);
 
 		// when
-		when(myBulkExportProcessor.getResourcePidIterator(
-			any(ExportPIDIteratorParameters.class)
-		)).thenReturn(patientIds.iterator());
+		when(myBulkExportIdFetchingSvc.fetchIds(any(ExportPIDIteratorParameters.class), any(Consumer.class)))
+			.thenAnswer(args -> {
+				Consumer<ResourceIdList> consumer = args.getArgument(1);
+				consumer.accept(list);
+				return 1;
+			});
 
 		// test
 		RunOutcome outcome = myFirstStep.run(input, sink);
@@ -218,22 +224,9 @@ public void run_moreThanTheMaxFileCapacityPatients_hasAtLeastTwoJobs() {
 		ArgumentCaptor<ResourceIdList> captor = ArgumentCaptor.forClass(ResourceIdList.class);
 
 		assertEquals(RunOutcome.SUCCESS, outcome);
-		verify(sink, times(2))
-			.accept(captor.capture());
-		List<ResourceIdList> listIds = captor.getAllValues();
-
-		// verify all submitted ids are there
-		boolean found = false;
-		for (JpaPid pid : patientIds) {
-			TypedPidJson batchResourceId = new TypedPidJson("Patient", pid);
-			for (ResourceIdList idList : listIds) {
-				found = idList.getIds().contains(batchResourceId);
-				if (found) {
-					break;
-				}
-			}
-			assertTrue(found);
-			found = false;
-		}
+		verify(sink).accept(captor.capture());
+		List<ResourceIdList> listIds = captor.getAllValues();
+		assertEquals(1, listIds.size());
+		assertEquals(list, listIds.get(0));
 	}
 }
diff --git a/hapi-fhir-storage-batch2-jobs/src/test/java/ca/uhn/fhir/batch2/jobs/export/svc/BulkExportIdFetchingSvcTest.java b/hapi-fhir-storage-batch2-jobs/src/test/java/ca/uhn/fhir/batch2/jobs/export/svc/BulkExportIdFetchingSvcTest.java
new file mode 100644
index 000000000000..18c7e19ed8e5
--- /dev/null
+++ b/hapi-fhir-storage-batch2-jobs/src/test/java/ca/uhn/fhir/batch2/jobs/export/svc/BulkExportIdFetchingSvcTest.java
@@ -0,0 +1,152 @@
+package ca.uhn.fhir.batch2.jobs.export.svc;
+
+import ca.uhn.fhir.batch2.jobs.chunk.TypedPidJson;
+import ca.uhn.fhir.batch2.jobs.export.models.ResourceIdList;
+import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
+import ca.uhn.fhir.jpa.bulk.export.api.IBulkExportProcessor;
+import ca.uhn.fhir.jpa.bulk.export.model.ExportPIDIteratorParameters;
+import ca.uhn.fhir.jpa.model.dao.JpaPid;
+import ca.uhn.fhir.rest.api.IResourceSupportedSvc;
+import ca.uhn.fhir.rest.api.server.bulk.BulkExportJobParameters;
+import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
+import ca.uhn.test.concurrency.PointcutLatch;
+import ch.qos.logback.classic.Level;
+import ch.qos.logback.classic.Logger;
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.read.ListAppender;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Consumer;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+public class BulkExportIdFetchingSvcTest {
+	private static final Logger ourLog = (Logger) LoggerFactory.getLogger(BulkExportIdFetchingSvc.class);
+
+	@Mock
+	private ListAppender<ILoggingEvent> myAppender;
+
+	@Mock
+	private IBulkExportProcessor myBulkExportProcessor;
+
+	@Spy
+	private JpaStorageSettings myStorageSettings = new JpaStorageSettings();
+
+	@Mock
+	private IResourceSupportedSvc myIResourceSupportedSvc;
+
+	@InjectMocks
+	private BulkExportIdFetchingSvc mySvc;
+
+	@BeforeEach
+	public void init() {
+		ourLog.addAppender(myAppender);
+	}
+
+	@AfterEach
+	public void after() {
+		ourLog.detachAppender(myAppender);
+	}
+
+	@Test
+	public void fetchIds_happyPath_test() throws InterruptedException {
+		// setup
+		ExportPIDIteratorParameters parameters = new ExportPIDIteratorParameters();
+		parameters.setRequestedResourceTypes(List.of("Patient", "Observation"));
+		parameters.setExportStyle(BulkExportJobParameters.ExportStyle.PATIENT);
+
+		List<IResourcePersistentId<?>> patientIds = new ArrayList<>();
+		List<IResourcePersistentId<?>> observationIds = new ArrayList<>();
+
+		patientIds.add(JpaPid.fromIdAndVersionAndResourceType(1L, 1L, "Patient"));
+		patientIds.add(JpaPid.fromIdAndVersionAndResourceType(2L, 1L, "Patient"));
+
+		observationIds.add(JpaPid.fromIdAndVersionAndResourceType(3L, 1L, "Observation"));
+		observationIds.add(JpaPid.fromIdAndVersionAndResourceType(4L, 1L, "Observation"));
+
+		List<ResourceIdList> resourceIdLists = new ArrayList<>();
+		PointcutLatch latch = new PointcutLatch("latch");
+		Consumer<ResourceIdList> consumer = resourceIdList -> {
+			resourceIdLists.add(resourceIdList);
+			latch.call(1);
+		};
+
+		// when
+		when(myIResourceSupportedSvc.isSupported(anyString()))
+			.thenReturn(true);
+		when(myBulkExportProcessor.getResourcePidIterator(any(ExportPIDIteratorParameters.class)))
+			.thenReturn(patientIds.iterator())
+			.thenReturn(observationIds.iterator());
+
+		// test
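+		// each of the two resource types is expected to produce exactly one submitted chunk
+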
latch.setExpectedCount(2); + int submission = mySvc.fetchIds(parameters, consumer); + + // wait + latch.awaitExpected(); + + // verify + assertEquals(2, submission); + assertEquals(submission, resourceIdLists.size()); + resourceIdLists.forEach(rid -> { + assertTrue(rid.getResourceType().equals("Observation") || rid.getResourceType().equals("Patient")); + + switch (rid.getResourceType()) { + case "Observation" -> { + assertEquals(observationIds.size(), rid.getIds().size()); + Set strPid = new HashSet<>(); + for (IResourcePersistentId id : observationIds) { + assertTrue(id instanceof JpaPid); + JpaPid jpid = (JpaPid) id; + strPid.add(jpid.getId().toString()); + } + for (TypedPidJson id : rid.getIds()) { + assertTrue(strPid.contains(id.getPid())); + } + } + case "Patient" -> { + assertEquals(patientIds.size(), rid.getIds().size()); + Set strPid = new HashSet<>(); + for (IResourcePersistentId id : patientIds) { + assertTrue(id instanceof JpaPid); + JpaPid jpid = (JpaPid) id; + strPid.add(jpid.getId().toString()); + } + for (TypedPidJson id : rid.getIds()) { + assertTrue(strPid.contains(id.getPid())); + } + } + } + }); + + ArgumentCaptor captor = ArgumentCaptor.forClass(ILoggingEvent.class); + verify(myAppender, times(submission)).doAppend(captor.capture()); + captor.getAllValues() + .forEach(logEvent -> { + assertEquals(Level.INFO, logEvent.getLevel()); + assertTrue(logEvent.getMessage() + .contains("Running FetchIds for resource type")); + }); + } +} diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/api/svc/IIdHelperService.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/api/svc/IIdHelperService.java index 261a4122d6a3..45da92cba172 100644 --- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/api/svc/IIdHelperService.java +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/api/svc/IIdHelperService.java @@ -102,7 +102,7 @@ Map> resolveResourceIdentities( */ default List resolveResourcePids( RequestPartitionId theRequestPartitionId, - List theTargetIds, + Collection theTargetIds, ResolveIdentityMode theResolveIdentityMode) { return resolveResourceIdentities(theRequestPartitionId, theTargetIds, theResolveIdentityMode).values().stream() .map(IResourceLookup::getPersistentId) @@ -177,6 +177,22 @@ default T getPidOrThrowException(RequestPartitionId theRequestPartitionId, IIdTy */ Set translatePidsToFhirResourceIds(Set thePids); + /** + * This takes PIDs and, if they do not have the associated resource id added, + * pulls it from the db and adds it. 
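+	 * Pids whose associated resource id cannot be resolved are left unchanged.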
+	 * @param thePids the persistent ids to fill out with their associated resource ids
+	 */
+	default void fillOutPids(Set<T> thePids, FhirContext theContext) {
+		PersistentIdToForcedIdMap<T> pidToForcedIdMap = translatePidsToForcedIds(thePids);
+
+		thePids.forEach(pid -> {
+			Optional<String> val = pidToForcedIdMap.get(pid);
+			val.ifPresent(id -> {
+				pid.setAssociatedResourceId(theContext.getVersion().newIdType(id));
+			});
+		});
+	}
+
 	/**
 	 * @deprecated Use {@link #newPid(Object, Integer)}
 	 */
diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/bulk/export/api/IBulkExportProcessor.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/bulk/export/api/IBulkExportProcessor.java
index c1b297ee4b4e..fa7cd4fe74f0 100644
--- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/bulk/export/api/IBulkExportProcessor.java
+++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/bulk/export/api/IBulkExportProcessor.java
@@ -19,12 +19,15 @@
  */
 package ca.uhn.fhir.jpa.bulk.export.api;
 
+import ca.uhn.fhir.jpa.bulk.export.model.ExpandPatientIdsParams;
 import ca.uhn.fhir.jpa.bulk.export.model.ExportPIDIteratorParameters;
 import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
+import jakarta.annotation.Nonnull;
 import org.hl7.fhir.instance.model.api.IBaseResource;
 
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 
 public interface IBulkExportProcessor<T extends IResourcePersistentId> {
@@ -35,6 +38,15 @@ public interface IBulkExportProcessor<T extends IResourcePersistentId> {
 	 */
 	Iterator<T> getResourcePidIterator(ExportPIDIteratorParameters theParams);
 
+	/**
+	 * Expands out patient ids based on the group, the supplied patient ids, and whether
+	 * MDM expansion is desired.
+	 * @param theParams - parameters for doing the expansion of patient ids
+	 * @return a set of patient ids (or an empty set if not applicable)
+	 */
+	@Nonnull
+	Set<T> expandPatientIdList(ExpandPatientIdsParams theParams);
+
 	/**
 	 * Does the MDM expansion of resources if necessary
 	 * @param theResources - the list of resources to expand
diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/bulk/export/model/ExpandPatientIdsParams.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/bulk/export/model/ExpandPatientIdsParams.java
new file mode 100644
index 000000000000..ae49ab05ccf6
--- /dev/null
+++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/bulk/export/model/ExpandPatientIdsParams.java
@@ -0,0 +1,115 @@
+package ca.uhn.fhir.jpa.bulk.export.model;
+
+import ca.uhn.fhir.interceptor.model.RequestPartitionId;
+import ca.uhn.fhir.rest.api.server.bulk.BulkExportJobParameters;
+
+import java.util.Date;
+import java.util.List;
+
+public class ExpandPatientIdsParams {
+	/**
+	 * The export style
+	 */
+	private final BulkExportJobParameters.ExportStyle myExportStyle;
+	/**
+	 * The group id, if available
+	 */
+	private String myGroupId;
+	/**
+	 * Patient ids, if available
+	 */
+	private List<String> myPatientIds;
+	/**
+	 * Desired partition or all partitions.
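+	 * When unset, {@link #getRequestPartitionId()} falls back to all partitions.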
+	 */
+	private RequestPartitionId myRequestPartitionId;
+
+	/**
+	 * List of filters to apply
+	 */
+	private List<String> myFilters;
+
+	/**
+	 * The earliest date from which to retrieve records
+	 */
+	private Date myStartDate;
+
+	/**
+	 * The latest date to which to retrieve records
+	 */
+	private Date myEndDate;
+
+	/**
+	 * Whether or not to do mdm expansion
+	 */
+	private boolean myShouldDoMdmExpansion;
+
+	public ExpandPatientIdsParams(BulkExportJobParameters.ExportStyle theExportStyle) {
+		myExportStyle = theExportStyle;
+	}
+
+	public BulkExportJobParameters.ExportStyle getExportStyle() {
+		return myExportStyle;
+	}
+
+	public String getGroupId() {
+		return myGroupId;
+	}
+
+	public void setGroupId(String theGroupId) {
+		myGroupId = theGroupId;
+	}
+
+	public List<String> getPatientIds() {
+		return myPatientIds;
+	}
+
+	public void setPatientIds(List<String> thePatientIds) {
+		myPatientIds = thePatientIds;
+	}
+
+	public RequestPartitionId getRequestPartitionId() {
+		if (myRequestPartitionId != null) {
+			return myRequestPartitionId;
+		} else {
+			return RequestPartitionId.allPartitions();
+		}
+	}
+
+	public void setRequestPartitionId(RequestPartitionId theRequestPartitionId) {
+		myRequestPartitionId = theRequestPartitionId;
+	}
+
+	public boolean isShouldDoMdmExpansion() {
+		return myShouldDoMdmExpansion;
+	}
+
+	public void setShouldDoMdmExpansion(boolean theShouldDoMdmExpansion) {
+		myShouldDoMdmExpansion = theShouldDoMdmExpansion;
+	}
+
+	public List<String> getFilters() {
+		return myFilters;
+	}
+
+	public void setFilters(List<String> theFilters) {
+		myFilters = theFilters;
+	}
+
+	public Date getStartDate() {
+		return myStartDate;
+	}
+
+	public void setStartDate(Date theStartDate) {
+		myStartDate = theStartDate;
+	}
+
+	public Date getEndDate() {
+		return myEndDate;
+	}
+
+	public void setEndDate(Date theEndDate) {
+		myEndDate = theEndDate;
+	}
+}
diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/bulk/export/model/ExportPIDIteratorParameters.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/bulk/export/model/ExportPIDIteratorParameters.java
index a1baf19ad47a..e618a41be5be 100644
--- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/bulk/export/model/ExportPIDIteratorParameters.java
+++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/bulk/export/model/ExportPIDIteratorParameters.java
@@ -21,6 +21,7 @@
 import ca.uhn.fhir.interceptor.model.RequestPartitionId;
 import ca.uhn.fhir.rest.api.server.bulk.BulkExportJobParameters;
+import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
 import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.commons.lang3.builder.ToStringStyle;
@@ -91,6 +92,30 @@ public class ExportPIDIteratorParameters {
 
 	private boolean myIncludeHistory;
 
+	private List<IResourcePersistentId<?>> myExpandedPatientIds;
+
+	public ExportPIDIteratorParameters() {}
+
+	/**
+	 * Copy constructor
+	 */
+	public ExportPIDIteratorParameters(ExportPIDIteratorParameters theParameters) {
+		setExpandedPatientIds(theParameters.getExpandedPatientIds());
+		setPatientIds(theParameters.getPatientIds());
+		setExportStyle(theParameters.getExportStyle());
+		setResourceType(theParameters.getResourceType());
+		setFilters(theParameters.getFilters());
+		setStartDate(theParameters.getStartDate());
+		setChunkId(theParameters.getChunkId());
+		setEndDate(theParameters.getEndDate());
+		setRequestedResourceTypes(theParameters.getRequestedResourceTypes());
+		setPartitionId(theParameters.getPartitionIdOrAllPartitions());
+		setGroupId(theParameters.getGroupId());
+		setInstanceId(theParameters.getInstanceId());
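+		// the partition id above is copied via getPartitionIdOrAllPartitions(), so an unset partition is normalized to all partitions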
+		setExpandMdm(theParameters.isExpandMdm());
+	}
+
 	public String getChunkId() {
 		return myChunkId;
 	}
@@ -183,6 +207,21 @@ public void setPartitionId(RequestPartitionId thePartitionId) {
 		myPartitionId = thePartitionId;
 	}
 
+	public List<IResourcePersistentId<?>> getExpandedPatientIds() {
+		return myExpandedPatientIds;
+	}
+
+	public void setExpandedPatientIds(List<IResourcePersistentId<?>> theExpandedPatientIds) {
+		myExpandedPatientIds = theExpandedPatientIds;
+	}
+
+	public void addExpandedPatientId(IResourcePersistentId<?> theId) {
+		if (myExpandedPatientIds == null) {
+			myExpandedPatientIds = new ArrayList<>();
+		}
+		myExpandedPatientIds.add(theId);
+	}
+
 	public List<String> getRequestedResourceTypes() {
 		if (myRequestedResourceTypes == null) {
 			myRequestedResourceTypes = new ArrayList<>();
diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/bulk/export/svc/BulkExportHelperService.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/bulk/export/svc/BulkExportHelperService.java
index c64c1470826d..63ef2cf7865d 100644
--- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/bulk/export/svc/BulkExportHelperService.java
+++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/bulk/export/svc/BulkExportHelperService.java
@@ -54,16 +54,26 @@ public BulkExportHelperService() {}
 	 */
 	public List<SearchParameterMap> createSearchParameterMapsForResourceType(
 			RuntimeResourceDefinition theDef, ExportPIDIteratorParameters theParams, boolean theConsiderDateRange) {
-		String resourceType = theDef.getName();
+		List<String> typeFilters = theParams.getFilters();
+		return createSearchParameterMapsForResourcetype(
+				theDef, typeFilters, theParams.getStartDate(), theParams.getEndDate(), theConsiderDateRange);
+	}
+
+	public List<SearchParameterMap> createSearchParameterMapsForResourcetype(
+			RuntimeResourceDefinition theDef,
+			List<String> theFilters,
+			Date theStartDate,
+			Date theEndDate,
+			boolean theConsiderDateRange) {
+		String resourceType = theDef.getName();
 
 		List<SearchParameterMap> spMaps = null;
-		spMaps = typeFilters.stream()
+		spMaps = theFilters.stream()
			.filter(typeFilter -> typeFilter.startsWith(resourceType + "?"))
-			.map(filter -> buildSearchParameterMapForTypeFilter(
-				filter, theDef, theParams.getStartDate(), theParams.getEndDate()))
+			.map(filter -> buildSearchParameterMapForTypeFilter(filter, theDef, theStartDate, theEndDate))
			.collect(Collectors.toList());
 
-		typeFilters.stream().filter(filter -> !filter.contains("?")).forEach(filter -> {
+		theFilters.stream().filter(filter -> !filter.contains("?")).forEach(filter -> {
 			ourLog.warn(
 				"Found a strange _typeFilter that we could not process: {}. _typeFilters should follow the format ResourceType?searchparameter=value .",
 				filter);
@@ -73,7 +83,7 @@ public List<SearchParameterMap> createSearchParameterMapsForResourceType(
 		if (spMaps.isEmpty()) {
 			SearchParameterMap defaultMap = new SearchParameterMap();
 			if (theConsiderDateRange) {
-				addLastUpdatedFilter(defaultMap, theParams.getStartDate(), theParams.getEndDate());
+				addLastUpdatedFilter(defaultMap, theStartDate, theEndDate);
 			}
 			spMaps = Collections.singletonList(defaultMap);
 		}
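A minimal end-to-end sketch of how the new flag is exercised (illustrative only: the base URL, port, and _type list are assumptions, not part of this change, and it assumes the export provider exposes the MDM flag under the usual _mdm parameter name). Bulk export remains asynchronous, so the kickoff returns a polling location rather than the data itself:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class PatientExportMdmExample {
	public static void main(String[] args) throws Exception {
		HttpClient client = HttpClient.newHttpClient();
		// patient-level export with MDM expansion enabled via the _mdm flag
		HttpRequest request = HttpRequest.newBuilder()
				.uri(URI.create("http://localhost:8080/fhir/Patient/$export?_type=Patient,Observation&_mdm=true"))
				.header("Accept", "application/fhir+json")
				// bulk export kicks off asynchronously; the server answers 202 Accepted
				// with a Content-Location header that the client polls for job status
				.header("Prefer", "respond-async")
				.GET()
				.build();
		HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
		System.out.println(response.statusCode() + " -> "
				+ response.headers().firstValue("Content-Location").orElse("(no polling location)"));
	}
}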