Skip to content

Commit 7f3ff6f

Browse files
authored
6716 fixing error msg update code (hapifhir#6764)
1 parent 1321fa3 commit 7f3ff6f

File tree

14 files changed

+264
-45
lines changed

14 files changed

+264
-45
lines changed

hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@
5656
import jakarta.annotation.Nullable;
5757
import jakarta.persistence.EntityManager;
5858
import jakarta.persistence.LockModeType;
59-
import jakarta.persistence.Query;
6059
import org.apache.commons.collections4.ListUtils;
6160
import org.apache.commons.lang3.Validate;
6261
import org.slf4j.Logger;
@@ -381,15 +380,10 @@ public WorkChunkStatusEnum onWorkChunkError(WorkChunkErrorEvent theParameters) {
381380
return myTransactionService.withSystemRequestOnDefaultPartition().execute(() -> {
382381
int changeCount = myWorkChunkRepository.updateChunkStatusAndIncrementErrorCountForEndError(
383382
chunkId, new Date(), errorMessage, WorkChunkStatusEnum.ERRORED);
384-
Validate.isTrue(changeCount > 0, "changed chunk matching %s", chunkId);
385-
386-
Query query = myEntityManager.createQuery("update Batch2WorkChunkEntity " + "set myStatus = :failed "
387-
+ ",myErrorMessage = CONCAT('Too many errors: ', CAST(myErrorCount as string), '. Last error msg was ', myErrorMessage) "
388-
+ "where myId = :chunkId and myErrorCount > :maxCount");
389-
query.setParameter("chunkId", chunkId);
390-
query.setParameter("failed", WorkChunkStatusEnum.FAILED);
391-
query.setParameter("maxCount", MAX_CHUNK_ERROR_COUNT);
392-
int failChangeCount = query.executeUpdate();
383+
Validate.isTrue(changeCount > 0, "No changed chunk matching %s", chunkId);
384+
385+
int failChangeCount = myWorkChunkRepository.updateChunkForTooManyErrors(
386+
WorkChunkStatusEnum.FAILED, chunkId, MAX_CHUNK_ERROR_COUNT, ERROR_MSG_MAX_LENGTH);
393387

394388
if (failChangeCount > 0) {
395389
return WorkChunkStatusEnum.FAILED;

hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IBatch2WorkChunkRepository.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,26 @@ int updateChunkStatusAndIncrementErrorCountForEndError(
113113
@Param("em") String theErrorMessage,
114114
@Param("status") WorkChunkStatusEnum theInProgress);
115115

116+
/**
117+
* Updates the workchunk error count and error message for WorkChunks that have failed after multiple retries.
118+
*
119+
* @param theStatus - the new status of the workchunk
120+
* @param theChunkId - the id of the workchunk to update
121+
* @param theMaxErrorCount - maximum error count (# of errors allowed for retry)
122+
* @param theMaxErrorSize - max error size (maximum number of characters)
123+
* @return - the number of updated chunks (should be 1)
124+
*/
125+
@Modifying
126+
@Query("UPDATE Batch2WorkChunkEntity e "
127+
+ "SET e.myStatus = :failed, "
128+
+ "e.myErrorMessage = LEFT(CONCAT('Too many errors (', CAST(e.myErrorCount as string), '). Last err msg ', e.myErrorMessage), :maxErrorSize) "
129+
+ "WHERE e.myId = :chunkId and e.myErrorCount > :maxCount")
130+
int updateChunkForTooManyErrors(
131+
@Param("failed") WorkChunkStatusEnum theStatus,
132+
@Param("chunkId") String theChunkId,
133+
@Param("maxCount") int theMaxErrorCount,
134+
@Param("maxErrorSize") int theMaxErrorSize);
135+
116136
@Modifying
117137
@Query(
118138
"UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myStartTime = :st WHERE e.myId = :id AND e.myStatus IN :startStatuses")

hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelFactoryTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
44
import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannelFactory;
5+
import ca.uhn.fhir.jpa.subscription.channel.impl.RetryPolicyProvider;
56
import org.junit.jupiter.api.BeforeEach;
67
import org.junit.jupiter.api.Test;
78
import org.junit.jupiter.api.extension.ExtendWith;
@@ -39,7 +40,7 @@ public class SubscriptionChannelFactoryTest {
3940
@BeforeEach
4041
public void before() {
4142
when(myChannelNamer.getChannelName(any(), any())).thenReturn("CHANNEL_NAME");
42-
mySvc = new SubscriptionChannelFactory(new LinkedBlockingChannelFactory(myChannelNamer));
43+
mySvc = new SubscriptionChannelFactory(new LinkedBlockingChannelFactory(myChannelNamer, new RetryPolicyProvider()));
4344
}
4445

4546
/**

hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/BaseSubscriptionTest.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import ca.uhn.fhir.jpa.searchparam.registry.SearchParamRegistryImpl;
99
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory;
1010
import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannelFactory;
11+
import ca.uhn.fhir.jpa.subscription.channel.impl.RetryPolicyProvider;
1112
import ca.uhn.fhir.jpa.subscription.channel.subscription.IChannelNamer;
1213
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory;
1314
import ca.uhn.fhir.jpa.subscription.match.config.SubscriptionProcessorConfig;
@@ -73,6 +74,9 @@ public void initSearchParamRegistry(IBaseResource theReadResource) {
7374
@Configuration
7475
public static class MyConfig {
7576

77+
// would normally be a bean; but this is a test
78+
private RetryPolicyProvider myRetryPolicyProvider = new RetryPolicyProvider();
79+
7680
@Bean
7781
public JpaStorageSettings jpaStorageSettings() {
7882
return new JpaStorageSettings();
@@ -85,12 +89,12 @@ public SubscriptionSettings subscriptionSettings() {
8589

8690
@Bean
8791
public IChannelFactory channelFactory(IChannelNamer theNamer) {
88-
return new LinkedBlockingChannelFactory(theNamer);
92+
return new LinkedBlockingChannelFactory(theNamer, myRetryPolicyProvider);
8993
}
9094

9195
@Bean
9296
public SubscriptionChannelFactory mySubscriptionChannelFactory(IChannelNamer theChannelNamer) {
93-
return new SubscriptionChannelFactory(new LinkedBlockingChannelFactory(theChannelNamer));
97+
return new SubscriptionChannelFactory(new LinkedBlockingChannelFactory(theChannelNamer, myRetryPolicyProvider));
9498
}
9599

96100
@Bean

hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/SubscriptionTestConfig.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import ca.uhn.fhir.jpa.searchparam.config.SearchParamConfig;
2626
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory;
2727
import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannelFactory;
28+
import ca.uhn.fhir.jpa.subscription.channel.impl.RetryPolicyProvider;
2829
import ca.uhn.fhir.jpa.subscription.channel.subscription.IChannelNamer;
2930
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory;
3031
import org.springframework.beans.factory.annotation.Autowired;
@@ -43,6 +44,8 @@ public class SubscriptionTestConfig {
4344
private FhirContext myFhirContext;
4445
@Autowired
4546
private IChannelNamer myChannelNamer;
47+
@Autowired
48+
private RetryPolicyProvider myRetryPolicyProvider;
4649

4750
@Primary
4851
@Bean(name = "myJpaValidationSupportChain")
@@ -52,7 +55,7 @@ public IValidationSupport validationSupportChainR4() {
5255

5356
@Bean
5457
public IChannelFactory subscribableChannelFactory() {
55-
return new LinkedBlockingChannelFactory(myChannelNamer);
58+
return new LinkedBlockingChannelFactory(myChannelNamer, myRetryPolicyProvider);
5659
}
5760

5861
@Bean

hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/Batch2CoordinatorIT.java

Lines changed: 152 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,5 @@
11
package ca.uhn.fhir.jpa.batch2;
22

3-
import static org.junit.jupiter.api.Assertions.assertEquals;
4-
import static org.junit.jupiter.api.Assertions.assertFalse;
5-
import static org.junit.jupiter.api.Assertions.assertTrue;
6-
import static org.junit.jupiter.api.Assertions.assertNotNull;
73
import ca.uhn.fhir.batch2.api.ChunkExecutionDetails;
84
import ca.uhn.fhir.batch2.api.IJobCompletionHandler;
95
import ca.uhn.fhir.batch2.api.IJobCoordinator;
@@ -25,15 +21,19 @@
2521
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
2622
import ca.uhn.fhir.batch2.model.JobWorkNotificationJsonMessage;
2723
import ca.uhn.fhir.batch2.model.StatusEnum;
24+
import ca.uhn.fhir.batch2.model.WorkChunk;
25+
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
2826
import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest;
2927
import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse;
3028
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings;
3129
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory;
3230
import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannel;
31+
import ca.uhn.fhir.jpa.subscription.channel.impl.RetryPolicyProvider;
3332
import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
3433
import ca.uhn.fhir.jpa.test.Batch2JobHelper;
3534
import ca.uhn.fhir.jpa.test.config.Batch2FastSchedulerConfig;
3635
import ca.uhn.fhir.jpa.test.config.TestR4Config;
36+
import ca.uhn.fhir.jpa.util.RandomTextUtils;
3737
import ca.uhn.fhir.model.api.IModelJson;
3838
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
3939
import ca.uhn.fhir.test.utilities.UnregisterScheduledProcessor;
@@ -51,9 +51,16 @@
5151
import org.slf4j.Logger;
5252
import org.slf4j.LoggerFactory;
5353
import org.springframework.beans.factory.annotation.Autowired;
54+
import org.springframework.context.annotation.Bean;
55+
import org.springframework.context.annotation.Configuration;
56+
import org.springframework.context.annotation.Primary;
5457
import org.springframework.data.domain.Page;
5558
import org.springframework.data.domain.Sort;
5659
import org.springframework.messaging.MessageHandler;
60+
import org.springframework.retry.RetryPolicy;
61+
import org.springframework.retry.backoff.BackOffPolicy;
62+
import org.springframework.retry.backoff.NoBackOffPolicy;
63+
import org.springframework.retry.policy.MaxAttemptsRetryPolicy;
5764
import org.springframework.test.context.ContextConfiguration;
5865
import org.springframework.test.context.TestPropertySource;
5966
import org.testcontainers.shaded.org.awaitility.Awaitility;
@@ -73,19 +80,71 @@
7380

7481
import static ca.uhn.fhir.batch2.config.BaseBatch2Config.CHANNEL_NAME;
7582
import static ca.uhn.fhir.batch2.coordinator.WorkChunkProcessor.MAX_CHUNK_ERROR_COUNT;
83+
import static ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity.ERROR_MSG_MAX_LENGTH;
7684
import static org.assertj.core.api.Assertions.assertThat;
77-
import static org.junit.jupiter.api.Assertions.fail;
85+
import static org.awaitility.Awaitility.await;
86+
import static org.junit.jupiter.api.Assertions.assertEquals;
87+
import static org.junit.jupiter.api.Assertions.assertFalse;
88+
import static org.junit.jupiter.api.Assertions.assertNotNull;
89+
import static org.junit.jupiter.api.Assertions.assertTrue;
7890
import static org.junit.jupiter.api.Assertions.fail;
7991

8092

8193
@ContextConfiguration(classes = {
82-
Batch2FastSchedulerConfig.class
94+
Batch2FastSchedulerConfig.class,
95+
Batch2CoordinatorIT.RPConfig.class
8396
})
8497
@TestPropertySource(properties = {
8598
// These tests require scheduling to work
8699
UnregisterScheduledProcessor.SCHEDULING_DISABLED_EQUALS_FALSE
87100
})
88101
public class Batch2CoordinatorIT extends BaseJpaR4Test {
102+
103+
/***
104+
* Our internal configuration of Retry Mechanism is
105+
* with exponential backoff, and infinite retries.
106+
*
107+
* This isn't ideal for tests; so we will override
108+
* the retry mechanism for tests that require it to
109+
* make them run faster and more 'predictably'
110+
*/
111+
public static class RetryProviderOverride extends RetryPolicyProvider {
112+
113+
private RetryPolicy myRetryPolicy;
114+
115+
private BackOffPolicy myBackOffPolicy;
116+
117+
public void setPolicies(RetryPolicy theRetryPolicy, BackOffPolicy theBackOffPolicy) {
118+
myRetryPolicy = theRetryPolicy;
119+
myBackOffPolicy = theBackOffPolicy;
120+
}
121+
122+
@Override
123+
protected RetryPolicy retryPolicy() {
124+
if (myRetryPolicy != null) {
125+
return myRetryPolicy;
126+
}
127+
return super.retryPolicy();
128+
}
129+
130+
@Override
131+
protected BackOffPolicy backOffPolicy() {
132+
if (myBackOffPolicy != null) {
133+
return myBackOffPolicy;
134+
}
135+
return super.backOffPolicy();
136+
}
137+
}
138+
139+
@Configuration
140+
public static class RPConfig {
141+
@Primary
142+
@Bean
143+
public RetryPolicyProvider retryPolicyProvider() {
144+
return new RetryProviderOverride();
145+
}
146+
}
147+
89148
private static final Logger ourLog = LoggerFactory.getLogger(Batch2CoordinatorIT.class);
90149

91150
public static final int TEST_JOB_VERSION = 1;
@@ -106,6 +165,9 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
106165
@Autowired
107166
IJobPersistence myJobPersistence;
108167

168+
@Autowired
169+
private RetryPolicyProvider myRetryPolicyProvider;
170+
109171
@RegisterExtension
110172
LogbackTestExtension myLogbackTestExtension = new LogbackTestExtension();
111173

@@ -132,6 +194,11 @@ public void before() throws Exception {
132194
};
133195
myWorkChannel = (LinkedBlockingChannel) myChannelFactory.getOrCreateReceiver(CHANNEL_NAME, JobWorkNotificationJsonMessage.class, new ChannelConsumerSettings());
134196
myStorageSettings.setJobFastTrackingEnabled(true);
197+
198+
// reset
199+
if (myRetryPolicyProvider instanceof RetryProviderOverride rp) {
200+
rp.setPolicies(null, null);
201+
}
135202
}
136203

137204
@AfterEach
@@ -589,6 +656,84 @@ private void complete(
589656
assertEquals(1.0, jobInstance.getProgress());
590657
}
591658

659+
@Test
660+
public void failingWorkChunks_withLargeErrorMsgs_shouldNotErrorOutTheJob() {
661+
// setup
662+
assertTrue(myRetryPolicyProvider instanceof RetryProviderOverride);
663+
664+
String jobId = getMethodNameForJobId();
665+
AtomicInteger counter = new AtomicInteger();
666+
667+
// we want an error message larger than can be contained in the db
668+
String errorMsg = RandomTextUtils.newSecureRandomAlphaNumericString(ERROR_MSG_MAX_LENGTH + 100);
669+
670+
// we want 1 more error than the allowed maximum
671+
// otherwise we won't be updating the error chunk to have
672+
// "Too many errors" error
673+
int errorCount = MAX_CHUNK_ERROR_COUNT + 1;
674+
675+
MaxAttemptsRetryPolicy retryPolicy = new MaxAttemptsRetryPolicy();
676+
retryPolicy.setMaxAttempts(errorCount);
677+
RetryProviderOverride overrideRetryProvider = (RetryProviderOverride) myRetryPolicyProvider;
678+
overrideRetryProvider.setPolicies(retryPolicy, new NoBackOffPolicy());
679+
680+
// create a job that fails and throws a large error
681+
IJobStepWorker<TestJobParameters, VoidModel, FirstStepOutput> first = (step, sink) -> {
682+
counter.getAndIncrement();
683+
throw new RuntimeException(errorMsg);
684+
};
685+
IJobStepWorker<TestJobParameters, FirstStepOutput, VoidModel> last = (step, sink) -> {
686+
// we don't care; we'll never get here
687+
return RunOutcome.SUCCESS;
688+
};
689+
690+
JobDefinition<? extends IModelJson> jd = JobDefinition.newBuilder()
691+
.setJobDefinitionId(jobId)
692+
.setJobDescription("test job")
693+
.setJobDefinitionVersion(TEST_JOB_VERSION)
694+
.setParametersType(TestJobParameters.class)
695+
.gatedExecution()
696+
.addFirstStep(
697+
FIRST_STEP_ID,
698+
"First step",
699+
FirstStepOutput.class,
700+
first
701+
)
702+
.addLastStep(
703+
LAST_STEP_ID,
704+
"Final step",
705+
last
706+
)
707+
.completionHandler(myCompletionHandler)
708+
.build();
709+
myJobDefinitionRegistry.addJobDefinition(jd);
710+
711+
// test
712+
JobInstanceStartRequest request = buildRequest(jobId);
713+
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), request);
714+
String instanceId = startResponse.getInstanceId();
715+
716+
// waiting for the multitude of failures
717+
await().until(() -> {
718+
myJobMaintenanceService.runMaintenancePass();
719+
JobInstance instance = myJobCoordinator.getInstance(instanceId);
720+
ourLog.info("Attempt " + counter.get() + " for "
721+
+ instance.getInstanceId() + ". Status: " + instance.getStatus());
722+
return counter.get() > errorCount - 1;
723+
});
724+
725+
// verify
726+
Iterator<WorkChunk> iterator = myJobPersistence.fetchAllWorkChunksIterator(instanceId, true);
727+
List<WorkChunk> listOfChunks = new ArrayList<>();
728+
iterator.forEachRemaining(listOfChunks::add);
729+
assertEquals(1, listOfChunks.size());
730+
WorkChunk workChunk = listOfChunks.get(0);
731+
assertEquals(WorkChunkStatusEnum.FAILED, workChunk.getStatus());
732+
// should contain some of the original error msg, but not all
733+
assertTrue(workChunk.getErrorMessage().contains(errorMsg.substring(0, 100)));
734+
assertTrue(workChunk.getErrorMessage().startsWith("Too many errors"));
735+
}
736+
592737
@Test
593738
public void testJobWithLongPollingStep() throws InterruptedException {
594739
// create job definition
@@ -745,7 +890,7 @@ public void testUnknownException_KeepsInProgress_CanCancelManually() throws Inte
745890
callLatch(myFirstStepLatch, step);
746891
throw new RuntimeException("Expected Test Exception");
747892
};
748-
IJobStepWorker<TestJobParameters, FirstStepOutput, VoidModel> lastStep = (step, sink) -> fail();
893+
IJobStepWorker<TestJobParameters, FirstStepOutput, VoidModel> lastStep = (step, sink) -> fail();
749894

750895
String jobDefId = getMethodNameForJobId();
751896
JobDefinition<? extends IModelJson> definition = buildGatedJobDefinition(jobDefId, firstStep, lastStep);

hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/delete/job/DeleteExpungeJobTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
1111
import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
1212
import ca.uhn.fhir.jpa.test.Batch2JobHelper;
13+
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
1314
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
1415
import org.hl7.fhir.instance.model.api.IIdType;
1516
import org.hl7.fhir.r4.model.DiagnosticReport;
@@ -79,7 +80,7 @@ public void testDeleteExpunge() {
7980
startRequest.setJobDefinitionId(DeleteExpungeAppCtx.JOB_DELETE_EXPUNGE);
8081

8182
// execute
82-
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(startRequest);
83+
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), startRequest);
8384
myBatch2JobHelper.awaitJobCompletion(startResponse);
8485

8586
// validate

0 commit comments

Comments
 (0)