Skip to content

Commit 8599625

Browse files
committed
GH-1437: Check for immediate failure on send
Resolves #1437 The future returned by `Producer.send()` may have been immediately completed with an exception; check if `future.isDone()` and call `get()` so that any such exception is propagated to the caller. **I will perform backports after review/merge.** * Fix mock tests.
1 parent 3d04b59 commit 8599625

File tree

5 files changed

+66
-15
lines changed

5 files changed

+66
-15
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
import java.util.List;
2020
import java.util.Map;
21+
import java.util.concurrent.ExecutionException;
22+
import java.util.concurrent.Future;
2123
import java.util.concurrent.TimeUnit;
2224

2325
import org.apache.commons.logging.Log;
@@ -32,6 +34,7 @@
3234
import org.apache.kafka.common.PartitionInfo;
3335
import org.apache.kafka.common.TopicPartition;
3436

37+
import org.springframework.kafka.KafkaException;
3538
import org.springframework.kafka.support.LoggingProducerListener;
3639
import org.springframework.kafka.support.ProducerListener;
3740
import org.springframework.kafka.support.SendResult;
@@ -365,7 +368,8 @@ protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> p
365368
this.logger.trace("Sending: " + producerRecord);
366369
}
367370
final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();
368-
producer.send(producerRecord, new Callback() {
371+
Future<RecordMetadata> sendFuture =
372+
producer.send(producerRecord, new Callback() {
369373

370374
@Override
371375
public void onCompletion(RecordMetadata metadata, Exception exception) {
@@ -397,6 +401,19 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
397401
}
398402

399403
});
404+
// May be an immediate failure
405+
if (sendFuture.isDone()) {
406+
try {
407+
sendFuture.get();
408+
}
409+
catch (InterruptedException e) {
410+
Thread.currentThread().interrupt();
411+
throw new KafkaException("Interrupted", e);
412+
}
413+
catch (ExecutionException e) {
414+
throw new KafkaException("Send failed", e.getCause()); // NOSONAR, stack trace
415+
}
416+
}
400417
if (this.autoFlush) {
401418
flush();
402419
}

spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018 the original author or authors.
2+
* Copyright 2018-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
1717
package org.springframework.kafka.core;
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.mockito.BDDMockito.given;
2021
import static org.mockito.BDDMockito.willAnswer;
2122
import static org.mockito.Matchers.any;
2223
import static org.mockito.Mockito.inOrder;
@@ -36,6 +37,7 @@
3637
import org.springframework.kafka.transaction.KafkaTransactionManager;
3738
import org.springframework.transaction.CannotCreateTransactionException;
3839
import org.springframework.transaction.support.TransactionTemplate;
40+
import org.springframework.util.concurrent.SettableListenableFuture;
3941

4042
/**
4143
* @author Gary Russell
@@ -48,6 +50,7 @@ public class DefaultKafkaProducerFactoryTests {
4850
@Test
4951
public void testProducerClosedAfterBadTransition() throws Exception {
5052
final Producer producer = mock(Producer.class);
53+
given(producer.send(any(), any())).willReturn(new SettableListenableFuture<>());
5154
DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) {
5255

5356
@Override

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2017 the original author or authors.
2+
* Copyright 2016-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
1717
package org.springframework.kafka.core;
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.assertj.core.api.Assertions.fail;
2021
import static org.mockito.Mockito.mock;
2122
import static org.springframework.kafka.test.assertj.KafkaConditions.key;
2223
import static org.springframework.kafka.test.assertj.KafkaConditions.partition;
@@ -33,10 +34,12 @@
3334
import org.apache.kafka.clients.consumer.Consumer;
3435
import org.apache.kafka.clients.consumer.ConsumerRecord;
3536
import org.apache.kafka.clients.producer.Producer;
37+
import org.apache.kafka.clients.producer.ProducerConfig;
3638
import org.apache.kafka.clients.producer.RecordMetadata;
3739
import org.apache.kafka.common.Metric;
3840
import org.apache.kafka.common.MetricName;
3941
import org.apache.kafka.common.PartitionInfo;
42+
import org.apache.kafka.common.errors.TimeoutException;
4043
import org.apache.kafka.common.header.Header;
4144
import org.apache.kafka.common.serialization.StringDeserializer;
4245
import org.apache.kafka.common.serialization.StringSerializer;
@@ -46,6 +49,7 @@
4649
import org.junit.ClassRule;
4750
import org.junit.Test;
4851

52+
import org.springframework.kafka.KafkaException;
4953
import org.springframework.kafka.support.Acknowledgment;
5054
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
5155
import org.springframework.kafka.support.KafkaHeaders;
@@ -309,4 +313,21 @@ public void testTemplateDisambiguation() throws Exception {
309313
pf.destroy();
310314
}
311315

316+
@Test
317+
public void testFutureFailureOnSend() {
318+
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
319+
senderProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10);
320+
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
321+
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
322+
323+
try {
324+
template.send("missing.topic", "foo");
325+
fail("Expected exception");
326+
}
327+
catch (KafkaException e) {
328+
assertThat(e.getCause()).isInstanceOf(TimeoutException.class);
329+
}
330+
pf.destroy();
331+
}
332+
312333
}

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
import org.springframework.transaction.annotation.Transactional;
7575
import org.springframework.transaction.support.AbstractPlatformTransactionManager;
7676
import org.springframework.transaction.support.TransactionTemplate;
77+
import org.springframework.util.concurrent.SettableListenableFuture;
7778

7879
import kafka.server.KafkaConfig;
7980

@@ -195,7 +196,7 @@ public void testDeclarative() {
195196
ProducerFactory producerFactory = ctx.getBean(ProducerFactory.class);
196197
verify(producerFactory, times(2)).createProducer();
197198
Producer producer1 = ctx.getBean("producer1", Producer.class);
198-
Producer producer2 = ctx.getBean("producer1", Producer.class);
199+
Producer producer2 = ctx.getBean("producer2", Producer.class);
199200
InOrder inOrder = inOrder(producer1, producer2);
200201
inOrder.verify(producer1).beginTransaction();
201202
inOrder.verify(producer1).send(eq(new ProducerRecord("foo", "bar")), any(Callback.class));
@@ -344,8 +345,10 @@ public Producer<String, String> createProducer() {
344345
public void testExcecuteInTransactionNewInnerTx() {
345346
@SuppressWarnings("unchecked")
346347
Producer<Object, Object> producer1 = mock(Producer.class);
348+
given(producer1.send(any(), any())).willReturn(new SettableListenableFuture<>());
347349
@SuppressWarnings("unchecked")
348350
Producer<Object, Object> producer2 = mock(Producer.class);
351+
given(producer2.send(any(), any())).willReturn(new SettableListenableFuture<>());
349352
producer1.initTransactions();
350353
AtomicBoolean first = new AtomicBoolean(true);
351354

@@ -396,25 +399,29 @@ Producer<Object, Object> createTransactionalProducerForPartition() {
396399
@EnableTransactionManagement
397400
public static class DeclarativeConfig {
398401

399-
@SuppressWarnings("rawtypes")
402+
@SuppressWarnings({ "rawtypes", "unchecked" })
400403
@Bean
401-
public ProducerFactory pf() {
402-
ProducerFactory pf = mock(ProducerFactory.class);
403-
given(pf.transactionCapable()).willReturn(true);
404-
given(pf.createProducer()).willReturn(producer1(), producer2());
405-
return pf;
404+
public Producer producer1() {
405+
Producer mock = mock(Producer.class);
406+
given(mock.send(any(), any())).willReturn(new SettableListenableFuture<>());
407+
return mock;
406408
}
407409

408-
@SuppressWarnings("rawtypes")
410+
@SuppressWarnings({ "rawtypes", "unchecked" })
409411
@Bean
410-
public Producer producer1() {
411-
return mock(Producer.class);
412+
public Producer producer2() {
413+
Producer mock = mock(Producer.class);
414+
given(mock.send(any(), any())).willReturn(new SettableListenableFuture<>());
415+
return mock;
412416
}
413417

414418
@SuppressWarnings("rawtypes")
415419
@Bean
416-
public Producer producer2() {
417-
return producer1();
420+
public ProducerFactory pf() {
421+
ProducerFactory pf = mock(ProducerFactory.class);
422+
given(pf.transactionCapable()).willReturn(true);
423+
given(pf.createProducer()).willReturn(producer1(), producer2());
424+
return pf;
418425
}
419426

420427
@SuppressWarnings({ "rawtypes", "unchecked" })

spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@
7979
import org.springframework.transaction.TransactionException;
8080
import org.springframework.transaction.support.AbstractPlatformTransactionManager;
8181
import org.springframework.transaction.support.DefaultTransactionStatus;
82+
import org.springframework.util.concurrent.SettableListenableFuture;
8283

8384
/**
8485
* @author Gary Russell
@@ -145,6 +146,7 @@ private void testConsumeAndProduceTransactionGuts(boolean chained, boolean handl
145146
ConsumerFactory cf = mock(ConsumerFactory.class);
146147
willReturn(consumer).given(cf).createConsumer("group", null);
147148
Producer producer = mock(Producer.class);
149+
given(producer.send(any(), any())).willReturn(new SettableListenableFuture<>());
148150
final CountDownLatch closeLatch = new CountDownLatch(2);
149151
willAnswer(i -> {
150152
closeLatch.countDown();
@@ -375,6 +377,7 @@ public void testConsumeAndProduceTransactionExternalTM() throws Exception {
375377
ConsumerFactory cf = mock(ConsumerFactory.class);
376378
willReturn(consumer).given(cf).createConsumer("group", null);
377379
Producer producer = mock(Producer.class);
380+
given(producer.send(any(), any())).willReturn(new SettableListenableFuture<>());
378381

379382
final CountDownLatch closeLatch = new CountDownLatch(1);
380383

0 commit comments

Comments
 (0)