Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.stats.Rate;
import org.apache.pulsar.common.topics.TopicCompactionStrategy;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down Expand Up @@ -294,6 +295,8 @@ protected TopicStatsHelper initialValue() {
@Getter
private final PersistentTopicMetrics persistentTopicMetrics = new PersistentTopicMetrics();

private final Rate exceedTTLDelayedMessage = new Rate();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exceedTTLDelayedMessage doesn't read well. It seems that ttlExceededDelayedMessagesRate would be more accurate for this field.

In addition, it would be necessary to add a comment to this field to explain it.


private volatile PersistentTopicAttributes persistentTopicAttributes = null;
private static final AtomicReferenceFieldUpdater<PersistentTopic, PersistentTopicAttributes>
PERSISTENT_TOPIC_ATTRIBUTES_FIELD_UPDATER = AtomicReferenceFieldUpdater.newUpdater(
Expand Down Expand Up @@ -2752,6 +2755,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
this.addEntryLatencyStatsUsec.refresh();
NamespaceStats.add(this.addEntryLatencyStatsUsec.getBuckets(), nsStats.addLatencyBucket);
this.addEntryLatencyStatsUsec.reset();
this.exceedTTLDelayedMessage.calculateRate();
}

public double getLastUpdatedAvgPublishRateInMsg() {
Expand Down Expand Up @@ -2825,6 +2829,7 @@ public CompletableFuture<? extends TopicStatsImpl> asyncGetStats(GetStatsOptions
stats.ongoingTxnCount = txnBuffer.getOngoingTxnCount();
stats.abortedTxnCount = txnBuffer.getAbortedTxnCount();
stats.committedTxnCount = txnBuffer.getCommittedTxnCount();
stats.exceedTTLDelayedMessages = getExceedTTLDelayedMessages();

replicators.forEach((cluster, replicator) -> {
ReplicatorStatsImpl replicatorStats = replicator.computeStats();
Expand Down Expand Up @@ -4787,10 +4792,14 @@ public Optional<TopicName> getShadowSourceTopic() {
protected boolean isExceedMaximumDeliveryDelay(ByteBuf headersAndPayload) {
if (isDelayedDeliveryEnabled()) {
long maxDeliveryDelayInMs = getDelayedDeliveryMaxDelayInMillis();
headersAndPayload.markReaderIndex();
MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
headersAndPayload.resetReaderIndex();
// count exceed ttl delayed messages
if (isExceedMessageTTL(msgMetadata)) {
this.incrementExceedTTLDelayedMessages();
}
if (maxDeliveryDelayInMs > 0) {
headersAndPayload.markReaderIndex();
MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
headersAndPayload.resetReaderIndex();
return msgMetadata.hasDeliverAtTime()
&& msgMetadata.getDeliverAtTime() - msgMetadata.getPublishTime() > maxDeliveryDelayInMs;
}
Expand Down Expand Up @@ -4867,4 +4876,30 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {

return future;
}

/**
* Check if the message deliver time is expired by message TTL.
*
* @param msgMetadata the message metadata
* @return true if the message deliver time is expired by message TTL, false otherwise
*/
protected boolean isExceedMessageTTL(MessageMetadata msgMetadata) {
Integer messageTTLInSeconds = topicPolicies.getMessageTTLInSeconds().get();
if (messageTTLInSeconds == null || messageTTLInSeconds <= 0) {
return false;
}
if (!msgMetadata.hasDeliverAtTime()) {
return false;
}
return msgMetadata.getDeliverAtTime() >= (messageTTLInSeconds * 1000L) + System.currentTimeMillis();
}


public void incrementExceedTTLDelayedMessages() {
this.exceedTTLDelayedMessage.recordEvent();
}

public long getExceedTTLDelayedMessages() {
return this.exceedTTLDelayedMessage.getTotalCount();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,9 @@ public class TopicStatsImpl implements TopicStats {
/** The last publish timestamp in epoch milliseconds. */
public long lastPublishTimeStamp;

/** The number of delay messages that exceed TTL delay. */
public long exceedTTLDelayedMessages;
Comment on lines +189 to +190
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similar comment about exceedTTLDelayedMessages as in the previous comment. It doesn't read well. I think that ttlExceededDelayedMessages would be better. In the comment, it should be made explicit that this is the number of delayed messsages that exceeded TTL at publish time.


public List<? extends PublisherStats> getPublishers() {
return Stream.concat(publishers.stream().sorted(
Comparator.comparing(PublisherStatsImpl::getProducerName, nullsLast(naturalOrder()))),
Expand Down Expand Up @@ -264,6 +267,7 @@ public void reset() {
this.oldestBacklogMessageSubscriptionName = null;
this.topicCreationTimeStamp = 0;
this.lastPublishTimeStamp = 0;
this.exceedTTLDelayedMessages = 0;
}

// if the stats are added for the 1st time, we will need to make a copy of these stats and add it to the current
Expand Down Expand Up @@ -295,6 +299,7 @@ public TopicStatsImpl add(TopicStats ts) {
this.committedTxnCount = stats.committedTxnCount;
this.backlogQuotaLimitTime = stats.backlogQuotaLimitTime;
this.backlogQuotaLimitSize = stats.backlogQuotaLimitSize;
this.exceedTTLDelayedMessages += stats.exceedTTLDelayedMessages;
if (stats.oldestBacklogMessageAgeSeconds > this.oldestBacklogMessageAgeSeconds) {
this.oldestBacklogMessageAgeSeconds = stats.oldestBacklogMessageAgeSeconds;
this.oldestBacklogMessageSubscriptionName = stats.oldestBacklogMessageSubscriptionName;
Expand Down