@@ -3524,44 +3524,62 @@ void handleResponse(AbstractResponse abstractResponse) {
35243524 for (final Node node : allNodes ) {
35253525 final long nowList = time .milliseconds ();
35263526 runnable .call (new Call ("listGroups" , deadline , new ConstantNodeIdProvider (node .id ())) {
3527+
3528+ // If only regular consumer group types are required, we can try an earlier request version if
3529+ // UnsupportedVersionException is thrown
3530+ final boolean canTryEarlierRequestVersion = options .regularConsumerGroupTypes ();
3531+ boolean tryUsingEarlierRequestVersion = false ;
3532+
35273533 @ Override
35283534 ListGroupsRequest .Builder createRequest (int timeoutMs ) {
3529- List <String > groupTypes = options .types ()
3530- .stream ()
3531- .map (GroupType ::toString )
3532- .collect (Collectors .toList ());
3533- List <String > groupStates = options .groupStates ()
3534- .stream ()
3535- .map (GroupState ::toString )
3536- .collect (Collectors .toList ());
3537- return new ListGroupsRequest .Builder (new ListGroupsRequestData ()
3538- .setTypesFilter (groupTypes )
3539- .setStatesFilter (groupStates )
3540- );
3535+ if (tryUsingEarlierRequestVersion ) {
3536+ List <String > groupStates = options .groupStates ()
3537+ .stream ()
3538+ .map (GroupState ::toString )
3539+ .collect (Collectors .toList ());
3540+ return new ListGroupsRequest .Builder (new ListGroupsRequestData ()
3541+ .setStatesFilter (groupStates )
3542+ );
3543+ } else {
3544+ List <String > groupTypes = options .types ()
3545+ .stream ()
3546+ .map (GroupType ::toString )
3547+ .collect (Collectors .toList ());
3548+ List <String > groupStates = options .groupStates ()
3549+ .stream ()
3550+ .map (GroupState ::toString )
3551+ .collect (Collectors .toList ());
3552+ return new ListGroupsRequest .Builder (new ListGroupsRequestData ()
3553+ .setTypesFilter (groupTypes )
3554+ .setStatesFilter (groupStates )
3555+ );
3556+ }
35413557 }
35423558
35433559 private void maybeAddGroup (ListGroupsResponseData .ListedGroup group ) {
3544- final String groupId = group .groupId ();
3545- final Optional <GroupType > type ;
3546- if (group .groupType () == null || group .groupType ().isEmpty ()) {
3547- type = Optional .empty ();
3548- } else {
3549- type = Optional .of (GroupType .parse (group .groupType ()));
3550- }
3551- final String protocolType = group .protocolType ();
3552- final Optional <GroupState > groupState ;
3553- if (group .groupState () == null || group .groupState ().isEmpty ()) {
3554- groupState = Optional .empty ();
3555- } else {
3556- groupState = Optional .of (GroupState .parse (group .groupState ()));
3560+ String protocolType = group .protocolType ();
3561+ if (options .protocolTypes ().isEmpty () || options .protocolTypes ().contains (protocolType )) {
3562+ final String groupId = group .groupId ();
3563+ final Optional <GroupType > type ;
3564+ if (group .groupType () == null || group .groupType ().isEmpty ()) {
3565+ type = Optional .empty ();
3566+ } else {
3567+ type = Optional .of (GroupType .parse (group .groupType ()));
3568+ }
3569+ final Optional <GroupState > groupState ;
3570+ if (group .groupState () == null || group .groupState ().isEmpty ()) {
3571+ groupState = Optional .empty ();
3572+ } else {
3573+ groupState = Optional .of (GroupState .parse (group .groupState ()));
3574+ }
3575+ final GroupListing groupListing = new GroupListing (
3576+ groupId ,
3577+ type ,
3578+ protocolType ,
3579+ groupState
3580+ );
3581+ results .addListing (groupListing );
35573582 }
3558- final GroupListing groupListing = new GroupListing (
3559- groupId ,
3560- type ,
3561- protocolType ,
3562- groupState
3563- );
3564- results .addListing (groupListing );
35653583 }
35663584
35673585 @ Override
@@ -3582,6 +3600,23 @@ void handleResponse(AbstractResponse abstractResponse) {
35823600 }
35833601 }
35843602
3603+ @ Override
3604+ boolean handleUnsupportedVersionException (final UnsupportedVersionException exception ) {
3605+ // If we cannot try the earlier request version, give up
3606+ if (!canTryEarlierRequestVersion ) {
3607+ return false ;
3608+ }
3609+
3610+ // If have already tried the earlier request version, give up
3611+ if (tryUsingEarlierRequestVersion ) {
3612+ return false ;
3613+ }
3614+
3615+ // Have a try using the earlier request version
3616+ tryUsingEarlierRequestVersion = true ;
3617+ return true ;
3618+ }
3619+
35853620 @ Override
35863621 void handleFailure (Throwable throwable ) {
35873622 synchronized (results ) {
0 commit comments