Skip to content

Commit 49aedaa

Browse files
garyrussellartembilan
authored andcommitted
GH-1612: Option: Producer Fenced: Stop Container
Resolves #1612 **cherry-pick to 2.5.x** * * Add @SInCE to javadocs; retain route cause of `StopAfterFenceException`. * * Add reason to `ConsumerStoppedEvent`. Resolves #1618 Also provide access to the actual container that stopped the consumer, for example to allow restarting after stopping due to a producer fenced exception. * * Add `@Nullable`s. * * Test Polishing.
1 parent f195b62 commit 49aedaa

File tree

5 files changed

+196
-36
lines changed

5 files changed

+196
-36
lines changed

spring-kafka/src/main/java/org/springframework/kafka/event/ConsumerStoppedEvent.java

Lines changed: 67 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2019 the original author or authors.
2+
* Copyright 2018-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.kafka.event;
1818

19+
import org.springframework.lang.Nullable;
20+
1921
/**
2022
* An event published when a consumer is stopped. While it is best practice to use
2123
* stateless listeners, you can consume this event to clean up any thread-based resources
@@ -30,6 +32,30 @@ public class ConsumerStoppedEvent extends KafkaEvent {
3032

3133
private static final long serialVersionUID = 1L;
3234

35+
public enum Reason {
36+
37+
/**
38+
* The consumer was stopped because the container was stopped.
39+
*/
40+
NORMAL,
41+
42+
/**
43+
* The transactional producer was fenced anf the container
44+
* {@code stopContainerWhenFenced} property is true.
45+
*/
46+
FENCED,
47+
48+
/**
49+
* A {@link java.lang.Error} was thrown.
50+
*/
51+
ERROR
52+
53+
}
54+
55+
private final Reason reason;
56+
57+
private final Object container;
58+
3359
/**
3460
* Construct an instance with the provided source.
3561
* @param source the container.
@@ -45,13 +71,52 @@ public ConsumerStoppedEvent(Object source) {
4571
* @param container the container or the parent container if the container is a child.
4672
* @since 2.2.1
4773
*/
74+
@Deprecated
4875
public ConsumerStoppedEvent(Object source, Object container) {
76+
this(source, container, null, Reason.NORMAL);
77+
}
78+
79+
/**
80+
* Construct an instance with the provided source and container.
81+
* @param source the container instance that generated the event.
82+
* @param container the container or the parent container if the container is a child.
83+
* @param childContainer the child container, or null.
84+
* @param reason the reason.
85+
* @since 2.5.8
86+
*/
87+
public ConsumerStoppedEvent(Object source, Object container, @Nullable Object childContainer,
88+
Reason reason) {
89+
4990
super(source, container);
91+
this.container = childContainer;
92+
this.reason = reason;
93+
}
94+
95+
/**
96+
* Return the reason why the consumer was stopped.
97+
* @return the reason.
98+
* @since 2.5.8
99+
*/
100+
public Reason getReason() {
101+
return this.reason;
102+
}
103+
104+
/**
105+
* Return the container that the Consumer belonged to.
106+
* @param <T> the container type.
107+
* @return the container.
108+
* @since 2.5.8
109+
*/
110+
@SuppressWarnings("unchecked")
111+
public <T> T getContainer() {
112+
return this.container == null ? (T) getSource() : (T) this.container;
50113
}
51114

52115
@Override
53116
public String toString() {
54-
return "ConsumerStoppedEvent [source=" + getSource() + "]";
117+
return "ConsumerStoppedEvent [source=" + getSource()
118+
+ (this.container == null ? "" : (", container=" + this.container))
119+
+ ", reason=" + this.reason + "]";
55120
}
56121

