Skip to content

Commit 223e3fe

Browse files
committed
Implementation for [RIP-83 Lite Topic: A New Message Model]
Change-Id: Ie83f5ad363eb7b7c48371a0f4b3eab9f04465b81
1 parent 204b251 commit 223e3fe

File tree

145 files changed

+11434
-225
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

145 files changed

+11434
-225
lines changed

auth/src/main/java/org/apache/rocketmq/auth/authorization/builder/DefaultAuthorizationContextBuilder.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import apache.rocketmq.v2.Subscription;
3232
import apache.rocketmq.v2.SubscriptionEntry;
3333
import apache.rocketmq.v2.TelemetryCommand;
34+
import apache.rocketmq.v2.SyncLiteSubscriptionRequest;
3435
import com.google.protobuf.GeneratedMessageV3;
3536
import io.grpc.Metadata;
3637
import io.netty.channel.ChannelHandlerContext;
@@ -124,6 +125,13 @@ public List<DefaultAuthorizationContext> build(Metadata metadata, GeneratedMessa
124125
}
125126
result = newSubContexts(metadata, request.getGroup(), request.getMessageQueue().getTopic());
126127
}
128+
if (message instanceof SyncLiteSubscriptionRequest) {
129+
SyncLiteSubscriptionRequest request = (SyncLiteSubscriptionRequest) message;
130+
if (request.getLiteTopicSetCount() <= 0) {
131+
return null;
132+
}
133+
result = newSubContexts(metadata, request.getGroup(), request.getTopic());
134+
}
127135
if (message instanceof AckMessageRequest) {
128136
AckMessageRequest request = (AckMessageRequest) message;
129137
result = newSubContexts(metadata, request.getGroup(), request.getTopic());

broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java

Lines changed: 124 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,14 @@
5050
import org.apache.rocketmq.broker.filter.CommitLogDispatcherCalcBitMap;
5151
import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
5252
import org.apache.rocketmq.broker.latency.BrokerFastFailure;
53+
import org.apache.rocketmq.broker.lite.AbstractLiteLifecycleManager;
54+
import org.apache.rocketmq.broker.lite.LiteEventDispatcher;
55+
import org.apache.rocketmq.broker.lite.LiteSubscriptionRegistry;
56+
import org.apache.rocketmq.broker.lite.LiteSubscriptionRegistryImpl;
57+
import org.apache.rocketmq.broker.lite.LiteLifecycleManager;
58+
import org.apache.rocketmq.broker.lite.LiteSharding;
59+
import org.apache.rocketmq.broker.lite.LiteShardingImpl;
60+
import org.apache.rocketmq.broker.lite.RocksDBLiteLifecycleManager;
5361
import org.apache.rocketmq.broker.longpolling.LmqPullRequestHoldService;
5462
import org.apache.rocketmq.broker.longpolling.NotifyMessageArrivingListener;
5563
import org.apache.rocketmq.broker.longpolling.PullRequestHoldService;
@@ -69,10 +77,13 @@
6977
import org.apache.rocketmq.broker.processor.ClientManageProcessor;
7078
import org.apache.rocketmq.broker.processor.ConsumerManageProcessor;
7179
import org.apache.rocketmq.broker.processor.EndTransactionProcessor;
80+
import org.apache.rocketmq.broker.processor.LiteManagerProcessor;
81+
import org.apache.rocketmq.broker.processor.LiteSubscriptionCtlProcessor;
7282
import org.apache.rocketmq.broker.processor.NotificationProcessor;
7383
import org.apache.rocketmq.broker.processor.PeekMessageProcessor;
7484
import org.apache.rocketmq.broker.processor.PollingInfoProcessor;
7585
import org.apache.rocketmq.broker.processor.PopInflightMessageCounter;
86+
import org.apache.rocketmq.broker.processor.PopLiteMessageProcessor;
7687
import org.apache.rocketmq.broker.processor.PopMessageProcessor;
7788
import org.apache.rocketmq.broker.processor.PullMessageProcessor;
7889
import org.apache.rocketmq.broker.processor.QueryAssignmentProcessor;
@@ -204,12 +215,19 @@ public class BrokerController {
204215
protected final PullMessageProcessor pullMessageProcessor;
205216
protected final PeekMessageProcessor peekMessageProcessor;
206217
protected final PopMessageProcessor popMessageProcessor;
218+
protected final PopLiteMessageProcessor popLiteMessageProcessor;
207219
protected final AckMessageProcessor ackMessageProcessor;
208220
protected final ChangeInvisibleTimeProcessor changeInvisibleTimeProcessor;
209221
protected final NotificationProcessor notificationProcessor;
210222
protected final PollingInfoProcessor pollingInfoProcessor;
211223
protected final QueryAssignmentProcessor queryAssignmentProcessor;
212224
protected final ClientManageProcessor clientManageProcessor;
225+
protected final LiteSubscriptionCtlProcessor liteSubscriptionCtlProcessor;
226+
protected final LiteSharding liteSharding;
227+
protected final AbstractLiteLifecycleManager liteLifecycleManager;
228+
protected final LiteSubscriptionRegistry liteSubscriptionRegistry;
229+
protected final LiteEventDispatcher liteEventDispatcher;
230+
protected final LiteManagerProcessor liteManagerProcessor;
213231
protected final SendMessageProcessor sendMessageProcessor;
214232
protected final RecallMessageProcessor recallMessageProcessor;
215233
protected final ReplyMessageProcessor replyMessageProcessor;
@@ -371,18 +389,27 @@ public BrokerController(
371389
this.topicQueueMappingManager = new TopicQueueMappingManager(this);
372390
this.authenticationMetadataManager = AuthenticationFactory.getMetadataManager(this.authConfig);
373391
this.authorizationMetadataManager = AuthorizationFactory.getMetadataManager(this.authConfig);
392+
this.topicRouteInfoManager = new TopicRouteInfoManager(this);
393+
this.liteSharding = new LiteShardingImpl(this, this.topicRouteInfoManager);
394+
this.liteLifecycleManager = this.messageStoreConfig.isEnableRocksDBStore() ?
395+
new RocksDBLiteLifecycleManager(this, this.liteSharding) : new LiteLifecycleManager(this, this.liteSharding);
396+
this.liteSubscriptionRegistry = new LiteSubscriptionRegistryImpl(this, liteLifecycleManager);
397+
this.liteSubscriptionCtlProcessor = new LiteSubscriptionCtlProcessor(this, liteSubscriptionRegistry);
398+
this.liteEventDispatcher = new LiteEventDispatcher(this, this.liteSubscriptionRegistry, this.liteLifecycleManager);
399+
this.liteManagerProcessor = new LiteManagerProcessor(this, liteLifecycleManager, liteSharding);
374400
this.pullMessageProcessor = new PullMessageProcessor(this);
375401
this.peekMessageProcessor = new PeekMessageProcessor(this);
376402
this.pullRequestHoldService = messageStoreConfig.isEnableLmq() ? new LmqPullRequestHoldService(this) : new PullRequestHoldService(this);
377403
this.popMessageProcessor = new PopMessageProcessor(this);
404+
this.popLiteMessageProcessor = new PopLiteMessageProcessor(this, this.liteEventDispatcher);
378405
this.notificationProcessor = new NotificationProcessor(this);
379406
this.pollingInfoProcessor = new PollingInfoProcessor(this);
380407
this.ackMessageProcessor = new AckMessageProcessor(this);
381408
this.changeInvisibleTimeProcessor = new ChangeInvisibleTimeProcessor(this);
382409
this.sendMessageProcessor = new SendMessageProcessor(this);
383410
this.recallMessageProcessor = new RecallMessageProcessor(this);
384411
this.replyMessageProcessor = new ReplyMessageProcessor(this);
385-
this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService, this.popMessageProcessor, this.notificationProcessor);
412+
this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService, this.popMessageProcessor, this.notificationProcessor, this.liteEventDispatcher);
386413
this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
387414
this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener, this.brokerStatsManager, this.brokerConfig);
388415
this.producerManager = new ProducerManager(this.brokerStatsManager);
@@ -461,8 +488,6 @@ public boolean online(String instanceId, String group, String topic) {
461488

462489
this.escapeBridge = new EscapeBridge(this);
463490

464-
this.topicRouteInfoManager = new TopicRouteInfoManager(this);
465-
466491
if (this.brokerConfig.isEnableSlaveActingMaster() && !this.brokerConfig.isSkipPreOnline()) {
467492
this.brokerPreOnlineService = new BrokerPreOnlineService(this);
468493
}
@@ -934,6 +959,8 @@ public boolean recoverAndInitService() throws CloneNotSupportedException {
934959

935960
initialRequestPipeline();
936961

962+
initLiteService();
963+
937964
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
938965
// Register a listener to reload SslContext
939966
try {
@@ -1029,6 +1056,21 @@ public PutMessageResult executeBeforePutMessage(MessageExt msg) {
10291056
}
10301057
});
10311058

1059+
putMessageHookList.add(new PutMessageHook() {
1060+
@Override
1061+
public String hookName() {
1062+
return "handleLmqQuota";
1063+
}
1064+
1065+
@Override
1066+
public PutMessageResult executeBeforePutMessage(MessageExt msg) {
1067+
if (msg instanceof MessageExtBrokerInner) {
1068+
return HookUtils.handleLmqQuota(BrokerController.this, (MessageExtBrokerInner) msg);
1069+
}
1070+
return null;
1071+
}
1072+
});
1073+
10321074
SendMessageBackHook sendMessageBackHook = new SendMessageBackHook() {
10331075
@Override
10341076
public boolean executeSendMessageBack(List<MessageExt> msgList, String brokerName, String brokerAddr) {
@@ -1091,6 +1133,11 @@ private void initialRequestPipeline() {
10911133
}
10921134
}
10931135

1136+
private void initLiteService() {
1137+
this.liteEventDispatcher.init();
1138+
this.liteLifecycleManager.init();
1139+
}
1140+
10941141
public void registerProcessor() {
10951142
RemotingServer remotingServer = remotingServerMap.get(TCP_REMOTING_SERVER);
10961143
RemotingServer fastRemotingServer = remotingServerMap.get(FAST_REMOTING_SERVER);
@@ -1125,6 +1172,7 @@ public void registerProcessor() {
11251172
* PopMessageProcessor
11261173
*/
11271174
remotingServer.registerProcessor(RequestCode.POP_MESSAGE, this.popMessageProcessor, this.pullMessageExecutor);
1175+
remotingServer.registerProcessor(RequestCode.POP_LITE_MESSAGE, this.popLiteMessageProcessor, this.pullMessageExecutor);
11281176

11291177
/**
11301178
* AckMessageProcessor
@@ -1176,10 +1224,12 @@ public void registerProcessor() {
11761224
remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientManageProcessor, this.heartbeatExecutor);
11771225
remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientManageProcessor, this.clientManageExecutor);
11781226
remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientManageProcessor, this.clientManageExecutor);
1227+
remotingServer.registerProcessor(RequestCode.LITE_SUBSCRIPTION_CTL, liteSubscriptionCtlProcessor, this.clientManageExecutor);
11791228

11801229
fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientManageProcessor, this.heartbeatExecutor);
11811230
fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientManageProcessor, this.clientManageExecutor);
11821231
fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientManageProcessor, this.clientManageExecutor);
1232+
fastRemotingServer.registerProcessor(RequestCode.LITE_SUBSCRIPTION_CTL, liteSubscriptionCtlProcessor, this.clientManageExecutor);
11831233

11841234
/**
11851235
* ConsumerManageProcessor
@@ -1207,6 +1257,23 @@ public void registerProcessor() {
12071257
remotingServer.registerProcessor(RequestCode.END_TRANSACTION, endTransactionProcessor, this.endTransactionExecutor);
12081258
fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, endTransactionProcessor, this.endTransactionExecutor);
12091259

1260+
/*
1261+
* lite admin
1262+
*/
1263+
remotingServer.registerProcessor(RequestCode.GET_BROKER_LITE_INFO, liteManagerProcessor, adminBrokerExecutor);
1264+
remotingServer.registerProcessor(RequestCode.GET_PARENT_TOPIC_INFO, liteManagerProcessor, adminBrokerExecutor);
1265+
remotingServer.registerProcessor(RequestCode.GET_LITE_TOPIC_INFO, liteManagerProcessor, adminBrokerExecutor);
1266+
remotingServer.registerProcessor(RequestCode.GET_LITE_CLIENT_INFO, liteManagerProcessor, adminBrokerExecutor);
1267+
remotingServer.registerProcessor(RequestCode.GET_LITE_GROUP_INFO, liteManagerProcessor, adminBrokerExecutor);
1268+
remotingServer.registerProcessor(RequestCode.TRIGGER_LITE_DISPATCH, liteManagerProcessor, adminBrokerExecutor);
1269+
1270+
fastRemotingServer.registerProcessor(RequestCode.GET_BROKER_LITE_INFO, liteManagerProcessor, adminBrokerExecutor);
1271+
fastRemotingServer.registerProcessor(RequestCode.GET_PARENT_TOPIC_INFO, liteManagerProcessor, adminBrokerExecutor);
1272+
fastRemotingServer.registerProcessor(RequestCode.GET_LITE_TOPIC_INFO, liteManagerProcessor, adminBrokerExecutor);
1273+
fastRemotingServer.registerProcessor(RequestCode.GET_LITE_CLIENT_INFO, liteManagerProcessor, adminBrokerExecutor);
1274+
fastRemotingServer.registerProcessor(RequestCode.GET_LITE_GROUP_INFO, liteManagerProcessor, adminBrokerExecutor);
1275+
fastRemotingServer.registerProcessor(RequestCode.TRIGGER_LITE_DISPATCH, liteManagerProcessor, adminBrokerExecutor);
1276+
12101277
/*
12111278
* Default
12121279
*/
@@ -1389,6 +1456,10 @@ public PopMessageProcessor getPopMessageProcessor() {
13891456
return popMessageProcessor;
13901457
}
13911458

1459+
public PopLiteMessageProcessor getPopLiteMessageProcessor() {
1460+
return popLiteMessageProcessor;
1461+
}
1462+
13921463
public NotificationProcessor getNotificationProcessor() {
13931464
return notificationProcessor;
13941465
}
@@ -1409,6 +1480,14 @@ public ChangeInvisibleTimeProcessor getChangeInvisibleTimeProcessor() {
14091480
return changeInvisibleTimeProcessor;
14101481
}
14111482

1483+
public LiteSubscriptionRegistry getLiteSubscriptionRegistry() {
1484+
return liteSubscriptionRegistry;
1485+
}
1486+
1487+
public AbstractLiteLifecycleManager getLiteLifecycleManager() {
1488+
return liteLifecycleManager;
1489+
}
1490+
14121491
protected void shutdownBasicService() {
14131492

14141493
shutdown = true;
@@ -1450,6 +1529,13 @@ protected void shutdownBasicService() {
14501529
this.popMessageProcessor.getPopLongPollingService().shutdown();
14511530
}
14521531

1532+
if (this.popLiteMessageProcessor != null) {
1533+
this.popLiteMessageProcessor.stopPopLiteLockManager();
1534+
if (this.popLiteMessageProcessor.getPopLiteLongPollingService() != null) {
1535+
this.popLiteMessageProcessor.getPopLiteLongPollingService().shutdown();
1536+
}
1537+
}
1538+
14531539
if (this.popMessageProcessor.getQueueLockManager() != null) {
14541540
this.popMessageProcessor.getQueueLockManager().shutdown();
14551541
}
@@ -1600,6 +1686,18 @@ protected void shutdownBasicService() {
16001686
this.coldDataCgCtrService.shutdown();
16011687
}
16021688

1689+
if (this.liteEventDispatcher != null) {
1690+
this.liteEventDispatcher.shutdown();
1691+
}
1692+
1693+
if (this.liteLifecycleManager != null) {
1694+
this.liteLifecycleManager.shutdown();
1695+
}
1696+
1697+
if (this.liteSubscriptionRegistry != null) {
1698+
this.liteSubscriptionRegistry.shutdown();
1699+
}
1700+
16031701
shutdownScheduledExecutorService(this.syncBrokerMemberGroupExecutorService);
16041702
shutdownScheduledExecutorService(this.brokerHeartbeatExecutorService);
16051703

@@ -1732,6 +1830,13 @@ protected void startBasicService() throws Exception {
17321830
this.popMessageProcessor.getQueueLockManager().start();
17331831
}
17341832

1833+
if (this.popLiteMessageProcessor != null) {
1834+
this.popLiteMessageProcessor.startPopLiteLockManager();
1835+
if (this.popLiteMessageProcessor.getPopLiteLongPollingService() != null) {
1836+
this.popLiteMessageProcessor.getPopLiteLongPollingService().start();
1837+
}
1838+
}
1839+
17351840
if (this.ackMessageProcessor != null) {
17361841
if (brokerConfig.isPopConsumerFSServiceInit()) {
17371842
this.ackMessageProcessor.startPopReviveService();
@@ -1793,6 +1898,18 @@ protected void startBasicService() throws Exception {
17931898
if (this.coldDataCgCtrService != null) {
17941899
this.coldDataCgCtrService.start();
17951900
}
1901+
1902+
if (this.liteEventDispatcher != null) {
1903+
this.liteEventDispatcher.start();
1904+
}
1905+
1906+
if (this.liteLifecycleManager != null) {
1907+
this.liteLifecycleManager.start();
1908+
}
1909+
1910+
if (this.liteSubscriptionRegistry != null) {
1911+
this.liteSubscriptionRegistry.start();
1912+
}
17961913
}
17971914

17981915
public void start() throws Exception {
@@ -2654,4 +2771,8 @@ public ConfigContext getConfigContext() {
26542771
public void setConfigContext(ConfigContext configContext) {
26552772
this.configContext = configContext;
26562773
}
2774+
2775+
public LiteEventDispatcher getLiteEventDispatcher() {
2776+
return liteEventDispatcher;
2777+
}
26572778
}

broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.apache.rocketmq.remoting.protocol.header.CheckTransactionStateRequestHeader;
5050
import org.apache.rocketmq.remoting.protocol.header.GetConsumerStatusRequestHeader;
5151
import org.apache.rocketmq.remoting.protocol.header.NotifyConsumerIdsChangedRequestHeader;
52+
import org.apache.rocketmq.remoting.protocol.header.NotifyUnsubscribeLiteRequestHeader;
5253
import org.apache.rocketmq.remoting.protocol.header.ResetOffsetRequestHeader;
5354
import org.apache.rocketmq.store.exception.ConsumeQueueException;
5455

@@ -60,6 +61,16 @@ public Broker2Client(BrokerController brokerController) {
6061
this.brokerController = brokerController;
6162
}
6263

64+
public void notifyUnsubscribeLite(Channel channel, NotifyUnsubscribeLiteRequestHeader requestHeader) {
65+
RemotingCommand request =
66+
RemotingCommand.createRequestCommand(RequestCode.NOTIFY_UNSUBSCRIBE_LITE, requestHeader);
67+
try {
68+
this.brokerController.getRemotingServer().invokeOneway(channel, request, 100);
69+
} catch (Exception e) {
70+
log.error("notifyUnsubscribeLite failed. header={}, error={}", requestHeader, e.toString());
71+
}
72+
}
73+
6374
public void checkProducerTransactionState(
6475
final String group,
6576
final Channel channel,

0 commit comments

Comments
 (0)