Skip to content

Commit 6eba87f

Browse files
committed
implement lite simple consumer
Change-Id: I937a6f31e35e4931d8e0ee87b6dd78e5fb62199e
1 parent c159a67 commit 6eba87f

File tree

18 files changed

+216
-674
lines changed

18 files changed

+216
-674
lines changed

java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientServiceProvider.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.Iterator;
2121
import java.util.ServiceLoader;
2222
import org.apache.rocketmq.client.apis.consumer.LitePushConsumerBuilder;
23+
import org.apache.rocketmq.client.apis.consumer.LiteSimpleConsumerBuilder;
2324
import org.apache.rocketmq.client.apis.consumer.PushConsumerBuilder;
2425
import org.apache.rocketmq.client.apis.consumer.SimpleConsumerBuilder;
2526
import org.apache.rocketmq.client.apis.message.MessageBuilder;
@@ -107,6 +108,13 @@ static ClientServiceProvider doLoad() {
107108
*/
108109
LitePushConsumerBuilder newLitePushConsumerBuilder();
109110

111+
/**
112+
* Get the lite simple consumer builder by the current provider.
113+
*
114+
* @return the lite simple consumer builder instance.
115+
*/
116+
LiteSimpleConsumerBuilder newLiteSimpleConsumerBuilder();
117+
110118
/**
111119
* Get the simple consumer builder by the current provider.
112120
*

java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@
9494
import org.slf4j.Logger;
9595
import org.slf4j.LoggerFactory;
9696

97-
@SuppressWarnings({"UnstableApiUsage", "NullableProblems"})
97+
@SuppressWarnings({"NullableProblems"})
9898
public abstract class ClientImpl extends AbstractIdleService implements Client, ClientSessionHandler,
9999
MessageInterceptor {
100100
private static final Logger log = LoggerFactory.getLogger(ClientImpl.class);
@@ -175,7 +175,6 @@ public ClientImpl(ClientConfiguration clientConfiguration, Set<String> topics) {
175175
new ThreadFactoryImpl("CommandExecutor", clientIdIndex));
176176
}
177177

178-
179178
/**
180179
* Start the rocketmq client and do some preparatory work.
181180
*/
@@ -205,8 +204,7 @@ protected void startUp() throws Exception {
205204
}
206205
}
207206
// Update route cache periodically.
208-
final ScheduledExecutorService scheduler = clientManager.getScheduler();
209-
this.updateRouteCacheFuture = scheduler.scheduleWithFixedDelay(() -> {
207+
updateRouteCacheFuture = getScheduler().scheduleWithFixedDelay(() -> {
210208
try {
211209
updateRouteCache();
212210
} catch (Throwable t) {
@@ -341,7 +339,7 @@ public void onPrintThreadStackTraceCommand(Endpoints endpoints, PrintThreadStack
341339
* @param settings settings received from remote.
342340
*/
343341
@Override
344-
public final void onSettingsCommand(Endpoints endpoints, apache.rocketmq.v2.Settings settings) {
342+
public void onSettingsCommand(Endpoints endpoints, apache.rocketmq.v2.Settings settings) {
345343
final Metric metric = new Metric(settings.getMetric());
346344
clientMeterManager.reset(metric);
347345
this.getSettings().sync(settings);
@@ -420,7 +418,7 @@ private ClientSessionImpl getClientSession(Endpoints endpoints) throws ClientExc
420418
* Triggered when {@link TopicRouteData} is fetched from remote.
421419
*/
422420
public ListenableFuture<TopicRouteData> onTopicRouteDataFetched(String topic,
423-
TopicRouteData topicRouteData) throws ClientException {
421+
TopicRouteData topicRouteData) throws ClientException {
424422
final Set<Endpoints> routeEndpoints = topicRouteData
425423
.getMessageQueues().stream()
426424
.map(mq -> mq.getBroker().getEndpoints())
@@ -481,8 +479,9 @@ public void onRecoverOrphanedTransactionCommand(Endpoints endpoints, RecoverOrph
481479

482480
/**
483481
* This method is invoked while request of unsubscribe lite topic is received from remote.
482+
*
484483
* @param endpoints remote endpoints.
485-
* @param command request of unsubscribe lite topic from remote.
484+
* @param command request of unsubscribe lite topic from remote.
486485
*/
487486
@Override
488487
public void onNotifyUnsubscribeLiteCommand(Endpoints endpoints, NotifyUnsubscribeLiteCommand command) {
@@ -762,7 +761,7 @@ public ScheduledExecutorService getScheduler() {
762761
return clientManager.getScheduler();
763762
}
764763

765-
protected <T> T handleClientFuture(ListenableFuture<T> future) throws ClientException {
764+
public <T> T handleClientFuture(ListenableFuture<T> future) throws ClientException {
766765
try {
767766
return future.get();
768767
} catch (InterruptedException e) {
@@ -787,4 +786,13 @@ public ClientConfiguration getClientConfiguration() {
787786
protected String serviceName() {
788787
return super.serviceName() + "-" + clientId.getIndex();
789788
}
789+
790+
public void checkRunning() {
791+
if (!isRunning()) {
792+
String msg = String.format("Client not running, state=%s, clientId=%s",
793+
state(), clientId);
794+
log.error(msg);
795+
throw new IllegalStateException(msg);
796+
}
797+
}
790798
}

java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientServiceProviderImpl.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@
1919

2020
import org.apache.rocketmq.client.apis.ClientServiceProvider;
2121
import org.apache.rocketmq.client.apis.consumer.LitePushConsumerBuilder;
22+
import org.apache.rocketmq.client.apis.consumer.LiteSimpleConsumerBuilder;
2223
import org.apache.rocketmq.client.apis.consumer.PushConsumerBuilder;
2324
import org.apache.rocketmq.client.apis.consumer.SimpleConsumerBuilder;
2425
import org.apache.rocketmq.client.apis.message.MessageBuilder;
2526
import org.apache.rocketmq.client.apis.producer.ProducerBuilder;
2627
import org.apache.rocketmq.client.java.impl.consumer.LitePushConsumerBuilderImpl;
28+
import org.apache.rocketmq.client.java.impl.consumer.LiteSimpleConsumerBuilderImpl;
2729
import org.apache.rocketmq.client.java.impl.consumer.PushConsumerBuilderImpl;
2830
import org.apache.rocketmq.client.java.impl.consumer.SimpleConsumerBuilderImpl;
2931
import org.apache.rocketmq.client.java.impl.producer.ProducerBuilderImpl;
@@ -54,6 +56,11 @@ public LitePushConsumerBuilder newLitePushConsumerBuilder() {
5456
return new LitePushConsumerBuilderImpl();
5557
}
5658

59+
@Override
60+
public LiteSimpleConsumerBuilder newLiteSimpleConsumerBuilder() {
61+
return new LiteSimpleConsumerBuilderImpl();
62+
}
63+
5764
/**
5865
* @see ClientServiceProvider#newSimpleConsumerBuilder()
5966
*/

java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientType.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ public enum ClientType {
2121
PRODUCER,
2222
PUSH_CONSUMER,
2323
LITE_PUSH_CONSUMER,
24-
SIMPLE_CONSUMER;
24+
SIMPLE_CONSUMER,
25+
LITE_SIMPLE_CONSUMER;
2526

2627
public apache.rocketmq.v2.ClientType toProtobuf() {
2728
if (PRODUCER.equals(this)) {
@@ -36,6 +37,9 @@ public apache.rocketmq.v2.ClientType toProtobuf() {
3637
if (SIMPLE_CONSUMER.equals(this)) {
3738
return apache.rocketmq.v2.ClientType.SIMPLE_CONSUMER;
3839
}
40+
if (LITE_SIMPLE_CONSUMER.equals(this)) {
41+
return apache.rocketmq.v2.ClientType.LITE_SIMPLE_CONSUMER;
42+
}
3943
return apache.rocketmq.v2.ClientType.CLIENT_TYPE_UNSPECIFIED;
4044
}
4145
}

java/client/src/main/java/org/apache/rocketmq/client/java/impl/Settings.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,6 @@ public RetryPolicy getRetryPolicy() {
5555
return retryPolicy;
5656
}
5757

58-
public ClientType getClientType() {
59-
return clientType;
60-
}
61-
6258
@ExcludeFromJacocoGeneratedReport
6359
@Override
6460
public String toString() {

java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java

Lines changed: 35 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import apache.rocketmq.v2.NotifyClientTerminationRequest;
3030
import apache.rocketmq.v2.ReceiveMessageRequest;
3131
import apache.rocketmq.v2.ReceiveMessageResponse;
32-
import apache.rocketmq.v2.Resource;
3332
import apache.rocketmq.v2.Status;
3433
import com.google.common.util.concurrent.FutureCallback;
3534
import com.google.common.util.concurrent.Futures;
@@ -58,21 +57,22 @@
5857
import org.apache.rocketmq.client.java.message.GeneralMessage;
5958
import org.apache.rocketmq.client.java.message.GeneralMessageImpl;
6059
import org.apache.rocketmq.client.java.message.MessageViewImpl;
60+
import org.apache.rocketmq.client.java.message.protocol.Resource;
6161
import org.apache.rocketmq.client.java.route.Endpoints;
6262
import org.apache.rocketmq.client.java.route.MessageQueueImpl;
6363
import org.apache.rocketmq.client.java.rpc.RpcFuture;
6464
import org.slf4j.Logger;
6565
import org.slf4j.LoggerFactory;
6666

67-
@SuppressWarnings({"UnstableApiUsage", "NullableProblems"})
68-
abstract class ConsumerImpl extends ClientImpl {
67+
@SuppressWarnings({"NullableProblems"})
68+
public abstract class ConsumerImpl extends ClientImpl {
6969
static final Pattern CONSUMER_GROUP_PATTERN = Pattern.compile("^[%a-zA-Z0-9_-]+$");
7070
private static final Logger log = LoggerFactory.getLogger(ConsumerImpl.class);
71-
private final String consumerGroup;
71+
protected final Resource groupResource;
7272

7373
ConsumerImpl(ClientConfiguration clientConfiguration, String consumerGroup, Set<String> topics) {
7474
super(clientConfiguration, topics);
75-
this.consumerGroup = consumerGroup;
75+
this.groupResource = new Resource(clientConfiguration.getNamespace(), consumerGroup);
7676
}
7777

7878
@SuppressWarnings("SameParameterValue")
@@ -125,31 +125,36 @@ protected ListenableFuture<ReceiveMessageResult> receiveMessage(ReceiveMessageRe
125125
}
126126

127127
private AckMessageRequest wrapAckMessageRequest(MessageViewImpl messageView) {
128-
final Resource topicResource = Resource.newBuilder()
128+
final apache.rocketmq.v2.Resource topicResource = apache.rocketmq.v2.Resource.newBuilder()
129129
.setResourceNamespace(clientConfiguration.getNamespace())
130130
.setName(messageView.getTopic())
131131
.build();
132132
final AckMessageEntry.Builder builder = AckMessageEntry.newBuilder()
133133
.setMessageId(messageView.getMessageId().toString())
134134
.setReceiptHandle(messageView.getReceiptHandle());
135-
if (ClientType.LITE_PUSH_CONSUMER == getSettings().getClientType()) {
135+
if (isLiteConsumer()) {
136136
messageView.getLiteTopic().ifPresent(builder::setLiteTopic);
137137
}
138138
final AckMessageEntry entry = builder.build();
139139
return AckMessageRequest.newBuilder().setGroup(getProtobufGroup()).setTopic(topicResource)
140140
.addEntries(entry).build();
141141
}
142142

143-
private ChangeInvisibleDurationRequest wrapChangeInvisibleDuration(MessageViewImpl messageView,
143+
protected ChangeInvisibleDurationRequest wrapChangeInvisibleDuration(MessageViewImpl messageView,
144144
Duration invisibleDuration) {
145-
final Resource topicResource = Resource.newBuilder()
145+
final apache.rocketmq.v2.Resource topicResource = apache.rocketmq.v2.Resource.newBuilder()
146146
.setResourceNamespace(clientConfiguration.getNamespace())
147147
.setName(messageView.getTopic()).build();
148-
return ChangeInvisibleDurationRequest.newBuilder().setGroup(getProtobufGroup()).setTopic(topicResource)
148+
ChangeInvisibleDurationRequest.Builder builder = ChangeInvisibleDurationRequest.newBuilder()
149+
.setGroup(getProtobufGroup())
150+
.setTopic(topicResource)
149151
.setReceiptHandle(messageView.getReceiptHandle())
150152
.setInvisibleDuration(Durations.fromNanos(invisibleDuration.toNanos()))
151-
.setMessageId(messageView.getMessageId().toString()).build();
152-
153+
.setMessageId(messageView.getMessageId().toString());
154+
if (isLiteConsumer()) {
155+
messageView.getLiteTopic().ifPresent(builder::setLiteTopic);
156+
}
157+
return builder.build();
153158
}
154159

155160
protected RpcFuture<AckMessageRequest, AckMessageResponse> ackMessage(MessageViewImpl messageView) {
@@ -228,11 +233,8 @@ public void onFailure(Throwable t) {
228233
return future;
229234
}
230235

231-
protected Resource getProtobufGroup() {
232-
return Resource.newBuilder()
233-
.setResourceNamespace(clientConfiguration.getNamespace())
234-
.setName(consumerGroup)
235-
.build();
236+
protected apache.rocketmq.v2.Resource getProtobufGroup() {
237+
return groupResource.toProtobuf();
236238
}
237239

238240
@Override
@@ -276,6 +278,21 @@ ReceiveMessageRequest wrapReceiveMessageRequest(int batchSize, MessageQueueImpl
276278
@Override
277279
public HeartbeatRequest wrapHeartbeatRequest() {
278280
return HeartbeatRequest.newBuilder().setGroup(getProtobufGroup())
279-
.setClientType(getSettings().getClientType().toProtobuf()).build();
281+
.setClientType(clientType().toProtobuf()).build();
282+
}
283+
284+
/**
285+
* Get client type for this client instance
286+
*
287+
* @return corresponding ClientType
288+
*/
289+
protected abstract ClientType clientType();
290+
291+
public boolean isLiteConsumer() {
292+
return ClientType.LITE_PUSH_CONSUMER == clientType() || ClientType.LITE_SIMPLE_CONSUMER == clientType();
293+
}
294+
295+
public String getConsumerGroup() {
296+
return groupResource.getName();
280297
}
281298
}

java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerBuilderImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ public class LitePushConsumerBuilderImpl implements LitePushConsumerBuilder {
4747
public LitePushConsumerBuilder bindTopic(String bindTopic) {
4848
checkArgument(StringUtils.isNotBlank(bindTopic), "bindTopic should not be blank");
4949
this.bindTopic = bindTopic;
50+
// Default subscription: (bindTopic, *) for code reuse.
51+
this.subscriptionExpressions = ImmutableMap.of(bindTopic, FilterExpression.SUB_ALL);
5052
return this;
5153
}
5254

@@ -98,8 +100,6 @@ public LitePushConsumer build() throws ClientException {
98100
checkNotNull(consumerGroup, "consumerGroup has not been set yet");
99101
checkNotNull(messageListener, "messageListener has not been set yet");
100102
checkNotNull(bindTopic, "bindTopic has not been set yet");
101-
// passing bindTopic through subscriptionExpressions to ClientImpl
102-
subscriptionExpressions = ImmutableMap.of(bindTopic, FilterExpression.SUB_ALL);
103103
final LitePushConsumerImpl litePushConsumer = new LitePushConsumerImpl(this);
104104
litePushConsumer.startAsync().awaitRunning();
105105
return litePushConsumer;

0 commit comments

Comments
 (0)