Skip to content

Commit 1f3a71c

Browse files
committed
GH-3161: Fix HandlerAdapter for Kotlin suspend functions
Fixes: #3161 The Kotlin `suspend` are async by nature. However, the `HandlerAdapter` is missing to detect that for its `asyncReplies` property which is used to set `acknowledgeMode` in the container to the `MANUAL` mode. * Add `KotlinDetector.isSuspendingFunction()` check to the `HandlerAdapter` for its `asyncReplies` property * Modify `EnableRabbitKotlinTests` to remove `setAcknowledgeMode(AcknowledgeMode.MANUAL)` from listener container factory beans. Instead, verify in the respective tests that listener container is switched to the `AcknowledgeMode.MANUAL` due to `true` outcome from the `KotlinDetector.isSuspendingFunction()` for those `suspend` functions in the test configuration # Conflicts: # spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/adapter/HandlerAdapter.java # spring-rabbit/src/test/kotlin/org/springframework/amqp/rabbit/annotation/EnableRabbitKotlinTests.kt
1 parent 40b9f7a commit 1f3a71c

File tree

2 files changed

+42
-21
lines changed

2 files changed

+42
-21
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/adapter/HandlerAdapter.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.concurrent.CompletableFuture;
2222

2323
import org.springframework.lang.Nullable;
24+
import org.springframework.core.KotlinDetector;
2425
import org.springframework.messaging.Message;
2526
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
2627

@@ -30,6 +31,8 @@
3031
* underlying handler.
3132
*
3233
* @author Gary Russell
34+
* @author Artem Bilan
35+
*
3336
* @since 1.5
3437
*
3538
*/
@@ -48,9 +51,11 @@ public class HandlerAdapter {
4851
public HandlerAdapter(InvocableHandlerMethod invokerHandlerMethod) {
4952
this.invokerHandlerMethod = invokerHandlerMethod;
5053
this.delegatingHandler = null;
54+
Method method = invokerHandlerMethod.getMethod();
5155
this.asyncReplies = (AbstractAdaptableMessageListener.monoPresent
52-
&& MonoHandler.isMono(invokerHandlerMethod.getMethod().getReturnType()))
53-
|| CompletableFuture.class.isAssignableFrom(invokerHandlerMethod.getMethod().getReturnType());
56+
&& MonoHandler.isMono(method.getReturnType()))
57+
|| CompletableFuture.class.isAssignableFrom(method.getReturnType())
58+
|| KotlinDetector.isSuspendingFunction(method);
5459
}
5560

