
Commit 15ec053

KAFKA-18656 Backport KAFKA-18597 to 4.0 (#20026)
Backport of [KAFKA-18597](#18627) to the 4.0 branch.

Reviewers: Chia-Ping Tsai <[email protected]>

1 parent 46e843d · commit 15ec053

File tree: 2 files changed (+153, -34 lines)


core/src/main/scala/kafka/log/LogCleaner.scala

Lines changed: 10 additions & 9 deletions
@@ -117,14 +117,14 @@ class LogCleaner(initialConfig: CleanerConfig,
 
   /**
    * @param f to compute the result
-   * @return the max value (int value) or 0 if there is no cleaner
+   * @return the max value or 0 if there is no cleaner
    */
-  private[log] def maxOverCleanerThreads(f: CleanerThread => Double): Int =
-    cleaners.map(f).maxOption.getOrElse(0.0d).toInt
+  private[log] def maxOverCleanerThreads(f: CleanerThread => Double): Double =
+    cleaners.map(f).maxOption.getOrElse(0.0d)
 
   /* a metric to track the maximum utilization of any thread's buffer in the last cleaning */
   metricsGroup.newGauge(MaxBufferUtilizationPercentMetricName,
-    () => maxOverCleanerThreads(_.lastStats.bufferUtilization) * 100)
+    () => (maxOverCleanerThreads(_.lastStats.bufferUtilization) * 100).toInt)
 
   /* a metric to track the recopy rate of each thread's last cleaning */
   metricsGroup.newGauge(CleanerRecopyPercentMetricName, () => {
@@ -134,12 +134,12 @@ class LogCleaner(initialConfig: CleanerConfig,
   })
 
   /* a metric to track the maximum cleaning time for the last cleaning from each thread */
-  metricsGroup.newGauge(MaxCleanTimeMetricName, () => maxOverCleanerThreads(_.lastStats.elapsedSecs))
+  metricsGroup.newGauge(MaxCleanTimeMetricName, () => maxOverCleanerThreads(_.lastStats.elapsedSecs).toInt)
 
   // a metric to track delay between the time when a log is required to be compacted
   // as determined by max compaction lag and the time of last cleaner run.
   metricsGroup.newGauge(MaxCompactionDelayMetricsName,
-    () => maxOverCleanerThreads(_.lastPreCleanStats.maxCompactionDelayMs.toDouble) / 1000)
+    () => (maxOverCleanerThreads(_.lastPreCleanStats.maxCompactionDelayMs.toDouble) / 1000).toInt)
 
   metricsGroup.newGauge(DeadThreadCountMetricName, () => deadThreadCount)
 
@@ -523,10 +523,11 @@ object LogCleaner {
 
   }
 
-  private val MaxBufferUtilizationPercentMetricName = "max-buffer-utilization-percent"
+  // Visible for test.
+  private[log] val MaxBufferUtilizationPercentMetricName = "max-buffer-utilization-percent"
   private val CleanerRecopyPercentMetricName = "cleaner-recopy-percent"
-  private val MaxCleanTimeMetricName = "max-clean-time-secs"
-  private val MaxCompactionDelayMetricsName = "max-compaction-delay-secs"
+  private[log] val MaxCleanTimeMetricName = "max-clean-time-secs"
+  private[log] val MaxCompactionDelayMetricsName = "max-compaction-delay-secs"
   private val DeadThreadCountMetricName = "DeadThreadCount"
   // package private for testing
   private[log] val MetricNames = Set(
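
The hunk above is the substance of the fix: maxOverCleanerThreads used to truncate the per-thread maximum to an Int before the gauges applied their scaling, so a fractional result such as a buffer utilization of 0.85 was cut to 0 and the percent gauge always read 0. The backport keeps the raw Double and moves the .toInt to after the scaling. A minimal standalone sketch of that difference, using illustrative values and helper names (maxAsInt / maxAsDouble are not Kafka code):

object MaxOverCleanerThreadsSketch {
  // Old shape: truncates the fractional maximum to an Int before the gauge can scale it.
  def maxAsInt(utilizations: Seq[Double]): Int =
    utilizations.maxOption.getOrElse(0.0d).toInt

  // New shape: returns the Double so the gauge scales first, then converts.
  def maxAsDouble(utilizations: Seq[Double]): Double =
    utilizations.maxOption.getOrElse(0.0d)

  def main(args: Array[String]): Unit = {
    val utilizations = Seq(0.75, 0.85, 0.65) // hypothetical per-thread buffer utilizations
    println(maxAsInt(utilizations) * 100)            // prints 0  -- 0.85 truncated to 0 too early
    println((maxAsDouble(utilizations) * 100).toInt) // prints 85 -- scaled, then converted
  }
}

The same reasoning applies to the clean-time and compaction-delay gauges, which now convert to Int only at the gauge boundary.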

core/src/test/scala/unit/kafka/log/LogCleanerTest.scala

Lines changed: 143 additions & 25 deletions
@@ -18,6 +18,7 @@
 package kafka.log
 
 import kafka.common._
+import kafka.log.LogCleaner.{MaxBufferUtilizationPercentMetricName, MaxCleanTimeMetricName, MaxCompactionDelayMetricsName}
 import kafka.server.KafkaConfig
 import kafka.utils.{CoreUtils, Logging, Pool, TestUtils}
 import org.apache.kafka.common.TopicPartition
@@ -2048,42 +2049,159 @@ class LogCleanerTest extends Logging {
   }
 
   @Test
-  def testMaxOverCleanerThreads(): Unit = {
-    val logCleaner = new LogCleaner(new CleanerConfig(true),
+  def testMaxBufferUtilizationPercentMetric(): Unit = {
+    val logCleaner = new LogCleaner(
+      new CleanerConfig(true),
       logDirs = Array(TestUtils.tempDir(), TestUtils.tempDir()),
       logs = new Pool[TopicPartition, UnifiedLog](),
       logDirFailureChannel = new LogDirFailureChannel(1),
-      time = time)
+      time = time
+    )
+
+    def assertMaxBufferUtilizationPercent(expected: Int): Unit = {
+      val gauge = logCleaner.metricsGroup.newGauge(MaxBufferUtilizationPercentMetricName,
+        () => (logCleaner.maxOverCleanerThreads(_.lastStats.bufferUtilization) * 100).toInt)
+      assertEquals(expected, gauge.value())
+    }
+
+    try {
+      // No CleanerThreads
+      assertMaxBufferUtilizationPercent(0)
+
+      val cleaners = logCleaner.cleaners
+
+      val cleaner1 = new logCleaner.CleanerThread(1)
+      cleaner1.lastStats = new CleanerStats(time)
+      cleaner1.lastStats.bufferUtilization = 0.75
+      cleaners += cleaner1
+
+      val cleaner2 = new logCleaner.CleanerThread(2)
+      cleaner2.lastStats = new CleanerStats(time)
+      cleaner2.lastStats.bufferUtilization = 0.85
+      cleaners += cleaner2
+
+      val cleaner3 = new logCleaner.CleanerThread(3)
+      cleaner3.lastStats = new CleanerStats(time)
+      cleaner3.lastStats.bufferUtilization = 0.65
+      cleaners += cleaner3
+
+      // expect the gauge value to reflect the maximum bufferUtilization
+      assertMaxBufferUtilizationPercent(85)
+
+      // Update bufferUtilization and verify the gauge value updates
+      cleaner1.lastStats.bufferUtilization = 0.9
+      assertMaxBufferUtilizationPercent(90)
+
+      // All CleanerThreads have the same bufferUtilization
+      cleaners.foreach(_.lastStats.bufferUtilization = 0.5)
+      assertMaxBufferUtilizationPercent(50)
+    } finally {
+      logCleaner.shutdown()
+    }
+  }
+
+  @Test
+  def testMaxCleanTimeMetric(): Unit = {
+    val logCleaner = new LogCleaner(
+      new CleanerConfig(true),
+      logDirs = Array(TestUtils.tempDir(), TestUtils.tempDir()),
+      logs = new Pool[TopicPartition, UnifiedLog](),
+      logDirFailureChannel = new LogDirFailureChannel(1),
+      time = time
+    )
+
+    def assertMaxCleanTime(expected: Int): Unit = {
+      val gauge = logCleaner.metricsGroup.newGauge(MaxCleanTimeMetricName,
+        () => logCleaner.maxOverCleanerThreads(_.lastStats.elapsedSecs).toInt)
+      assertEquals(expected, gauge.value())
+    }
 
-    val cleaners = logCleaner.cleaners
+    try {
+      // No CleanerThreads
+      assertMaxCleanTime(0)
 
-    val cleaner1 = new logCleaner.CleanerThread(1)
-    cleaner1.lastStats = new CleanerStats(time)
-    cleaner1.lastStats.bufferUtilization = 0.75
-    cleaners += cleaner1
+      val cleaners = logCleaner.cleaners
 
-    val cleaner2 = new logCleaner.CleanerThread(2)
-    cleaner2.lastStats = new CleanerStats(time)
-    cleaner2.lastStats.bufferUtilization = 0.85
-    cleaners += cleaner2
+      val cleaner1 = new logCleaner.CleanerThread(1)
+      cleaner1.lastStats = new CleanerStats(time)
+      cleaner1.lastStats.endTime = cleaner1.lastStats.startTime + 1_000L
+      cleaners += cleaner1
 
-    val cleaner3 = new logCleaner.CleanerThread(3)
-    cleaner3.lastStats = new CleanerStats(time)
-    cleaner3.lastStats.bufferUtilization = 0.65
-    cleaners += cleaner3
+      val cleaner2 = new logCleaner.CleanerThread(2)
+      cleaner2.lastStats = new CleanerStats(time)
+      cleaner2.lastStats.endTime = cleaner2.lastStats.startTime + 2_000L
+      cleaners += cleaner2
 
-    assertEquals(0, logCleaner.maxOverCleanerThreads(_.lastStats.bufferUtilization))
+      val cleaner3 = new logCleaner.CleanerThread(3)
+      cleaner3.lastStats = new CleanerStats(time)
+      cleaner3.lastStats.endTime = cleaner3.lastStats.startTime + 3_000L
+      cleaners += cleaner3
 
-    cleaners.clear()
+      // expect the gauge value to reflect the maximum cleanTime
+      assertMaxCleanTime(3)
 
-    cleaner1.lastStats.bufferUtilization = 5d
-    cleaners += cleaner1
-    cleaner2.lastStats.bufferUtilization = 6d
-    cleaners += cleaner2
-    cleaner3.lastStats.bufferUtilization = 7d
-    cleaners += cleaner3
+      // Update cleanTime and verify the gauge value updates
+      cleaner1.lastStats.endTime = cleaner1.lastStats.startTime + 4_000L
+      assertMaxCleanTime(4)
 
-    assertEquals(7, logCleaner.maxOverCleanerThreads(_.lastStats.bufferUtilization))
+      // All CleanerThreads have the same cleanTime
+      cleaners.foreach(cleaner => cleaner.lastStats.endTime = cleaner.lastStats.startTime + 1_500L)
+      assertMaxCleanTime(1)
+    } finally {
+      logCleaner.shutdown()
+    }
+  }
+
+  @Test
+  def testMaxCompactionDelayMetrics(): Unit = {
+    val logCleaner = new LogCleaner(
+      new CleanerConfig(true),
+      logDirs = Array(TestUtils.tempDir(), TestUtils.tempDir()),
+      logs = new Pool[TopicPartition, UnifiedLog](),
+      logDirFailureChannel = new LogDirFailureChannel(1),
+      time = time
+    )
+
+    def assertMaxCompactionDelay(expected: Int): Unit = {
+      val gauge = logCleaner.metricsGroup.newGauge(MaxCompactionDelayMetricsName,
+        () => (logCleaner.maxOverCleanerThreads(_.lastPreCleanStats.maxCompactionDelayMs.toDouble) / 1000).toInt)
+      assertEquals(expected, gauge.value())
+    }
+
+    try {
+      // No CleanerThreads
+      assertMaxCompactionDelay(0)
+
+      val cleaners = logCleaner.cleaners
+
+      val cleaner1 = new logCleaner.CleanerThread(1)
+      cleaner1.lastStats = new CleanerStats(time)
+      cleaner1.lastPreCleanStats.maxCompactionDelayMs = 1_000L
+      cleaners += cleaner1
+
+      val cleaner2 = new logCleaner.CleanerThread(2)
+      cleaner2.lastStats = new CleanerStats(time)
+      cleaner2.lastPreCleanStats.maxCompactionDelayMs = 2_000L
+      cleaners += cleaner2
+
+      val cleaner3 = new logCleaner.CleanerThread(3)
+      cleaner3.lastStats = new CleanerStats(time)
+      cleaner3.lastPreCleanStats.maxCompactionDelayMs = 3_000L
+      cleaners += cleaner3
+
+      // expect the gauge value to reflect the maximum CompactionDelay
+      assertMaxCompactionDelay(3)
+
+      // Update CompactionDelay and verify the gauge value updates
+      cleaner1.lastPreCleanStats.maxCompactionDelayMs = 4_000L
+      assertMaxCompactionDelay(4)
+
+      // All CleanerThreads have the same CompactionDelay
+      cleaners.foreach(_.lastPreCleanStats.maxCompactionDelayMs = 1_500L)
+      assertMaxCompactionDelay(1)
+    } finally {
+      logCleaner.shutdown()
+    }
   }
 
   private def writeToLog(log: UnifiedLog, keysAndValues: Iterable[(Int, Int)], offsetSeq: Iterable[Long]): Iterable[Long] = {
