Skip to content

Commit 1304168

Browse files
artembilangaryrussell
authored andcommitted
GH-2817: KafkaTemplate: No double error timers (#2823)
* GH-2817: KafkaTemplate: No double error timers Fixes #2817 An observation in `KafkaTemplate` can be marked with error from a `Callback`. Then `Future` is evaluated and its exception is thrown back to the `observeSend()`. Here this exception is caught and reported to the observation again. This creates a second timer in the Micrometer, but with different error tag * Check for error presence in the `observeSend()` `catch` block and skip second report **Cherry-pick to `3.0.x`** * * Fix import order for Checkstyle rule
1 parent 4379771 commit 1304168

File tree

2 files changed

+21
-6
lines changed

2 files changed

+21
-6
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -754,8 +754,11 @@ private CompletableFuture<SendResult<K, V>> observeSend(final ProducerRecord<K,
754754
return doSend(producerRecord, observation);
755755
}
756756
catch (RuntimeException ex) {
757-
observation.error(ex);
758-
observation.stop();
757+
// The error is added from org.apache.kafka.clients.producer.Callback
758+
if (observation.getContext().getError() == null) {
759+
observation.error(ex);
760+
observation.stop();
761+
}
759762
throw ex;
760763
}
761764
}
@@ -781,7 +784,7 @@ protected CompletableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V>
781784
}
782785
Future<RecordMetadata> sendFuture =
783786
producer.send(producerRecord, buildCallback(producerRecord, producer, future, sample, observation));
784-
// May be an immediate failure
787+
// Maybe an immediate failure
785788
if (sendFuture.isDone()) {
786789
try {
787790
sendFuture.get();

spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.kafka.support.micrometer;
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
2021
import static org.awaitility.Awaitility.await;
2122
import static org.mockito.Mockito.mock;
2223

@@ -33,12 +34,14 @@
3334
import org.apache.kafka.clients.consumer.ConsumerConfig;
3435
import org.apache.kafka.clients.consumer.ConsumerRecord;
3536
import org.apache.kafka.clients.producer.ProducerConfig;
37+
import org.apache.kafka.common.errors.InvalidTopicException;
3638
import org.apache.kafka.common.header.Headers;
3739
import org.junit.jupiter.api.Test;
3840

3941
import org.springframework.beans.factory.annotation.Autowired;
4042
import org.springframework.context.annotation.Bean;
4143
import org.springframework.context.annotation.Configuration;
44+
import org.springframework.kafka.KafkaException;
4245
import org.springframework.kafka.annotation.EnableKafka;
4346
import org.springframework.kafka.annotation.KafkaListener;
4447
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
@@ -79,8 +82,9 @@
7982

8083
/**
8184
* @author Gary Russell
82-
* @since 3.0
85+
* @author Artem Bilan
8386
*
87+
* @since 3.0
8488
*/
8589
@SpringJUnitConfig
8690
@EmbeddedKafka(topics = { "observation.testT1", "observation.testT2", "ObservationTests.testT3" })
@@ -213,6 +217,14 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context)
213217
.getPropertyValue(endpointRegistry.getListenerContainer("obs3"), "containers", List.class).get(0);
214218
cAdmin = KafkaTestUtils.getPropertyValue(container, "listenerConsumer.kafkaAdmin", KafkaAdmin.class);
215219
assertThat(cAdmin).isSameAs(config.mockAdmin);
220+
221+
assertThatExceptionOfType(KafkaException.class)
222+
.isThrownBy(() -> template.send("wrong%Topic", "data"))
223+
.withCauseExactlyInstanceOf(InvalidTopicException.class);
224+
225+
MeterRegistryAssert.assertThat(meterRegistry)
226+
.hasTimerWithNameAndTags("spring.kafka.template", KeyValues.of("error", "InvalidTopicException"))
227+
.doesNotHaveMeterWithNameAndTags("spring.kafka.template", KeyValues.of("error", "KafkaException"));
216228
}
217229

218230
@Configuration
@@ -232,15 +244,15 @@ KafkaAdmin admin(EmbeddedKafkaBroker broker) {
232244
@Bean
233245
ProducerFactory<Integer, String> producerFactory(EmbeddedKafkaBroker broker) {
234246
Map<String, Object> producerProps = KafkaTestUtils.producerProps(broker);
235-
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString() + ","
247+
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString() + ","
236248
+ broker.getBrokersAsString());
237249
return new DefaultKafkaProducerFactory<>(producerProps);
238250
}
239251

240252
@Bean
241253
ConsumerFactory<Integer, String> consumerFactory(EmbeddedKafkaBroker broker) {
242254
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("obs", "false", broker);
243-
consumerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString() + ","
255+
consumerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString() + ","
244256
+ broker.getBrokersAsString() + "," + broker.getBrokersAsString());
245257
return new DefaultKafkaConsumerFactory<>(consumerProps);
246258
}

0 commit comments

Comments
 (0)