Skip to content

Commit 54094da

Browse files
artembilangaryrussell
authored andcommitted
INT-4523: Add DSL convert(Class<> cls) operator (#2556)
* INT-4523: Add DSL `convert(Class<> cls)` operator JIRA: https://jira.spring.io/browse/INT-4523 * Change the `LambdaMessageProcessor` to rely on the `MessageConverter` populated by the `IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME` This way all the Lambda-based handlers are going to work the same way as POJO-based via `MessagingMethodInvokerHelper` * Add `convert(Class<P> payloadType)` EIP-operator to perform similar to POJO-based method invocation argument conversion * * Fix `LambdaMessageProcessorTests` to inject a `ConfigurableCompositeMessageConverter` from the mocked `BeanFactory` * Make a `mappingJackson2MessageConverter.setStrictContentTypeMatch(true)` * Also obtain an `ObjectMapper` from the `Jackson2JsonObjectMapper` which is configured with the scanned possible Jackson modules. in the `ConfigurableCompositeMessageConverter` do not try to convert all the potential content without an appropriate JSON content-type header
1 parent 5bfd697 commit 54094da

File tree

5 files changed

+110
-26
lines changed

5 files changed

+110
-26
lines changed

spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -587,6 +587,21 @@ public B transform(MessageProcessorSpec<?> messageProcessorSpec,
587587
.transform(new MethodInvokingTransformer(processor), endpointConfigurer);
588588
}
589589

590+
/**
591+
* Populate the {@link MessageTransformingHandler} instance
592+
* for the provided {@code payloadType} to convert at runtime.
593+
* @param payloadType the {@link Class} for expected payload type.
594+
* @param <P> the payload type - 'convert to'.
595+
* @return the current {@link IntegrationFlowDefinition}.
596+
* @since 5.1
597+
* @see MethodInvokingTransformer
598+
* @see LambdaMessageProcessor
599+
*/
600+
public <P> B convert(Class<P> payloadType) {
601+
return transform(payloadType, p -> p);
602+
}
603+
604+
590605
/**
591606
* Populate the {@link MessageTransformingHandler} instance for the provided {@link GenericTransformer}
592607
* for the specific {@code payloadType} to convert at runtime.
@@ -599,7 +614,7 @@ public B transform(MessageProcessorSpec<?> messageProcessorSpec,
599614
* @see LambdaMessageProcessor
600615
*/
601616
public <P, T> B transform(Class<P> payloadType, GenericTransformer<P, T> genericTransformer) {
602-
return this.transform(payloadType, genericTransformer, null);
617+
return transform(payloadType, genericTransformer, null);
603618
}
604619

605620
/**
@@ -619,6 +634,25 @@ public <S, T> B transform(GenericTransformer<S, T> genericTransformer,
619634
return this.transform(null, genericTransformer, endpointConfigurer);
620635
}
621636

637+
/**
638+
* Populate the {@link MessageTransformingHandler} instance
639+
* for the provided {@code payloadType} to convert at runtime.
640+
* In addition accept options for the integration endpoint using {@link GenericEndpointSpec}.
641+
* @param payloadType the {@link Class} for expected payload type.
642+
* @param endpointConfigurer the {@link Consumer} to provide integration endpoint options.
643+
* @param <P> the payload type - 'transform to'.
644+
* @return the current {@link IntegrationFlowDefinition}.
645+
* @since 5.1
646+
* @see MethodInvokingTransformer
647+
* @see LambdaMessageProcessor
648+
* @see GenericEndpointSpec
649+
*/
650+
public <P> B convert(Class<P> payloadType,
651+
Consumer<GenericEndpointSpec<MessageTransformingHandler>> endpointConfigurer) {
652+
653+
return transform(payloadType, p -> p, endpointConfigurer);
654+
}
655+
622656
/**
623657
* Populate the {@link MessageTransformingHandler} instance for the provided {@link GenericTransformer}
624658
* for the specific {@code payloadType} to convert at runtime.

spring-integration-core/src/main/java/org/springframework/integration/handler/LambdaMessageProcessor.java

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2017 the original author or authors.
2+
* Copyright 2016-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -25,12 +25,10 @@
2525
import org.springframework.beans.BeansException;
2626
import org.springframework.beans.factory.BeanFactory;
2727
import org.springframework.beans.factory.BeanFactoryAware;
28-
import org.springframework.core.convert.ConversionService;
29-
import org.springframework.core.convert.TypeDescriptor;
30-
import org.springframework.core.convert.support.DefaultConversionService;
31-
import org.springframework.integration.support.utils.IntegrationUtils;
28+
import org.springframework.integration.context.IntegrationContextUtils;
3229
import org.springframework.messaging.Message;
3330
import org.springframework.messaging.MessageHandlingException;
31+
import org.springframework.messaging.converter.MessageConverter;
3432
import org.springframework.util.Assert;
3533
import org.springframework.util.ReflectionUtils;
3634

@@ -48,11 +46,11 @@ public class LambdaMessageProcessor implements MessageProcessor<Object>, BeanFac
4846

4947
private final Method method;
5048

51-
private final TypeDescriptor payloadType;
49+
private final Class<?> payloadType;
5250

5351
private final Class<?>[] parameterTypes;
5452

55-
private ConversionService conversionService;
53+
private MessageConverter messageConverter;
5654

5755
public LambdaMessageProcessor(Object target, Class<?> payloadType) {
5856
Assert.notNull(target, "'target' must not be null");
@@ -79,16 +77,14 @@ public LambdaMessageProcessor(Object target, Class<?> payloadType) {
7977
this.method = methodValue.get();
8078
this.method.setAccessible(true);
8179
this.parameterTypes = this.method.getParameterTypes();
82-
this.payloadType = payloadType != null ? TypeDescriptor.valueOf(payloadType) : null;
80+
this.payloadType = payloadType;
8381
}
8482

8583
@Override
8684
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
87-
ConversionService conversionService = IntegrationUtils.getConversionService(beanFactory);
88-
if (conversionService == null) {
89-
conversionService = DefaultConversionService.getSharedInstance();
90-
}
91-
this.conversionService = conversionService;
85+
this.messageConverter =
86+
beanFactory.getBean(IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME,
87+
MessageConverter.class);
9288
}
9389

9490
@Override
@@ -109,14 +105,12 @@ else if (Map.class.isAssignableFrom(parameterType)) {
109105
}
110106
else {
111107
if (this.payloadType != null) {
112-
if (Message.class.isAssignableFrom(this.payloadType.getType())) {
108+
if (Message.class.isAssignableFrom(this.payloadType)) {
113109
args[i] = message;
114110
}
115111
else {
116-
args[i] = this.conversionService.convert(message.getPayload(),
117-
TypeDescriptor.forObject(message.getPayload()), this.payloadType);
112+
args[i] = this.messageConverter.fromMessage(message, this.payloadType);
118113
}
119-
120114
}
121115
else {
122116
args[i] = message.getPayload();

spring-integration-core/src/main/java/org/springframework/integration/support/converter/ConfigurableCompositeMessageConverter.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017 the original author or authors.
2+
* Copyright 2017-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,6 +22,7 @@
2222
import java.util.stream.Collectors;
2323
import java.util.stream.Stream;
2424

25+
import org.springframework.integration.support.json.Jackson2JsonObjectMapper;
2526
import org.springframework.integration.support.json.JacksonPresent;
2627
import org.springframework.messaging.converter.ByteArrayMessageConverter;
2728
import org.springframework.messaging.converter.CompositeMessageConverter;
@@ -78,7 +79,10 @@ private static Collection<MessageConverter> initDefaults() {
7879
List<MessageConverter> converters = new LinkedList<>();
7980

8081
if (JacksonPresent.isJackson2Present()) {
81-
converters.add(new MappingJackson2MessageConverter());
82+
MappingJackson2MessageConverter mappingJackson2MessageConverter = new MappingJackson2MessageConverter();
83+
mappingJackson2MessageConverter.setStrictContentTypeMatch(true);
84+
mappingJackson2MessageConverter.setObjectMapper(new Jackson2JsonObjectMapper().getObjectMapper());
85+
converters.add(mappingJackson2MessageConverter);
8286
}
8387
converters.add(new ByteArrayMessageConverter());
8488
converters.add(new ObjectStringMessageConverter());

spring-integration-core/src/test/java/org/springframework/integration/dsl/LambdaMessageProcessorTests.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2017 the original author or authors.
2+
* Copyright 2016-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,20 +20,25 @@
2020
import static org.junit.Assert.assertSame;
2121
import static org.junit.Assert.assertThat;
2222
import static org.junit.Assert.fail;
23+
import static org.mockito.BDDMockito.given;
2324
import static org.mockito.Mockito.mock;
2425

2526
import org.junit.Test;
2627

2728
import org.springframework.beans.factory.BeanFactory;
29+
import org.springframework.integration.context.IntegrationContextUtils;
2830
import org.springframework.integration.handler.GenericHandler;
2931
import org.springframework.integration.handler.LambdaMessageProcessor;
32+
import org.springframework.integration.support.converter.ConfigurableCompositeMessageConverter;
3033
import org.springframework.integration.transformer.GenericTransformer;
3134
import org.springframework.messaging.Message;
35+
import org.springframework.messaging.converter.MessageConverter;
3236
import org.springframework.messaging.support.GenericMessage;
3337

3438

3539
/**
3640
* @author Gary Russell
41+
* @author Artem Bilan
3742
*
3843
* @since 5.0
3944
*/
@@ -69,12 +74,22 @@ public Message<?> transform(Message<?> source) {
6974

7075
private void handle(GenericHandler<?> h) {
7176
LambdaMessageProcessor lmp = new LambdaMessageProcessor(h, String.class);
72-
lmp.setBeanFactory(mock(BeanFactory.class));
77+
lmp.setBeanFactory(getBeanFactory());
78+
7379
lmp.processMessage(new GenericMessage<>("foo"));
7480
}
7581

7682
private Message<?> messageTransformer(Message<?> message) {
7783
return message;
7884
}
7985

86+
87+
private BeanFactory getBeanFactory() {
88+
BeanFactory mockBeanFactory = mock(BeanFactory.class);
89+
given(mockBeanFactory.getBean(IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME,
90+
MessageConverter.class))
91+
.willReturn(new ConfigurableCompositeMessageConverter());
92+
return mockBeanFactory;
93+
}
94+
8095
}

spring-integration-core/src/test/java/org/springframework/integration/dsl/transformers/TransformerTests.java

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ public void testSubFlowContentEnricher() {
196196
private PollableChannel codecReplyChannel;
197197

198198
@Test
199-
public void testCodec() throws Exception {
199+
public void testCodec() {
200200
this.encodingFlowInput.send(new GenericMessage<>("bar"));
201201
Message<?> receive = this.codecReplyChannel.receive(10000);
202202
assertNotNull(receive);
@@ -246,6 +246,34 @@ public void transformWithHeader() {
246246
assertNotNull(this.adviceChannel.receive(10000));
247247
}
248248

249+
@Autowired
250+
@Qualifier("convertFlow.input")
251+
private MessageChannel convertFlowInput;
252+
253+
@Test
254+
public void testConvertOperator() {
255+
QueueChannel replyChannel = new QueueChannel();
256+
Date date = new Date();
257+
this.convertFlowInput.send(
258+
MessageBuilder.withPayload("{\"name\": \"Baz\",\"date\": " + date.getTime() + "}")
259+
.setHeader(MessageHeaders.CONTENT_TYPE, "application/json")
260+
.setReplyChannel(replyChannel)
261+
.build());
262+
263+
Message<?> receive = replyChannel.receive(10_000);
264+
265+
assertNotNull(receive);
266+
267+
Object payload = receive.getPayload();
268+
269+
assertThat(payload, instanceOf(TestPojo.class));
270+
271+
TestPojo testPojo = (TestPojo) payload;
272+
273+
assertEquals("Baz", testPojo.getName());
274+
assertEquals(date, testPojo.getDate());
275+
}
276+
249277
@Configuration
250278
@EnableIntegration
251279
public static class ContextConfiguration {
@@ -413,6 +441,12 @@ public SomeService someService() {
413441
return new SomeService();
414442
}
415443

444+
@Bean
445+
public IntegrationFlow convertFlow() {
446+
return f -> f
447+
.convert(TestPojo.class);
448+
}
449+
416450
}
417451

418452
private static final class TestPojo {
@@ -421,6 +455,9 @@ private static final class TestPojo {
421455

422456
private Date date;
423457

458+
private TestPojo() {
459+
}
460+
424461
private TestPojo(String name) {
425462
this.name = name;
426463
}
@@ -448,7 +485,7 @@ public void setDate(Date date) {
448485
public static class MyCodec implements Codec {
449486

450487
@Override
451-
public void encode(Object object, OutputStream outputStream) throws IOException {
488+
public void encode(Object object, OutputStream outputStream) {
452489
}
453490

454491
@Override
@@ -457,13 +494,13 @@ public byte[] encode(Object object) throws IOException {
457494
}
458495

459496
@Override
460-
public <T> T decode(InputStream inputStream, Class<T> type) throws IOException {
497+
public <T> T decode(InputStream inputStream, Class<T> type) {
461498
return null;
462499
}
463500

464501
@SuppressWarnings("unchecked")
465502
@Override
466-
public <T> T decode(byte[] bytes, Class<T> type) throws IOException {
503+
public <T> T decode(byte[] bytes, Class<T> type) {
467504
return (T) (type.equals(String.class) ? new String(bytes) :
468505
type.equals(Integer.class) ? Integer.valueOf(42) : Integer.valueOf(43));
469506
}

0 commit comments

Comments
 (0)