2323 ListGroupsRequest , DescribeGroupsRequest , DescribeAclsRequest , CreateAclsRequest , DeleteAclsRequest ,
2424 DeleteGroupsRequest , DescribeLogDirsRequest
2525)
26- from kafka .protocol .commit import GroupCoordinatorRequest , OffsetFetchRequest
26+ from kafka .protocol .commit import OffsetFetchRequest
27+ from kafka .protocol .find_coordinator import FindCoordinatorRequest
2728from kafka .protocol .metadata import MetadataRequest
2829from kafka .protocol .types import Array
2930from kafka .structs import TopicPartition , OffsetAndMetadata , MemberInformation , GroupInformation
@@ -285,17 +286,14 @@ def _find_coordinator_id_send_request(self, group_id):
285286 Returns:
286287 A message future
287288 """
288- # TODO add support for dynamically picking version of
289- # GroupCoordinatorRequest which was renamed to FindCoordinatorRequest.
290- # When I experimented with this, the coordinator value returned in
291- # GroupCoordinatorResponse_v1 didn't match the value returned by
292- # GroupCoordinatorResponse_v0 and I couldn't figure out why.
293- version = self ._client .api_version (GroupCoordinatorRequest , max_version = 0 )
289+ version = self ._client .api_version (FindCoordinatorRequest , max_version = 2 )
294290 if version <= 0 :
295- request = GroupCoordinatorRequest [version ](group_id )
291+ request = FindCoordinatorRequest [version ](group_id )
292+ elif version <= 2 :
293+ request = FindCoordinatorRequest [version ](group_id , 0 )
296294 else :
297295 raise NotImplementedError (
298- "Support for GroupCoordinatorRequest_v {} has not yet been added to KafkaAdminClient."
296+ "Support for FindCoordinatorRequest_v {} has not yet been added to KafkaAdminClient."
299297 .format (version ))
300298 return self ._send_request_to_node (self ._client .least_loaded_node (), request )
301299
@@ -308,18 +306,13 @@ def _find_coordinator_id_process_response(self, response):
308306 Returns:
309307 The node_id of the broker that is the coordinator.
310308 """
311- if response .API_VERSION <= 0 :
312- error_type = Errors .for_code (response .error_code )
313- if error_type is not Errors .NoError :
314- # Note: When error_type.retriable, Java will retry... see
315- # KafkaAdminClient's handleFindCoordinatorError method
316- raise error_type (
317- "FindCoordinatorRequest failed with response '{}'."
318- .format (response ))
319- else :
320- raise NotImplementedError (
321- "Support for FindCoordinatorRequest_v{} has not yet been added to KafkaAdminClient."
322- .format (response .API_VERSION ))
309+ error_type = Errors .for_code (response .error_code )
310+ if error_type is not Errors .NoError :
311+ # Note: When error_type.retriable, Java will retry... see
312+ # KafkaAdminClient's handleFindCoordinatorError method
313+ raise error_type (
314+ "FindCoordinatorRequest failed with response '{}'."
315+ .format (response ))
323316 return response .coordinator_id
324317
325318 def _find_coordinator_ids (self , group_ids ):
0 commit comments