Skip to content

Commit be1156a

Browse files
committed
GH-3616: Defer default topic resolution to the ReplyingKafkaTemplate
Fixes: #3616 Issue link: #3616 The `KafkaProducerMessageHandler` uses an unnecessary logic to determine a default topic/partition. It is better to push such a logic down to the `ReplyingKafkaTemplate` as more general solution * Remove `KafkaProducerMessageHandler.getReplyTopic()` logic altogether * Clean up tests for removed logic * Add `exclude group: 'ch.qos.logback'` to be able to control logging for SI-Kafka module * Remove out-dated sentence from `kafka.adoc`
1 parent 05f5cd0 commit be1156a

File tree

4 files changed

+52
-137
lines changed

4 files changed

+52
-137
lines changed

build.gradle

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -786,7 +786,9 @@ project('spring-integration-kafka') {
786786
dependencies {
787787
api 'org.springframework.kafka:spring-kafka'
788788

789-
testImplementation 'org.springframework.kafka:spring-kafka-test'
789+
testImplementation ('org.springframework.kafka:spring-kafka-test') {
790+
exclude group: 'ch.qos.logback'
791+
}
790792
testImplementation 'com.fasterxml.jackson.core:jackson-databind'
791793
}
792794
}

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandler.java

Lines changed: 43 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2013-2023 the original author or authors.
2+
* Copyright 2013-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,11 +19,9 @@
1919
import java.lang.reflect.Type;
2020
import java.nio.charset.StandardCharsets;
2121
import java.time.Duration;
22-
import java.util.Collection;
2322
import java.util.HashMap;
2423
import java.util.Map;
2524
import java.util.Set;
26-
import java.util.TreeSet;
2725
import java.util.concurrent.CompletableFuture;
2826
import java.util.concurrent.ExecutionException;
2927
import java.util.concurrent.Future;
@@ -33,9 +31,7 @@
3331

3432
import org.apache.kafka.clients.producer.ProducerConfig;
3533
import org.apache.kafka.clients.producer.ProducerRecord;
36-
import org.apache.kafka.common.TopicPartition;
3734
import org.apache.kafka.common.header.Headers;
38-
import org.apache.kafka.common.header.internals.RecordHeader;
3935
import org.apache.kafka.common.header.internals.RecordHeaders;
4036

4137
import org.springframework.expression.EvaluationContext;
@@ -423,8 +419,7 @@ public void setUseTemplateConverter(boolean useTemplateConverter) {
423419
}
424420

