Skip to content

Commit 6d68f8a

Browse files
authored
MINOR: Move BrokerReconfigurable to the sever-common module (apache#19383)
This patch moves `BrokerReconfigurable` to the `server-common module` and decouples the `TransactionLogConfig` and `KafkaConfig` to unblock KAFKA-14485. Reviewers: PoAn Yang <[email protected]>, TaiJuWu <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent e69a311 commit 6d68f8a

File tree

13 files changed

+48
-35
lines changed

13 files changed

+48
-35
lines changed

checkstyle/import-control-server-common.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
<disallow pkg="kafka" />
3939

4040
<!-- anyone can use public classes -->
41-
<allow pkg="org.apache.kafka.common" exact-match="true" />
41+
<allow pkg="org.apache.kafka.common" />
4242
<allow pkg="org.apache.kafka.common.security" />
4343
<allow pkg="org.apache.kafka.common.serialization" />
4444
<allow pkg="org.apache.kafka.common.utils" />

checkstyle/import-control-server.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
<allow pkg="org.apache.kafka.metadata" />
6060

6161
<!-- utilities and reusable classes from server-common -->
62+
<allow pkg="org.apache.kafka.config"/>
6263
<allow pkg="org.apache.kafka.queue" />
6364
<allow pkg="org.apache.kafka.security" />
6465
<allow pkg="org.apache.kafka.server.common" />

core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import org.apache.kafka.common.protocol.Errors
2727
import org.apache.kafka.common.record.RecordBatch
2828
import org.apache.kafka.common.requests.{AddPartitionsToTxnResponse, TransactionResult}
2929
import org.apache.kafka.common.utils.{LogContext, ProducerIdAndEpoch, Time}
30-
import org.apache.kafka.coordinator.transaction.ProducerIdManager
30+
import org.apache.kafka.coordinator.transaction.{ProducerIdManager, TransactionLogConfig}
3131
import org.apache.kafka.metadata.MetadataCache
3232
import org.apache.kafka.server.common.{RequestLocal, TransactionVersion}
3333
import org.apache.kafka.server.util.Scheduler
@@ -46,13 +46,14 @@ object TransactionCoordinator {
4646
metadataCache: MetadataCache,
4747
time: Time): TransactionCoordinator = {
4848

49+
val transactionLogConfig = new TransactionLogConfig(config)
4950
val txnConfig = TransactionConfig(config.transactionStateManagerConfig.transactionalIdExpirationMs,
5051
config.transactionStateManagerConfig.transactionMaxTimeoutMs,
51-
config.transactionLogConfig.transactionTopicPartitions,
52-
config.transactionLogConfig.transactionTopicReplicationFactor,
53-
config.transactionLogConfig.transactionTopicSegmentBytes,
54-
config.transactionLogConfig.transactionLoadBufferSize,
55-
config.transactionLogConfig.transactionTopicMinISR,
52+
transactionLogConfig.transactionTopicPartitions,
53+
transactionLogConfig.transactionTopicReplicationFactor,
54+
transactionLogConfig.transactionTopicSegmentBytes,
55+
transactionLogConfig.transactionLoadBufferSize,
56+
transactionLogConfig.transactionTopicMinISR,
5657
config.transactionStateManagerConfig.transactionAbortTimedOutTransactionCleanupIntervalMs,
5758
config.transactionStateManagerConfig.transactionRemoveExpiredTransactionalIdCleanupIntervalMs,
5859
config.transactionStateManagerConfig.transaction2PCEnabled,

core/src/main/scala/kafka/log/LogManager.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import kafka.utils.{CoreUtils, Logging, Pool}
2929
import org.apache.kafka.common.{DirectoryId, KafkaException, TopicPartition, Uuid}
3030
import org.apache.kafka.common.utils.{Exit, KafkaThread, Time, Utils}
3131
import org.apache.kafka.common.errors.{InconsistentTopicIdException, KafkaStorageException, LogDirNotFoundException}
32+
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
3233

3334
import scala.jdk.CollectionConverters._
3435
import scala.collection._
@@ -1548,6 +1549,7 @@ object LogManager {
15481549
val defaultLogConfig = new LogConfig(defaultProps)
15491550

15501551
val cleanerConfig = LogCleaner.cleanerConfig(config)
1552+
val transactionLogConfig = new TransactionLogConfig(config)
15511553

15521554
new LogManager(logDirs = config.logDirs.map(new File(_).getAbsoluteFile),
15531555
initialOfflineDirs = initialOfflineDirs.map(new File(_).getAbsoluteFile),
@@ -1560,8 +1562,8 @@ object LogManager {
15601562
flushStartOffsetCheckpointMs = config.logFlushStartOffsetCheckpointIntervalMs,
15611563
retentionCheckMs = config.logCleanupIntervalMs,
15621564
maxTransactionTimeoutMs = config.transactionStateManagerConfig.transactionMaxTimeoutMs,
1563-
producerStateManagerConfig = new ProducerStateManagerConfig(config.transactionLogConfig.producerIdExpirationMs, config.transactionLogConfig.transactionPartitionVerificationEnable),
1564-
producerIdExpirationCheckIntervalMs = config.transactionLogConfig.producerIdExpirationCheckIntervalMs,
1565+
producerStateManagerConfig = new ProducerStateManagerConfig(transactionLogConfig.producerIdExpirationMs, transactionLogConfig.transactionPartitionVerificationEnable),
1566+
producerIdExpirationCheckIntervalMs = transactionLogConfig.producerIdExpirationCheckIntervalMs,
15651567
scheduler = kafkaScheduler,
15661568
brokerTopicStats = brokerTopicStats,
15671569
logDirFailureChannel = logDirFailureChannel,

core/src/main/scala/kafka/server/AutoTopicCreationManager.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
3232
import org.apache.kafka.common.requests.{CreateTopicsRequest, RequestContext, RequestHeader}
3333
import org.apache.kafka.coordinator.group.GroupCoordinator
3434
import org.apache.kafka.coordinator.share.ShareCoordinator
35+
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
3536
import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, NodeToControllerChannelManager}
3637

3738
import scala.collection.{Map, Seq, Set, mutable}
@@ -189,10 +190,11 @@ class DefaultAutoTopicCreationManager(
189190
.setReplicationFactor(config.groupCoordinatorConfig.offsetsTopicReplicationFactor)
190191
.setConfigs(convertToTopicConfigCollections(groupCoordinator.groupMetadataTopicConfigs))
191192
case TRANSACTION_STATE_TOPIC_NAME =>
193+
val transactionLogConfig = new TransactionLogConfig(config)
192194
new CreatableTopic()
193195
.setName(topic)
194-
.setNumPartitions(config.transactionLogConfig.transactionTopicPartitions)
195-
.setReplicationFactor(config.transactionLogConfig.transactionTopicReplicationFactor)
196+
.setNumPartitions(transactionLogConfig.transactionTopicPartitions)
197+
.setReplicationFactor(transactionLogConfig.transactionTopicReplicationFactor)
196198
.setConfigs(convertToTopicConfigCollections(
197199
txnCoordinator.transactionTopicConfigs))
198200
case SHARE_GROUP_STATE_TOPIC_NAME =>

core/src/main/scala/kafka/server/DynamicBrokerConfig.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,11 @@ import org.apache.kafka.common.metrics.{Metrics, MetricsReporter}
3535
import org.apache.kafka.common.network.{ListenerName, ListenerReconfigurable}
3636
import org.apache.kafka.common.security.authenticator.LoginManager
3737
import org.apache.kafka.common.utils.{BufferSupplier, ConfigUtils, Utils}
38+
import org.apache.kafka.config
3839
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
3940
import org.apache.kafka.network.SocketServerConfigs
4041
import org.apache.kafka.raft.KafkaRaftClient
41-
import org.apache.kafka.server.{ProcessRole, DynamicThreadPool}
42+
import org.apache.kafka.server.{DynamicThreadPool, ProcessRole}
4243
import org.apache.kafka.server.common.ApiMessageAndVersion
4344
import org.apache.kafka.server.config.{DynamicProducerStateManagerConfig, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms}
4445
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
@@ -323,7 +324,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
323324
reconfigurables.add(reconfigurable)
324325
}
325326

326-
def addBrokerReconfigurable(reconfigurable: org.apache.kafka.server.config.BrokerReconfigurable): Unit = {
327+
def addBrokerReconfigurable(reconfigurable: config.BrokerReconfigurable): Unit = {
327328
verifyReconfigurableConfigs(reconfigurable.reconfigurableConfigs.asScala)
328329
brokerReconfigurables.add(new BrokerReconfigurable {
329330
override def reconfigurableConfigs: Set[String] = reconfigurable.reconfigurableConfigs().asScala
@@ -617,7 +618,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
617618
}
618619

619620
/**
620-
* Implement [[org.apache.kafka.server.config.BrokerReconfigurable]] instead.
621+
* Implement [[config.BrokerReconfigurable]] instead.
621622
*/
622623
trait BrokerReconfigurable {
623624

core/src/main/scala/kafka/server/KafkaConfig.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ import org.apache.kafka.coordinator.group.Group.GroupType
3737
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
3838
import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
3939
import org.apache.kafka.coordinator.share.ShareCoordinatorConfig
40-
import org.apache.kafka.coordinator.transaction.{AddPartitionsToTxnConfig, TransactionLogConfig, TransactionStateManagerConfig}
40+
import org.apache.kafka.coordinator.transaction.{AddPartitionsToTxnConfig, TransactionStateManagerConfig}
4141
import org.apache.kafka.network.SocketServerConfigs
4242
import org.apache.kafka.raft.QuorumConfig
4343
import org.apache.kafka.security.authorizer.AuthorizerUtils
@@ -204,7 +204,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
204204
private val _shareCoordinatorConfig = new ShareCoordinatorConfig(this)
205205
def shareCoordinatorConfig: ShareCoordinatorConfig = _shareCoordinatorConfig
206206

207-
override val transactionLogConfig = new TransactionLogConfig(this)
208207
private val _transactionStateManagerConfig = new TransactionStateManagerConfig(this)
209208
private val _addPartitionsToTxnConfig = new AddPartitionsToTxnConfig(this)
210209
def transactionStateManagerConfig: TransactionStateManagerConfig = _transactionStateManagerConfig

core/src/main/scala/kafka/server/ReplicaManager.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ import org.apache.kafka.common.requests.FetchRequest.PartitionData
4747
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
4848
import org.apache.kafka.common.requests._
4949
import org.apache.kafka.common.utils.{Exit, Time, Utils}
50+
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
5051
import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
5152
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
5253
import org.apache.kafka.metadata.MetadataCache
@@ -1038,10 +1039,13 @@ class ReplicaManager(val config: KafkaConfig,
10381039
callback: ((Map[TopicPartition, Errors], Map[TopicPartition, VerificationGuard])) => Unit,
10391040
transactionSupportedOperation: TransactionSupportedOperation
10401041
): Unit = {
1042+
def transactionPartitionVerificationEnable = {
1043+
new TransactionLogConfig(config).transactionPartitionVerificationEnable
1044+
}
10411045
// Skip verification if the request is not transactional or transaction verification is disabled.
1042-
if (transactionalId == null ||
1043-
(!config.transactionLogConfig.transactionPartitionVerificationEnable && !transactionSupportedOperation.supportsEpochBump)
1046+
if (transactionalId == null
10441047
|| addPartitionsToTxnManager.isEmpty
1048+
|| (!transactionSupportedOperation.supportsEpochBump && !transactionPartitionVerificationEnable)
10451049
) {
10461050
callback((Map.empty[TopicPartition, Errors], Map.empty[TopicPartition, VerificationGuard]))
10471051
return

core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import org.apache.kafka.common.errors.TimeoutException
2727
import org.apache.kafka.common.internals.Topic
2828
import org.apache.kafka.coordinator.group.GroupCoordinator
2929
import org.apache.kafka.coordinator.share.ShareCoordinator
30+
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
3031
import org.apache.kafka.image.loader.LoaderManifest
3132
import org.apache.kafka.image.publisher.MetadataPublisher
3233
import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta}
@@ -331,9 +332,10 @@ class BrokerMetadataPublisher(
331332
case t: Throwable => fatalFaultHandler.handleFault("Error starting GroupCoordinator", t)
332333
}
333334
try {
335+
val transactionLogConfig = new TransactionLogConfig(config)
334336
// Start the transaction coordinator.
335337
txnCoordinator.startup(() => metadataCache.numPartitions(
336-
Topic.TRANSACTION_STATE_TOPIC_NAME).orElse(config.transactionLogConfig.transactionTopicPartitions))
338+
Topic.TRANSACTION_STATE_TOPIC_NAME).orElse(transactionLogConfig.transactionTopicPartitions))
337339
} catch {
338340
case t: Throwable => fatalFaultHandler.handleFault("Error starting TransactionCoordinator", t)
339341
}

core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2523,7 +2523,8 @@ class ReplicaManagerTest {
25232523
val props = new Properties()
25242524
props.put(TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG, "true")
25252525
config.dynamicConfig.updateBrokerConfig(config.brokerId, props)
2526-
TestUtils.waitUntilTrue(() => config.transactionLogConfig.transactionPartitionVerificationEnable, "Config did not dynamically update.")
2526+
val transactionLogConfig = new TransactionLogConfig(config)
2527+
TestUtils.waitUntilTrue(() => transactionLogConfig.transactionPartitionVerificationEnable, "Config did not dynamically update.")
25272528

25282529
// Try to append more records. We don't need to send a request since the transaction is already ongoing.
25292530
val moreTransactionalRecords = MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, producerEpoch, sequence + 1,
@@ -2575,7 +2576,8 @@ class ReplicaManagerTest {
25752576
val props = new Properties()
25762577
props.put(TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG, "false")
25772578
config.dynamicConfig.updateBrokerConfig(config.brokerId, props)
2578-
TestUtils.waitUntilTrue(() => !config.transactionLogConfig.transactionPartitionVerificationEnable, "Config did not dynamically update.")
2579+
val transactionLogConfig = new TransactionLogConfig(config)
2580+
TestUtils.waitUntilTrue(() => !transactionLogConfig.transactionPartitionVerificationEnable, "Config did not dynamically update.")
25792581

25802582
// Confirm we did not write to the log and instead returned error.
25812583
val callback: AddPartitionsToTxnManager.AppendCallback = appendCallback.getValue

0 commit comments

Comments
 (0)