57122
}

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,8 @@ public enum EOSMode {
254254

255255
private TransactionDefinition transactionDefinition;
256256

257+
private boolean stopContainerWhenFenced;
258+
257259
/**
258260
* Create properties for a container that will subscribe to the specified topics.
259261
* @param topics the topics.
@@ -745,6 +747,30 @@ public void setAdviceChain(Advice... adviceChain) {
745747
}
746748
}
747749

750+
/**
751+
* When true, the container will stop after a
752+
* {@link org.apache.kafka.common.errors.ProducerFencedException}.
753+
* @return the stopContainerWhenFenced
754+
* @since 2.5.8
755+
*/
756+
public boolean isStopContainerWhenFenced() {
757+
return this.stopContainerWhenFenced;
758+
}
759+
760+
/**
761+
* Set to true to stop the container when a
762+
* {@link org.apache.kafka.common.errors.ProducerFencedException} is thrown.
763+
* Currently, there is no way to determine if such an exception is thrown due to a
764+
* rebalance Vs. a timeout. We therefore cannot call the after rollback processor. The
765+
* best solution is to ensure that the {@code transaction.timeout.ms} is large enough
766+
* so that transactions don't time out.
767+
* @param stopContainerWhenFenced true to stop the container.
768+
* @since 2.5.8
769+
*/
770+
public void setStopContainerWhenFenced(boolean stopContainerWhenFenced) {
771+
this.stopContainerWhenFenced = stopContainerWhenFenced;
772+
}
773+
748774
private void adviseListenerIfNeeded() {
749775
if (!CollectionUtils.isEmpty(this.adviceChain)) {
750776
if (AopUtils.isAopProxy(this.messageListener)) {

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 33 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@
8080
import org.springframework.kafka.event.ConsumerStartedEvent;
8181
import org.springframework.kafka.event.ConsumerStartingEvent;
8282
import org.springframework.kafka.event.ConsumerStoppedEvent;
83+
import org.springframework.kafka.event.ConsumerStoppedEvent.Reason;
8384
import org.springframework.kafka.event.ConsumerStoppingEvent;
8485
import org.springframework.kafka.event.ListenerContainerIdleEvent;
8586
import org.springframework.kafka.event.NonResponsiveConsumerEvent;
@@ -393,9 +394,15 @@ private void publishConsumerStoppingEvent(Consumer<?, ?> consumer) {
393394
}
394395
}
395396

396-
private void publishConsumerStoppedEvent() {
397+
private void publishConsumerStoppedEvent(@Nullable Throwable throwable) {
397398
if (getApplicationEventPublisher() != null) {
398-
getApplicationEventPublisher().publishEvent(new ConsumerStoppedEvent(this, this.thisOrParentContainer));
399+
getApplicationEventPublisher().publishEvent(new ConsumerStoppedEvent(this, this.thisOrParentContainer,
400+
this.thisOrParentContainer.equals(this) ? null : this,
401+
throwable instanceof Error
402+
? Reason.ERROR
403+
: throwable instanceof StopAfterFenceException
404+
? Reason.FENCED
405+
: Reason.NORMAL));
399406
}
400407
}
401408

@@ -1066,20 +1073,20 @@ public void run() {
10661073
+ "' has been fenced");
10671074
break;
10681075
}
1069-
catch (Exception e) {
1070-
handleConsumerException(e);
1071-
}
1072-
catch (Error e) { // NOSONAR - rethrown
1076+
catch (StopAfterFenceException | Error e) { // NOSONAR - rethrown
10731077
Runnable runnable = KafkaMessageListenerContainer.this.emergencyStop;
10741078
if (runnable != null) {
10751079
runnable.run();
10761080
}
10771081
this.logger.error(e, "Stopping container due to an Error");
1078-
wrapUp();
1082+
wrapUp(e);
10791083
throw e;
10801084
}
1085+
catch (Exception e) {
1086+
handleConsumerException(e);
1087+
}
10811088
}
1082-
wrapUp();
1089+
wrapUp(null);
10831090
}
10841091

10851092
private void setupSeeks() {
@@ -1311,7 +1318,7 @@ private void idleBetweenPollIfNecessary() {
13111318
}
13121319
}
13131320

1314-
private void wrapUp() {
1321+
private void wrapUp(@Nullable Throwable throwable) {
13151322
KafkaUtils.clearConsumerGroupId();
13161323
if (this.micrometerHolder != null) {
13171324
this.micrometerHolder.destroy();
@@ -1350,7 +1357,7 @@ private void wrapUp() {
13501357
this.consumerSeekAwareListener.unregisterSeekCallback();
13511358
}
13521359
this.logger.info(() -> getGroupId() + ": Consumer stopped");
1353-
publishConsumerStoppedEvent();
1360+
publishConsumerStoppedEvent(throwable);
13541361
}
13551362

13561363
/**
@@ -1520,6 +1527,9 @@ public void doInTransactionWithoutResult(TransactionStatus s) {
15201527
this.logger.error(e, "Producer or '"
15211528
+ ConsumerConfig.GROUP_INSTANCE_ID_CONFIG
15221529
+ "' fenced during transaction");
1530+
if (this.containerProperties.isStopContainerWhenFenced()) {
1531+
throw new StopAfterFenceException("Container stopping due to fencing", e);
1532+
}
15231533
}
15241534
catch (RuntimeException e) {
15251535
this.logger.error(e, "Transaction rolled back");
@@ -1786,6 +1796,9 @@ public void doInTransactionWithoutResult(TransactionStatus s) {
17861796
}
17871797
catch (ProducerFencedException | FencedInstanceIdException e) {
17881798
this.logger.error(e, "Producer or 'group.instance.id' fenced during transaction");
1799+
if (this.containerProperties.isStopContainerWhenFenced()) {
1800+
throw new StopAfterFenceException("Container stopping due to fencing", e);
1801+
}
17891802
break;
17901803
}
17911804
catch (RuntimeException e) {
@@ -2073,12 +2086,7 @@ else if (this.producer != null
20732086
this.acks.add(record);
20742087
}
20752088
if (this.producer != null) {
2076-
try {
2077-
sendOffsetsToTransaction();
2078-
}
2079-
catch (Exception e) {
2080-
this.logger.error(e, "Send offsets to transaction failed");
2081-
}
2089+
sendOffsetsToTransaction();
20822090
}
20832091
}
20842092

@@ -2828,4 +2836,13 @@ public void onSuccess(Object result) {
28282836

28292837
}
28302838

2839+
@SuppressWarnings("serial")
2840+
private static class StopAfterFenceException extends KafkaException {
2841+
2842+
StopAfterFenceException(String message, Throwable t) {
2843+
super(message, t);
2844+
}
2845+
2846+
}
2847+
28312848
}

spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java

Lines changed: 60 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@
8484
import org.springframework.kafka.core.ProducerFactory;
8585
import org.springframework.kafka.core.ProducerFactoryUtils;
8686
import org.springframework.kafka.event.ConsumerStoppedEvent;
87+
import org.springframework.kafka.event.ConsumerStoppedEvent.Reason;
8788
import org.springframework.kafka.event.ListenerContainerIdleEvent;
8889
import org.springframework.kafka.listener.ContainerProperties.AckMode;
8990
import org.springframework.kafka.listener.ContainerProperties.AssignmentCommitOption;
@@ -172,15 +173,29 @@ public void testConsumeAndProduceTransactionKTM_BETA() throws Exception {
172173
testConsumeAndProduceTransactionGuts(false, false, AckMode.RECORD, EOSMode.BETA);
173174
}
174175

176+
@Test
177+
public void testConsumeAndProduceTransactionStopWhenFenced() throws Exception {
178+
testConsumeAndProduceTransactionGuts(false, false, AckMode.RECORD, EOSMode.BETA, true);
179+
}
180+
175181
@SuppressWarnings({ "rawtypes", "unchecked" })
176182
private void testConsumeAndProduceTransactionGuts(boolean chained, boolean handleError, AckMode ackMode,
177183
EOSMode eosMode) throws Exception {
178184

185+
testConsumeAndProduceTransactionGuts(chained, handleError, ackMode, eosMode, false);
186+
}
187+
188+
@SuppressWarnings({ "rawtypes", "unchecked" })
189+
private void testConsumeAndProduceTransactionGuts(boolean chained, boolean handleError, AckMode ackMode,
190+
EOSMode eosMode, boolean stopWhenFenced) throws Exception {
191+
179192
Consumer consumer = mock(Consumer.class);
193+
AtomicBoolean assigned = new AtomicBoolean();
180194
final TopicPartition topicPartition = new TopicPartition("foo", 0);
181195
willAnswer(i -> {
182196
((ConsumerRebalanceListener) i.getArgument(1))
183197
.onPartitionsAssigned(Collections.singletonList(topicPartition));
198+
assigned.set(true);
184199
return null;
185200
}).given(consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class));
186201
ConsumerRecords records = new ConsumerRecords(Collections.singletonMap(topicPartition,
@@ -199,6 +214,14 @@ private void testConsumeAndProduceTransactionGuts(boolean chained, boolean handl
199214
ConsumerFactory cf = mock(ConsumerFactory.class);
200215
willReturn(consumer).given(cf).createConsumer("group", "", null, KafkaTestUtils.defaultPropertyOverrides());
201216
Producer producer = mock(Producer.class);
217+
if (stopWhenFenced) {
218+
willAnswer(inv -> {
219+
if (assigned.get()) {
220+
throw new ProducerFencedException("fenced");
221+
}
222+
return null;
223+
}).given(producer).sendOffsetsToTransaction(any(), any(ConsumerGroupMetadata.class));
224+
}
202225
given(producer.send(any(), any())).willReturn(new SettableListenableFuture<>());
203226
final CountDownLatch closeLatch = new CountDownLatch(2);
204227
willAnswer(i -> {
@@ -224,6 +247,7 @@ private void testConsumeAndProduceTransactionGuts(boolean chained, boolean handl
224247
props.setTransactionManager(ptm);
225248
props.setAssignmentCommitOption(AssignmentCommitOption.ALWAYS);
226249
props.setEosMode(eosMode);
250+
props.setStopContainerWhenFenced(stopWhenFenced);
227251
ConsumerGroupMetadata consumerGroupMetadata = new ConsumerGroupMetadata("group");
228252
given(consumer.groupMetadata()).willReturn(consumerGroupMetadata);
229253
final KafkaTemplate template = new KafkaTemplate(pf);
@@ -260,6 +284,14 @@ public void onMessage(Object data) {
260284
if (handleError) {
261285
container.setErrorHandler((e, data) -> { });
262286
}
287+
CountDownLatch stopEventLatch = new CountDownLatch(1);
288+
AtomicReference<ConsumerStoppedEvent> stopEvent = new AtomicReference<>();
289+
container.setApplicationEventPublisher(event -> {
290+
if (event instanceof ConsumerStoppedEvent) {
291+
stopEvent.set((ConsumerStoppedEvent) event);
292+
stopEventLatch.countDown();
293+
}
294+
});
263295
container.start();
264296
assertThat(closeLatch.await(10, TimeUnit.SECONDS)).isTrue();
265297
InOrder inOrder = inOrder(producer);
@@ -272,27 +304,37 @@ public void onMessage(Object data) {
272304
inOrder.verify(producer).sendOffsetsToTransaction(Collections.singletonMap(topicPartition,
273305
new OffsetAndMetadata(0)), consumerGroupMetadata);
274306
}
275-
inOrder.verify(producer).commitTransaction();
276-
inOrder.verify(producer).close(any());
277-
inOrder.verify(producer).beginTransaction();
278-
ArgumentCaptor<ProducerRecord> captor = ArgumentCaptor.forClass(ProducerRecord.class);
279-
inOrder.verify(producer).send(captor.capture(), any(Callback.class));
280-
assertThat(captor.getValue()).isEqualTo(new ProducerRecord("bar", "baz"));
281-
if (eosMode.equals(EOSMode.ALPHA)) {
282-
inOrder.verify(producer).sendOffsetsToTransaction(Collections.singletonMap(topicPartition,
283-
new OffsetAndMetadata(1)), "group");
307+
if (stopWhenFenced) {
308+
assertThat(stopEventLatch.await(10, TimeUnit.SECONDS)).isTrue();
309+
assertThat(stopEvent.get().getReason()).isEqualTo(Reason.FENCED);
284310
}
285311
else {
286-
inOrder.verify(producer).sendOffsetsToTransaction(Collections.singletonMap(topicPartition,
287-
new OffsetAndMetadata(1)), consumerGroupMetadata);
312+
inOrder.verify(producer).commitTransaction();
313+
inOrder.verify(producer).close(any());
314+
inOrder.verify(producer).beginTransaction();
315+
ArgumentCaptor<ProducerRecord> captor = ArgumentCaptor.forClass(ProducerRecord.class);
316+
inOrder.verify(producer).send(captor.capture(), any(Callback.class));
317+
assertThat(captor.getValue()).isEqualTo(new ProducerRecord("bar", "baz"));
318+
if (eosMode.equals(EOSMode.ALPHA)) {
319+
inOrder.verify(producer).sendOffsetsToTransaction(Collections.singletonMap(topicPartition,
320+
new OffsetAndMetadata(1)), "group");
321+
}
322+
else {
323+
inOrder.verify(producer).sendOffsetsToTransaction(Collections.singletonMap(topicPartition,
324+
new OffsetAndMetadata(1)), consumerGroupMetadata);
325+
}
326+
inOrder.verify(producer).commitTransaction();
327+
inOrder.verify(producer).close(any());
328+
container.stop();
329+
verify(pf, times(2)).createProducer(isNull());
330+
verifyNoMoreInteractions(producer);
331+
assertThat(transactionalIds.get(0)).isEqualTo("group.foo.0");
332+
assertThat(transactionalIds.get(0)).isEqualTo("group.foo.0");
333+
assertThat(stopEventLatch.await(10, TimeUnit.SECONDS)).isTrue();
334+
assertThat(stopEvent.get().getReason()).isEqualTo(Reason.NORMAL);
288335
}
289-
inOrder.verify(producer).commitTransaction();
290-
inOrder.verify(producer).close(any());
291-
container.stop();
292-
verify(pf, times(2)).createProducer(isNull());
293-
verifyNoMoreInteractions(producer);
294-
assertThat(transactionalIds.get(0)).isEqualTo("group.foo.0");
295-
assertThat(transactionalIds.get(0)).isEqualTo("group.foo.0");
336+
MessageListenerContainer stoppedContainer = stopEvent.get().getContainer();
337+
assertThat(stoppedContainer).isSameAs(container);
296338
}
297339

298340
@SuppressWarnings({ "rawtypes", "unchecked" })

0 commit comments

Comments
 (0)