Skip to content

Commit 25de320

Browse files
KAFKA-19518 Remove the usage of KafkaMetricsGroup(Class<?> klass) (#20399)
the constructor is error-prone when migrating code, since metrics could get unintentionally changed. We should remove the constructor and use constant strings instead to avoid issues like KAFKA-17876, KAFKA-19150, and others. Reviewers: Ken Huang <[email protected]>, Jhen-Yung Hsu <[email protected]>, KuoChe <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent efc3773 commit 25de320

File tree

20 files changed

+99
-33
lines changed

20 files changed

+99
-33
lines changed

core/src/main/scala/kafka/cluster/Partition.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,10 @@ class DelayedOperations(topicId: Option[Uuid],
116116
}
117117

118118
object Partition {
119-
private val metricsGroup = new KafkaMetricsGroup(classOf[Partition])
119+
// Changing the package or class name may cause incompatibility with existing code and metrics configuration
120+
private val metricsPackage = "kafka.cluster"
121+
private val metricsClassName = "Partition"
122+
private val metricsGroup = new KafkaMetricsGroup(metricsPackage, metricsClassName)
120123

121124
def apply(topicIdPartition: TopicIdPartition,
122125
time: Time,

core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -166,8 +166,10 @@ class TransactionMarkerChannelManager(
166166
time: Time
167167
) extends InterBrokerSendThread("TxnMarkerSenderThread-" + config.brokerId, networkClient, config.requestTimeoutMs, time)
168168
with Logging {
169-
170-
private val metricsGroup = new KafkaMetricsGroup(this.getClass)
169+
// Changing the package or class name may cause incompatibility with existing code and metrics configuration
170+
private val metricsPackage = "kafka.coordinator.transaction"
171+
private val metricsClassName = "TransactionMarkerChannelManager"
172+
private val metricsGroup = new KafkaMetricsGroup(metricsPackage, metricsClassName)
171173

172174
this.logIdent = "[Transaction Marker Channel Manager " + config.brokerId + "]: "
173175

core/src/main/scala/kafka/log/LogManager.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,10 @@ class LogManager(logDirs: Seq[File],
8181
cleanerFactory: (CleanerConfig, util.List[File], ConcurrentMap[TopicPartition, UnifiedLog], LogDirFailureChannel, Time) => LogCleaner =
8282
(cleanerConfig, files, map, logDirFailureChannel, time) => new LogCleaner(cleanerConfig, files, map, logDirFailureChannel, time)
8383
) extends Logging {
84-
85-
private val metricsGroup = new KafkaMetricsGroup(this.getClass)
84+
// Changing the package or class name may cause incompatibility with existing code and metrics configuration
85+
private val metricsPackage = "kafka.log"
86+
private val metricsClassName = "LogManager"
87+
private val metricsGroup = new KafkaMetricsGroup(metricsPackage, metricsClassName)
8688

8789
private val logCreationOrDeletionLock = new Object
8890
private val currentLogs = new util.concurrent.ConcurrentHashMap[TopicPartition, UnifiedLog]()

core/src/main/scala/kafka/network/RequestChannel.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,10 @@ class RequestChannel(val queueSize: Int,
346346
val metrics: RequestChannelMetrics) {
347347
import RequestChannel._
348348

349-
private val metricsGroup = new KafkaMetricsGroup(this.getClass)
349+
// Changing the package or class name may cause incompatibility with existing code and metrics configuration
350+
private val metricsPackage = "kafka.network"
351+
private val metricsClassName = "RequestChannel"
352+
private val metricsGroup = new KafkaMetricsGroup(metricsPackage, metricsClassName)
350353

351354
private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
352355
private val processors = new ConcurrentHashMap[Int, Processor]()

core/src/main/scala/kafka/network/SocketServer.scala

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,10 @@ class SocketServer(
7878
val socketFactory: ServerSocketFactory = ServerSocketFactory.INSTANCE,
7979
val connectionDisconnectListeners: Seq[ConnectionDisconnectListener] = Seq.empty
8080
) extends Logging with BrokerReconfigurable {
81-
82-
private val metricsGroup = new KafkaMetricsGroup(this.getClass)
81+
// Changing the package or class name may cause incompatibility with existing code and metrics configuration
82+
private val metricsPackage = "kafka.network"
83+
private val metricsClassName = "SocketServer"
84+
private val metricsGroup = new KafkaMetricsGroup(metricsPackage, metricsClassName)
8385

8486
private val maxQueuedRequests = config.queuedMaxRequests
8587

@@ -485,9 +487,6 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
485487
memoryPool: MemoryPool,
486488
apiVersionManager: ApiVersionManager)
487489
extends Runnable with Logging {
488-
489-
private val metricsGroup = new KafkaMetricsGroup(this.getClass)
490-
491490
val shouldRun = new AtomicBoolean(true)
492491

493492
private val sendBufferSize = config.socketSendBufferBytes
@@ -516,7 +515,7 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
516515
private val blockedPercentMeterMetricName = backwardCompatibilityMetricGroup.metricName(
517516
"AcceptorBlockedPercent",
518517
Map(ListenerMetricTag -> endPoint.listener).asJava)
519-
private val blockedPercentMeter = metricsGroup.newMeter(blockedPercentMeterMetricName,"blocked time", TimeUnit.NANOSECONDS)
518+
private val blockedPercentMeter = backwardCompatibilityMetricGroup.newMeter(blockedPercentMeterMetricName,"blocked time", TimeUnit.NANOSECONDS)
520519
private var currentProcessorIndex = 0
521520
private[network] val throttledSockets = new mutable.PriorityQueue[DelayedCloseSocket]()
522521
private val started = new AtomicBoolean()
@@ -834,7 +833,9 @@ private[kafka] class Processor(
834833
threadName: String,
835834
connectionDisconnectListeners: Seq[ConnectionDisconnectListener]
836835
) extends Runnable with Logging {
837-
private val metricsGroup = new KafkaMetricsGroup(this.getClass)
836+
private val metricsPackage = "kafka.server"
837+
private val metricsClassName = "Processor"
838+
private val metricsGroup = new KafkaMetricsGroup(metricsPackage, metricsClassName)
838839

839840
val shouldRun: AtomicBoolean = new AtomicBoolean(true)
840841
private val started: AtomicBoolean = new AtomicBoolean()

core/src/main/scala/kafka/server/AbstractFetcherManager.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,10 @@ import scala.jdk.OptionConverters._
3030

3131
abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name: String, clientId: String, numFetchers: Int)
3232
extends Logging {
33-
private val metricsGroup = new KafkaMetricsGroup(this.getClass)
33+
// Changing the package or class name may cause incompatibility with existing code and metrics configuration
34+
private val metricsPackage = "kafka.server"
35+
private val metricsClassName = this.getClass.getSimpleName
36+
private val metricsGroup = new KafkaMetricsGroup(metricsPackage, metricsClassName)
3437

3538
// map of (source broker_id, fetcher_id per source broker) => fetcher.
3639
// package private for test

core/src/main/scala/kafka/server/AbstractFetcherThread.scala

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -888,7 +888,10 @@ object FetcherMetrics {
888888
}
889889

890890
class FetcherLagMetrics(metricId: ClientIdTopicPartition) {
891-
private val metricsGroup = new KafkaMetricsGroup(this.getClass)
891+
// Changing the package or class name may cause incompatibility with existing code and metrics configuration
892+
private val metricsPackage = "kafka.server"
893+
private val metricsClassName = "FetcherLagMetrics"
894+
private val metricsGroup = new KafkaMetricsGroup(metricsPackage, metricsClassName)
892895

893896
private[this] val lagVal = new AtomicLong(-1L)
894897
private[this] val tags = Map(
@@ -927,7 +930,10 @@ class FetcherLagStats(metricId: ClientIdAndBroker) {
927930
}
928931

929932
class FetcherStats(metricId: ClientIdAndBroker) {
930-
private val metricsGroup = new KafkaMetricsGroup(this.getClass)
933+
// Changing the package or class name may cause incompatibility with existing code and metrics configuration
934+
private val metricsPackage = "kafka.server"
935+
private val metricsClassName = "FetcherStats"
936+
private val metricsGroup = new KafkaMetricsGroup(metricsPackage, metricsClassName)
931937

932938
val tags: util.Map[String, String] = Map("clientId" -> metricId.clientId,
933939
"brokerHost" -> metricId.brokerHost,

core/src/main/scala/kafka/server/ControllerServer.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,10 @@ class ControllerServer(
7171

7272
import kafka.server.Server._
7373

74-
private val metricsGroup = new KafkaMetricsGroup(this.getClass)
74+
// Changing the package or class name may cause incompatibility with existing code and metrics configuration
75+
private val metricsPackage = "kafka.server"
76+
private val metricsClassName = "ControllerServer"
77+
private val metricsGroup = new KafkaMetricsGroup(metricsPackage, metricsClassName)
7578

7679
val config = sharedServer.controllerConfig
7780
val logContext = new LogContext(s"[ControllerServer id=${config.nodeId}] ")

core/src/main/scala/kafka/server/DelayedFetch.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,10 @@ class DelayedFetch(
187187
}
188188

189189
object DelayedFetchMetrics {
190-
private val metricsGroup = new KafkaMetricsGroup(DelayedFetchMetrics.getClass)
190+
// Changing the package or class name may cause incompatibility with existing code and metrics configuration
191+
private val metricsPackage = "kafka.server"
192+
private val metricsClassName = "DelayedFetchMetrics"
193+
private val metricsGroup = new KafkaMetricsGroup(metricsPackage, metricsClassName)
191194
private val FetcherTypeKey = "fetcherType"
192195
val followerExpiredRequestMeter: Meter = metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS, Map(FetcherTypeKey -> "follower").asJava)
193196
val consumerExpiredRequestMeter: Meter = metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS, Map(FetcherTypeKey -> "consumer").asJava)

core/src/main/scala/kafka/server/DelayedProduce.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,10 @@ class DelayedProduce(delayMs: Long,
135135
}
136136

137137
object DelayedProduceMetrics {
138-
private val metricsGroup = new KafkaMetricsGroup(DelayedProduceMetrics.getClass)
138+
// Changing the package or class name may cause incompatibility with existing code and metrics configuration
139+
private val metricsPackage = "kafka.server"
140+
private val metricsClassName = "DelayedProduceMetrics"
141+
private val metricsGroup = new KafkaMetricsGroup(metricsPackage, metricsClassName)
139142

140143
private val aggregateExpirationMeter = metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS)
141144

0 commit comments

Comments
 (0)