2525import org .apache .kafka .common .Node ;
2626import org .apache .kafka .common .TopicPartition ;
2727import org .apache .kafka .common .Uuid ;
28+ import org .apache .kafka .common .errors .GroupAuthorizationException ;
29+ import org .apache .kafka .common .errors .InvalidCommitOffsetSizeException ;
30+ import org .apache .kafka .common .errors .OffsetMetadataTooLarge ;
2831import org .apache .kafka .common .errors .RetriableException ;
32+ import org .apache .kafka .common .errors .StaleMemberEpochException ;
2933import org .apache .kafka .common .errors .TimeoutException ;
34+ import org .apache .kafka .common .errors .TopicAuthorizationException ;
35+ import org .apache .kafka .common .errors .UnknownMemberIdException ;
3036import org .apache .kafka .common .message .OffsetCommitRequestData ;
3137import org .apache .kafka .common .message .OffsetCommitResponseData ;
3238import org .apache .kafka .common .message .OffsetFetchRequestData ;
@@ -778,7 +784,8 @@ public void testOffsetFetchRequestErroredRequests(final Errors error) {
778784
779785 @ ParameterizedTest
780786 @ MethodSource ("offsetFetchExceptionSupplier" )
781- public void testOffsetFetchRequestTimeoutRequests (final Errors error ) {
787+ public void testOffsetFetchRequestTimeoutRequests (final Errors error ,
788+ final Class <? extends Exception > expectedExceptionClass ) {
782789 CommitRequestManager commitRequestManager = create (true , 100 );
783790 when (coordinatorRequestManager .coordinator ()).thenReturn (Optional .of (mockedNode ));
784791
@@ -799,10 +806,10 @@ public void testOffsetFetchRequestTimeoutRequests(final Errors error) {
799806 assertFalse (commitRequestManager .pendingRequests .unsentOffsetFetches .isEmpty ());
800807 NetworkClientDelegate .PollResult poll = commitRequestManager .poll (time .milliseconds ());
801808 mimicResponse (error , poll );
802- futures .forEach (f -> assertFutureThrows (f , TimeoutException . class ));
809+ futures .forEach (f -> assertFutureThrows (f , expectedExceptionClass ));
803810 assertTrue (commitRequestManager .pendingRequests .unsentOffsetFetches .isEmpty ());
804811 } else {
805- futures .forEach (f -> assertFutureThrows (f , KafkaException . class ));
812+ futures .forEach (f -> assertFutureThrows (f , expectedExceptionClass ));
806813 assertEmptyPendingRequests (commitRequestManager );
807814 }
808815 }
@@ -965,7 +972,9 @@ public void testOffsetCommitSyncTimeoutNotReturnedOnPollAndFails() {
965972 */
966973 @ ParameterizedTest
967974 @ MethodSource ("offsetCommitExceptionSupplier" )
968- public void testOffsetCommitSyncFailedWithRetriableThrowsTimeoutWhenRetryTimeExpires (final Errors error ) {
975+ public void testOffsetCommitSyncFailedWithRetriableThrowsTimeoutWhenRetryTimeExpires (
976+ final Errors error ,
977+ final Class <? extends Exception > expectedExceptionClass ) {
969978 CommitRequestManager commitRequestManager = create (false , 100 );
970979 when (coordinatorRequestManager .coordinator ()).thenReturn (Optional .of (mockedNode ));
971980
@@ -985,10 +994,7 @@ public void testOffsetCommitSyncFailedWithRetriableThrowsTimeoutWhenRetryTimeExp
985994 assertEquals (0 , res .unsentRequests .size ());
986995 assertTrue (commitResult .isDone ());
987996
988- if (error .exception () instanceof RetriableException )
989- assertFutureThrows (commitResult , TimeoutException .class );
990- else
991- assertFutureThrows (commitResult , KafkaException .class );
997+ assertFutureThrows (commitResult , expectedExceptionClass );
992998 }
993999
9941000 /**
@@ -1383,40 +1389,51 @@ private void testNonRetriable(final List<CompletableFuture<Map<TopicPartition, O
13831389 */
13841390 private static Stream <Arguments > offsetCommitExceptionSupplier () {
13851391 return Stream .of (
1386- Arguments .of (Errors .NOT_COORDINATOR ),
1387- Arguments .of (Errors .COORDINATOR_LOAD_IN_PROGRESS ),
1388- Arguments .of (Errors .UNKNOWN_SERVER_ERROR ),
1389- Arguments .of (Errors .GROUP_AUTHORIZATION_FAILED ),
1390- Arguments .of (Errors .OFFSET_METADATA_TOO_LARGE ),
1391- Arguments .of (Errors .INVALID_COMMIT_OFFSET_SIZE ),
1392- Arguments .of (Errors .UNKNOWN_TOPIC_OR_PARTITION ),
1393- Arguments .of (Errors .COORDINATOR_NOT_AVAILABLE ),
1394- Arguments .of (Errors .REQUEST_TIMED_OUT ),
1395- Arguments .of (Errors .TOPIC_AUTHORIZATION_FAILED ),
1396- Arguments .of (Errors .STALE_MEMBER_EPOCH ),
1397- Arguments .of (Errors .UNKNOWN_MEMBER_ID ));
1392+ // Retriable errors should result in TimeoutException when retry time expires
1393+ Arguments .of (Errors .NOT_COORDINATOR , TimeoutException .class ),
1394+ Arguments .of (Errors .COORDINATOR_LOAD_IN_PROGRESS , TimeoutException .class ),
1395+ Arguments .of (Errors .COORDINATOR_NOT_AVAILABLE , TimeoutException .class ),
1396+ Arguments .of (Errors .REQUEST_TIMED_OUT , TimeoutException .class ),
1397+ Arguments .of (Errors .UNKNOWN_TOPIC_OR_PARTITION , TimeoutException .class ),
1398+
1399+ // Non-retriable errors should result in their specific exceptions
1400+ Arguments .of (Errors .GROUP_AUTHORIZATION_FAILED , GroupAuthorizationException .class ),
1401+ Arguments .of (Errors .OFFSET_METADATA_TOO_LARGE , OffsetMetadataTooLarge .class ),
1402+ Arguments .of (Errors .INVALID_COMMIT_OFFSET_SIZE , InvalidCommitOffsetSizeException .class ),
1403+ Arguments .of (Errors .TOPIC_AUTHORIZATION_FAILED , TopicAuthorizationException .class ),
1404+ Arguments .of (Errors .UNKNOWN_MEMBER_ID , CommitFailedException .class ),
1405+ Arguments .of (Errors .STALE_MEMBER_EPOCH , CommitFailedException .class ),
1406+
1407+ // Generic errors should result in KafkaException
1408+ Arguments .of (Errors .UNKNOWN_SERVER_ERROR , KafkaException .class ));
13981409 }
13991410
14001411 /**
14011412 * @return {@link Errors} that could be received in {@link ApiKeys#OFFSET_FETCH} responses.
14021413 */
14031414 private static Stream <Arguments > offsetFetchExceptionSupplier () {
14041415 return Stream .of (
1405- Arguments .of (Errors .NOT_COORDINATOR ),
1406- Arguments .of (Errors .COORDINATOR_LOAD_IN_PROGRESS ),
1407- Arguments .of (Errors .UNKNOWN_SERVER_ERROR ),
1408- Arguments .of (Errors .GROUP_AUTHORIZATION_FAILED ),
1409- Arguments .of (Errors .OFFSET_METADATA_TOO_LARGE ),
1410- Arguments .of (Errors .INVALID_COMMIT_OFFSET_SIZE ),
1411- Arguments .of (Errors .UNKNOWN_TOPIC_OR_PARTITION ),
1412- Arguments .of (Errors .COORDINATOR_NOT_AVAILABLE ),
1413- Arguments .of (Errors .REQUEST_TIMED_OUT ),
1414- Arguments .of (Errors .TOPIC_AUTHORIZATION_FAILED ),
1415- Arguments .of (Errors .UNKNOWN_MEMBER_ID ),
1416+ // Retriable errors should result in TimeoutException when retry time expires
1417+ Arguments .of (Errors .NOT_COORDINATOR , TimeoutException .class ),
1418+ Arguments .of (Errors .COORDINATOR_LOAD_IN_PROGRESS , TimeoutException .class ),
1419+ Arguments .of (Errors .COORDINATOR_NOT_AVAILABLE , TimeoutException .class ),
1420+ Arguments .of (Errors .REQUEST_TIMED_OUT , TimeoutException .class ),
1421+ Arguments .of (Errors .UNSTABLE_OFFSET_COMMIT , TimeoutException .class ),
1422+ Arguments .of (Errors .UNKNOWN_TOPIC_OR_PARTITION , TimeoutException .class ),
1423+
1424+ // Non-retriable errors should result in their specific exceptions
1425+ Arguments .of (Errors .GROUP_AUTHORIZATION_FAILED , GroupAuthorizationException .class ),
1426+ Arguments .of (Errors .OFFSET_METADATA_TOO_LARGE , KafkaException .class ),
1427+ Arguments .of (Errors .INVALID_COMMIT_OFFSET_SIZE , KafkaException .class ),
1428+
1429+ Arguments .of (Errors .TOPIC_AUTHORIZATION_FAILED , KafkaException .class ),
1430+ Arguments .of (Errors .UNKNOWN_MEMBER_ID , UnknownMemberIdException .class ),
14161431 // Adding STALE_MEMBER_EPOCH as non-retriable here because it is only retried if a new
14171432 // member epoch is received. Tested separately.
1418- Arguments .of (Errors .STALE_MEMBER_EPOCH ),
1419- Arguments .of (Errors .UNSTABLE_OFFSET_COMMIT ));
1433+ Arguments .of (Errors .STALE_MEMBER_EPOCH , StaleMemberEpochException .class ),
1434+
1435+ // Generic errors should result in KafkaException
1436+ Arguments .of (Errors .UNKNOWN_SERVER_ERROR , KafkaException .class ));
14201437 }
14211438
14221439 /**
0 commit comments