@@ -1099,16 +1099,29 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
1099
1099
@ ParameterizedTest (name = TestInfoUtils .TestWithParameterizedGroupProtocolNames )
1100
1100
@ MethodSource (Array (" getTestGroupProtocolParametersAll" ))
1101
1101
def testServersCanStartWithInvalidStaticConfigsAndValidDynamicConfigs (groupProtocol : String ): Unit = {
1102
+ TestNumReplicaFetcherMetricsReporter .testReporters.clear()
1103
+
1102
1104
// modify snapshot interval config to explicitly take snapshot on a broker with valid dynamic configs
1103
1105
val props = defaultStaticConfig(numServers)
1104
1106
props.put(MetadataLogConfig .METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG , " 10000" )
1107
+ props.put(MetricConfigs .METRIC_REPORTER_CLASSES_CONFIG , classOf [TestNumReplicaFetcherMetricsReporter ].getName)
1108
+ props.put(ReplicationConfigs .NUM_REPLICA_FETCHERS_CONFIG , " 1" )
1105
1109
1106
1110
val kafkaConfig = KafkaConfig .fromProps(props)
1107
1111
val newBroker = createBroker(kafkaConfig).asInstanceOf [BrokerServer ]
1108
1112
servers += newBroker
1109
1113
1110
1114
alterSslKeystoreUsingConfigCommand(sslProperties1, listenerPrefix(SecureExternal ))
1111
1115
1116
+ // Add num.replica.fetchers to the cluster-level config.
1117
+ val clusterLevelProps = new Properties
1118
+ clusterLevelProps.put(ReplicationConfigs .NUM_REPLICA_FETCHERS_CONFIG , " 2" )
1119
+ reconfigureServers(clusterLevelProps, perBrokerConfig = false , (ReplicationConfigs .NUM_REPLICA_FETCHERS_CONFIG , " 2" ))
1120
+
1121
+ // Wait for the metrics reporter to be configured
1122
+ val initialReporter = TestNumReplicaFetcherMetricsReporter .waitForReporters(1 ).head
1123
+ initialReporter.verifyState(reconfigureCount = 1 , numFetcher = 2 )
1124
+
1112
1125
TestUtils .ensureConsistentKRaftMetadata(servers, controllerServer)
1113
1126
1114
1127
TestUtils .waitUntilTrue(
@@ -1121,11 +1134,19 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
1121
1134
newBroker.shutdown()
1122
1135
newBroker.awaitShutdown()
1123
1136
1137
+ // Clean up the test reporter
1138
+ TestNumReplicaFetcherMetricsReporter .testReporters.clear()
1139
+
1124
1140
val invalidStaticConfigs = defaultStaticConfig(newBroker.config.brokerId)
1125
1141
invalidStaticConfigs.putAll(securityProps(invalidSslConfigs, KEYSTORE_PROPS , listenerPrefix(SecureExternal )))
1126
1142
newBroker.config.updateCurrentConfig(KafkaConfig .fromProps(invalidStaticConfigs))
1127
1143
1128
1144
newBroker.startup()
1145
+
1146
+ // Verify that the custom MetricsReporter is not reconfigured after restart.
1147
+ // If readDynamicBrokerConfigsFromSnapshot works correctly, the reporter should maintain its state.
1148
+ val reporterAfterRestart = TestNumReplicaFetcherMetricsReporter .waitForReporters(1 ).head
1149
+ reporterAfterRestart.verifyState(reconfigureCount = 0 , numFetcher = 2 )
1129
1150
}
1130
1151
1131
1152
private def awaitInitialPositions (consumer : Consumer [_, _]): Unit = {
@@ -1634,6 +1655,64 @@ class TestMetricsReporter extends MetricsReporter with Reconfigurable with Close
1634
1655
}
1635
1656
}
1636
1657
1658
+ object TestNumReplicaFetcherMetricsReporter {
1659
+ val testReporters = new ConcurrentLinkedQueue [TestNumReplicaFetcherMetricsReporter ]()
1660
+
1661
+ def waitForReporters (count : Int ): List [TestNumReplicaFetcherMetricsReporter ] = {
1662
+ TestUtils .waitUntilTrue(() => testReporters.size == count, msg = " Metrics reporters size not matched. Expected: " + count + " , actual: " + testReporters.size())
1663
+
1664
+ val reporters = testReporters.asScala.toList
1665
+ TestUtils .waitUntilTrue(() => reporters.forall(_.configureCount == 1 ), msg = " Metrics reporters not configured" )
1666
+ reporters
1667
+ }
1668
+ }
1669
+
1670
+
1671
+ class TestNumReplicaFetcherMetricsReporter extends MetricsReporter {
1672
+ import TestNumReplicaFetcherMetricsReporter ._
1673
+ @ volatile var configureCount = 0
1674
+ @ volatile var reconfigureCount = 0
1675
+ @ volatile var numFetchers : Int = 1
1676
+ testReporters.add(this )
1677
+
1678
+ override def init (metrics : util.List [KafkaMetric ]): Unit = {
1679
+ }
1680
+
1681
+ override def configure (configs : util.Map [String , _]): Unit = {
1682
+ configureCount += 1
1683
+ numFetchers = configs.get(ReplicationConfigs .NUM_REPLICA_FETCHERS_CONFIG ).toString.toInt
1684
+ }
1685
+
1686
+ override def metricChange (metric : KafkaMetric ): Unit = {
1687
+ }
1688
+
1689
+ override def metricRemoval (metric : KafkaMetric ): Unit = {
1690
+ }
1691
+
1692
+ override def reconfigurableConfigs (): util.Set [String ] = {
1693
+ util.Set .of(ReplicationConfigs .NUM_REPLICA_FETCHERS_CONFIG )
1694
+ }
1695
+
1696
+ override def validateReconfiguration (configs : util.Map [String , _]): Unit = {
1697
+ val numFetchers = configs.get(ReplicationConfigs .NUM_REPLICA_FETCHERS_CONFIG ).toString.toInt
1698
+ if (numFetchers <= 0 )
1699
+ throw new ConfigException (s " Invalid num.replica.fetchers $numFetchers" )
1700
+ }
1701
+
1702
+ override def reconfigure (configs : util.Map [String , _]): Unit = {
1703
+ reconfigureCount += 1
1704
+ numFetchers = configs.get(ReplicationConfigs .NUM_REPLICA_FETCHERS_CONFIG ).toString.toInt
1705
+ }
1706
+
1707
+ override def close (): Unit = {
1708
+ }
1709
+
1710
+ def verifyState (reconfigureCount : Int , numFetcher : Int = 1 ): Unit = {
1711
+ assertEquals(reconfigureCount, this .reconfigureCount)
1712
+ assertEquals(numFetcher, this .numFetchers)
1713
+ }
1714
+ }
1715
+
1637
1716
1638
1717
class MockFileConfigProvider extends FileConfigProvider {
1639
1718
@ throws(classOf [IOException ])
0 commit comments