Skip to content

Commit 2310d55

Browse files
committed
implement lite simple consumer
Change-Id: I7d00d5348672f88d6d2cf492f7323131656a6c0c
1 parent c159a67 commit 2310d55

28 files changed

+1823
-674
lines changed

java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientServiceProvider.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.Iterator;
2121
import java.util.ServiceLoader;
2222
import org.apache.rocketmq.client.apis.consumer.LitePushConsumerBuilder;
23+
import org.apache.rocketmq.client.apis.consumer.LiteSimpleConsumerBuilder;
2324
import org.apache.rocketmq.client.apis.consumer.PushConsumerBuilder;
2425
import org.apache.rocketmq.client.apis.consumer.SimpleConsumerBuilder;
2526
import org.apache.rocketmq.client.apis.message.MessageBuilder;
@@ -107,6 +108,13 @@ static ClientServiceProvider doLoad() {
107108
*/
108109
LitePushConsumerBuilder newLitePushConsumerBuilder();
109110

111+
/**
112+
* Get the lite simple consumer builder by the current provider.
113+
*
114+
* @return the lite simple consumer builder instance.
115+
*/
116+
LiteSimpleConsumerBuilder newLiteSimpleConsumerBuilder();
117+
110118
/**
111119
* Get the simple consumer builder by the current provider.
112120
*
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.rocketmq.client.apis.consumer;
19+
20+
import java.io.Closeable;
21+
import java.io.IOException;
22+
import java.time.Duration;
23+
import java.util.List;
24+
import java.util.Set;
25+
import org.apache.rocketmq.client.apis.ClientException;
26+
import org.apache.rocketmq.client.apis.message.MessageView;
27+
28+
/**
29+
* Similar to {@link SimpleConsumer}, but for lite topic.
30+
*/
31+
public interface LiteSimpleConsumer extends Closeable {
32+
33+
/**
34+
* Get the load balancing group for the lite simple consumer.
35+
*
36+
* @return consumer load balancing group.
37+
*/
38+
String getConsumerGroup();
39+
40+
/**
41+
* Subscribe to a lite topic.
42+
* Similar to {@link LitePushConsumer#subscribeLite(String)}
43+
*
44+
* @param liteTopic the name of the lite topic to subscribe
45+
* @throws ClientException if an error occurs during subscription
46+
*/
47+
void subscribeLite(String liteTopic) throws ClientException;
48+
49+
/**
50+
* Subscribe to a lite topic with consumeFromOption to specify the consume from offset.
51+
* Similar to {@link LitePushConsumer#subscribeLite(String, OffsetOption)}
52+
*
53+
* @param liteTopic the name of the lite topic to subscribe
54+
* @param offsetOption the consume from offset
55+
* @throws ClientException if an error occurs during subscription
56+
*/
57+
void subscribeLite(String liteTopic, OffsetOption offsetOption) throws ClientException;
58+
59+
/**
60+
* Unsubscribe from a lite topic.
61+
*
62+
* @param liteTopic the name of the lite topic to unsubscribe from
63+
* @throws ClientException if an error occurs during unsubscription
64+
*/
65+
void unsubscribeLite(String liteTopic) throws ClientException;
66+
67+
/**
68+
* Get the lite topic immutable set.
69+
*
70+
* @return lite topic immutable set.
71+
*/
72+
Set<String> getLiteTopicSet();
73+
74+
/**
75+
* Fetch messages from the server synchronously.
76+
* <p> This method returns immediately if there are messages available.
77+
* Otherwise, it will await the passed timeout. If the timeout expires, an empty list will be returned.
78+
*
79+
* @param maxMessageNum max message num of server returned.
80+
* @param invisibleDuration set the invisibleDuration of messages to return from the server. These messages will be
81+
* invisible to other consumers unless timeout.
82+
* @return list of message view.
83+
*/
84+
List<MessageView> receive(int maxMessageNum, Duration invisibleDuration) throws ClientException;
85+
86+
/**
87+
* Ack message to server synchronously, server commit this message.
88+
*
89+
* <p>Duplicate ack request does not take effect and throw an exception.
90+
*
91+
* @param messageView special message view with handle want to ack.
92+
*/
93+
void ack(MessageView messageView) throws ClientException;
94+
95+
/**
96+
* Changes the invisible duration of a specified message synchronously.
97+
*
98+
* <p> The origin invisible duration for a message decide by ack request.
99+
*
100+
* <p>Duplicate change requests will refresh the next visible time of this message to consumers.
101+
*
102+
* @param messageView the message view to change invisible time.
103+
* @param invisibleDuration new timestamp the message could be visible and re-consume which start from current time.
104+
*/
105+
void changeInvisibleDuration(MessageView messageView, Duration invisibleDuration) throws ClientException;
106+
107+
/**
108+
* Close the lite simple consumer and release all related resources.
109+
*
110+
* <p>Once lite simple consumer is closed, <strong>it could not be started once again.</strong> we maintained an FSM
111+
* (finite-state machine) to record the different states for each lite simple consumer.
112+
*/
113+
@Override
114+
void close() throws IOException;
115+
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.rocketmq.client.apis.consumer;
19+
20+
import java.time.Duration;
21+
import org.apache.rocketmq.client.apis.ClientConfiguration;
22+
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
23+
import org.apache.rocketmq.client.apis.ClientException;
24+
25+
/**
26+
* Builder to config and start {@link LiteSimpleConsumer}.
27+
*/
28+
public interface LiteSimpleConsumerBuilder {
29+
30+
/**
31+
* Set the bind topic for lite consumer.
32+
*
33+
* @return the consumer builder instance.
34+
*/
35+
LiteSimpleConsumerBuilder bindTopic(String bindTopic);
36+
37+
/**
38+
* Set the client configuration for the lite simple consumer.
39+
*
40+
* @param clientConfiguration client's configuration.
41+
* @return the lite simple consumer builder instance.
42+
*/
43+
LiteSimpleConsumerBuilder setClientConfiguration(ClientConfiguration clientConfiguration);
44+
45+
/**
46+
* Set the load balancing group for the lite simple consumer.
47+
*
48+
* @param consumerGroup consumer load balancing group.
49+
* @return the consumer builder instance.
50+
*/
51+
LiteSimpleConsumerBuilder setConsumerGroup(String consumerGroup);
52+
53+
/**
54+
* Set the max await time when receive messages from the server.
55+
*
56+
* <p>The lite simple consumer will hold this long-polling receive requests until a message is returned or a timeout
57+
* occurs.
58+
*
59+
* <p> Especially, the RPC request timeout for long-polling of {@link LiteSimpleConsumer} is increased by
60+
* {@linkplain ClientConfigurationBuilder#setRequestTimeout(Duration) request timeout} based on await duration here.
61+
*
62+
* @param awaitDuration The maximum time to block when no message is available.
63+
* @return the consumer builder instance.
64+
*/
65+
LiteSimpleConsumerBuilder setAwaitDuration(Duration awaitDuration);
66+
67+
/**
68+
* Finalize the build of the {@link LiteSimpleConsumer} instance and start.
69+
*
70+
* <p>This method will block until the lite simple consumer starts successfully.
71+
*
72+
* <p>Especially, if this method is invoked more than once,
73+
* different lite simple consumers will be created and started.
74+
*
75+
* @return the lite simple consumer instance.
76+
*/
77+
LiteSimpleConsumer build() throws ClientException;
78+
}

java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@
9494
import org.slf4j.Logger;
9595
import org.slf4j.LoggerFactory;
9696

97-
@SuppressWarnings({"UnstableApiUsage", "NullableProblems"})
97+
@SuppressWarnings({"NullableProblems"})
9898
public abstract class ClientImpl extends AbstractIdleService implements Client, ClientSessionHandler,
9999
MessageInterceptor {
100100
private static final Logger log = LoggerFactory.getLogger(ClientImpl.class);
@@ -175,7 +175,6 @@ public ClientImpl(ClientConfiguration clientConfiguration, Set<String> topics) {
175175
new ThreadFactoryImpl("CommandExecutor", clientIdIndex));
176176
}
177177

178-
179178
/**
180179
* Start the rocketmq client and do some preparatory work.
181180
*/
@@ -205,8 +204,7 @@ protected void startUp() throws Exception {
205204
}
206205
}
207206
// Update route cache periodically.
208-
final ScheduledExecutorService scheduler = clientManager.getScheduler();
209-
this.updateRouteCacheFuture = scheduler.scheduleWithFixedDelay(() -> {
207+
updateRouteCacheFuture = getScheduler().scheduleWithFixedDelay(() -> {
210208
try {
211209
updateRouteCache();
212210
} catch (Throwable t) {
@@ -341,7 +339,7 @@ public void onPrintThreadStackTraceCommand(Endpoints endpoints, PrintThreadStack
341339
* @param settings settings received from remote.
342340
*/
343341
@Override
344-
public final void onSettingsCommand(Endpoints endpoints, apache.rocketmq.v2.Settings settings) {
342+
public void onSettingsCommand(Endpoints endpoints, apache.rocketmq.v2.Settings settings) {
345343
final Metric metric = new Metric(settings.getMetric());
346344
clientMeterManager.reset(metric);
347345
this.getSettings().sync(settings);
@@ -420,7 +418,7 @@ private ClientSessionImpl getClientSession(Endpoints endpoints) throws ClientExc
420418
* Triggered when {@link TopicRouteData} is fetched from remote.
421419
*/
422420
public ListenableFuture<TopicRouteData> onTopicRouteDataFetched(String topic,
423-
TopicRouteData topicRouteData) throws ClientException {
421+
TopicRouteData topicRouteData) throws ClientException {
424422
final Set<Endpoints> routeEndpoints = topicRouteData
425423
.getMessageQueues().stream()
426424
.map(mq -> mq.getBroker().getEndpoints())
@@ -481,8 +479,9 @@ public void onRecoverOrphanedTransactionCommand(Endpoints endpoints, RecoverOrph
481479

482480
/**
483481
* This method is invoked while request of unsubscribe lite topic is received from remote.
482+
*
484483
* @param endpoints remote endpoints.
485-
* @param command request of unsubscribe lite topic from remote.
484+
* @param command request of unsubscribe lite topic from remote.
486485
*/
487486
@Override
488487
public void onNotifyUnsubscribeLiteCommand(Endpoints endpoints, NotifyUnsubscribeLiteCommand command) {
@@ -762,7 +761,7 @@ public ScheduledExecutorService getScheduler() {
762761
return clientManager.getScheduler();
763762
}
764763

765-
protected <T> T handleClientFuture(ListenableFuture<T> future) throws ClientException {
764+
public <T> T handleClientFuture(ListenableFuture<T> future) throws ClientException {
766765
try {
767766
return future.get();
768767
} catch (InterruptedException e) {
@@ -787,4 +786,13 @@ public ClientConfiguration getClientConfiguration() {
787786
protected String serviceName() {
788787
return super.serviceName() + "-" + clientId.getIndex();
789788
}
789+
790+
public void checkRunning() {
791+
if (!isRunning()) {
792+
String msg = String.format("Client not running, state=%s, clientId=%s",
793+
state(), clientId);
794+
log.error(msg);
795+
throw new IllegalStateException(msg);
796+
}
797+
}
790798
}

java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientServiceProviderImpl.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@
1919

2020
import org.apache.rocketmq.client.apis.ClientServiceProvider;
2121
import org.apache.rocketmq.client.apis.consumer.LitePushConsumerBuilder;
22+
import org.apache.rocketmq.client.apis.consumer.LiteSimpleConsumerBuilder;
2223
import org.apache.rocketmq.client.apis.consumer.PushConsumerBuilder;
2324
import org.apache.rocketmq.client.apis.consumer.SimpleConsumerBuilder;
2425
import org.apache.rocketmq.client.apis.message.MessageBuilder;
2526
import org.apache.rocketmq.client.apis.producer.ProducerBuilder;
2627
import org.apache.rocketmq.client.java.impl.consumer.LitePushConsumerBuilderImpl;
28+
import org.apache.rocketmq.client.java.impl.consumer.LiteSimpleConsumerBuilderImpl;
2729
import org.apache.rocketmq.client.java.impl.consumer.PushConsumerBuilderImpl;
2830
import org.apache.rocketmq.client.java.impl.consumer.SimpleConsumerBuilderImpl;
2931
import org.apache.rocketmq.client.java.impl.producer.ProducerBuilderImpl;
@@ -54,6 +56,11 @@ public LitePushConsumerBuilder newLitePushConsumerBuilder() {
5456
return new LitePushConsumerBuilderImpl();
5557
}
5658

59+
@Override
60+
public LiteSimpleConsumerBuilder newLiteSimpleConsumerBuilder() {
61+
return new LiteSimpleConsumerBuilderImpl();
62+
}
63+
5764
/**
5865
* @see ClientServiceProvider#newSimpleConsumerBuilder()
5966
*/

java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientType.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ public enum ClientType {
2121
PRODUCER,
2222
PUSH_CONSUMER,
2323
LITE_PUSH_CONSUMER,
24-
SIMPLE_CONSUMER;
24+
SIMPLE_CONSUMER,
25+
LITE_SIMPLE_CONSUMER;
2526

2627
public apache.rocketmq.v2.ClientType toProtobuf() {
2728
if (PRODUCER.equals(this)) {
@@ -36,6 +37,9 @@ public apache.rocketmq.v2.ClientType toProtobuf() {
3637
if (SIMPLE_CONSUMER.equals(this)) {
3738
return apache.rocketmq.v2.ClientType.SIMPLE_CONSUMER;
3839
}
40+
if (LITE_SIMPLE_CONSUMER.equals(this)) {
41+
return apache.rocketmq.v2.ClientType.LITE_SIMPLE_CONSUMER;
42+
}
3943
return apache.rocketmq.v2.ClientType.CLIENT_TYPE_UNSPECIFIED;
4044
}
4145
}

java/client/src/main/java/org/apache/rocketmq/client/java/impl/Settings.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,6 @@ public RetryPolicy getRetryPolicy() {
5555
return retryPolicy;
5656
}
5757

58-
public ClientType getClientType() {
59-
return clientType;
60-
}
61-
6258
@ExcludeFromJacocoGeneratedReport
6359
@Override
6460
public String toString() {

0 commit comments

Comments
 (0)