Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import apache.rocketmq.v2.Subscription;
import apache.rocketmq.v2.SubscriptionEntry;
import apache.rocketmq.v2.TelemetryCommand;
import apache.rocketmq.v2.SyncLiteSubscriptionRequest;
import com.google.protobuf.GeneratedMessageV3;
import io.grpc.Metadata;
import io.netty.channel.ChannelHandlerContext;
Expand Down Expand Up @@ -124,6 +125,13 @@ public List<DefaultAuthorizationContext> build(Metadata metadata, GeneratedMessa
}
result = newSubContexts(metadata, request.getGroup(), request.getMessageQueue().getTopic());
}
if (message instanceof SyncLiteSubscriptionRequest) {
SyncLiteSubscriptionRequest request = (SyncLiteSubscriptionRequest) message;
if (request.getLiteTopicSetCount() <= 0) {
return null;
}
result = newSubContexts(metadata, request.getGroup(), request.getTopic());
}
if (message instanceof AckMessageRequest) {
AckMessageRequest request = (AckMessageRequest) message;
result = newSubContexts(metadata, request.getGroup(), request.getTopic());
Expand Down
127 changes: 124 additions & 3 deletions broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,14 @@
import org.apache.rocketmq.broker.filter.CommitLogDispatcherCalcBitMap;
import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
import org.apache.rocketmq.broker.latency.BrokerFastFailure;
import org.apache.rocketmq.broker.lite.AbstractLiteLifecycleManager;
import org.apache.rocketmq.broker.lite.LiteEventDispatcher;
import org.apache.rocketmq.broker.lite.LiteSubscriptionRegistry;
import org.apache.rocketmq.broker.lite.LiteSubscriptionRegistryImpl;
import org.apache.rocketmq.broker.lite.LiteLifecycleManager;
import org.apache.rocketmq.broker.lite.LiteSharding;
import org.apache.rocketmq.broker.lite.LiteShardingImpl;
import org.apache.rocketmq.broker.lite.RocksDBLiteLifecycleManager;
import org.apache.rocketmq.broker.longpolling.LmqPullRequestHoldService;
import org.apache.rocketmq.broker.longpolling.NotifyMessageArrivingListener;
import org.apache.rocketmq.broker.longpolling.PullRequestHoldService;
Expand All @@ -69,10 +77,13 @@
import org.apache.rocketmq.broker.processor.ClientManageProcessor;
import org.apache.rocketmq.broker.processor.ConsumerManageProcessor;
import org.apache.rocketmq.broker.processor.EndTransactionProcessor;
import org.apache.rocketmq.broker.processor.LiteManagerProcessor;
import org.apache.rocketmq.broker.processor.LiteSubscriptionCtlProcessor;
import org.apache.rocketmq.broker.processor.NotificationProcessor;
import org.apache.rocketmq.broker.processor.PeekMessageProcessor;
import org.apache.rocketmq.broker.processor.PollingInfoProcessor;
import org.apache.rocketmq.broker.processor.PopInflightMessageCounter;
import org.apache.rocketmq.broker.processor.PopLiteMessageProcessor;
import org.apache.rocketmq.broker.processor.PopMessageProcessor;
import org.apache.rocketmq.broker.processor.PullMessageProcessor;
import org.apache.rocketmq.broker.processor.QueryAssignmentProcessor;
Expand Down Expand Up @@ -204,12 +215,19 @@ public class BrokerController {
protected final PullMessageProcessor pullMessageProcessor;
protected final PeekMessageProcessor peekMessageProcessor;
protected final PopMessageProcessor popMessageProcessor;
protected final PopLiteMessageProcessor popLiteMessageProcessor;
protected final AckMessageProcessor ackMessageProcessor;
protected final ChangeInvisibleTimeProcessor changeInvisibleTimeProcessor;
protected final NotificationProcessor notificationProcessor;
protected final PollingInfoProcessor pollingInfoProcessor;
protected final QueryAssignmentProcessor queryAssignmentProcessor;
protected final ClientManageProcessor clientManageProcessor;
protected final LiteSubscriptionCtlProcessor liteSubscriptionCtlProcessor;
protected final LiteSharding liteSharding;
protected final AbstractLiteLifecycleManager liteLifecycleManager;
protected final LiteSubscriptionRegistry liteSubscriptionRegistry;
protected final LiteEventDispatcher liteEventDispatcher;
protected final LiteManagerProcessor liteManagerProcessor;
protected final SendMessageProcessor sendMessageProcessor;
protected final RecallMessageProcessor recallMessageProcessor;
protected final ReplyMessageProcessor replyMessageProcessor;
Expand Down Expand Up @@ -371,18 +389,27 @@ public BrokerController(
this.topicQueueMappingManager = new TopicQueueMappingManager(this);
this.authenticationMetadataManager = AuthenticationFactory.getMetadataManager(this.authConfig);
this.authorizationMetadataManager = AuthorizationFactory.getMetadataManager(this.authConfig);
this.topicRouteInfoManager = new TopicRouteInfoManager(this);
this.liteSharding = new LiteShardingImpl(this, this.topicRouteInfoManager);
this.liteLifecycleManager = this.messageStoreConfig.isEnableRocksDBStore() ?
new RocksDBLiteLifecycleManager(this, this.liteSharding) : new LiteLifecycleManager(this, this.liteSharding);
this.liteSubscriptionRegistry = new LiteSubscriptionRegistryImpl(this, liteLifecycleManager);
this.liteSubscriptionCtlProcessor = new LiteSubscriptionCtlProcessor(this, liteSubscriptionRegistry);
this.liteEventDispatcher = new LiteEventDispatcher(this, this.liteSubscriptionRegistry, this.liteLifecycleManager);
this.liteManagerProcessor = new LiteManagerProcessor(this, liteLifecycleManager, liteSharding);
this.pullMessageProcessor = new PullMessageProcessor(this);
this.peekMessageProcessor = new PeekMessageProcessor(this);
this.pullRequestHoldService = messageStoreConfig.isEnableLmq() ? new LmqPullRequestHoldService(this) : new PullRequestHoldService(this);
this.popMessageProcessor = new PopMessageProcessor(this);
this.popLiteMessageProcessor = new PopLiteMessageProcessor(this, this.liteEventDispatcher);
this.notificationProcessor = new NotificationProcessor(this);
this.pollingInfoProcessor = new PollingInfoProcessor(this);
this.ackMessageProcessor = new AckMessageProcessor(this);
this.changeInvisibleTimeProcessor = new ChangeInvisibleTimeProcessor(this);
this.sendMessageProcessor = new SendMessageProcessor(this);
this.recallMessageProcessor = new RecallMessageProcessor(this);
this.replyMessageProcessor = new ReplyMessageProcessor(this);
this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService, this.popMessageProcessor, this.notificationProcessor);
this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService, this.popMessageProcessor, this.notificationProcessor, this.liteEventDispatcher);
this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener, this.brokerStatsManager, this.brokerConfig);
this.producerManager = new ProducerManager(this.brokerStatsManager);
Expand Down Expand Up @@ -461,8 +488,6 @@ public boolean online(String instanceId, String group, String topic) {

this.escapeBridge = new EscapeBridge(this);

this.topicRouteInfoManager = new TopicRouteInfoManager(this);

if (this.brokerConfig.isEnableSlaveActingMaster() && !this.brokerConfig.isSkipPreOnline()) {
this.brokerPreOnlineService = new BrokerPreOnlineService(this);
}
Expand Down Expand Up @@ -934,6 +959,8 @@ public boolean recoverAndInitService() throws CloneNotSupportedException {

initialRequestPipeline();

initLiteService();

if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
// Register a listener to reload SslContext
try {
Expand Down Expand Up @@ -1029,6 +1056,21 @@ public PutMessageResult executeBeforePutMessage(MessageExt msg) {
}
});

putMessageHookList.add(new PutMessageHook() {
@Override
public String hookName() {
return "handleLmqQuota";
}

@Override
public PutMessageResult executeBeforePutMessage(MessageExt msg) {
if (msg instanceof MessageExtBrokerInner) {
return HookUtils.handleLmqQuota(BrokerController.this, (MessageExtBrokerInner) msg);
}
return null;
}
});

SendMessageBackHook sendMessageBackHook = new SendMessageBackHook() {
@Override
public boolean executeSendMessageBack(List<MessageExt> msgList, String brokerName, String brokerAddr) {
Expand Down Expand Up @@ -1091,6 +1133,11 @@ private void initialRequestPipeline() {
}
}

private void initLiteService() {
this.liteEventDispatcher.init();
this.liteLifecycleManager.init();
}

public void registerProcessor() {
RemotingServer remotingServer = remotingServerMap.get(TCP_REMOTING_SERVER);
RemotingServer fastRemotingServer = remotingServerMap.get(FAST_REMOTING_SERVER);
Expand Down Expand Up @@ -1125,6 +1172,7 @@ public void registerProcessor() {
* PopMessageProcessor
*/
remotingServer.registerProcessor(RequestCode.POP_MESSAGE, this.popMessageProcessor, this.pullMessageExecutor);
remotingServer.registerProcessor(RequestCode.POP_LITE_MESSAGE, this.popLiteMessageProcessor, this.pullMessageExecutor);

/**
* AckMessageProcessor
Expand Down Expand Up @@ -1176,10 +1224,12 @@ public void registerProcessor() {
remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientManageProcessor, this.heartbeatExecutor);
remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientManageProcessor, this.clientManageExecutor);
remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientManageProcessor, this.clientManageExecutor);
remotingServer.registerProcessor(RequestCode.LITE_SUBSCRIPTION_CTL, liteSubscriptionCtlProcessor, this.clientManageExecutor);

fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientManageProcessor, this.heartbeatExecutor);
fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientManageProcessor, this.clientManageExecutor);
fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientManageProcessor, this.clientManageExecutor);
fastRemotingServer.registerProcessor(RequestCode.LITE_SUBSCRIPTION_CTL, liteSubscriptionCtlProcessor, this.clientManageExecutor);

/**
* ConsumerManageProcessor
Expand Down Expand Up @@ -1207,6 +1257,23 @@ public void registerProcessor() {
remotingServer.registerProcessor(RequestCode.END_TRANSACTION, endTransactionProcessor, this.endTransactionExecutor);
fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, endTransactionProcessor, this.endTransactionExecutor);

/*
* lite admin
*/
remotingServer.registerProcessor(RequestCode.GET_BROKER_LITE_INFO, liteManagerProcessor, adminBrokerExecutor);
remotingServer.registerProcessor(RequestCode.GET_PARENT_TOPIC_INFO, liteManagerProcessor, adminBrokerExecutor);
remotingServer.registerProcessor(RequestCode.GET_LITE_TOPIC_INFO, liteManagerProcessor, adminBrokerExecutor);
remotingServer.registerProcessor(RequestCode.GET_LITE_CLIENT_INFO, liteManagerProcessor, adminBrokerExecutor);
remotingServer.registerProcessor(RequestCode.GET_LITE_GROUP_INFO, liteManagerProcessor, adminBrokerExecutor);
remotingServer.registerProcessor(RequestCode.TRIGGER_LITE_DISPATCH, liteManagerProcessor, adminBrokerExecutor);

fastRemotingServer.registerProcessor(RequestCode.GET_BROKER_LITE_INFO, liteManagerProcessor, adminBrokerExecutor);
fastRemotingServer.registerProcessor(RequestCode.GET_PARENT_TOPIC_INFO, liteManagerProcessor, adminBrokerExecutor);
fastRemotingServer.registerProcessor(RequestCode.GET_LITE_TOPIC_INFO, liteManagerProcessor, adminBrokerExecutor);
fastRemotingServer.registerProcessor(RequestCode.GET_LITE_CLIENT_INFO, liteManagerProcessor, adminBrokerExecutor);
fastRemotingServer.registerProcessor(RequestCode.GET_LITE_GROUP_INFO, liteManagerProcessor, adminBrokerExecutor);
fastRemotingServer.registerProcessor(RequestCode.TRIGGER_LITE_DISPATCH, liteManagerProcessor, adminBrokerExecutor);

/*
* Default
*/
Expand Down Expand Up @@ -1389,6 +1456,10 @@ public PopMessageProcessor getPopMessageProcessor() {
return popMessageProcessor;
}

public PopLiteMessageProcessor getPopLiteMessageProcessor() {
return popLiteMessageProcessor;
}

public NotificationProcessor getNotificationProcessor() {
return notificationProcessor;
}
Expand All @@ -1409,6 +1480,14 @@ public ChangeInvisibleTimeProcessor getChangeInvisibleTimeProcessor() {
return changeInvisibleTimeProcessor;
}

public LiteSubscriptionRegistry getLiteSubscriptionRegistry() {
return liteSubscriptionRegistry;
}

public AbstractLiteLifecycleManager getLiteLifecycleManager() {
return liteLifecycleManager;
}

protected void shutdownBasicService() {

shutdown = true;
Expand Down Expand Up @@ -1450,6 +1529,13 @@ protected void shutdownBasicService() {
this.popMessageProcessor.getPopLongPollingService().shutdown();
}

if (this.popLiteMessageProcessor != null) {
this.popLiteMessageProcessor.stopPopLiteLockManager();
if (this.popLiteMessageProcessor.getPopLiteLongPollingService() != null) {
this.popLiteMessageProcessor.getPopLiteLongPollingService().shutdown();
}
}

if (this.popMessageProcessor.getQueueLockManager() != null) {
this.popMessageProcessor.getQueueLockManager().shutdown();
}
Expand Down Expand Up @@ -1600,6 +1686,18 @@ protected void shutdownBasicService() {
this.coldDataCgCtrService.shutdown();
}

if (this.liteEventDispatcher != null) {
this.liteEventDispatcher.shutdown();
}

if (this.liteLifecycleManager != null) {
this.liteLifecycleManager.shutdown();
}

if (this.liteSubscriptionRegistry != null) {
this.liteSubscriptionRegistry.shutdown();
}

shutdownScheduledExecutorService(this.syncBrokerMemberGroupExecutorService);
shutdownScheduledExecutorService(this.brokerHeartbeatExecutorService);

Expand Down Expand Up @@ -1732,6 +1830,13 @@ protected void startBasicService() throws Exception {
this.popMessageProcessor.getQueueLockManager().start();
}

if (this.popLiteMessageProcessor != null) {
this.popLiteMessageProcessor.startPopLiteLockManager();
if (this.popLiteMessageProcessor.getPopLiteLongPollingService() != null) {
this.popLiteMessageProcessor.getPopLiteLongPollingService().start();
}
}

if (this.ackMessageProcessor != null) {
if (brokerConfig.isPopConsumerFSServiceInit()) {
this.ackMessageProcessor.startPopReviveService();
Expand Down Expand Up @@ -1793,6 +1898,18 @@ protected void startBasicService() throws Exception {
if (this.coldDataCgCtrService != null) {
this.coldDataCgCtrService.start();
}

if (this.liteEventDispatcher != null) {
this.liteEventDispatcher.start();
}

if (this.liteLifecycleManager != null) {
this.liteLifecycleManager.start();
}

if (this.liteSubscriptionRegistry != null) {
this.liteSubscriptionRegistry.start();
}
}

public void start() throws Exception {
Expand Down Expand Up @@ -2654,4 +2771,8 @@ public ConfigContext getConfigContext() {
public void setConfigContext(ConfigContext configContext) {
this.configContext = configContext;
}

public LiteEventDispatcher getLiteEventDispatcher() {
return liteEventDispatcher;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.rocketmq.remoting.protocol.header.CheckTransactionStateRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetConsumerStatusRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.NotifyConsumerIdsChangedRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.NotifyUnsubscribeLiteRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ResetOffsetRequestHeader;
import org.apache.rocketmq.store.exception.ConsumeQueueException;

Expand All @@ -60,6 +61,16 @@ public Broker2Client(BrokerController brokerController) {
this.brokerController = brokerController;
}

public void notifyUnsubscribeLite(Channel channel, NotifyUnsubscribeLiteRequestHeader requestHeader) {
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.NOTIFY_UNSUBSCRIBE_LITE, requestHeader);
try {
this.brokerController.getRemotingServer().invokeOneway(channel, request, 100);
} catch (Exception e) {
log.error("notifyUnsubscribeLite failed. header={}, error={}", requestHeader, e.toString());
}
}

public void checkProducerTransactionState(
final String group,
final Channel channel,
Expand Down
Loading
Loading