Skip to content

Commit 96a7101

Browse files
garyrussellartembilan
authored andcommitted
AMQP-776: More Consumer Events
JIRA: https://jira.spring.io/browse/AMQP-776 JIRA: https://jira.spring.io/browse/AMQP-777 JIRA: https://jira.spring.io/browse/AMQP-782 Publish an event when a consumer successfully consumes from a queue. Publish an event when an SMLC listener throws an `Error`. Doc polishing. Update minimum client version in docs; remove reference to broker version since that's no longer linked to the client. __cherry-pick to 1.7.x (minus DMLC change)__ # Conflicts: # spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainer.java # spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java # spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainerIntegrationTests.java # spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerIntegration2Tests.java # src/reference/asciidoc/quick-tour.adoc * Rework `SimpleMessageListenerContainerIntegration2Tests` do not use lambda for the `ApplicationEventPublisher` since it is there since Spring 5 only
1 parent 4f4d0c5 commit 96a7101

File tree

6 files changed

+134
-16
lines changed

6 files changed

+134
-16
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/BlockingQueueConsumer.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.springframework.amqp.rabbit.support.MessagePropertiesConverter;
5555
import org.springframework.amqp.rabbit.support.RabbitExceptionTranslator;
5656
import org.springframework.amqp.support.ConsumerTagStrategy;
57+
import org.springframework.context.ApplicationEventPublisher;
5758
import org.springframework.transaction.support.TransactionSynchronizationManager;
5859
import org.springframework.util.ObjectUtils;
5960
import org.springframework.util.backoff.BackOffExecution;
@@ -143,6 +144,8 @@ public class BlockingQueueConsumer {
143144

144145
private boolean locallyTransacted;
145146

147+
private ApplicationEventPublisher applicationEventPublisher;
148+
146149
private volatile long abortStarted;
147150

148151
private volatile boolean normalCancel;
@@ -354,6 +357,10 @@ public void setLocallyTransacted(boolean locallyTransacted) {
354357
this.locallyTransacted = locallyTransacted;
355358
}
356359

360+
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
361+
this.applicationEventPublisher = applicationEventPublisher;
362+
}
363+
357364
/**
358365
* Clear the delivery tags when rolling back with an external transaction
359366
* manager.
@@ -641,6 +648,9 @@ private void consumeFromQueue(String queue) throws IOException {
641648
else {
642649
logger.error("Null consumer tag received for queue " + queue);
643650
}
651+
if (this.applicationEventPublisher != null) {
652+
this.applicationEventPublisher.publishEvent(new ConsumeOkEvent(this, queue, consumerTag));
653+
}
644654
}
645655

646656
private void attemptPassiveDeclarations() {
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright 2017 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.amqp.rabbit.listener;
18+
19+
import org.springframework.amqp.event.AmqpEvent;
20+
21+
/**
22+
* @author Gary Russell
23+
* @since 1.7.5
24+
*
25+
*/
26+
@SuppressWarnings("serial")
27+
public class ConsumeOkEvent extends AmqpEvent {
28+
29+
private final String queue;
30+
31+
private final String consumerTag;
32+
33+
public ConsumeOkEvent(Object source, String queue, String consumerTag) {
34+
super(source);
35+
this.queue = queue;
36+
this.consumerTag = consumerTag;
37+
}
38+
39+
@Override
40+
public String toString() {
41+
return "ConsumeOkEvent [queue=" + this.queue + ", consumerTag=" + this.consumerTag
42+
+ ", consumer=" + getSource() + "]";
43+
}
44+
45+
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1120,6 +1120,7 @@ protected BlockingQueueConsumer createBlockingQueueConsumer() {
11201120
}
11211121
consumer.setBackOffExecution(this.recoveryBackOff.start());
11221122
consumer.setShutdownTimeout(this.shutdownTimeout);
1123+
consumer.setApplicationEventPublisher(this.applicationEventPublisher);
11231124
return consumer;
11241125
}
11251126

