Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PopAckConstants;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.CommandCallback;
Expand Down Expand Up @@ -159,15 +161,25 @@ public void notifyMessageArrivingWithRetryTopic(final String topic, final int qu

public void notifyMessageArrivingWithRetryTopic(final String topic, final int queueId, long offset,
Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
String notifyTopic;
if (KeyBuilder.isPopRetryTopicV2(topic)) {
notifyTopic = KeyBuilder.parseNormalTopic(topic);
String prefix = MixAll.RETRY_GROUP_TOPIC_PREFIX;
if (topic.startsWith(prefix)) {
String originTopic = properties.get(MessageConst.PROPERTY_ORIGIN_TOPIC);
String suffix = "_" + originTopic;
String cid = topic.substring(prefix.length(), topic.length() - suffix.length());
POP_LOGGER.info("Processing retry topic: {}, originTopic: {}, properties: {}",
topic, originTopic, properties);
POP_LOGGER.info("Extracted cid: {} from retry topic: {}", cid, topic);
long interval = brokerController.getBrokerConfig().getPopLongPollingForceNotifyInterval();
boolean force = interval > 0L && offset % interval == 0L;
if (queueId >= 0) {
notifyMessageArriving(originTopic, -1, cid, force, tagsCode, msgStoreTime, filterBitMap, properties);
}
notifyMessageArriving(originTopic, queueId, cid, force, tagsCode, msgStoreTime, filterBitMap, properties);
} else {
notifyTopic = topic;
notifyMessageArriving(topic, queueId, offset, tagsCode, msgStoreTime, filterBitMap, properties);
}
notifyMessageArriving(notifyTopic, queueId, offset, tagsCode, msgStoreTime, filterBitMap, properties);
}

public void notifyMessageArriving(final String topic, final int queueId, long offset,
Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
ConcurrentHashMap<String, Byte> cids = topicCidMap.get(topic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ private boolean reviveRetry(PopCheckPoint popCheckPoint, MessageExt messageExt)
if (messageExt.getReconsumeTimes() == 0 || msgInner.getProperties().get(MessageConst.PROPERTY_FIRST_POP_TIME) == null) {
msgInner.getProperties().put(MessageConst.PROPERTY_FIRST_POP_TIME, String.valueOf(popCheckPoint.getPopTime()));
}
msgInner.getProperties().computeIfAbsent(MessageConst.PROPERTY_ORIGIN_TOPIC, k -> popCheckPoint.getTopic());
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
addRetryTopicIfNotExist(msgInner.getTopic(), popCheckPoint.getCId());
PutMessageResult putMessageResult = brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public class MessageConst {
public static final String PROPERTY_WAIT_STORE_MSG_OK = "WAIT";
public static final String PROPERTY_DELAY_TIME_LEVEL = "DELAY";
public static final String PROPERTY_RETRY_TOPIC = "RETRY_TOPIC";
public static final String PROPERTY_ORIGIN_TOPIC = "ORIGIN_TOPIC"; //Distinct for retry topic
public static final String PROPERTY_REAL_TOPIC = "REAL_TOPIC";
public static final String PROPERTY_REAL_QUEUE_ID = "REAL_QID";
public static final String PROPERTY_TRANSACTION_PREPARED = "TRAN_MSG";
Expand Down Expand Up @@ -113,6 +114,7 @@ public class MessageConst {
STRING_HASH_SET.add(PROPERTY_WAIT_STORE_MSG_OK);
STRING_HASH_SET.add(PROPERTY_DELAY_TIME_LEVEL);
STRING_HASH_SET.add(PROPERTY_RETRY_TOPIC);
STRING_HASH_SET.add(PROPERTY_ORIGIN_TOPIC);
STRING_HASH_SET.add(PROPERTY_REAL_TOPIC);
STRING_HASH_SET.add(PROPERTY_REAL_QUEUE_ID);
STRING_HASH_SET.add(PROPERTY_TRANSACTION_PREPARED);
Expand Down