Skip to content

Commit e876e0a

Browse files
committed
GH-3989 Consder the custom name of the reply topic name in sendAndReceive
Signed-off-by: mipo256 <[email protected]>
1 parent 8126e4b commit e876e0a

File tree

4 files changed

+24
-28
lines changed

4 files changed

+24
-28
lines changed

build.gradle

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
import org.jetbrains.kotlin.gradle.dsl.JvmTarget
2+
import org.jetbrains.kotlin.gradle.tasks.KotlinCompilationTask
3+
14
buildscript {
25
ext.kotlinVersion = '2.1.21'
36
ext.isCI = System.getenv('GITHUB_ACTION')
@@ -147,9 +150,9 @@ configure(javaProjects) { subproject ->
147150
options.encoding = 'UTF-8'
148151
}
149152

150-
tasks.withType(org.jetbrains.kotlin.gradle.tasks.KotlinCompilationTask).configureEach {
153+
tasks.withType(KotlinCompilationTask).configureEach {
151154
compilerOptions {
152-
jvmTarget = org.jetbrains.kotlin.gradle.dsl.JvmTarget.JVM_17
155+
jvmTarget = JvmTarget.JVM_17
153156
javaParameters = true
154157
allWarningsAsErrors = true
155158
}

spring-kafka/src/main/java/org/springframework/kafka/listener/DeliveryAttemptAware.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
*
2424
* @author Gary Russell
2525
* @since 2.5
26-
*
2726
*/
2827
@FunctionalInterface
2928
public interface DeliveryAttemptAware {

spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,6 @@
1616

1717
package org.springframework.kafka.requestreply;
1818

19-
import java.nio.ByteBuffer;
20-
import java.nio.charset.StandardCharsets;
21-
import java.time.Duration;
22-
import java.time.Instant;
23-
import java.util.Collection;
24-
import java.util.List;
25-
import java.util.UUID;
26-
import java.util.concurrent.ConcurrentHashMap;
27-
import java.util.concurrent.ConcurrentMap;
28-
import java.util.concurrent.CountDownLatch;
29-
import java.util.concurrent.TimeUnit;
30-
import java.util.function.Function;
31-
3219
import io.micrometer.observation.Observation;
3320
import org.apache.kafka.clients.consumer.ConsumerRecord;
3421
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -37,7 +24,6 @@
3724
import org.apache.kafka.common.header.Headers;
3825
import org.apache.kafka.common.header.internals.RecordHeader;
3926
import org.jspecify.annotations.Nullable;
40-
4127
import org.springframework.beans.factory.DisposableBean;
4228
import org.springframework.beans.factory.InitializingBean;
4329
import org.springframework.context.SmartLifecycle;
@@ -62,6 +48,20 @@
6248
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
6349
import org.springframework.util.Assert;
6450

51+
import java.nio.ByteBuffer;
52+
import java.nio.charset.StandardCharsets;
53+
import java.time.Duration;
54+
import java.time.Instant;
55+
import java.util.Collection;
56+
import java.util.List;
57+
import java.util.Optional;
58+
import java.util.UUID;
59+
import java.util.concurrent.ConcurrentHashMap;
60+
import java.util.concurrent.ConcurrentMap;
61+
import java.util.concurrent.CountDownLatch;
62+
import java.util.concurrent.TimeUnit;
63+
import java.util.function.Function;
64+
6565
/**
6666
* A KafkaTemplate that implements request/reply semantics.
6767
*
@@ -73,6 +73,7 @@
7373
* @author Artem Bilan
7474
* @author Borahm Lee
7575
* @author Francois Rosiere
76+
* @author Mikhail Polivakha
7677
*
7778
* @since 2.1.3
7879
*
@@ -415,14 +416,11 @@ public RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record) {
415416
@Override
416417
public RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record, @Nullable Duration replyTimeout) {
417418
Assert.state(this.running, "Template has not been started"); // NOSONAR (sync)
418-
Duration timeout = replyTimeout;
419-
if (timeout == null) {
420-
timeout = this.defaultReplyTimeout;
421-
}
419+
Duration timeout = Optional.ofNullable(replyTimeout).orElse(defaultReplyTimeout);
422420
CorrelationKey correlationId = this.correlationStrategy.apply(record);
423421
Assert.notNull(correlationId, "the created 'correlationId' cannot be null");
424422
Headers headers = record.headers();
425-
boolean hasReplyTopic = headers.lastHeader(KafkaHeaders.REPLY_TOPIC) != null;
423+
boolean hasReplyTopic = headers.lastHeader(this.replyTopicHeaderName) != null;
426424
if (!hasReplyTopic && this.replyTopic != null) {
427425
headers.add(new RecordHeader(this.replyTopicHeaderName, this.replyTopic));
428426
if (this.replyPartition != null) {

spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -291,12 +291,8 @@ protected boolean doesMatchMultiValueHeader(String headerName) {
291291
}
292292

293293
private boolean doesMatchMultiValueHeaderInternal(String headerName) {
294-
for (HeaderMatcher headerMatcher : this.multiValueHeaderMatchers) {
295-
if (headerMatcher.matchHeader(headerName)) {
296-
return true;
297-
}
298-
}
299-
return false;
294+
return multiValueHeaderMatchers.stream()
295+
.anyMatch(headerMatcher -> headerMatcher.matchHeader(headerName));
300296
}
301297

302298
/**

0 commit comments

Comments
 (0)