Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3304,6 +3304,25 @@ public enum Pointcut implements IPointcut {
STORAGE_PRESTORAGE_BATCH_JOB_CREATE(
void.class, "ca.uhn.fhir.batch2.model.JobInstance", "ca.uhn.fhir.rest.api.server.RequestDetails"),

/**
* <b>Storage Hook:</b>
* Invoked after a batch job is persisted to the database.
* <p>
* Hooks will have access to the content of the job created
* and the id set to it.
* </p>
* Hooks may accept the following parameters:
* <ul>
* <li>
* ca.uhn.fhir.batch2.model.JobInstance
* </li>
* </ul>
* <p>
* Hooks should return <code>void</code>.
* </p>
*/
STORAGE_POSTSTORAGE_BATCH_JOB_CREATE(void.class, "ca.uhn.fhir.batch2.model.JobInstance"),

/**
* <b>CDS Hooks Prefetch Hook:</b>
* Invoked before a CDS Hooks prefetch request is made.
Expand Down Expand Up @@ -3390,6 +3409,19 @@ public enum Pointcut implements IPointcut {
BATCH2_CHUNK_PROCESS_FILTER(
IInterceptorFilterHook.class, "ca.uhn.fhir.batch2.model.JobInstance", "ca.uhn.fhir.batch2.model.WorkChunk"),

/**
* <b>Batch2 Hook:</b>
* <p>Invoked after the job instance is completed. JobInstance is immutable here.</p>
* <p>Parameters:</p>
* <ul>
* <li>ca.uhn.fhir.batch2.model.JobInstance - The job instance</li>
* </ul>
* <p>
* Hooks should return <code>void</code>.
* </p>
*/
BATCH2_JOB_COMPLETION(void.class, "ca.uhn.fhir.batch2.model.JobInstance"),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: this should match our other storage names.

Suggested change
BATCH2_JOB_COMPLETION(void.class, "ca.uhn.fhir.batch2.model.JobInstance"),
STORAGE_POSTCOMPLETE_BATCH_JOB(void.class, "ca.uhn.fhir.batch2.model.JobInstance"),


/**
* <b>Provenance Agents Pointcut:</b>
* This is a pointcut to retrieve data for populating the agent element of a Provenance resource that needs to be created
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
type: add
issue: 8044
title: "Add STORAGE_POSTSTORAGE_BATCH_JOB_CREATE and BATCH2_JOB_COMPLETION pointcuts in batch 2 jobs.
STORAGE_POSTSTORAGE_BATCH_JOB_CREATE is invoked after a batch job is persisted to the database and receives a
ca.uhn.fhir.batch2.model.JobInstance that includes the job instance id as stored in the db
BATCH2_JOB_COMPLETION is invoked after the job instance is completed. JobInstance is immutable here and receives a
ca.uhn.fhir.batch2.model.JobInstance"
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,10 @@ public String storeNewInstance(JobInstance theInstance) {
entity.setUserDataJson(theInstance.getUserDataAsString());

entity = myJobInstanceRepository.save(entity);
JobInstance savedJobInstance = new JobInstance(theInstance);
savedJobInstance.setInstanceId(entity.getId());
invokePostStorageBatchHooks(savedJobInstance);

return entity.getId();
}

Expand Down Expand Up @@ -638,6 +642,14 @@ private void invokePreStorageBatchHooks(JobInstance theJobInstance) {
}
}

private void invokePostStorageBatchHooks(JobInstance theJobInstance) {
if (myInterceptorBroadcaster.hasHooks(Pointcut.STORAGE_POSTSTORAGE_BATCH_JOB_CREATE)) {
HookParams params = new HookParams().add(JobInstance.class, theJobInstance);

myInterceptorBroadcaster.callHooks(Pointcut.STORAGE_POSTSTORAGE_BATCH_JOB_CREATE, params);
}
}

@Override
@Transactional(propagation = Propagation.REQUIRES_NEW)
public boolean advanceJobStepAndUpdateChunkStatus(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest;
import ca.uhn.fhir.broker.api.ChannelConsumerSettings;
import ca.uhn.fhir.interceptor.api.IAnonymousInterceptor;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse;
import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity;
import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannel;
Expand Down Expand Up @@ -638,38 +640,56 @@ public void reductionStepRun(StepExecutionDetails<TestJobParameters, SecondStepO
callLatch(myLastStepLatch, theStepExecutionDetails);
}
});
IAnonymousInterceptor poststorageBatchJobCreateInterceptor = (pointcut, params) -> {
JobInstance jobInstance = params.get(JobInstance.class);
assertNotNull(jobInstance.getInstanceId());
};

// test
JobInstanceStartRequest request = buildRequest(jobId);
myFirstStepLatch.setExpectedCount(1);
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), request);
IAnonymousInterceptor postBatchJobCcompletionInterceptor = (pointcut, params) -> {
JobInstance jobInstance = params.get(JobInstance.class);
assertNotNull(jobInstance.getInstanceId());
};

String instanceId = startResponse.getInstanceId();
myBatch2JobHelper.runMaintenancePass();
myFirstStepLatch.awaitExpected();
assertNotNull(instanceId);
try {

myBatch2JobHelper.awaitGatedStepId(SECOND_STEP_ID, instanceId);
myInterceptorRegistry.registerAnonymousInterceptor(Pointcut.STORAGE_POSTSTORAGE_BATCH_JOB_CREATE, poststorageBatchJobCreateInterceptor);
myInterceptorRegistry.registerAnonymousInterceptor(Pointcut.BATCH2_JOB_COMPLETION, postBatchJobCcompletionInterceptor);

// wait for last step to finish
ourLog.info("Setting last step latch");
myLastStepLatch.setExpectedCount(1);
// test
JobInstanceStartRequest request = buildRequest(jobId);
myFirstStepLatch.setExpectedCount(1);
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), request);

// waiting
myBatch2JobHelper.awaitJobCompletion(instanceId);
ourLog.info("awaited the last step");
myLastStepLatch.awaitExpected();
String instanceId = startResponse.getInstanceId();
myBatch2JobHelper.runMaintenancePass();
myFirstStepLatch.awaitExpected();
assertNotNull(instanceId);

// verify
Optional<JobInstance> instanceOp = myJobPersistence.fetchInstance(instanceId);
assertTrue(instanceOp.isPresent());
JobInstance jobInstance = instanceOp.get();
myBatch2JobHelper.awaitGatedStepId(SECOND_STEP_ID, instanceId);

// ensure our completion handler fired
assertTrue(completionBool.get());
// wait for last step to finish
ourLog.info("Setting last step latch");
myLastStepLatch.setExpectedCount(1);

assertEquals(StatusEnum.COMPLETED, jobInstance.getStatus());
assertEquals(1.0, jobInstance.getProgress());
// waiting
myBatch2JobHelper.awaitJobCompletion(instanceId);
ourLog.info("awaited the last step");
myLastStepLatch.awaitExpected();

// verify
Optional<JobInstance> instanceOp = myJobPersistence.fetchInstance(instanceId);
assertTrue(instanceOp.isPresent());
JobInstance jobInstance = instanceOp.get();

// ensure our completion handler fired
assertTrue(completionBool.get());

assertEquals(StatusEnum.COMPLETED, jobInstance.getStatus());
assertEquals(1.0, jobInstance.getProgress());
} finally {
myInterceptorRegistry.unregisterInterceptor(poststorageBatchJobCreateInterceptor);
myInterceptorRegistry.unregisterInterceptor(postBatchJobCcompletionInterceptor);
}
}

@ParameterizedTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1031,6 +1031,29 @@ public void testPrestorageInterceptor_whenModifyingJobInstance_modifiedJobInstan

}

@Test
public void testPostStorageInterceptor_hasJobInstanceId_preStorageHasNot(){
IAnonymousInterceptor poststorageBatchJobCreateInterceptor = (pointcut, params) -> {
JobInstance jobInstance = params.get(JobInstance.class);
assertNotNull(jobInstance.getInstanceId());
};
IAnonymousInterceptor prestorageBatchJobCreateInterceptor = (pointcut, params) -> {
JobInstance jobInstance = params.get(JobInstance.class);
assertNull(jobInstance.getInstanceId());
};

try{
myInterceptorRegistry.registerAnonymousInterceptor(Pointcut.STORAGE_POSTSTORAGE_BATCH_JOB_CREATE, poststorageBatchJobCreateInterceptor);
myInterceptorRegistry.registerAnonymousInterceptor(Pointcut.STORAGE_PRESTORAGE_BATCH_JOB_CREATE, prestorageBatchJobCreateInterceptor);
JobInstance instance = createInstance();
mySvc.storeNewInstance(instance);
} finally {
myInterceptorRegistry.unregisterInterceptor(poststorageBatchJobCreateInterceptor);
myInterceptorRegistry.unregisterInterceptor(prestorageBatchJobCreateInterceptor);
}

}

@Test
public void testFetchInstanceAndWorkChunkStatus() {
// Setup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.interceptor.api.IInterceptorService;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
Expand All @@ -38,6 +39,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.Mockito.mock;

public interface IInstanceStateTransitions extends IWorkChunkCommon, WorkChunkTestConstants {
Logger ourLog = LoggerFactory.getLogger(IInstanceStateTransitions.class);
Expand Down Expand Up @@ -86,6 +88,8 @@ default void testCreateInstance_firstChunkDequeued_movesToInProgress() {
@ParameterizedTest
@EnumSource(StatusEnum.class)
default void cancelRequest_cancelsJob_whenNotFinalState(StatusEnum theState) {
//Mocks
IInterceptorService interceptorService = mock(IInterceptorService.class);
// given
JobInstance cancelledInstance = createInstance();
cancelledInstance.setStatus(theState);
Expand All @@ -107,7 +111,8 @@ default void cancelRequest_cancelsJob_whenNotFinalState(StatusEnum theState) {
instanceId1,
new JobChunkProgressAccumulator(),
null,
jobDefinitionRegistry
jobDefinitionRegistry,
interceptorService
).process();
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import ca.uhn.fhir.broker.api.IChannelProducer;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.interceptor.api.IInterceptorService;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
Expand All @@ -65,6 +66,9 @@ public abstract class BaseBatch2Config {
@Autowired
IHapiTransactionService myHapiTransactionService;

@Autowired
private IInterceptorService myInterceptorService;

@Bean
public JobDefinitionRegistry batch2JobDefinitionRegistry() {
return new JobDefinitionRegistry();
Expand All @@ -83,15 +87,17 @@ public BatchJobSender batchJobSender() {
@Bean
public IJobCoordinator batch2JobCoordinator(
JobDefinitionRegistry theJobDefinitionRegistry, IHapiTransactionService theTransactionService) {
return new JobCoordinatorImpl(myPersistence, theJobDefinitionRegistry, theTransactionService);
return new JobCoordinatorImpl(
myPersistence, theJobDefinitionRegistry, theTransactionService, myInterceptorService);
}

@Bean
public IReductionStepExecutorService reductionStepExecutorService(
IJobPersistence theJobPersistence,
IHapiTransactionService theTransactionService,
JobDefinitionRegistry theJobDefinitionRegistry) {
return new ReductionStepExecutorServiceImpl(theJobPersistence, theTransactionService, theJobDefinitionRegistry);
return new ReductionStepExecutorServiceImpl(
theJobPersistence, theTransactionService, theJobDefinitionRegistry, myInterceptorService);
}

@Bean
Expand All @@ -109,7 +115,8 @@ public IJobMaintenanceService batch2JobMaintenanceService(
theJobDefinitionRegistry,
theBatchJobSender,
theExecutor,
theReductionStepExecutorService);
theReductionStepExecutorService,
myInterceptorService);
}

@Bean
Expand All @@ -135,7 +142,8 @@ public WorkChannelMessageListener workChannelMessageListener(
theExecutorSvc,
theJobMaintenanceService,
theHapiTransactionService,
theInterceptorBroadcaster);
theInterceptorBroadcaster,
myInterceptorService);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.api.IInterceptorService;
import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.rest.api.server.RequestDetails;
Expand Down Expand Up @@ -59,14 +60,16 @@ public class JobCoordinatorImpl implements IJobCoordinator {
private final JobQuerySvc myJobQuerySvc;
private final JobParameterJsonValidator myJobParameterJsonValidator;
private final IHapiTransactionService myTransactionService;
private final IInterceptorService myInterceptorService;

/**
* Constructor
*/
public JobCoordinatorImpl(
@Nonnull IJobPersistence theJobPersistence,
@Nonnull JobDefinitionRegistry theJobDefinitionRegistry,
@Nonnull IHapiTransactionService theTransactionService) {
@Nonnull IHapiTransactionService theTransactionService,
@Nonnull IInterceptorService theInterceptorService) {
Validate.notNull(theJobPersistence);

myJobPersistence = theJobPersistence;
Expand All @@ -75,6 +78,7 @@ public JobCoordinatorImpl(
myJobQuerySvc = new JobQuerySvc(theJobPersistence, theJobDefinitionRegistry);
myJobParameterJsonValidator = new JobParameterJsonValidator();
myTransactionService = theTransactionService;
myInterceptorService = theInterceptorService;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater;
import ca.uhn.fhir.batch2.util.BatchJobOpenTelemetryUtils;
import ca.uhn.fhir.interceptor.api.IInterceptorService;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.util.Logs;
import io.opentelemetry.instrumentation.annotations.WithSpan;
Expand Down Expand Up @@ -59,7 +60,8 @@ public class JobStepExecutor<PT extends IModelJson, IT extends IModelJson, OT ex
@Nonnull JobWorkCursor<PT, IT, OT> theCursor,
@Nonnull WorkChunkProcessor theExecutor,
@Nonnull IJobMaintenanceService theJobMaintenanceService,
@Nonnull JobDefinitionRegistry theJobDefinitionRegistry) {
@Nonnull JobDefinitionRegistry theJobDefinitionRegistry,
@Nonnull IInterceptorService theInterceptorService) {
myJobPersistence = theJobPersistence;
myDefinition = theCursor.jobDefinition;
myInstance = theInstance;
Expand All @@ -68,7 +70,7 @@ public class JobStepExecutor<PT extends IModelJson, IT extends IModelJson, OT ex
myCursor = theCursor;
myJobExecutorSvc = theExecutor;
myJobMaintenanceService = theJobMaintenanceService;
myJobInstanceStatusUpdater = new JobInstanceStatusUpdater(theJobDefinitionRegistry);
myJobInstanceStatusUpdater = new JobInstanceStatusUpdater(theJobDefinitionRegistry, theInterceptorService);
}

@WithSpan(JOB_STEP_EXECUTION_SPAN_NAME)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.interceptor.api.IInterceptorService;
import ca.uhn.fhir.model.api.IModelJson;
import jakarta.annotation.Nonnull;

Expand All @@ -34,18 +35,21 @@ public class JobStepExecutorFactory {
private final WorkChunkProcessor myJobStepExecutorSvc;
private final IJobMaintenanceService myJobMaintenanceService;
private final JobDefinitionRegistry myJobDefinitionRegistry;
private final IInterceptorService myInterceptorService;

public JobStepExecutorFactory(
@Nonnull IJobPersistence theJobPersistence,
@Nonnull BatchJobSender theBatchJobSender,
@Nonnull WorkChunkProcessor theExecutorSvc,
@Nonnull IJobMaintenanceService theJobMaintenanceService,
@Nonnull JobDefinitionRegistry theJobDefinitionRegistry) {
@Nonnull JobDefinitionRegistry theJobDefinitionRegistry,
@Nonnull IInterceptorService theInterceptorService) {
myJobPersistence = theJobPersistence;
myBatchJobSender = theBatchJobSender;
myJobStepExecutorSvc = theExecutorSvc;
myJobMaintenanceService = theJobMaintenanceService;
myJobDefinitionRegistry = theJobDefinitionRegistry;
myInterceptorService = theInterceptorService;
}

public <PT extends IModelJson, IT extends IModelJson, OT extends IModelJson>
Expand All @@ -60,6 +64,7 @@ JobStepExecutor<PT, IT, OT> newJobStepExecutor(
theCursor,
myJobStepExecutorSvc,
myJobMaintenanceService,
myJobDefinitionRegistry);
myJobDefinitionRegistry,
myInterceptorService);
}
}
Loading
Loading