@@ -47,35 +47,37 @@ public class RocketMqSchedulerRebuildSubscriptionInterceptor extends RocketMqAbs
4747
4848 @ Override
4949 public ExecuteContext doAfter (ExecuteContext context ) throws Exception {
50- ConcurrentMap <String , Object > subscriptionInner = (ConcurrentMap <String , Object >) context .getResult ();
51- RebalanceImpl balance = (RebalanceImpl ) context .getObject ();
52- if (balance .getConsumerGroup () == null ) {
53- return context ;
54- }
55- List <Object > retryTopicSubscriptions = new ArrayList <>();
56- List <Object > originTopicSubscriptions = new ArrayList <>();
57- buildTopicSubscriptions (subscriptionInner , retryTopicSubscriptions , originTopicSubscriptions );
58- Object changedOriginSubscription = null ;
59- for (Object subscriptionData : originTopicSubscriptions ) {
60- if (RocketMqSubscriptionDataUtils
50+ synchronized (RocketMqSchedulerRebuildSubscriptionInterceptor .class ) {
51+ ConcurrentMap <String , Object > subscriptionInner = (ConcurrentMap <String , Object >) context .getResult ();
52+ RebalanceImpl rebalance = (RebalanceImpl ) context .getObject ();
53+ if (rebalance .getConsumerGroup () == null ) {
54+ return context ;
55+ }
56+ List <Object > retryTopicSubscriptions = new ArrayList <>();
57+ List <Object > originTopicSubscriptions = new ArrayList <>();
58+ buildTopicSubscriptions (subscriptionInner , retryTopicSubscriptions , originTopicSubscriptions );
59+ Object changedOriginSubscription = null ;
60+ for (Object subscriptionData : originTopicSubscriptions ) {
61+ if (RocketMqSubscriptionDataUtils
6162 .isExpressionTypeInaccurate (RocketMqReflectUtils .getExpressionType (subscriptionData ))) {
62- continue ;
63+ continue ;
64+ }
65+ String topic = RocketMqReflectUtils .getTopic (subscriptionData );
66+ if (!RocketMqSubscriptionDataUtils .getGrayTagChangeFlag (topic , rebalance )) {
67+ continue ;
68+ }
69+ buildSql92SubscriptionData (subscriptionData , rebalance , topic );
70+
71+ // sql92 expression is associated only with the consumer group. Therefore,
72+ // using any of the changed subscription build retry-topic sql92 expression.
73+ changedOriginSubscription = subscriptionData ;
6374 }
64- String topic = RocketMqReflectUtils . getTopic ( subscriptionData );
65- if (! RocketMqSubscriptionDataUtils . getGrayTagChangeFlag ( topic , balance )) {
66- continue ;
75+ if ( changedOriginSubscription != null ) {
76+ // update %RETRY%+GROUP substring with sql92
77+ updateRetrySubscriptionData ( changedOriginSubscription , retryTopicSubscriptions ) ;
6778 }
68- buildSql92SubscriptionData (subscriptionData , balance , topic );
69-
70- // sql92 expression is associated only with the consumer group. Therefore,
71- // using any of the changed subscription build retry-topic sql92 expression.
72- changedOriginSubscription = subscriptionData ;
73- }
74- if (changedOriginSubscription != null ) {
75- // update %RETRY%+GROUP substring with sql92
76- updateRetrySubscriptionData (changedOriginSubscription , retryTopicSubscriptions );
79+ return context ;
7780 }
78- return context ;
7981 }
8082
8183 private void buildTopicSubscriptions (ConcurrentMap <String , Object > subscriptionInner ,
@@ -115,15 +117,15 @@ private void updateRetrySubscriptionData(Object subscriptionData, Collection<Obj
115117 }
116118 }
117119
118- private void buildSql92SubscriptionData (Object subscriptionData , RebalanceImpl balance , String topic ) {
119- String consumerGroup = balance .getConsumerGroup ();
120- MQClientInstance instance = balance .getmQClientFactory ();
120+ private void buildSql92SubscriptionData (Object subscriptionData , RebalanceImpl rebalance , String topic ) {
121+ String consumerGroup = rebalance .getConsumerGroup ();
122+ MQClientInstance instance = rebalance .getmQClientFactory ();
121123 if (StringUtils .isEmpty (RocketMqGrayscaleConfigUtils .getGrayGroupTag ())) {
122124 RocketMqConsumerGroupAutoCheck .setMqClientInstance (topic , consumerGroup , instance );
123125 RocketMqConsumerGroupAutoCheck .syncUpdateCacheGrayTags ();
124126 RocketMqConsumerGroupAutoCheck .startSchedulerCheckGroupTask ();
125127 }
126- String namesrvAddr = balance .getmQClientFactory ().getClientConfig ().getNamesrvAddr ();
128+ String namesrvAddr = rebalance .getmQClientFactory ().getClientConfig ().getNamesrvAddr ();
127129 resetsSql92SubscriptionData (topic , consumerGroup , subscriptionData , namesrvAddr );
128130
129131 // update change flag when finished build substr
0 commit comments