Skip to content

Commit eb67289

Browse files
odrotbohmartembilan
authored andcommitted
Add Spring Data projection support
This commit introduces a `ProjectingMessageConverter` that supports binding `String` and `byte[]`-backed JSON strings to Spring Data Projection interfaces. This allows very selective, and low-coupled bindings to data including the lookup of values from multiple places inside the JSON document. For example the following interface can be defined as message payload type: ``` interface SomeSample { @JsonPath({ "$.username", "$.user.name" }) String getUsername(); } ``` Accessor methods will be used to lookup the property name as field in the received JSON document by default. The @JsonPath expression allows to customize the value lookup and even to define multiple JSONPath expression to lookup values from multiple places until an expression returns an actual value. If the type we're supposed to unmarshal into is a plain class the `ProjectingMessageConverter` delegates to the default `StringJsonMessageConverter`. Added Spring Data and the transitively required Jayway JSONPath library as optional build dependencies. The `ProjectingMessageConverter` now delegates writes to a `StringJsonMessageConverter`. * Some code style polishing
1 parent f7cabca commit eb67289

File tree

3 files changed

+281
-0
lines changed

3 files changed

+281
-0
lines changed

build.gradle

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,13 +73,15 @@ subprojects { subproject ->
7373
assertjVersion = '2.6.0'
7474
hamcrestVersion = '1.3'
7575
jacksonVersion = '2.9.1'
76+
jaywayJsonPathVersion = '2.4.0'
7677
junitVersion = '4.12'
7778
kafkaVersion = '1.0.0'
7879
mockitoVersion = '2.11.0'
7980
scalaVersion = '2.11'
8081
slf4jVersion = '1.7.25'
8182
springRetryVersion = '1.2.1.RELEASE'
8283
springVersion = '5.0.2.RELEASE'
84+
springDataCommonsVersion = '2.0.2.RELEASE'
8385

8486
idPrefix = 'kafka'
8587

@@ -170,6 +172,10 @@ project ('spring-kafka') {
170172
compile ("com.fasterxml.jackson.core:jackson-core:$jacksonVersion", optional)
171173
compile ("com.fasterxml.jackson.core:jackson-databind:$jacksonVersion", optional)
172174

175+
// Spring Data projection message binding support
176+
compile ("org.springframework.data:spring-data-commons:$springDataCommonsVersion", optional)
177+
compile ("com.jayway.jsonpath:json-path:$jaywayJsonPathVersion", optional)
178+
173179
testCompile project (":spring-kafka-test")
174180
testCompile "org.assertj:assertj-core:$assertjVersion"
175181
testCompile "org.springframework:spring-tx:$springVersion"
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.support.converter;
18+
19+
import java.io.ByteArrayInputStream;
20+
import java.io.InputStream;
21+
import java.lang.reflect.Type;
22+
import java.nio.charset.StandardCharsets;
23+
24+
import org.apache.kafka.clients.consumer.ConsumerRecord;
25+
26+
import org.springframework.core.ResolvableType;
27+
import org.springframework.data.projection.MethodInterceptorFactory;
28+
import org.springframework.data.projection.ProjectionFactory;
29+
import org.springframework.data.projection.SpelAwareProxyProjectionFactory;
30+
import org.springframework.data.web.JsonProjectingMethodInterceptorFactory;
31+
import org.springframework.kafka.support.KafkaNull;
32+
import org.springframework.messaging.Message;
33+
import org.springframework.util.Assert;
34+
35+
import com.fasterxml.jackson.databind.ObjectMapper;
36+
import com.jayway.jsonpath.spi.mapper.JacksonMappingProvider;
37+
38+
/**
39+
* A {@link MessageConverter} implementation that uses a Spring Data
40+
* {@link ProjectionFactory} to bind incoming messages to projection interfaces.
41+
*
42+
* @author Oliver Gierke
43+
*
44+
* @since 2.1.1
45+
*/
46+
public class ProjectingMessageConverter extends MessagingMessageConverter {
47+
48+
private final ProjectionFactory projectionFactory;
49+
50+
private final MessagingMessageConverter delegate;
51+
52+
/**
53+
* Creates a new {@link ProjectingMessageConverter} using the given {@link ObjectMapper}.
54+
* @param mapper must not be {@literal null}.
55+
*/
56+
public ProjectingMessageConverter(ObjectMapper mapper) {
57+
Assert.notNull(mapper, "ObjectMapper must not be null");
58+
59+
JacksonMappingProvider provider = new JacksonMappingProvider(mapper);
60+
MethodInterceptorFactory interceptorFactory = new JsonProjectingMethodInterceptorFactory(provider);
61+
62+
SpelAwareProxyProjectionFactory factory = new SpelAwareProxyProjectionFactory();
63+
factory.registerMethodInvokerFactory(interceptorFactory);
64+
65+
this.projectionFactory = factory;
66+
this.delegate = new StringJsonMessageConverter(mapper);
67+
}
68+
69+
@Override
70+
protected Object convertPayload(Message<?> message) {
71+
return this.delegate.convertPayload(message);
72+
}
73+
74+
@Override
75+
protected Object extractAndConvertValue(ConsumerRecord<?, ?> record, Type type) {
76+
Object value = record.value();
77+
78+
if (value == null) {
79+
return KafkaNull.INSTANCE;
80+
}
81+
82+
Class<?> rawType = ResolvableType.forType(type).resolve(Object.class);
83+
84+
if (!rawType.isInterface()) {
85+
return this.delegate.extractAndConvertValue(record, type);
86+
}
87+
88+
InputStream inputStream = new ByteArrayInputStream(getAsByteArray(value));
89+
90+
// The inputStream is closed underneath by the ObjectMapper#_readTreeAndClose()
91+
return this.projectionFactory.createProjection(rawType, inputStream);
92+
}
93+
94+
/**
95+
* Return the given source value as byte array.
96+
* @param source must not be {@literal null}.
97+
* @return the source instance as byte array.
98+
*/
99+
private static byte[] getAsByteArray(Object source) {
100+
Assert.notNull(source, "Source must not be null");
101+
102+
if (source instanceof String) {
103+
return String.class.cast(source).getBytes(StandardCharsets.UTF_8);
104+
}
105+
106+
if (source instanceof byte[]) {
107+
return byte[].class.cast(source);
108+
}
109+
110+
throw new ConversionException(String.format("Unsupported payload type '%s'. Expected 'String' or 'byte[]'",
111+
source.getClass()), null);
112+
}
113+
114+
}
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.support.converter;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.mockito.Mockito.doReturn;
21+
22+
import java.nio.charset.StandardCharsets;
23+
import java.util.HashMap;
24+
import java.util.Map;
25+
26+
import org.apache.kafka.clients.consumer.ConsumerRecord;
27+
import org.junit.Rule;
28+
import org.junit.Test;
29+
import org.junit.rules.ExpectedException;
30+
import org.junit.runner.RunWith;
31+
import org.mockito.Mock;
32+
import org.mockito.junit.MockitoJUnitRunner;
33+
34+
import org.springframework.data.projection.SpelAwareProxyProjectionFactory;
35+
import org.springframework.data.web.JsonPath;
36+
import org.springframework.kafka.support.KafkaNull;
37+
import org.springframework.messaging.support.MessageBuilder;
38+
39+
import com.fasterxml.jackson.databind.ObjectMapper;
40+
import com.jayway.jsonpath.DocumentContext;
41+
42+
/**
43+
* @author Oliver Gierke
44+
*
45+
* @since 2.1.1
46+
*/
47+
@RunWith(MockitoJUnitRunner.class)
48+
public class ProjectingMessageConverterTests {
49+
50+
private static final String STRING_PAYLOAD =
51+
"{ \"username\" : \"SomeUsername\", \"user\" : { \"name\" : \"SomeName\"}}";
52+
53+
private static final byte[] BYTE_ARRAY_PAYLOAD = STRING_PAYLOAD.getBytes(StandardCharsets.UTF_8);
54+
55+
private final ProjectingMessageConverter converter = new ProjectingMessageConverter(new ObjectMapper());
56+
57+
@Mock
58+
private ConsumerRecord<?, ?> record;
59+
60+
@Rule
61+
public ExpectedException exception = ExpectedException.none();
62+
63+
@Test
64+
public void rejectsNullObjectMapper() {
65+
this.exception.expect(IllegalArgumentException.class);
66+
new ProjectingMessageConverter(null);
67+
}
68+
69+
@Test
70+
public void returnsKafkaNullForNullPayload() {
71+
doReturn(null).when(this.record).value();
72+
73+
assertThat(this.converter.extractAndConvertValue(this.record, Object.class)).isEqualTo(KafkaNull.INSTANCE);
74+
}
75+
76+
@Test
77+
public void createsProjectedPayloadForInterface() {
78+
assertProjectionProxy(STRING_PAYLOAD);
79+
assertProjectionProxy(BYTE_ARRAY_PAYLOAD);
80+
}
81+
82+
@Test
83+
public void usesJacksonToCreatePayloadForClass() {
84+
assertSimpleObject(STRING_PAYLOAD);
85+
assertSimpleObject(BYTE_ARRAY_PAYLOAD);
86+
}
87+
88+
@Test
89+
public void rejectsInvalidPayload() {
90+
this.exception.expect(ConversionException.class);
91+
this.exception.expectMessage(Object.class.getName());
92+
93+
assertProjectionProxy(new Object());
94+
}
95+
96+
@Test
97+
public void writesProjectedPayloadUsingJackson() {
98+
Map<String, Object> values = new HashMap<>();
99+
values.put("username", "SomeUsername");
100+
values.put("name", "SomeName");
101+
102+
Sample sample = new SpelAwareProxyProjectionFactory().createProjection(Sample.class, values);
103+
104+
Object payload = this.converter.convertPayload(MessageBuilder.withPayload(sample).build());
105+
106+
DocumentContext json = com.jayway.jsonpath.JsonPath.parse(payload.toString());
107+
108+
assertThat(json.read("$.username", String.class)).isEqualTo("SomeUsername");
109+
assertThat(json.read("$.name", String.class)).isEqualTo("SomeName");
110+
}
111+
112+
private void assertProjectionProxy(Object payload) {
113+
doReturn(payload).when(this.record).value();
114+
115+
Object value = this.converter.extractAndConvertValue(this.record, Sample.class);
116+
117+
assertThat(value).isInstanceOf(Sample.class);
118+
119+
Sample sample = (Sample) value;
120+
121+
assertThat(sample.getName()).isEqualTo("SomeName");
122+
assertThat(sample.getUsername()).isEqualTo("SomeUsername");
123+
}
124+
125+
private void assertSimpleObject(Object payload) {
126+
doReturn(payload).when(this.record).value();
127+
128+
Object value = this.converter.extractAndConvertValue(this.record, AnotherSample.class);
129+
130+
assertThat(value).isInstanceOf(AnotherSample.class);
131+
132+
AnotherSample sample = (AnotherSample) value;
133+
134+
assertThat(sample.user.name).isEqualTo("SomeName");
135+
assertThat(sample.username).isEqualTo("SomeUsername");
136+
}
137+
138+
interface Sample {
139+
140+
String getUsername();
141+
142+
@JsonPath("$.user.name")
143+
String getName();
144+
145+
}
146+
147+
public static class AnotherSample {
148+
149+
public String username;
150+
151+
public User user;
152+
153+
public static class User {
154+
155+
public String name;
156+
157+
}
158+
159+
}
160+
161+
}

0 commit comments

Comments
 (0)