Skip to content

Commit 72fe0f3

Browse files
committed
GH-3161: Fix HandlerAdapter for Kotlin suspend functions
Fixes: #3161 The Kotlin `suspend` are async by nature. However, the `HandlerAdapter` is missing to detect that for its `asyncReplies` property which is used to set `acknowledgeMode` in the container to the `MANUAL` mode. * Add `KotlinDetector.isSuspendingFunction()` check to the `HandlerAdapter` for its `asyncReplies` property * Modify `EnableRabbitKotlinTests` to remove `setAcknowledgeMode(AcknowledgeMode.MANUAL)` from listener container factory beans. Instead, verify in the respective tests that listener container is switched to the `AcknowledgeMode.MANUAL` due to `true` outcome from the `KotlinDetector.isSuspendingFunction()` for those `suspend` functions in the test configuration **Auto-cherry-pick to `3.2.x`**
1 parent ed0a0b7 commit 72fe0f3

File tree

2 files changed

+40
-22
lines changed

2 files changed

+40
-22
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/adapter/HandlerAdapter.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import org.jspecify.annotations.Nullable;
2424

25+
import org.springframework.core.KotlinDetector;
2526
import org.springframework.messaging.Message;
2627
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
2728
import org.springframework.util.Assert;
@@ -32,6 +33,8 @@
3233
* underlying handler.
3334
*
3435
* @author Gary Russell
36+
* @author Artem Bilan
37+
*
3538
* @since 1.5
3639
*
3740
*/
@@ -50,9 +53,11 @@ public class HandlerAdapter {
5053
public HandlerAdapter(InvocableHandlerMethod invokerHandlerMethod) {
5154
this.invokerHandlerMethod = invokerHandlerMethod;
5255
this.delegatingHandler = null;
56+
Method method = invokerHandlerMethod.getMethod();
5357
this.asyncReplies = (AbstractAdaptableMessageListener.monoPresent
54-
&& MonoHandler.isMono(invokerHandlerMethod.getMethod().getReturnType()))
55-
|| CompletableFuture.class.isAssignableFrom(invokerHandlerMethod.getMethod().getReturnType());
58+
&& MonoHandler.isMono(method.getReturnType()))
59+
|| CompletableFuture.class.isAssignableFrom(method.getReturnType())
60+
|| KotlinDetector.isSuspendingFunction(method);
5661
}
5762

