Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -2252,4 +2252,17 @@ public HealthChecker getHealthChecker() {
}
return healthChecker;
}

/**
* Check if message delay time exceeds TTL
*
* @param topic
* @param deliverAtTime
* @return true if message delay time exceeds TTL, false otherwise
*/
public boolean isMessageDelayTimeExceedTTL(Topic topic, long deliverAtTime) {
return deliverAtTime
>= topic.getHierarchyTopicPolicies().getMessageTTLInSeconds().get() * 1000 + System.currentTimeMillis();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2042,6 +2042,13 @@ protected void handleSend(CommandSend send, ByteBuf headersAndPayload) {
producer.publishMessage(send.getProducerId(), send.getSequenceId(), headersAndPayload,
send.getNumMessages(), send.isIsChunk(), send.isMarker(), position);
}

// count delayed message times exceeding the ttl policy time
MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
if (msgMetadata.hasDeliverAtTime() && service.getPulsar()
.isMessageDelayTimeExceedTTL(producer.getTopic(), msgMetadata.getDeliverAtTime())) {
producer.getTopic().incrementExceedTTLDelayMessages();
}
}

private void printSendCommandDebug(CommandSend send, ByteBuf headersAndPayload) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,4 +391,18 @@ default boolean isSystemTopic() {
* @return
*/
TopicAttributes getTopicAttributes();

/**
* Increment exceed TTL delay message number.
*/
default void incrementExceedTTLDelayMessages() {
}

/**
* Get exceed TTL delay messages number
* @return
*/
default long getExceedTTLDelayMessages() {
return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.stats.Rate;
import org.apache.pulsar.common.topics.TopicCompactionStrategy;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down Expand Up @@ -294,6 +295,8 @@ protected TopicStatsHelper initialValue() {
@Getter
private final PersistentTopicMetrics persistentTopicMetrics = new PersistentTopicMetrics();

private final Rate exceedTTLDelayMessage = new Rate();

private volatile PersistentTopicAttributes persistentTopicAttributes = null;
private static final AtomicReferenceFieldUpdater<PersistentTopic, PersistentTopicAttributes>
PERSISTENT_TOPIC_ATTRIBUTES_FIELD_UPDATER = AtomicReferenceFieldUpdater.newUpdater(
Expand Down Expand Up @@ -2752,6 +2755,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
this.addEntryLatencyStatsUsec.refresh();
NamespaceStats.add(this.addEntryLatencyStatsUsec.getBuckets(), nsStats.addLatencyBucket);
this.addEntryLatencyStatsUsec.reset();
this.exceedTTLDelayMessage.calculateRate();
}

public double getLastUpdatedAvgPublishRateInMsg() {
Expand Down Expand Up @@ -2825,6 +2829,7 @@ public CompletableFuture<? extends TopicStatsImpl> asyncGetStats(GetStatsOptions
stats.ongoingTxnCount = txnBuffer.getOngoingTxnCount();
stats.abortedTxnCount = txnBuffer.getAbortedTxnCount();
stats.committedTxnCount = txnBuffer.getCommittedTxnCount();
stats.exceedTTLDelayMessages = getExceedTTLDelayMessages();

replicators.forEach((cluster, replicator) -> {
ReplicatorStatsImpl replicatorStats = replicator.computeStats();
Expand Down Expand Up @@ -4867,4 +4872,14 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {

return future;
}

@Override
public void incrementExceedTTLDelayMessages() {
this.exceedTTLDelayMessage.recordEvent();
}

@Override
public long getExceedTTLDelayMessages() {
return this.exceedTTLDelayMessage.getCount();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,9 @@ public class TopicStatsImpl implements TopicStats {
/** The last publish timestamp in epoch milliseconds. */
public long lastPublishTimeStamp;

/** The number of delay messages that exceed TTL delay. */
public long exceedTTLDelayMessages;

public List<? extends PublisherStats> getPublishers() {
return Stream.concat(publishers.stream().sorted(
Comparator.comparing(PublisherStatsImpl::getProducerName, nullsLast(naturalOrder()))),
Expand Down Expand Up @@ -264,6 +267,7 @@ public void reset() {
this.oldestBacklogMessageSubscriptionName = null;
this.topicCreationTimeStamp = 0;
this.lastPublishTimeStamp = 0;
this.exceedTTLDelayMessages = 0;
}

// if the stats are added for the 1st time, we will need to make a copy of these stats and add it to the current
Expand Down Expand Up @@ -295,6 +299,7 @@ public TopicStatsImpl add(TopicStats ts) {
this.committedTxnCount = stats.committedTxnCount;
this.backlogQuotaLimitTime = stats.backlogQuotaLimitTime;
this.backlogQuotaLimitSize = stats.backlogQuotaLimitSize;
this.exceedTTLDelayMessages += stats.exceedTTLDelayMessages;
if (stats.oldestBacklogMessageAgeSeconds > this.oldestBacklogMessageAgeSeconds) {
this.oldestBacklogMessageAgeSeconds = stats.oldestBacklogMessageAgeSeconds;
this.oldestBacklogMessageSubscriptionName = stats.oldestBacklogMessageSubscriptionName;
Expand Down