@@ -453,7 +453,7 @@ def _send_join_group_request(self):
453453 (protocol , metadata if isinstance (metadata , bytes ) else metadata .encode ())
454454 for protocol , metadata in self .group_protocols ()
455455 ]
456- version = self ._client .api_version (JoinGroupRequest , max_version = 2 )
456+ version = self ._client .api_version (JoinGroupRequest , max_version = 3 )
457457 if version == 0 :
458458 request = JoinGroupRequest [version ](
459459 self .group_id ,
@@ -493,6 +493,11 @@ def _failed_request(self, node_id, request, future, error):
493493 future .failure (error )
494494
495495 def _handle_join_group_response (self , future , send_time , response ):
496+ if response .API_VERSION >= 2 :
497+ self .sensors .throttle_time .record (response .throttle_time_ms )
498+ if response .throttle_time_ms > 0 :
499+ log .warning ("JoinGroupRequest throttled by broker (%d ms)" , response .throttle_time_ms )
500+
496501 error_type = Errors .for_code (response .error_code )
497502 if error_type is Errors .NoError :
498503 log .debug ("Received successful JoinGroup response for group %s: %s" ,
@@ -554,7 +559,7 @@ def _handle_join_group_response(self, future, send_time, response):
554559
555560 def _on_join_follower (self ):
556561 # send follower's sync group with an empty assignment
557- version = self ._client .api_version (SyncGroupRequest , max_version = 1 )
562+ version = self ._client .api_version (SyncGroupRequest , max_version = 2 )
558563 request = SyncGroupRequest [version ](
559564 self .group_id ,
560565 self ._generation .generation_id ,
@@ -582,7 +587,7 @@ def _on_join_leader(self, response):
582587 except Exception as e :
583588 return Future ().failure (e )
584589
585- version = self ._client .api_version (SyncGroupRequest , max_version = 1 )
590+ version = self ._client .api_version (SyncGroupRequest , max_version = 2 )
586591 request = SyncGroupRequest [version ](
587592 self .group_id ,
588593 self ._generation .generation_id ,
@@ -614,6 +619,11 @@ def _send_sync_group_request(self, request):
614619 return future
615620
616621 def _handle_sync_group_response (self , future , send_time , response ):
622+ if response .API_VERSION >= 1 :
623+ self .sensors .throttle_time .record (response .throttle_time_ms )
624+ if response .throttle_time_ms > 0 :
625+ log .warning ("SyncGroupRequest throttled by broker (%d ms)" , response .throttle_time_ms )
626+
617627 error_type = Errors .for_code (response .error_code )
618628 if error_type is Errors .NoError :
619629 self .sensors .sync_latency .record ((time .time () - send_time ) * 1000 )
@@ -770,7 +780,7 @@ def maybe_leave_group(self):
770780 # this is a minimal effort attempt to leave the group. we do not
771781 # attempt any resending if the request fails or times out.
772782 log .info ('Leaving consumer group (%s).' , self .group_id )
773- version = self ._client .api_version (LeaveGroupRequest , max_version = 1 )
783+ version = self ._client .api_version (LeaveGroupRequest , max_version = 2 )
774784 request = LeaveGroupRequest [version ](self .group_id , self ._generation .member_id )
775785 future = self ._client .send (self .coordinator_id , request )
776786 future .add_callback (self ._handle_leave_group_response )
@@ -780,6 +790,11 @@ def maybe_leave_group(self):
780790 self .reset_generation ()
781791
782792 def _handle_leave_group_response (self , response ):
793+ if response .API_VERSION >= 1 :
794+ self .sensors .throttle_time .record (response .throttle_time_ms )
795+ if response .throttle_time_ms > 0 :
796+ log .warning ("LeaveGroupRequest throttled by broker (%d ms)" , response .throttle_time_ms )
797+
783798 error_type = Errors .for_code (response .error_code )
784799 if error_type is Errors .NoError :
785800 log .debug ("LeaveGroup request for group %s returned successfully" ,
@@ -798,7 +813,7 @@ def _send_heartbeat_request(self):
798813 e = Errors .NodeNotReadyError (self .coordinator_id )
799814 return Future ().failure (e )
800815
801- version = self ._client .api_version (HeartbeatRequest , max_version = 1 )
816+ version = self ._client .api_version (HeartbeatRequest , max_version = 2 )
802817 request = HeartbeatRequest [version ](self .group_id ,
803818 self ._generation .generation_id ,
804819 self ._generation .member_id )
@@ -811,6 +826,11 @@ def _send_heartbeat_request(self):
811826 return future
812827
813828 def _handle_heartbeat_response (self , future , send_time , response ):
829+ if response .API_VERSION >= 1 :
830+ self .sensors .throttle_time .record (response .throttle_time_ms )
831+ if response .throttle_time_ms > 0 :
832+ log .warning ("HeartbeatRequest throttled by broker (%d ms)" , response .throttle_time_ms )
833+
814834 self .sensors .heartbeat_latency .record ((time .time () - send_time ) * 1000 )
815835 error_type = Errors .for_code (response .error_code )
816836 if error_type is Errors .NoError :
@@ -899,6 +919,14 @@ def __init__(self, heartbeat, metrics, prefix, tags=None):
899919 tags ), AnonMeasurable (
900920 lambda _ , now : (now / 1000 ) - self .heartbeat .last_send ))
901921
922+ self .throttle_time = metrics .sensor ('throttle-time' )
923+ self .throttle_time .add (metrics .metric_name (
924+ 'throttle-time-avg' , self .metric_group_name ,
925+ 'The average throttle time in ms' ), Avg ())
926+ self .throttle_time .add (metrics .metric_name (
927+ 'throttle-time-max' , self .metric_group_name ,
928+ 'The maximum throttle time in ms' ), Max ())
929+
902930
903931class HeartbeatThread (threading .Thread ):
904932 def __init__ (self , coordinator ):
0 commit comments