Skip to content

Commit 8903b2f

Browse files
authored
[ISSUE #1152] [RIP-80] [Java] Implementation of Priority Message (#1153)
Change-Id: Ib4d1d67573948a5c66ec1e0c83082d2bfb44d9e6
1 parent 692dd72 commit 8903b2f

File tree

17 files changed

+249
-7
lines changed

17 files changed

+249
-7
lines changed

README-CN.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
| Push consumer with concurrent message listener |||| 🚧 || 🚧 | 🚧 | 🚧 |
2929
| Push consumer with FIFO message listener |||| 🚧 || 🚧 | 🚧 | 🚧 |
3030
| Push consumer with FIFO consume accelerator ||| 🚧 | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 |
31+
| Priority Message || 🚧 | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 |
3132

3233
## 先决条件和构建
3334

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ Provide cloud-native and robust solutions for Java, C++, C#, Golang, Rust and al
2828
| Push consumer with concurrent message listener |||| 🚧 || 🚧 | 🚧 | 🚧 |
2929
| Push consumer with FIFO message listener |||| 🚧 || 🚧 | 🚧 | 🚧 |
3030
| Push consumer with FIFO consume accelerator ||| 🚧 | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 |
31+
| Priority Message || 🚧 | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 |
3132

3233
## Prerequisite and Build
3334

java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/Message.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,4 +86,11 @@ public interface Message {
8686
* timestamp is not specified.
8787
*/
8888
Optional<Long> getDeliveryTimestamp();
89+
90+
/**
91+
* Get the priority of the message, which makes sense only when topic type is priority.
92+
*
93+
* @return message priority, which is optional, {@link Optional#empty()} means priority is not specified.
94+
*/
95+
Optional<Integer> getPriority();
8996
}

java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/MessageBuilder.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,13 @@ public interface MessageBuilder {
105105
*/
106106
MessageBuilder setDeliveryTimestamp(long deliveryTimestamp);
107107

108+
/**
109+
* Set the priority for the message, which is optional.
110+
* @param priority non-negative number in the range [0, N], regarded as highest priority if exceeds N
111+
* @return the message builder instance.
112+
*/
113+
MessageBuilder setPriority(int priority);
114+
108115
/**
109116
* Add user property for the message.
110117
*

java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/MessageView.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,13 @@ public interface MessageView {
9494
*/
9595
Optional<Long> getDeliveryTimestamp();
9696

97+
/**
98+
* Get the priority of the message, which makes sense only when topic type is priority.
99+
*
100+
* @return message priority, which is optional, {@link Optional#empty()} means priority is not specified.
101+
*/
102+
Optional<Integer> getPriority();
103+
97104
/**
98105
* Get the born host of the message.
99106
*
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.rocketmq.client.java.example;
19+
20+
import java.nio.charset.StandardCharsets;
21+
import org.apache.rocketmq.client.apis.ClientException;
22+
import org.apache.rocketmq.client.apis.ClientServiceProvider;
23+
import org.apache.rocketmq.client.apis.message.Message;
24+
import org.apache.rocketmq.client.apis.producer.Producer;
25+
import org.apache.rocketmq.client.apis.producer.SendReceipt;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
29+
public class ProducerPriorityMessageExample {
30+
private static final Logger log = LoggerFactory.getLogger(ProducerPriorityMessageExample.class);
31+
32+
private ProducerPriorityMessageExample() {
33+
}
34+
35+
public static void main(String[] args) throws ClientException {
36+
final ClientServiceProvider provider = ClientServiceProvider.loadService();
37+
38+
String topic = "yourPriorityTopic";
39+
final Producer producer = ProducerSingleton.getInstance(topic);
40+
// Define your message body.
41+
byte[] body = "This is a delay message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);
42+
String tag = "yourMessageTagA";
43+
final Message message = provider.newMessageBuilder()
44+
// Set topic for the current message.
45+
.setTopic(topic)
46+
// Message secondary classifier of message besides topic.
47+
.setTag(tag)
48+
// Key(s) of the message, another way to mark message besides message id.
49+
.setKeys("yourMessageKey")
50+
// Set priority of message.
51+
.setPriority(1)
52+
.setBody(body)
53+
.build();
54+
try {
55+
final SendReceipt sendReceipt = producer.send(message);
56+
log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
57+
} catch (Throwable t) {
58+
log.error("Failed to send message", t);
59+
}
60+
// Close the producer when you don't need it anymore.
61+
// You could close it manually or add this into the JVM shutdown hook.
62+
// producer.close();
63+
}
64+
}

java/client/src/main/java/org/apache/rocketmq/client/java/message/GeneralMessage.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,13 @@ public interface GeneralMessage {
9696
*/
9797
Optional<Long> getDeliveryTimestamp();
9898

99+
/**
100+
* Get the priority of the message, which makes sense only when topic type is priority.
101+
*
102+
* @return message priority, which is optional, {@link Optional#empty()} means priority is not specified.
103+
*/
104+
Optional<Integer> getPriority();
105+
99106
/**
100107
* Get the born host of the message.
101108
*

java/client/src/main/java/org/apache/rocketmq/client/java/message/GeneralMessageImpl.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ public class GeneralMessageImpl implements GeneralMessage {
3737
private final String messageGroup;
3838
private final String liteTopic;
3939
private final Long deliveryTimestamp;
40+
private final Integer priority;
4041
private final String bornHost;
4142
private final Long bornTimestamp;
4243
private final Integer deliveryAttempt;
@@ -61,6 +62,7 @@ public GeneralMessageImpl(Message message) {
6162
this.messageGroup = message.getMessageGroup().orElse(null);
6263
this.liteTopic = message.getLiteTopic().orElse(null);
6364
this.deliveryTimestamp = message.getDeliveryTimestamp().orElse(null);
65+
this.priority = message.getPriority().orElse(null);
6466
this.bornHost = null;
6567
this.bornTimestamp = null;
6668
this.deliveryAttempt = null;
@@ -96,6 +98,7 @@ public GeneralMessageImpl(MessageView message) {
9698
this.messageGroup = message.getMessageGroup().orElse(null);
9799
this.liteTopic = message.getLiteTopic().orElse(null);
98100
this.deliveryTimestamp = message.getDeliveryTimestamp().orElse(null);
101+
this.priority = message.getPriority().orElse(null);
99102
this.bornHost = message.getBornHost();
100103
this.bornTimestamp = message.getBornTimestamp();
101104
this.deliveryAttempt = message.getDeliveryAttempt();
@@ -149,6 +152,11 @@ public Optional<Long> getDeliveryTimestamp() {
149152
return Optional.ofNullable(deliveryTimestamp);
150153
}
151154

155+
@Override
156+
public Optional<Integer> getPriority() {
157+
return Optional.ofNullable(priority);
158+
}
159+
152160
@Override
153161
public Optional<String> getBornHost() {
154162
return Optional.ofNullable(bornHost);

java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageBuilderImpl.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public class MessageBuilderImpl implements MessageBuilder {
4141
protected String messageGroup = null;
4242
protected String liteTopic = null;
4343
protected Long deliveryTimestamp = null;
44+
protected Integer priority = null;
4445
protected Collection<String> keys = new HashSet<>();
4546
protected final Map<String, String> properties = new HashMap<>();
4647

@@ -100,6 +101,7 @@ public MessageBuilder setKeys(String... keys) {
100101
public MessageBuilder setMessageGroup(String messageGroup) {
101102
checkArgument(null == deliveryTimestamp, "messageGroup and deliveryTimestamp should not be set at same time");
102103
checkArgument(null == liteTopic, "messageGroup and liteTopic should not be set at same time");
104+
checkArgument(null == priority, "messageGroup and priority should not be set at same time");
103105
checkArgument(StringUtils.isNotBlank(messageGroup), "messageGroup should not be blank");
104106
this.messageGroup = messageGroup;
105107
return this;
@@ -109,6 +111,7 @@ public MessageBuilder setMessageGroup(String messageGroup) {
109111
public MessageBuilder setLiteTopic(String liteTopic) {
110112
checkArgument(null == deliveryTimestamp, "liteTopic and deliveryTimestamp should not be set at same time");
111113
checkArgument(null == messageGroup, "liteTopic and messageGroup should not be set at same time");
114+
checkArgument(null == priority, "liteTopic and priority should not be set at same time");
112115
checkArgument(StringUtils.isNotBlank(liteTopic), "liteTopic should not be blank");
113116
this.liteTopic = liteTopic;
114117
return this;
@@ -121,10 +124,24 @@ public MessageBuilder setLiteTopic(String liteTopic) {
121124
public MessageBuilder setDeliveryTimestamp(long deliveryTimestamp) {
122125
checkArgument(null == messageGroup, "deliveryTimestamp and messageGroup should not be set at same time");
123126
checkArgument(null == liteTopic, "deliveryTimestamp and liteTopic should not be set at same time");
127+
checkArgument(null == priority, "deliveryTimestamp and priority should not be set at same time");
124128
this.deliveryTimestamp = deliveryTimestamp;
125129
return this;
126130
}
127131

132+
/**
133+
* See {@link MessageBuilder#setPriority(int)}
134+
*/
135+
@Override
136+
public MessageBuilder setPriority(int priority) {
137+
checkArgument(null == deliveryTimestamp, "priority and deliveryTimestamp should not be set at same time");
138+
checkArgument(null == messageGroup, "priority and messageGroup should not be set at same time");
139+
checkArgument(null == liteTopic, "priority and liteTopic should not be set at same time");
140+
checkArgument(priority >= 0, "priority must be greater than or equal to 0");
141+
this.priority = priority;
142+
return this;
143+
}
144+
128145
/**
129146
* See {@link MessageBuilder#addProperty(String, String)}
130147
*/

java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageImpl.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ public class MessageImpl implements Message {
4949
private final String liteTopic;
5050
@Nullable
5151
private final Long deliveryTimestamp;
52+
@Nullable
53+
private final Integer priority;
5254

5355
/**
5456
* The caller is supposed to have validated the arguments and handled throwing exception or
@@ -61,6 +63,7 @@ public class MessageImpl implements Message {
6163
this.messageGroup = builder.messageGroup;
6264
this.liteTopic = builder.liteTopic;
6365
this.deliveryTimestamp = builder.deliveryTimestamp;
66+
this.priority = builder.priority;
6467
this.keys = builder.keys;
6568
this.properties = builder.properties;
6669
}
@@ -81,6 +84,7 @@ public class MessageImpl implements Message {
8184
this.messageGroup = message.getMessageGroup().orElse(null);
8285
this.deliveryTimestamp = message.getDeliveryTimestamp().orElse(null);
8386
this.liteTopic = message.getLiteTopic().orElse(null);
87+
this.priority = message.getPriority().orElse(null);
8488
this.keys = message.getKeys();
8589
this.properties = message.getProperties();
8690
}
@@ -133,6 +137,14 @@ public Optional<Long> getDeliveryTimestamp() {
133137
return Optional.ofNullable(deliveryTimestamp);
134138
}
135139

140+
/**
141+
* @see Message#getPriority()
142+
*/
143+
@Override
144+
public Optional<Integer> getPriority() {
145+
return Optional.ofNullable(priority);
146+
}
147+
136148
/**
137149
* @see Message#getMessageGroup()
138150
*/
@@ -152,7 +164,9 @@ public String toString() {
152164
.add("topic", topic)
153165
.add("tag", tag)
154166
.add("messageGroup", messageGroup)
167+
.add("liteTopic", liteTopic)
155168
.add("deliveryTimestamp", deliveryTimestamp)
169+
.add("priority", priority)
156170
.add("keys", keys)
157171
.add("properties", properties)
158172
.toString();

0 commit comments

Comments
 (0)