Skip to content

Commit 89faa41

Browse files
[dpc-4710] Shuts down dpc-queue after AggregationEngine (#2708)
## 🎫 Ticket https://jira.cms.gov/browse/DPC-4710 Bugfix for #2706 ## 🛠 Changes Updated the order that `DPCQueueHibernateModule` and `AggregationAppModule` are added to the `DPCAggregationService`. ## ℹ️ Context DropWizard shuts these modules down in the opposite order they are added, and we don't want the queue to shutdown until after the aggregation engine has finished processing or it won't be able to update its last batch. This wasn't caught in initial testing for two reasons: 1. Our tests used the `MemoryBatchQueue` which doesn't connect to an external DB. 2. Our code was ignoring the errors when it tried to pause a batch instead of logging them. ## 🧪 Validation 1. Go into your local queue DB and pick out your most recently completed batch. 2. Update the batch's `status` to 0 (_queued_), its `aggregator_id` to `null`, and its patient to `9S99EU8XY92`. (this patient tells MockBfdClient to hang forever, simulating a long running patient) 3. Start up dpc-aggregation and it will pick up the batch and set its status to 1 (_running_). 4. Stop dpc-aggregation. If you do this on the current main branch, the status of the batch will still be 1 (_running_). If you recompile from this branch and go through the same process the batch will be reset to 0 (_queued_), which is what we want.
1 parent 3185945 commit 89faa41

File tree

5 files changed

+95
-11
lines changed

5 files changed

+95
-11
lines changed

dpc-aggregation/src/main/java/gov/cms/dpc/aggregation/DPCAggregationService.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,12 @@ public void initialize(Bootstrap<DPCAggregationConfiguration> bootstrap) {
4242
new SubstitutingSourceProvider(bootstrap.getConfigurationSourceProvider(), substitutor);
4343
bootstrap.setConfigurationSourceProvider(provider);
4444

45+
// AggregationAppModule needs to be added after DPCQueueHibernateModule, because DropWizard shuts these down
46+
// in reverse order, and we don't want the queue DB to be disconnected until after the aggregation engine stops
47+
// running.
4548
GuiceBundle guiceBundle = GuiceBundle.builder()
46-
.modules(new AggregationAppModule(),
47-
new DPCQueueHibernateModule<>(queueHibernateBundle),
49+
.modules(new DPCQueueHibernateModule<>(queueHibernateBundle),
50+
new AggregationAppModule(),
4851
new DPCHibernateModule<>(hibernateBundle),
4952
new JobQueueModule<DPCAggregationConfiguration>(),
5053
new BlueButtonClientModule<DPCAggregationConfiguration>())

dpc-aggregation/src/main/java/gov/cms/dpc/aggregation/engine/AggregationEngine.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,10 @@ public void stop() {
9696

9797
// If a batch is currently running, mark it paused.
9898
Optional<JobQueueBatch> optionalBatch = this.currentBatch.get();
99-
optionalBatch.ifPresent(jobQueueBatch -> this.queue.pauseBatch(jobQueueBatch, aggregatorID));
99+
optionalBatch.ifPresent(jobQueueBatch -> {
100+
logger.info("Pausing batch: {}", jobQueueBatch.getBatchID());
101+
this.queue.pauseBatch(jobQueueBatch, aggregatorID);
102+
});
100103
}
101104

102105
public boolean isRunning() {

dpc-aggregation/src/test/java/gov/cms/dpc/aggregation/AggregationServiceTest.java

Lines changed: 54 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,32 @@
11
package gov.cms.dpc.aggregation;
22

3+
import com.codahale.metrics.MetricRegistry;
34
import com.codahale.metrics.health.HealthCheckRegistry;
5+
import gov.cms.dpc.bluebutton.client.MockBlueButtonClient;
6+
import gov.cms.dpc.common.hibernate.queue.DPCQueueManagedSessionFactory;
7+
import gov.cms.dpc.common.utils.NPIUtil;
8+
import gov.cms.dpc.fhir.DPCResourceType;
9+
import gov.cms.dpc.queue.DistributedBatchQueue;
10+
import gov.cms.dpc.queue.JobStatus;
11+
import gov.cms.dpc.queue.models.JobQueueBatch;
412
import gov.cms.dpc.testing.BufferedLoggerHandler;
513
import gov.cms.dpc.testing.IntegrationTest;
614
import io.dropwizard.testing.ConfigOverride;
715
import io.dropwizard.testing.DropwizardTestSupport;
8-
import org.junit.jupiter.api.AfterAll;
9-
import org.junit.jupiter.api.BeforeAll;
16+
import org.hibernate.SessionFactory;
17+
import org.hibernate.cfg.Configuration;
18+
import org.junit.jupiter.api.AfterEach;
19+
import org.junit.jupiter.api.BeforeEach;
1020
import org.junit.jupiter.api.Test;
1121
import org.junit.jupiter.api.extension.ExtendWith;
1222

23+
import java.util.Collections;
24+
import java.util.List;
1325
import java.util.SortedSet;
26+
import java.util.UUID;
1427

15-
import static org.junit.jupiter.api.Assertions.assertAll;
16-
import static org.junit.jupiter.api.Assertions.assertTrue;
28+
import static org.awaitility.Awaitility.await;
29+
import static org.junit.jupiter.api.Assertions.*;
1730

1831
@IntegrationTest
1932
@ExtendWith(BufferedLoggerHandler.class)
@@ -25,13 +38,19 @@ public class AggregationServiceTest {
2538
ConfigOverride.config("server.applicationConnectors[0].port", "7777"),
2639
ConfigOverride.config("server.adminConnectors[0].port", "8888"));
2740

28-
@BeforeAll
29-
static void start() throws Exception{
41+
private DistributedBatchQueue queue;
42+
43+
@BeforeEach
44+
void start() throws Exception{
45+
final Configuration conf = new Configuration();
46+
SessionFactory sessionFactory = conf.configure().buildSessionFactory();
47+
queue = new DistributedBatchQueue(new DPCQueueManagedSessionFactory(sessionFactory), 100, new MetricRegistry());
48+
3049
APPLICATION.before();
3150
}
3251

33-
@AfterAll
34-
static void stop() {
52+
@AfterEach
53+
void stop() {
3554
APPLICATION.after();
3655
}
3756

@@ -48,4 +67,31 @@ void testHealthChecks() {
4867
// Everything should be true
4968
checks.runHealthChecks().forEach((key, value) -> assertTrue(value.isHealthy(), String.format("Healthcheck: %s is not ok.", key)));
5069
}
70+
71+
@Test
72+
void testStoppingEngineMidBatch() throws InterruptedException {
73+
// Create a batch that will hang forever
74+
final var orgID = UUID.randomUUID();
75+
final List<String> mbis = List.of(MockBlueButtonClient.TEST_PATIENT_TIME_OUT);
76+
77+
final var jobID = queue.createJob(
78+
orgID,
79+
NPIUtil.generateNPI(),
80+
NPIUtil.generateNPI(),
81+
mbis,
82+
Collections.singletonList(DPCResourceType.Patient),
83+
null,
84+
MockBlueButtonClient.BFD_TRANSACTION_TIME,
85+
null, null, true, false);
86+
87+
// Wait for dpc-aggregation to pick up the batch, set status to RUNNING and hang on getting the patient from BFD
88+
await().until(() -> {
89+
JobQueueBatch batch = queue.getJobBatches(jobID).get(0);
90+
return batch.getStatus() == JobStatus.RUNNING;
91+
});
92+
93+
// Stop aggregation and make sure it pauses the batch
94+
APPLICATION.after();
95+
assertEquals(JobStatus.QUEUED, queue.getJobBatches(jobID).get(0).getStatus());
96+
}
5197
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!DOCTYPE hibernate-configuration PUBLIC
3+
"-//Hibernate/Hibernate Configuration DTD 3.0//EN"
4+
"http://www.hibernate.org/dtd/hibernate-configuration-3.0.dtd">
5+
6+
<hibernate-configuration>
7+
<session-factory>
8+
9+
<!-- Connection settings -->
10+
<property name="hibernate.connection.driver_class">org.postgresql.Driver</property>
11+
<property name="hibernate.connection.url">jdbc:postgresql://localhost:5432/dpc_queue</property>
12+
<property name="hibernate.connection.username">postgres</property>
13+
<property name="hibernate.connection.password">dpc-safe</property>
14+
15+
<!-- SQL dialect -->
16+
<property name="hibernate.dialect">org.hibernate.dialect.PostgreSQLDialect</property>
17+
18+
<!-- Print executed SQL to stdout -->
19+
<property name="show_sql">false</property>
20+
21+
<!-- Update database on startup -->
22+
<property name="hibernate.hbm2ddl.auto">validate</property>
23+
<property name="hibernate.connection.autocommit">true</property>
24+
25+
<!-- Annotated entity classes -->
26+
<mapping class="gov.cms.dpc.queue.models.JobQueueBatch"/>
27+
<mapping class="gov.cms.dpc.queue.models.JobQueueBatchFile"/>
28+
29+
</session-factory>
30+
</hibernate-configuration>

dpc-queue/src/main/java/gov/cms/dpc/queue/DistributedBatchQueue.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,8 @@ public void pauseBatch(JobQueueBatch job, UUID aggregatorID) {
239239
} finally {
240240
tx.commit();
241241
}
242+
} catch(Exception e) {
243+
logger.error("Error pausing batch: {} {}", job.getBatchID(), e.getMessage());
242244
}
243245
}
244246

0 commit comments

Comments
 (0)