Skip to content

Commit 14917ae

Browse files
authored
MINOR: Handle envelope response in AutoTopicCreationManager (#20569)
In the create topic request we send a CreateTopic request in an Envelope, so we need to also unpack the response correctly Reviewers: Lucas Brutschy <[email protected]>
1 parent 444ceeb commit 14917ae

File tree

2 files changed

+427
-46
lines changed

2 files changed

+427
-46
lines changed

core/src/main/scala/kafka/server/AutoTopicCreationManager.scala

Lines changed: 108 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import org.apache.kafka.common.message.CreateTopicsRequestData
3030
import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicConfig, CreatableTopicConfigCollection}
3131
import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
3232
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
33-
import org.apache.kafka.common.requests.{CreateTopicsRequest, CreateTopicsResponse, RequestContext, RequestHeader}
33+
import org.apache.kafka.common.requests.{AbstractResponse, CreateTopicsRequest, CreateTopicsResponse, EnvelopeResponse, RequestContext, RequestHeader}
3434
import org.apache.kafka.coordinator.group.GroupCoordinator
3535
import org.apache.kafka.coordinator.share.ShareCoordinator
3636
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
@@ -198,6 +198,22 @@ class DefaultAutoTopicCreationManager(
198198
.setTopics(topicsToCreate)
199199
)
200200

