Skip to content

Commit a7be6ac

Browse files
garyrussellartembilan
authored andcommitted
GH-1388: Support non-Tx with a Tx Template
Resolves #1388
1 parent 31d08b4 commit a7be6ac

File tree

7 files changed

+100
-19
lines changed

7 files changed

+100
-19
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -193,9 +193,10 @@ public DefaultKafkaProducerFactory(Map<String, Object> configs,
193193
String txId = (String) this.configs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
194194
if (StringUtils.hasText(txId)) {
195195
setTransactionIdPrefix(txId);
196-
LOGGER.info(() -> "If 'setTransactionIdPrefix()' is not going to be configured, "
196+
LOGGER.info(() -> "If 'setTransactionIdPrefix()' is not configured, "
197197
+ "the existing 'transactional.id' config with value: '" + txId
198-
+ "' will be suffixed for concurrent transactions support.");
198+
+ "' will be suffixed for concurrent transaction support.");
199+
this.configs.remove(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
199200
}
200201
}
201202

@@ -370,6 +371,15 @@ public Producer<K, V> createProducer() {
370371
@Override
371372
public Producer<K, V> createProducer(@Nullable String txIdPrefixArg) {
372373
String txIdPrefix = txIdPrefixArg == null ? this.transactionIdPrefix : txIdPrefixArg;
374+
return doCreateProducer(txIdPrefix);
375+
}
376+
377+
@Override
378+
public Producer<K, V> createNonTransactionalProducer() {
379+
return doCreateProducer(null);
380+
}
381+
382+
private Producer<K, V> doCreateProducer(@Nullable String txIdPrefix) {
373383
if (txIdPrefix != null) {
374384
if (this.producerPerConsumerPartition) {
375385
return createTransactionalProducerForPartition(txIdPrefix);

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2019 the original author or authors.
2+
* Copyright 2015-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -83,6 +83,8 @@ public class KafkaTemplate<K, V> implements KafkaOperations<K, V> {
8383

8484
private Duration closeTimeout = ProducerFactoryUtils.DEFAULT_CLOSE_TIMEOUT;
8585

86+
private boolean allowNonTransactional;
87+
8688
/**
8789
* Create an instance using the supplied producer factory and autoFlush false.
8890
* @param producerFactory the producer factory.
@@ -181,6 +183,15 @@ public void setCloseTimeout(Duration closeTimeout) {
181183
this.closeTimeout = closeTimeout;
182184
}
183185

186+
/**
187+
* Set to true to allow a non-transactional send when the template is transactional.
188+
* @param allowNonTransactional true to allow.
189+
* @since 2.4.3
190+
*/
191+
public void setAllowNonTransactional(boolean allowNonTransactional) {
192+
this.allowNonTransactional = allowNonTransactional;
193+
}
194+
184195
/**
185196
* Return the producer factory used by this template.
186197
* @return the factory.
@@ -390,14 +401,6 @@ protected void closeProducer(Producer<K, V> producer, boolean inTx) {
390401
* RecordMetadata}.
391402
*/
392403
protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
393-
if (this.transactional) {
394-
Assert.state(inTransaction(),
395-
"No transaction is in process; "
396-
+ "possible solutions: run the template operation within the scope of a "
397-
+ "template.executeInTransaction() operation, start a transaction with @Transactional "
398-
+ "before invoking the template method, "
399-
+ "run in a transaction started by a listener container when consuming a record");
400-
}
401404
final Producer<K, V> producer = getTheProducer();
402405
this.logger.trace(() -> "Sending: " + producerRecord);
403406
final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();
@@ -450,7 +453,20 @@ public boolean inTransaction() {
450453
}
451454

452455
private Producer<K, V> getTheProducer() {
453-
if (this.transactional) {
456+
boolean transactionalProducer = this.transactional;
457+
if (transactionalProducer) {
458+
boolean inTransaction = inTransaction();
459+
Assert.state(this.allowNonTransactional || inTransaction,
460+
"No transaction is in process; "
461+
+ "possible solutions: run the template operation within the scope of a "
462+
+ "template.executeInTransaction() operation, start a transaction with @Transactional "
463+
+ "before invoking the template method, "
464+
+ "run in a transaction started by a listener container when consuming a record");
465+
if (!inTransaction) {
466+
transactionalProducer = false;
467+
}
468+
}
469+
if (transactionalProducer) {
454470
Producer<K, V> producer = this.producers.get();
455471
if (producer != null) {
456472
return producer;
@@ -459,8 +475,11 @@ private Producer<K, V> getTheProducer() {
459475
.getTransactionalResourceHolder(this.producerFactory, this.transactionIdPrefix, this.closeTimeout);
460476
return holder.getProducer();
461477
}
478+
else if (this.allowNonTransactional) {
479+
return this.producerFactory.createNonTransactionalProducer();
480+
}
462481
else {
463-
return this.producerFactory.createProducer(this.transactionIdPrefix);
482+
return this.producerFactory.createProducer();
464483
}
465484
}
466485

spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactory.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -29,8 +29,9 @@
2929
public interface ProducerFactory<K, V> {
3030

3131
/**
32-
* Create a producer.
32+
* Create a producer which will be transactional if the factory is so configured.
3333
* @return the producer.
34+
* @see #transactionCapable()
3435
*/
3536
Producer<K, V> createProducer();
3637

@@ -44,6 +45,16 @@ default Producer<K, V> createProducer(@SuppressWarnings("unused") String txIdPre
4445
throw new UnsupportedOperationException("This factory does not support this method");
4546
}
4647

48+
/**
49+
* Create a non-transactional producer.
50+
* @return the producer.
51+
* @since 2.4.3
52+
* @see #transactionCapable()
53+
*/
54+
default Producer<K, V> createNonTransactionalProducer() {
55+
throw new UnsupportedOperationException("This factory does not support this method");
56+
}
57+
4758
/**
4859
* Return true if the factory supports transactions.
4960
* @return true if transactional.

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2019 the original author or authors.
2+
* Copyright 2017-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -526,6 +526,30 @@ protected Producer<Object, Object> createTransactionalProducerForPartition(Strin
526526
}
527527
}
528528

529+
@Test
530+
void testNonTxWithTx() {
531+
Map<String, Object> senderProps = KafkaTestUtils.producerProps(this.embeddedKafka);
532+
senderProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx.");
533+
senderProps.put(ProducerConfig.RETRIES_CONFIG, 2);
534+
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
535+
pf.setKeySerializer(new StringSerializer());
536+
KafkaTemplate<String, String> template = new KafkaTemplate<>(pf, true);
537+
template.executeInTransaction(tmp -> tmp.execute(prod -> {
538+
assertThat(KafkaTestUtils.getPropertyValue(prod, "delegate.transactionManager.transactionalId"))
539+
.isEqualTo("tx.0");
540+
return null;
541+
}));
542+
assertThatIllegalStateException().isThrownBy(() -> template.execute(prod -> {
543+
return null;
544+
}));
545+
template.setAllowNonTransactional(true);
546+
template.execute(prod -> {
547+
assertThat(KafkaTestUtils.getPropertyValue(prod, "delegate.transactionManager.transactionalId")).isNull();
548+
return null;
549+
});
550+
pf.destroy();
551+
}
552+
529553
@Configuration
530554
@EnableTransactionManagement
531555
public static class DeclarativeConfig {

spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2019 the original author or authors.
2+
* Copyright 2018-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,7 +20,6 @@
2020
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
2121
import static org.assertj.core.api.Assertions.fail;
2222
import static org.mockito.ArgumentMatchers.any;
23-
import static org.mockito.ArgumentMatchers.isNull;
2423
import static org.mockito.BDDMockito.given;
2524
import static org.mockito.BDDMockito.willAnswer;
2625
import static org.mockito.Mockito.mock;
@@ -396,7 +395,7 @@ public void testAggregateOrphansNotStored() throws Exception {
396395
given(container.getContainerProperties()).willReturn(properties);
397396
ProducerFactory pf = mock(ProducerFactory.class);
398397
Producer producer = mock(Producer.class);
399-
given(pf.createProducer(isNull())).willReturn(producer);
398+
given(pf.createProducer()).willReturn(producer);
400399
AtomicReference<byte[]> correlation = new AtomicReference<>();
401400
willAnswer(invocation -> {
402401
ProducerRecord rec = invocation.getArgument(0);

src/reference/asciidoc/kafka.adoc

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,8 @@ IMPORTANT: When `producerPerThread` is `true`, user code **must** call `closeThr
299299
This will physically close the producer and remove it from the `ThreadLocal`.
300300
Calling `reset()` or `destroy()` will not clean up these producers.
301301

302+
Also see <<tx-template-mixed>>.
303+
302304
When creating a `DefaultKafkaProducerFactory`, key and/or value `Serializer` classes can be picked up from configuration by calling the constructor that only takes in a Map of properties (see example in <<kafka-template>>), or `Serializer` instances may be passed to the `DefaultKafkaProducerFactory` constructor (in which case all `Producer` s share the same instances).
303305
Alternatively you can provide `Supplier<Serializer>` s (starting with version 2.3) that will be used to obtain separate `Serializer` instances for each `Producer`:
304306

@@ -2453,6 +2455,16 @@ This value should be the same for all application instances.
24532455
For transactions started by the template (or the transaction manager for `@Transaction`) you should set the property on the template and transaction manager respectively.
24542456
This property must have a different value on each application instance.
24552457

2458+
[[tx-template-mixed]]
2459+
===== `KafkaTemplate` Transactional and non-Transactional Publishing
2460+
2461+
Normally, when a `KafkaTemplate` is transactional (configured with a transaction-capable producer factory), transactions are required.
2462+
The transaction can be started by a `TransactionTemplate`, a `@Transactional` method, calling `executeInTransaction`, or by a listener container, when configured with a `KafkaTransactionManager`.
2463+
Any attempt to use the template outside the scope of a transaction results in the template throwing an `IllegalStateException`.
2464+
Starting with version 2.4.3, you can set the template's `allowNonTransactional` property to `true`.
2465+
In that case, the template will allow the operation to run without a transaction, by calling the `ProducerFactory` 's `createNonTransactionalProducer()` method; the producer will be cached, or thread-bound, as normal for reuse.
2466+
See <<producer-factory>>.
2467+
24562468
[[transactions-batch]]
24572469
===== Transactions with Batch Listeners
24582470

src/reference/asciidoc/whats-new.adoc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,12 @@ See the IMPORTANT note at the end of <<rebalance-listeners>> for more informatio
2525

2626
The `isAckAfterHandle()` default implementation now returns true by default.
2727

28+
[[x24-template]]
29+
==== KafkaTemplate
30+
31+
The `KafkaTemplate` now supports non-transactional publishing alongside transactional.
32+
See <<tx-template-mixed>> for more information.
33+
2834
[[x24-agg]]
2935
==== AggregatingReplyingKafkaTemplate
3036

0 commit comments

Comments
 (0)