Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions core/src/main/scala/kafka/server/AlterPartitionManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import org.apache.kafka.common.utils.Time
import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState}
import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, NodeToControllerChannelManager, TopicIdPartition}
import org.apache.kafka.server.util.Scheduler
import org.apache.kafka.server.ControllerNodeProvider
import org.apache.kafka.server.NodeToControllerChannelManagerImpl

import scala.collection.mutable
import scala.collection.mutable.ListBuffer
Expand Down Expand Up @@ -74,12 +76,12 @@ object AlterPartitionManager {
): AlterPartitionManager = {
val channelManager = new NodeToControllerChannelManagerImpl(
controllerNodeProvider,
time = time,
metrics = metrics,
config = config,
channelName = "alter-partition",
threadNamePrefix = threadNamePrefix,
retryTimeoutMs = Long.MaxValue
time,
metrics,
config,
"alter-partition",
threadNamePrefix,
Long.MaxValue
)
new DefaultAlterPartitionManager(
controllerChannelManager = channelManager,
Expand Down Expand Up @@ -150,7 +152,7 @@ class DefaultAlterPartitionManager(
val request = buildRequest(inflightAlterPartitionItems, brokerEpoch)
debug(s"Sending AlterPartition to controller $request")

// We will not timeout AlterPartition request, instead letting it retry indefinitely
// We will not time out AlterPartition request, instead letting it retry indefinitely
// until a response is received, or a new LeaderAndIsr overwrites the existing isrState
// which causes the response for those partitions to be ignored.
controllerChannelManager.sendRequest(request,
Expand All @@ -159,7 +161,7 @@ class DefaultAlterPartitionManager(
debug(s"Received AlterPartition response $response")
val error = try {
if (response.authenticationException != null) {
// For now we treat authentication errors as retriable. We use the
// For now, we treat authentication errors as retriable. We use the
// `NETWORK_EXCEPTION` error code for lack of a good alternative.
// Note that `NodeToControllerChannelManager` will still log the
// authentication errors so that users have a chance to fix the problem.
Expand Down
10 changes: 6 additions & 4 deletions core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ import org.apache.kafka.server.{AssignmentsManager, BrokerFeatures, ClientMetric
import org.apache.kafka.server.transaction.AddPartitionsToTxnManager
import org.apache.kafka.storage.internals.log.LogDirFailureChannel
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.apache.kafka.server.NodeToControllerChannelManagerImpl
import org.apache.kafka.server.RaftControllerNodeProvider

import java.time.Duration
import java.util
Expand Down Expand Up @@ -233,16 +235,16 @@ class BrokerServer(
"controller quorum voters future",
sharedServer.controllerQuorumVotersFuture,
startupDeadline, time)
val controllerNodeProvider = RaftControllerNodeProvider(raftManager, config)
val controllerNodeProvider = RaftControllerNodeProvider.create(raftManager, config)

clientToControllerChannelManager = new NodeToControllerChannelManagerImpl(
controllerNodeProvider,
time,
metrics,
config,
channelName = "forwarding",
"forwarding",
s"broker-${config.nodeId}-",
retryTimeoutMs = 60000
60000
)
clientToControllerChannelManager.start()
forwardingManager = new ForwardingManagerImpl(clientToControllerChannelManager, metrics)
Expand Down Expand Up @@ -316,7 +318,7 @@ class BrokerServer(
config,
"directory-assignments",
s"broker-${config.nodeId}-",
retryTimeoutMs = 60000
60000
)
assignmentsManager = new AssignmentsManager(
time,
Expand Down
4 changes: 3 additions & 1 deletion core/src/main/scala/kafka/server/ControllerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, L
import org.apache.kafka.server.network.{EndpointReadyFutures, KafkaAuthorizerServerInfo}
import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy}
import org.apache.kafka.server.util.{Deadline, FutureUtils}
import org.apache.kafka.server.NodeToControllerChannelManagerImpl
import org.apache.kafka.server.RaftControllerNodeProvider

import java.util
import java.util.{Optional, OptionalLong}
Expand Down Expand Up @@ -405,7 +407,7 @@ class ControllerServer(
/**
* Start the KIP-919 controller registration manager.
*/
val controllerNodeProvider = RaftControllerNodeProvider(raftManager, config)
val controllerNodeProvider = RaftControllerNodeProvider.create(raftManager, config)
registrationChannelManager = new NodeToControllerChannelManagerImpl(
controllerNodeProvider,
time,
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -415,11 +415,11 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
val unstableApiVersionsEnabled = getBoolean(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG)
val unstableFeatureVersionsEnabled = getBoolean(ServerConfigs.UNSTABLE_FEATURE_VERSIONS_ENABLE_CONFIG)

def addReconfigurable(reconfigurable: Reconfigurable): Unit = {
override def addReconfigurable(reconfigurable: Reconfigurable): Unit = {
dynamicConfig.addReconfigurable(reconfigurable)
}

def removeReconfigurable(reconfigurable: Reconfigurable): Unit = {
override def removeReconfigurable(reconfigurable: Reconfigurable): Unit = {
dynamicConfig.removeReconfigurable(reconfigurable)
}

Expand Down
Loading