425421
/**
426-
* Set the time to wait for partition assignment, when used as a gateway, to determine
427-
* the default reply-to topic/partition.
422+
* Set the time to wait for partition assignment, when used as a gateway.
428423
* @param assignmentDuration the assignmentDuration to set.
429424
* @since 6.0
430425
*/
@@ -500,8 +495,7 @@ protected Object handleRequestMessage(final Message<?> message) {
500495
final ProducerRecord<K, V> producerRecord;
501496
boolean flush =
502497
Boolean.TRUE.equals(this.flushExpression.getValue(this.evaluationContext, message, Boolean.class));
503-
boolean preBuilt = message.getPayload() instanceof ProducerRecord;
504-
if (preBuilt) {
498+
if (message.getPayload() instanceof ProducerRecord) {
505499
producerRecord = (ProducerRecord<K, V>) message.getPayload();
506500
}
507501
else {
@@ -517,11 +511,11 @@ protected Object handleRequestMessage(final Message<?> message) {
517511
CompletableFuture<SendResult<K, V>> sendFuture;
518512
RequestReplyFuture<K, V, Object> gatewayFuture = null;
519513
try {
520-
if (this.isGateway
521-
&& (!preBuilt || producerRecord.headers().lastHeader(KafkaHeaders.REPLY_TOPIC) == null)) {
522-
producerRecord.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, getReplyTopic(message)));
523-
gatewayFuture = ((ReplyingKafkaTemplate<K, V, Object>) this.kafkaTemplate)
524-
.sendAndReceive(producerRecord);
514+
if (this.isGateway) {
515+
waitForAssignment();
516+
addReplyTopicIfAny(message.getHeaders(), producerRecord.headers());
517+
gatewayFuture =
518+
((ReplyingKafkaTemplate<K, V, Object>) this.kafkaTemplate).sendAndReceive(producerRecord);
525519
sendFuture = gatewayFuture.getSendFuture();
526520
}
527521
else {
@@ -554,24 +548,6 @@ protected Object handleRequestMessage(final Message<?> message) {
554548
return processReplyFuture(gatewayFuture);
555549
}
556550

557-
private void sendFutureIfRequested(CompletableFuture<SendResult<K, V>> sendFuture, Object futureToken) {
558-
559-
if (futureToken != null) {
560-
MessageChannel futures = getFuturesChannel();
561-
if (futures != null) {
562-
try {
563-
futures.send(getMessageBuilderFactory()
564-
.withPayload(sendFuture)
565-
.setHeader(KafkaIntegrationHeaders.FUTURE_TOKEN, futureToken)
566-
.build());
567-
}
568-
catch (Exception e) {
569-
this.logger.error(e, "Failed to send sendFuture");
570-
}
571-
}
572-
}
573-
}
574-
575551
@SuppressWarnings("unchecked")
576552
private ProducerRecord<K, V> createProducerRecord(final Message<?> message) {
577553
MessageHeaders messageHeaders = message.getHeaders();
@@ -604,91 +580,56 @@ private ProducerRecord<K, V> createProducerRecord(final Message<?> message) {
604580
payload = null;
605581
}
606582

607-
Headers headers = null;
583+
Headers headers = new RecordHeaders();
608584
if (this.headerMapper != null) {
609-
headers = new RecordHeaders();
610585
this.headerMapper.fromHeaders(messageHeaders, headers);
611586
}
587+
612588
return this.producerRecordCreator.create(message, topic, partitionId, timestamp, (K) messageKey, payload,
613589
headers);
614590
}
615591

616-
private byte[] getReplyTopic(Message<?> message) { // NOSONAR
617-
if (this.replyTopicsAndPartitions.isEmpty()) {
618-
determineValidReplyTopicsAndPartitions();
619-
}
620-
Object replyHeader = message.getHeaders().get(KafkaHeaders.REPLY_TOPIC);
621-
byte[] replyTopic = null;
622-
String topicToCheck = null;
623-
if (replyHeader instanceof String) {
624-
replyTopic = ((String) replyHeader).getBytes(StandardCharsets.UTF_8);
625-
topicToCheck = (String) replyHeader;
626-
}
627-
else if (replyHeader instanceof byte[]) {
628-
replyTopic = (byte[]) replyHeader;
629-
}
630-
else if (replyHeader != null) {
631-
throw new IllegalStateException(KafkaHeaders.REPLY_TOPIC + " must be String or byte[]");
592+
private void waitForAssignment() {
593+
ReplyingKafkaTemplate<?, ?, ?> rkt = (ReplyingKafkaTemplate<?, ?, ?>) this.kafkaTemplate;
594+
try {
595+
rkt.waitForAssignment(this.assignmentDuration);
632596
}
633-
if (replyTopic == null) {
634-
if (this.replyTopicsAndPartitions.size() == 1) {
635-
replyTopic = getSingleReplyTopic();
636-
}
637-
else {
638-
throw new IllegalStateException("No reply topic header and no default reply topic can be determined; "
639-
+ "container's assigned partitions: " + this.replyTopicsAndPartitions);
640-
}
597+
catch (InterruptedException e) {
598+
Thread.currentThread().interrupt();
641599
}
642-
else {
643-
if (topicToCheck == null) {
644-
topicToCheck = new String(replyTopic, StandardCharsets.UTF_8);
645-
}
646-
if (!this.replyTopicsAndPartitions.containsKey(topicToCheck)) {
647-
throw new IllegalStateException("The reply topic header ["
648-
+ topicToCheck +
649-
"] does not match any reply container topic: " + this.replyTopicsAndPartitions.keySet());
600+
}
601+
602+
@Nullable
603+
private void addReplyTopicIfAny(MessageHeaders messageHeaders, Headers headers) {
604+
if (this.isGateway) {
605+
Object replyHeader = messageHeaders.get(KafkaHeaders.REPLY_TOPIC);
606+
if (replyHeader instanceof String topicString) {
607+
headers.add(KafkaHeaders.REPLY_TOPIC, topicString.getBytes(StandardCharsets.UTF_8));
650608
}
651-
}
652-
Integer replyPartition = message.getHeaders().get(KafkaHeaders.REPLY_PARTITION, Integer.class);
653-
if (replyPartition != null) {
654-
if (topicToCheck == null) {
655-
topicToCheck = new String(replyTopic, StandardCharsets.UTF_8);
609+
else if (replyHeader instanceof byte[] topicBytes) {
610+
headers.add(KafkaHeaders.REPLY_TOPIC, topicBytes);
656611
}
657-
if (!this.replyTopicsAndPartitions.get(topicToCheck).contains(replyPartition)) {
658-
throw new IllegalStateException("The reply partition header ["
659-
+ replyPartition + "] does not match any reply container partition for topic ["
660-
+ topicToCheck + "]: " + this.replyTopicsAndPartitions.get(topicToCheck));
612+
else if (replyHeader != null) {
613+
throw new IllegalStateException(KafkaHeaders.REPLY_TOPIC + " must be String or byte[]");
661614
}
662615
}
663-
return replyTopic;
664616
}
665617

666-
private byte[] getSingleReplyTopic() {
667-
if (this.singleReplyTopic == null) {
668-
this.singleReplyTopic = this.replyTopicsAndPartitions.keySet()
669-
.iterator()
670-
.next()
671-
.getBytes(StandardCharsets.UTF_8);
672-
}
673-
return this.singleReplyTopic;
674-
}
618+
private void sendFutureIfRequested(CompletableFuture<SendResult<K, V>> sendFuture, Object futureToken) {
675619

676-
private void determineValidReplyTopicsAndPartitions() {
677-
ReplyingKafkaTemplate<?, ?, ?> rkt = (ReplyingKafkaTemplate<?, ?, ?>) this.kafkaTemplate;
678-
try {
679-
rkt.waitForAssignment(this.assignmentDuration);
680-
}
681-
catch (InterruptedException e) {
682-
Thread.currentThread().interrupt();
683-
}
684-
Collection<TopicPartition> replyTopics = rkt.getAssignedReplyTopicPartitions();
685-
Map<String, Set<Integer>> topicsAndPartitions = new HashMap<>();
686-
if (replyTopics != null) {
687-
replyTopics.forEach(tp -> {
688-
topicsAndPartitions.computeIfAbsent(tp.topic(), (k) -> new TreeSet<>());
689-
topicsAndPartitions.get(tp.topic()).add(tp.partition());
690-
});
691-
this.replyTopicsAndPartitions.putAll(topicsAndPartitions);
620+
if (futureToken != null) {
621+
MessageChannel futures = getFuturesChannel();
622+
if (futures != null) {
623+
try {
624+
futures.send(getMessageBuilderFactory()
625+
.withPayload(sendFuture)
626+
.setHeader(KafkaIntegrationHeaders.FUTURE_TOKEN, futureToken)
627+
.build());
628+
}
629+
catch (Exception e) {
630+
this.logger.error(e, "Failed to send sendFuture");
631+
}
632+
}
692633
}
693634
}
694635

@@ -804,7 +745,7 @@ private Message<?> dontLeakHeaders(Message<?> message) {
804745
* @param <K> the key type.
805746
* @param <V> the value type.
806747
*
807-
* @since 3.2.1
748+
* @since 5.4
808749
*
809750
*/
810751
@FunctionalInterface

spring-integration-kafka/src/test/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandlerTests.java

Lines changed: 6 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -121,17 +121,17 @@
121121
*/
122122
class KafkaProducerMessageHandlerTests {
123123

124-
private static String topic1 = "testTopic1out";
124+
private static final String topic1 = "testTopic1out";
125125

126-
private static String topic2 = "testTopic2out";
126+
private static final String topic2 = "testTopic2out";
127127

128-
private static String topic3 = "testTopic3out";
128+
private static final String topic3 = "testTopic3out";
129129

130-
private static String topic4 = "testTopic4out";
130+
private static final String topic4 = "testTopic4out";
131131

132-
private static String topic5 = "testTopic5out";
132+
private static final String topic5 = "testTopic5out";
133133

134-
private static String topic6 = "testTopic6in";
134+
private static final String topic6 = "testTopic6in";
135135

136136
private static EmbeddedKafkaBroker embeddedKafka;
137137

@@ -467,31 +467,6 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
467467
assertThat(reply.getHeaders().get(KafkaHeaders.TOPIC)).isNull();
468468
assertThat(reply.getHeaders().get(KafkaHeaders.CORRELATION_ID)).isNull();
469469

470-
final Message<?> messageToHandle1 = MessageBuilder.withPayload("foo")
471-
.setHeader(KafkaHeaders.TOPIC, topic5)
472-
.setHeader(KafkaHeaders.KEY, 2)
473-
.setHeader(KafkaHeaders.PARTITION, 1)
474-
.setHeader(KafkaHeaders.REPLY_TOPIC, "bad")
475-
.build();
476-
477-
assertThatExceptionOfType(MessageHandlingException.class)
478-
.isThrownBy(() -> handler.handleMessage(messageToHandle1))
479-
.withStackTraceContaining("The reply topic header [bad] does not match any reply container topic: "
480-
+ "[" + topic6 + "]");
481-
482-
final Message<?> messageToHandle2 = MessageBuilder.withPayload("foo")
483-
.setHeader(KafkaHeaders.TOPIC, topic5)
484-
.setHeader(KafkaHeaders.KEY, 2)
485-
.setHeader(KafkaHeaders.PARTITION, 1)
486-
.setHeader(KafkaHeaders.REPLY_PARTITION, 999)
487-
.build();
488-
489-
assertThatExceptionOfType(MessageHandlingException.class)
490-
.isThrownBy(() -> handler.handleMessage(messageToHandle2))
491-
.withStackTraceContaining("The reply partition header [999] " +
492-
"does not match any reply container partition for topic ["
493-
+ topic6 + "]: [0, 1]");
494-
495470
template.stop();
496471
// discard from the test consumer
497472
KafkaTestUtils.getSingleRecord(consumer, topic6);

src/reference/antora/modules/ROOT/pages/kafka.adoc

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -468,9 +468,6 @@ The outbound gateway is for request/reply operations.
468468
It differs from most Spring Integration gateways in that the sending thread does not block in the gateway, and the reply is processed on the reply listener container thread.
469469
If your code invokes the gateway behind a synchronous https://docs.spring.io/spring-integration/reference/html/messaging-endpoints-chapter.html#gateway[Messaging Gateway], the user thread blocks there until the reply is received (or a timeout occurs).
470470

471-
IMPORTANT: The gateway does not accept requests until the reply container has been assigned its topics and partitions.
472-
It is suggested that you add a `ConsumerRebalanceListener` to the template's reply container properties and wait for the `onPartitionsAssigned` call before sending messages to the gateway.
473-
474471
The `KafkaProducerMessageHandler` `sendTimeoutExpression` default is `delivery.timeout.ms` Kafka producer property `+ 5000` so that the actual Kafka error after a timeout is propagated to the application, instead of a timeout generated by this framework.
475472
This has been changed for consistency because you may get unexpected behavior (Spring may time out the `send()`, while it is actually, eventually, successful).
476473
IMPORTANT: That timeout is 120 seconds by default, so you may wish to reduce it to get more timely failures.

0 commit comments

Comments
 (0)