Skip to content

Commit 5441fb2

Browse files
committed
GH-1437: Check for immediate failure on send
Resolves #1437 The future returned by `Producer.send()` may have been immediately completed with an exception; check if `future.isDone()` and call `get()` so that any such exception is propagated to the caller. **I will perform backports after review/merge.** * Fix mock tests.
1 parent e8bdcf3 commit 5441fb2

File tree

6 files changed

+68
-16
lines changed

6 files changed

+68
-16
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,22 @@
1919
import java.time.Duration;
2020
import java.util.List;
2121
import java.util.Map;
22+
import java.util.concurrent.ExecutionException;
23+
import java.util.concurrent.Future;
2224

2325
import org.apache.commons.logging.LogFactory;
2426
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
2527
import org.apache.kafka.clients.producer.Callback;
2628
import org.apache.kafka.clients.producer.Producer;
2729
import org.apache.kafka.clients.producer.ProducerRecord;
30+
import org.apache.kafka.clients.producer.RecordMetadata;
2831
import org.apache.kafka.common.Metric;
2932
import org.apache.kafka.common.MetricName;
3033
import org.apache.kafka.common.PartitionInfo;
3134
import org.apache.kafka.common.TopicPartition;
3235

3336
import org.springframework.core.log.LogAccessor;
37+
import org.springframework.kafka.KafkaException;
3438
import org.springframework.kafka.support.KafkaHeaders;
3539
import org.springframework.kafka.support.KafkaUtils;
3640
import org.springframework.kafka.support.LoggingProducerListener;
@@ -401,7 +405,21 @@ protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> p
401405
final Producer<K, V> producer = getTheProducer();
402406
this.logger.trace(() -> "Sending: " + producerRecord);
403407
final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();
404-
producer.send(producerRecord, buildCallback(producerRecord, producer, future));
408+
Future<RecordMetadata> sendFuture =
409+
producer.send(producerRecord, buildCallback(producerRecord, producer, future));
410+
// May be an immediate failure
411+
if (sendFuture.isDone()) {
412+
try {
413+
sendFuture.get();
414+
}
415+
catch (InterruptedException e) {
416+
Thread.currentThread().interrupt();
417+
throw new KafkaException("Interrupted", e);
418+
}
419+
catch (ExecutionException e) {
420+
throw new KafkaException("Send failed", e.getCause()); // NOSONAR, stack trace
421+
}
422+
}
405423
if (this.autoFlush) {
406424
flush();
407425
}

spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2019 the original author or authors.
2+
* Copyright 2018-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
1919
import static org.assertj.core.api.Assertions.assertThat;
2020
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
2121
import static org.mockito.ArgumentMatchers.any;
22+
import static org.mockito.BDDMockito.given;
2223
import static org.mockito.BDDMockito.willAnswer;
2324
import static org.mockito.BDDMockito.willThrow;
2425
import static org.mockito.Mockito.inOrder;
@@ -45,6 +46,7 @@
4546
import org.springframework.kafka.transaction.KafkaTransactionManager;
4647
import org.springframework.transaction.CannotCreateTransactionException;
4748
import org.springframework.transaction.support.TransactionTemplate;
49+
import org.springframework.util.concurrent.SettableListenableFuture;
4850

