Skip to content

Commit 17bc455

Browse files
committed
Implementation for [RIP-83 Lite Topic: A New Message Model]
Change-Id: I102d2bf7b5d573f41416333135074cde1a9f1fec
1 parent 223e3fe commit 17bc455

33 files changed

+2882
-2
lines changed
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.rocketmq.broker.lite;
19+
20+
public interface LiteSharding {
21+
22+
String shardingByLmqName(String parentTopic, String lmqName);
23+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.rocketmq.broker.lite;
19+
20+
import com.google.common.hash.Hashing;
21+
import org.apache.commons.collections.CollectionUtils;
22+
import org.apache.commons.lang3.StringUtils;
23+
import org.apache.rocketmq.broker.BrokerController;
24+
import org.apache.rocketmq.broker.topic.TopicRouteInfoManager;
25+
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
26+
import org.apache.rocketmq.common.lite.LiteUtil;
27+
import org.apache.rocketmq.common.message.MessageQueue;
28+
29+
import java.util.List;
30+
31+
public class LiteShardingImpl implements LiteSharding {
32+
33+
private final BrokerController brokerController;
34+
private final TopicRouteInfoManager topicRouteInfoManager;
35+
36+
public LiteShardingImpl(BrokerController brokerController, TopicRouteInfoManager topicRouteInfoManager) {
37+
this.brokerController = brokerController;
38+
this.topicRouteInfoManager = topicRouteInfoManager;
39+
}
40+
41+
@Override
42+
public String shardingByLmqName(String parentTopic, String lmqName) {
43+
TopicPublishInfo topicPublishInfo = topicRouteInfoManager.tryToFindTopicPublishInfo(parentTopic);
44+
if (topicPublishInfo == null) {
45+
// if topic not exist, return current broker
46+
return brokerController.getBrokerConfig().getBrokerName();
47+
}
48+
List<MessageQueue> writeQueues = topicPublishInfo.getMessageQueueList();
49+
if (CollectionUtils.isEmpty(writeQueues)) {
50+
return brokerController.getBrokerConfig().getBrokerName();
51+
}
52+
String liteTopic = LiteUtil.getLiteTopic(lmqName);
53+
if (StringUtils.isEmpty(liteTopic)) {
54+
return brokerController.getBrokerConfig().getBrokerName();
55+
}
56+
int bucket = Hashing.consistentHash(liteTopic.hashCode(), writeQueues.size());
57+
MessageQueue targetQueue = writeQueues.get(bucket);
58+
return targetQueue.getBrokerName();
59+
}
60+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.rocketmq.broker.lite;
19+
20+
import io.netty.channel.Channel;
21+
22+
import java.util.List;
23+
import java.util.Set;
24+
import org.apache.rocketmq.common.entity.ClientGroup;
25+
import org.apache.rocketmq.common.lite.LiteSubscription;
26+
27+
public interface LiteSubscriptionRegistry {
28+
29+
void updateClientChannel(String clientId, Channel channel);
30+
31+
LiteSubscription getLiteSubscription(String clientId);
32+
33+
int getActiveSubscriptionNum();
34+
35+
void addPartialSubscription(String clientId, String group, String topic, Set<String> lmqNameSet, boolean exclusive);
36+
37+
void removePartialSubscription(String clientId, String group, String topic, Set<String> lmqNameSet);
38+
39+
void addCompleteSubscription(String clientId, String group, String topic, Set<String> newLmqNameSet, long version);
40+
41+
void removeCompleteSubscription(String clientId);
42+
43+
void addListener(LiteCtlListener listener);
44+
45+
Set<ClientGroup> getSubscriber(String lmqName);
46+
47+
List<String> getAllClientIdByGroup(String group);
48+
49+
void cleanSubscription(String lmqName, boolean notifyClient);
50+
51+
void start();
52+
53+
void shutdown();
54+
}

0 commit comments

Comments
 (0)