|
17 | 17 | package org.apache.rocketmq.dashboard.service.client; |
18 | 18 |
|
19 | 19 | import com.google.common.base.Throwables; |
| 20 | +import java.io.UnsupportedEncodingException; |
| 21 | +import java.util.List; |
| 22 | +import java.util.Map; |
| 23 | +import java.util.Properties; |
| 24 | +import java.util.Set; |
20 | 25 | import org.apache.rocketmq.client.QueryResult; |
21 | 26 | import org.apache.rocketmq.client.exception.MQBrokerException; |
22 | 27 | import org.apache.rocketmq.client.exception.MQClientException; |
| 28 | +import org.apache.rocketmq.client.impl.MQAdminImpl; |
23 | 29 | import org.apache.rocketmq.common.AclConfig; |
24 | 30 | import org.apache.rocketmq.common.PlainAccessConfig; |
25 | 31 | import org.apache.rocketmq.common.TopicConfig; |
| 32 | +import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats; |
| 33 | +import org.apache.rocketmq.remoting.protocol.admin.RollbackStats; |
| 34 | +import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable; |
26 | 35 | import org.apache.rocketmq.common.message.MessageClientIDSetter; |
27 | 36 | import org.apache.rocketmq.common.message.MessageExt; |
28 | 37 | import org.apache.rocketmq.common.message.MessageQueue; |
29 | 38 | import org.apache.rocketmq.common.message.MessageRequestMode; |
30 | | -import org.apache.rocketmq.dashboard.util.JsonUtil; |
31 | | -import org.apache.rocketmq.remoting.RemotingClient; |
32 | | -import org.apache.rocketmq.remoting.exception.RemotingCommandException; |
33 | | -import org.apache.rocketmq.remoting.exception.RemotingConnectException; |
34 | | -import org.apache.rocketmq.remoting.exception.RemotingException; |
35 | | -import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; |
36 | | -import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; |
37 | | -import org.apache.rocketmq.remoting.protocol.RemotingCommand; |
38 | 39 | import org.apache.rocketmq.remoting.protocol.RequestCode; |
39 | 40 | import org.apache.rocketmq.remoting.protocol.ResponseCode; |
40 | | -import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats; |
41 | | -import org.apache.rocketmq.remoting.protocol.admin.RollbackStats; |
42 | | -import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable; |
43 | 41 | import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo; |
44 | 42 | import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData; |
45 | 43 | import org.apache.rocketmq.remoting.protocol.body.ClusterAclVersionInfo; |
|
66 | 64 | import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingDetail; |
67 | 65 | import org.apache.rocketmq.remoting.protocol.subscription.GroupForbidden; |
68 | 66 | import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; |
| 67 | +import org.apache.rocketmq.dashboard.util.JsonUtil; |
| 68 | +import org.apache.rocketmq.remoting.RemotingClient; |
| 69 | +import org.apache.rocketmq.remoting.exception.RemotingCommandException; |
| 70 | +import org.apache.rocketmq.remoting.exception.RemotingConnectException; |
| 71 | +import org.apache.rocketmq.remoting.exception.RemotingException; |
| 72 | +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; |
| 73 | +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; |
| 74 | +import org.apache.rocketmq.remoting.protocol.RemotingCommand; |
69 | 75 | import org.apache.rocketmq.tools.admin.MQAdminExt; |
70 | 76 | import org.apache.rocketmq.tools.admin.api.BrokerOperatorResult; |
71 | 77 | import org.apache.rocketmq.tools.admin.api.MessageTrack; |
72 | 78 | import org.apache.rocketmq.tools.admin.common.AdminToolResult; |
| 79 | +import org.joor.Reflect; |
73 | 80 | import org.slf4j.Logger; |
74 | 81 | import org.slf4j.LoggerFactory; |
75 | 82 | import org.springframework.stereotype.Service; |
76 | 83 |
|
77 | | -import java.io.UnsupportedEncodingException; |
78 | | -import java.util.List; |
79 | | -import java.util.Map; |
80 | | -import java.util.Properties; |
81 | | -import java.util.Set; |
82 | | - |
83 | 84 | import static org.apache.rocketmq.remoting.protocol.RemotingSerializable.decode; |
84 | 85 |
|
85 | 86 | @Service |
@@ -461,18 +462,23 @@ public MessageExt viewMessage(String topic, |
461 | 462 | logger.info("MessageClientIDSetter.getNearlyTimeFromID(msgId)={} msgId={}", MessageClientIDSetter.getNearlyTimeFromID(msgId), msgId); |
462 | 463 | try { |
463 | 464 | return viewMessage(msgId); |
| 465 | + } catch (Exception e) { |
464 | 466 | } |
465 | | - catch (Exception e) { |
466 | | - } |
467 | | - |
| 467 | + MQAdminImpl mqAdminImpl = MQAdminInstance.threadLocalMqClientInstance().getMQAdminImpl(); |
468 | 468 | Set<String> clusterList = MQAdminInstance.threadLocalMQAdminExt().getTopicClusterList(topic); |
469 | 469 | if (clusterList == null || clusterList.isEmpty()) { |
470 | | - return MQAdminInstance.threadLocalMQAdminExt().queryMessage("", topic, msgId); |
471 | | - } |
472 | | - for (String name : clusterList) { |
473 | | - MessageExt messageExt = MQAdminInstance.threadLocalMQAdminExt().queryMessage(name, topic, msgId); |
474 | | - if (messageExt != null) { |
475 | | - return messageExt; |
| 470 | + QueryResult qr = Reflect.on(mqAdminImpl).call("queryMessage", "", topic, msgId, 32, |
| 471 | + MessageClientIDSetter.getNearlyTimeFromID(msgId).getTime(), Long.MAX_VALUE, true).get(); |
| 472 | + if (qr != null && qr.getMessageList() != null && !qr.getMessageList().isEmpty()) { |
| 473 | + return qr.getMessageList().get(0); |
| 474 | + } |
| 475 | + } else { |
| 476 | + for (String name : clusterList) { |
| 477 | + QueryResult qr = Reflect.on(mqAdminImpl).call("queryMessage", name, topic, msgId, 32, |
| 478 | + MessageClientIDSetter.getNearlyTimeFromID(msgId).getTime(), Long.MAX_VALUE, true).get(); |
| 479 | + if (qr != null && qr.getMessageList() != null && !qr.getMessageList().isEmpty()) { |
| 480 | + return qr.getMessageList().get(0); |
| 481 | + } |
476 | 482 | } |
477 | 483 | } |
478 | 484 | return null; |
|
0 commit comments