@@ -51,6 +51,7 @@ import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito}
5151import org .mockito .Mockito .never
5252
5353import scala .collection .{Map , Seq }
54+ import scala .jdk .CollectionConverters ._
5455
5556class AutoTopicCreationManagerTest {
5657
@@ -903,4 +904,225 @@ class AutoTopicCreationManagerTest {
903904 assertTrue(! cachedErrors.contains(" test-topic-1" ), " test-topic-1 should have been evicted" )
904905 assertTrue(! cachedErrors.contains(" test-topic-2" ), " test-topic-2 should have been evicted" )
905906 }
907+
908+ @ Test
909+ def testTopicsInBackoffAreNotRetried (): Unit = {
910+ autoTopicCreationManager = new DefaultAutoTopicCreationManager (
911+ config,
912+ brokerToController,
913+ groupCoordinator,
914+ transactionCoordinator,
915+ shareCoordinator,
916+ mockTime,
917+ topicErrorCacheCapacity = testCacheCapacity)
918+
919+ val topics = Map (
920+ " test-topic" -> new CreatableTopic ().setName(" test-topic" ).setNumPartitions(1 ).setReplicationFactor(1 )
921+ )
922+ val requestContext = initializeRequestContextWithUserPrincipal()
923+ val timeoutMs = 5000L
924+
925+ // First attempt - trigger topic creation
926+ autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext, timeoutMs)
927+
928+ val argumentCaptor = ArgumentCaptor .forClass(classOf [ControllerRequestCompletionHandler ])
929+ Mockito .verify(brokerToController, Mockito .times(1 )).sendRequest(
930+ any(classOf [AbstractRequest .Builder [_ <: AbstractRequest ]]),
931+ argumentCaptor.capture())
932+
933+ // Simulate error response to cache the error
934+ val createTopicsResponseData = new org.apache.kafka.common.message.CreateTopicsResponseData ()
935+ val topicResult = new org.apache.kafka.common.message.CreateTopicsResponseData .CreatableTopicResult ()
936+ .setName(" test-topic" )
937+ .setErrorCode(Errors .INVALID_REPLICATION_FACTOR .code())
938+ .setErrorMessage(" Invalid replication factor" )
939+ createTopicsResponseData.topics().add(topicResult)
940+
941+ val createTopicsResponse = new CreateTopicsResponse (createTopicsResponseData)
942+ val header = new RequestHeader (ApiKeys .CREATE_TOPICS , 0 , " client" , 1 )
943+ val clientResponse = new ClientResponse (header, null , null ,
944+ 0 , 0 , false , null , null , createTopicsResponse)
945+
946+ argumentCaptor.getValue.onComplete(clientResponse)
947+
948+ // Verify error is cached
949+ val cachedErrors = autoTopicCreationManager.getStreamsInternalTopicCreationErrors(Set (" test-topic" ), mockTime.milliseconds())
950+ assertEquals(1 , cachedErrors.size)
951+
952+ // Second attempt - should NOT send request because topic is in back-off
953+ autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext, timeoutMs)
954+
955+ // Verify still only one request was sent (not retried during back-off)
956+ Mockito .verify(brokerToController, Mockito .times(1 )).sendRequest(
957+ any(classOf [AbstractRequest .Builder [_ <: AbstractRequest ]]),
958+ any(classOf [ControllerRequestCompletionHandler ]))
959+ }
960+
961+ @ Test
962+ def testTopicsOutOfBackoffCanBeRetried (): Unit = {
963+ autoTopicCreationManager = new DefaultAutoTopicCreationManager (
964+ config,
965+ brokerToController,
966+ groupCoordinator,
967+ transactionCoordinator,
968+ shareCoordinator,
969+ mockTime,
970+ topicErrorCacheCapacity = testCacheCapacity)
971+
972+ val topics = Map (
973+ " test-topic" -> new CreatableTopic ().setName(" test-topic" ).setNumPartitions(1 ).setReplicationFactor(1 )
974+ )
975+ val requestContext = initializeRequestContextWithUserPrincipal()
976+ val shortTtlMs = 1000L
977+
978+ // First attempt - trigger topic creation
979+ autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext, shortTtlMs)
980+
981+ val argumentCaptor = ArgumentCaptor .forClass(classOf [ControllerRequestCompletionHandler ])
982+ Mockito .verify(brokerToController, Mockito .times(1 )).sendRequest(
983+ any(classOf [AbstractRequest .Builder [_ <: AbstractRequest ]]),
984+ argumentCaptor.capture())
985+
986+ // Simulate error response to cache the error
987+ val createTopicsResponseData = new org.apache.kafka.common.message.CreateTopicsResponseData ()
988+ val topicResult = new org.apache.kafka.common.message.CreateTopicsResponseData .CreatableTopicResult ()
989+ .setName(" test-topic" )
990+ .setErrorCode(Errors .INVALID_REPLICATION_FACTOR .code())
991+ .setErrorMessage(" Invalid replication factor" )
992+ createTopicsResponseData.topics().add(topicResult)
993+
994+ val createTopicsResponse = new CreateTopicsResponse (createTopicsResponseData)
995+ val header = new RequestHeader (ApiKeys .CREATE_TOPICS , 0 , " client" , 1 )
996+ val clientResponse = new ClientResponse (header, null , null ,
997+ 0 , 0 , false , null , null , createTopicsResponse)
998+
999+ argumentCaptor.getValue.onComplete(clientResponse)
1000+
1001+ // Verify error is cached
1002+ val cachedErrors1 = autoTopicCreationManager.getStreamsInternalTopicCreationErrors(Set (" test-topic" ), mockTime.milliseconds())
1003+ assertEquals(1 , cachedErrors1.size)
1004+
1005+ // Advance time beyond TTL to exit back-off period
1006+ mockTime.sleep(shortTtlMs + 100 )
1007+
1008+ // Verify error is expired
1009+ val cachedErrors2 = autoTopicCreationManager.getStreamsInternalTopicCreationErrors(Set (" test-topic" ), mockTime.milliseconds())
1010+ assertTrue(cachedErrors2.isEmpty, " Error should be expired after TTL" )
1011+
1012+ // Second attempt - should send request because topic is out of back-off
1013+ autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext, shortTtlMs)
1014+
1015+ // Verify a second request was sent (retry allowed after back-off expires)
1016+ Mockito .verify(brokerToController, Mockito .times(2 )).sendRequest(
1017+ any(classOf [AbstractRequest .Builder [_ <: AbstractRequest ]]),
1018+ any(classOf [ControllerRequestCompletionHandler ]))
1019+ }
1020+
1021+ @ Test
1022+ def testInflightTopicsAreNotRetriedConcurrently (): Unit = {
1023+ autoTopicCreationManager = new DefaultAutoTopicCreationManager (
1024+ config,
1025+ brokerToController,
1026+ groupCoordinator,
1027+ transactionCoordinator,
1028+ shareCoordinator,
1029+ mockTime,
1030+ topicErrorCacheCapacity = testCacheCapacity)
1031+
1032+ val topics = Map (
1033+ " test-topic" -> new CreatableTopic ().setName(" test-topic" ).setNumPartitions(1 ).setReplicationFactor(1 )
1034+ )
1035+ val requestContext = initializeRequestContextWithUserPrincipal()
1036+ val timeoutMs = 5000L
1037+
1038+ // First call - should send request and mark topic as in-flight
1039+ autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext, timeoutMs)
1040+
1041+ Mockito .verify(brokerToController, Mockito .times(1 )).sendRequest(
1042+ any(classOf [AbstractRequest .Builder [_ <: AbstractRequest ]]),
1043+ any(classOf [ControllerRequestCompletionHandler ]))
1044+
1045+ // Second concurrent call - should NOT send request because topic is in-flight
1046+ autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext, timeoutMs)
1047+
1048+ // Verify still only one request was sent (concurrent request blocked)
1049+ Mockito .verify(brokerToController, Mockito .times(1 )).sendRequest(
1050+ any(classOf [AbstractRequest .Builder [_ <: AbstractRequest ]]),
1051+ any(classOf [ControllerRequestCompletionHandler ]))
1052+ }
1053+
1054+ @ Test
1055+ def testBackoffAndInflightInteraction (): Unit = {
1056+ autoTopicCreationManager = new DefaultAutoTopicCreationManager (
1057+ config,
1058+ brokerToController,
1059+ groupCoordinator,
1060+ transactionCoordinator,
1061+ shareCoordinator,
1062+ mockTime,
1063+ topicErrorCacheCapacity = testCacheCapacity)
1064+
1065+ val topics = Map (
1066+ " backoff-topic" -> new CreatableTopic ().setName(" backoff-topic" ).setNumPartitions(1 ).setReplicationFactor(1 ),
1067+ " inflight-topic" -> new CreatableTopic ().setName(" inflight-topic" ).setNumPartitions(1 ).setReplicationFactor(1 ),
1068+ " normal-topic" -> new CreatableTopic ().setName(" normal-topic" ).setNumPartitions(1 ).setReplicationFactor(1 )
1069+ )
1070+ val requestContext = initializeRequestContextWithUserPrincipal()
1071+ val timeoutMs = 5000L
1072+
1073+ // Create error for backoff-topic
1074+ val backoffOnly = Map (" backoff-topic" -> topics(" backoff-topic" ))
1075+ autoTopicCreationManager.createStreamsInternalTopics(backoffOnly, requestContext, timeoutMs)
1076+
1077+ val argumentCaptor1 = ArgumentCaptor .forClass(classOf [ControllerRequestCompletionHandler ])
1078+ Mockito .verify(brokerToController, Mockito .times(1 )).sendRequest(
1079+ any(classOf [AbstractRequest .Builder [_ <: AbstractRequest ]]),
1080+ argumentCaptor1.capture())
1081+
1082+ // Simulate error response for backoff-topic
1083+ val createTopicsResponseData = new org.apache.kafka.common.message.CreateTopicsResponseData ()
1084+ val topicResult = new org.apache.kafka.common.message.CreateTopicsResponseData .CreatableTopicResult ()
1085+ .setName(" backoff-topic" )
1086+ .setErrorCode(Errors .INVALID_REPLICATION_FACTOR .code())
1087+ .setErrorMessage(" Invalid replication factor" )
1088+ createTopicsResponseData.topics().add(topicResult)
1089+
1090+ val createTopicsResponse = new CreateTopicsResponse (createTopicsResponseData)
1091+ val header = new RequestHeader (ApiKeys .CREATE_TOPICS , 0 , " client" , 1 )
1092+ val clientResponse = new ClientResponse (header, null , null ,
1093+ 0 , 0 , false , null , null , createTopicsResponse)
1094+
1095+ argumentCaptor1.getValue.onComplete(clientResponse)
1096+
1097+ // Make inflight-topic in-flight (without completing the request)
1098+ val inflightOnly = Map (" inflight-topic" -> topics(" inflight-topic" ))
1099+ autoTopicCreationManager.createStreamsInternalTopics(inflightOnly, requestContext, timeoutMs)
1100+
1101+ Mockito .verify(brokerToController, Mockito .times(2 )).sendRequest(
1102+ any(classOf [AbstractRequest .Builder [_ <: AbstractRequest ]]),
1103+ any(classOf [ControllerRequestCompletionHandler ]))
1104+
1105+ // Now attempt to create all three topics together
1106+ autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext, timeoutMs)
1107+
1108+ val argumentCaptor2 = ArgumentCaptor .forClass(classOf [AbstractRequest .Builder [_ <: AbstractRequest ]])
1109+ // Total 3 requests: 1 for backoff-topic, 1 for inflight-topic, 1 for normal-topic only
1110+ Mockito .verify(brokerToController, Mockito .times(3 )).sendRequest(
1111+ argumentCaptor2.capture(),
1112+ any(classOf [ControllerRequestCompletionHandler ]))
1113+
1114+ // Verify that only normal-topic was included in the last request
1115+ val lastRequest = argumentCaptor2.getValue.asInstanceOf [EnvelopeRequest .Builder ]
1116+ .build(ApiKeys .ENVELOPE .latestVersion())
1117+ val forwardedRequestBuffer = lastRequest.requestData().duplicate()
1118+ val requestHeader = RequestHeader .parse(forwardedRequestBuffer)
1119+ val parsedRequest = CreateTopicsRequest .parse(new org.apache.kafka.common.protocol.ByteBufferAccessor (forwardedRequestBuffer),
1120+ requestHeader.apiVersion())
1121+
1122+ val topicNames = parsedRequest.data().topics().asScala.map(_.name()).toSet
1123+ assertEquals(1 , topicNames.size, " Only normal-topic should be created" )
1124+ assertTrue(topicNames.contains(" normal-topic" ), " normal-topic should be in the request" )
1125+ assertTrue(! topicNames.contains(" backoff-topic" ), " backoff-topic should be filtered (in back-off)" )
1126+ assertTrue(! topicNames.contains(" inflight-topic" ), " inflight-topic should be filtered (in-flight)" )
1127+ }
9061128}
0 commit comments