Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PopAckConstants;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.CommandCallback;
Expand Down Expand Up @@ -159,15 +161,28 @@ public void notifyMessageArrivingWithRetryTopic(final String topic, final int qu

public void notifyMessageArrivingWithRetryTopic(final String topic, final int queueId, long offset,
Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
String notifyTopic;
if (KeyBuilder.isPopRetryTopicV2(topic)) {
notifyTopic = KeyBuilder.parseNormalTopic(topic);
String prefix = MixAll.RETRY_GROUP_TOPIC_PREFIX;
if (topic.startsWith(prefix)) {
// 从properties获取原始topic名称
String originTopic = properties.get(MessageConst.PROPERTY_ORIGIN_TOPIC);
//根据原始topic和retryTopic,最后获得retryTopic对应的cid (可能还可以与topicCidMap验证一下)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete Chinese comments

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.
Thanks for your review.

String suffix = "_" + originTopic; //这里把下划线换成加号也是一样的
String cid = topic.substring(prefix.length(), topic.length() - suffix.length());
POP_LOGGER.info("Processing retry topic: {}, originTopic: {}, properties: {}",
topic, originTopic, properties); //grep "Processing retry topic" ~/logs/rocketmqlogs/pop.log可以看到日志
POP_LOGGER.info("Extracted cid: {} from retry topic: {}", cid, topic);
//然后调用包含cid的notifyMessageArriving
long interval = brokerController.getBrokerConfig().getPopLongPollingForceNotifyInterval();
boolean force = interval > 0L && offset % interval == 0L;
if (queueId >= 0) {
notifyMessageArriving(originTopic, -1, cid, force, tagsCode, msgStoreTime, filterBitMap, properties);
}
notifyMessageArriving(originTopic, queueId, cid, force, tagsCode, msgStoreTime, filterBitMap, properties);
} else {
notifyTopic = topic;
//普通消息(非重试消息)还是走之前的逻辑不变
notifyMessageArriving(topic, queueId, offset, tagsCode, msgStoreTime, filterBitMap, properties);
}
notifyMessageArriving(notifyTopic, queueId, offset, tagsCode, msgStoreTime, filterBitMap, properties);
}

public void notifyMessageArriving(final String topic, final int queueId, long offset,
Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
ConcurrentHashMap<String, Byte> cids = topicCidMap.get(topic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ private boolean reviveRetry(PopCheckPoint popCheckPoint, MessageExt messageExt)
if (messageExt.getReconsumeTimes() == 0 || msgInner.getProperties().get(MessageConst.PROPERTY_FIRST_POP_TIME) == null) {
msgInner.getProperties().put(MessageConst.PROPERTY_FIRST_POP_TIME, String.valueOf(popCheckPoint.getPopTime()));
}
msgInner.getProperties().computeIfAbsent(MessageConst.PROPERTY_ORIGIN_TOPIC, k -> popCheckPoint.getTopic());
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
addRetryTopicIfNotExist(msgInner.getTopic(), popCheckPoint.getCId());
PutMessageResult putMessageResult = brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public class MessageConst {
public static final String PROPERTY_WAIT_STORE_MSG_OK = "WAIT";
public static final String PROPERTY_DELAY_TIME_LEVEL = "DELAY";
public static final String PROPERTY_RETRY_TOPIC = "RETRY_TOPIC";
public static final String PROPERTY_ORIGIN_TOPIC = "ORIGIN_TOPIC"; //Distinct for retry topic
public static final String PROPERTY_REAL_TOPIC = "REAL_TOPIC";
public static final String PROPERTY_REAL_QUEUE_ID = "REAL_QID";
public static final String PROPERTY_TRANSACTION_PREPARED = "TRAN_MSG";
Expand Down Expand Up @@ -113,6 +114,7 @@ public class MessageConst {
STRING_HASH_SET.add(PROPERTY_WAIT_STORE_MSG_OK);
STRING_HASH_SET.add(PROPERTY_DELAY_TIME_LEVEL);
STRING_HASH_SET.add(PROPERTY_RETRY_TOPIC);
STRING_HASH_SET.add(PROPERTY_ORIGIN_TOPIC);
STRING_HASH_SET.add(PROPERTY_REAL_TOPIC);
STRING_HASH_SET.add(PROPERTY_REAL_QUEUE_ID);
STRING_HASH_SET.add(PROPERTY_TRANSACTION_PREPARED);
Expand Down