Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.stats.Rate;
import org.apache.pulsar.common.topics.TopicCompactionStrategy;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down Expand Up @@ -294,6 +295,8 @@ protected TopicStatsHelper initialValue() {
@Getter
private final PersistentTopicMetrics persistentTopicMetrics = new PersistentTopicMetrics();

private final Rate exceedTTLDelayedMessage = new Rate();

private volatile PersistentTopicAttributes persistentTopicAttributes = null;
private static final AtomicReferenceFieldUpdater<PersistentTopic, PersistentTopicAttributes>
PERSISTENT_TOPIC_ATTRIBUTES_FIELD_UPDATER = AtomicReferenceFieldUpdater.newUpdater(
Expand Down Expand Up @@ -2752,6 +2755,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
this.addEntryLatencyStatsUsec.refresh();
NamespaceStats.add(this.addEntryLatencyStatsUsec.getBuckets(), nsStats.addLatencyBucket);
this.addEntryLatencyStatsUsec.reset();
this.exceedTTLDelayedMessage.calculateRate();
}

public double getLastUpdatedAvgPublishRateInMsg() {
Expand Down Expand Up @@ -2825,6 +2829,7 @@ public CompletableFuture<? extends TopicStatsImpl> asyncGetStats(GetStatsOptions
stats.ongoingTxnCount = txnBuffer.getOngoingTxnCount();
stats.abortedTxnCount = txnBuffer.getAbortedTxnCount();
stats.committedTxnCount = txnBuffer.getCommittedTxnCount();
stats.exceedTTLDelayedMessages = getExceedTTLDelayedMessages();

replicators.forEach((cluster, replicator) -> {
ReplicatorStatsImpl replicatorStats = replicator.computeStats();
Expand Down Expand Up @@ -4787,13 +4792,22 @@ public Optional<TopicName> getShadowSourceTopic() {
protected boolean isExceedMaximumDeliveryDelay(ByteBuf headersAndPayload) {
if (isDelayedDeliveryEnabled()) {
long maxDeliveryDelayInMs = getDelayedDeliveryMaxDelayInMillis();
if (maxDeliveryDelayInMs > 0) {
headersAndPayload.markReaderIndex();
MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
headersAndPayload.resetReaderIndex();
return msgMetadata.hasDeliverAtTime()
&& msgMetadata.getDeliverAtTime() - msgMetadata.getPublishTime() > maxDeliveryDelayInMs;
Integer messageTTLInSeconds = topicPolicies.getMessageTTLInSeconds().get();
if (maxDeliveryDelayInMs <= 0 && (messageTTLInSeconds == null || messageTTLInSeconds <= 0)) {
return false;
}
headersAndPayload.markReaderIndex();
MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
headersAndPayload.resetReaderIndex();
if (!msgMetadata.hasDeliverAtTime()) {
return false;
}
long deliverAtTime = msgMetadata.getDeliverAtTime();
// count exceed ttl delayed messages
if (deliverAtTime >= (messageTTLInSeconds * 1000L) + System.currentTimeMillis()) {
this.incrementExceedTTLDelayedMessages();
}
return deliverAtTime - msgMetadata.getPublishTime() > maxDeliveryDelayInMs;
}
return false;
}
Expand Down Expand Up @@ -4867,4 +4881,12 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {

return future;
}

public void incrementExceedTTLDelayedMessages() {
this.exceedTTLDelayedMessage.recordEvent();
}

public long getExceedTTLDelayedMessages() {
return this.exceedTTLDelayedMessage.getTotalCount();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,9 @@ public class TopicStatsImpl implements TopicStats {
/** The last publish timestamp in epoch milliseconds. */
public long lastPublishTimeStamp;

/** The number of delay messages that exceed TTL delay. */
public long exceedTTLDelayedMessages;

public List<? extends PublisherStats> getPublishers() {
return Stream.concat(publishers.stream().sorted(
Comparator.comparing(PublisherStatsImpl::getProducerName, nullsLast(naturalOrder()))),
Expand Down Expand Up @@ -264,6 +267,7 @@ public void reset() {
this.oldestBacklogMessageSubscriptionName = null;
this.topicCreationTimeStamp = 0;
this.lastPublishTimeStamp = 0;
this.exceedTTLDelayedMessages = 0;
}

// if the stats are added for the 1st time, we will need to make a copy of these stats and add it to the current
Expand Down Expand Up @@ -295,6 +299,7 @@ public TopicStatsImpl add(TopicStats ts) {
this.committedTxnCount = stats.committedTxnCount;
this.backlogQuotaLimitTime = stats.backlogQuotaLimitTime;
this.backlogQuotaLimitSize = stats.backlogQuotaLimitSize;
this.exceedTTLDelayedMessages += stats.exceedTTLDelayedMessages;
if (stats.oldestBacklogMessageAgeSeconds > this.oldestBacklogMessageAgeSeconds) {
this.oldestBacklogMessageAgeSeconds = stats.oldestBacklogMessageAgeSeconds;
this.oldestBacklogMessageSubscriptionName = stats.oldestBacklogMessageSubscriptionName;
Expand Down