From 5e8b3cc6a937b7022b0efab49d8c72f604e2bc10 Mon Sep 17 00:00:00 2001 From: Ivan Baisi Date: Wed, 19 Nov 2025 12:13:24 -0800 Subject: [PATCH 1/5] add BATCH2_JOB_START and BATCH2_JOB_COMPLETION pointcuts --- .../ca/uhn/fhir/interceptor/api/Pointcut.java | 30 +++++++++++++++++++ .../fhir/batch2/config/BaseBatch2Config.java | 8 +++-- .../coordinator/JobCoordinatorImpl.java | 16 +++++++++- .../ReductionStepExecutorServiceImpl.java | 19 ++++++++++-- .../coordinator/JobCoordinatorImplTest.java | 5 +++- .../ReductionStepExecutorServiceImplTest.java | 5 +++- 6 files changed, 76 insertions(+), 7 deletions(-) diff --git a/hapi-fhir-base/src/main/java/ca/uhn/fhir/interceptor/api/Pointcut.java b/hapi-fhir-base/src/main/java/ca/uhn/fhir/interceptor/api/Pointcut.java index 737e82ec5ee7..59c0b0092e80 100644 --- a/hapi-fhir-base/src/main/java/ca/uhn/fhir/interceptor/api/Pointcut.java +++ b/hapi-fhir-base/src/main/java/ca/uhn/fhir/interceptor/api/Pointcut.java @@ -3390,6 +3390,36 @@ public enum Pointcut implements IPointcut { BATCH2_CHUNK_PROCESS_FILTER( IInterceptorFilterHook.class, "ca.uhn.fhir.batch2.model.JobInstance", "ca.uhn.fhir.batch2.model.WorkChunk"), + /** + * Batch2 Hook: + *

Invoked before the job instance is started. JobInstance is mutable here.

+ *

Parameters:

+ * + *

+ * Hooks should return void. + *

+ */ + BATCH2_JOB_START( + void.class, "ca.uhn.fhir.batch2.model.JobInstance", "ca.uhn.fhir.rest.api.server.RequestDetails"), + + /** + * Batch2 Hook: + *

Invoked after the job instance is completed. JobInstance is immutable here.

+ *

Parameters:

+ * + *

+ * Hooks should return void. + *

+ */ + BATCH2_JOB_COMPLETION( + void.class, "ca.uhn.fhir.batch2.model.JobInstance", "ca.uhn.fhir.batch2.api.JobCompletionDetails"), + /** * Provenance Agents Pointcut: * This is a pointcut to retrieve data for populating the agent element of a Provenance resource that needs to be created diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/config/BaseBatch2Config.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/config/BaseBatch2Config.java index bbadd9e65b0c..04c2b0f4c9e5 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/config/BaseBatch2Config.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/config/BaseBatch2Config.java @@ -41,6 +41,7 @@ import ca.uhn.fhir.broker.api.IChannelProducer; import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; +import ca.uhn.fhir.interceptor.api.IInterceptorService; import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService; import ca.uhn.fhir.jpa.model.sched.ISchedulerService; @@ -65,6 +66,9 @@ public abstract class BaseBatch2Config { @Autowired IHapiTransactionService myHapiTransactionService; + @Autowired + private IInterceptorService myInterceptorService; + @Bean public JobDefinitionRegistry batch2JobDefinitionRegistry() { return new JobDefinitionRegistry(); @@ -83,7 +87,7 @@ public BatchJobSender batchJobSender() { @Bean public IJobCoordinator batch2JobCoordinator( JobDefinitionRegistry theJobDefinitionRegistry, IHapiTransactionService theTransactionService) { - return new JobCoordinatorImpl(myPersistence, theJobDefinitionRegistry, theTransactionService); + return new JobCoordinatorImpl(myPersistence, theJobDefinitionRegistry, theTransactionService, myInterceptorService); } @Bean @@ -91,7 +95,7 @@ public IReductionStepExecutorService reductionStepExecutorService( IJobPersistence theJobPersistence, IHapiTransactionService theTransactionService, JobDefinitionRegistry theJobDefinitionRegistry) { - return new ReductionStepExecutorServiceImpl(theJobPersistence, theTransactionService, theJobDefinitionRegistry); + return new ReductionStepExecutorServiceImpl(theJobPersistence, theTransactionService, theJobDefinitionRegistry, myInterceptorService); } @Bean diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImpl.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImpl.java index 4bc0ba2011fb..cf08f1b911ed 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImpl.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImpl.java @@ -31,6 +31,9 @@ import ca.uhn.fhir.batch2.model.StatusEnum; import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest; import ca.uhn.fhir.i18n.Msg; +import ca.uhn.fhir.interceptor.api.HookParams; +import ca.uhn.fhir.interceptor.api.IInterceptorService; +import ca.uhn.fhir.interceptor.api.Pointcut; import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse; import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService; import ca.uhn.fhir.rest.api.server.RequestDetails; @@ -59,6 +62,7 @@ public class JobCoordinatorImpl implements IJobCoordinator { private final JobQuerySvc myJobQuerySvc; private final JobParameterJsonValidator myJobParameterJsonValidator; private final IHapiTransactionService myTransactionService; + private final IInterceptorService myInterceptorService; /** * Constructor @@ -66,7 +70,8 @@ public class JobCoordinatorImpl implements IJobCoordinator { public JobCoordinatorImpl( @Nonnull IJobPersistence theJobPersistence, @Nonnull JobDefinitionRegistry theJobDefinitionRegistry, - @Nonnull IHapiTransactionService theTransactionService) { + @Nonnull IHapiTransactionService theTransactionService, + @Nonnull IInterceptorService theInterceptorService) { Validate.notNull(theJobPersistence); myJobPersistence = theJobPersistence; @@ -75,6 +80,7 @@ public JobCoordinatorImpl( myJobQuerySvc = new JobQuerySvc(theJobPersistence, theJobDefinitionRegistry); myJobParameterJsonValidator = new JobParameterJsonValidator(); myTransactionService = theTransactionService; + myInterceptorService = theInterceptorService; } @Override @@ -127,6 +133,14 @@ public Batch2JobStartResponse startInstance( .withPropagation(Propagation.REQUIRES_NEW) .execute(() -> myJobPersistence.onCreateWithFirstChunk(jobDefinition, theStartRequest.getParameters())); + if (myInterceptorService.hasHooks(Pointcut.BATCH2_JOB_START)) { + final HookParams params = new HookParams() + .add(JobDefinition.class, jobDefinition) + .add(RequestDetails.class, theRequestDetails); + myInterceptorService.callHooks( + Pointcut.BATCH2_JOB_START, params); + } + Batch2JobStartResponse response = new Batch2JobStartResponse(); response.setInstanceId(instanceAndFirstChunk.jobInstanceId); return response; diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/ReductionStepExecutorServiceImpl.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/ReductionStepExecutorServiceImpl.java index ff392af36f59..d9e8f6691226 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/ReductionStepExecutorServiceImpl.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/ReductionStepExecutorServiceImpl.java @@ -28,6 +28,7 @@ import ca.uhn.fhir.batch2.api.ReductionStepFailureException; import ca.uhn.fhir.batch2.api.StepExecutionDetails; import ca.uhn.fhir.batch2.model.ChunkOutcome; +import ca.uhn.fhir.batch2.model.JobDefinition; import ca.uhn.fhir.batch2.model.JobDefinitionStep; import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.batch2.model.JobWorkCursor; @@ -37,6 +38,9 @@ import ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater; import ca.uhn.fhir.batch2.util.BatchJobOpenTelemetryUtils; import ca.uhn.fhir.i18n.Msg; +import ca.uhn.fhir.interceptor.api.HookParams; +import ca.uhn.fhir.interceptor.api.IInterceptorService; +import ca.uhn.fhir.interceptor.api.Pointcut; import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService; import ca.uhn.fhir.jpa.model.sched.HapiJob; import ca.uhn.fhir.jpa.model.sched.IHasScheduledJobs; @@ -95,6 +99,7 @@ public class ReductionStepExecutorServiceImpl implements IReductionStepExecutorS private final JobDefinitionRegistry myJobDefinitionRegistry; private final JobInstanceStatusUpdater myJobInstanceStatusUpdater; private Timer myHeartbeatTimer; + private final IInterceptorService myInterceptorService; /** * Constructor @@ -102,11 +107,13 @@ public class ReductionStepExecutorServiceImpl implements IReductionStepExecutorS public ReductionStepExecutorServiceImpl( IJobPersistence theJobPersistence, IHapiTransactionService theTransactionService, - JobDefinitionRegistry theJobDefinitionRegistry) { + JobDefinitionRegistry theJobDefinitionRegistry, + @Nonnull IInterceptorService theInterceptorService) { myJobPersistence = theJobPersistence; myTransactionService = theTransactionService; myJobDefinitionRegistry = theJobDefinitionRegistry; myJobInstanceStatusUpdater = new JobInstanceStatusUpdater(theJobDefinitionRegistry); + myInterceptorService = theInterceptorService; myReducerExecutor = Executors.newSingleThreadExecutor(new CustomizableThreadFactory("batch2-reducer")); @@ -348,8 +355,16 @@ private vo */ IJobCompletionHandler completionHandler = theJobWorkCursor.getJobDefinition().getCompletionHandler(); + JobCompletionDetails jcd = new JobCompletionDetails<>(parameters, instance); if (completionHandler != null) { - completionHandler.jobComplete(new JobCompletionDetails<>(parameters, instance)); + completionHandler.jobComplete(jcd); + } + if (myInterceptorService.hasHooks(Pointcut.BATCH2_JOB_COMPLETION)) { + final HookParams params = new HookParams() + .add(JobDefinition.class, theJobWorkCursor.getJobDefinition()) + .add(JobCompletionDetails.class, jcd); + myInterceptorService.callHooks( + Pointcut.BATCH2_JOB_COMPLETION, params); } } diff --git a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImplTest.java b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImplTest.java index 87b0df9b6d1c..1736d51df19f 100644 --- a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImplTest.java +++ b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImplTest.java @@ -28,6 +28,7 @@ import ca.uhn.fhir.broker.api.IChannelProducer; import ca.uhn.fhir.broker.impl.LinkedBlockingBrokerClient; import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; +import ca.uhn.fhir.interceptor.api.IInterceptorService; import ca.uhn.fhir.interceptor.executor.InterceptorService; import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse; import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService; @@ -87,6 +88,8 @@ public class JobCoordinatorImplTest extends BaseBatch2Test { private JobDefinitionRegistry myJobDefinitionRegistry; @Mock private IJobMaintenanceService myJobMaintenanceService; + @Mock + private IInterceptorService myInterceptorService; private final IHapiTransactionService myTransactionService = new NonTransactionalHapiTransactionService(); @Captor private ArgumentCaptor> myStep1ExecutionDetailsCaptor; @@ -123,7 +126,7 @@ public void beforeEach() { myJobConsumer = myLinkedBlockingBrokerClient.getOrCreateConsumer(BATCH_CHANNEL_NAME, JobWorkNotificationJsonMessage.class, workChannelMessageListener, new ChannelConsumerSettings()); - mySvc = new JobCoordinatorImpl(myJobInstancePersister, myJobDefinitionRegistry, myTransactionService); + mySvc = new JobCoordinatorImpl(myJobInstancePersister, myJobDefinitionRegistry, myTransactionService, myInterceptorService); } @AfterEach diff --git a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/ReductionStepExecutorServiceImplTest.java b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/ReductionStepExecutorServiceImplTest.java index e31e45000c38..dd5f4c778bb6 100644 --- a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/ReductionStepExecutorServiceImplTest.java +++ b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/ReductionStepExecutorServiceImplTest.java @@ -15,6 +15,7 @@ import ca.uhn.fhir.batch2.model.StatusEnum; import ca.uhn.fhir.batch2.model.WorkChunk; import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum; +import ca.uhn.fhir.interceptor.api.IInterceptorService; import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService; import ca.uhn.fhir.jpa.dao.tx.NonTransactionalHapiTransactionService; import org.junit.jupiter.api.BeforeEach; @@ -65,12 +66,14 @@ public class ReductionStepExecutorServiceImplTest { private IJobPersistence myJobPersistence; @Mock private IReductionStepWorker myReductionStepWorker; + @Mock + private IInterceptorService myInterceptorService; private ReductionStepExecutorServiceImpl mySvc; private final JobDefinitionRegistry myJobDefinitionRegistry = new JobDefinitionRegistry(); @BeforeEach public void before() { - mySvc = new ReductionStepExecutorServiceImpl(myJobPersistence, myTransactionService, myJobDefinitionRegistry); + mySvc = new ReductionStepExecutorServiceImpl(myJobPersistence, myTransactionService, myJobDefinitionRegistry, myInterceptorService); } // QUEUED, IN_PROGRESS are supported because of backwards compatibility From bd9c56f6d65468ea1a87a31bb023a33dbd6b6cbe Mon Sep 17 00:00:00 2001 From: Ivan Baisi Date: Tue, 25 Nov 2025 07:33:49 -0800 Subject: [PATCH 2/5] add pointcuts and test post save --- .../ca/uhn/fhir/interceptor/api/Pointcut.java | 40 ++++++++++--------- .../jpa/batch2/JpaJobPersistenceImpl.java | 12 ++++++ .../jpa/batch2/JpaJobPersistenceImplTest.java | 23 +++++++++++ .../test/IInstanceStateTransitions.java | 7 +++- .../fhir/batch2/config/BaseBatch2Config.java | 12 ++++-- .../coordinator/JobCoordinatorImpl.java | 11 +---- .../batch2/coordinator/JobStepExecutor.java | 6 ++- .../coordinator/JobStepExecutorFactory.java | 9 ++++- .../coordinator/ReductionStepDataSink.java | 10 +++-- .../ReductionStepExecutorServiceImpl.java | 23 ++++------- .../WorkChannelMessageListener.java | 7 +++- .../maintenance/JobInstanceProcessor.java | 11 +++-- .../JobMaintenanceServiceImpl.java | 9 ++++- .../JobInstanceProgressCalculator.java | 6 ++- .../progress/JobInstanceStatusUpdater.java | 12 +++++- .../coordinator/JobCoordinatorImplTest.java | 2 +- .../ReductionStepDataSinkTest.java | 7 +++- .../WorkChannelMessageListenerTest.java | 5 ++- .../JobMaintenanceServiceImplTest.java | 6 ++- .../JobInstanceStatusUpdaterTest.java | 6 +++ 20 files changed, 153 insertions(+), 71 deletions(-) diff --git a/hapi-fhir-base/src/main/java/ca/uhn/fhir/interceptor/api/Pointcut.java b/hapi-fhir-base/src/main/java/ca/uhn/fhir/interceptor/api/Pointcut.java index 59c0b0092e80..53e3a99e709d 100644 --- a/hapi-fhir-base/src/main/java/ca/uhn/fhir/interceptor/api/Pointcut.java +++ b/hapi-fhir-base/src/main/java/ca/uhn/fhir/interceptor/api/Pointcut.java @@ -3304,6 +3304,25 @@ public enum Pointcut implements IPointcut { STORAGE_PRESTORAGE_BATCH_JOB_CREATE( void.class, "ca.uhn.fhir.batch2.model.JobInstance", "ca.uhn.fhir.rest.api.server.RequestDetails"), + /** + * Storage Hook: + * Invoked after a batch job is persisted to the database. + *

+ * Hooks will have access to the content of the job created + * and the id set to it. + *

+ * Hooks may accept the following parameters: + *
    + *
  • + * ca.uhn.fhir.batch2.model.JobInstance + *
  • + *
+ *

+ * Hooks should return void. + *

+ */ + STORAGE_POSTSTORAGE_BATCH_JOB_CREATE(void.class, "ca.uhn.fhir.batch2.model.JobInstance"), + /** * CDS Hooks Prefetch Hook: * Invoked before a CDS Hooks prefetch request is made. @@ -3390,35 +3409,18 @@ public enum Pointcut implements IPointcut { BATCH2_CHUNK_PROCESS_FILTER( IInterceptorFilterHook.class, "ca.uhn.fhir.batch2.model.JobInstance", "ca.uhn.fhir.batch2.model.WorkChunk"), - /** - * Batch2 Hook: - *

Invoked before the job instance is started. JobInstance is mutable here.

- *

Parameters:

- *
    - *
  • ca.uhn.fhir.batch2.model.JobInstance - The job instance
  • - *
  • ca.uhn.fhir.rest.api.server.RequestDetails - The request details
  • - *
- *

- * Hooks should return void. - *

- */ - BATCH2_JOB_START( - void.class, "ca.uhn.fhir.batch2.model.JobInstance", "ca.uhn.fhir.rest.api.server.RequestDetails"), - /** * Batch2 Hook: *

Invoked after the job instance is completed. JobInstance is immutable here.

*

Parameters:

*
    *
  • ca.uhn.fhir.batch2.model.JobInstance - The job instance
  • - *
  • ca.uhn.fhir.batch2.api.JobCompletionDetails - The job completions details
  • - *
+ * *

* Hooks should return void. *

*/ - BATCH2_JOB_COMPLETION( - void.class, "ca.uhn.fhir.batch2.model.JobInstance", "ca.uhn.fhir.batch2.api.JobCompletionDetails"), + BATCH2_JOB_COMPLETION(void.class, "ca.uhn.fhir.batch2.model.JobInstance"), /** * Provenance Agents Pointcut: diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java index 4a2dac624734..3dfdea17838e 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java @@ -207,6 +207,10 @@ public String storeNewInstance(JobInstance theInstance) { entity.setUserDataJson(theInstance.getUserDataAsString()); entity = myJobInstanceRepository.save(entity); + JobInstance savedJobInstance = new JobInstance(theInstance); + savedJobInstance.setInstanceId(entity.getId()); + invokePostStorageBatchHooks(savedJobInstance); + return entity.getId(); } @@ -638,6 +642,14 @@ private void invokePreStorageBatchHooks(JobInstance theJobInstance) { } } + private void invokePostStorageBatchHooks(JobInstance theJobInstance) { + if (myInterceptorBroadcaster.hasHooks(Pointcut.STORAGE_POSTSTORAGE_BATCH_JOB_CREATE)) { + HookParams params = new HookParams().add(JobInstance.class, theJobInstance); + + myInterceptorBroadcaster.callHooks(Pointcut.STORAGE_POSTSTORAGE_BATCH_JOB_CREATE, params); + } + } + @Override @Transactional(propagation = Propagation.REQUIRES_NEW) public boolean advanceJobStepAndUpdateChunkStatus( diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImplTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImplTest.java index 9cb54f84c57b..623649db8ff4 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImplTest.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImplTest.java @@ -1031,6 +1031,29 @@ public void testPrestorageInterceptor_whenModifyingJobInstance_modifiedJobInstan } + @Test + public void testPostStorageInterceptor_hasJobInstanceId_preStorageHasNot(){ + IAnonymousInterceptor poststorageBatchJobCreateInterceptor = (pointcut, params) -> { + JobInstance jobInstance = params.get(JobInstance.class); + assertNotNull(jobInstance.getInstanceId()); + }; + IAnonymousInterceptor prestorageBatchJobCreateInterceptor = (pointcut, params) -> { + JobInstance jobInstance = params.get(JobInstance.class); + assertNull(jobInstance.getInstanceId()); + }; + + try{ + myInterceptorRegistry.registerAnonymousInterceptor(Pointcut.STORAGE_POSTSTORAGE_BATCH_JOB_CREATE, poststorageBatchJobCreateInterceptor); + myInterceptorRegistry.registerAnonymousInterceptor(Pointcut.STORAGE_PRESTORAGE_BATCH_JOB_CREATE, prestorageBatchJobCreateInterceptor); + JobInstance instance = createInstance(); + mySvc.storeNewInstance(instance); + } finally { + myInterceptorRegistry.unregisterInterceptor(poststorageBatchJobCreateInterceptor); + myInterceptorRegistry.unregisterInterceptor(prestorageBatchJobCreateInterceptor); + } + + } + @Test public void testFetchInstanceAndWorkChunkStatus() { // Setup diff --git a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IInstanceStateTransitions.java b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IInstanceStateTransitions.java index 8de64d5ccc81..9ad83c05ec29 100644 --- a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IInstanceStateTransitions.java +++ b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IInstanceStateTransitions.java @@ -28,6 +28,7 @@ import ca.uhn.fhir.batch2.model.StatusEnum; import ca.uhn.fhir.batch2.model.WorkChunk; import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum; +import ca.uhn.fhir.interceptor.api.IInterceptorService; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; @@ -38,6 +39,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.mockito.Mockito.mock; public interface IInstanceStateTransitions extends IWorkChunkCommon, WorkChunkTestConstants { Logger ourLog = LoggerFactory.getLogger(IInstanceStateTransitions.class); @@ -86,6 +88,8 @@ default void testCreateInstance_firstChunkDequeued_movesToInProgress() { @ParameterizedTest @EnumSource(StatusEnum.class) default void cancelRequest_cancelsJob_whenNotFinalState(StatusEnum theState) { + //Mocks + IInterceptorService interceptorService = mock(IInterceptorService.class); // given JobInstance cancelledInstance = createInstance(); cancelledInstance.setStatus(theState); @@ -107,7 +111,8 @@ default void cancelRequest_cancelsJob_whenNotFinalState(StatusEnum theState) { instanceId1, new JobChunkProgressAccumulator(), null, - jobDefinitionRegistry + jobDefinitionRegistry, + interceptorService ).process(); }); diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/config/BaseBatch2Config.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/config/BaseBatch2Config.java index 04c2b0f4c9e5..11b823f3ad89 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/config/BaseBatch2Config.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/config/BaseBatch2Config.java @@ -87,7 +87,8 @@ public BatchJobSender batchJobSender() { @Bean public IJobCoordinator batch2JobCoordinator( JobDefinitionRegistry theJobDefinitionRegistry, IHapiTransactionService theTransactionService) { - return new JobCoordinatorImpl(myPersistence, theJobDefinitionRegistry, theTransactionService, myInterceptorService); + return new JobCoordinatorImpl( + myPersistence, theJobDefinitionRegistry, theTransactionService, myInterceptorService); } @Bean @@ -95,7 +96,8 @@ public IReductionStepExecutorService reductionStepExecutorService( IJobPersistence theJobPersistence, IHapiTransactionService theTransactionService, JobDefinitionRegistry theJobDefinitionRegistry) { - return new ReductionStepExecutorServiceImpl(theJobPersistence, theTransactionService, theJobDefinitionRegistry, myInterceptorService); + return new ReductionStepExecutorServiceImpl( + theJobPersistence, theTransactionService, theJobDefinitionRegistry, myInterceptorService); } @Bean @@ -113,7 +115,8 @@ public IJobMaintenanceService batch2JobMaintenanceService( theJobDefinitionRegistry, theBatchJobSender, theExecutor, - theReductionStepExecutorService); + theReductionStepExecutorService, + myInterceptorService); } @Bean @@ -139,7 +142,8 @@ public WorkChannelMessageListener workChannelMessageListener( theExecutorSvc, theJobMaintenanceService, theHapiTransactionService, - theInterceptorBroadcaster); + theInterceptorBroadcaster, + myInterceptorService); } @Bean diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImpl.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImpl.java index cf08f1b911ed..68a432e0c83b 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImpl.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImpl.java @@ -31,9 +31,7 @@ import ca.uhn.fhir.batch2.model.StatusEnum; import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest; import ca.uhn.fhir.i18n.Msg; -import ca.uhn.fhir.interceptor.api.HookParams; import ca.uhn.fhir.interceptor.api.IInterceptorService; -import ca.uhn.fhir.interceptor.api.Pointcut; import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse; import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService; import ca.uhn.fhir.rest.api.server.RequestDetails; @@ -92,6 +90,7 @@ public Batch2JobStartResponse startInstance( } Validate.notBlank(theStartRequest.getJobDefinitionId(), "No job definition ID supplied in start request"); + // ALTERNATIVE 1: // if cache - use that first if (theStartRequest.isUseCache()) { FetchJobInstancesRequest request = new FetchJobInstancesRequest( @@ -133,14 +132,6 @@ public Batch2JobStartResponse startInstance( .withPropagation(Propagation.REQUIRES_NEW) .execute(() -> myJobPersistence.onCreateWithFirstChunk(jobDefinition, theStartRequest.getParameters())); - if (myInterceptorService.hasHooks(Pointcut.BATCH2_JOB_START)) { - final HookParams params = new HookParams() - .add(JobDefinition.class, jobDefinition) - .add(RequestDetails.class, theRequestDetails); - myInterceptorService.callHooks( - Pointcut.BATCH2_JOB_START, params); - } - Batch2JobStartResponse response = new Batch2JobStartResponse(); response.setInstanceId(instanceAndFirstChunk.jobInstanceId); return response; diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobStepExecutor.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobStepExecutor.java index 04c6b81e445c..657020e17e69 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobStepExecutor.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobStepExecutor.java @@ -28,6 +28,7 @@ import ca.uhn.fhir.batch2.model.WorkChunk; import ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater; import ca.uhn.fhir.batch2.util.BatchJobOpenTelemetryUtils; +import ca.uhn.fhir.interceptor.api.IInterceptorService; import ca.uhn.fhir.model.api.IModelJson; import ca.uhn.fhir.util.Logs; import io.opentelemetry.instrumentation.annotations.WithSpan; @@ -59,7 +60,8 @@ public class JobStepExecutor theCursor, @Nonnull WorkChunkProcessor theExecutor, @Nonnull IJobMaintenanceService theJobMaintenanceService, - @Nonnull JobDefinitionRegistry theJobDefinitionRegistry) { + @Nonnull JobDefinitionRegistry theJobDefinitionRegistry, + @Nonnull IInterceptorService theInterceptorService) { myJobPersistence = theJobPersistence; myDefinition = theCursor.jobDefinition; myInstance = theInstance; @@ -68,7 +70,7 @@ public class JobStepExecutor @@ -60,6 +64,7 @@ JobStepExecutor newJobStepExecutor( theCursor, myJobStepExecutorSvc, myJobMaintenanceService, - myJobDefinitionRegistry); + myJobDefinitionRegistry, + myInterceptorService); } } diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/ReductionStepDataSink.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/ReductionStepDataSink.java index 9f6324fa809f..8f8bdf7d9fae 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/ReductionStepDataSink.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/ReductionStepDataSink.java @@ -28,6 +28,7 @@ import ca.uhn.fhir.batch2.progress.InstanceProgress; import ca.uhn.fhir.batch2.progress.JobInstanceProgressCalculator; import ca.uhn.fhir.i18n.Msg; +import ca.uhn.fhir.interceptor.api.IInterceptorService; import ca.uhn.fhir.model.api.IModelJson; import ca.uhn.fhir.model.primitive.InstantDt; import ca.uhn.fhir.util.JsonUtil; @@ -43,15 +44,18 @@ public class ReductionStepDataSink theJobWorkCursor, IJobPersistence thePersistence, - JobDefinitionRegistry theJobDefinitionRegistry) { + JobDefinitionRegistry theJobDefinitionRegistry, + IInterceptorService theInterceptorService) { super(theInstanceId, theJobWorkCursor); myJobPersistence = thePersistence; myJobDefinitionRegistry = theJobDefinitionRegistry; + myInterceptorService = theInterceptorService; } @Override @@ -60,8 +64,8 @@ public void accept(WorkChunkData theData) { OT data = theData.getData(); String dataString = JsonUtil.serialize(data, false); JobChunkProgressAccumulator progressAccumulator = new JobChunkProgressAccumulator(); - JobInstanceProgressCalculator myJobInstanceProgressCalculator = - new JobInstanceProgressCalculator(myJobPersistence, progressAccumulator, myJobDefinitionRegistry); + JobInstanceProgressCalculator myJobInstanceProgressCalculator = new JobInstanceProgressCalculator( + myJobPersistence, progressAccumulator, myJobDefinitionRegistry, myInterceptorService); InstanceProgress progress = myJobInstanceProgressCalculator.calculateInstanceProgress(instanceId); boolean changed = myJobPersistence.updateInstance(instanceId, instance -> { diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/ReductionStepExecutorServiceImpl.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/ReductionStepExecutorServiceImpl.java index d9e8f6691226..aff3f4d6209c 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/ReductionStepExecutorServiceImpl.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/ReductionStepExecutorServiceImpl.java @@ -28,7 +28,6 @@ import ca.uhn.fhir.batch2.api.ReductionStepFailureException; import ca.uhn.fhir.batch2.api.StepExecutionDetails; import ca.uhn.fhir.batch2.model.ChunkOutcome; -import ca.uhn.fhir.batch2.model.JobDefinition; import ca.uhn.fhir.batch2.model.JobDefinitionStep; import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.batch2.model.JobWorkCursor; @@ -38,9 +37,7 @@ import ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater; import ca.uhn.fhir.batch2.util.BatchJobOpenTelemetryUtils; import ca.uhn.fhir.i18n.Msg; -import ca.uhn.fhir.interceptor.api.HookParams; import ca.uhn.fhir.interceptor.api.IInterceptorService; -import ca.uhn.fhir.interceptor.api.Pointcut; import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService; import ca.uhn.fhir.jpa.model.sched.HapiJob; import ca.uhn.fhir.jpa.model.sched.IHasScheduledJobs; @@ -98,8 +95,8 @@ public class ReductionStepExecutorServiceImpl implements IReductionStepExecutorS private final AtomicReference myCurrentlyFinalizingInstanceId = new AtomicReference<>(); private final JobDefinitionRegistry myJobDefinitionRegistry; private final JobInstanceStatusUpdater myJobInstanceStatusUpdater; - private Timer myHeartbeatTimer; private final IInterceptorService myInterceptorService; + private Timer myHeartbeatTimer; /** * Constructor @@ -108,13 +105,12 @@ public ReductionStepExecutorServiceImpl( IJobPersistence theJobPersistence, IHapiTransactionService theTransactionService, JobDefinitionRegistry theJobDefinitionRegistry, - @Nonnull IInterceptorService theInterceptorService) { + IInterceptorService theInterceptorService) { myJobPersistence = theJobPersistence; myTransactionService = theTransactionService; myJobDefinitionRegistry = theJobDefinitionRegistry; - myJobInstanceStatusUpdater = new JobInstanceStatusUpdater(theJobDefinitionRegistry); + myJobInstanceStatusUpdater = new JobInstanceStatusUpdater(theJobDefinitionRegistry, theInterceptorService); myInterceptorService = theInterceptorService; - myReducerExecutor = Executors.newSingleThreadExecutor(new CustomizableThreadFactory("batch2-reducer")); // This is a single thread executor because there are no guarantees that the chunk @@ -304,7 +300,11 @@ private vo response.getFailedChunksIds().size()); ReductionStepDataSink dataSink = new ReductionStepDataSink<>( - instance.getInstanceId(), theJobWorkCursor, myJobPersistence, myJobDefinitionRegistry); + instance.getInstanceId(), + theJobWorkCursor, + myJobPersistence, + myJobDefinitionRegistry, + myInterceptorService); StepExecutionDetails chunkDetails = StepExecutionDetails.createReductionStepDetails(parameters, null, instance); @@ -359,13 +359,6 @@ private vo if (completionHandler != null) { completionHandler.jobComplete(jcd); } - if (myInterceptorService.hasHooks(Pointcut.BATCH2_JOB_COMPLETION)) { - final HookParams params = new HookParams() - .add(JobDefinition.class, theJobWorkCursor.getJobDefinition()) - .add(JobCompletionDetails.class, jcd); - myInterceptorService.callHooks( - Pointcut.BATCH2_JOB_COMPLETION, params); - } } return null; diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/WorkChannelMessageListener.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/WorkChannelMessageListener.java index 250e95129177..365a3a3d2813 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/WorkChannelMessageListener.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/WorkChannelMessageListener.java @@ -30,6 +30,7 @@ import ca.uhn.fhir.broker.api.IMessageListener; import ca.uhn.fhir.interceptor.api.HookParams; import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; +import ca.uhn.fhir.interceptor.api.IInterceptorService; import ca.uhn.fhir.interceptor.api.Pointcut; import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService; import ca.uhn.fhir.rest.server.messaging.IMessage; @@ -59,7 +60,8 @@ public WorkChannelMessageListener( @Nonnull WorkChunkProcessor theExecutorSvc, @Nonnull IJobMaintenanceService theJobMaintenanceService, IHapiTransactionService theHapiTransactionService, - IInterceptorBroadcaster theInterceptorBroadcaster) { + IInterceptorBroadcaster theInterceptorBroadcaster, + @Nonnull IInterceptorService theInterceptorService) { myJobPersistence = theJobPersistence; myJobDefinitionRegistry = theJobDefinitionRegistry; myHapiTransactionService = theHapiTransactionService; @@ -69,7 +71,8 @@ public WorkChannelMessageListener( theBatchJobSender, theExecutorSvc, theJobMaintenanceService, - theJobDefinitionRegistry); + theJobDefinitionRegistry, + theInterceptorService); } public Class getPayloadType() { diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java index 6b6e28f39661..808ecf9cad57 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java @@ -32,10 +32,12 @@ import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum; import ca.uhn.fhir.batch2.progress.JobInstanceProgressCalculator; import ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater; +import ca.uhn.fhir.interceptor.api.IInterceptorService; import ca.uhn.fhir.model.api.IModelJson; import ca.uhn.fhir.model.api.PagingIterator; import ca.uhn.fhir.util.Logs; import ca.uhn.fhir.util.StopWatch; +import jakarta.annotation.Nonnull; import org.apache.commons.lang3.time.DateUtils; import org.slf4j.Logger; import org.springframework.data.domain.Page; @@ -69,16 +71,17 @@ public JobInstanceProcessor( String theInstanceId, JobChunkProgressAccumulator theProgressAccumulator, IReductionStepExecutorService theReductionStepExecutorService, - JobDefinitionRegistry theJobDefinitionRegistry) { + JobDefinitionRegistry theJobDefinitionRegistry, + @Nonnull IInterceptorService theInterceptorService) { myJobPersistence = theJobPersistence; myBatchJobSender = theBatchJobSender; myInstanceId = theInstanceId; myProgressAccumulator = theProgressAccumulator; myReductionStepExecutorService = theReductionStepExecutorService; myJobDefinitionegistry = theJobDefinitionRegistry; - myJobInstanceProgressCalculator = - new JobInstanceProgressCalculator(theJobPersistence, theProgressAccumulator, theJobDefinitionRegistry); - myJobInstanceStatusUpdater = new JobInstanceStatusUpdater(theJobDefinitionRegistry); + myJobInstanceProgressCalculator = new JobInstanceProgressCalculator( + theJobPersistence, theProgressAccumulator, theJobDefinitionRegistry, theInterceptorService); + myJobInstanceStatusUpdater = new JobInstanceStatusUpdater(theJobDefinitionRegistry, theInterceptorService); } public void setPurgeThreshold(long thePurgeThreshold) { diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobMaintenanceServiceImpl.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobMaintenanceServiceImpl.java index 03355d4b96ad..be02fac02aff 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobMaintenanceServiceImpl.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobMaintenanceServiceImpl.java @@ -27,6 +27,7 @@ import ca.uhn.fhir.batch2.coordinator.WorkChunkProcessor; import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.i18n.Msg; +import ca.uhn.fhir.interceptor.api.IInterceptorService; import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; import ca.uhn.fhir.jpa.model.sched.HapiJob; import ca.uhn.fhir.jpa.model.sched.IHasScheduledJobs; @@ -92,6 +93,7 @@ public class JobMaintenanceServiceImpl implements IJobMaintenanceService, IHasSc private final BatchJobSender myBatchJobSender; private final WorkChunkProcessor myJobExecutorSvc; private final IReductionStepExecutorService myReductionStepExecutorService; + private IInterceptorService myInterceptorService; private final Semaphore myRunMaintenanceSemaphore = new Semaphore(1); @@ -111,7 +113,8 @@ public JobMaintenanceServiceImpl( @Nonnull JobDefinitionRegistry theJobDefinitionRegistry, @Nonnull BatchJobSender theBatchJobSender, @Nonnull WorkChunkProcessor theExecutor, - @Nonnull IReductionStepExecutorService theReductionStepExecutorService) { + @Nonnull IReductionStepExecutorService theReductionStepExecutorService, + @Nonnull IInterceptorService theInterceptorService) { myStorageSettings = theStorageSettings; myReductionStepExecutorService = theReductionStepExecutorService; Validate.notNull(theSchedulerService); @@ -124,6 +127,7 @@ public JobMaintenanceServiceImpl( myJobDefinitionRegistry = theJobDefinitionRegistry; myBatchJobSender = theBatchJobSender; myJobExecutorSvc = theExecutor; + myInterceptorService = theInterceptorService; } @Override @@ -270,7 +274,8 @@ private JobInstanceProcessor createJobInstanceProcessor( theInstanceId, theAccumulator, myReductionStepExecutorService, - myJobDefinitionRegistry); + myJobDefinitionRegistry, + myInterceptorService); if (myFailedJobLifetimeOverride >= 0) { processor.setPurgeThreshold(myFailedJobLifetimeOverride); } diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/progress/JobInstanceProgressCalculator.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/progress/JobInstanceProgressCalculator.java index ce544b0fb51a..468bcf825922 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/progress/JobInstanceProgressCalculator.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/progress/JobInstanceProgressCalculator.java @@ -26,6 +26,7 @@ import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.batch2.model.WorkChunk; import ca.uhn.fhir.i18n.Msg; +import ca.uhn.fhir.interceptor.api.IInterceptorService; import ca.uhn.fhir.model.api.IModelJson; import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; import ca.uhn.fhir.util.Logs; @@ -46,11 +47,12 @@ public class JobInstanceProgressCalculator { public JobInstanceProgressCalculator( IJobPersistence theJobPersistence, JobChunkProgressAccumulator theProgressAccumulator, - JobDefinitionRegistry theJobDefinitionRegistry) { + JobDefinitionRegistry theJobDefinitionRegistry, + IInterceptorService theInterceptorService) { myJobPersistence = theJobPersistence; myProgressAccumulator = theProgressAccumulator; myJobDefinitionRegistry = theJobDefinitionRegistry; - myJobInstanceStatusUpdater = new JobInstanceStatusUpdater(theJobDefinitionRegistry); + myJobInstanceStatusUpdater = new JobInstanceStatusUpdater(theJobDefinitionRegistry, theInterceptorService); } public void calculateAndStoreInstanceProgress(String theInstanceId) { diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/progress/JobInstanceStatusUpdater.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/progress/JobInstanceStatusUpdater.java index 6096a4a6b45a..2b0efe84c2b7 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/progress/JobInstanceStatusUpdater.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/progress/JobInstanceStatusUpdater.java @@ -25,6 +25,9 @@ import ca.uhn.fhir.batch2.model.JobDefinition; import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.batch2.model.StatusEnum; +import ca.uhn.fhir.interceptor.api.HookParams; +import ca.uhn.fhir.interceptor.api.IInterceptorService; +import ca.uhn.fhir.interceptor.api.Pointcut; import ca.uhn.fhir.model.api.IModelJson; import ca.uhn.fhir.util.Logs; import org.slf4j.Logger; @@ -32,9 +35,12 @@ public class JobInstanceStatusUpdater { private static final Logger ourLog = Logs.getBatchTroubleshootingLog(); private final JobDefinitionRegistry myJobDefinitionRegistry; + private final IInterceptorService myInterceptorService; - public JobInstanceStatusUpdater(JobDefinitionRegistry theJobDefinitionRegistry) { + public JobInstanceStatusUpdater( + JobDefinitionRegistry theJobDefinitionRegistry, IInterceptorService theInterceptorService) { myJobDefinitionRegistry = theJobDefinitionRegistry; + myInterceptorService = theInterceptorService; } /** @@ -100,5 +106,9 @@ private void invokeCompletionHandler( PT jobParameters = theJobInstance.getParameters(theJobDefinition.getParametersType()); JobCompletionDetails completionDetails = new JobCompletionDetails<>(jobParameters, theJobInstance); theJobCompletionHandler.jobComplete(completionDetails); + if (myInterceptorService.hasHooks(Pointcut.BATCH2_JOB_COMPLETION)) { + final HookParams params = new HookParams().add(JobInstance.class, theJobInstance); + myInterceptorService.callHooks(Pointcut.BATCH2_JOB_COMPLETION, params); + } } } diff --git a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImplTest.java b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImplTest.java index 1736d51df19f..d95e43c3d2b7 100644 --- a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImplTest.java +++ b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImplTest.java @@ -122,7 +122,7 @@ public void beforeEach() { // but in this service (so it's a real service here!) WorkChunkProcessor jobStepExecutorSvc = new WorkChunkProcessor(myJobInstancePersister, myBatchJobSender, new NonTransactionalHapiTransactionService()); WorkChannelMessageListener workChannelMessageListener = new WorkChannelMessageListener(myJobInstancePersister, - myJobDefinitionRegistry, myBatchJobSender, jobStepExecutorSvc, myJobMaintenanceService, myTransactionService,myInterceptorBroadcaster); + myJobDefinitionRegistry, myBatchJobSender, jobStepExecutorSvc, myJobMaintenanceService, myTransactionService,myInterceptorBroadcaster, myInterceptorService); myJobConsumer = myLinkedBlockingBrokerClient.getOrCreateConsumer(BATCH_CHANNEL_NAME, JobWorkNotificationJsonMessage.class, workChannelMessageListener, new ChannelConsumerSettings()); diff --git a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/ReductionStepDataSinkTest.java b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/ReductionStepDataSinkTest.java index 3f5f1c0a11b6..ddaac7d2b8ad 100644 --- a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/ReductionStepDataSinkTest.java +++ b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/ReductionStepDataSinkTest.java @@ -7,6 +7,7 @@ import ca.uhn.fhir.batch2.model.JobWorkCursor; import ca.uhn.fhir.batch2.model.StatusEnum; import ca.uhn.fhir.batch2.model.WorkChunkData; +import ca.uhn.fhir.interceptor.api.IInterceptorService; import ca.uhn.fhir.model.api.IModelJson; import ca.uhn.fhir.util.JsonUtil; import ca.uhn.fhir.util.Logs; @@ -67,6 +68,9 @@ public StepOutputData(String theData) { @Mock private Appender myListAppender; + @Mock + private IInterceptorService myInterceptorService; + private Logger ourLogger; @@ -81,7 +85,8 @@ public void init() { INSTANCE_ID, myWorkCursor, myJobPersistence, - myJobDefinitionRegistry); + myJobDefinitionRegistry, + myInterceptorService); ourLogger = (Logger) Logs.getBatchTroubleshootingLog(); ourLogger.addAppender(myListAppender); } diff --git a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/WorkChannelMessageListenerTest.java b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/WorkChannelMessageListenerTest.java index 54cc14d6594e..72d25976ea74 100644 --- a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/WorkChannelMessageListenerTest.java +++ b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/WorkChannelMessageListenerTest.java @@ -7,6 +7,7 @@ import ca.uhn.fhir.batch2.model.JobWorkNotificationJsonMessage; import ca.uhn.fhir.broker.api.IChannelConsumer; import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; +import ca.uhn.fhir.interceptor.api.IInterceptorService; import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService; import ca.uhn.fhir.jpa.dao.tx.NonTransactionalHapiTransactionService; import ca.uhn.fhir.util.Logs; @@ -40,6 +41,8 @@ class WorkChannelMessageListenerTest extends BaseBatch2Test { private IJobMaintenanceService myJobMaintenanceService; @Mock private IInterceptorBroadcaster myInterceptorBroadcaster; + @Mock + private IInterceptorService myInterceptorService; private final IHapiTransactionService myTransactionService = new NonTransactionalHapiTransactionService(); private WorkChunkProcessor jobStepExecutorSvc; @@ -59,7 +62,7 @@ public void testWorkChannelMessageHandlerLogging_containsJobAndBatchIdInLoggingC ((Logger) Logs.getBatchTroubleshootingLog()).addAppender(myAppender); // When - WorkChannelMessageListener listener = new WorkChannelMessageListener(myJobInstancePersister, myJobDefinitionRegistry, myBatchJobSender, jobStepExecutorSvc, myJobMaintenanceService, myTransactionService, myInterceptorBroadcaster); + WorkChannelMessageListener listener = new WorkChannelMessageListener(myJobInstancePersister, myJobDefinitionRegistry, myBatchJobSender, jobStepExecutorSvc, myJobMaintenanceService, myTransactionService, myInterceptorBroadcaster, myInterceptorService); listener.handleMessage(new JobWorkNotificationJsonMessage(createWorkNotification(STEP_1))); // Then diff --git a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/maintenance/JobMaintenanceServiceImplTest.java b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/maintenance/JobMaintenanceServiceImplTest.java index 3e03ff02829e..46442e5484e1 100644 --- a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/maintenance/JobMaintenanceServiceImplTest.java +++ b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/maintenance/JobMaintenanceServiceImplTest.java @@ -19,6 +19,7 @@ import ca.uhn.fhir.batch2.model.WorkChunkMetadata; import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum; import ca.uhn.fhir.broker.api.IChannelProducer; +import ca.uhn.fhir.interceptor.api.IInterceptorService; import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; import ca.uhn.fhir.jpa.model.sched.ISchedulerService; import ca.uhn.fhir.rest.server.messaging.IMessage; @@ -106,6 +107,8 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test { private ArgumentCaptor> myJobCompletionCaptor; @Mock private IReductionStepExecutorService myReductionStepExecutorService; + @Mock + private IInterceptorService myInterceptorService; @BeforeEach public void beforeEach() { @@ -117,7 +120,8 @@ public void beforeEach() { myJobDefinitionRegistry, batchJobSender, myJobExecutorSvc, - myReductionStepExecutorService + myReductionStepExecutorService, + myInterceptorService ); myStorageSettings.setJobFastTrackingEnabled(true); } diff --git a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/progress/JobInstanceStatusUpdaterTest.java b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/progress/JobInstanceStatusUpdaterTest.java index b0b57ce688b1..dd3c2efc9e84 100644 --- a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/progress/JobInstanceStatusUpdaterTest.java +++ b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/progress/JobInstanceStatusUpdaterTest.java @@ -7,6 +7,7 @@ import ca.uhn.fhir.batch2.model.JobDefinition; import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.batch2.model.StatusEnum; +import ca.uhn.fhir.interceptor.api.IInterceptorService; import ca.uhn.fhir.model.api.IModelJson; import com.fasterxml.jackson.annotation.JsonProperty; import org.junit.jupiter.api.BeforeEach; @@ -15,6 +16,9 @@ import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit.jupiter.SpringExtension; import java.util.concurrent.atomic.AtomicReference; @@ -40,6 +44,8 @@ class JobInstanceStatusUpdaterTest { private JobInstance myInstance; private TestParameters myTestParameters; private AtomicReference myDetails; + @Mock + protected IInterceptorService myInterceptorRegistry; @BeforeEach public void before() { From 57d69eacd66df673cfb73f4aedd7d759b61d223f Mon Sep 17 00:00:00 2001 From: Ivan Baisi Date: Tue, 25 Nov 2025 07:37:04 -0800 Subject: [PATCH 3/5] remove comment --- .../java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImpl.java | 1 - 1 file changed, 1 deletion(-) diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImpl.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImpl.java index 68a432e0c83b..21c1f89bd99a 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImpl.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImpl.java @@ -90,7 +90,6 @@ public Batch2JobStartResponse startInstance( } Validate.notBlank(theStartRequest.getJobDefinitionId(), "No job definition ID supplied in start request"); - // ALTERNATIVE 1: // if cache - use that first if (theStartRequest.isUseCache()) { FetchJobInstancesRequest request = new FetchJobInstancesRequest( From 22e1d8408c9de3b3c5a188bbb0395265f0e7c288 Mon Sep 17 00:00:00 2001 From: Ivan Baisi Date: Tue, 25 Nov 2025 08:54:14 -0800 Subject: [PATCH 4/5] add BATCH2_JOB_COMPLETION interceptor test --- .../JobInstanceStatusUpdaterTest.java | 29 +++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/progress/JobInstanceStatusUpdaterTest.java b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/progress/JobInstanceStatusUpdaterTest.java index dd3c2efc9e84..fb63833e2e23 100644 --- a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/progress/JobInstanceStatusUpdaterTest.java +++ b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/progress/JobInstanceStatusUpdaterTest.java @@ -7,7 +7,10 @@ import ca.uhn.fhir.batch2.model.JobDefinition; import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.batch2.model.StatusEnum; +import ca.uhn.fhir.interceptor.api.IAnonymousInterceptor; import ca.uhn.fhir.interceptor.api.IInterceptorService; +import ca.uhn.fhir.interceptor.api.Pointcut; +import ca.uhn.fhir.interceptor.executor.InterceptorService; import ca.uhn.fhir.model.api.IModelJson; import com.fasterxml.jackson.annotation.JsonProperty; import org.junit.jupiter.api.BeforeEach; @@ -15,6 +18,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.InjectMocks; import org.mockito.Mock; +import org.mockito.Spy; import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.ContextConfiguration; @@ -23,6 +27,7 @@ import java.util.concurrent.atomic.AtomicReference; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.when; @@ -44,8 +49,8 @@ class JobInstanceStatusUpdaterTest { private JobInstance myInstance; private TestParameters myTestParameters; private AtomicReference myDetails; - @Mock - protected IInterceptorService myInterceptorRegistry; + @Spy + private IInterceptorService myInterceptorRegistry = new InterceptorService("testIS"); @BeforeEach public void before() { @@ -72,6 +77,26 @@ public void testCompletionHandler() { assertCompleteCallbackCalled(); } + @Test + public void testCompletionHandler_withCompleteInterceptor() { + IAnonymousInterceptor postBatchJobCcompletionInterceptor = (pointcut, params) -> { + JobInstance jobInstance = params.get(JobInstance.class); + assertEquals("test-instance-id", jobInstance.getInstanceId()); + }; + try { + // setup + myInterceptorRegistry.registerAnonymousInterceptor(Pointcut.BATCH2_JOB_COMPLETION, postBatchJobCcompletionInterceptor); + setupCompleteCallback(); + + // execute + mySvc.updateInstanceStatus(myInstance, StatusEnum.COMPLETED); + + assertCompleteCallbackCalled(); + } finally { + myInterceptorRegistry.unregisterInterceptor(postBatchJobCcompletionInterceptor); + } + } + @Test public void testCompletionHandler_ERROR_to_COMPLETED() { setupCompleteCallback(); From ffc9dc58c69cff556dcb254c2efc505af93fc74c Mon Sep 17 00:00:00 2001 From: Ivan Baisi Date: Wed, 26 Nov 2025 09:46:40 -0800 Subject: [PATCH 5/5] add interceptor in IT test to check wiring + changelog --- ...tend-transaction-log-support-to-batch-jobs | 8 +++ .../fhir/jpa/batch2/Batch2CoordinatorIT.java | 68 ++++++++++++------- 2 files changed, 52 insertions(+), 24 deletions(-) create mode 100644 hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/8_8_0/8044-extend-transaction-log-support-to-batch-jobs diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/8_8_0/8044-extend-transaction-log-support-to-batch-jobs b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/8_8_0/8044-extend-transaction-log-support-to-batch-jobs new file mode 100644 index 000000000000..fccb1edb50c3 --- /dev/null +++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/8_8_0/8044-extend-transaction-log-support-to-batch-jobs @@ -0,0 +1,8 @@ +--- +type: add +issue: 8044 +title: "Add STORAGE_POSTSTORAGE_BATCH_JOB_CREATE and BATCH2_JOB_COMPLETION pointcuts in batch 2 jobs. +STORAGE_POSTSTORAGE_BATCH_JOB_CREATE is invoked after a batch job is persisted to the database and receives a +ca.uhn.fhir.batch2.model.JobInstance that includes the job instance id as stored in the db +BATCH2_JOB_COMPLETION is invoked after the job instance is completed. JobInstance is immutable here and receives a +ca.uhn.fhir.batch2.model.JobInstance" diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/Batch2CoordinatorIT.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/Batch2CoordinatorIT.java index 071948a3b90b..3896a652f0a1 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/Batch2CoordinatorIT.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/Batch2CoordinatorIT.java @@ -28,6 +28,8 @@ import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum; import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest; import ca.uhn.fhir.broker.api.ChannelConsumerSettings; +import ca.uhn.fhir.interceptor.api.IAnonymousInterceptor; +import ca.uhn.fhir.interceptor.api.Pointcut; import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse; import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity; import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannel; @@ -638,38 +640,56 @@ public void reductionStepRun(StepExecutionDetails { + JobInstance jobInstance = params.get(JobInstance.class); + assertNotNull(jobInstance.getInstanceId()); + }; - // test - JobInstanceStartRequest request = buildRequest(jobId); - myFirstStepLatch.setExpectedCount(1); - Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), request); + IAnonymousInterceptor postBatchJobCcompletionInterceptor = (pointcut, params) -> { + JobInstance jobInstance = params.get(JobInstance.class); + assertNotNull(jobInstance.getInstanceId()); + }; - String instanceId = startResponse.getInstanceId(); - myBatch2JobHelper.runMaintenancePass(); - myFirstStepLatch.awaitExpected(); - assertNotNull(instanceId); + try { - myBatch2JobHelper.awaitGatedStepId(SECOND_STEP_ID, instanceId); + myInterceptorRegistry.registerAnonymousInterceptor(Pointcut.STORAGE_POSTSTORAGE_BATCH_JOB_CREATE, poststorageBatchJobCreateInterceptor); + myInterceptorRegistry.registerAnonymousInterceptor(Pointcut.BATCH2_JOB_COMPLETION, postBatchJobCcompletionInterceptor); - // wait for last step to finish - ourLog.info("Setting last step latch"); - myLastStepLatch.setExpectedCount(1); + // test + JobInstanceStartRequest request = buildRequest(jobId); + myFirstStepLatch.setExpectedCount(1); + Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), request); - // waiting - myBatch2JobHelper.awaitJobCompletion(instanceId); - ourLog.info("awaited the last step"); - myLastStepLatch.awaitExpected(); + String instanceId = startResponse.getInstanceId(); + myBatch2JobHelper.runMaintenancePass(); + myFirstStepLatch.awaitExpected(); + assertNotNull(instanceId); - // verify - Optional instanceOp = myJobPersistence.fetchInstance(instanceId); - assertTrue(instanceOp.isPresent()); - JobInstance jobInstance = instanceOp.get(); + myBatch2JobHelper.awaitGatedStepId(SECOND_STEP_ID, instanceId); - // ensure our completion handler fired - assertTrue(completionBool.get()); + // wait for last step to finish + ourLog.info("Setting last step latch"); + myLastStepLatch.setExpectedCount(1); - assertEquals(StatusEnum.COMPLETED, jobInstance.getStatus()); - assertEquals(1.0, jobInstance.getProgress()); + // waiting + myBatch2JobHelper.awaitJobCompletion(instanceId); + ourLog.info("awaited the last step"); + myLastStepLatch.awaitExpected(); + + // verify + Optional instanceOp = myJobPersistence.fetchInstance(instanceId); + assertTrue(instanceOp.isPresent()); + JobInstance jobInstance = instanceOp.get(); + + // ensure our completion handler fired + assertTrue(completionBool.get()); + + assertEquals(StatusEnum.COMPLETED, jobInstance.getStatus()); + assertEquals(1.0, jobInstance.getProgress()); + } finally { + myInterceptorRegistry.unregisterInterceptor(poststorageBatchJobCreateInterceptor); + myInterceptorRegistry.unregisterInterceptor(postBatchJobCcompletionInterceptor); + } } @ParameterizedTest