1818package org .apache .rocketmq .dashboard .service .impl ;
1919
2020import com .google .common .base .Throwables ;
21+ import com .google .common .collect .ImmutableMap ;
2122import com .google .common .collect .Lists ;
2223import com .google .common .collect .Sets ;
23- import java .util .stream .Collectors ;
2424import org .apache .commons .lang3 .StringUtils ;
2525import org .apache .rocketmq .acl .common .AclClientRPCHook ;
2626import org .apache .rocketmq .acl .common .SessionCredentials ;
2727import org .apache .rocketmq .client .producer .DefaultMQProducer ;
2828import org .apache .rocketmq .client .producer .SendResult ;
29+ import org .apache .rocketmq .client .producer .TransactionListener ;
30+ import org .apache .rocketmq .client .producer .TransactionMQProducer ;
31+ import org .apache .rocketmq .client .producer .LocalTransactionState ;
2932import org .apache .rocketmq .client .trace .TraceContext ;
3033import org .apache .rocketmq .client .trace .TraceDispatcher ;
3134import org .apache .rocketmq .common .MixAll ;
3235import org .apache .rocketmq .common .TopicConfig ;
33- import org .apache .rocketmq .remoting . protocol . admin . TopicStatsTable ;
36+ import org .apache .rocketmq .common . attribute . TopicMessageType ;
3437import org .apache .rocketmq .common .message .Message ;
35- import org .apache .rocketmq .remoting .protocol .body .ClusterInfo ;
36- import org .apache .rocketmq .remoting .protocol .body .GroupList ;
37- import org .apache .rocketmq .remoting .protocol .body .TopicList ;
38- import org .apache .rocketmq .remoting .protocol .route .BrokerData ;
39- import org .apache .rocketmq .remoting .protocol .route .TopicRouteData ;
38+ import org .apache .rocketmq .common .message .MessageExt ;
4039import org .apache .rocketmq .common .topic .TopicValidator ;
4140import org .apache .rocketmq .dashboard .config .RMQConfigure ;
4241import org .apache .rocketmq .dashboard .model .request .SendTopicMessageRequest ;
4342import org .apache .rocketmq .dashboard .model .request .TopicConfigInfo ;
4443import org .apache .rocketmq .dashboard .service .AbstractCommonService ;
4544import org .apache .rocketmq .dashboard .service .TopicService ;
4645import org .apache .rocketmq .remoting .RPCHook ;
46+ import org .apache .rocketmq .remoting .protocol .admin .TopicStatsTable ;
47+ import org .apache .rocketmq .remoting .protocol .body .ClusterInfo ;
48+ import org .apache .rocketmq .remoting .protocol .body .GroupList ;
49+ import org .apache .rocketmq .remoting .protocol .body .TopicList ;
50+ import org .apache .rocketmq .remoting .protocol .route .BrokerData ;
51+ import org .apache .rocketmq .remoting .protocol .route .TopicRouteData ;
4752import org .apache .rocketmq .tools .command .CommandUtil ;
4853import org .joor .Reflect ;
4954import org .springframework .beans .BeanUtils ;
5560import java .util .List ;
5661import java .util .Set ;
5762import java .util .concurrent .ArrayBlockingQueue ;
63+ import java .util .concurrent .ConcurrentHashMap ;
64+ import java .util .concurrent .atomic .AtomicInteger ;
65+ import java .util .stream .Collectors ;
66+
67+ import static org .apache .rocketmq .common .TopicAttributes .TOPIC_MESSAGE_TYPE_ATTRIBUTE ;
5868
5969@ Service
6070public class TopicServiceImpl extends AbstractCommonService implements TopicService {
@@ -68,18 +78,18 @@ public TopicList fetchAllTopicList(boolean skipSysProcess, boolean skipRetryAndD
6878 TopicList allTopics = mqAdminExt .fetchAllTopicList ();
6979 TopicList sysTopics = getSystemTopicList ();
7080 Set <String > topics =
71- allTopics .getTopicList ().stream ().map (topic -> {
72- if (!skipSysProcess && sysTopics .getTopicList ().contains (topic )) {
73- topic = String .format ("%s%s" , "%SYS%" , topic );
74- }
75- return topic ;
76- }).filter (topic -> {
77- if (skipRetryAndDlq ) {
78- return !(topic .startsWith (MixAll .RETRY_GROUP_TOPIC_PREFIX )
79- || topic .startsWith (MixAll .DLQ_GROUP_TOPIC_PREFIX ));
80- }
81- return true ;
82- }).collect (Collectors .toSet ());
81+ allTopics .getTopicList ().stream ().map (topic -> {
82+ if (!skipSysProcess && sysTopics .getTopicList ().contains (topic )) {
83+ topic = String .format ("%s%s" , "%SYS%" , topic );
84+ }
85+ return topic ;
86+ }).filter (topic -> {
87+ if (skipRetryAndDlq ) {
88+ return !(topic .startsWith (MixAll .RETRY_GROUP_TOPIC_PREFIX )
89+ || topic .startsWith (MixAll .DLQ_GROUP_TOPIC_PREFIX ));
90+ }
91+ return true ;
92+ }).collect (Collectors .toSet ());
8393 allTopics .getTopicList ().clear ();
8494 allTopics .getTopicList ().addAll (topics );
8595 return allTopics ;
@@ -123,10 +133,15 @@ public GroupList queryTopicConsumerInfo(String topic) {
123133 public void createOrUpdate (TopicConfigInfo topicCreateOrUpdateRequest ) {
124134 TopicConfig topicConfig = new TopicConfig ();
125135 BeanUtils .copyProperties (topicCreateOrUpdateRequest , topicConfig );
136+ String messageType = topicCreateOrUpdateRequest .getMessageType ();
137+ if (StringUtils .isBlank (messageType )) {
138+ messageType = TopicMessageType .NORMAL .name ();
139+ }
140+ topicConfig .setAttributes (ImmutableMap .of ("+" .concat (TOPIC_MESSAGE_TYPE_ATTRIBUTE .getName ()), messageType ));
126141 try {
127142 ClusterInfo clusterInfo = mqAdminExt .examineBrokerClusterInfo ();
128143 for (String brokerName : changeToBrokerNameSet (clusterInfo .getClusterAddrTable (),
129- topicCreateOrUpdateRequest .getClusterNameList (), topicCreateOrUpdateRequest .getBrokerNameList ())) {
144+ topicCreateOrUpdateRequest .getClusterNameList (), topicCreateOrUpdateRequest .getBrokerNameList ())) {
130145 mqAdminExt .createAndUpdateTopicConfig (clusterInfo .getBrokerAddrTable ().get (brokerName ).selectBrokerAddr (), topicConfig );
131146 }
132147 } catch (Exception err ) {
@@ -156,6 +171,11 @@ public List<TopicConfigInfo> examineTopicConfig(String topic) {
156171 TopicConfig topicConfig = examineTopicConfig (topic , brokerData .getBrokerName ());
157172 BeanUtils .copyProperties (topicConfig , topicConfigInfo );
158173 topicConfigInfo .setBrokerNameList (Lists .newArrayList (brokerData .getBrokerName ()));
174+ String messageType = topicConfig .getAttributes ().get (TOPIC_MESSAGE_TYPE_ATTRIBUTE .getName ());
175+ if (StringUtils .isBlank (messageType )) {
176+ messageType = TopicMessageType .UNSPECIFIED .name ();
177+ }
178+ topicConfigInfo .setMessageType (messageType );
159179 topicConfigInfoList .add (topicConfigInfo );
160180 }
161181 return topicConfigInfoList ;
@@ -226,6 +246,12 @@ public DefaultMQProducer buildDefaultMQProducer(String producerGroup, RPCHook rp
226246 return defaultMQProducer ;
227247 }
228248
249+ public TransactionMQProducer buildTransactionMQProducer (String producerGroup , RPCHook rpcHook , boolean traceEnabled ) {
250+ TransactionMQProducer defaultMQProducer = new TransactionMQProducer (null , producerGroup , rpcHook , traceEnabled , TopicValidator .RMQ_SYS_TRACE_TOPIC );
251+ defaultMQProducer .setUseTLS (configure .isUseTLS ());
252+ return defaultMQProducer ;
253+ }
254+
229255 private TopicList getSystemTopicList () {
230256 RPCHook rpcHook = null ;
231257 boolean isEnableAcl = !StringUtils .isEmpty (configure .getAccessKey ()) && !StringUtils .isEmpty (configure .getSecretKey ());
@@ -249,32 +275,61 @@ private TopicList getSystemTopicList() {
249275
250276 @ Override
251277 public SendResult sendTopicMessageRequest (SendTopicMessageRequest sendTopicMessageRequest ) {
252- DefaultMQProducer producer = null ;
278+ List <TopicConfigInfo > topicConfigInfos = examineTopicConfig (sendTopicMessageRequest .getTopic ());
279+ String messageType = topicConfigInfos .get (0 ).getMessageType ();
253280 AclClientRPCHook rpcHook = null ;
254281 if (configure .isACLEnabled ()) {
255282 rpcHook = new AclClientRPCHook (new SessionCredentials (
256- configure .getAccessKey (),
257- configure .getSecretKey ()
283+ configure .getAccessKey (),
284+ configure .getSecretKey ()
258285 ));
259286 }
260- producer = buildDefaultMQProducer (MixAll .SELF_TEST_PRODUCER_GROUP , rpcHook , sendTopicMessageRequest .isTraceEnabled ());
261- producer .setInstanceName (String .valueOf (System .currentTimeMillis ()));
262- producer .setNamesrvAddr (configure .getNamesrvAddr ());
263- try {
264- producer .start ();
265- Message msg = new Message (sendTopicMessageRequest .getTopic (),
266- sendTopicMessageRequest .getTag (),
267- sendTopicMessageRequest .getKey (),
268- sendTopicMessageRequest .getMessageBody ().getBytes ()
269- );
270- return producer .send (msg );
271- } catch (Exception e ) {
272- Throwables .throwIfUnchecked (e );
273- throw new RuntimeException (e );
274- } finally {
275- waitSendTraceFinish (producer , sendTopicMessageRequest .isTraceEnabled ());
276- producer .shutdown ();
287+ if (TopicMessageType .TRANSACTION .getValue ().equals (messageType )) {
288+ // transaction message
289+ TransactionListener transactionListener = new TransactionListenerImpl ();
290+
291+ TransactionMQProducer producer = buildTransactionMQProducer (MixAll .SELF_TEST_PRODUCER_GROUP , rpcHook , sendTopicMessageRequest .isTraceEnabled ());
292+ producer .setInstanceName (String .valueOf (System .currentTimeMillis ()));
293+ producer .setNamesrvAddr (configure .getNamesrvAddr ());
294+ producer .setTransactionListener (transactionListener );
295+ try {
296+ producer .start ();
297+ Message msg = new Message (sendTopicMessageRequest .getTopic (),
298+ sendTopicMessageRequest .getTag (),
299+ sendTopicMessageRequest .getKey (),
300+ sendTopicMessageRequest .getMessageBody ().getBytes ()
301+ );
302+ return producer .sendMessageInTransaction (msg , null );
303+ } catch (Exception e ) {
304+ Throwables .throwIfUnchecked (e );
305+ throw new RuntimeException (e );
306+ } finally {
307+ waitSendTraceFinish (producer , sendTopicMessageRequest .isTraceEnabled ());
308+ producer .shutdown ();
309+ }
310+ } else {
311+ // no transaction message
312+ DefaultMQProducer producer = null ;
313+ producer = buildDefaultMQProducer (MixAll .SELF_TEST_PRODUCER_GROUP , rpcHook , sendTopicMessageRequest .isTraceEnabled ());
314+ producer .setInstanceName (String .valueOf (System .currentTimeMillis ()));
315+ producer .setNamesrvAddr (configure .getNamesrvAddr ());
316+ try {
317+ producer .start ();
318+ Message msg = new Message (sendTopicMessageRequest .getTopic (),
319+ sendTopicMessageRequest .getTag (),
320+ sendTopicMessageRequest .getKey (),
321+ sendTopicMessageRequest .getMessageBody ().getBytes ()
322+ );
323+ return producer .send (msg );
324+ } catch (Exception e ) {
325+ Throwables .throwIfUnchecked (e );
326+ throw new RuntimeException (e );
327+ } finally {
328+ waitSendTraceFinish (producer , sendTopicMessageRequest .isTraceEnabled ());
329+ producer .shutdown ();
330+ }
277331 }
332+
278333 }
279334
280335 private void waitSendTraceFinish (DefaultMQProducer producer , boolean traceEnabled ) {
@@ -296,4 +351,20 @@ private void waitSendTraceFinish(DefaultMQProducer producer, boolean traceEnable
296351 } catch (Exception ignore ) {
297352 }
298353 }
354+
355+ static class TransactionListenerImpl implements TransactionListener {
356+ private AtomicInteger transactionIndex = new AtomicInteger (0 );
357+
358+ private ConcurrentHashMap <String , Integer > localTrans = new ConcurrentHashMap <>();
359+
360+ @ Override
361+ public LocalTransactionState executeLocalTransaction (Message msg , Object arg ) {
362+ return LocalTransactionState .COMMIT_MESSAGE ;
363+ }
364+
365+ @ Override
366+ public LocalTransactionState checkLocalTransaction (MessageExt msg ) {
367+ return LocalTransactionState .COMMIT_MESSAGE ;
368+ }
369+ }
299370}
0 commit comments