5661
/**

spring-rabbit/src/test/kotlin/org/springframework/amqp/rabbit/annotation/EnableRabbitKotlinTests.kt

Lines changed: 35 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import org.springframework.amqp.rabbit.connection.CachingConnectionFactory
2929
import org.springframework.amqp.rabbit.core.RabbitTemplate
3030
import org.springframework.amqp.rabbit.junit.RabbitAvailable
3131
import org.springframework.amqp.rabbit.junit.RabbitAvailableCondition
32+
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer
3233
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry
3334
import org.springframework.amqp.rabbit.listener.api.RabbitListenerErrorHandler
3435
import org.springframework.amqp.utils.test.TestUtils
@@ -43,7 +44,6 @@ import org.springframework.test.annotation.DirtiesContext
4344
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig
4445
import java.util.concurrent.CountDownLatch
4546
import java.util.concurrent.TimeUnit
46-
import java.util.concurrent.atomic.AtomicBoolean
4747

4848
/**
4949
* Kotlin Annotated listener tests.
@@ -62,14 +62,24 @@ class EnableRabbitKotlinTests {
6262
@Autowired
6363
private lateinit var config: Config
6464

65+
@Autowired
66+
private lateinit var registry: RabbitListenerEndpointRegistry
67+
6568
@Test
66-
fun `send and wait for consume`(@Autowired registry: RabbitListenerEndpointRegistry) {
69+
fun `send and wait for consume`() {
6770
val template = RabbitTemplate(this.config.cf())
6871
template.setReplyTimeout(10_000)
6972
val result = template.convertSendAndReceive("kotlinQueue", "test")
7073
assertThat(result).isEqualTo("TEST")
71-
val listener = registry.getListenerContainer("single").messageListener
72-
assertThat(TestUtils.getPropertyValue(listener, "messagingMessageConverter.inferredArgumentType").toString())
74+
var listenerContainer = registry.getListenerContainer("single") as AbstractMessageListenerContainer
75+
assertThat(listenerContainer.acknowledgeMode).isEqualTo(AcknowledgeMode.MANUAL)
76+
val listener = listenerContainer?.messageListener
77+
assertThat(listener).isNotNull()
78+
listener?.let { nonNullableListener ->
79+
assertThat(
80+
TestUtils.getPropertyValue(nonNullableListener, "messagingMessageConverter.inferredArgumentType")
81+
.toString()
82+
)
7383
.isEqualTo("class java.lang.String")
7484
}
7585

@@ -81,6 +91,9 @@ class EnableRabbitKotlinTests {
8191
assertThat(this.config.batchReceived.await(10, TimeUnit.SECONDS)).isTrue()
8292
assertThat(this.config.batch[0]).isInstanceOf(Message::class.java)
8393
assertThat(this.config.batch.map { m -> String(m.body) }).containsOnly("test1", "test2")
94+
95+
var listenerContainer = registry.getListenerContainer("batch") as AbstractMessageListenerContainer
96+
assertThat(listenerContainer.acknowledgeMode).isEqualTo(AcknowledgeMode.MANUAL)
8497
}
8598

8699
@Test
@@ -90,45 +103,48 @@ class EnableRabbitKotlinTests {
90103
assertThat(this.config.ehLatch.await(10, TimeUnit.SECONDS)).isTrue()
91104
val reply = template.receiveAndConvert("kotlinReplyQueue", 10_000)
92105
assertThat(reply).isEqualTo("error processed")
106+
107+
var listenerContainer = registry.getListenerContainer("multi") as AbstractMessageListenerContainer
108+
assertThat(listenerContainer.acknowledgeMode).isEqualTo(AcknowledgeMode.AUTO)
93109
}
94110

95111
@Configuration
96112
@EnableRabbit
97113
class Config {
98114

99115
@RabbitListener(id = "single", queues = ["kotlinQueue"])
100-
suspend fun handle(@Suppress("UNUSED_PARAMETER") data: String) : String? {
116+
suspend fun handle(@Suppress("UNUSED_PARAMETER") data: String): String? {
101117
return data.uppercase()
102118
}
103119

104120
val batchReceived = CountDownLatch(1)
105121

106122
lateinit var batch: List<Message>
107123

108-
@RabbitListener(id = "batch", queues = ["kotlinBatchQueue"],
109-
containerFactory = "batchRabbitListenerContainerFactory")
124+
@RabbitListener(
125+
id = "batch", queues = ["kotlinBatchQueue"],
126+
containerFactory = "batchRabbitListenerContainerFactory"
127+
)
110128
suspend fun receiveBatch(messages: List<Message>) {
111129
batch = messages
112130
batchReceived.countDown()
113131
}
114132

115133
@Bean
116134
fun rabbitListenerContainerFactory(cf: CachingConnectionFactory) =
117-
SimpleRabbitListenerContainerFactory().also {
118-
it.setAcknowledgeMode(AcknowledgeMode.MANUAL)
119-
it.setReceiveTimeout(10)
120-
it.setConnectionFactory(cf)
121-
}
135+
SimpleRabbitListenerContainerFactory().also {
136+
it.setReceiveTimeout(10)
137+
it.setConnectionFactory(cf)
138+
}
122139

123140
@Bean
124141
fun batchRabbitListenerContainerFactory(cf: CachingConnectionFactory) =
125-
SimpleRabbitListenerContainerFactory().also {
126-
it.setAcknowledgeMode(AcknowledgeMode.MANUAL)
127-
it.setConsumerBatchEnabled(true)
128-
it.setDeBatchingEnabled(true)
129-
it.setBatchSize(3)
130-
it.setConnectionFactory(cf)
131-
}
142+
SimpleRabbitListenerContainerFactory().also {
143+
it.setConsumerBatchEnabled(true)
144+
it.setDeBatchingEnabled(true)
145+
it.setBatchSize(3)
146+
it.setConnectionFactory(cf)
147+
}
132148

133149
@Bean
134150
fun cf() = CachingConnectionFactory(RabbitAvailableCondition.getBrokerRunning().connectionFactory)

0 commit comments

Comments
 (0)