Skip to content

Commit 50196bf

Browse files
garyrussellartembilan
authored andcommitted
GH-615: More CommonErrorHandlers
See #615 - CommonContainerStoppingErrorHandler - CommonMixedErrorHandler
1 parent b53bb33 commit 50196bf

17 files changed

+980
-16
lines changed
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import java.util.List;
20+
import java.util.concurrent.Executor;
21+
22+
import org.apache.kafka.clients.consumer.Consumer;
23+
import org.apache.kafka.clients.consumer.ConsumerRecord;
24+
import org.apache.kafka.clients.consumer.ConsumerRecords;
25+
26+
import org.springframework.core.task.SimpleAsyncTaskExecutor;
27+
import org.springframework.kafka.KafkaException;
28+
import org.springframework.util.Assert;
29+
30+
/**
31+
* A {@link CommonErrorHandler} that stops the container when an error occurs. Replaces
32+
* the legacy {@link ContainerStoppingErrorHandler} and
33+
* {@link ContainerStoppingBatchErrorHandler}.
34+
*
35+
* @author Gary Russell
36+
* @since 2.8
37+
*
38+
*/
39+
public class CommonContainerStoppingErrorHandler extends KafkaExceptionLogLevelAware implements CommonErrorHandler {
40+
41+
private final Executor executor;
42+
43+
/**
44+
* Construct an instance with a default {@link SimpleAsyncTaskExecutor}.
45+
*/
46+
public CommonContainerStoppingErrorHandler() {
47+
this(new SimpleAsyncTaskExecutor("containerStop-"));
48+
}
49+
50+
/**
51+
* Construct an instance with the provided {@link Executor}.
52+
* @param executor the executor.
53+
*/
54+
public CommonContainerStoppingErrorHandler(Executor executor) {
55+
Assert.notNull(executor, "'executor' cannot be null");
56+
this.executor = executor;
57+
}
58+
59+
@Override
60+
public boolean remainingRecords() {
61+
return true;
62+
}
63+
64+
@Override
65+
public void handleOtherException(Exception thrownException, Consumer<?, ?> consumer,
66+
MessageListenerContainer container, boolean batchListener) {
67+
68+
stopContainer(container, thrownException);
69+
}
70+
71+
72+
@Override
73+
public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
74+
MessageListenerContainer container) {
75+
76+
stopContainer(container, thrownException);
77+
}
78+
79+
@Override
80+
public void handleBatch(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
81+
MessageListenerContainer container, Runnable invokeListener) {
82+
83+
stopContainer(container, thrownException);
84+
}
85+
86+
private void stopContainer(MessageListenerContainer container, Exception thrownException) {
87+
this.executor.execute(() -> container.stop());
88+
// isRunning is false before the container.stop() waits for listener thread
89+
try {
90+
ListenerUtils.stoppableSleep(container, 10_000);
91+
}
92+
catch (InterruptedException e) {
93+
Thread.currentThread().interrupt();
94+
}
95+
throw new KafkaException("Stopped container", getLogLevel(), thrownException);
96+
}
97+
98+
}

