Skip to content

Commit ef21697

Browse files
f1amingoQuanmoling
authored
[ISSUE #1107] [Java] LiteTopic support for "RIP‐83 Lite Topic" (#1108)
* producer lite topic * new MessageImpl with MessageBuilderImpl * lite push consumer * createFifoConsumeService * default fifo true for LitePushConsumerSettings * lite push consumer sub bindTopic * * 1. fix heartbeat; 2. add version for interest; * upgrade to 5.0.9-lite-topic-SNAPSHOT and fix logback * add some test * subscribeLite List<String> * Collection instead of List * check point * syncLiteSubscription * code refactor * fix check style * onNotifyUnsubscribeLiteCommand * LiteSubscriptionQuotaExceededException and LiteTopicQuotaExceededException * subscribeLite atomic * client side quota check * client-side lite subscription quota limit * checkLiteSubscriptionQuota 3000 * validateLiteTopic for lite push consumer * ack with lite topic if present. * forward syncSettings exception to upper layer * set liteSubscriptionQuota 1200 * lite push conusmer setInvisibleDuration * LitePushConsumerSettings#toString more fields * revert InvisibleDuration * log when subscribeLite raise ClientException * LitePushConsumerImplTest * more clearer LitePushConsumer Interface * remove subscribeLite(Collection<String>) * code refactor and more unit test * remove setEnableFifoConsumeAccelerator * code recover * fix * fix * fix code style * add liteTopic for ForwardMessageToDeadLetterQueueRequest * fix normal group consume lite topic * log client type * notify unsubscribe lite with liteTopic only * fix test * complete lite example * LitePushConsumer#getLiteTopicSet * fix comments * remove fifo = true * log warn for onNotifyUnsubscribeLiteCommand default implementation * use new proto and update to 5.1.0-lite-topic-SNAPSHOT Change-Id: I01f0c9c64be2614d0215560240548bba2078e505 * log when subscribeLite and unsubscribeLite Change-Id: I01cb9cd4d9a474218e674662407708cf069bf426 * revert pom config Change-Id: I7cc3f5624bf73b8cfeb902f4e0fc3b1fc7d4062d * rocketmq-proto.version to 2.1.0-SNAPSHOT Change-Id: I686e9ab7eaed1af1d1b01a57a88438af4bfa1878 * liteSubscriptionQuota default to 0 Change-Id: I3bcfaaa323b1eea3bd0665ab1d8722a7bc47a4fe * upgrade to 5.1.0-SNAPSHOT Change-Id: I360dbd38de8fa2cc21ab4ebb827fcbdce0258fa6 * add comments Change-Id: I062b4a92032788b6c361304d45b6a40e9a28923a * add comments Change-Id: I53a21ec7faa0ba85d46c020a1709f4e011c799ec --------- Co-authored-by: Quan <[email protected]> Co-authored-by: moling <[email protected]>
1 parent 6cc3717 commit ef21697

File tree

53 files changed

+1631
-114
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+1631
-114
lines changed

java/client-apis/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
<parent>
1919
<artifactId>rocketmq-client-java-parent</artifactId>
2020
<groupId>org.apache.rocketmq</groupId>
21-
<version>5.0.9-SNAPSHOT</version>
21+
<version>5.1.0-SNAPSHOT</version>
2222
</parent>
2323
<modelVersion>4.0.0</modelVersion>
2424

java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientServiceProvider.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.util.Iterator;
2121
import java.util.ServiceLoader;
22+
import org.apache.rocketmq.client.apis.consumer.LitePushConsumerBuilder;
2223
import org.apache.rocketmq.client.apis.consumer.PushConsumerBuilder;
2324
import org.apache.rocketmq.client.apis.consumer.SimpleConsumerBuilder;
2425
import org.apache.rocketmq.client.apis.message.MessageBuilder;
@@ -59,6 +60,13 @@ static ClientServiceProvider loadService() {
5960
*/
6061
PushConsumerBuilder newPushConsumerBuilder();
6162

63+
/**
64+
* Get the lite push consumer builder by the current provider.
65+
*
66+
* @return the lite push consumer builder instance.
67+
*/
68+
LitePushConsumerBuilder newLitePushConsumerBuilder();
69+
6270
/**
6371
* Get the simple consumer builder by the current provider.
6472
*
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.rocketmq.client.apis.consumer;
19+
20+
import java.io.Closeable;
21+
import java.io.IOException;
22+
import java.util.Set;
23+
import org.apache.rocketmq.client.apis.ClientException;
24+
25+
public interface LitePushConsumer extends Closeable {
26+
27+
/**
28+
* Subscribe to a lite topic.
29+
* <p>
30+
* The subscribeLite() method initiates network requests and performs quota verification, so it may fail.
31+
* It's important to check the result of this call to ensure that the subscription was successfully added.
32+
* Possible failure scenarios include:
33+
* 1. Network request errors, which can be retried.
34+
* 2. Quota verification failures, indicated by LiteSubscriptionQuotaExceededException. In this case,
35+
* evaluate whether the quota is insufficient and promptly unsubscribe from unused subscriptions
36+
* using unsubscribeLite() to free up resources.
37+
*
38+
* @param liteTopic the name of the lite topic to subscribe to
39+
* @throws ClientException if an error occurs during subscription
40+
*/
41+
void subscribeLite(String liteTopic) throws ClientException;
42+
43+
/**
44+
* Unsubscribe from a lite topic.
45+
*
46+
* @param liteTopic the name of the lite topic to unsubscribe from
47+
* @throws ClientException if an error occurs during unsubscription
48+
*/
49+
void unsubscribeLite(String liteTopic) throws ClientException;
50+
51+
/**
52+
* Get the lite topic immutable set.
53+
*
54+
* @return lite topic immutable set.
55+
*/
56+
Set<String> getLiteTopicSet();
57+
58+
/**
59+
* Get the load balancing group for the consumer.
60+
*
61+
* @return consumer load balancing group.
62+
*/
63+
String getConsumerGroup();
64+
65+
/**
66+
* Close the consumer and release all related resources.
67+
*
68+
* <p>Once consumer is closed, <strong>it could not be started once again.</strong> we maintained an FSM
69+
* (finite-state machine) to record the different states for each push consumer.
70+
*/
71+
@Override
72+
void close() throws IOException;
73+
}
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.rocketmq.client.apis.consumer;
19+
20+
import org.apache.rocketmq.client.apis.ClientConfiguration;
21+
import org.apache.rocketmq.client.apis.ClientException;
22+
23+
/**
24+
* Builder to config and start {@link LitePushConsumer}.
25+
*/
26+
public interface LitePushConsumerBuilder {
27+
28+
/**
29+
* Set the bind topic for lite push consumer.
30+
*
31+
* @return the consumer builder instance.
32+
*/
33+
LitePushConsumerBuilder bindTopic(String bindTopic);
34+
35+
/**
36+
* Set the client configuration for the consumer.
37+
*
38+
* @param clientConfiguration client's configuration.
39+
* @return the consumer builder instance.
40+
*/
41+
LitePushConsumerBuilder setClientConfiguration(ClientConfiguration clientConfiguration);
42+
43+
/**
44+
* Set the load balancing group for the consumer.
45+
*
46+
* @param consumerGroup consumer load balancing group.
47+
* @return the consumer builder instance.
48+
*/
49+
LitePushConsumerBuilder setConsumerGroup(String consumerGroup);
50+
51+
/**
52+
* Register message listener, all messages meet the subscription expression would across listener here.
53+
*
54+
* @param listener message listener.
55+
* @return the consumer builder instance.
56+
*/
57+
LitePushConsumerBuilder setMessageListener(MessageListener listener);
58+
59+
/**
60+
* Set the maximum number of messages cached locally.
61+
*
62+
* @param count message count.
63+
* @return the consumer builder instance.
64+
*/
65+
LitePushConsumerBuilder setMaxCacheMessageCount(int count);
66+
67+
/**
68+
* Set the maximum bytes of messages cached locally.
69+
*
70+
* @param bytes message size.
71+
* @return the consumer builder instance.
72+
*/
73+
LitePushConsumerBuilder setMaxCacheMessageSizeInBytes(int bytes);
74+
75+
/**
76+
* Set the consumption thread count in parallel.
77+
*
78+
* @param count thread count.
79+
* @return the consumer builder instance.
80+
*/
81+
LitePushConsumerBuilder setConsumptionThreadCount(int count);
82+
83+
/**
84+
* Finalize the build of {@link LitePushConsumer} and start.
85+
*
86+
* <p>This method will block until the push consumer starts successfully.
87+
*
88+
* <p>Especially, if this method is invoked more than once, different push consumers will be created and started.
89+
*
90+
* @return the lite push consumer instance.
91+
*/
92+
LitePushConsumer build() throws ClientException;
93+
}

java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/Message.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,13 @@ public interface Message {
7272
*/
7373
Optional<String> getMessageGroup();
7474

75+
/**
76+
* Get the lite topic, which is used for lite topic message type.
77+
*
78+
* @return lite topic, which is optional, {@link Optional#empty()} means lite topic is not specified.
79+
*/
80+
Optional<String> getLiteTopic();
81+
7582
/**
7683
* Get the expected delivery timestamp, which make sense only when topic type is delay.
7784
*

java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/MessageBuilder.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,14 @@ public interface MessageBuilder {
8787
*/
8888
MessageBuilder setMessageGroup(String messageGroup);
8989

90+
/**
91+
* Set the lite topic for the message, which is optional.
92+
*
93+
* @param liteTopic lite topic for the message.
94+
* @return the message builder instance.
95+
*/
96+
MessageBuilder setLiteTopic(String liteTopic);
97+
9098
/**
9199
* Set the delivery timestamp for the message, which is optional.
92100
*

java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/MessageView.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,13 @@ public interface MessageView {
7979
*/
8080
Optional<String> getMessageGroup();
8181

82+
/**
83+
* Get the lite topic, which makes sense only when the topic type is LITE.
84+
*
85+
* @return lite topic, which is optional, {@link Optional#empty()} means lite topic is not specified.
86+
*/
87+
Optional<String> getLiteTopic();
88+
8289
/**
8390
* Get the expected delivery timestamp, which makes sense only when the topic type is delay.
8491
*

java/client-shade/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
<parent>
1919
<artifactId>rocketmq-client-java-parent</artifactId>
2020
<groupId>org.apache.rocketmq</groupId>
21-
<version>5.0.9-SNAPSHOT</version>
21+
<version>5.1.0-SNAPSHOT</version>
2222
</parent>
2323
<modelVersion>4.0.0</modelVersion>
2424

java/client/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
<parent>
1919
<artifactId>rocketmq-client-java-parent</artifactId>
2020
<groupId>org.apache.rocketmq</groupId>
21-
<version>5.0.9-SNAPSHOT</version>
21+
<version>5.1.0-SNAPSHOT</version>
2222
</parent>
2323
<modelVersion>4.0.0</modelVersion>
2424

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.rocketmq.client.java.example;
19+
20+
import java.nio.charset.StandardCharsets;
21+
import org.apache.rocketmq.client.apis.ClientException;
22+
import org.apache.rocketmq.client.apis.ClientServiceProvider;
23+
import org.apache.rocketmq.client.apis.message.Message;
24+
import org.apache.rocketmq.client.apis.producer.Producer;
25+
import org.apache.rocketmq.client.apis.producer.SendReceipt;
26+
import org.apache.rocketmq.client.java.exception.LiteTopicQuotaExceededException;
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
30+
public class LiteProducerExample {
31+
static final Logger log = LoggerFactory.getLogger(LiteProducerExample.class);
32+
33+
private LiteProducerExample() {
34+
}
35+
36+
public static void main(String[] args) throws ClientException {
37+
final ClientServiceProvider provider = ClientServiceProvider.loadService();
38+
39+
String topic = "yourParentTopic";
40+
final Producer producer = ProducerSingleton.getInstance(topic);
41+
// Define your message body.
42+
byte[] body = "This is a lite message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);
43+
final Message message = provider.newMessageBuilder()
44+
// Set topic for the current message.
45+
.setTopic(topic)
46+
// Key(s) of the message, another way to mark message besides message id.
47+
.setKeys("yourMessageKey-3ee439f945d7")
48+
// Set your lite topic
49+
.setLiteTopic("lite-topic-1")
50+
.setBody(body)
51+
.build();
52+
try {
53+
final SendReceipt sendReceipt = producer.send(message);
54+
log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
55+
} catch (LiteTopicQuotaExceededException e) {
56+
// Lite topic quota exceeded.
57+
// Evaluate and increase the lite topic resource limit.
58+
log.error("Lite topic quota exceeded", e);
59+
} catch (Throwable t) {
60+
log.error("Failed to send message", t);
61+
}
62+
// Close the producer when you don't need it anymore.
63+
// You could close it manually or add this into the JVM shutdown hook.
64+
// producer.close();
65+
}
66+
}

0 commit comments

Comments
 (0)