4951
/**
5052
* @author Gary Russell
@@ -57,6 +59,7 @@ public class DefaultKafkaProducerFactoryTests {
5759
@Test
5860
void testProducerClosedAfterBadTransition() throws Exception {
5961
final Producer producer = mock(Producer.class);
62+
given(producer.send(any(), any())).willReturn(new SettableListenableFuture<>());
6063
DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) {
6164

6265
@Override

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@
1818

1919
import static org.assertj.core.api.Assertions.allOf;
2020
import static org.assertj.core.api.Assertions.assertThat;
21+
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
2122
import static org.mockito.Mockito.mock;
2223
import static org.springframework.kafka.test.assertj.KafkaConditions.key;
2324
import static org.springframework.kafka.test.assertj.KafkaConditions.keyValue;
@@ -39,11 +40,13 @@
3940
import org.apache.kafka.clients.consumer.ConsumerConfig;
4041
import org.apache.kafka.clients.consumer.ConsumerRecord;
4142
import org.apache.kafka.clients.producer.Producer;
43+
import org.apache.kafka.clients.producer.ProducerConfig;
4244
import org.apache.kafka.clients.producer.ProducerRecord;
4345
import org.apache.kafka.clients.producer.RecordMetadata;
4446
import org.apache.kafka.common.Metric;
4547
import org.apache.kafka.common.MetricName;
4648
import org.apache.kafka.common.PartitionInfo;
49+
import org.apache.kafka.common.errors.TimeoutException;
4750
import org.apache.kafka.common.header.Header;
4851
import org.apache.kafka.common.serialization.StringDeserializer;
4952
import org.apache.kafka.common.serialization.StringSerializer;
@@ -52,6 +55,7 @@
5255
import org.junit.jupiter.api.BeforeAll;
5356
import org.junit.jupiter.api.Test;
5457

58+
import org.springframework.kafka.KafkaException;
5559
import org.springframework.kafka.support.Acknowledgment;
5660
import org.springframework.kafka.support.CompositeProducerListener;
5761
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
@@ -358,4 +362,17 @@ public void testTemplateDisambiguation() {
358362
pf.destroy();
359363
}
360364

365+
@Test
366+
void testFutureFailureOnSend() {
367+
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
368+
senderProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10);
369+
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
370+
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
371+
372+
assertThatExceptionOfType(KafkaException.class).isThrownBy(() ->
373+
template.send("missing.topic", "foo"))
374+
.withCauseExactlyInstanceOf(TimeoutException.class);
375+
pf.destroy();
376+
}
377+
361378
}

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777
import org.springframework.transaction.annotation.Transactional;
7878
import org.springframework.transaction.support.AbstractPlatformTransactionManager;
7979
import org.springframework.transaction.support.TransactionTemplate;
80+
import org.springframework.util.concurrent.SettableListenableFuture;
8081

8182
/**
8283
* @author Gary Russell
@@ -306,8 +307,10 @@ public void testTransactionSynchronizationExceptionOnCommit() {
306307
public void testDeadLetterPublisherWhileTransactionActive() {
307308
@SuppressWarnings("unchecked")
308309
Producer<Object, Object> producer1 = mock(Producer.class);
310+
given(producer1.send(any(), any())).willReturn(new SettableListenableFuture<>());
309311
@SuppressWarnings("unchecked")
310312
Producer<Object, Object> producer2 = mock(Producer.class);
313+
given(producer2.send(any(), any())).willReturn(new SettableListenableFuture<>());
311314
producer1.initTransactions();
312315

313316
@SuppressWarnings("unchecked")
@@ -329,6 +332,7 @@ public void testDeadLetterPublisherWhileTransactionActive() {
329332
});
330333

331334
verify(producer1).beginTransaction();
335+
332336
verify(producer1).commitTransaction();
333337
verify(producer1).close(any());
334338
verify(producer2, never()).beginTransaction();
@@ -477,8 +481,10 @@ public void testAbort() {
477481
public void testExecuteInTransactionNewInnerTx() {
478482
@SuppressWarnings("unchecked")
479483
Producer<Object, Object> producer1 = mock(Producer.class);
484+
given(producer1.send(any(), any())).willReturn(new SettableListenableFuture<>());
480485
@SuppressWarnings("unchecked")
481486
Producer<Object, Object> producer2 = mock(Producer.class);
487+
given(producer2.send(any(), any())).willReturn(new SettableListenableFuture<>());
482488
producer1.initTransactions();
483489
AtomicBoolean first = new AtomicBoolean(true);
484490

@@ -530,26 +536,30 @@ protected Producer<Object, Object> createTransactionalProducerForPartition(Strin
530536
@EnableTransactionManagement
531537
public static class DeclarativeConfig {
532538

533-
@SuppressWarnings("rawtypes")
539+
@SuppressWarnings({ "rawtypes", "unchecked" })
534540
@Bean
535-
public ProducerFactory pf() {
536-
ProducerFactory pf = mock(ProducerFactory.class);
537-
given(pf.transactionCapable()).willReturn(true);
538-
given(pf.createProducer(isNull())).willReturn(producer1());
539-
given(pf.createProducer(anyString())).willReturn(producer2());
540-
return pf;
541+
public Producer producer1() {
542+
Producer mock = mock(Producer.class);
543+
given(mock.send(any(), any())).willReturn(new SettableListenableFuture<>());
544+
return mock;
541545
}
542546

543-
@SuppressWarnings("rawtypes")
547+
@SuppressWarnings({ "rawtypes", "unchecked" })
544548
@Bean
545-
public Producer producer1() {
546-
return mock(Producer.class);
549+
public Producer producer2() {
550+
Producer mock = mock(Producer.class);
551+
given(mock.send(any(), any())).willReturn(new SettableListenableFuture<>());
552+
return mock;
547553
}
548554

549555
@SuppressWarnings("rawtypes")
550556
@Bean
551-
public Producer producer2() {
552-
return mock(Producer.class);
557+
public ProducerFactory pf() {
558+
ProducerFactory pf = mock(ProducerFactory.class);
559+
given(pf.transactionCapable()).willReturn(true);
560+
given(pf.createProducer(isNull())).willReturn(producer1());
561+
given(pf.createProducer(anyString())).willReturn(producer2());
562+
return pf;
553563
}
554564

555565
@SuppressWarnings({ "rawtypes", "unchecked" })

spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@
9797
import org.springframework.transaction.support.AbstractPlatformTransactionManager;
9898
import org.springframework.transaction.support.DefaultTransactionStatus;
9999
import org.springframework.util.backoff.FixedBackOff;
100+
import org.springframework.util.concurrent.SettableListenableFuture;
100101

101102
/**
102103
* @author Gary Russell
@@ -176,6 +177,7 @@ private void testConsumeAndProduceTransactionGuts(boolean chained, boolean handl
176177
ConsumerFactory cf = mock(ConsumerFactory.class);
177178
willReturn(consumer).given(cf).createConsumer("group", "", null, KafkaTestUtils.defaultPropertyOverrides());
178179
Producer producer = mock(Producer.class);
180+
given(producer.send(any(), any())).willReturn(new SettableListenableFuture<>());
179181
final CountDownLatch closeLatch = new CountDownLatch(2);
180182
willAnswer(i -> {
181183
closeLatch.countDown();
@@ -413,6 +415,7 @@ public void testConsumeAndProduceTransactionExternalTM() throws Exception {
413415
ConsumerFactory cf = mock(ConsumerFactory.class);
414416
willReturn(consumer).given(cf).createConsumer("group", "", null, KafkaTestUtils.defaultPropertyOverrides());
415417
Producer producer = mock(Producer.class);
418+
given(producer.send(any(), any())).willReturn(new SettableListenableFuture<>());
416419

417420
final CountDownLatch closeLatch = new CountDownLatch(1);
418421

spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@
8888
import org.springframework.messaging.support.MessageBuilder;
8989
import org.springframework.test.annotation.DirtiesContext;
9090
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
91+
import org.springframework.util.concurrent.SettableListenableFuture;
9192

9293
/**
9394
* @author Gary Russell
@@ -401,7 +402,7 @@ public void testAggregateOrphansNotStored() throws Exception {
401402
willAnswer(invocation -> {
402403
ProducerRecord rec = invocation.getArgument(0);
403404
correlation.set(rec.headers().lastHeader(KafkaHeaders.CORRELATION_ID).value());
404-
return null;
405+
return new SettableListenableFuture<>();
405406
}).given(producer).send(any(), any());
406407
AggregatingReplyingKafkaTemplate template = new AggregatingReplyingKafkaTemplate(pf, container,
407408
(list, timeout) -> true);

0 commit comments

Comments
 (0)