Skip to content

Commit 4b91ea6

Browse files
garyrussellartembilan
authored andcommitted
Transactions Doc Polishing
Clarify synchronization of producer-initiated transactions.
1 parent 066a3c0 commit 4b91ea6

File tree

2 files changed

+18
-39
lines changed

2 files changed

+18
-39
lines changed

src/reference/asciidoc/kafka.adoc

Lines changed: 17 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -3050,6 +3050,8 @@ Spring for Apache Kafka adds support in the following ways:
30503050
* `KafkaTransactionManager`: Used with normal Spring transaction support (`@Transactional`, `TransactionTemplate` etc).
30513051
* Transactional `KafkaMessageListenerContainer`
30523052
* Local transactions with `KafkaTemplate`
3053+
* Transaction synchronization with other transaction managers
3054+
* `ChainedKafkaTransactionManager`
30533055

30543056
Transactions are enabled by providing the `DefaultKafkaProducerFactory` with a `transactionIdPrefix`.
30553057
In that case, instead of managing a single shared `Producer`, the factory maintains a cache of transactional producers.
@@ -3064,9 +3066,12 @@ NOTE: While transactions are supported with batch listeners, by default, zombie
30643066
However, starting with version 2.3.2, zombie fencing is supported if you set the container property `subBatchPerPartition` to true.
30653067
In that case, the batch listener is invoked once per partition received from the last poll, as if each poll only returned records for a single partition.
30663068
This is `true` by default since version 2.5 when transactions are enabled with `EOSMode.ALPHA`; set it to `false` if you are using transactions but are not concerned about zombie fencing.
3069+
Also see <<exactly-once>>.
30673070

30683071
Also see <<transaction-id-prefix>>.
30693072

3073+
With Spring Boot, it is only necessary to set the `spring.kafka.producer.transaction-id-prefix` property - Boot will automatically configure a `KafkaTransactionManager` bean and wire it into the listener container.
3074+
30703075
===== Using `KafkaTransactionManager`
30713076

30723077
The `KafkaTransactionManager` is an implementation of Spring Framework's `PlatformTransactionManager`.
@@ -3081,52 +3086,24 @@ You must configure the `KafkaTemplate` to use the same `ProducerFactory` as the
30813086

30823087
===== Transaction Synchronization
30833088

3084-
If you need to synchronize a Kafka transaction with some other transaction, configure the listener container with the appropriate transaction manager (one that supports synchronization, such as the `DataSourceTransactionManager`).
3085-
Any operations performed on a transactional `KafkaTemplate` from the listener participate in a single transaction.
3086-
The Kafka transaction is committed (or rolled back) immediately after the controlling transaction.
3087-
Before exiting the listener, you should invoke one of the template's `sendOffsetsToTransaction` methods (unless you use a <<chained-transaction-manager,`ChainedKafkaTransactionManager`>>).
3088-
For convenience, the listener container binds its consumer group ID to the thread, so, generally, you can use the first method.
3089-
The following listing shows the two method signatures:
3090-
3091-
====
3092-
[source, java]
3093-
----
3094-
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets);
3095-
3096-
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId);
3097-
----
3098-
====
3089+
This section refers to producer-only transactions (transactions not started by a listener container); see <<chained-transaction-manager>> for information about synchronizing transactions when the container starts the transaction.
30993090

3100-
The following example shows how to use the first signature of the `sendOffsetsToTransaction` method:
3091+
If you want to send records to kafka and perform some database updates, you can use normal Spring transaction management with, say, a `DataSourceTransactionManager`.
31013092

31023093
====
31033094
[source, java]
31043095
----
3105-
@Bean
3106-
KafkaMessageListenerContainer container(ConsumerFactory<String, String> cf,
3107-
final KafkaTemplate template) {
3108-
ContainerProperties props = new ContainerProperties("foo");
3109-
props.setGroupId("group");
3110-
props.setTransactionManager(new SomeOtherTransactionManager());
3111-
...
3112-
props.setMessageListener((MessageListener<String, String>) m -> {
3113-
template.send("foo", "bar");
3114-
template.send("baz", "qux");
3115-
template.sendOffsetsToTransaction(
3116-
Collections.singletonMap(new TopicPartition(m.topic(), m.partition()),
3117-
new OffsetAndMetadata(m.offset() + 1)));
3118-
});
3119-
return new KafkaMessageListenerContainer<>(cf, props);
3096+
@Transactional
3097+
public void process(List<Thing> things) {
3098+
things.forEach(thing -> this.kafkaTemplate.send("topic", thing));
3099+
updateDb(things);
31203100
}
31213101
----
31223102
====
31233103

3124-
NOTE: The offset to be committed is one greater than the offset of the records processed by the listener.
3125-
3126-
IMPORTANT: You should call this only when you use transaction synchronization.
3127-
When a listener container is configured to use a `KafkaTransactionManager` or `ChainedKafkaTransactionManager`, it takes care of sending the offsets to the transaction.
3128-
3129-
See <<ex-jdbc-sync>> for an example application that synchronizes JDBC and Kafka transactions.
3104+
The interceptor for the `@Transactional` annotation starts the transaction and the `KafkaTemplate` will synchronize a transaction with that transaction manager; each send will participate in that transaction.
3105+
When the method exits, the database transaction will commit followed by the Kafka transaction.
3106+
If you wish the commits to be performed in the reverse order (Kafka first), use a <<chained-transaction-manager,`ChainedTransactionManager`>> configured with the `DataSourceTransactionManager` as the first TM and then the `KafkaTransactionManager`, in that order.
31303107

31313108
[[chained-transaction-manager]]
31323109
===== Using `ChainedKafkaTransactionManager`
@@ -3137,7 +3114,7 @@ Since it is a `KafkaAwareTransactionManager`, the container can send the offsets
31373114
This provides another mechanism for synchronizing transactions without having to send the offsets to the transaction in the listener code.
31383115
You should chain your transaction managers in the desired order and provide the `ChainedTransactionManager` in the `ContainerProperties`.
31393116

3140-
See <<ex-jdbc-sync>> for an example application that synchronizes JDBC and Kafka transactions.
3117+
See <<ex-jdbc-sync>> for an example application that chains JDBC and Kafka transactions.
31413118

31423119
Starting with version 2.5.4, you can configure a `TransactionDefinition` in the `ContainerProperties`; its properties will be copied to the container's `TransactionTemplate` used to start transactions.
31433120
This allows, for example, setting a transaction timeout for other transaction managers within the `ChainedKafkaTransactionManager`.
@@ -3182,6 +3159,8 @@ This value should be the same for all application instances.
31823159
For transactions started by the template (or the transaction manager for `@Transaction`) you should set the property on the template and transaction manager respectively.
31833160
This property must have a different value on each application instance.
31843161

3162+
This problem has been eliminated when `EOSMode.BETA` is being used (with broker versions >= 2.5); see <<exactly-once>>.
3163+
31853164
[[tx-template-mixed]]
31863165
===== `KafkaTemplate` Transactional and non-Transactional Publishing
31873166

src/reference/asciidoc/tips.adoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ You should also set the container's `AckMode` to `MANUAL` to prevent the contain
4747
Howewever, starting with version 2.5.5, as shown above, you can apply an initial offset to all partitions; see <<manual-assignment>> for more information.
4848

4949
[[ex-jdbc-sync]]
50-
=== Example of Transaction Synchronization
50+
=== Example of Transactions with ChainedKafkaTransactionManager
5151

5252
The following Spring Boot application is an example of synchronizing database and Kafka transactions.
5353

0 commit comments

Comments
 (0)