Skip to content

Commit b404785

Browse files
committed
DLPR: Restore/deprecate KafkaTemplate CTORs
- allow binary backwards compatibility in SCSt
1 parent a9b9d44 commit b404785

10 files changed

+84
-38
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaOperations.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,11 @@ public interface KafkaOperations<K, V> {
222222
* @since 2.5
223223
* @see Producer#sendOffsetsToTransaction(Map, ConsumerGroupMetadata)
224224
*/
225-
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, ConsumerGroupMetadata groupMetadata);
225+
default void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
226+
ConsumerGroupMetadata groupMetadata) {
227+
228+
throw new UnsupportedOperationException();
229+
}
226230

227231
/**
228232
* Return true if the implementation supports transactions (has a transaction-capable

spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535

3636
import org.springframework.core.log.LogAccessor;
3737
import org.springframework.kafka.core.KafkaOperations;
38+
import org.springframework.kafka.core.KafkaTemplate;
3839
import org.springframework.kafka.support.KafkaHeaders;
3940
import org.springframework.kafka.support.serializer.DeserializationException;
4041
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
@@ -92,6 +93,34 @@ public DeadLetterPublishingRecoverer(KafkaOperations<? extends Object, ? extends
9293
this(Collections.singletonMap(Object.class, template), destinationResolver);
9394
}
9495

96+
/**
97+
* Create an instance with the provided template and a default destination resolving
98+
* function that returns a TopicPartition based on the original topic (appended with ".DLT")
99+
* from the failed record, and the same partition as the failed record. Therefore the
100+
* dead-letter topic must have at least as many partitions as the original topic.
101+
* @param template the {@link KafkaTemplate} to use for publishing.
102+
* @deprecated in favor of {@link #DeadLetterPublishingRecoverer(KafkaOperations)}.
103+
*/
104+
@Deprecated
105+
public DeadLetterPublishingRecoverer(KafkaTemplate<? extends Object, ? extends Object> template) {
106+
this(template, DEFAULT_DESTINATION_RESOLVER);
107+
}
108+
109+
/**
110+
* Create an instance with the provided template and destination resolving function,
111+
* that receives the failed consumer record and the exception and returns a
112+
* {@link TopicPartition}. If the partition in the {@link TopicPartition} is less than
113+
* 0, no partition is set when publishing to the topic.
114+
* @param template the {@link KafkaOperations} to use for publishing.
115+
* @param destinationResolver the resolving function.
116+
* @deprecated in favor of {@link #DeadLetterPublishingRecoverer(KafkaOperations, BiFunction)}.
117+
*/
118+
@Deprecated
119+
public DeadLetterPublishingRecoverer(KafkaTemplate<? extends Object, ? extends Object> template,
120+
BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {
121+
this(Collections.singletonMap(Object.class, template), destinationResolver);
122+
}
123+
95124
/**
96125
* Create an instance with the provided templates and a default destination resolving
97126
* function that returns a TopicPartition based on the original topic (appended with

spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
2626
import org.apache.kafka.common.TopicPartition;
2727

28+
import org.springframework.kafka.core.KafkaOperations;
2829
import org.springframework.kafka.core.KafkaTemplate;
2930
import org.springframework.lang.Nullable;
3031
import org.springframework.util.backoff.BackOff;
@@ -47,7 +48,7 @@
4748
*/
4849
public class DefaultAfterRollbackProcessor<K, V> extends FailedRecordProcessor implements AfterRollbackProcessor<K, V> {
4950

50-
private KafkaTemplate<K, V> kafkaTemplate;
51+
private KafkaOperations<K, V> kafkaTemplate;
5152

5253
/**
5354
* Construct an instance with the default recoverer which simply logs the record after
@@ -118,12 +119,12 @@ public boolean isProcessInTransaction() {
118119
* Set to true and the container will run the
119120
* {@link #process(List, Consumer, Exception, boolean)} method in a transaction and,
120121
* if a record is skipped and recovered, we will send its offset to the transaction.
121-
* Requires a {@link KafkaTemplate}.
122+
* Requires a {@link KafkaOperations}.
122123
* @param commitRecovered true to process in a transaction.
123124
* @since 2.3
124125
* @see #isProcessInTransaction()
125126
* @see #process(List, Consumer, Exception, boolean)
126-
* @see #setKafkaTemplate(KafkaTemplate)
127+
* @see #setKafkaOperations(KafkaOperations)
127128
*/
128129
@Override
129130
public void setCommitRecovered(boolean commitRecovered) { // NOSONAR enhanced javadoc
@@ -133,12 +134,25 @@ public void setCommitRecovered(boolean commitRecovered) { // NOSONAR enhanced ja
133134
/**
134135
* Set a {@link KafkaTemplate} to use to send the offset of a recovered record
135136
* to a transaction.
136-
* @param kafkaTemplate the template
137+
* @param kafkaTemplate the template.
137138
* @since 2.2.5
138139
* @see #setCommitRecovered(boolean)
140+
* @deprecated in favor of {@link #setKafkaOperations(KafkaOperations)}.
139141
*/
142+
@Deprecated
140143
public void setKafkaTemplate(KafkaTemplate<K, V> kafkaTemplate) {
141144
this.kafkaTemplate = kafkaTemplate;
142145
}
143146

147+
/**
148+
* Set a {@link KafkaOperations} to use to send the offset of a recovered record
149+
* to a transaction.
150+
* @param kafkaOperations the operations.
151+
* @since 2.3.8
152+
* @see #setCommitRecovered(boolean)
153+
*/
154+
public void setKafkaOperations(KafkaOperations<K, V> kafkaOperations) {
155+
this.kafkaTemplate = kafkaOperations;
156+
}
157+
144158
}

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -324,8 +324,8 @@ public void testDeadLetterPublisherWhileTransactionActive() {
324324
given(pf.transactionCapable()).willReturn(true);
325325
given(pf.createProducer(isNull())).willReturn(producer1).willReturn(producer2);
326326

327-
KafkaTemplate<Object, Object> template = spy(new KafkaTemplate<>(pf));
328-
template.setDefaultTopic(STRING_KEY_TOPIC);
327+
KafkaOperations<Object, Object> template = spy(new KafkaTemplate<>(pf));
328+
((KafkaTemplate<Object, Object>) template).setDefaultTopic(STRING_KEY_TOPIC);
329329

330330
KafkaTransactionManager<Object, Object> tm = new KafkaTransactionManager<>(pf);
331331

spring-kafka/src/test/java/org/springframework/kafka/listener/DeadLetterPublishingRecovererTests.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141

4242
import org.springframework.kafka.core.KafkaOperations;
4343
import org.springframework.kafka.core.KafkaOperations.OperationsCallback;
44-
import org.springframework.kafka.core.KafkaTemplate;
4544
import org.springframework.kafka.support.serializer.DeserializationException;
4645
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
4746
import org.springframework.util.concurrent.SettableListenableFuture;
@@ -56,7 +55,7 @@ public class DeadLetterPublishingRecovererTests {
5655
@SuppressWarnings("unchecked")
5756
@Test
5857
void testTxNoTx() {
59-
KafkaTemplate<?, ?> template = mock(KafkaTemplate.class);
58+
KafkaOperations<?, ?> template = mock(KafkaOperations.class);
6059
given(template.isTransactional()).willReturn(true);
6160
given(template.inTransaction()).willReturn(false);
6261
given(template.isAllowNonTransactional()).willReturn(true);
@@ -70,7 +69,7 @@ void testTxNoTx() {
7069
@SuppressWarnings("unchecked")
7170
@Test
7271
void testTxExisting() {
73-
KafkaTemplate<?, ?> template = mock(KafkaTemplate.class);
72+
KafkaOperations<?, ?> template = mock(KafkaOperations.class);
7473
given(template.isTransactional()).willReturn(true);
7574
given(template.inTransaction()).willReturn(true);
7675
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
@@ -83,7 +82,7 @@ void testTxExisting() {
8382
@SuppressWarnings("unchecked")
8483
@Test
8584
void testNonTx() {
86-
KafkaTemplate<?, ?> template = mock(KafkaTemplate.class);
85+
KafkaOperations<?, ?> template = mock(KafkaOperations.class);
8786
given(template.isTransactional()).willReturn(false);
8887
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
8988
ConsumerRecord<String, String> record = new ConsumerRecord<>("foo", 0, 0L, "bar", "baz");
@@ -96,7 +95,7 @@ void testNonTx() {
9695
@SuppressWarnings({ "unchecked", "rawtypes" })
9796
@Test
9897
void testTxNewTx() {
99-
KafkaTemplate<?, ?> template = mock(KafkaTemplate.class);
98+
KafkaOperations<?, ?> template = mock(KafkaOperations.class);
10099
given(template.isTransactional()).willReturn(true);
101100
given(template.inTransaction()).willReturn(false);
102101
given(template.isAllowNonTransactional()).willReturn(false);

spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessorTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public void testClassifier() {
5858
@SuppressWarnings("unchecked")
5959
KafkaTemplate<String, String> template = mock(KafkaTemplate.class);
6060
given(template.isTransactional()).willReturn(true);
61-
processor.setKafkaTemplate(template);
61+
processor.setKafkaOperations(template);
6262
processor.setCommitRecovered(true);
6363
ConsumerRecord<String, String> record1 = new ConsumerRecord<>("foo", 0, 0L, "foo", "bar");
6464
ConsumerRecord<String, String> record2 = new ConsumerRecord<>("foo", 1, 1L, "foo", "bar");

spring-kafka/src/test/java/org/springframework/kafka/listener/RecoveringBatchErrorHandlerIntegrationTests.java

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434

3535
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
3636
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
37+
import org.springframework.kafka.core.KafkaOperations;
3738
import org.springframework.kafka.core.KafkaTemplate;
3839
import org.springframework.kafka.event.ConsumerStoppedEvent;
3940
import org.springframework.kafka.test.EmbeddedKafkaBroker;
@@ -80,7 +81,7 @@ public void recoveryAndDlt() throws InterruptedException {
8081

8182
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
8283
DefaultKafkaProducerFactory<Object, Object> pf = new DefaultKafkaProducerFactory<>(senderProps);
83-
final KafkaTemplate<Object, Object> template = new KafkaTemplate<>(pf);
84+
final KafkaOperations<Object, Object> template = new KafkaTemplate<>(pf);
8485
final CountDownLatch latch = new CountDownLatch(3);
8586
List<ConsumerRecord<Integer, String>> data = new ArrayList<>();
8687
containerProps.setMessageListener((BatchMessageListener<Integer, String>) records -> {
@@ -109,13 +110,12 @@ public void recoveryAndDlt() throws InterruptedException {
109110
});
110111
container.start();
111112

112-
template.setDefaultTopic(topic1);
113-
template.sendDefault(0, 0, "foo");
114-
template.sendDefault(0, 0, "bar");
115-
template.sendDefault(0, 0, "baz");
116-
template.sendDefault(0, 0, "qux");
117-
template.sendDefault(0, 0, "fiz");
118-
template.sendDefault(0, 0, "buz");
113+
template.send(topic1, 0, 0, "foo");
114+
template.send(topic1, 0, 0, "bar");
115+
template.send(topic1, 0, 0, "baz");
116+
template.send(topic1, 0, 0, "qux");
117+
template.send(topic1, 0, 0, "fiz");
118+
template.send(topic1, 0, 0, "buz");
119119
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
120120
assertThat(data).hasSize(13);
121121
assertThat(data)
@@ -148,7 +148,7 @@ public void recoveryFails() throws InterruptedException {
148148

149149
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
150150
DefaultKafkaProducerFactory<Object, Object> pf = new DefaultKafkaProducerFactory<>(senderProps);
151-
final KafkaTemplate<Object, Object> template = new KafkaTemplate<>(pf);
151+
final KafkaOperations<Object, Object> template = new KafkaTemplate<>(pf);
152152
final CountDownLatch latch = new CountDownLatch(4);
153153
List<ConsumerRecord<Integer, String>> data = new ArrayList<>();
154154
containerProps.setMessageListener((BatchMessageListener<Integer, String>) records -> {
@@ -188,13 +188,12 @@ public void accept(ConsumerRecord<?, ?> record, Exception exception) {
188188
});
189189
container.start();
190190

191-
template.setDefaultTopic(topic2);
192-
template.sendDefault(0, 0, "foo");
193-
template.sendDefault(0, 0, "bar");
194-
template.sendDefault(0, 0, "baz");
195-
template.sendDefault(0, 0, "qux");
196-
template.sendDefault(0, 0, "fiz");
197-
template.sendDefault(0, 0, "buz");
191+
template.send(topic2, 0, 0, "foo");
192+
template.send(topic2, 0, 0, "bar");
193+
template.send(topic2, 0, 0, "baz");
194+
template.send(topic2, 0, 0, "qux");
195+
template.send(topic2, 0, 0, "fiz");
196+
template.send(topic2, 0, 0, "buz");
198197
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
199198
assertThat(data).hasSize(17);
200199
assertThat(data)

spring-kafka/src/test/java/org/springframework/kafka/listener/RetryingBatchErrorHandlerIntegrationTests.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434

3535
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
3636
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
37+
import org.springframework.kafka.core.KafkaOperations;
3738
import org.springframework.kafka.core.KafkaTemplate;
3839
import org.springframework.kafka.event.ConsumerStoppedEvent;
3940
import org.springframework.kafka.test.EmbeddedKafkaBroker;
@@ -78,7 +79,7 @@ public void testRetriesAndDlt() throws InterruptedException {
7879

7980
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
8081
DefaultKafkaProducerFactory<Object, Object> pf = new DefaultKafkaProducerFactory<>(senderProps);
81-
final KafkaTemplate<Object, Object> template = new KafkaTemplate<>(pf);
82+
final KafkaOperations<Object, Object> template = new KafkaTemplate<>(pf);
8283
final CountDownLatch latch = new CountDownLatch(3);
8384
AtomicReference<List<ConsumerRecord<Integer, String>>> data = new AtomicReference<>();
8485
containerProps.setMessageListener((BatchMessageListener<Integer, String>) records -> {
@@ -116,8 +117,7 @@ public void accept(ConsumerRecord<?, ?> record, Exception exception) {
116117
});
117118
container.start();
118119

119-
template.setDefaultTopic(topic1);
120-
template.sendDefault(0, 0, "foo");
120+
template.send(topic1, 0, 0, "foo");
121121
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
122122
assertThat(data.get()).hasSize(1);
123123
assertThat(data.get().iterator().next().value()).isEqualTo("foo");
@@ -145,7 +145,7 @@ public void testRetriesCantRecover() throws InterruptedException {
145145

146146
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
147147
DefaultKafkaProducerFactory<Object, Object> pf = new DefaultKafkaProducerFactory<>(senderProps);
148-
final KafkaTemplate<Object, Object> template = new KafkaTemplate<>(pf);
148+
final KafkaOperations<Object, Object> template = new KafkaTemplate<>(pf);
149149
final CountDownLatch latch = new CountDownLatch(6);
150150
AtomicReference<List<ConsumerRecord<Integer, String>>> data = new AtomicReference<>();
151151
containerProps.setMessageListener((BatchMessageListener<Integer, String>) records -> {
@@ -187,8 +187,7 @@ public void accept(ConsumerRecord<?, ?> record, Exception exception) {
187187
});
188188
container.start();
189189

190-
template.setDefaultTopic(topic2);
191-
template.sendDefault(0, 0, "foo");
190+
template.send(topic2, 0, 0, "foo");
192191
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
193192
assertThat(data.get()).hasSize(1);
194193
assertThat(data.get().iterator().next().value()).isEqualTo("foo");

spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676
import org.springframework.kafka.core.ConsumerFactory;
7777
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
7878
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
79+
import org.springframework.kafka.core.KafkaOperations;
7980
import org.springframework.kafka.core.KafkaTemplate;
8081
import org.springframework.kafka.core.ProducerFactory;
8182
import org.springframework.kafka.core.ProducerFactoryUtils;
@@ -603,7 +604,7 @@ public void testMaxFailures() throws Exception {
603604
new KafkaMessageListenerContainer<>(cf, containerProps);
604605
container.setBeanName("testMaxFailures");
605606
final CountDownLatch recoverLatch = new CountDownLatch(1);
606-
final KafkaTemplate<Object, Object> dlTemplate = spy(new KafkaTemplate<>(pf));
607+
final KafkaOperations<Object, Object> dlTemplate = spy(new KafkaTemplate<>(pf));
607608
AtomicBoolean recovererShouldFail = new AtomicBoolean(true);
608609
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(dlTemplate) {
609610

@@ -620,7 +621,7 @@ public void accept(ConsumerRecord<?, ?> record, Exception exception) {
620621
DefaultAfterRollbackProcessor<Object, Object> afterRollbackProcessor =
621622
spy(new DefaultAfterRollbackProcessor<>(recoverer, new FixedBackOff(0L, 2L)));
622623
afterRollbackProcessor.setCommitRecovered(true);
623-
afterRollbackProcessor.setKafkaTemplate(dlTemplate);
624+
afterRollbackProcessor.setKafkaOperations(dlTemplate);
624625
container.setAfterRollbackProcessor(afterRollbackProcessor);
625626
final CountDownLatch stopLatch = new CountDownLatch(1);
626627
container.setApplicationEventPublisher(e -> {

spring-kafka/src/test/java/org/springframework/kafka/streams/RecoveringDeserializationExceptionHandlerTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.springframework.kafka.core.ConsumerFactory;
5454
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
5555
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
56+
import org.springframework.kafka.core.KafkaOperations;
5657
import org.springframework.kafka.core.KafkaTemplate;
5758
import org.springframework.kafka.core.ProducerFactory;
5859
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
@@ -176,7 +177,7 @@ public Map<String, Object> producerConfigs() {
176177
}
177178

178179
@Bean
179-
public KafkaTemplate<byte[], byte[]> template() {
180+
public KafkaOperations<byte[], byte[]> template() {
180181
KafkaTemplate<byte[], byte[]> kafkaTemplate = new KafkaTemplate<>(producerFactory(), true);
181182
kafkaTemplate.setDefaultTopic("recoverer1");
182183
return kafkaTemplate;

0 commit comments

Comments
 (0)