Skip to content

Commit 59643c3

Browse files
author
Dave Syer
authored
Add support for Message<CloudEvent> (#315)
* Add support for Message<CloudEvent> Signed-off-by: Dave Syer <[email protected]> * Add support for structured messages with Spring Message<?> Signed-off-by: Dave Syer <[email protected]> * Push private classes out to shared utilities Signed-off-by: Dave Syer <[email protected]> * Resolve some more review comments Signed-off-by: Dave Syer <[email protected]> * Restructure MessageReader and MessageWriter Signed-off-by: Dave Syer <[email protected]> * Remove integration test (depends on snapshots still) Signed-off-by: Dave Syer <[email protected]> * Simplify message converter but drop support for structured format Signed-off-by: Dave Syer <[email protected]> * Make HTTP optional Signed-off-by: Dave Syer <[email protected]> * Drop snapshot reporitory declarations Signed-off-by: Dave Syer <[email protected]>
1 parent 3448302 commit 59643c3

File tree

10 files changed

+574
-3
lines changed

10 files changed

+574
-3
lines changed

spring/pom.xml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,11 @@
5858
<artifactId>spring-webflux</artifactId>
5959
<optional>true</optional>
6060
</dependency>
61+
<dependency>
62+
<groupId>org.springframework</groupId>
63+
<artifactId>spring-messaging</artifactId>
64+
<optional>true</optional>
65+
</dependency>
6166
<dependency>
6267
<groupId>io.cloudevents</groupId>
6368
<artifactId>cloudevents-core</artifactId>
@@ -67,7 +72,7 @@
6772
<groupId>io.cloudevents</groupId>
6873
<artifactId>cloudevents-http-basic</artifactId>
6974
<version>${project.version}</version>
70-
<!-- This will be optional too if we want to support messaging -->
75+
<optional>true</optional>
7176
</dependency>
7277
<dependency>
7378
<groupId>org.apache.tomcat.embed</groupId>
@@ -101,5 +106,6 @@
101106
<scope>test</scope>
102107
</dependency>
103108
</dependencies>
109+
104110
</project>
105111

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Copyright 2019-2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.cloudevents.spring.messaging;
17+
18+
import java.util.HashMap;
19+
import java.util.Map;
20+
21+
import io.cloudevents.CloudEvent;
22+
import io.cloudevents.CloudEventContext;
23+
import io.cloudevents.SpecVersion;
24+
import io.cloudevents.core.CloudEventUtils;
25+
26+
import org.springframework.messaging.MessageHeaders;
27+
28+
import static io.cloudevents.spring.messaging.CloudEventsHeaders.CE_PREFIX;
29+
30+
/**
31+
* Utility class for copying message headers to and from {@link CloudEventContext}.
32+
*
33+
* @author Dave Syer
34+
*
35+
*/
36+
public class CloudEventHeaderUtils {
37+
38+
/**
39+
* Helper method for converting {@link MessageHeaders} to a {@link CloudEventContext}.
40+
* The input headers must represent a valid event in "binary" form, i.e. it must have
41+
* headers "ce-id", "ce-specversion" etc.
42+
* @param headers the input request headers
43+
* @return a {@link CloudEventContext} that can be used to create a new
44+
* {@link CloudEvent}
45+
*
46+
*/
47+
public static CloudEventContext fromMap(Map<String, Object> headers) {
48+
Object value = headers.get(CloudEventsHeaders.SPEC_VERSION);
49+
SpecVersion version = value == null ? SpecVersion.V1 : SpecVersion.parse(value.toString());
50+
return CloudEventUtils.toEvent(new MessageBinaryMessageReader(version, headers));
51+
}
52+
53+
/**
54+
* Helper method for extracting {@link MessageHeaders} from a {@link CloudEvent}. The
55+
* result will contain headers canonicalized with a "ce-" prefix, analogous to the
56+
* "binary" message format in Cloud Events.
57+
* @param event the input {@link CloudEvent}
58+
* @return the response headers represented by the event
59+
*/
60+
public static Map<String, Object> toMap(CloudEvent event) {
61+
Map<String, Object> headers = new HashMap<>();
62+
// Probably this should be done in CloudEventContextReaderAdapter
63+
headers.put(CE_PREFIX + "specversion", event.getSpecVersion().toString());
64+
MessageBuilderMessageWriter writer = new MessageBuilderMessageWriter(headers);
65+
CloudEventUtils.toContextReader(event).readContext(writer);
66+
return writer.end().getHeaders();
67+
}
68+
69+
}
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Copyright 2019-2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.cloudevents.spring.messaging;
17+
18+
import java.nio.charset.Charset;
19+
20+
import io.cloudevents.CloudEvent;
21+
import io.cloudevents.CloudEventContext;
22+
import io.cloudevents.SpecVersion;
23+
import io.cloudevents.core.CloudEventUtils;
24+
import io.cloudevents.core.format.EventFormat;
25+
import io.cloudevents.core.message.MessageReader;
26+
import io.cloudevents.core.message.impl.GenericStructuredMessageReader;
27+
import io.cloudevents.core.message.impl.MessageUtils;
28+
29+
import org.springframework.messaging.Message;
30+
import org.springframework.messaging.MessageHeaders;
31+
import org.springframework.messaging.converter.MessageConverter;
32+
33+
/**
34+
* A {@link MessageConverter} that can translate to and from a {@link Message
35+
* Message&lt;byte[]>} or {@link Message Message&lt;String>} and a {@link CloudEvent}. The
36+
* {@link CloudEventContext} is canonicalized, with key names given a "ce-" prefix in the
37+
* {@link MessageHeaders}.
38+
*
39+
* @author Dave Syer
40+
*
41+
*/
42+
public class CloudEventMessageConverter implements MessageConverter {
43+
44+
@Override
45+
public Object fromMessage(Message<?> message, Class<?> targetClass) {
46+
if (CloudEvent.class.isAssignableFrom(targetClass)) {
47+
return createMessageReader(message).toEvent();
48+
}
49+
return null;
50+
}
51+
52+
@Override
53+
public Message<?> toMessage(Object payload, MessageHeaders headers) {
54+
if (payload instanceof CloudEvent) {
55+
CloudEvent event = (CloudEvent) payload;
56+
return CloudEventUtils.toReader(event).read(new MessageBuilderMessageWriter(headers));
57+
}
58+
return null;
59+
}
60+
61+
private MessageReader createMessageReader(Message<?> message) {
62+
return MessageUtils.parseStructuredOrBinaryMessage( //
63+
() -> contentType(message.getHeaders()), //
64+
format -> structuredMessageReader(message, format), //
65+
() -> version(message.getHeaders()), //
66+
version -> binaryMessageReader(message, version) //
67+
);
68+
}
69+
70+
private String version(MessageHeaders message) {
71+
if (message.containsKey(CloudEventsHeaders.SPEC_VERSION)) {
72+
return message.get(CloudEventsHeaders.SPEC_VERSION).toString();
73+
}
74+
return null;
75+
}
76+
77+
private MessageReader binaryMessageReader(Message<?> message, SpecVersion version) {
78+
return new MessageBinaryMessageReader(version, message.getHeaders(), getBinaryData(message));
79+
}
80+
81+
private MessageReader structuredMessageReader(Message<?> message, EventFormat format) {
82+
return new GenericStructuredMessageReader(format, getBinaryData(message));
83+
}
84+
85+
private String contentType(MessageHeaders message) {
86+
if (message.containsKey(MessageHeaders.CONTENT_TYPE)) {
87+
return message.get(MessageHeaders.CONTENT_TYPE).toString();
88+
}
89+
if (message.containsKey(CloudEventsHeaders.CONTENT_TYPE)) {
90+
return message.get(CloudEventsHeaders.CONTENT_TYPE).toString();
91+
}
92+
return null;
93+
}
94+
95+
private byte[] getBinaryData(Message<?> message) {
96+
Object payload = message.getPayload();
97+
if (payload instanceof byte[]) {
98+
return (byte[]) payload;
99+
}
100+
else if (payload instanceof String) {
101+
return ((String) payload).getBytes(Charset.defaultCharset());
102+
}
103+
return null;
104+
}
105+
106+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Copyright 2018-Present The CloudEvents Authors
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
* <p>
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
* <p>
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package io.cloudevents.spring.messaging;
19+
20+
public class CloudEventsHeaders {
21+
22+
public static final String CE_PREFIX = "ce-";
23+
24+
public static final String SPEC_VERSION = CE_PREFIX + "specversion";
25+
26+
public static final String CONTENT_TYPE = CE_PREFIX + "datacontenttype";
27+
28+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright 2019-2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.cloudevents.spring.messaging;
17+
18+
import java.util.Map;
19+
import java.util.function.BiConsumer;
20+
21+
import io.cloudevents.SpecVersion;
22+
import io.cloudevents.core.data.BytesCloudEventData;
23+
import io.cloudevents.core.message.impl.BaseGenericBinaryMessageReaderImpl;
24+
25+
import static io.cloudevents.spring.messaging.CloudEventsHeaders.CE_PREFIX;
26+
import static org.springframework.messaging.MessageHeaders.CONTENT_TYPE;
27+
28+
/**
29+
* Utility for converting maps (message headers) to `CloudEvent` contexts.
30+
*
31+
* @author Dave Syer
32+
*
33+
*/
34+
class MessageBinaryMessageReader extends BaseGenericBinaryMessageReaderImpl<String, Object> {
35+
36+
private final Map<String, Object> headers;
37+
38+
public MessageBinaryMessageReader(SpecVersion version, Map<String, Object> headers, byte[] payload) {
39+
super(version, payload == null ? null : BytesCloudEventData.wrap(payload));
40+
this.headers = headers;
41+
}
42+
43+
public MessageBinaryMessageReader(SpecVersion version, Map<String, Object> headers) {
44+
this(version, headers, null);
45+
}
46+
47+
@Override
48+
protected boolean isContentTypeHeader(String key) {
49+
return CONTENT_TYPE.equalsIgnoreCase(key);
50+
}
51+
52+
@Override
53+
protected boolean isCloudEventsHeader(String key) {
54+
return key != null && key.length() > 3
55+
&& key.substring(0, CE_PREFIX.length()).toLowerCase().startsWith(CE_PREFIX);
56+
}
57+
58+
@Override
59+
protected String toCloudEventsKey(String key) {
60+
return key.substring(CE_PREFIX.length()).toLowerCase();
61+
}
62+
63+
@Override
64+
protected void forEachHeader(BiConsumer<String, Object> fn) {
65+
headers.forEach((k, v) -> fn.accept(k, v));
66+
}
67+
68+
@Override
69+
protected String toCloudEventsValue(Object value) {
70+
return value.toString();
71+
}
72+
73+
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Copyright 2019-2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.cloudevents.spring.messaging;
17+
18+
import java.util.HashMap;
19+
import java.util.Map;
20+
21+
import io.cloudevents.CloudEventData;
22+
import io.cloudevents.SpecVersion;
23+
import io.cloudevents.core.format.EventFormat;
24+
import io.cloudevents.core.message.MessageWriter;
25+
import io.cloudevents.rw.CloudEventContextWriter;
26+
import io.cloudevents.rw.CloudEventRWException;
27+
import io.cloudevents.rw.CloudEventWriter;
28+
29+
import org.springframework.messaging.Message;
30+
import org.springframework.messaging.support.MessageBuilder;
31+
32+
/**
33+
* Internal utility class for copying <code>CloudEvent</code> context to a map (message
34+
* headers).
35+
*
36+
* @author Dave Syer
37+
*
38+
*/
39+
class MessageBuilderMessageWriter
40+
implements CloudEventWriter<Message<byte[]>>, MessageWriter<MessageBuilderMessageWriter, Message<byte[]>> {
41+
42+
private Map<String, Object> headers = new HashMap<>();
43+
44+
public MessageBuilderMessageWriter(Map<String, Object> headers) {
45+
this.headers.putAll(headers);
46+
}
47+
48+
public MessageBuilderMessageWriter() {
49+
}
50+
51+
@Override
52+
public Message<byte[]> setEvent(EventFormat format, byte[] value) throws CloudEventRWException {
53+
headers.put(CloudEventsHeaders.CONTENT_TYPE, format.serializedContentType());
54+
return MessageBuilder.withPayload(value).copyHeaders(headers).build();
55+
}
56+
57+
@Override
58+
public Message<byte[]> end(CloudEventData value) throws CloudEventRWException {
59+
return MessageBuilder.withPayload(value == null ? new byte[0] : value.toBytes()).copyHeaders(headers).build();
60+
}
61+
62+
@Override
63+
public Message<byte[]> end() {
64+
return MessageBuilder.withPayload(new byte[0]).copyHeaders(headers).build();
65+
}
66+
67+
@Override
68+
public CloudEventContextWriter withContextAttribute(String name, String value) throws CloudEventRWException {
69+
headers.put(CloudEventsHeaders.CE_PREFIX + name, value);
70+
return this;
71+
}
72+
73+
@Override
74+
public MessageBuilderMessageWriter create(SpecVersion version) {
75+
headers.put(CloudEventsHeaders.SPEC_VERSION, version.toString());
76+
return this;
77+
}
78+
79+
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
/**
2+
* Provides classes related to working with Cloud Events within the context of Spring Messaging.
3+
*/
4+
package io.cloudevents.spring.messaging;

0 commit comments

Comments
 (0)