diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java index e87a8e803fd..49d56d88191 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java @@ -27,9 +27,11 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.KeyBuilder; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.PopAckConstants; import org.apache.rocketmq.common.ServiceThread; import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.remoting.CommandCallback; @@ -159,15 +161,25 @@ public void notifyMessageArrivingWithRetryTopic(final String topic, final int qu public void notifyMessageArrivingWithRetryTopic(final String topic, final int queueId, long offset, Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map properties) { - String notifyTopic; - if (KeyBuilder.isPopRetryTopicV2(topic)) { - notifyTopic = KeyBuilder.parseNormalTopic(topic); + String prefix = MixAll.RETRY_GROUP_TOPIC_PREFIX; + if (topic.startsWith(prefix)) { + String originTopic = properties.get(MessageConst.PROPERTY_ORIGIN_TOPIC); + String suffix = "_" + originTopic; + String cid = topic.substring(prefix.length(), topic.length() - suffix.length()); + POP_LOGGER.info("Processing retry topic: {}, originTopic: {}, properties: {}", + topic, originTopic, properties); + POP_LOGGER.info("Extracted cid: {} from retry topic: {}", cid, topic); + long interval = brokerController.getBrokerConfig().getPopLongPollingForceNotifyInterval(); + boolean force = interval > 0L && offset % interval == 0L; + if (queueId >= 0) { + notifyMessageArriving(originTopic, -1, cid, force, tagsCode, msgStoreTime, filterBitMap, properties); + } + notifyMessageArriving(originTopic, queueId, cid, force, tagsCode, msgStoreTime, filterBitMap, properties); } else { - notifyTopic = topic; + notifyMessageArriving(topic, queueId, offset, tagsCode, msgStoreTime, filterBitMap, properties); } - notifyMessageArriving(notifyTopic, queueId, offset, tagsCode, msgStoreTime, filterBitMap, properties); } - + public void notifyMessageArriving(final String topic, final int queueId, long offset, Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map properties) { ConcurrentHashMap cids = topicCidMap.get(topic); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java index 2be41a69d63..3e77c16593d 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java @@ -129,6 +129,7 @@ private boolean reviveRetry(PopCheckPoint popCheckPoint, MessageExt messageExt) if (messageExt.getReconsumeTimes() == 0 || msgInner.getProperties().get(MessageConst.PROPERTY_FIRST_POP_TIME) == null) { msgInner.getProperties().put(MessageConst.PROPERTY_FIRST_POP_TIME, String.valueOf(popCheckPoint.getPopTime())); } + msgInner.getProperties().computeIfAbsent(MessageConst.PROPERTY_ORIGIN_TOPIC, k -> popCheckPoint.getTopic()); msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); addRetryTopicIfNotExist(msgInner.getTopic(), popCheckPoint.getCId()); PutMessageResult putMessageResult = brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner); diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java index 24f7bdb99a5..06f2b35c2d6 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java @@ -24,6 +24,7 @@ public class MessageConst { public static final String PROPERTY_WAIT_STORE_MSG_OK = "WAIT"; public static final String PROPERTY_DELAY_TIME_LEVEL = "DELAY"; public static final String PROPERTY_RETRY_TOPIC = "RETRY_TOPIC"; + public static final String PROPERTY_ORIGIN_TOPIC = "ORIGIN_TOPIC"; //Distinct for retry topic public static final String PROPERTY_REAL_TOPIC = "REAL_TOPIC"; public static final String PROPERTY_REAL_QUEUE_ID = "REAL_QID"; public static final String PROPERTY_TRANSACTION_PREPARED = "TRAN_MSG"; @@ -113,6 +114,7 @@ public class MessageConst { STRING_HASH_SET.add(PROPERTY_WAIT_STORE_MSG_OK); STRING_HASH_SET.add(PROPERTY_DELAY_TIME_LEVEL); STRING_HASH_SET.add(PROPERTY_RETRY_TOPIC); + STRING_HASH_SET.add(PROPERTY_ORIGIN_TOPIC); STRING_HASH_SET.add(PROPERTY_REAL_TOPIC); STRING_HASH_SET.add(PROPERTY_REAL_QUEUE_ID); STRING_HASH_SET.add(PROPERTY_TRANSACTION_PREPARED);