Skip to content

Commit 0a1597d

Browse files
authored
Merge pull request #1750 from chengyouling/develop-retry
RocketMq retry-topic support SQL filtering messages
2 parents d947141 + 06e8c45 commit 0a1597d

File tree

4 files changed

+83
-24
lines changed

4 files changed

+83
-24
lines changed

sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqSchedulerRebuildSubscriptionInterceptor.java

Lines changed: 75 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package io.sermant.mq.grayscale.rocketmq.interceptor;
1818

19+
import io.sermant.core.common.LoggerFactory;
1920
import io.sermant.core.plugin.agent.entity.ExecuteContext;
2021
import io.sermant.core.utils.StringUtils;
2122
import io.sermant.mq.grayscale.rocketmq.service.RocketMqConsumerGroupAutoCheck;
@@ -26,7 +27,12 @@
2627
import org.apache.rocketmq.client.impl.consumer.RebalanceImpl;
2728
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
2829

30+
import java.util.ArrayList;
31+
import java.util.Collection;
32+
import java.util.List;
33+
import java.util.Locale;
2934
import java.util.concurrent.ConcurrentMap;
35+
import java.util.logging.Logger;
3036

3137
/**
3238
* TAG/SQL92 query message statement interceptor
@@ -35,46 +41,95 @@
3541
* @since 2024-05-27
3642
**/
3743
public class RocketMqSchedulerRebuildSubscriptionInterceptor extends RocketMqAbstractInterceptor {
38-
private final Object lock = new Object();
44+
private static final Logger LOGGER = LoggerFactory.getLogger();
45+
46+
private static final String RETRY_TOPIC_FLAG = "%RETRY%";
3947

4048
@Override
4149
public ExecuteContext doAfter(ExecuteContext context) throws Exception {
42-
ConcurrentMap<String, Object> map = (ConcurrentMap<String, Object>) context.getResult();
50+
ConcurrentMap<String, Object> subscriptionInner = (ConcurrentMap<String, Object>) context.getResult();
4351
RebalanceImpl balance = (RebalanceImpl) context.getObject();
4452
if (balance.getConsumerGroup() == null) {
4553
return context;
4654
}
47-
for (Object subscriptionData : map.values()) {
55+
List<Object> retryTopicSubscriptions = new ArrayList<>();
56+
List<Object> originTopicSubscriptions = new ArrayList<>();
57+
buildTopicSubscriptions(subscriptionInner, retryTopicSubscriptions, originTopicSubscriptions);
58+
Object changedOriginSubscription = null;
59+
for (Object subscriptionData : originTopicSubscriptions) {
4860
if (RocketMqSubscriptionDataUtils
4961
.isExpressionTypeInaccurate(RocketMqReflectUtils.getExpressionType(subscriptionData))) {
5062
continue;
5163
}
52-
buildSql92SubscriptionData(subscriptionData, balance);
64+
String topic = RocketMqReflectUtils.getTopic(subscriptionData);
65+
if (!RocketMqSubscriptionDataUtils.getGrayTagChangeFlag(topic, balance)) {
66+
continue;
67+
}
68+
buildSql92SubscriptionData(subscriptionData, balance, topic);
69+
70+
// sql92 expression is associated only with the consumer group. Therefore,
71+
// using any of the changed subscription build retry-topic sql92 expression.
72+
changedOriginSubscription = subscriptionData;
73+
}
74+
if (changedOriginSubscription != null) {
75+
// update %RETRY%+GROUP substring with sql92
76+
updateRetrySubscriptionData(changedOriginSubscription, retryTopicSubscriptions);
5377
}
5478
return context;
5579
}
5680

57-
private void buildSql92SubscriptionData(Object subscriptionData, RebalanceImpl balance) {
58-
synchronized (lock) {
59-
String topic = RocketMqReflectUtils.getTopic(subscriptionData);
60-
if (!RocketMqSubscriptionDataUtils.getGrayTagChangeFlag(topic, balance)) {
61-
return;
62-
}
63-
String consumerGroup = balance.getConsumerGroup();
64-
MQClientInstance instance = balance.getmQClientFactory();
65-
if (StringUtils.isEmpty(RocketMqGrayscaleConfigUtils.getGrayGroupTag())) {
66-
RocketMqConsumerGroupAutoCheck.setMqClientInstance(topic, consumerGroup, instance);
67-
RocketMqConsumerGroupAutoCheck.syncUpdateCacheGrayTags();
68-
RocketMqConsumerGroupAutoCheck.startSchedulerCheckGroupTask();
81+
private void buildTopicSubscriptions(ConcurrentMap<String, Object> subscriptionInner,
82+
List<Object> retryTopicSubscriptions, List<Object> originTopicSubscriptions) {
83+
// If one consumer subscribe many topic when create, subscriptionInner data structure is:
84+
// many origin topic subscriptionData, as topic1:subscriptionData1 topic2:subscriptionData2
85+
// Currently, there is only one retry topic subscriptionData, because the retry topic is associated with
86+
// consumer group, so that one consumer group has only one rebalancing task, still using array storage
87+
// for convenience
88+
for (Object subscriptionData : subscriptionInner.values()) {
89+
String tempTopic = RocketMqReflectUtils.getTopic(subscriptionData);
90+
if (tempTopic.contains(RETRY_TOPIC_FLAG)) {
91+
retryTopicSubscriptions.add(subscriptionData);
92+
} else {
93+
originTopicSubscriptions.add(subscriptionData);
6994
}
70-
String namesrvAddr = balance.getmQClientFactory().getClientConfig().getNamesrvAddr();
71-
resetsSql92SubscriptionData(topic, consumerGroup, subscriptionData, namesrvAddr);
95+
}
96+
}
7297

73-
// update change flag when finished build substr
74-
RocketMqSubscriptionDataUtils.resetTagChangeMap(namesrvAddr, topic, consumerGroup, false);
98+
private void updateRetrySubscriptionData(Object subscriptionData, Collection<Object> retryTopicSubscriptions) {
99+
for (Object subData : retryTopicSubscriptions) {
100+
RocketMqReflectUtils.getTagsSet(subData).clear();
101+
RocketMqReflectUtils.getCodeSet(subData).clear();
102+
RocketMqReflectUtils.setSubscriptionData(subData, "setExpressionType",
103+
new Class[]{String.class}, new Object[]{"SQL92"});
104+
RocketMqReflectUtils.setSubscriptionData(subData, "setSubVersion",
105+
new Class[]{long.class}, new Object[]{System.currentTimeMillis()});
106+
String originSubData = RocketMqReflectUtils.getSubString(subData);
107+
String sqlSubstr = RocketMqReflectUtils.getSubString(subscriptionData);
108+
RocketMqReflectUtils.setSubscriptionData(subData, "setSubString",
109+
new Class[]{String.class}, new Object[]{sqlSubstr});
110+
String originTopic = RocketMqReflectUtils.getTopic(subscriptionData);
111+
String retryTopic = RocketMqReflectUtils.getTopic(subData);
112+
LOGGER.warning(String.format(Locale.ENGLISH, "update retry topic [%s] SQL92 expression, "
113+
+ "originTopic: [%s], originSubStr: [%s], newSubStr: [%s]", retryTopic, originTopic,
114+
originSubData, sqlSubstr));
75115
}
76116
}
77117

118+
private void buildSql92SubscriptionData(Object subscriptionData, RebalanceImpl balance, String topic) {
119+
String consumerGroup = balance.getConsumerGroup();
120+
MQClientInstance instance = balance.getmQClientFactory();
121+
if (StringUtils.isEmpty(RocketMqGrayscaleConfigUtils.getGrayGroupTag())) {
122+
RocketMqConsumerGroupAutoCheck.setMqClientInstance(topic, consumerGroup, instance);
123+
RocketMqConsumerGroupAutoCheck.syncUpdateCacheGrayTags();
124+
RocketMqConsumerGroupAutoCheck.startSchedulerCheckGroupTask();
125+
}
126+
String namesrvAddr = balance.getmQClientFactory().getClientConfig().getNamesrvAddr();
127+
resetsSql92SubscriptionData(topic, consumerGroup, subscriptionData, namesrvAddr);
128+
129+
// update change flag when finished build substr
130+
RocketMqSubscriptionDataUtils.resetTagChangeMap(namesrvAddr, topic, consumerGroup, false);
131+
}
132+
78133
private void resetsSql92SubscriptionData(String topic, String consumerGroup, Object subscriptionData,
79134
String namesrvAddr) {
80135
String subscribeScope = RocketMqSubscriptionDataUtils.buildSubscribeScope(topic, consumerGroup,

sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/utils/RocketMqReflectUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public static String getExpressionType(Object subscriptionData) {
113113
* @param paramsType paramsType
114114
* @param params params
115115
*/
116-
public static void setSubscriptionDatae(Object subscriptionData, String methodName, Class<?>[] paramsType,
116+
public static void setSubscriptionData(Object subscriptionData, String methodName, Class<?>[] paramsType,
117117
Object[] params) {
118118
ReflectUtils.invokeMethod(subscriptionData, methodName, paramsType, params);
119119
}

sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/utils/RocketMqSubscriptionDataUtils.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -285,14 +285,14 @@ public static void resetsSql92SubscriptionData(Object subscriptionData, String s
285285
newSubStr = SELECT_ALL_MESSAGE_SQL;
286286
}
287287
if (EXPRESSION_TYPE_TAG.equals(RocketMqReflectUtils.getExpressionType(subscriptionData))) {
288-
RocketMqReflectUtils.setSubscriptionDatae(subscriptionData, "setExpressionType",
288+
RocketMqReflectUtils.setSubscriptionData(subscriptionData, "setExpressionType",
289289
new Class[]{String.class}, new Object[]{"SQL92"});
290290
RocketMqReflectUtils.getTagsSet(subscriptionData).clear();
291291
RocketMqReflectUtils.getCodeSet(subscriptionData).clear();
292292
}
293-
RocketMqReflectUtils.setSubscriptionDatae(subscriptionData, "setSubString",
293+
RocketMqReflectUtils.setSubscriptionData(subscriptionData, "setSubString",
294294
new Class[]{String.class}, new Object[]{newSubStr});
295-
RocketMqReflectUtils.setSubscriptionDatae(subscriptionData, "setSubVersion",
295+
RocketMqReflectUtils.setSubscriptionData(subscriptionData, "setSubVersion",
296296
new Class[]{long.class}, new Object[]{System.currentTimeMillis()});
297297
LOGGER.warning(String.format(Locale.ENGLISH, "update [key: %s] SQL92 subscriptionData, originSubStr: "
298298
+ "[%s], newSubStr: [%s]", subscribeScope, originSubData, newSubStr));

sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/test/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqSchedulerRebuildSubscriptionInterceptorTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,12 +49,16 @@ public void testDoAfter() throws Exception {
4949
ExecuteContext context = ExecuteContext.forMemberMethod(rebalanced, null, null, null, null);
5050
SubscriptionData subscriptionData = new SubscriptionData();
5151
subscriptionData.setTopic("TOPIC_TEST");
52+
SubscriptionData retrySubscriptionData = new SubscriptionData();
53+
retrySubscriptionData.setTopic("%RETRY%consumerGroup");
5254
ConcurrentMap<String, SubscriptionData> map = new ConcurrentHashMap<>();
5355
map.put("test", subscriptionData);
56+
map.put("testRetry", retrySubscriptionData);
5457
context.afterMethod(map, null);
5558
RocketMqSchedulerRebuildSubscriptionInterceptor interceptor
5659
= new RocketMqSchedulerRebuildSubscriptionInterceptor();
5760
interceptor.doAfter(context);
5861
Assert.assertEquals("(x_lane_canary in ('gray'))", subscriptionData.getSubString());
62+
Assert.assertEquals("(x_lane_canary in ('gray'))", retrySubscriptionData.getSubString());
5963
}
6064
}

0 commit comments

Comments
 (0)