Skip to content

Commit ee4d79e

Browse files
authored
Merge pull request santanusinha#54 from pvntohare/publish_bug_fix
Adding infinite expiry for publishWithExpiry(), NPE bug fix for publish()
2 parents 4654492 + 2da419a commit ee4d79e

File tree

3 files changed

+81
-5
lines changed

3 files changed

+81
-5
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
<groupId>io.appform.dropwizard.actors</groupId>
88
<artifactId>dropwizard-rabbitmq-actors</artifactId>
9-
<version>2.0.28-4</version>
9+
<version>2.0.28-5</version>
1010
<name>Dropwizard RabbitMQ Bundle</name>
1111
<url>https://github.com/santanusinha/dropwizard-rabbitmq-actors</url>
1212
<description>Provides actor abstraction on RabbitMQ for dropwizard based projects.</description>

src/main/java/io/appform/dropwizard/actors/base/UnmanagedPublisher.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,11 +67,15 @@ public final void publishWithDelay(final Message message, final long delayMillis
6767
}
6868

6969
public final void publishWithExpiry(final Message message, final long expiryInMs) throws Exception {
70-
val expiresAt = Instant.now().toEpochMilli() + expiryInMs;
71-
val properties = new AMQP.BasicProperties.Builder()
70+
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
7271
.deliveryMode(2)
73-
.headers(ImmutableMap.of(MESSAGE_EXPIRY_TEXT, expiresAt))
7472
.build();
73+
if (expiryInMs > 0) {
74+
val expiresAt = Instant.now().toEpochMilli() + expiryInMs;
75+
properties = properties.builder()
76+
.headers(ImmutableMap.of(MESSAGE_EXPIRY_TEXT, expiresAt))
77+
.build();
78+
}
7579
publish(message, properties);
7680
}
7781

@@ -91,7 +95,10 @@ public final void publish(final Message message, final AMQP.BasicProperties prop
9195
}
9296

9397
private AMQP.BasicProperties getEnrichedProperties(AMQP.BasicProperties properties) {
94-
val enrichedHeaders = new HashMap<>(properties.getHeaders());
98+
HashMap<String, Object> enrichedHeaders = new HashMap<>();
99+
if (properties.getHeaders() != null) {
100+
enrichedHeaders.putAll(properties.getHeaders());
101+
}
95102
enrichedHeaders.put(MESSAGE_PUBLISHED_TEXT, Instant.now().toEpochMilli());
96103
return properties.builder()
97104
.headers(Collections.unmodifiableMap(enrichedHeaders))

src/test/java/io/appform/dropwizard/actors/connectivity/actor/ExpiryMessagesTest.java

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,66 @@ queueName, actorConfig, connection, objectMapper, new RetryStrategyFactory(), ne
275275
Assertions.assertEquals(1, normalDeliveryCount.get());
276276
}
277277

278+
/**
279+
* This test does the following:
280+
* - Publisher publishes the message with an expiry of -1ms with 1 consumer
281+
* - Consumer would consume the message normally and delay in consumption is 1000ms
282+
*/
283+
@Test
284+
public void testWhenMessagesAreNotExpiredCase4() throws Exception {
285+
val queueName = "queue-6";
286+
val objectMapper = new ObjectMapper();
287+
val actorConfig = new ActorConfig();
288+
actorConfig.setExchange("test-exchange-1");
289+
val publisher = new UnmanagedPublisher<>(
290+
queueName, actorConfig, connection, objectMapper);
291+
publisher.start();
292+
293+
val message = ImmutableMap.of(
294+
"key", "value"
295+
);
296+
297+
publisher.publishWithExpiry(message, -1);
298+
Thread.sleep(1000);
299+
val normalDeliveryCount = new AtomicInteger();
300+
val consumer = new UnmanagedConsumer<>(
301+
queueName, actorConfig, connection, objectMapper, new RetryStrategyFactory(), new ExceptionHandlingFactory(),
302+
Map.class, handleExpectedMessageWithDelay(1000, normalDeliveryCount), this::handleForNoExpectedMsg, (x) -> true);
303+
consumer.start();
304+
305+
306+
Assertions.assertEquals(1, normalDeliveryCount.get());
307+
}
308+
309+
/**
310+
* This test does the following:
311+
* - Publisher publishes the message with 1 consumer
312+
* - Consumer would consume the message normally and delay in consumption is 1000ms
313+
*/
314+
@Test
315+
public void testRegressionForPublish() throws Exception {
316+
val queueName = "queue-7";
317+
val objectMapper = new ObjectMapper();
318+
val actorConfig = new ActorConfig();
319+
actorConfig.setExchange("test-exchange-1");
320+
val publisher = new UnmanagedPublisher<>(
321+
queueName, actorConfig, connection, objectMapper);
322+
publisher.start();
323+
324+
val message = ImmutableMap.of(
325+
"key", "value"
326+
);
327+
328+
publisher.publish(message);
329+
val normalDeliveryCount = new AtomicInteger();
330+
val consumer = new UnmanagedConsumer<>(
331+
queueName, actorConfig, connection, objectMapper, new RetryStrategyFactory(), new ExceptionHandlingFactory(),
332+
Map.class, handleExpectedMessageWithDelay(0, normalDeliveryCount), this::handleForNoExpectedMsg, (x) -> true);
333+
consumer.start();
334+
Thread.sleep(500);
335+
336+
Assertions.assertEquals(1, normalDeliveryCount.get());
337+
}
278338
@NotNull
279339
private MessageHandlingFunction<Map, Boolean> handleExpectedMessage(int maxDelay, AtomicInteger normalDeliveryCount) {
280340
return (msg, meta) -> {
@@ -284,6 +344,15 @@ private MessageHandlingFunction<Map, Boolean> handleExpectedMessage(int maxDelay
284344
};
285345
}
286346

347+
@NotNull
348+
private MessageHandlingFunction<Map, Boolean> handleExpectedMessageWithDelay(int minDelay, AtomicInteger normalDeliveryCount) {
349+
return (msg, meta) -> {
350+
Assertions.assertTrue(meta.getDelayInMs() > minDelay);
351+
normalDeliveryCount.getAndIncrement();
352+
return true;
353+
};
354+
}
355+
287356
@NotNull
288357
private MessageHandlingFunction<Map, Boolean> handleExpectedMessageForReDelivery(AtomicInteger expiredDeliveryCount) {
289358
return (msg, meta) -> {

0 commit comments

Comments
 (0)