diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 2f0255f4f2aae..306d1d72b650e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -187,6 +187,7 @@ import org.apache.pulsar.common.protocol.schema.SchemaData; import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.schema.SchemaType; +import org.apache.pulsar.common.stats.Rate; import org.apache.pulsar.common.topics.TopicCompactionStrategy; import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.common.util.FutureUtil; @@ -294,6 +295,8 @@ protected TopicStatsHelper initialValue() { @Getter private final PersistentTopicMetrics persistentTopicMetrics = new PersistentTopicMetrics(); + private final Rate exceedTTLDelayedMessage = new Rate(); + private volatile PersistentTopicAttributes persistentTopicAttributes = null; private static final AtomicReferenceFieldUpdater PERSISTENT_TOPIC_ATTRIBUTES_FIELD_UPDATER = AtomicReferenceFieldUpdater.newUpdater( @@ -2752,6 +2755,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats this.addEntryLatencyStatsUsec.refresh(); NamespaceStats.add(this.addEntryLatencyStatsUsec.getBuckets(), nsStats.addLatencyBucket); this.addEntryLatencyStatsUsec.reset(); + this.exceedTTLDelayedMessage.calculateRate(); } public double getLastUpdatedAvgPublishRateInMsg() { @@ -2825,6 +2829,7 @@ public CompletableFuture asyncGetStats(GetStatsOptions stats.ongoingTxnCount = txnBuffer.getOngoingTxnCount(); stats.abortedTxnCount = txnBuffer.getAbortedTxnCount(); stats.committedTxnCount = txnBuffer.getCommittedTxnCount(); + stats.exceedTTLDelayedMessages = getExceedTTLDelayedMessages(); replicators.forEach((cluster, replicator) -> { ReplicatorStatsImpl replicatorStats = replicator.computeStats(); @@ -4787,13 +4792,22 @@ public Optional getShadowSourceTopic() { protected boolean isExceedMaximumDeliveryDelay(ByteBuf headersAndPayload) { if (isDelayedDeliveryEnabled()) { long maxDeliveryDelayInMs = getDelayedDeliveryMaxDelayInMillis(); - if (maxDeliveryDelayInMs > 0) { - headersAndPayload.markReaderIndex(); - MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload); - headersAndPayload.resetReaderIndex(); - return msgMetadata.hasDeliverAtTime() - && msgMetadata.getDeliverAtTime() - msgMetadata.getPublishTime() > maxDeliveryDelayInMs; + Integer messageTTLInSeconds = topicPolicies.getMessageTTLInSeconds().get(); + if (maxDeliveryDelayInMs <= 0 && (messageTTLInSeconds == null || messageTTLInSeconds <= 0)) { + return false; + } + headersAndPayload.markReaderIndex(); + MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload); + headersAndPayload.resetReaderIndex(); + if (!msgMetadata.hasDeliverAtTime()) { + return false; + } + long deliverAtTime = msgMetadata.getDeliverAtTime(); + // count exceed ttl delayed messages + if (deliverAtTime >= (messageTTLInSeconds * 1000L) + System.currentTimeMillis()) { + this.incrementExceedTTLDelayedMessages(); } + return deliverAtTime - msgMetadata.getPublishTime() > maxDeliveryDelayInMs; } return false; } @@ -4867,4 +4881,12 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) { return future; } + + public void incrementExceedTTLDelayedMessages() { + this.exceedTTLDelayedMessage.recordEvent(); + } + + public long getExceedTTLDelayedMessages() { + return this.exceedTTLDelayedMessage.getTotalCount(); + } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java index 523ec5ac87d97..b39638df249b8 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java @@ -186,6 +186,9 @@ public class TopicStatsImpl implements TopicStats { /** The last publish timestamp in epoch milliseconds. */ public long lastPublishTimeStamp; + /** The number of delay messages that exceed TTL delay. */ + public long exceedTTLDelayedMessages; + public List getPublishers() { return Stream.concat(publishers.stream().sorted( Comparator.comparing(PublisherStatsImpl::getProducerName, nullsLast(naturalOrder()))), @@ -264,6 +267,7 @@ public void reset() { this.oldestBacklogMessageSubscriptionName = null; this.topicCreationTimeStamp = 0; this.lastPublishTimeStamp = 0; + this.exceedTTLDelayedMessages = 0; } // if the stats are added for the 1st time, we will need to make a copy of these stats and add it to the current @@ -295,6 +299,7 @@ public TopicStatsImpl add(TopicStats ts) { this.committedTxnCount = stats.committedTxnCount; this.backlogQuotaLimitTime = stats.backlogQuotaLimitTime; this.backlogQuotaLimitSize = stats.backlogQuotaLimitSize; + this.exceedTTLDelayedMessages += stats.exceedTTLDelayedMessages; if (stats.oldestBacklogMessageAgeSeconds > this.oldestBacklogMessageAgeSeconds) { this.oldestBacklogMessageAgeSeconds = stats.oldestBacklogMessageAgeSeconds; this.oldestBacklogMessageSubscriptionName = stats.oldestBacklogMessageSubscriptionName;