5863
/**

spring-rabbit/src/test/kotlin/org/springframework/amqp/rabbit/annotation/EnableRabbitKotlinTests.kt

Lines changed: 33 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import org.springframework.amqp.rabbit.connection.CachingConnectionFactory
3030
import org.springframework.amqp.rabbit.core.RabbitTemplate
3131
import org.springframework.amqp.rabbit.junit.RabbitAvailable
3232
import org.springframework.amqp.rabbit.junit.RabbitAvailableCondition
33+
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer
3334
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry
3435
import org.springframework.amqp.rabbit.listener.api.RabbitListenerErrorHandler
3536
import org.springframework.amqp.utils.test.TestUtils
@@ -44,7 +45,6 @@ import org.springframework.test.annotation.DirtiesContext
4445
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig
4546
import java.util.concurrent.CountDownLatch
4647
import java.util.concurrent.TimeUnit
47-
import java.util.concurrent.atomic.AtomicBoolean
4848

4949
/**
5050
* Kotlin Annotated listener tests.
@@ -63,17 +63,24 @@ class EnableRabbitKotlinTests {
6363
@Autowired
6464
private lateinit var config: Config
6565

66+
@Autowired
67+
private lateinit var registry: RabbitListenerEndpointRegistry
68+
6669
@Test
67-
fun `send and wait for consume`(@Autowired registry: RabbitListenerEndpointRegistry) {
70+
fun `send and wait for consume`() {
6871
val template = RabbitTemplate(this.config.cf())
6972
template.setReplyTimeout(10_000)
7073
val result = template.convertSendAndReceive("kotlinQueue", "test")
7174
assertThat(result).isEqualTo("TEST")
72-
val listener = registry.getListenerContainer("single")?.messageListener
75+
var listenerContainer = registry.getListenerContainer("single") as AbstractMessageListenerContainer
76+
assertThat(listenerContainer.acknowledgeMode).isEqualTo(AcknowledgeMode.MANUAL)
77+
val listener = listenerContainer?.messageListener
7378
assertThat(listener).isNotNull()
7479
listener?.let { nonNullableListener ->
75-
assertThat(TestUtils.getPropertyValue(nonNullableListener, "messagingMessageConverter.inferredArgumentType")
76-
.toString())
80+
assertThat(
81+
TestUtils.getPropertyValue(nonNullableListener, "messagingMessageConverter.inferredArgumentType")
82+
.toString()
83+
)
7784
.isEqualTo("class java.lang.String")
7885
}
7986
}
@@ -86,6 +93,9 @@ class EnableRabbitKotlinTests {
8693
assertThat(this.config.batchReceived.await(10, TimeUnit.SECONDS)).isTrue()
8794
assertThat(this.config.batch[0]).isInstanceOf(Message::class.java)
8895
assertThat(this.config.batch.map { m -> String(m.body) }).containsOnly("test1", "test2")
96+
97+
var listenerContainer = registry.getListenerContainer("batch") as AbstractMessageListenerContainer
98+
assertThat(listenerContainer.acknowledgeMode).isEqualTo(AcknowledgeMode.MANUAL)
8999
}
90100

91101
@Test
@@ -95,45 +105,48 @@ class EnableRabbitKotlinTests {
95105
assertThat(this.config.ehLatch.await(10, TimeUnit.SECONDS)).isTrue()
96106
val reply = template.receiveAndConvert("kotlinReplyQueue", 10_000)
97107
assertThat(reply).isEqualTo("error processed")
108+
109+
var listenerContainer = registry.getListenerContainer("multi") as AbstractMessageListenerContainer
110+
assertThat(listenerContainer.acknowledgeMode).isEqualTo(AcknowledgeMode.AUTO)
98111
}
99112

100113
@Configuration
101114
@EnableRabbit
102115
class Config {
103116

104117
@RabbitListener(id = "single", queues = ["kotlinQueue"])
105-
suspend fun handle(@Suppress("UNUSED_PARAMETER") data: String) : String? {
118+
suspend fun handle(@Suppress("UNUSED_PARAMETER") data: String): String? {
106119
return data.uppercase()
107120
}
108121

109122
val batchReceived = CountDownLatch(1)
110123

111124
lateinit var batch: List<Message>
112125

113-
@RabbitListener(id = "batch", queues = ["kotlinBatchQueue"],
114-
containerFactory = "batchRabbitListenerContainerFactory")
126+
@RabbitListener(
127+
id = "batch", queues = ["kotlinBatchQueue"],
128+
containerFactory = "batchRabbitListenerContainerFactory"
129+
)
115130
suspend fun receiveBatch(messages: List<Message>) {
116131
batch = messages
117132
batchReceived.countDown()
118133
}
119134

120135
@Bean
121136
fun rabbitListenerContainerFactory(cf: CachingConnectionFactory) =
122-
SimpleRabbitListenerContainerFactory().also {
123-
it.setAcknowledgeMode(AcknowledgeMode.MANUAL)
124-
it.setReceiveTimeout(10)
125-
it.setConnectionFactory(cf)
126-
}
137+
SimpleRabbitListenerContainerFactory().also {
138+
it.setReceiveTimeout(10)
139+
it.setConnectionFactory(cf)
140+
}
127141

128142
@Bean
129143
fun batchRabbitListenerContainerFactory(cf: CachingConnectionFactory) =
130-
SimpleRabbitListenerContainerFactory().also {
131-
it.setAcknowledgeMode(AcknowledgeMode.MANUAL)
132-
it.setConsumerBatchEnabled(true)
133-
it.setDeBatchingEnabled(true)
134-
it.setBatchSize(3)
135-
it.setConnectionFactory(cf)
136-
}
144+
SimpleRabbitListenerContainerFactory().also {
145+
it.setConsumerBatchEnabled(true)
146+
it.setDeBatchingEnabled(true)
147+
it.setBatchSize(3)
148+
it.setConnectionFactory(cf)
149+
}
137150

138151
@Bean
139152
fun cf() = CachingConnectionFactory(RabbitAvailableCondition.getBrokerRunning().connectionFactory)

0 commit comments

Comments
 (0)