Skip to content

Commit 3076293

Browse files
authored
Add Method to Extract Undecorated Topic from Datastream Metadata for Accurate SLA Latency Reporting (#984)
* Add functionality to send undecorated topic to support any type of topic names for reporting latency metrics * Not required anymore
1 parent f4b21d6 commit 3076293

File tree

3 files changed

+15
-3
lines changed

3 files changed

+15
-3
lines changed

datastream-server-api/src/main/java/com/linkedin/datastream/server/api/transport/DatastreamRecordMetadata.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,17 @@ public String getTopic() {
123123
return _topic;
124124
}
125125

126+
/**
127+
* Gets the topic name without any decorations or modifications.
128+
* This base implementation simply returns the topic name as is.
129+
* Subclasses may override this method to provide custom topic name undecorating logic.
130+
* @return The undecorated topic name
131+
*/
132+
public String getUndecoratedTopic() {
133+
// It is kept for backward compatibility.
134+
return _topic;
135+
}
136+
126137
/**
127138
* An index identifying the exact {@link com.linkedin.datastream.common.BrooklinEnvelope} event produced,
128139
* from those obtainable through {@link com.linkedin.datastream.server.DatastreamProducerRecord#getEvents()}

datastream-server/src/main/java/com/linkedin/datastream/server/EventProducer.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -337,7 +337,7 @@ private void performSlaRelatedLogging(DatastreamRecordMetadata metadata, long ev
337337
if (_numEventsOutsideAltSlaLogEnabled) {
338338
try {
339339
if (sourceToDestinationLatencyMs > _availabilityThresholdAlternateSlaMs) {
340-
TopicPartition topicPartition = new TopicPartition(metadata.getTopic(), metadata.getSourcePartition());
340+
TopicPartition topicPartition = new TopicPartition(metadata.getUndecoratedTopic(), metadata.getSourcePartition());
341341
int numEvents = _trackEventsOutsideAltSlaMap.getOrDefault(topicPartition, 0);
342342
_trackEventsOutsideAltSlaMap.put(topicPartition, numEvents + 1);
343343
}
@@ -506,7 +506,8 @@ private void onSendCallback(DatastreamRecordMetadata metadata, Exception excepti
506506
// Report metrics
507507
checkpoint(metadata.getPartition(), metadata.getCheckpoint());
508508
// Reporting separate metrics for throughput violating topics.
509-
if (_throughputViolatingTopicsProvider.apply(_datastreamTask).contains(metadata.getTopic())) {
509+
510+
if (_throughputViolatingTopicsProvider.apply(_datastreamTask).contains(metadata.getUndecoratedTopic())) {
510511
reportMetricsForThroughputViolatingTopics(metadata, eventSourceTimestamp, eventSendTimestamp);
511512
} else {
512513
reportMetrics(metadata, eventSourceTimestamp, eventSendTimestamp);

gradle/maven.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
allprojects {
2-
version = "5.5.5"
2+
version = "5.5.6"
33
}
44

55
subprojects {

0 commit comments

Comments
 (0)