201+
// Capture request header information for proper envelope response parsing
202+
val requestHeaderForParsing = requestContext.map { context =>
203+
val requestVersion =
204+
channelManager.controllerApiVersions.toScala match {
205+
case None =>
206+
ApiKeys.CREATE_TOPICS.latestVersion()
207+
case Some(nodeApiVersions) =>
208+
nodeApiVersions.latestUsableVersion(ApiKeys.CREATE_TOPICS)
209+
}
210+
211+
new RequestHeader(ApiKeys.CREATE_TOPICS,
212+
requestVersion,
213+
context.clientId,
214+
context.correlationId)
215+
}
216+
201217
val requestCompletionHandler = new ControllerRequestCompletionHandler {
202218
override def onTimeout(): Unit = {
203219
clearInflightRequests(creatableTopics)
@@ -213,6 +229,33 @@ class DefaultAutoTopicCreationManager(
213229
} else {
214230
if (response.hasResponse) {
215231
response.responseBody() match {
232+
case envelopeResponse: EnvelopeResponse =>
233+
// Unwrap the envelope response to get the actual CreateTopicsResponse
234+
val envelopeError = envelopeResponse.error()
235+
if (envelopeError != Errors.NONE) {
236+
warn(s"Auto topic creation failed for ${creatableTopics.keys} with envelope error: ${envelopeError}")
237+
} else {
238+
requestHeaderForParsing match {
239+
case Some(requestHeader) =>
240+
try {
241+
// Use the captured request header for proper envelope response parsing
242+
val createTopicsResponse = AbstractResponse.parseResponse(
243+
envelopeResponse.responseData(), requestHeader).asInstanceOf[CreateTopicsResponse]
244+
245+
createTopicsResponse.data().topics().forEach(topicResult => {
246+
val error = Errors.forCode(topicResult.errorCode)
247+
if (error != Errors.NONE) {
248+
warn(s"Auto topic creation failed for ${topicResult.name} with error '${error.name}': ${topicResult.errorMessage}")
249+
}
250+
})
251+
} catch {
252+
case e: Exception =>
253+
warn(s"Failed to parse envelope response for auto topic creation of ${creatableTopics.keys}", e)
254+
}
255+
case None =>
256+
warn(s"Cannot parse envelope response without original request header information")
257+
}
258+
}
216259
case createTopicsResponse: CreateTopicsResponse =>
217260
createTopicsResponse.data().topics().forEach(topicResult => {
218261
val error = Errors.forCode(topicResult.errorCode)
@@ -229,26 +272,13 @@ class DefaultAutoTopicCreationManager(
229272
}
230273
}
231274

232-
val request = requestContext.map { context =>
233-
val requestVersion =
234-
channelManager.controllerApiVersions.toScala match {
235-
case None =>
236-
// We will rely on the Metadata request to be retried in the case
237-
// that the latest version is not usable by the controller.
238-
ApiKeys.CREATE_TOPICS.latestVersion()
239-
case Some(nodeApiVersions) =>
240-
nodeApiVersions.latestUsableVersion(ApiKeys.CREATE_TOPICS)
241-
}
242-
243-
// Borrow client information such as client id and correlation id from the original request,
244-
// in order to correlate the create request with the original metadata request.
245-
val requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS,
246-
requestVersion,
247-
context.clientId,
248-
context.correlationId)
249-
ForwardingManager.buildEnvelopeRequest(context,
250-
createTopicsRequest.build(requestVersion).serializeWithHeader(requestHeader))
251-
}.getOrElse(createTopicsRequest)
275+
val request = (requestContext, requestHeaderForParsing) match {
276+
case (Some(context), Some(requestHeader)) =>
277+
ForwardingManager.buildEnvelopeRequest(context,
278+
createTopicsRequest.build(requestHeader.apiVersion()).serializeWithHeader(requestHeader))
279+
case _ =>
280+
createTopicsRequest
281+
}
252282

253283
channelManager.sendRequest(request, requestCompletionHandler)
254284

@@ -364,6 +394,22 @@ class DefaultAutoTopicCreationManager(
364394
.setTopics(topicsToCreate)
365395
)
366396

397+
// Capture request header information for proper envelope response parsing
398+
val requestHeaderForParsing = requestContext.map { context =>
399+
val requestVersion =
400+
channelManager.controllerApiVersions.toScala match {
401+
case None =>
402+
ApiKeys.CREATE_TOPICS.latestVersion()
403+
case Some(nodeApiVersions) =>
404+
nodeApiVersions.latestUsableVersion(ApiKeys.CREATE_TOPICS)
405+
}
406+
407+
new RequestHeader(ApiKeys.CREATE_TOPICS,
408+
requestVersion,
409+
context.clientId,
410+
context.correlationId)
411+
}
412+
367413
val requestCompletionHandler = new ControllerRequestCompletionHandler {
368414
override def onTimeout(): Unit = {
369415
clearInflightRequests(creatableTopics)
@@ -382,36 +428,52 @@ class DefaultAutoTopicCreationManager(
382428
warn(s"Auto topic creation failed for ${creatableTopics.keys} with version mismatch exception: ${versionException.getMessage}")
383429
cacheTopicCreationErrors(creatableTopics.keys.toSet, versionException.getMessage, timeoutMs)
384430
} else {
385-
response.responseBody() match {
386-
case createTopicsResponse: CreateTopicsResponse =>
387-
cacheTopicCreationErrorsFromResponse(createTopicsResponse, timeoutMs)
388-
case _ =>
389-
debug(s"Auto topic creation completed for ${creatableTopics.keys} with response ${response.responseBody}.")
431+
if (response.hasResponse) {
432+
response.responseBody() match {
433+
case envelopeResponse: EnvelopeResponse =>
434+
// Unwrap the envelope response to get the actual CreateTopicsResponse
435+
val envelopeError = envelopeResponse.error()
436+
if (envelopeError != Errors.NONE) {
437+
warn(s"Auto topic creation failed for ${creatableTopics.keys} with envelope error: ${envelopeError}")
438+
cacheTopicCreationErrors(creatableTopics.keys.toSet, s"Envelope error: ${envelopeError}", timeoutMs)
439+
} else {
440+
requestHeaderForParsing match {
441+
case Some(requestHeader) =>
442+
try {
443+
// Use the captured request header for proper envelope response parsing
444+
val createTopicsResponse = AbstractResponse.parseResponse(
445+
envelopeResponse.responseData(), requestHeader).asInstanceOf[CreateTopicsResponse]
446+
447+
cacheTopicCreationErrorsFromResponse(createTopicsResponse, timeoutMs)
448+
} catch {
449+
case e: Exception =>
450+
warn(s"Failed to parse envelope response for auto topic creation of ${creatableTopics.keys}", e)
451+
cacheTopicCreationErrors(creatableTopics.keys.toSet, s"Response parsing error: ${e.getMessage}", timeoutMs)
452+
}
453+
case None =>
454+
warn(s"Cannot parse envelope response without original request header information")
455+
cacheTopicCreationErrors(creatableTopics.keys.toSet, "Missing request header for envelope parsing", timeoutMs)
456+
}
457+
}
458+
case createTopicsResponse: CreateTopicsResponse =>
459+
cacheTopicCreationErrorsFromResponse(createTopicsResponse, timeoutMs)
460+
case unexpectedResponse =>
461+
warn(s"Auto topic creation request received unexpected response type: ${unexpectedResponse.getClass.getSimpleName}")
462+
cacheTopicCreationErrors(creatableTopics.keys.toSet, s"Unexpected response type: ${unexpectedResponse.getClass.getSimpleName}", timeoutMs)
463+
}
464+
debug(s"Auto topic creation completed for ${creatableTopics.keys} with response ${response.responseBody}.")
390465
}
391466
}
392467
}
393468
}
394469

395-
val request = requestContext.map { context =>
396-
val requestVersion =
397-
channelManager.controllerApiVersions.toScala match {
398-
case None =>
399-
// We will rely on the Metadata request to be retried in the case
400-
// that the latest version is not usable by the controller.
401-
ApiKeys.CREATE_TOPICS.latestVersion()
402-
case Some(nodeApiVersions) =>
403-
nodeApiVersions.latestUsableVersion(ApiKeys.CREATE_TOPICS)
404-
}
405-
406-
// Borrow client information such as client id and correlation id from the original request,
407-
// in order to correlate the create request with the original metadata request.
408-
val requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS,
409-
requestVersion,
410-
context.clientId,
411-
context.correlationId)
412-
ForwardingManager.buildEnvelopeRequest(context,
413-
createTopicsRequest.build(requestVersion).serializeWithHeader(requestHeader))
414-
}.getOrElse(createTopicsRequest)
470+
val request = (requestContext, requestHeaderForParsing) match {
471+
case (Some(context), Some(requestHeader)) =>
472+
ForwardingManager.buildEnvelopeRequest(context,
473+
createTopicsRequest.build(requestHeader.apiVersion()).serializeWithHeader(requestHeader))
474+
case _ =>
475+
createTopicsRequest
476+
}
415477

416478
channelManager.sendRequest(request, requestCompletionHandler)
417479

0 commit comments

Comments
 (0)