Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Iterator;
import java.util.ServiceLoader;
import org.apache.rocketmq.client.apis.consumer.LitePushConsumerBuilder;
import org.apache.rocketmq.client.apis.consumer.LiteSimpleConsumerBuilder;
import org.apache.rocketmq.client.apis.consumer.PushConsumerBuilder;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumerBuilder;
import org.apache.rocketmq.client.apis.message.MessageBuilder;
Expand Down Expand Up @@ -107,6 +108,13 @@ static ClientServiceProvider doLoad() {
*/
LitePushConsumerBuilder newLitePushConsumerBuilder();

/**
* Get the lite simple consumer builder by the current provider.
*
* @return the lite simple consumer builder instance.
*/
LiteSimpleConsumerBuilder newLiteSimpleConsumerBuilder();

/**
* Get the simple consumer builder by the current provider.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.client.apis.consumer;

import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.Set;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.message.MessageView;

/**
* Similar to {@link SimpleConsumer}, but for lite topic.
*/
public interface LiteSimpleConsumer extends Closeable {

/**
* Get the load balancing group for the lite simple consumer.
*
* @return consumer load balancing group.
*/
String getConsumerGroup();

/**
* Subscribe to a lite topic.
* Similar to {@link LitePushConsumer#subscribeLite(String)}
*
* @param liteTopic the name of the lite topic to subscribe
* @throws ClientException if an error occurs during subscription
*/
void subscribeLite(String liteTopic) throws ClientException;

/**
* Subscribe to a lite topic with consumeFromOption to specify the consume from offset.
* Similar to {@link LitePushConsumer#subscribeLite(String, OffsetOption)}
*
* @param liteTopic the name of the lite topic to subscribe
* @param offsetOption the consume from offset
* @throws ClientException if an error occurs during subscription
*/
void subscribeLite(String liteTopic, OffsetOption offsetOption) throws ClientException;

/**
* Unsubscribe from a lite topic.
*
* @param liteTopic the name of the lite topic to unsubscribe from
* @throws ClientException if an error occurs during unsubscription
*/
void unsubscribeLite(String liteTopic) throws ClientException;

/**
* Get the lite topic immutable set.
*
* @return lite topic immutable set.
*/
Set<String> getLiteTopicSet();

/**
* Fetch messages from the server synchronously.
* <p> This method returns immediately if there are messages available.
* Otherwise, it will await the passed timeout. If the timeout expires, an empty list will be returned.
*
* @param maxMessageNum max message num of server returned.
* @param invisibleDuration set the invisibleDuration of messages to return from the server. These messages will be
* invisible to other consumers unless timeout.
* @return list of message view.
*/
List<MessageView> receive(int maxMessageNum, Duration invisibleDuration) throws ClientException;

/**
* Ack message to server synchronously, server commit this message.
*
* <p>Duplicate ack request does not take effect and throw an exception.
*
* @param messageView special message view with handle want to ack.
*/
void ack(MessageView messageView) throws ClientException;

/**
* Changes the invisible duration of a specified message synchronously.
*
* <p> The origin invisible duration for a message decide by ack request.
*
* <p>Duplicate change requests will refresh the next visible time of this message to consumers.
*
* @param messageView the message view to change invisible time.
* @param invisibleDuration new timestamp the message could be visible and re-consume which start from current time.
*/
void changeInvisibleDuration(MessageView messageView, Duration invisibleDuration) throws ClientException;

/**
* Close the lite simple consumer and release all related resources.
*
* <p>Once lite simple consumer is closed, <strong>it could not be started once again.</strong> we maintained an FSM
* (finite-state machine) to record the different states for each lite simple consumer.
*/
@Override
void close() throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.client.apis.consumer;

import java.time.Duration;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;

/**
* Builder to config and start {@link LiteSimpleConsumer}.
*/
public interface LiteSimpleConsumerBuilder {

/**
* Set the bind topic for lite consumer.
*
* @return the consumer builder instance.
*/
LiteSimpleConsumerBuilder bindTopic(String bindTopic);

/**
* Set the client configuration for the lite simple consumer.
*
* @param clientConfiguration client's configuration.
* @return the lite simple consumer builder instance.
*/
LiteSimpleConsumerBuilder setClientConfiguration(ClientConfiguration clientConfiguration);

/**
* Set the load balancing group for the lite simple consumer.
*
* @param consumerGroup consumer load balancing group.
* @return the consumer builder instance.
*/
LiteSimpleConsumerBuilder setConsumerGroup(String consumerGroup);

/**
* Set the max await time when receive messages from the server.
*
* <p>The lite simple consumer will hold this long-polling receive requests until a message is returned or a timeout
* occurs.
*
* <p> Especially, the RPC request timeout for long-polling of {@link LiteSimpleConsumer} is increased by
* {@linkplain ClientConfigurationBuilder#setRequestTimeout(Duration) request timeout} based on await duration here.
*
* @param awaitDuration The maximum time to block when no message is available.
* @return the consumer builder instance.
*/
LiteSimpleConsumerBuilder setAwaitDuration(Duration awaitDuration);

/**
* Finalize the build of the {@link LiteSimpleConsumer} instance and start.
*
* <p>This method will block until the lite simple consumer starts successfully.
*
* <p>Especially, if this method is invoked more than once,
* different lite simple consumers will be created and started.
*
* @return the lite simple consumer instance.
*/
LiteSimpleConsumer build() throws ClientException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SuppressWarnings({"UnstableApiUsage", "NullableProblems"})
@SuppressWarnings({"NullableProblems"})
public abstract class ClientImpl extends AbstractIdleService implements Client, ClientSessionHandler,
MessageInterceptor {
private static final Logger log = LoggerFactory.getLogger(ClientImpl.class);
Expand Down Expand Up @@ -175,7 +175,6 @@ public ClientImpl(ClientConfiguration clientConfiguration, Set<String> topics) {
new ThreadFactoryImpl("CommandExecutor", clientIdIndex));
}


/**
* Start the rocketmq client and do some preparatory work.
*/
Expand Down Expand Up @@ -205,8 +204,7 @@ protected void startUp() throws Exception {
}
}
// Update route cache periodically.
final ScheduledExecutorService scheduler = clientManager.getScheduler();
this.updateRouteCacheFuture = scheduler.scheduleWithFixedDelay(() -> {
updateRouteCacheFuture = getScheduler().scheduleWithFixedDelay(() -> {
try {
updateRouteCache();
} catch (Throwable t) {
Expand Down Expand Up @@ -341,7 +339,7 @@ public void onPrintThreadStackTraceCommand(Endpoints endpoints, PrintThreadStack
* @param settings settings received from remote.
*/
@Override
public final void onSettingsCommand(Endpoints endpoints, apache.rocketmq.v2.Settings settings) {
public void onSettingsCommand(Endpoints endpoints, apache.rocketmq.v2.Settings settings) {
final Metric metric = new Metric(settings.getMetric());
clientMeterManager.reset(metric);
this.getSettings().sync(settings);
Expand Down Expand Up @@ -420,7 +418,7 @@ private ClientSessionImpl getClientSession(Endpoints endpoints) throws ClientExc
* Triggered when {@link TopicRouteData} is fetched from remote.
*/
public ListenableFuture<TopicRouteData> onTopicRouteDataFetched(String topic,
TopicRouteData topicRouteData) throws ClientException {
TopicRouteData topicRouteData) throws ClientException {
final Set<Endpoints> routeEndpoints = topicRouteData
.getMessageQueues().stream()
.map(mq -> mq.getBroker().getEndpoints())
Expand Down Expand Up @@ -481,8 +479,9 @@ public void onRecoverOrphanedTransactionCommand(Endpoints endpoints, RecoverOrph

/**
* This method is invoked while request of unsubscribe lite topic is received from remote.
*
* @param endpoints remote endpoints.
* @param command request of unsubscribe lite topic from remote.
* @param command request of unsubscribe lite topic from remote.
*/
@Override
public void onNotifyUnsubscribeLiteCommand(Endpoints endpoints, NotifyUnsubscribeLiteCommand command) {
Expand Down Expand Up @@ -762,7 +761,7 @@ public ScheduledExecutorService getScheduler() {
return clientManager.getScheduler();
}

protected <T> T handleClientFuture(ListenableFuture<T> future) throws ClientException {
public <T> T handleClientFuture(ListenableFuture<T> future) throws ClientException {
try {
return future.get();
} catch (InterruptedException e) {
Expand All @@ -787,4 +786,13 @@ public ClientConfiguration getClientConfiguration() {
protected String serviceName() {
return super.serviceName() + "-" + clientId.getIndex();
}

public void checkRunning() {
if (!isRunning()) {
String msg = String.format("Client not running, state=%s, clientId=%s",
state(), clientId);
log.error(msg);
throw new IllegalStateException(msg);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@

import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.LitePushConsumerBuilder;
import org.apache.rocketmq.client.apis.consumer.LiteSimpleConsumerBuilder;
import org.apache.rocketmq.client.apis.consumer.PushConsumerBuilder;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumerBuilder;
import org.apache.rocketmq.client.apis.message.MessageBuilder;
import org.apache.rocketmq.client.apis.producer.ProducerBuilder;
import org.apache.rocketmq.client.java.impl.consumer.LitePushConsumerBuilderImpl;
import org.apache.rocketmq.client.java.impl.consumer.LiteSimpleConsumerBuilderImpl;
import org.apache.rocketmq.client.java.impl.consumer.PushConsumerBuilderImpl;
import org.apache.rocketmq.client.java.impl.consumer.SimpleConsumerBuilderImpl;
import org.apache.rocketmq.client.java.impl.producer.ProducerBuilderImpl;
Expand Down Expand Up @@ -54,6 +56,11 @@ public LitePushConsumerBuilder newLitePushConsumerBuilder() {
return new LitePushConsumerBuilderImpl();
}

@Override
public LiteSimpleConsumerBuilder newLiteSimpleConsumerBuilder() {
return new LiteSimpleConsumerBuilderImpl();
}

/**
* @see ClientServiceProvider#newSimpleConsumerBuilder()
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ public enum ClientType {
PRODUCER,
PUSH_CONSUMER,
LITE_PUSH_CONSUMER,
SIMPLE_CONSUMER;
SIMPLE_CONSUMER,
LITE_SIMPLE_CONSUMER;

public apache.rocketmq.v2.ClientType toProtobuf() {
if (PRODUCER.equals(this)) {
Expand All @@ -36,6 +37,9 @@ public apache.rocketmq.v2.ClientType toProtobuf() {
if (SIMPLE_CONSUMER.equals(this)) {
return apache.rocketmq.v2.ClientType.SIMPLE_CONSUMER;
}
if (LITE_SIMPLE_CONSUMER.equals(this)) {
return apache.rocketmq.v2.ClientType.LITE_SIMPLE_CONSUMER;
}
return apache.rocketmq.v2.ClientType.CLIENT_TYPE_UNSPECIFIED;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,6 @@ public RetryPolicy getRetryPolicy() {
return retryPolicy;
}

public ClientType getClientType() {
return clientType;
}

@ExcludeFromJacocoGeneratedReport
@Override
public String toString() {
Expand Down
Loading
Loading