spring-kafka/src/main/java/org/springframework/kafka/listener/CommonDelegatingErrorHandler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -135,14 +135,14 @@ public void handleBatch(Exception thrownException, ConsumerRecords<?, ?> data, C
135135

136136
@Override
137137
public void handleOtherException(Exception thrownException, Consumer<?, ?> consumer,
138-
MessageListenerContainer container) {
138+
MessageListenerContainer container, boolean batchListener) {
139139

140140
CommonErrorHandler handler = findDelegate(thrownException);
141141
if (handler != null) {
142-
handler.handleOtherException(thrownException, consumer, container);
142+
handler.handleOtherException(thrownException, consumer, container, batchListener);
143143
}
144144
else {
145-
this.defaultErrorHandler.handleOtherException(thrownException, consumer, container);
145+
this.defaultErrorHandler.handleOtherException(thrownException, consumer, container, batchListener);
146146
}
147147
}
148148

spring-kafka/src/main/java/org/springframework/kafka/listener/CommonErrorHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,10 @@ default boolean deliveryAttemptHeader() {
6262
* @param thrownException the exception.
6363
* @param consumer the consumer.
6464
* @param container the container.
65+
* @param batchListener true if the listener is a batch listener.
6566
*/
6667
default void handleOtherException(Exception thrownException, Consumer<?, ?> consumer,
67-
MessageListenerContainer container) {
68+
MessageListenerContainer container, boolean batchListener) {
6869

6970
LogFactory.getLog(getClass()).error("'handleOtherException' is not implemented by this handler",
7071
thrownException);

spring-kafka/src/main/java/org/springframework/kafka/listener/CommonLoggingErrorHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public void handleBatch(Exception thrownException, ConsumerRecords<?, ?> data, C
6666

6767
@Override
6868
public void handleOtherException(Exception thrownException, Consumer<?, ?> consumer,
69-
MessageListenerContainer container) {
69+
MessageListenerContainer container, boolean batchListener) {
7070

7171
LOGGER.error(thrownException, () -> "Error occurred while not processing records");
7272
}
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import java.util.List;
20+
21+
import org.apache.kafka.clients.consumer.Consumer;
22+
import org.apache.kafka.clients.consumer.ConsumerRecord;
23+
import org.apache.kafka.clients.consumer.ConsumerRecords;
24+
25+
import org.springframework.kafka.support.TopicPartitionOffset;
26+
import org.springframework.util.Assert;
27+
28+
/**
29+
* A {@link CommonErrorHandler} that delegates to different {@link CommonErrorHandler}s
30+
* for record and batch listeners.
31+
*
32+
* @author Gary Russell
33+
* @since 2.8
34+
*
35+
*/
36+
public class CommonMixedErrorHandler implements CommonErrorHandler {
37+
38+
private final CommonErrorHandler recordErrorHandler;
39+
40+
private final CommonErrorHandler batchErrorHandler;
41+
42+
/**
43+
* Construct an instance with the provided delegate {@link CommonErrorHandler}s.
44+
* @param recordErrorHandler the error handler for record listeners.
45+
* @param batchErrorHandler the error handler for batch listeners.
46+
*/
47+
public CommonMixedErrorHandler(CommonErrorHandler recordErrorHandler, CommonErrorHandler batchErrorHandler) {
48+
Assert.notNull(recordErrorHandler, "'recordErrorHandler' cannot be null");
49+
Assert.notNull(recordErrorHandler, "'batchErrorHandler' cannot be null");
50+
this.recordErrorHandler = recordErrorHandler;
51+
this.batchErrorHandler = batchErrorHandler;
52+
}
53+
54+
@Override
55+
public boolean remainingRecords() {
56+
return this.recordErrorHandler.remainingRecords();
57+
}
58+
59+
@Override
60+
public boolean deliveryAttemptHeader() {
61+
return this.recordErrorHandler.deliveryAttemptHeader();
62+
}
63+
64+
@Override
65+
public void handleOtherException(Exception thrownException, Consumer<?, ?> consumer,
66+
MessageListenerContainer container, boolean batchListener) {
67+
if (batchListener) {
68+
this.batchErrorHandler.handleOtherException(thrownException, consumer, container, batchListener);
69+
}
70+
else {
71+
this.recordErrorHandler.handleOtherException(thrownException, consumer, container, batchListener);
72+
}
73+
}
74+
75+
@Override
76+
public void handleRecord(Exception thrownException, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer,
77+
MessageListenerContainer container) {
78+
79+
this.recordErrorHandler.handleRecord(thrownException, record, consumer, container);
80+
}
81+
82+
@Override
83+
public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
84+
MessageListenerContainer container) {
85+
86+
this.recordErrorHandler.handleRemaining(thrownException, records, consumer, container);
87+
}
88+
89+
@Override
90+
public void handleBatch(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
91+
MessageListenerContainer container, Runnable invokeListener) {
92+
93+
this.batchErrorHandler.handleBatch(thrownException, data, consumer, container, invokeListener);
94+
}
95+
96+
@Override
97+
public int deliveryAttempt(TopicPartitionOffset topicPartitionOffset) {
98+
return this.recordErrorHandler.deliveryAttempt(topicPartitionOffset);
99+
}
100+
101+
@Override
102+
public void clearThreadState() {
103+
this.batchErrorHandler.clearThreadState();
104+
this.recordErrorHandler.clearThreadState();
105+
}
106+
107+
@Override
108+
public boolean isAckAfterHandle() {
109+
return this.recordErrorHandler.isAckAfterHandle();
110+
}
111+
112+
@Override
113+
public void setAckAfterHandle(boolean ack) {
114+
this.batchErrorHandler.setAckAfterHandle(ack);
115+
this.recordErrorHandler.setAckAfterHandle(ack);
116+
}
117+
118+
}

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerStoppingBatchErrorHandler.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2019 the original author or authors.
2+
* Copyright 2017-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -30,10 +30,13 @@
3030
* A container error handler that stops the container after an exception
3131
* is thrown by the listener.
3232
*
33+
* @deprecated in favor of {@link CommonContainerStoppingErrorHandler}.
34+
*
3335
* @author Gary Russell
3436
* @since 2.1
3537
*
3638
*/
39+
@Deprecated
3740
public class ContainerStoppingBatchErrorHandler extends KafkaExceptionLogLevelAware
3841
implements ContainerAwareBatchErrorHandler {
3942

@@ -58,6 +61,7 @@ public ContainerStoppingBatchErrorHandler(Executor executor) {
5861
@Override
5962
public void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
6063
MessageListenerContainer container) {
64+
6165
this.executor.execute(() -> container.stop());
6266
// isRunning is false before the container.stop() waits for listener thread
6367
int n = 0;

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerStoppingErrorHandler.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2019 the original author or authors.
2+
* Copyright 2017-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -30,18 +30,28 @@
3030
* A container error handler that stops the container after an exception
3131
* is thrown by the listener.
3232
*
33+
* @deprecated in favor of {@link CommonContainerStoppingErrorHandler}.
34+
*
3335
* @author Gary Russell
3436
* @since 2.1
3537
*
3638
*/
39+
@Deprecated
3740
public class ContainerStoppingErrorHandler extends KafkaExceptionLogLevelAware implements ContainerAwareErrorHandler {
3841

3942
private final Executor executor;
4043

44+
/**
45+
* Construct an instance with a default {@link SimpleAsyncTaskExecutor}.
46+
*/
4147
public ContainerStoppingErrorHandler() {
4248
this.executor = new SimpleAsyncTaskExecutor();
4349
}
4450

51+
/**
52+
* Construct an instance with the provided {@link Executor}.
53+
* @param executor the executor.
54+
*/
4555
public ContainerStoppingErrorHandler(Executor executor) {
4656
Assert.notNull(executor, "'executor' cannot be null");
4757
this.executor = executor;

spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultErrorHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ public void handleBatch(Exception thrownException, ConsumerRecords<?, ?> data, C
133133

134134
@Override
135135
public void handleOtherException(Exception thrownException, Consumer<?, ?> consumer,
136-
MessageListenerContainer container) {
136+
MessageListenerContainer container, boolean batchListener) {
137137

138138
if (thrownException instanceof SerializationException) {
139139
throw new IllegalStateException("This error handler cannot process 'SerializationException's directly; "

spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlerAdapter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ public int deliveryAttempt(TopicPartitionOffset topicPartitionOffset) {
111111
@SuppressWarnings({ "unchecked" })
112112
@Override
113113
public void handleOtherException(Exception thrownException, Consumer<?, ?> consumer,
114-
MessageListenerContainer container) {
114+
MessageListenerContainer container, boolean batchListener) {
115115

116116
if (this.errorHandler != null) {
117117
this.errorHandler.handle(thrownException, Collections.EMPTY_LIST, consumer, container);

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1684,7 +1684,7 @@ protected void handleConsumerException(Exception e) {
16841684
try {
16851685
if (this.commonErrorHandler != null) {
16861686
this.commonErrorHandler.handleOtherException(e, this.consumer,
1687-
KafkaMessageListenerContainer.this.thisOrParentContainer);
1687+
KafkaMessageListenerContainer.this.thisOrParentContainer, this.isBatchListener);
16881688
}
16891689
else {
16901690
this.logger.error(e, "Consumer exception");

0 commit comments

Comments
 (0)