diff --git a/hapi-fhir-base/src/main/java/ca/uhn/fhir/interceptor/model/RequestPartitionId.java b/hapi-fhir-base/src/main/java/ca/uhn/fhir/interceptor/model/RequestPartitionId.java index 600bf2bc038a..bf0beb58dfbe 100644 --- a/hapi-fhir-base/src/main/java/ca/uhn/fhir/interceptor/model/RequestPartitionId.java +++ b/hapi-fhir-base/src/main/java/ca/uhn/fhir/interceptor/model/RequestPartitionId.java @@ -45,7 +45,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.apache.commons.lang3.ObjectUtils.defaultIfNull; +import static org.apache.commons.lang3.ObjectUtils.getIfNull; /** * @since 5.0.0 @@ -72,10 +72,7 @@ public class RequestPartitionId implements IModelJson { */ private RequestPartitionId( @Nullable String thePartitionName, @Nullable Integer thePartitionId, @Nullable LocalDate thePartitionDate) { - myPartitionIds = toListOrNull(thePartitionId); - myPartitionNames = toListOrNull(thePartitionName); - myPartitionDate = thePartitionDate; - myAllPartitions = false; + this(toListOrNull(thePartitionName), toListOrNull(thePartitionId), thePartitionDate); } /** @@ -85,10 +82,21 @@ private RequestPartitionId( @Nullable List thePartitionName, @Nullable List thePartitionId, @Nullable LocalDate thePartitionDate) { + this(thePartitionName, thePartitionId, thePartitionDate, false); + } + + /** + * Constructor for a multiple partitions with explicit "all partitions" flag + */ + private RequestPartitionId( + @Nullable List thePartitionName, + @Nullable List thePartitionId, + @Nullable LocalDate thePartitionDate, + boolean theAllPartitions) { myPartitionIds = toListOrNull(thePartitionId); myPartitionNames = toListOrNull(thePartitionName); myPartitionDate = thePartitionDate; - myAllPartitions = false; + myAllPartitions = theAllPartitions; } /** @@ -117,7 +125,7 @@ public static Optional getPartitionIfAssigned(IBaseResource * @since 7.4.0 */ public RequestPartitionId mergeIds(RequestPartitionId theOther) { - if (isAllPartitions() || theOther.isAllPartitions()) { + if ((isAllPartitions() && !hasPartitionIds()) || (theOther.isAllPartitions() && !theOther.hasPartitionIds())) { return RequestPartitionId.allPartitions(); } @@ -131,7 +139,12 @@ public RequestPartitionId mergeIds(RequestPartitionId theOther) { List newPartitionIds = Stream.concat(thisPartitionIds.stream(), otherPartitionIds.stream()) .distinct() .collect(Collectors.toList()); - return RequestPartitionId.fromPartitionIds(newPartitionIds); + boolean newAllPartitions = isAllPartitions() || theOther.isAllPartitions(); + if (newAllPartitions) { + return RequestPartitionId.allPartitionsWithPartitionIds(newPartitionIds); + } else { + return RequestPartitionId.fromPartitionIds(newPartitionIds); + } } public static RequestPartitionId fromJson(String theJson) throws JsonProcessingException { @@ -260,7 +273,7 @@ public boolean isDefaultPartition() { * thePartitionId. */ public boolean isPartition(@Nullable Integer thePartitionId) { - if (isAllPartitions()) { + if (isAllPartitions() && !hasPartitionIds()) { return false; } return hasPartitionIds() @@ -348,6 +361,26 @@ public static RequestPartitionId allPartitions() { return ALL_PARTITIONS; } + /** + * Creates a new RequestPartitionId which indicates "all partitions" and explicitly lists them + * + * @since 8.8.0 + */ + @Nonnull + public static RequestPartitionId allPartitionsWithPartitionIds(Integer... thePartitionIds) { + return allPartitionsWithPartitionIds(toListOrNull(thePartitionIds)); + } + + /** + * Creates a new RequestPartitionId which indicates "all partitions" and explicitly lists them + * + * @since 8.8.0 + */ + @Nonnull + public static RequestPartitionId allPartitionsWithPartitionIds(List thePartitionIds) { + return new RequestPartitionId(null, thePartitionIds, null, true); + } + /** * @deprecated use {@link RequestPartitionId#defaultPartition(IDefaultPartitionSettings)} instead */ @@ -443,18 +476,32 @@ public static RequestPartitionId forPartitionIdsAndNames( return new RequestPartitionId(thePartitionNames, thePartitionIds, thePartitionDate); } + @Nonnull + public static RequestPartitionId forPartitionIdsAndNames( + List thePartitionNames, + List thePartitionIds, + LocalDate thePartitionDate, + boolean theAllPartitions) { + return new RequestPartitionId(thePartitionNames, thePartitionIds, thePartitionDate, theAllPartitions); + } + /** * Create a string representation suitable for use as a cache key. Null aware. *

* Returns the partition IDs (numeric) as a joined string with a space between, using the string "null" for any null values */ public static String stringifyForKey(@Nonnull RequestPartitionId theRequestPartitionId) { - String retVal = "(all)"; - if (!theRequestPartitionId.isAllPartitions()) { + String retVal; + if (theRequestPartitionId.hasPartitionIds()) { assert theRequestPartitionId.hasPartitionIds(); retVal = theRequestPartitionId.getPartitionIds().stream() - .map(t -> defaultIfNull(t, "null").toString()) + .map(t -> getIfNull(t, "null").toString()) .collect(Collectors.joining(" ")); + if (theRequestPartitionId.isAllPartitions()) { + retVal = "(all) " + retVal; + } + } else { + retVal = "(all)"; } return retVal; } diff --git a/hapi-fhir-base/src/test/java/ca/uhn/fhir/interceptor/model/RequestPartitionIdTest.java b/hapi-fhir-base/src/test/java/ca/uhn/fhir/interceptor/model/RequestPartitionIdTest.java index 429bf795e0c4..8f5b259e6dc5 100644 --- a/hapi-fhir-base/src/test/java/ca/uhn/fhir/interceptor/model/RequestPartitionIdTest.java +++ b/hapi-fhir-base/src/test/java/ca/uhn/fhir/interceptor/model/RequestPartitionIdTest.java @@ -9,6 +9,8 @@ import org.slf4j.LoggerFactory; import java.time.LocalDate; +import java.util.List; +import java.util.stream.Stream; import static ca.uhn.fhir.interceptor.model.RequestPartitionId.allPartitions; import static ca.uhn.fhir.interceptor.model.RequestPartitionId.defaultPartition; @@ -118,6 +120,24 @@ public void testMergeIds_IncludesDefault() { } + @ParameterizedTest + @MethodSource("testStringifyForKeyTestCases") + public void testStringifyForKey(RequestPartitionId theRequestPartitionId, String theExpectedString) { + String actual = RequestPartitionId.stringifyForKey(theRequestPartitionId); + assertEquals(theExpectedString, actual); + } + + + static Stream testStringifyForKeyTestCases() { + return List.of( + new Object[]{RequestPartitionId.allPartitions(), "(all)"}, + new Object[]{RequestPartitionId.defaultPartition(), "null"}, + new Object[]{RequestPartitionId.fromPartitionIds(1, 2, 3), "1 2 3"}, + new Object[]{RequestPartitionId.fromPartitionIds(null, 2, 3), "null 2 3"}, + new Object[]{RequestPartitionId.allPartitionsWithPartitionIds(1, 2, 3), "(all) 1 2 3"} + ).stream(); + } + record ContainsTestCase(String description, RequestPartitionId left, RequestPartitionId right, Comparison comparison) { enum Comparison { LEFT_CONTAINS_RIGHT, diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/8_8_0/7406-add-partition-param-to-bulk-patch.yaml b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/8_8_0/7406-add-partition-param-to-bulk-patch.yaml new file mode 100644 index 000000000000..7fde459bf665 --- /dev/null +++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/8_8_0/7406-add-partition-param-to-bulk-patch.yaml @@ -0,0 +1,8 @@ +--- +type: add +issue: 7406 +title: "A new parameter has been added to the `$hapi.fhir.bulk-patch` and + `$hapi.fhir.bulk-patch-rewrite-history` operations which can be used to + explicitly specify the partition(s) to use when applying these + operations. This change also generally improves support for using these + jobs in a partitioned environment." diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/Batch2DaoSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/Batch2DaoSvcImpl.java index 0419428f5ab4..4da9c4efe444 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/Batch2DaoSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/Batch2DaoSvcImpl.java @@ -171,12 +171,13 @@ private Stream searchForResourceIdsAndType( ISearchBuilder builder = mySearchBuilderFactory.newSearchBuilder(null, null); return myTransactionService .withRequest(theRequestDetails) - .search(() -> builder.createQueryStream( + .withRequestPartitionId(theRequestPartitionId) + .search(partition -> builder.createQueryStream( theSearchParams, new SearchRuntimeDetails( theRequestDetails, UUID.randomUUID().toString()), theRequestDetails, - theRequestPartitionId)) + partition)) .map(pid -> new TypedResourcePid(pid.getResourceType(), pid)); } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/cache/ResourceVersionSvcDaoImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/cache/ResourceVersionSvcDaoImpl.java index 4dc197f124c5..c184d94a80a6 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/cache/ResourceVersionSvcDaoImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/cache/ResourceVersionSvcDaoImpl.java @@ -27,6 +27,7 @@ import ca.uhn.fhir.jpa.dao.data.IResourceTableDao; import ca.uhn.fhir.jpa.model.cross.IResourceLookup; import ca.uhn.fhir.jpa.model.dao.JpaPid; +import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc; import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; import ca.uhn.fhir.rest.api.server.SystemRequestDetails; import jakarta.annotation.Nonnull; @@ -60,6 +61,9 @@ public class ResourceVersionSvcDaoImpl implements IResourceVersionSvc { @Autowired IIdHelperService myIdHelperService; + @Autowired + IRequestPartitionHelperSvc myRequestPartitionHelperSvc; + @Override @Nonnull public ResourceVersionMap getVersionMap( @@ -76,6 +80,13 @@ public ResourceVersionMap getVersionMap( return ResourceVersionMap.fromIdsWithVersions(fhirIds); } + @Override + public ResourceVersionMap getVersionMap(String theResourceName, SearchParameterMap theSearchParamMap) { + RequestPartitionId requestPartition = myRequestPartitionHelperSvc.determineReadPartitionForRequestForSearchType( + null, theResourceName, theSearchParamMap); + return getVersionMap(requestPartition, theResourceName, theSearchParamMap); + } + /** * Retrieves the latest versions for any resourceid that are found. * If they are not found, they will not be contained in the returned map. diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirResourceDao.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirResourceDao.java index 5a908af2171a..d1f308c158d7 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirResourceDao.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirResourceDao.java @@ -934,23 +934,32 @@ public DeleteMethodOutcome deleteByUrl( @Nonnull TransactionDetails theTransactionDetails) { validateDeleteEnabled(); + ResourceSearch resourceSearch = myMatchUrlService.getResourceSearch(theUrl); + SearchParameterMap paramMap = resourceSearch.getSearchParameterMap(); + + RequestPartitionId requestPartitionId = + myRequestPartitionHelperService.determineReadPartitionForRequestForSearchType( + theRequestDetails, myResourceName, paramMap); + return myTransactionService .withRequest(theRequestDetails) + .withRequestPartitionId(requestPartitionId) .withTransactionDetails(theTransactionDetails) - .execute(tx -> doDeleteByUrl(theUrl, deleteConflicts, theTransactionDetails, theRequestDetails)); + .execute(tx -> + doDeleteByUrl(theUrl, paramMap, deleteConflicts, theTransactionDetails, theRequestDetails)); } @Nonnull private DeleteMethodOutcome doDeleteByUrl( String theUrl, + SearchParameterMap theParamMap, DeleteConflictList deleteConflicts, TransactionDetails theTransactionDetails, RequestDetails theRequestDetails) { - ResourceSearch resourceSearch = myMatchUrlService.getResourceSearch(theUrl); - SearchParameterMap paramMap = resourceSearch.getSearchParameterMap(); - paramMap.setLoadSynchronous(true); - Set resourceIds = myMatchResourceUrlService.search(paramMap, myResourceType, theRequestDetails, null); + theParamMap.setLoadSynchronous(true); + Set resourceIds = + myMatchResourceUrlService.search(theParamMap, myResourceType, theRequestDetails, null); if (resourceIds.size() > 1) { if (!getStorageSettings().isAllowMultipleDelete()) { @@ -1663,7 +1672,7 @@ public T read(IIdType theId, RequestDetails theRequest, boolean theDeletedOk) { .withRequest(theRequest) .withTransactionDetails(transactionDetails) .withRequestPartitionId(requestPartitionId) - .read(() -> doReadInTransaction(theId, theRequest, theDeletedOk, requestPartitionId)); + .read(partition -> doReadInTransaction(theId, theRequest, theDeletedOk, partition)); } private T doReadInTransaction( @@ -2278,7 +2287,7 @@ public List searchForIds( .withRequest(theRequest) .withTransactionDetails(transactionDetails) .withRequestPartitionId(requestPartitionId) - .searchList(() -> { + .searchList(partition -> { if (isNull(theParams.getLoadSynchronousUpTo())) { theParams.setLoadSynchronousUpTo(myStorageSettings.getInternalSynchronousSearchSize()); } @@ -2292,7 +2301,7 @@ public List searchForIds( SearchRuntimeDetails searchRuntimeDetails = new SearchRuntimeDetails(theRequest, uuid); try (IResultIterator iter = - builder.createQuery(theParams, searchRuntimeDetails, theRequest, requestPartitionId)) { + builder.createQuery(theParams, searchRuntimeDetails, theRequest, partition)) { while (iter.hasNext()) { ids.add(iter.next()); } @@ -2325,8 +2334,8 @@ public > Stream searchForIdStream( //noinspection unchecked return (Stream) myTransactionService .withRequest(theRequest) - .search(() -> - builder.createQueryStream(theParams, searchRuntimeDetails, theRequest, requestPartitionId)); + .withRequestPartitionId(requestPartitionId) + .search(partition -> builder.createQueryStream(theParams, searchRuntimeDetails, theRequest, partition)); } @Override @@ -2353,13 +2362,14 @@ private List searchForTransformedIds( return myTransactionService .withRequest(theRequest) .withPropagation(Propagation.REQUIRED) - .searchList(() -> { + .withRequestPartitionId(requestPartitionId) + .searchList(partition -> { ISearchBuilder builder = mySearchBuilderFactory.newSearchBuilder(getResourceName(), getResourceType()); Stream pidStream = - builder.createQueryStream(theParams, searchRuntimeDetails, theRequest, requestPartitionId); + builder.createQueryStream(theParams, searchRuntimeDetails, theRequest, partition); - Stream transformedStream = transform.apply(theRequest, pidStream, requestPartitionId); + Stream transformedStream = transform.apply(theRequest, pidStream, partition); return transformedStream.collect(Collectors.toList()); }); diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/partition/RequestPartitionHelperSvc.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/partition/RequestPartitionHelperSvc.java index 71a584cc64b2..01cdef271b48 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/partition/RequestPartitionHelperSvc.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/partition/RequestPartitionHelperSvc.java @@ -42,15 +42,16 @@ public RequestPartitionHelperSvc() {} public RequestPartitionId validateAndNormalizePartitionIds(RequestPartitionId theRequestPartitionId) { List names = null; List partitionIds = null; - for (int i = 0; i < theRequestPartitionId.getPartitionIds().size(); i++) { + List originalPartitionIds = theRequestPartitionId.getPartitionIds(); + for (int i = 0; i < originalPartitionIds.size(); i++) { PartitionEntity partition; - Integer id = theRequestPartitionId.getPartitionIds().get(i); + Integer id = originalPartitionIds.get(i); if (id == null) { partition = null; if (myPartitionSettings.getDefaultPartitionId() != null) { if (partitionIds == null) { - partitionIds = new ArrayList<>(theRequestPartitionId.getPartitionIds()); + partitionIds = new ArrayList<>(originalPartitionIds); } partitionIds.set(i, myPartitionSettings.getDefaultPartitionId()); } @@ -63,7 +64,7 @@ public RequestPartitionId validateAndNormalizePartitionIds(RequestPartitionId th .getMessage( BaseRequestPartitionHelperSvc.class, "unknownPartitionId", - theRequestPartitionId.getPartitionIds().get(i)); + originalPartitionIds.get(i)); throw new ResourceNotFoundException(Msg.code(1316) + msg); } } @@ -71,7 +72,7 @@ public RequestPartitionId validateAndNormalizePartitionIds(RequestPartitionId th if (theRequestPartitionId.hasPartitionNames()) { if (partition == null) { Validate.isTrue( - theRequestPartitionId.getPartitionIds().get(i) == null, + originalPartitionIds.get(i) == null, "Partition %s must not have an ID", JpaConstants.DEFAULT_PARTITION_NAME); } else { @@ -80,7 +81,7 @@ public RequestPartitionId validateAndNormalizePartitionIds(RequestPartitionId th theRequestPartitionId.getPartitionNames().get(i), partition.getName()), "Partition name %s does not match ID %s", theRequestPartitionId.getPartitionNames().get(i), - theRequestPartitionId.getPartitionIds().get(i)); + originalPartitionIds.get(i)); } } else { if (names == null) { @@ -95,12 +96,15 @@ public RequestPartitionId validateAndNormalizePartitionIds(RequestPartitionId th } if (names != null) { - List partitionIdsToUse = theRequestPartitionId.getPartitionIds(); + List partitionIdsToUse = originalPartitionIds; if (partitionIds != null) { partitionIdsToUse = partitionIds; } return RequestPartitionId.forPartitionIdsAndNames( - names, partitionIdsToUse, theRequestPartitionId.getPartitionDate()); + names, + partitionIdsToUse, + theRequestPartitionId.getPartitionDate(), + theRequestPartitionId.isAllPartitions()); } return theRequestPartitionId; diff --git a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/config/PartitionSettings.java b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/config/PartitionSettings.java index b94496575058..91a94111c53f 100644 --- a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/config/PartitionSettings.java +++ b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/config/PartitionSettings.java @@ -138,7 +138,7 @@ public PartitionSettings setPartitioningEnabled(boolean thePartitioningEnabled) } /** - * Should resources references be permitted to cross partition boundaries. Default is {@link CrossPartitionReferenceMode#NOT_ALLOWED}. + * Should resource references be permitted to cross partition boundaries? Default is {@link CrossPartitionReferenceMode#NOT_ALLOWED}. * * @since 5.0.0 */ diff --git a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/util/JpaConstants.java b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/util/JpaConstants.java index 4ed96e321c04..86d51244a121 100644 --- a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/util/JpaConstants.java +++ b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/util/JpaConstants.java @@ -348,6 +348,7 @@ public class JpaConstants { public static final String OPERATION_BULK_PATCH_PARAM_DRY_RUN_MODE_COLLECT_CHANGES = "collectChanges"; public static final String OPERATION_BULK_PATCH_PARAM_LIMIT_RESOURCE_COUNT = "limitResourceCount"; public static final String OPERATION_BULK_PATCH_PARAM_LIMIT_RESOURCE_VERSION_COUNT = "limitResourceVersionCount"; + public static final String OPERATION_BULK_PATCH_PARAM_PARTITION_ID = "partitionId"; public static final String OPERATION_BULK_PATCH_STATUS = "$hapi.fhir.bulk-patch-status"; public static final String OPERATION_BULK_PATCH_STATUS_PARAM_JOB_ID = "_jobId"; public static final String OPERATION_BULK_PATCH_PARAM_URL = "url"; diff --git a/hapi-fhir-jpaserver-searchparam/src/main/java/ca/uhn/fhir/rest/server/interceptor/partition/RequestTenantPartitionInterceptor.java b/hapi-fhir-jpaserver-searchparam/src/main/java/ca/uhn/fhir/rest/server/interceptor/partition/RequestTenantPartitionInterceptor.java index 23c9b8ad9b92..02fb44b9a49c 100644 --- a/hapi-fhir-jpaserver-searchparam/src/main/java/ca/uhn/fhir/rest/server/interceptor/partition/RequestTenantPartitionInterceptor.java +++ b/hapi-fhir-jpaserver-searchparam/src/main/java/ca/uhn/fhir/rest/server/interceptor/partition/RequestTenantPartitionInterceptor.java @@ -106,7 +106,7 @@ protected RequestPartitionId extractPartitionIdFromRequest(RequestDetails theReq if (requestDetails.getRequestPartitionId() != null) { return requestDetails.getRequestPartitionId(); } - return RequestPartitionId.defaultPartition(myPartitionSettings); + return RequestPartitionId.allPartitions(); } throw new InternalErrorException(Msg.code(343) + "No partition ID has been specified"); } diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirResourceDaoTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirResourceDaoTest.java index 4c0715e2d538..615240f31d2e 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirResourceDaoTest.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirResourceDaoTest.java @@ -59,6 +59,7 @@ import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.Mockito; +import org.mockito.Spy; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.stubbing.Answer; import org.springframework.context.ApplicationContext; @@ -139,8 +140,7 @@ class BaseHapiFhirResourceDaoTest { @Mock private MatchResourceUrlService myMatchResourceUrlService; - @Mock - private HapiTransactionService myTransactionService; + private HapiTransactionService myTransactionService = new MockHapiTransactionService(); @Mock private DeleteConflictService myDeleteConflictService; @@ -174,6 +174,7 @@ public void init() { // by calling setup themselves mySvc.setResourceType(Patient.class); mySvc.setContext(myFhirContext); + mySvc.setTransactionService(myTransactionService); mySvc.start(); mySpiedSvc = spy(mySvc); } @@ -501,17 +502,6 @@ void beforeEach() { when(myStorageSettings.isDeleteEnabled()).thenReturn(true); when(myMatchUrlService.getResourceSearch(URL)) .thenReturn(new ResourceSearch(mock(RuntimeResourceDefinition.class), SearchParameterMap.newSynchronous(), RequestPartitionId.allPartitions())); - - // mocks for transaction handling: - final IHapiTransactionService.IExecutionBuilder mockExecutionBuilder = mock(IHapiTransactionService.IExecutionBuilder.class); - when(mockExecutionBuilder.withTransactionDetails(any(TransactionDetails.class))).thenReturn(mockExecutionBuilder); - when(myTransactionService.withRequest(REQUEST)).thenReturn(mockExecutionBuilder); - final Answer answer = theInvocationOnMock -> { - final TransactionCallback arg = theInvocationOnMock.getArgument(0); - return arg.doInTransaction(mock(TransactionStatus.class)); - }; - when(mockExecutionBuilder.execute(ArgumentMatchers.>any())) - .thenAnswer(answer); } @ParameterizedTest diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/BasePartitioningR4Test.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/BasePartitioningR4Test.java index 08d97fdbd25e..75336fba1a97 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/BasePartitioningR4Test.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/BasePartitioningR4Test.java @@ -73,8 +73,9 @@ protected void after() { myPartitionSettings.setPartitioningEnabled(defaultPartitionSettings.isPartitioningEnabled()); myPartitionSettings.setAllowReferencesAcrossPartitions(defaultPartitionSettings.getAllowReferencesAcrossPartitions()); myPartitionSettings.setDefaultPartitionId(defaultPartitionSettings.getDefaultPartitionId()); + myPartitionSettings.setUnnamedPartitionMode(defaultPartitionSettings.isUnnamedPartitionMode()); - mySrdInterceptorService.unregisterInterceptorsIf(t -> t instanceof MyReadWriteInterceptor); + unregisterPartitionInterceptor(); myStorageSettings.setIndexMissingFields(defaultStorageSettings.getIndexMissingFields()); myStorageSettings.setAutoCreatePlaceholderReferenceTargets(defaultStorageSettings.isAutoCreatePlaceholderReferenceTargets()); @@ -86,6 +87,10 @@ protected void after() { } } + protected void unregisterPartitionInterceptor() { + mySrdInterceptorService.unregisterInterceptorsIf(t -> t instanceof MyReadWriteInterceptor); + } + protected void assertNoRemainingPartitionIds() { myPartitionInterceptor.assertNoRemainingIds(); } diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4CreateTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4CreateTest.java index a64babd3a226..6be8a7f6dabe 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4CreateTest.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4CreateTest.java @@ -81,6 +81,7 @@ import java.util.Locale; import java.util.Optional; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -1405,7 +1406,7 @@ public void testConditionalCreateDependsOnFirstEntryExisting(boolean theHasQuest @ParameterizedTest @ValueSource(booleans = {true, false}) - void conditionalCreateSameIdentifierCrossPartition(boolean theIsSearchUrlDuplicateAcrossPartitionsEnabled) { + void conditionalCreateSameIdentifierCrossPartition(boolean theIsSearchUrlDuplicateAcrossPartitionsEnabled) throws Exception { myPartitionSettings.setPartitioningEnabled(true); myPartitionSettings.setConditionalCreateDuplicateIdentifiersEnabled(theIsSearchUrlDuplicateAcrossPartitionsEnabled); @@ -1419,26 +1420,30 @@ void conditionalCreateSameIdentifierCrossPartition(boolean theIsSearchUrlDuplica partitionEntity2.setName("Partition-B"); myPartitionDao.save(partitionEntity2); - final BundleBuilder bundleBuilder = new BundleBuilder(myFhirContext); - final String matchUrl = "identifier=http://tempuri.org|1"; - bundleBuilder.addTransactionCreateEntry(myTask1, "urn:uuid:59cda086-4763-4ef0-8e36-8c90058686ea") - .conditional(matchUrl); + Callable bundleSupplier = () -> { + final BundleBuilder bundleBuilder = new BundleBuilder(myFhirContext); + final String matchUrl = "identifier=http://tempuri.org|1"; + Task task = myFhirContext.newTerser().clone(myTask1); + bundleBuilder.addTransactionCreateEntry(task, "urn:uuid:59cda086-4763-4ef0-8e36-8c90058686ea") + .conditional(matchUrl); + return bundleBuilder.getBundleTyped(); + }; final RequestPartitionId requestPartitionId1 = RequestPartitionId.fromPartitionId(1, LocalDate.now()); final RequestPartitionId requestPartitionId2 = RequestPartitionId.fromPartitionId(2, LocalDate.now()); - final List responseEntries1 = sendBundleAndGetResponse(bundleBuilder.getBundle(), requestPartitionId1); + final List responseEntries1 = sendBundleAndGetResponse(bundleSupplier.call(), requestPartitionId1); assertEquals(1, responseEntries1.size()); final Bundle.BundleEntryComponent bundleEntry1 = responseEntries1.get(0); assertEquals("201 Created", bundleEntry1.getResponse().getStatus()); if (!theIsSearchUrlDuplicateAcrossPartitionsEnabled) { - final IBaseBundle bundle = bundleBuilder.getBundle(); + final IBaseBundle bundle = bundleSupplier.call(); assertThatThrownBy(() -> sendBundleAndGetResponse(bundle, requestPartitionId2)).isInstanceOf(ResourceVersionConflictException.class); return; } - final List responseEntries2 = sendBundleAndGetResponse(bundleBuilder.getBundle(), requestPartitionId2); + final List responseEntries2 = sendBundleAndGetResponse(bundleSupplier.call(), requestPartitionId2); assertEquals(1, responseEntries2.size()); final Bundle.BundleEntryComponent bundleEntry2 = responseEntries1.get(0); assertEquals("201 Created", bundleEntry2.getResponse().getStatus()); diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/PartitioningSqlR4Test.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/PartitioningSqlR4Test.java index 9f32f9c2965f..6d432e44a54c 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/PartitioningSqlR4Test.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/PartitioningSqlR4Test.java @@ -8,8 +8,10 @@ import ca.uhn.fhir.batch2.model.JobInstanceStartRequest; import ca.uhn.fhir.context.RuntimeResourceDefinition; import ca.uhn.fhir.i18n.Msg; +import ca.uhn.fhir.interceptor.api.Hook; import ca.uhn.fhir.interceptor.api.HookParams; import ca.uhn.fhir.interceptor.api.IAnonymousInterceptor; +import ca.uhn.fhir.interceptor.api.Interceptor; import ca.uhn.fhir.interceptor.api.Pointcut; import ca.uhn.fhir.interceptor.model.RequestPartitionId; import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; @@ -1798,6 +1800,46 @@ public void testSearch_NoParams_SearchMultiplePartitionsByName_WithDefault() { assertThat(sql).as(sql).contains("PARTITION_ID IS NULL"); } + + @Test + public void testSearch_Partitioned_AllPartitionsWithPartitionIds() { + // Setup + myPartitionSettings.setUnnamedPartitionMode(true); + + // Use a fixed interceptor that picks multiple partitions for read + @Interceptor + class PartitionInterceptor { + @Hook(Pointcut.STORAGE_PARTITION_IDENTIFY_READ) + public RequestPartitionId readPartition() { + return RequestPartitionId.allPartitionsWithPartitionIds(1, 2, 3); + } + @Hook(Pointcut.STORAGE_PARTITION_IDENTIFY_CREATE) + public RequestPartitionId createPartition() { + return RequestPartitionId.fromPartitionIds(2); + } + } + unregisterPartitionInterceptor(); + registerInterceptor(new PartitionInterceptor()); + + createPatient(withId("A"), withActiveTrue()); + + // Test + myCaptureQueriesListener.clear(); + SearchParameterMap map = new SearchParameterMap(); + map.setLoadSynchronous(true); + map.add(Patient.SP_ACTIVE, new TokenParam().setValue("true")); + IBundleProvider result = myPatientDao.search(map, newSrd()); + + // Verify + assertThat(toUnqualifiedVersionlessIdValues(result)).containsExactly("Patient/A"); + + myCaptureQueriesListener.logSelectQueriesForCurrentThread(); + String searchSql = myCaptureQueriesListener.getSelectQueries().get(0).getSql(true, true); + assertThat(searchSql).doesNotContainIgnoringCase("PARTITION_ID IN"); + assertThat(searchSql).doesNotContainIgnoringCase("PARTITION_ID ="); + } + + @Test public void testSearch_DateParam_SearchAllPartitions() { myPartitionSettings.setIncludePartitionInSearchHashes(false); diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/partition/PartitionedSubscriptionTriggeringR4Test.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/partition/PartitionedSubscriptionTriggeringR4Test.java index b0324e66ea5d..fe1ed1bd3d0e 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/partition/PartitionedSubscriptionTriggeringR4Test.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/partition/PartitionedSubscriptionTriggeringR4Test.java @@ -91,8 +91,7 @@ public void beforeEach() throws ServletException { myPartitionInterceptor = new MyReadWriteInterceptor(); myPartitionInterceptor.setRequestPartitionId(REQ_PART_1); - - mySrdInterceptorService.registerInterceptor(myPartitionInterceptor); + registerInterceptor(myPartitionInterceptor); myPartitionConfigSvc.createPartition(new PartitionEntity().setId(1).setName(PARTITION_1), null); myPartitionConfigSvc.createPartition(new PartitionEntity().setId(2).setName(PARTITION_2), null); @@ -116,7 +115,6 @@ public void afterUnregisterRestHookListener() { myStorageSettings.setExpungeEnabled(new JpaStorageSettings().isExpungeEnabled()); myStorageSettings.setAllowMultipleDelete(new JpaStorageSettings().isAllowMultipleDelete()); - mySrdInterceptorService.unregisterInterceptorsIf(t -> t instanceof BasePartitioningR4Test.MyReadWriteInterceptor); await().until(() -> { mySubscriptionTriggeringSvc.runDeliveryPass(); return ((SubscriptionTriggeringSvcImpl) mySubscriptionTriggeringSvc).getActiveJobCount() == 0; @@ -133,8 +131,6 @@ public void testCreateSubscriptionInPartition() throws Exception { String criteria1 = "Observation?code=SNOMED-CT|" + code + "&_format=xml"; Subscription subscription = newSubscription(criteria1, payload); - assertThat(mySrdInterceptorService.getAllRegisteredInterceptors()).hasSize(1); - myDaoRegistry.getResourceDao("Subscription").create(subscription, mySrd); waitForActivatedSubscriptionCount(1); @@ -160,8 +156,6 @@ public void testCreateSubscriptionInPartitionAndResourceInDifferentPartition(boo Subscription subscription = newSubscription(criteria1, payload); subscription.addExtension(HapiExtensions.EXTENSION_SUBSCRIPTION_CROSS_PARTITION, new org.hl7.fhir.r4.model.BooleanType().setValue(true)); - assertThat(mySrdInterceptorService.getAllRegisteredInterceptors()).hasSize(1); - myDaoRegistry.getResourceDao("Subscription").create(subscription, new SystemRequestDetails().setRequestPartitionId(myPartitionSettings.getDefaultRequestPartitionId())); @@ -315,8 +309,6 @@ public void testManualTriggeredSubscriptionInPartition() throws Exception { // Create the subscription now DaoMethodOutcome subscriptionOutcome = myDaoRegistry.getResourceDao("Subscription").create(newSubscription(criteria1, payload), mySrd); - assertThat(mySrdInterceptorService.getAllRegisteredInterceptors()).hasSize(1); - Subscription subscription = (Subscription) subscriptionOutcome.getResource(); waitForActivatedSubscriptionCount(1); diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionsR4Test.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionsR4Test.java index 8fe147cffd7d..528ee085810d 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionsR4Test.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionsR4Test.java @@ -6,6 +6,8 @@ import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; import ca.uhn.fhir.jpa.api.model.DaoMethodOutcome; +import ca.uhn.fhir.jpa.cache.IResourceChangeListenerCache; +import ca.uhn.fhir.jpa.cache.ResourceChangeListenerCache; import ca.uhn.fhir.jpa.dao.data.IResourceModifiedDao; import ca.uhn.fhir.jpa.provider.BaseResourceProviderR4Test; import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannel; @@ -181,7 +183,6 @@ protected Subscription newSubscription(String theCriteria, String thePayload) { return subscription; } - protected void waitForQueueToDrain() throws InterruptedException { mySubscriptionTestUtil.waitForQueueToDrain(); } diff --git a/hapi-fhir-jpaserver-test-r5/src/test/java/ca/uhn/fhir/jpa/dao/r5/BaseJpaR5Test.java b/hapi-fhir-jpaserver-test-r5/src/test/java/ca/uhn/fhir/jpa/dao/r5/BaseJpaR5Test.java index 53539137fb13..96a56104e1f0 100644 --- a/hapi-fhir-jpaserver-test-r5/src/test/java/ca/uhn/fhir/jpa/dao/r5/BaseJpaR5Test.java +++ b/hapi-fhir-jpaserver-test-r5/src/test/java/ca/uhn/fhir/jpa/dao/r5/BaseJpaR5Test.java @@ -57,6 +57,7 @@ import ca.uhn.fhir.jpa.search.reindex.IInstanceReindexService; import ca.uhn.fhir.jpa.search.reindex.IResourceReindexingSvc; import ca.uhn.fhir.jpa.search.warm.ICacheWarmingSvc; +import ca.uhn.fhir.jpa.searchparam.extractor.ISearchParamExtractor; import ca.uhn.fhir.jpa.searchparam.registry.SearchParamRegistryImpl; import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry; import ca.uhn.fhir.jpa.term.TermDeferredStorageSvcImpl; @@ -177,6 +178,8 @@ public abstract class BaseJpaR5Test extends BaseJpaTest implements ITestDataBuil @Autowired protected ISearchParamPresentDao mySearchParamPresentDao; @Autowired + protected ISearchParamExtractor mySearchParamExtractor; + @Autowired protected IResourceIndexedSearchParamStringDao myResourceIndexedSearchParamStringDao; @Autowired protected IResourceIndexedSearchParamTokenDao myResourceIndexedSearchParamTokenDao; diff --git a/hapi-fhir-jpaserver-test-r5/src/test/java/ca/uhn/fhir/jpa/dao/r5/bulkpatch/BulkPatchPartitionedJobR5Test.java b/hapi-fhir-jpaserver-test-r5/src/test/java/ca/uhn/fhir/jpa/dao/r5/bulkpatch/BulkPatchPartitionedJobR5Test.java new file mode 100644 index 000000000000..5f48b6eeeff5 --- /dev/null +++ b/hapi-fhir-jpaserver-test-r5/src/test/java/ca/uhn/fhir/jpa/dao/r5/bulkpatch/BulkPatchPartitionedJobR5Test.java @@ -0,0 +1,154 @@ +package ca.uhn.fhir.jpa.dao.r5.bulkpatch; + +import ca.uhn.fhir.batch2.jobs.bulkmodify.patch.BulkPatchProvider; +import ca.uhn.fhir.context.FhirContext; +import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository; +import ca.uhn.fhir.jpa.interceptor.PatientIdPartitionInterceptor; +import ca.uhn.fhir.jpa.model.config.PartitionSettings; +import ca.uhn.fhir.jpa.model.util.JpaConstants; +import ca.uhn.fhir.rest.api.Constants; +import ca.uhn.fhir.rest.api.MethodOutcome; +import ca.uhn.fhir.rest.client.api.IGenericClient; +import ca.uhn.fhir.rest.server.provider.ProviderConstants; +import ca.uhn.fhir.test.utilities.server.RestfulServerExtension; +import ca.uhn.fhir.util.FhirPatchBuilder; +import org.hl7.fhir.r5.model.Identifier; +import org.hl7.fhir.r5.model.Parameters; +import org.hl7.fhir.r5.model.StringType; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.springframework.beans.factory.annotation.Autowired; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +public class BulkPatchPartitionedJobR5Test extends BaseBulkPatchR5Test { + + @Autowired + private IBatch2WorkChunkRepository myBatch2WorkChunkRepository; + + @Autowired + private BulkPatchProvider myBulkPatchProvider = new BulkPatchProvider(); + + @RegisterExtension + private RestfulServerExtension myRestfulServerExtension = new RestfulServerExtension(FhirContext.forR5Cached()) + .withServer(server -> { + assert myBulkPatchProvider != null; + server.registerProvider(myBulkPatchProvider); + }); + + private IGenericClient myFhirClient; + + @BeforeEach + @Override + public void before() throws Exception { + super.before(); + + myPartitionSettings.setPartitioningEnabled(true); + myPartitionSettings.setAllowReferencesAcrossPartitions(PartitionSettings.CrossPartitionReferenceMode.ALLOWED_UNQUALIFIED); + myPartitionSettings.setDefaultPartitionId(0); + myPartitionSettings.setUnnamedPartitionMode(true); + + registerInterceptor(new PatientIdPartitionInterceptor(getFhirContext(), mySearchParamExtractor, myPartitionSettings)); + + myFhirClient = myRestfulServerExtension.getFhirClient(); + } + + @Test + void testPartitionedBulkPatch_SpecificPartitionsSpecified() { + createPatient(withId("A"), withActiveTrue()); + createPatient(withId("B"), withActiveTrue()); + + int partitionAId = PatientIdPartitionInterceptor.defaultPartitionAlgorithm("A"); + int partitionCId = PatientIdPartitionInterceptor.defaultPartitionAlgorithm("C"); + + Parameters patch = createPatchToAddIdentifierToPatient(); + + Parameters request = new Parameters(); + request.addParameter() + .setName(JpaConstants.OPERATION_BULK_PATCH_PARAM_PATCH) + .setResource(patch); + request.addParameter() + .setName(JpaConstants.OPERATION_BULK_PATCH_PARAM_URL) + .setValue(new StringType("Patient?")); + request.addParameter() + .setName(JpaConstants.OPERATION_BULK_PATCH_PARAM_PARTITION_ID) + .setValue(new StringType(Integer.toString(partitionAId))); + request.addParameter() + .setName(JpaConstants.OPERATION_BULK_PATCH_PARAM_PARTITION_ID) + .setValue(new StringType(Integer.toString(partitionCId))); + + MethodOutcome response = myFhirClient + .operation() + .onServer() + .named(JpaConstants.OPERATION_BULK_PATCH) + .withParameters(request) + .withAdditionalHeader(Constants.HEADER_PREFER, Constants.HEADER_PREFER_RESPOND_ASYNC) + .returnMethodOutcome() + .execute(); + assertEquals(Constants.STATUS_HTTP_202_ACCEPTED, response.getResponseStatusCode()); + String pollLocation = response.getResponseHeaders().get(Constants.HEADER_CONTENT_LOCATION).get(0); + String prefix = myRestfulServerExtension.getBaseUrl() + "/$hapi.fhir.bulk-patch-status?_jobId="; + assertThat(pollLocation).startsWith(prefix); + String jobId = pollLocation.substring(prefix.length()); + + myBatch2JobHelper.awaitJobCompletion(jobId); + + // Verify + // Since we patched partitions for resource A and C, resource B should not have been patched. + assertEquals("123", readPatient("Patient/A").getIdentifierFirstRep().getValue()); + assertNull(readPatient("Patient/B").getIdentifierFirstRep().getValue()); + } + + @Test + void testPartitionedBulkPatch_AllPartitionsSpecified() { + createPatient(withId("A"), withActiveTrue()); + createPatient(withId("B"), withActiveTrue()); + + Parameters patch = createPatchToAddIdentifierToPatient(); + + Parameters request = new Parameters(); + request.addParameter() + .setName(JpaConstants.OPERATION_BULK_PATCH_PARAM_PATCH) + .setResource(patch); + request.addParameter() + .setName(JpaConstants.OPERATION_BULK_PATCH_PARAM_URL) + .setValue(new StringType("Patient?")); + request.addParameter() + .setName(JpaConstants.OPERATION_BULK_PATCH_PARAM_PARTITION_ID) + .setValue(new StringType(ProviderConstants.ALL_PARTITIONS_TENANT_NAME)); + + MethodOutcome response = myFhirClient + .operation() + .onServer() + .named(JpaConstants.OPERATION_BULK_PATCH) + .withParameters(request) + .withAdditionalHeader(Constants.HEADER_PREFER, Constants.HEADER_PREFER_RESPOND_ASYNC) + .returnMethodOutcome() + .execute(); + assertEquals(Constants.STATUS_HTTP_202_ACCEPTED, response.getResponseStatusCode()); + String pollLocation = response.getResponseHeaders().get(Constants.HEADER_CONTENT_LOCATION).get(0); + String prefix = myRestfulServerExtension.getBaseUrl() + "/$hapi.fhir.bulk-patch-status?_jobId="; + assertThat(pollLocation).startsWith(prefix); + String jobId = pollLocation.substring(prefix.length()); + + myBatch2JobHelper.awaitJobCompletion(jobId); + + // Verify + assertEquals("123", readPatient("Patient/A").getIdentifierFirstRep().getValue()); + assertEquals("123", readPatient("Patient/B").getIdentifierFirstRep().getValue()); + } + + private Parameters createPatchToAddIdentifierToPatient() { + FhirPatchBuilder patchBuildr = new FhirPatchBuilder(myFhirContext); + patchBuildr + .add() + .path("Patient") + .name("identifier") + .value(new Identifier().setSystem("http://foo").setValue("123")); + return (Parameters) patchBuildr.build(); + } + +} diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/bulkmodify/framework/base/BaseBulkModifyOrRewriteProvider.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/bulkmodify/framework/base/BaseBulkModifyOrRewriteProvider.java index 6477dd802e63..fb1a15ac31f2 100644 --- a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/bulkmodify/framework/base/BaseBulkModifyOrRewriteProvider.java +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/bulkmodify/framework/base/BaseBulkModifyOrRewriteProvider.java @@ -24,18 +24,26 @@ import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.batch2.model.JobInstanceStartRequest; import ca.uhn.fhir.context.FhirContext; +import ca.uhn.fhir.context.RuntimeResourceDefinition; import ca.uhn.fhir.i18n.Msg; +import ca.uhn.fhir.interceptor.api.HookParams; +import ca.uhn.fhir.interceptor.api.IInterceptorService; +import ca.uhn.fhir.interceptor.api.Pointcut; +import ca.uhn.fhir.interceptor.model.RequestPartitionId; import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse; import ca.uhn.fhir.jpa.dao.BaseTransactionProcessor; +import ca.uhn.fhir.jpa.model.config.PartitionSettings; import ca.uhn.fhir.jpa.model.util.JpaConstants; import ca.uhn.fhir.model.valueset.BundleTypeEnum; import ca.uhn.fhir.parser.IParser; import ca.uhn.fhir.rest.api.Constants; +import ca.uhn.fhir.rest.api.server.RequestDetails; import ca.uhn.fhir.rest.server.RestfulServer; import ca.uhn.fhir.rest.server.RestfulServerUtils; import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException; import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails; +import ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster; import ca.uhn.fhir.rest.server.util.ServletRequestUtil; import ca.uhn.fhir.util.BundleBuilder; import ca.uhn.fhir.util.CanonicalBundleEntry; @@ -65,8 +73,10 @@ import java.util.Map; import java.util.Set; +import static ca.uhn.fhir.rest.server.provider.ProviderConstants.ALL_PARTITIONS_TENANT_NAME; import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.commons.lang3.StringUtils.isNotBlank; +import static org.apache.commons.lang3.StringUtils.trim; /** * Base class for a plain provider which can initiate a Bulk Modify or Bulk Rewrite job. @@ -79,6 +89,12 @@ public abstract class BaseBulkModifyOrRewriteProvider { @Autowired protected IJobCoordinator myJobCoordinator; + @Autowired + protected PartitionSettings myPartitionSettings; + + @Autowired + protected IInterceptorService myInterceptorService; + /** * Subclasses should call this method to initiate a new job */ @@ -90,9 +106,14 @@ protected void startJobAndReturnResponse( IPrimitiveType theBatchSize, IPrimitiveType theLimitResourceCount, IPrimitiveType theLimitResourceVersionCount, + List> thePartitionIds, BaseBulkModifyJobParameters theJobParameters) throws IOException { ServletRequestUtil.validatePreferAsyncHeader(theRequestDetails, getOperationName()); + + theJobParameters.setRequestPartitionId( + parsePartitionIdsParameterAndInvokeInterceptors(theRequestDetails, thePartitionIds)); + if (theUrlsToReindex != null) { for (IPrimitiveType url : theUrlsToReindex) { if (isNotBlank(url.getValueAsString())) { @@ -162,6 +183,30 @@ protected void startJobAndReturnResponse( null); } + /** + * Parses the {@link JpaConstants#OPERATION_BULK_PATCH_PARAM_PARTITION_ID} parameter value(s) + * and invokes any registered {@link Pointcut#STORAGE_PARTITION_SELECTED} interceptors (used + * for security) + */ + private RequestPartitionId parsePartitionIdsParameterAndInvokeInterceptors( + RequestDetails theRequestDetails, List> thePartitionIds) { + if (!myPartitionSettings.isPartitioningEnabled()) { + return null; + } + + RequestPartitionId partitionId = parsePartitionIdsParameter(thePartitionIds); + + // Invoke interceptor: STORAGE_PARTITION_SELECTED + CompositeInterceptorBroadcaster.newCompositeBroadcaster(myInterceptorService, theRequestDetails) + .ifHasCallHooks(Pointcut.STORAGE_PARTITION_SELECTED, () -> new HookParams() + .add(RequestDetails.class, theRequestDetails) + .addIfMatchesType(ServletRequestDetails.class, theRequestDetails) + .add(RequestPartitionId.class, partitionId) + .add(RuntimeResourceDefinition.class, null)); + + return partitionId; + } + @Nonnull private String createPollUrl(ServletRequestDetails theRequestDetails, String jobInstanceId) { ServletContext servletContext = @@ -181,8 +226,7 @@ private String createPollUrl(ServletRequestDetails theRequestDetails, String job pollUrlBuilder.append(JpaConstants.OPERATION_BULK_PATCH_STATUS_PARAM_JOB_ID); pollUrlBuilder.append('='); pollUrlBuilder.append(jobInstanceId); - String pollUrl = pollUrlBuilder.toString(); - return pollUrl; + return pollUrlBuilder.toString(); } /** @@ -420,4 +464,37 @@ public void setContextForUnitTest(FhirContext theContext) { public void setJobCoordinatorForUnitTest(IJobCoordinator theJobCoordinator) { myJobCoordinator = theJobCoordinator; } + + @VisibleForTesting + public void setPartitionSettingsForUnitTest(PartitionSettings thePartitionSettings) { + myPartitionSettings = thePartitionSettings; + } + + public static RequestPartitionId parsePartitionIdsParameter(List> thePartitionIdsParameter) { + List partitionIds = new ArrayList<>(); + + if (thePartitionIdsParameter != null) { + for (IPrimitiveType next : thePartitionIdsParameter) { + String value = next.getValueAsString(); + if (isNotBlank(value)) { + if (ALL_PARTITIONS_TENANT_NAME.equals(value)) { + partitionIds.clear(); + break; + } + try { + partitionIds.add(Integer.parseInt(trim(value))); + } catch (NumberFormatException e) { + throw new InvalidRequestException( + Msg.code(2820) + "Invalid partition ID: " + UrlUtil.sanitizeUrlPart(value)); + } + } + } + } + + if (partitionIds.isEmpty()) { + return RequestPartitionId.allPartitions(); + } + + return RequestPartitionId.fromPartitionIds(partitionIds); + } } diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/bulkmodify/patch/BulkPatchProvider.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/bulkmodify/patch/BulkPatchProvider.java index 75b58edb1f0d..875806da0186 100644 --- a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/bulkmodify/patch/BulkPatchProvider.java +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/bulkmodify/patch/BulkPatchProvider.java @@ -78,7 +78,14 @@ public void bulkPatch( @OperationParam( name = JpaConstants.OPERATION_BULK_PATCH_PARAM_LIMIT_RESOURCE_VERSION_COUNT, typeName = "integer") - IPrimitiveType theLimitResourceVersionCount) + IPrimitiveType theLimitResourceVersionCount, + // partitionId + @OperationParam( + name = JpaConstants.OPERATION_BULK_PATCH_PARAM_PARTITION_ID, + typeName = "string", + min = 0, + max = OperationParam.MAX_UNLIMITED) + List> thePartitionIds) throws IOException { BulkPatchJobParameters jobParameters = new BulkPatchJobParameters(); jobParameters.setFhirPatch(myContext, thePatch); @@ -91,6 +98,7 @@ public void bulkPatch( theBatchSize, theLimitResourceCount, theLimitResourceVersionCount, + thePartitionIds, jobParameters); } diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/bulkmodify/patchrewrite/BulkPatchRewriteProvider.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/bulkmodify/patchrewrite/BulkPatchRewriteProvider.java index 4dd8fbb54dc7..a6c21eb120ff 100644 --- a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/bulkmodify/patchrewrite/BulkPatchRewriteProvider.java +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/bulkmodify/patchrewrite/BulkPatchRewriteProvider.java @@ -76,7 +76,14 @@ public void bulkPatch( @OperationParam( name = JpaConstants.OPERATION_BULK_PATCH_PARAM_LIMIT_RESOURCE_VERSION_COUNT, typeName = "integer") - IPrimitiveType theLimitResourceVersionCount) + IPrimitiveType theLimitResourceVersionCount, + // partitionIds + @OperationParam( + name = JpaConstants.OPERATION_BULK_PATCH_PARAM_PARTITION_ID, + typeName = "string", + min = 0, + max = OperationParam.MAX_UNLIMITED) + List> thePartitionIds) throws IOException { BulkPatchRewriteJobParameters jobParameters = new BulkPatchRewriteJobParameters(); jobParameters.setFhirPatch(myContext, thePatch); @@ -89,6 +96,7 @@ public void bulkPatch( theBatchSize, theLimitResourceCount, theLimitResourceVersionCount, + thePartitionIds, jobParameters); } diff --git a/hapi-fhir-storage-batch2-jobs/src/test/java/ca/uhn/fhir/batch2/jobs/bulkmodify/patch/BulkPatchProviderTest.java b/hapi-fhir-storage-batch2-jobs/src/test/java/ca/uhn/fhir/batch2/jobs/bulkmodify/patch/BulkPatchProviderTest.java index 7d4995d8c536..e5847390d54e 100644 --- a/hapi-fhir-storage-batch2-jobs/src/test/java/ca/uhn/fhir/batch2/jobs/bulkmodify/patch/BulkPatchProviderTest.java +++ b/hapi-fhir-storage-batch2-jobs/src/test/java/ca/uhn/fhir/batch2/jobs/bulkmodify/patch/BulkPatchProviderTest.java @@ -9,9 +9,12 @@ import ca.uhn.fhir.batch2.model.StatusEnum; import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse; +import ca.uhn.fhir.jpa.model.config.PartitionSettings; import ca.uhn.fhir.jpa.model.util.JpaConstants; import ca.uhn.fhir.rest.api.Constants; import ca.uhn.fhir.rest.client.apache.ResourceEntity; +import ca.uhn.fhir.rest.param.StringParam; +import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException; import ca.uhn.fhir.test.utilities.HttpClientExtension; import ca.uhn.fhir.test.utilities.server.RestfulServerExtension; @@ -20,11 +23,13 @@ import ca.uhn.test.util.ParsedHttpResponse; import jakarta.annotation.Nonnull; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; import org.eclipse.jetty.http.HttpStatus; import org.hl7.fhir.instance.model.api.IBaseResource; +import org.hl7.fhir.instance.model.api.IPrimitiveType; import org.hl7.fhir.r4.model.BooleanType; import org.hl7.fhir.r4.model.Bundle; import org.hl7.fhir.r4.model.CodeType; @@ -37,6 +42,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; import org.mockito.ArgumentCaptor; @@ -48,7 +54,9 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; import java.util.stream.Stream; import static ca.uhn.fhir.jpa.model.util.JpaConstants.OPERATION_BULK_PATCH_STATUS; @@ -57,6 +65,7 @@ import static ca.uhn.fhir.jpa.model.util.JpaConstants.OPERATION_BULK_PATCH_STATUS_PARAM_RETURN_VALUE_DRYRUN_CHANGES; import static ca.uhn.fhir.jpa.model.util.JpaConstants.OPERATION_BULK_PATCH_STATUS_PARAM_RETURN_VALUE_REPORT; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; @@ -89,6 +98,7 @@ public class BulkPatchProviderTest { void beforeEach() { ourProvider.setContextForUnitTest(ourCtx); ourProvider.setJobCoordinatorForUnitTest(myJobCoordinator); + ourProvider.setPartitionSettingsForUnitTest(new PartitionSettings()); } @ParameterizedTest @@ -319,6 +329,34 @@ void testPollForStatus_UnknownJob() throws IOException { } } + @ParameterizedTest + @CsvSource(delimiter = '#', textBlock = """ + 1|2|3 # {"allPartitions":false,"partitionIds":[1,2,3]} + 1| 2 |3 # {"allPartitions":false,"partitionIds":[1,2,3]} + 1|2| | |3 # {"allPartitions":false,"partitionIds":[1,2,3]} + _ALL # {"allPartitions":true} + _ALL|1 # {"allPartitions":true} + 2|_ALL|1 # {"allPartitions":true} + FOO # EX: HAPI-2820: Invalid partition ID: FOO + """) + void testPparsePartitionIdsParameter(String theValues, String theExpected) { + List> input = Arrays.stream(StringUtils.split(theValues, '|')) + .map(StringType::new) + .collect(Collectors.toUnmodifiableList()); + + if (theExpected.startsWith("EX: ")) { + String expectedMessage = theExpected.substring(4); + assertThatThrownBy(() -> BulkPatchProvider.parsePartitionIdsParameter(input)) + .isInstanceOf(InvalidRequestException.class) + .hasMessage(expectedMessage); + return; + } + + String actual = BulkPatchProvider.parsePartitionIdsParameter(input).toJson(); + assertEquals(theExpected, actual); + } + + public static void validateStatusPollResponse(PollForStatusTest theParams, CloseableHttpResponse response) throws IOException { String responseString = IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8); diff --git a/hapi-fhir-storage-batch2-jobs/src/test/java/ca/uhn/fhir/batch2/jobs/bulkmodify/patchrewrite/BulkPatchRewriteProviderTest.java b/hapi-fhir-storage-batch2-jobs/src/test/java/ca/uhn/fhir/batch2/jobs/bulkmodify/patchrewrite/BulkPatchRewriteProviderTest.java index 2e6e033d672c..da410cd629ab 100644 --- a/hapi-fhir-storage-batch2-jobs/src/test/java/ca/uhn/fhir/batch2/jobs/bulkmodify/patchrewrite/BulkPatchRewriteProviderTest.java +++ b/hapi-fhir-storage-batch2-jobs/src/test/java/ca/uhn/fhir/batch2/jobs/bulkmodify/patchrewrite/BulkPatchRewriteProviderTest.java @@ -6,18 +6,17 @@ import ca.uhn.fhir.batch2.model.JobInstanceStartRequest; import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse; +import ca.uhn.fhir.jpa.model.config.PartitionSettings; import ca.uhn.fhir.jpa.model.util.JpaConstants; import ca.uhn.fhir.rest.api.Constants; import ca.uhn.fhir.rest.client.apache.ResourceEntity; import ca.uhn.fhir.test.utilities.HttpClientExtension; import ca.uhn.fhir.test.utilities.server.RestfulServerExtension; -import org.apache.commons.io.IOUtils; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; import org.eclipse.jetty.http.HttpStatus; import org.hl7.fhir.instance.model.api.IBaseResource; -import org.hl7.fhir.r4.model.OperationOutcome; import org.hl7.fhir.r4.model.Parameters; import org.hl7.fhir.r4.model.StringType; import org.junit.jupiter.api.BeforeEach; @@ -34,7 +33,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.util.stream.Stream; import static ca.uhn.fhir.batch2.jobs.bulkmodify.patch.BulkPatchProviderTest.createTestPollForStatusParameters; @@ -72,6 +70,7 @@ class BulkPatchRewriteProviderTest { void beforeEach() { ourProvider.setContextForUnitTest(ourCtx); ourProvider.setJobCoordinatorForUnitTest(myJobCoordinator); + ourProvider.setPartitionSettingsForUnitTest(new PartitionSettings()); } @Test diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobPartitionProvider.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobPartitionProvider.java index e9451cbb9fdf..aa842046381d 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobPartitionProvider.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobPartitionProvider.java @@ -49,4 +49,16 @@ public interface IJobPartitionProvider { default List getPartitionedUrls(RequestDetails theRequestDetails, List theUrls) { return theUrls.stream().map(url -> new PartitionedUrl().setUrl(url)).collect(Collectors.toList()); } + + /** + * Returns a collection of partitions representing "all partitions" for each shard + */ + List getAllPartitions(); + + /** + * Groups the partitions within a given partition ID by shard + */ + default List splitPartitionByShards(RequestPartitionId theRequestPartitionId) { + return List.of(theRequestPartitionId); + } } diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/DefaultJobPartitionProvider.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/DefaultJobPartitionProvider.java index f126f7fd97e6..6ab9c3105955 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/DefaultJobPartitionProvider.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/DefaultJobPartitionProvider.java @@ -101,6 +101,7 @@ public List getPartitionedUrls(RequestDetails theRequestDetails, return retVal; } + @Override public List getAllPartitions() { return List.of(RequestPartitionId.allPartitions()); } diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/parameters/PartitionedUrl.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/parameters/PartitionedUrl.java index 7dec3aebce4e..83eb819ed22d 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/parameters/PartitionedUrl.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/parameters/PartitionedUrl.java @@ -45,6 +45,13 @@ public class PartitionedUrl implements IModelJson { @JsonProperty("requestPartitionId") private RequestPartitionId myRequestPartitionId; + /** + * Constructor + */ + public PartitionedUrl() { + super(); + } + public String getUrl() { return myUrl; } diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/step/GenerateRangeChunksStep.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/step/GenerateRangeChunksStep.java index d3afc35e8a42..7bd66a83132f 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/step/GenerateRangeChunksStep.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/jobs/step/GenerateRangeChunksStep.java @@ -21,6 +21,7 @@ import ca.uhn.fhir.batch2.api.IFirstJobStepWorker; import ca.uhn.fhir.batch2.api.IJobDataSink; +import ca.uhn.fhir.batch2.api.IJobPartitionProvider; import ca.uhn.fhir.batch2.api.JobExecutionFailedException; import ca.uhn.fhir.batch2.api.RunOutcome; import ca.uhn.fhir.batch2.api.StepExecutionDetails; @@ -32,17 +33,22 @@ import ca.uhn.fhir.util.Logs; import jakarta.annotation.Nonnull; import org.slf4j.Logger; +import org.springframework.beans.factory.annotation.Autowired; import org.thymeleaf.util.StringUtils; import java.util.Date; import java.util.List; import static ca.uhn.fhir.batch2.util.Batch2Utils.BATCH_START_DATE; +import static org.apache.commons.lang3.ObjectUtils.getIfNull; public class GenerateRangeChunksStep implements IFirstJobStepWorker { private static final Logger ourLog = Logs.getBatchTroubleshootingLog(); + @Autowired + private IJobPartitionProvider myJobPartitionProvider; + @Nonnull @Override public RunOutcome run( @@ -57,12 +63,20 @@ public RunOutcome run( List partitionedUrls = params.getPartitionedUrls(); if (!partitionedUrls.isEmpty()) { - partitionedUrls.forEach(partitionedUrl -> { - ChunkRangeJson chunkRangeJson = new ChunkRangeJson(start, end) - .setUrl(partitionedUrl.getUrl()) - .setPartitionId(getRequestPartitionIdForUrl(partitionedUrl)); - sendChunk(chunkRangeJson, theDataSink); - }); + for (PartitionedUrl partitionedUrl : partitionedUrls) { + RequestPartitionId requestPartitionId = partitionedUrl.getRequestPartitionId(); + requestPartitionId = getIfNull(requestPartitionId, RequestPartitionId.allPartitions()); + + List partitionChunks = + myJobPartitionProvider.splitPartitionByShards(requestPartitionId); + + for (RequestPartitionId nextPartitionId : partitionChunks) { + ChunkRangeJson chunkRangeJson = new ChunkRangeJson(start, end) + .setUrl(partitionedUrl.getUrl()) + .setPartitionId(nextPartitionId); + sendChunk(chunkRangeJson, theDataSink); + } + } return RunOutcome.SUCCESS; } @@ -82,8 +96,4 @@ private void sendChunk(ChunkRangeJson theData, IJobDataSink theD theData.getPartitionId()); theDataSink.accept(theData); } - - private RequestPartitionId getRequestPartitionIdForUrl(PartitionedUrl theUrl) { - return theUrl.getRequestPartitionId(); - } } diff --git a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/jobs/step/GenerateRangeChunksStepTest.java b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/jobs/step/GenerateRangeChunksStepTest.java index b2097d9c43d1..6addc0ab28b9 100644 --- a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/jobs/step/GenerateRangeChunksStepTest.java +++ b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/jobs/step/GenerateRangeChunksStepTest.java @@ -1,6 +1,7 @@ package ca.uhn.fhir.batch2.jobs.step; import ca.uhn.fhir.batch2.api.IJobDataSink; +import ca.uhn.fhir.batch2.api.IJobPartitionProvider; import ca.uhn.fhir.batch2.api.StepExecutionDetails; import ca.uhn.fhir.batch2.api.VoidModel; import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson; @@ -14,6 +15,7 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.mockito.ArgumentCaptor; +import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @@ -21,6 +23,8 @@ import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -28,11 +32,14 @@ @ExtendWith(MockitoExtension.class) public class GenerateRangeChunksStepTest { + @InjectMocks private final GenerateRangeChunksStep myStep = new GenerateRangeChunksStep<>(); @Mock private StepExecutionDetails myStepExecutionDetails; @Mock private IJobDataSink myJobDataSink; + @Mock + private IJobPartitionProvider myJobPartitionProvider; @BeforeEach void setUp() { @@ -63,6 +70,12 @@ public void run_withParameters_producesExpectedChunks(List thePa PartitionedUrlJobParameters parameters = new PartitionedUrlJobParameters(); thePartitionedUrls.forEach(parameters::addPartitionedUrl); + if (!thePartitionedUrls.isEmpty()) { + when(myJobPartitionProvider.splitPartitionByShards(any())).thenAnswer(t -> { + RequestPartitionId partitionId = t.getArgument(0, RequestPartitionId.class); + return List.of(partitionId); + }); + } when(myStepExecutionDetails.getParameters()).thenReturn(parameters); myStep.run(myStepExecutionDetails, myJobDataSink); @@ -81,8 +94,12 @@ public void run_withParameters_producesExpectedChunks(List thePa PartitionedUrl partitionedUrl = thePartitionedUrls.get(i); ChunkRangeJson chunkRangeJson = captor.getAllValues().get(i); assertThat(chunkRangeJson.getUrl()).isEqualTo(partitionedUrl.getUrl()); - assertThat(chunkRangeJson.getPartitionId()).isEqualTo(partitionedUrl.getRequestPartitionId()); + if (partitionedUrl.getRequestPartitionId() != null) { + assertEquals(partitionedUrl.getRequestPartitionId(), chunkRangeJson.getPartitionId()); + } else { + assertEquals(RequestPartitionId.allPartitions(), chunkRangeJson.getPartitionId()); + } } } } -} \ No newline at end of file +} diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/dao/tx/HapiTransactionService.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/dao/tx/HapiTransactionService.java index ca9d79027609..54c32c9e633b 100644 --- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/dao/tx/HapiTransactionService.java +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/dao/tx/HapiTransactionService.java @@ -64,6 +64,7 @@ import java.util.List; import java.util.Objects; import java.util.concurrent.Callable; +import java.util.stream.Stream; /** * @see IHapiTransactionService for an explanation of this class @@ -188,7 +189,7 @@ public T execute( * @deprecated Use {@link #withRequest(RequestDetails)} with fluent call instead */ @Deprecated - @SuppressWarnings("ConstantConditions") + @SuppressWarnings({"ConstantConditions", "removal"}) public T execute( @Nullable RequestDetails theRequestDetails, @Nullable TransactionDetails theTransactionDetails, @@ -207,6 +208,7 @@ public T execute( /** * @deprecated Use {@link #withRequest(RequestDetails)} with fluent call instead */ + @SuppressWarnings("removal") @Deprecated public T execute( @Nullable RequestDetails theRequestDetails, @@ -533,6 +535,7 @@ public ExecutionBuilder readOnly() { return this; } + @SuppressWarnings("removal") @Override public ExecutionBuilder onRollback(Runnable theOnRollback) { assert myOnRollback == null; @@ -560,6 +563,21 @@ public T execute(@Nonnull TransactionCallback callback) { return doExecute(this, callback); } + @Override + public T read(IExecutionCallable theCallback) { + return execute(() -> theCallback.call(myRequestPartitionId)); + } + + @Override + public Stream search(IExecutionCallable> theCallback) { + return execute(() -> theCallback.call(myRequestPartitionId)); + } + + @Override + public List searchList(IExecutionCallable> theCallback) { + return execute(() -> theCallback.call(myRequestPartitionId)); + } + @VisibleForTesting public RequestPartitionId getRequestPartitionIdForTesting() { return myRequestPartitionId; diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/dao/tx/IHapiTransactionService.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/dao/tx/IHapiTransactionService.java index d1f1e7f8c160..0bce6f5feb78 100644 --- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/dao/tx/IHapiTransactionService.java +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/dao/tx/IHapiTransactionService.java @@ -96,6 +96,9 @@ T withRequest( *

* This is an experimental API, subject to change in a future release. *

+ *

+ * This method should return the same result regardless of the order of the arguments. + *

* * @since 7.4.0 */ @@ -132,7 +135,6 @@ default IExecutionBuilder readOnly(boolean theReadOnly) { return this; } } - ; /** * @deprecated Use {@link TransactionDetails#addRollbackUndoAction(Runnable)} instead @@ -149,23 +151,22 @@ default IExecutionBuilder readOnly(boolean theReadOnly) { /** * Read query path. */ - default T read(Callable theCallback) { - return execute(theCallback); - } + T read(IExecutionCallable theCallback); /** * Search for open Stream. * The Stream may not be readable outside an outermost transaction. */ - default Stream search(Callable> theCallback) { - return execute(theCallback); - } + Stream search(IExecutionCallable> theCallback); /** * Search for concrete List. */ - default List searchList(Callable> theCallback) { - return execute(theCallback); - } + List searchList(IExecutionCallable> theCallback); + } + + interface IExecutionCallable { + + T call(RequestPartitionId theRequestPartition); } } diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/interceptor/PatientCompartmentEnforcingInterceptor.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/interceptor/PatientCompartmentEnforcingInterceptor.java index 28d686f285dd..4c84ec177c05 100644 --- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/interceptor/PatientCompartmentEnforcingInterceptor.java +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/interceptor/PatientCompartmentEnforcingInterceptor.java @@ -61,7 +61,7 @@ public PatientCompartmentEnforcingInterceptor( @Hook(Pointcut.STORAGE_PRESTORAGE_RESOURCE_UPDATED) public void storagePreStorageResourceUpdated(IBaseResource theOldResource, IBaseResource theResource) { - ourLog.info("Interceptor STORAGE_PRESTORAGE_RESOURCE_UPDATED - started"); + ourLog.debug("Interceptor STORAGE_PRESTORAGE_RESOURCE_UPDATED - started"); StopWatch stopWatch = new StopWatch(); try { String patientCompartmentOld = ResourceCompartmentUtil.getPatientCompartmentIdentity( @@ -78,7 +78,7 @@ public void storagePreStorageResourceUpdated(IBaseResource theOldResource, IBase } } finally { - ourLog.info("Interceptor STORAGE_PRESTORAGE_RESOURCE_UPDATED - ended, execution took {}", stopWatch); + ourLog.debug("Interceptor STORAGE_PRESTORAGE_RESOURCE_UPDATED - ended, execution took {}", stopWatch); } } } diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/interceptor/PatientIdPartitionInterceptor.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/interceptor/PatientIdPartitionInterceptor.java index 972b16bb43d0..c8dc10dbc5ba 100644 --- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/interceptor/PatientIdPartitionInterceptor.java +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/interceptor/PatientIdPartitionInterceptor.java @@ -391,10 +391,12 @@ protected RequestPartitionId provideCompartmentMemberInstanceResponse( * Math.abs(theResourceIdPart.hashCode()) % 15000. *

* This logic can be replaced with other logic of your choosing. + * + * @see #defaultPartitionAlgorithm(String) */ @SuppressWarnings("unused") protected int providePartitionIdForPatientId(RequestDetails theRequestDetails, String theResourceIdPart) { - return Math.abs(theResourceIdPart.hashCode() % 15000); + return defaultPartitionAlgorithm(theResourceIdPart); } /** @@ -419,4 +421,12 @@ protected RequestPartitionId throwNonCompartmentMemberInstanceFailureResponse(IB protected RequestPartitionId provideNonCompartmentMemberTypeResponse(IBaseResource theResource) { return myPartitionSettings.getDefaultRequestPartitionId(); } + + /** + * This method supplies the default algorithm used for partitioning, if {@link #providePartitionIdForPatientId(RequestDetails, String)} + * has not been overridden. + */ + public static int defaultPartitionAlgorithm(String theResourceIdPart) { + return Math.abs(theResourceIdPart.hashCode() % 15000); + } } diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/partition/BaseRequestPartitionHelperSvc.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/partition/BaseRequestPartitionHelperSvc.java index 0720e17a638f..521261c2bfc9 100644 --- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/partition/BaseRequestPartitionHelperSvc.java +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/partition/BaseRequestPartitionHelperSvc.java @@ -29,6 +29,7 @@ import ca.uhn.fhir.interceptor.model.RequestPartitionId; import ca.uhn.fhir.jpa.model.config.PartitionSettings; import ca.uhn.fhir.jpa.model.util.JpaConstants; +import ca.uhn.fhir.rest.api.Constants; import ca.uhn.fhir.rest.api.server.RequestDetails; import ca.uhn.fhir.rest.api.server.SystemRequestDetails; import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; @@ -293,6 +294,12 @@ public RequestPartitionId determineCreatePartitionForRequest( return RequestPartitionId.allPartitions(); } + RequestPartitionId existingPartitionId = + (RequestPartitionId) theResource.getUserData(Constants.RESOURCE_PARTITION_ID); + if (existingPartitionId != null) { + return existingPartitionId; + } + RequestDetails requestDetails = theRequest; boolean nonPartitionableResource = isResourceNonPartitionable(theResourceType);