@@ -1608,6 +1609,7 @@ public void run() {
16081609
catch (Error e) { //NOSONAR
16091610
// ok to catch Error - we're aborting so will stop
16101611
logger.error("Consumer thread error, thread abort.", e);
1612+
logConsumerException(e);
16111613
aborted = true;
16121614
}
16131615
catch (Throwable t) { //NOSONAR

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerIntegration2Tests.java

Lines changed: 67 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -280,10 +280,15 @@ public void publishEvent(ApplicationEvent event) {
280280
assertNull(template.receiveAndConvert(queue.getName()));
281281
container.stop();
282282
assertTrue(eventLatch.await(10, TimeUnit.SECONDS));
283+
assertThat(events.size(), equalTo(8));
283284
assertThat(events.get(0), instanceOf(AsyncConsumerStartedEvent.class));
284-
assertSame(events.get(1), eventRef.get());
285-
assertThat(events.get(2), instanceOf(AsyncConsumerRestartedEvent.class));
286-
assertThat(events.get(3), instanceOf(AsyncConsumerStoppedEvent.class));
285+
assertThat(events.get(1), instanceOf(ConsumeOkEvent.class));
286+
assertThat(events.get(2), instanceOf(ConsumeOkEvent.class));
287+
assertSame(events.get(3), eventRef.get());
288+
assertThat(events.get(4), instanceOf(AsyncConsumerRestartedEvent.class));
289+
assertThat(events.get(5), instanceOf(ConsumeOkEvent.class));
290+
assertThat(events.get(6), instanceOf(ConsumeOkEvent.class));
291+
assertThat(events.get(7), instanceOf(AsyncConsumerStoppedEvent.class));
287292
}
288293

289294
@Test
@@ -332,13 +337,25 @@ public void testExclusive() throws Exception {
332337
context.refresh();
333338
container1.setApplicationContext(context);
334339
container1.setExclusive(true);
340+
final CountDownLatch consumeLatch1 = new CountDownLatch(1);
341+
container1.setApplicationEventPublisher(new ApplicationEventPublisher() {
342+
343+
@Override
344+
public void publishEvent(ApplicationEvent event) {
345+
if (event instanceof ConsumeOkEvent) {
346+
consumeLatch1.countDown();
347+
}
348+
}
349+
350+
@Override
351+
public void publishEvent(Object event) {
352+
353+
}
354+
355+
});
335356
container1.afterPropertiesSet();
336357
container1.start();
337-
int n = 0;
338-
while (n++ < 100 && container1.getActiveConsumerCount() < 1) {
339-
Thread.sleep(100);
340-
}
341-
assertTrue(n < 100);
358+
assertTrue(consumeLatch1.await(10, TimeUnit.SECONDS));
342359
CountDownLatch latch2 = new CountDownLatch(1000);
343360
SimpleMessageListenerContainer container2 = new SimpleMessageListenerContainer(template.getConnectionFactory());
344361
container2.setMessageListener(new MessageListenerAdapter(new PojoListener(latch2)));
@@ -347,18 +364,22 @@ public void testExclusive() throws Exception {
347364
container2.setRecoveryInterval(1000);
348365
container2.setExclusive(true); // not really necessary, but likely people will make all consumers exclusive.
349366
final AtomicReference<ListenerContainerConsumerFailedEvent> eventRef = new AtomicReference<>();
367+
final CountDownLatch consumeLatch2 = new CountDownLatch(1);
350368
container2.setApplicationEventPublisher(new ApplicationEventPublisher() {
351369

352-
@Override
353-
public void publishEvent(Object event) {
354-
//NOSONAR
355-
}
356-
357370
@Override
358371
public void publishEvent(ApplicationEvent event) {
359372
if (event instanceof ListenerContainerConsumerFailedEvent) {
360373
eventRef.set((ListenerContainerConsumerFailedEvent) event);
361374
}
375+
else if (event instanceof ConsumeOkEvent) {
376+
consumeLatch2.countDown();
377+
}
378+
}
379+
380+
@Override
381+
public void publishEvent(Object event) {
382+
362383
}
363384

364385
});
@@ -374,6 +395,7 @@ public void publishEvent(ApplicationEvent event) {
374395
assertEquals(1000, latch2.getCount());
375396
container1.stop();
376397
// container 2 should recover and process the next batch of messages
398+
assertTrue(consumeLatch2.await(10, TimeUnit.SECONDS));
377399
for (int i = 0; i < 1000; i++) {
378400
template.convertAndSend(queue.getName(), i + "foo");
379401
}
@@ -596,6 +618,38 @@ public void testTooSmallExecutor() {
596618
+ "executor have enough threads to support the container concurrency?"));
597619
}
598620

621+
@Test
622+
public void testErrorStopsContainer() throws Exception {
623+
this.container = createContainer((MessageListener) (m) -> {
624+
throw new Error("testError");
625+
}, false, this.queue.getName());
626+
final CountDownLatch latch = new CountDownLatch(1);
627+
this.container.setApplicationEventPublisher(new ApplicationEventPublisher() {
628+
629+
@Override
630+
public void publishEvent(ApplicationEvent event) {
631+
if (event instanceof ListenerContainerConsumerFailedEvent) {
632+
latch.countDown();
633+
}
634+
}
635+
636+
@Override
637+
public void publishEvent(Object event) {
638+
639+
}
640+
641+
});
642+
this.container.setDefaultRequeueRejected(false);
643+
this.container.start();
644+
this.template.convertAndSend(this.queue.getName(), "foo");
645+
assertTrue(latch.await(10, TimeUnit.SECONDS));
646+
int n = 0;
647+
while (n++ < 100 && this.container.isRunning()) {
648+
Thread.sleep(100);
649+
}
650+
assertFalse(this.container.isRunning());
651+
}
652+
599653
private boolean containerStoppedForAbortWithBadListener() throws InterruptedException {
600654
Log logger = spy(TestUtils.getPropertyValue(container, "logger", Log.class));
601655
new DirectFieldAccessor(container).setPropertyValue("logger", logger);

src/reference/asciidoc/amqp.adoc

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1382,7 +1382,7 @@ Batched messages are automatically de-batched by listener containers (using the
13821382
See <<template-batching>> for more information about batching.
13831383

13841384
[[consumer-events]]
1385-
===== Consumer Failure Events
1385+
===== Consumer Events
13861386

13871387
Starting with _version 1.5_, the `SimpleMessageListenerContainer` publishes application events whenever a listener
13881388
(consumer) experiences a failure of some kind.
@@ -1406,6 +1406,15 @@ See also <<channel-close-logging>>.
14061406

14071407
Fatal errors are always logged at `ERROR` level; this it not modifiable.
14081408

1409+
Several other events are published at various stages of the container lifecycle:
1410+
1411+
- `AsyncConsumerStartedEvent` (when the consumer is started)
1412+
- `AsyncConsumerRestartedEvent` (when the consumer is restarted after a failure - `SimpleMessageListenerContainer` only)
1413+
- `AsyncConsumerTerminatedEvent` (when a consumer is stopped normally)
1414+
- `AsyncConsumerStoppedEvent` (when the consumer is stopped - `SimpleMessageListenerContainer` only)
1415+
- `ConsumeOkEvent` (when a `consumeOk` is received from the broker, contains the queue name and `consumerTag`)
1416+
- `ListenerContainerIdleEvent` (see <<idle-containers>>)
1417+
14091418
[[consumerTags]]
14101419
===== Consumer Tags
14111420

src/reference/asciidoc/quick-tour.adoc

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,6 @@ Annotation-based listeners and the `RabbitMessagingTemplate` require Spring Fram
3434

3535
The minimum `amqp-client` java client library version is 4.0.0.
3636

37-
Note the this refers to the java client library; generally, it will work with older broker versions.
38-
3937
===== Very, Very Quick
4038

4139
Using plain, imperative Java to send and receive a message:

0 commit comments

Comments
 (0)