Skip to content

Commit fb32210

Browse files
KaiStapelgaryrussell
authored andcommitted
[#1145] Fix generic parameter type detection for batch listeners … (#1146)
* [#1145] Fix generic parameter type detection for batch listeners with typed 'org.springframework.messaging.Message's * [#1145] Update integration test * [#1145] Add myself as author
1 parent abc96cf commit fb32210

File tree

3 files changed

+103
-11
lines changed

3 files changed

+103
-11
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/adapter/MessagingMessageListenerAdapter.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -57,6 +57,7 @@
5757
* @author Stephane Nicoll
5858
* @author Gary Russell
5959
* @author Artem Bilan
60+
* @author Kai Stapel
6061
*
6162
* @since 1.4
6263
*/
@@ -381,7 +382,8 @@ private Type extractGenericParameterTypFromMethodParameter(MethodParameter metho
381382
if (parameterizedType.getRawType().equals(Message.class)) {
382383
genericParameterType = ((ParameterizedType) genericParameterType).getActualTypeArguments()[0];
383384
}
384-
else if (parameterizedType.getRawType().equals(List.class)
385+
else if (this.isBatch
386+
&& parameterizedType.getRawType().equals(List.class)
385387
&& parameterizedType.getActualTypeArguments().length == 1) {
386388

387389
Type paramType = parameterizedType.getActualTypeArguments()[0];
@@ -392,7 +394,7 @@ else if (parameterizedType.getRawType().equals(List.class)
392394
if (messageHasGeneric) {
393395
genericParameterType = ((ParameterizedType) paramType).getActualTypeArguments()[0];
394396
}
395-
if (this.isBatch) {
397+
else {
396398
// when decoding batch messages we convert to the List's generic type
397399
genericParameterType = paramType;
398400
}

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/EnableRabbitBatchJsonIntegrationTests.java

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019 the original author or authors.
2+
* Copyright 2019-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,12 +18,14 @@
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
2020

21+
import java.io.UnsupportedEncodingException;
2122
import java.util.List;
2223
import java.util.concurrent.CountDownLatch;
2324
import java.util.concurrent.TimeUnit;
2425

2526
import org.junit.jupiter.api.Test;
2627

28+
import org.springframework.amqp.core.MessageProperties;
2729
import org.springframework.amqp.rabbit.batch.SimpleBatchingStrategy;
2830
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
2931
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
@@ -32,6 +34,7 @@
3234
import org.springframework.amqp.rabbit.junit.RabbitAvailable;
3335
import org.springframework.amqp.rabbit.junit.RabbitAvailableCondition;
3436
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
37+
import org.springframework.amqp.support.converter.SimpleMessageConverter;
3538
import org.springframework.beans.factory.annotation.Autowired;
3639
import org.springframework.context.annotation.Bean;
3740
import org.springframework.context.annotation.Configuration;
@@ -43,6 +46,7 @@
4346

4447
/**
4548
* @author Gary Russell
49+
* @author Kai Stapel
4650
* @since 2.2
4751
*
4852
*/
@@ -58,23 +62,29 @@ public class EnableRabbitBatchJsonIntegrationTests {
5862
private Listener listener;
5963

6064
@Test
61-
public void testSimpleList() throws InterruptedException {
62-
this.template.convertAndSend("json.batch.1", new Foo("foo"));
63-
this.template.convertAndSend("json.batch.1", new Foo("bar"));
65+
public void testSimpleList() throws Exception {
66+
this.template.send("json.batch.1", msg("{\"bar\":\"foo\"}"));
67+
this.template.send("json.batch.1", msg("{\"bar\":\"bar\"}"));
6468
assertThat(this.listener.foosLatch.await(10, TimeUnit.SECONDS)).isTrue();
6569
assertThat(this.listener.foos.get(0).getBar()).isEqualTo("foo");
6670
assertThat(this.listener.foos.get(1).getBar()).isEqualTo("bar");
6771
}
6872

6973
@Test
70-
public void testMessageList() throws InterruptedException {
71-
this.template.convertAndSend("json.batch.2", new Foo("foo"));
72-
this.template.convertAndSend("json.batch.2", new Foo("bar"));
74+
public void testMessageList() throws Exception {
75+
this.template.send("json.batch.2", msg("{\"bar\":\"foo\"}"));
76+
this.template.send("json.batch.2", msg("{\"bar\":\"bar\"}"));
7377
assertThat(this.listener.fooMessagesLatch.await(10, TimeUnit.SECONDS)).isTrue();
7478
assertThat(this.listener.fooMessages.get(0).getPayload().getBar()).isEqualTo("foo");
7579
assertThat(this.listener.fooMessages.get(1).getPayload().getBar()).isEqualTo("bar");
7680
}
7781

82+
private org.springframework.amqp.core.Message msg(String body) throws UnsupportedEncodingException {
83+
MessageProperties properties = new MessageProperties();
84+
properties.setContentType("application/json");
85+
return new org.springframework.amqp.core.Message(body.getBytes(SimpleMessageConverter.DEFAULT_CHARSET), properties);
86+
}
87+
7888
@Configuration
7989
@EnableRabbit
8090
public static class Config {
@@ -84,6 +94,8 @@ public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
8494
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
8595
factory.setConnectionFactory(connectionFactory());
8696
factory.setBatchListener(true);
97+
factory.setConsumerBatchEnabled(true);
98+
factory.setBatchSize(2);
8799
factory.setMessageConverter(converter());
88100
return factory;
89101
}

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/adapter/MessagingMessageListenerAdapterTests.java

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2019 the original author or authors.
2+
* Copyright 2014-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -25,8 +25,10 @@
2525

2626
import java.lang.reflect.Method;
2727
import java.util.ArrayList;
28+
import java.util.Arrays;
2829
import java.util.LinkedHashMap;
2930
import java.util.List;
31+
import java.util.stream.Collectors;
3032

3133
import org.junit.jupiter.api.BeforeEach;
3234
import org.junit.jupiter.api.Test;
@@ -51,6 +53,7 @@
5153
* @author Stephane Nicoll
5254
* @author Artem Bilan
5355
* @author Gary Russell
56+
* @author Kai Stapel
5457
*/
5558
public class MessagingMessageListenerAdapterTests {
5659

@@ -212,6 +215,54 @@ public void genericMessageTest3() throws Exception {
212215
assertThat(this.sample.payload.getClass()).isEqualTo(LinkedHashMap.class);
213216
}
214217

218+
@Test
219+
public void batchAmqpMessagesTest() {
220+
// given
221+
org.springframework.amqp.core.Message message1 = MessageTestUtils.createTextMessage("{ \"foo1\" : \"bar1\" }");
222+
message1.getMessageProperties().setContentType("application/json");
223+
Channel channel = mock(Channel.class);
224+
BatchMessagingMessageListenerAdapter listener = getBatchInstance("withAmqpMessageBatch");
225+
listener.setMessageConverter(new Jackson2JsonMessageConverter());
226+
227+
// when
228+
listener.onMessageBatch(Arrays.asList(message1), channel);
229+
230+
// then
231+
assertThat(this.sample.batchPayloads.get(0).getClass()).isEqualTo(String.class);
232+
}
233+
234+
@Test
235+
public void batchTypedMessagesTest() {
236+
// given
237+
org.springframework.amqp.core.Message message1 = MessageTestUtils.createTextMessage("{ \"foo1\" : \"bar1\" }");
238+
message1.getMessageProperties().setContentType("application/json");
239+
Channel channel = mock(Channel.class);
240+
BatchMessagingMessageListenerAdapter listener = getBatchInstance("withTypedMessageBatch");
241+
listener.setMessageConverter(new Jackson2JsonMessageConverter());
242+
243+
// when
244+
listener.onMessageBatch(Arrays.asList(message1), channel);
245+
246+
// then
247+
assertThat(this.sample.batchPayloads.get(0).getClass()).isEqualTo(Foo.class);
248+
}
249+
250+
@Test
251+
public void batchTypedObjectTest() {
252+
// given
253+
org.springframework.amqp.core.Message message1 = MessageTestUtils.createTextMessage("{ \"foo1\" : \"bar1\" }");
254+
message1.getMessageProperties().setContentType("application/json");
255+
Channel channel = mock(Channel.class);
256+
BatchMessagingMessageListenerAdapter listener = getBatchInstance("withFooBatch");
257+
listener.setMessageConverter(new Jackson2JsonMessageConverter());
258+
259+
// when
260+
listener.onMessageBatch(Arrays.asList(message1), channel);
261+
262+
// then
263+
assertThat(this.sample.batchPayloads.get(0).getClass()).isEqualTo(Foo.class);
264+
}
265+
215266
protected MessagingMessageListenerAdapter getSimpleInstance(String methodName, Class<?>... parameterTypes) {
216267
Method m = ReflectionUtils.findMethod(SampleBean.class, methodName, parameterTypes);
217268
return createInstance(m, false);
@@ -246,6 +297,17 @@ protected MessagingMessageListenerAdapter createMultiInstance(Method m1, Method
246297
return adapter;
247298
}
248299

300+
protected BatchMessagingMessageListenerAdapter getBatchInstance(String methodName) {
301+
Method m = ReflectionUtils.findMethod(SampleBean.class, methodName, List.class);
302+
return createBatchInstance(m);
303+
}
304+
305+
protected BatchMessagingMessageListenerAdapter createBatchInstance(Method m) {
306+
BatchMessagingMessageListenerAdapter adapter = new BatchMessagingMessageListenerAdapter(null, m, false, null, null);
307+
adapter.setHandlerAdapter(new HandlerAdapter(factory.createInvocableHandlerMethod(sample, m)));
308+
return adapter;
309+
}
310+
249311
private void initializeFactory(DefaultMessageHandlerMethodFactory factory) {
250312
factory.setBeanFactory(new StaticListableBeanFactory());
251313
factory.afterPropertiesSet();
@@ -254,6 +316,7 @@ private void initializeFactory(DefaultMessageHandlerMethodFactory factory) {
254316
private static class SampleBean {
255317

256318
private Object payload;
319+
private List<Object> batchPayloads;
257320

258321
SampleBean() {
259322
}
@@ -295,6 +358,21 @@ public void withNonGenericMessage(@SuppressWarnings("rawtypes") Message message)
295358
this.payload = message.getPayload();
296359
}
297360

361+
@SuppressWarnings("unused")
362+
public void withAmqpMessageBatch(List<org.springframework.amqp.core.Message> messageBatch) {
363+
this.batchPayloads = messageBatch.stream().map(m -> new String(m.getBody())).collect(Collectors.toList());
364+
}
365+
366+
@SuppressWarnings("unused")
367+
public void withTypedMessageBatch(List<Message<Foo>> messageBatch) {
368+
this.batchPayloads = messageBatch.stream().map(Message::getPayload).collect(Collectors.toList());
369+
}
370+
371+
@SuppressWarnings("unused")
372+
public void withFooBatch(List<Foo> messageBatch) {
373+
this.batchPayloads = new ArrayList<>(messageBatch);
374+
}
375+
298376
@SuppressWarnings("unused")
299377
public String failWithReturn(Integer input) {
300378
throw new IllegalArgumentException("Expected test exception");

0 commit comments

Comments
 (0)