1818package kafka .server
1919
2020import java .util .concurrent .ConcurrentHashMap
21- import java .util .concurrent .atomic .AtomicReference
2221import java .util .{Collections , Properties }
23- import kafka .controller .KafkaController
2422import kafka .coordinator .transaction .TransactionCoordinator
2523import kafka .utils .Logging
2624import org .apache .kafka .clients .ClientResponse
@@ -31,7 +29,7 @@ import org.apache.kafka.common.message.CreateTopicsRequestData
3129import org .apache .kafka .common .message .CreateTopicsRequestData .{CreatableTopic , CreatableTopicConfig , CreatableTopicConfigCollection }
3230import org .apache .kafka .common .message .MetadataResponseData .MetadataResponseTopic
3331import org .apache .kafka .common .protocol .{ApiKeys , Errors }
34- import org .apache .kafka .common .requests .{ApiError , CreateTopicsRequest , RequestContext , RequestHeader }
32+ import org .apache .kafka .common .requests .{CreateTopicsRequest , RequestContext , RequestHeader }
3533import org .apache .kafka .coordinator .group .GroupCoordinator
3634import org .apache .kafka .coordinator .share .ShareCoordinator
3735import org .apache .kafka .server .common .{ControllerRequestCompletionHandler , NodeToControllerChannelManager }
@@ -49,34 +47,13 @@ trait AutoTopicCreationManager {
4947 ): Seq [MetadataResponseTopic ]
5048}
5149
52- object AutoTopicCreationManager {
53-
54- def apply (
55- config : KafkaConfig ,
56- channelManager : Option [NodeToControllerChannelManager ],
57- adminManager : Option [ZkAdminManager ],
58- controller : Option [KafkaController ],
59- groupCoordinator : GroupCoordinator ,
60- txnCoordinator : TransactionCoordinator ,
61- shareCoordinator : Option [ShareCoordinator ],
62- ): AutoTopicCreationManager = {
63- new DefaultAutoTopicCreationManager (config, channelManager, adminManager,
64- controller, groupCoordinator, txnCoordinator, shareCoordinator)
65- }
66- }
67-
6850class DefaultAutoTopicCreationManager (
6951 config : KafkaConfig ,
70- channelManager : Option [NodeToControllerChannelManager ],
71- adminManager : Option [ZkAdminManager ],
72- controller : Option [KafkaController ],
52+ channelManager : NodeToControllerChannelManager ,
7353 groupCoordinator : GroupCoordinator ,
7454 txnCoordinator : TransactionCoordinator ,
7555 shareCoordinator : Option [ShareCoordinator ]
7656) extends AutoTopicCreationManager with Logging {
77- if (controller.isEmpty && channelManager.isEmpty) {
78- throw new IllegalArgumentException (" Must supply a channel manager if not supplying a controller" )
79- }
8057
8158 private val inflightTopics = Collections .newSetFromMap(new ConcurrentHashMap [String , java.lang.Boolean ]())
8259
@@ -99,65 +76,13 @@ class DefaultAutoTopicCreationManager(
9976
10077 val creatableTopicResponses = if (creatableTopics.isEmpty) {
10178 Seq .empty
102- } else if (controller.isEmpty || ! controller.get.isActive && channelManager.isDefined) {
103- sendCreateTopicRequest(creatableTopics, metadataRequestContext)
10479 } else {
105- createTopicsInZk (creatableTopics, controllerMutationQuota )
80+ sendCreateTopicRequest (creatableTopics, metadataRequestContext )
10681 }
10782
10883 uncreatableTopicResponses ++ creatableTopicResponses
10984 }
11085
111- private def createTopicsInZk (
112- creatableTopics : Map [String , CreatableTopic ],
113- controllerMutationQuota : ControllerMutationQuota
114- ): Seq [MetadataResponseTopic ] = {
115- val topicErrors = new AtomicReference [Map [String , ApiError ]]()
116- try {
117- // Note that we use timeout = 0 since we do not need to wait for metadata propagation
118- // and we want to get the response error immediately.
119- adminManager.get.createTopics(
120- timeout = 0 ,
121- validateOnly = false ,
122- creatableTopics,
123- Map .empty,
124- controllerMutationQuota,
125- topicErrors.set
126- )
127-
128- val creatableTopicResponses = Option (topicErrors.get) match {
129- case Some (errors) =>
130- errors.toSeq.map { case (topic, apiError) =>
131- val error = apiError.error match {
132- case Errors .TOPIC_ALREADY_EXISTS | Errors .REQUEST_TIMED_OUT =>
133- // The timeout error is expected because we set timeout=0. This
134- // nevertheless indicates that the topic metadata was created
135- // successfully, so we return LEADER_NOT_AVAILABLE.
136- Errors .LEADER_NOT_AVAILABLE
137- case error => error
138- }
139-
140- new MetadataResponseTopic ()
141- .setErrorCode(error.code)
142- .setName(topic)
143- .setIsInternal(Topic .isInternal(topic))
144- }
145-
146- case None =>
147- creatableTopics.keySet.toSeq.map { topic =>
148- new MetadataResponseTopic ()
149- .setErrorCode(Errors .UNKNOWN_TOPIC_OR_PARTITION .code)
150- .setName(topic)
151- .setIsInternal(Topic .isInternal(topic))
152- }
153- }
154-
155- creatableTopicResponses
156- } finally {
157- clearInflightRequests(creatableTopics)
158- }
159- }
160-
16186 private def sendCreateTopicRequest (
16287 creatableTopics : Map [String , CreatableTopic ],
16388 metadataRequestContext : Option [RequestContext ]
@@ -189,10 +114,6 @@ class DefaultAutoTopicCreationManager(
189114 }
190115 }
191116
192- val channelManager = this .channelManager.getOrElse {
193- throw new IllegalStateException (" Channel manager must be defined in order to send CreateTopic requests." )
194- }
195-
196117 val request = metadataRequestContext.map { context =>
197118 val requestVersion =
198119 channelManager.controllerApiVersions.toScala match {
0 commit comments