Skip to content

Commit 698cdf7

Browse files
authored
1 parent 4ebeab0 commit 698cdf7

File tree

13 files changed

+914
-0
lines changed

13 files changed

+914
-0
lines changed

examples/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
<module>spring-reactive</module>
3030
<module>spring-rsocket</module>
3131
<module>spring-function</module>
32+
<module>rocketmq</module>
3233
</modules>
3334

3435
</project>

examples/rocketmq/README.md

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# RocketMQ + CloudEvents Sample
2+
3+
This example demonstrates the integration of [RocketMQ 5.x client library](https://github.com/apache/rocketmq-clients)
4+
with CloudEvents to create a RocketMQ binding.
5+
6+
## Building the Project
7+
8+
```shell
9+
mvn package
10+
```
11+
12+
## Setting Up a RocketMQ Instance
13+
14+
Follow the [quickstart guide](https://rocketmq.apache.org/docs/quick-start/01quickstart) on the official RocketMQ
15+
website to set up the necessary components, including nameserver, proxy, and broker.
16+
17+
## Event Production
18+
19+
```shell
20+
mvn exec:java -Dexec.mainClass="io.cloudevents.examples.rocketmq.RocketmqProducer" -Dexec.args="foobar:8081 sample-topic"
21+
```
22+
23+
## Event Consumption
24+
25+
```shell
26+
mvn exec:java -Dexec.mainClass="io.cloudevents.examples.rocketmq.RocketmqConsumer" -Dexec.args="foobar:8081 sample-topic sample-consumer-group"
27+
```

examples/rocketmq/pom.xml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<parent>
6+
<artifactId>cloudevents-examples</artifactId>
7+
<groupId>io.cloudevents</groupId>
8+
<version>2.5.0-SNAPSHOT</version>
9+
</parent>
10+
<modelVersion>4.0.0</modelVersion>
11+
12+
<artifactId>cloudevents-rocketmq-example</artifactId>
13+
14+
<dependencies>
15+
<dependency>
16+
<groupId>io.cloudevents</groupId>
17+
<artifactId>cloudevents-rocketmq</artifactId>
18+
<version>${project.version}</version>
19+
</dependency>
20+
</dependencies>
21+
</project>
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package io.cloudevents.examples.rocketmq;
2+
3+
import io.cloudevents.CloudEvent;
4+
import io.cloudevents.core.message.MessageReader;
5+
import io.cloudevents.rocketmq.RocketMqMessageFactory;
6+
import java.io.IOException;
7+
import java.util.Collections;
8+
import org.apache.rocketmq.client.apis.ClientConfiguration;
9+
import org.apache.rocketmq.client.apis.ClientException;
10+
import org.apache.rocketmq.client.apis.ClientServiceProvider;
11+
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
12+
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
13+
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
14+
15+
public class RocketmqConsumer {
16+
private RocketmqConsumer() {
17+
}
18+
19+
public static void main(String[] args) throws InterruptedException, ClientException, IOException {
20+
if (args.length < 3) {
21+
System.out.println("Usage: rocketmq_consumer <endpoints> <topic> <consumer_group>");
22+
return;
23+
}
24+
final ClientServiceProvider provider = ClientServiceProvider.loadService();
25+
String endpoints = args[0];
26+
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
27+
.setEndpoints(endpoints)
28+
.build();
29+
FilterExpression filterExpression = new FilterExpression();
30+
String topic = args[1];
31+
String consumerGroup = args[2];
32+
33+
// Create the RocketMQ Consumer.
34+
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
35+
.setClientConfiguration(clientConfiguration)
36+
.setConsumerGroup(consumerGroup)
37+
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
38+
.setMessageListener(messageView -> {
39+
final MessageReader reader = RocketMqMessageFactory.createReader(messageView);
40+
final CloudEvent event = reader.toEvent();
41+
System.out.println("Received event=" + event + ", messageId=" + messageView.getMessageId());
42+
return ConsumeResult.SUCCESS;
43+
})
44+
.build();
45+
// Block the main thread, no need for production environment.
46+
Thread.sleep(Long.MAX_VALUE);
47+
// Close the push consumer when you don't need it anymore.
48+
pushConsumer.close();
49+
}
50+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package io.cloudevents.examples.rocketmq;
2+
3+
import io.cloudevents.CloudEvent;
4+
import io.cloudevents.core.v1.CloudEventBuilder;
5+
import io.cloudevents.rocketmq.RocketMqMessageFactory;
6+
import io.cloudevents.types.Time;
7+
import java.io.IOException;
8+
import java.net.URI;
9+
import java.nio.charset.StandardCharsets;
10+
import java.util.HashMap;
11+
import java.util.Map;
12+
import org.apache.rocketmq.client.apis.ClientConfiguration;
13+
import org.apache.rocketmq.client.apis.ClientException;
14+
import org.apache.rocketmq.client.apis.ClientServiceProvider;
15+
import org.apache.rocketmq.client.apis.message.Message;
16+
import org.apache.rocketmq.client.apis.producer.Producer;
17+
import org.apache.rocketmq.client.apis.producer.SendReceipt;
18+
import org.apache.rocketmq.shaded.com.google.gson.Gson;
19+
20+
public class RocketmqProducer {
21+
private RocketmqProducer() {
22+
}
23+
24+
public static void main(String[] args) throws ClientException, IOException {
25+
if (args.length < 2) {
26+
System.out.println("Usage: rocketmq_producer <endpoints> <topic>");
27+
return;
28+
}
29+
final ClientServiceProvider provider = ClientServiceProvider.loadService();
30+
String endpoints = args[0];
31+
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
32+
.setEndpoints(endpoints)
33+
.build();
34+
String topic = args[1];
35+
36+
// Create the RocketMQ Producer.
37+
final Producer producer = provider.newProducerBuilder()
38+
.setClientConfiguration(clientConfiguration)
39+
.setTopics(topic)
40+
.build();
41+
final Gson gson = new Gson();
42+
Map<String, String> payload = new HashMap<>();
43+
payload.put("foo", "bar");
44+
final CloudEvent event = new CloudEventBuilder()
45+
.withId("client-id")
46+
.withSource(URI.create("http://127.0.0.1/rocketmq-client"))
47+
.withType("com.foobar")
48+
.withTime(Time.parseTime("2022-11-09T21:47:12.032198+00:00"))
49+
.withData(gson.toJson(payload).getBytes(StandardCharsets.UTF_8))
50+
.build();
51+
// Transform event into message.
52+
final Message message = RocketMqMessageFactory.createWriter(topic).writeBinary(event);
53+
try {
54+
// Send the message.
55+
final SendReceipt sendReceipt = producer.send(message);
56+
System.out.println("Send message successfully, messageId=" + sendReceipt.getMessageId());
57+
} catch (Exception e) {
58+
System.out.println("Failed to send message");
59+
e.printStackTrace();
60+
}
61+
// Close the producer when you don't need it anymore.
62+
producer.close();
63+
}
64+
}

pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@
8080
<module>spring</module>
8181
<module>sql</module>
8282
<module>bom</module>
83+
<module>rocketmq</module>
8384
</modules>
8485

8586
<properties>

rocketmq/pom.xml

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
~ Copyright 2018-Present The CloudEvents Authors
4+
~ <p>
5+
~ Licensed under the Apache License, Version 2.0 (the "License");
6+
~ you may not use this file except in compliance with the License.
7+
~ You may obtain a copy of the License at
8+
~ <p>
9+
~ http://www.apache.org/licenses/LICENSE-2.0
10+
~ <p>
11+
~ Unless required by applicable law or agreed to in writing, software
12+
~ distributed under the License is distributed on an "AS IS" BASIS,
13+
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
~ See the License for the specific language governing permissions and
15+
~ limitations under the License.
16+
~
17+
-->
18+
<project xmlns="http://maven.apache.org/POM/4.0.0"
19+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
20+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
21+
<modelVersion>4.0.0</modelVersion>
22+
23+
<parent>
24+
<artifactId>cloudevents-parent</artifactId>
25+
<groupId>io.cloudevents</groupId>
26+
<version>2.5.0-SNAPSHOT</version>
27+
</parent>
28+
29+
<artifactId>cloudevents-rocketmq</artifactId>
30+
<name>CloudEvents - RocketMQ Binding</name>
31+
<packaging>jar</packaging>
32+
33+
<properties>
34+
<rocketmq.version>5.0.4</rocketmq.version>
35+
<module-name>io.cloudevents.rocketmq</module-name>
36+
</properties>
37+
38+
<dependencies>
39+
<dependency>
40+
<groupId>io.cloudevents</groupId>
41+
<artifactId>cloudevents-core</artifactId>
42+
<version>${project.version}</version>
43+
</dependency>
44+
45+
<dependency>
46+
<groupId>org.apache.rocketmq</groupId>
47+
<artifactId>rocketmq-client-java</artifactId>
48+
<version>${rocketmq.version}</version>
49+
</dependency>
50+
51+
<!-- test dependencies -->
52+
<dependency>
53+
<groupId>org.junit.jupiter</groupId>
54+
<artifactId>junit-jupiter</artifactId>
55+
<version>${junit-jupiter.version}</version>
56+
<scope>test</scope>
57+
</dependency>
58+
<dependency>
59+
<groupId>io.cloudevents</groupId>
60+
<artifactId>cloudevents-core</artifactId>
61+
<classifier>tests</classifier>
62+
<type>test-jar</type>
63+
<version>${project.version}</version>
64+
<scope>test</scope>
65+
</dependency>
66+
<dependency>
67+
<groupId>org.assertj</groupId>
68+
<artifactId>assertj-core</artifactId>
69+
<version>${assertj-core.version}</version>
70+
<scope>test</scope>
71+
</dependency>
72+
</dependencies>
73+
</project>
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Copyright 2018-Present The CloudEvents Authors
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
* <p>
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
* <p>
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package io.cloudevents.rocketmq;
19+
20+
import io.cloudevents.core.message.MessageReader;
21+
import io.cloudevents.core.message.MessageWriter;
22+
import io.cloudevents.core.message.impl.GenericStructuredMessageReader;
23+
import io.cloudevents.core.message.impl.MessageUtils;
24+
import io.cloudevents.rw.CloudEventWriter;
25+
import java.nio.ByteBuffer;
26+
import java.util.Map;
27+
import org.apache.rocketmq.client.apis.message.Message;
28+
import org.apache.rocketmq.client.apis.message.MessageView;
29+
30+
/**
31+
* A factory class providing convenience methods for creating {@link MessageReader} and {@link MessageWriter} instances
32+
* based on RocketMQ {@link MessageView} and {@link Message}.
33+
*/
34+
public class RocketMqMessageFactory {
35+
private RocketMqMessageFactory() {
36+
// prevent instantiation
37+
}
38+
39+
/**
40+
* Creates a {@link MessageReader} to read a RocketMQ {@link MessageView}.
41+
*
42+
* @param message The RocketMQ {@link MessageView} to read from.
43+
* @return A {@link MessageReader} that can read the given {@link MessageView} to a {@link io.cloudevents.CloudEvent} representation.
44+
*/
45+
public static MessageReader createReader(final MessageView message) {
46+
final ByteBuffer byteBuffer = message.getBody();
47+
byte[] body = new byte[byteBuffer.remaining()];
48+
byteBuffer.get(body);
49+
final Map<String, String> properties = message.getProperties();
50+
final String contentType = properties.get(RocketmqConstants.PROPERTY_CONTENT_TYPE);
51+
return createReader(contentType, properties, body);
52+
}
53+
54+
/**
55+
* Creates a {@link MessageReader} using the content type, properties, and body of a RocketMQ {@link MessageView}.
56+
*
57+
* @param contentType The content type of the message payload.
58+
* @param properties The properties of the RocketMQ message containing CloudEvent metadata (attributes and/or extensions).
59+
* @param body The message body as byte array.
60+
* @return A {@link MessageReader} capable of parsing a {@link io.cloudevents.CloudEvent} from the content-type, properties, and payload of a RocketMQ message.
61+
*/
62+
public static MessageReader createReader(final String contentType, final Map<String, String> properties, final byte[] body) {
63+
return MessageUtils.parseStructuredOrBinaryMessage(
64+
() -> contentType,
65+
format -> new GenericStructuredMessageReader(format, body),
66+
() -> properties.get(RocketmqConstants.MESSAGE_PROPERTY_SPEC_VERSION),
67+
sv -> new RocketmqBinaryMessageReader(sv, properties, contentType, body)
68+
);
69+
}
70+
71+
/**
72+
* Creates a {@link MessageWriter} instance capable of translating a {@link io.cloudevents.CloudEvent} to a RocketMQ {@link Message}.
73+
*
74+
* @param topic The topic to which the created RocketMQ message will be sent.
75+
* @return A {@link MessageWriter} capable of converting a {@link io.cloudevents.CloudEvent} to a RocketMQ {@link Message}.
76+
*/
77+
public static MessageWriter<CloudEventWriter<Message>, Message> createWriter(final String topic) {
78+
return new RocketmqMessageWriter(topic);
79+
}
80+
}

0 commit comments

Comments
 (0)