@@ -69,13 +69,6 @@ import scala.util.control.ControlThrowable
6969 * It is possible to configure multiple data-planes by specifying multiple "," separated endpoints for "listeners" in KafkaConfig.
7070 * Acceptor has N Processor threads that each have their own selector and read requests from sockets
7171 * M Handler threads that handle requests and produce responses back to the processor threads for writing.
72- * - control-plane :
73- * - Handles requests from controller. This is optional and can be configured by specifying "control.plane.listener.name".
74- * If not configured, the controller requests are handled by the data-plane.
75- * - The threading model is
76- * 1 Acceptor thread that handles new connections
77- * Acceptor has 1 Processor thread that has its own selector and read requests from the socket.
78- * 1 Handler thread that handles requests and produces responses back to the processor thread for writing.
7972 */
8073class SocketServer (
8174 val config : KafkaConfig ,
@@ -105,10 +98,6 @@ class SocketServer(
10598 // data-plane
10699 private [network] val dataPlaneAcceptors = new ConcurrentHashMap [EndPoint , DataPlaneAcceptor ]()
107100 val dataPlaneRequestChannel = new RequestChannel (maxQueuedRequests, DataPlaneAcceptor .MetricPrefix , time, apiVersionManager.newRequestMetrics)
108- // control-plane
109- private [network] var controlPlaneAcceptorOpt : Option [ControlPlaneAcceptor ] = None
110- val controlPlaneRequestChannelOpt : Option [RequestChannel ] = config.controlPlaneListenerName.map(_ =>
111- new RequestChannel (20 , ControlPlaneAcceptor .MetricPrefix , time, apiVersionManager.newRequestMetrics))
112101
113102 private [this ] val nextProcessorId : AtomicInteger = new AtomicInteger (0 )
114103 val connectionQuotas = new ConnectionQuotas (config, time, metrics)
@@ -137,17 +126,7 @@ class SocketServer(
137126 }.sum / dataPlaneProcessors.size
138127 }
139128 })
140- if (config.requiresZookeeper) {
141- metricsGroup.newGauge(s " ${ControlPlaneAcceptor .MetricPrefix }NetworkProcessorAvgIdlePercent " , () => SocketServer .this .synchronized {
142- val controlPlaneProcessorOpt = controlPlaneAcceptorOpt.map(a => a.processors(0 ))
143- val ioWaitRatioMetricName = controlPlaneProcessorOpt.map { p =>
144- metrics.metricName(" io-wait-ratio" , MetricsGroup , p.metricTags)
145- }
146- ioWaitRatioMetricName.map { metricName =>
147- Option (metrics.metric(metricName)).fold(0.0 )(m => Math .min(m.metricValue.asInstanceOf [Double ], 1.0 ))
148- }.getOrElse(Double .NaN )
149- })
150- }
129+
151130 metricsGroup.newGauge(" MemoryPoolAvailable" , () => memoryPool.availableMemory)
152131 metricsGroup.newGauge(" MemoryPoolUsed" , () => memoryPool.size() - memoryPool.availableMemory)
153132 metricsGroup.newGauge(s " ${DataPlaneAcceptor .MetricPrefix }ExpiredConnectionsKilledCount " , () => SocketServer .this .synchronized {
@@ -159,17 +138,6 @@ class SocketServer(
159138 Option (metrics.metric(metricName)).fold(0.0 )(m => m.metricValue.asInstanceOf [Double ])
160139 }.sum
161140 })
162- if (config.requiresZookeeper) {
163- metricsGroup.newGauge(s " ${ControlPlaneAcceptor .MetricPrefix }ExpiredConnectionsKilledCount " , () => SocketServer .this .synchronized {
164- val controlPlaneProcessorOpt = controlPlaneAcceptorOpt.map(a => a.processors(0 ))
165- val expiredConnectionsKilledCountMetricNames = controlPlaneProcessorOpt.map { p =>
166- metrics.metricName(" expired-connections-killed-count" , MetricsGroup , p.metricTags)
167- }
168- expiredConnectionsKilledCountMetricNames.map { metricName =>
169- Option (metrics.metric(metricName)).fold(0.0 )(m => m.metricValue.asInstanceOf [Double ])
170- }.getOrElse(0.0 )
171- })
172- }
173141
174142 // Create acceptors and processors for the statically configured endpoints when the
175143 // SocketServer is constructed. Note that this just opens the ports and creates the data
@@ -178,7 +146,6 @@ class SocketServer(
178146 if (apiVersionManager.listenerType.equals(ListenerType .CONTROLLER )) {
179147 config.controllerListeners.foreach(createDataPlaneAcceptorAndProcessors)
180148 } else {
181- config.controlPlaneListener.foreach(createControlPlaneAcceptorAndProcessor)
182149 config.dataPlaneListeners.foreach(createDataPlaneAcceptorAndProcessors)
183150 }
184151
@@ -232,16 +199,14 @@ class SocketServer(
232199 }
233200
234201 info(" Enabling request processing." )
235- controlPlaneAcceptorOpt.foreach(chainAcceptorFuture)
236202 dataPlaneAcceptors.values().forEach(chainAcceptorFuture)
237203 FutureUtils .chainFuture(CompletableFuture .allOf(authorizerFutures.values.toArray: _* ),
238204 allAuthorizerFuturesComplete)
239205
240206 // Construct a future that will be completed when all Acceptors have been successfully started.
241207 // Alternately, if any of them fail to start, this future will be completed exceptionally.
242- val allAcceptors = dataPlaneAcceptors.values().asScala.toSeq ++ controlPlaneAcceptorOpt
243208 val enableFuture = new CompletableFuture [Void ]
244- FutureUtils .chainFuture(CompletableFuture .allOf(allAcceptors. map(_.startedFuture).toArray : _* ), enableFuture)
209+ FutureUtils .chainFuture(CompletableFuture .allOf(dataPlaneAcceptors.values().asScala.toArray. map(_.startedFuture): _* ), enableFuture)
245210 enableFuture
246211 }
247212
@@ -251,36 +216,20 @@ class SocketServer(
251216 }
252217 val parsedConfigs = config.valuesFromThisConfigWithPrefixOverride(endpoint.listenerName.configPrefix)
253218 connectionQuotas.addListener(config, endpoint.listenerName)
254- val isPrivilegedListener = controlPlaneRequestChannelOpt.isEmpty &&
255- config.interBrokerListenerName == endpoint.listenerName
219+ val isPrivilegedListener = config.interBrokerListenerName == endpoint.listenerName
256220 val dataPlaneAcceptor = createDataPlaneAcceptor(endpoint, isPrivilegedListener, dataPlaneRequestChannel)
257221 config.addReconfigurable(dataPlaneAcceptor)
258222 dataPlaneAcceptor.configure(parsedConfigs)
259223 dataPlaneAcceptors.put(endpoint, dataPlaneAcceptor)
260224 info(s " Created data-plane acceptor and processors for endpoint : ${endpoint.listenerName}" )
261225 }
262226
263- private def createControlPlaneAcceptorAndProcessor (endpoint : EndPoint ): Unit = synchronized {
264- if (stopped) {
265- throw new RuntimeException (" Can't create new control plane acceptor and processor: SocketServer is stopped." )
266- }
267- connectionQuotas.addListener(config, endpoint.listenerName)
268- val controlPlaneAcceptor = createControlPlaneAcceptor(endpoint, controlPlaneRequestChannelOpt.get)
269- controlPlaneAcceptor.addProcessors(1 )
270- controlPlaneAcceptorOpt = Some (controlPlaneAcceptor)
271- info(s " Created control-plane acceptor and processor for endpoint : ${endpoint.listenerName}" )
272- }
273-
274227 private def endpoints = config.listeners.map(l => l.listenerName -> l).toMap
275228
276229 protected def createDataPlaneAcceptor (endPoint : EndPoint , isPrivilegedListener : Boolean , requestChannel : RequestChannel ): DataPlaneAcceptor = {
277230 new DataPlaneAcceptor (this , endPoint, config, nodeId, connectionQuotas, time, isPrivilegedListener, requestChannel, metrics, credentialProvider, logContext, memoryPool, apiVersionManager)
278231 }
279232
280- private def createControlPlaneAcceptor (endPoint : EndPoint , requestChannel : RequestChannel ): ControlPlaneAcceptor = {
281- new ControlPlaneAcceptor (this , endPoint, config, nodeId, connectionQuotas, time, requestChannel, metrics, credentialProvider, logContext, memoryPool, apiVersionManager)
282- }
283-
284233 /**
285234 * Stop processing requests and new connections.
286235 */
@@ -289,11 +238,8 @@ class SocketServer(
289238 stopped = true
290239 info(" Stopping socket server request processors" )
291240 dataPlaneAcceptors.asScala.values.foreach(_.beginShutdown())
292- controlPlaneAcceptorOpt.foreach(_.beginShutdown())
293241 dataPlaneAcceptors.asScala.values.foreach(_.close())
294- controlPlaneAcceptorOpt.foreach(_.close())
295242 dataPlaneRequestChannel.clear()
296- controlPlaneRequestChannelOpt.foreach(_.clear())
297243 info(" Stopped socket server request processors" )
298244 }
299245 }
@@ -309,7 +255,6 @@ class SocketServer(
309255 this .synchronized {
310256 stopProcessingRequests()
311257 dataPlaneRequestChannel.shutdown()
312- controlPlaneRequestChannelOpt.foreach(_.shutdown())
313258 connectionQuotas.close()
314259 }
315260 info(" Shutdown completed" )
@@ -321,7 +266,7 @@ class SocketServer(
321266 if (acceptor != null ) {
322267 acceptor.localPort
323268 } else {
324- controlPlaneAcceptorOpt.map(_.localPort).getOrElse( throw new KafkaException (" Could not find listenerName : " + listenerName + " in data-plane or control-plane " ) )
269+ throw new KafkaException (" Could not find listenerName : " + listenerName + " in data-plane. " )
325270 }
326271 } catch {
327272 case e : Exception =>
@@ -528,42 +473,6 @@ class DataPlaneAcceptor(socketServer: SocketServer,
528473 }
529474}
530475
531- object ControlPlaneAcceptor {
532- val ThreadPrefix = " control-plane"
533- val MetricPrefix = " ControlPlane"
534- }
535-
536- class ControlPlaneAcceptor (socketServer : SocketServer ,
537- endPoint : EndPoint ,
538- config : KafkaConfig ,
539- nodeId : Int ,
540- connectionQuotas : ConnectionQuotas ,
541- time : Time ,
542- requestChannel : RequestChannel ,
543- metrics : Metrics ,
544- credentialProvider : CredentialProvider ,
545- logContext : LogContext ,
546- memoryPool : MemoryPool ,
547- apiVersionManager : ApiVersionManager )
548- extends Acceptor (socketServer,
549- endPoint,
550- config,
551- nodeId,
552- connectionQuotas,
553- time,
554- true ,
555- requestChannel,
556- metrics,
557- credentialProvider,
558- logContext,
559- memoryPool,
560- apiVersionManager) {
561-
562- override def metricPrefix (): String = ControlPlaneAcceptor .MetricPrefix
563- override def threadPrefix (): String = ControlPlaneAcceptor .ThreadPrefix
564-
565- }
566-
567476/**
568477 * Thread that accepts and configures new connections. There is one of these per endpoint.
569478 */
0 commit comments