11import asyncio
2+ import contextlib
23import logging
34import random
45import time
@@ -188,16 +189,14 @@ def hosts(self):
188189 async def close (self ):
189190 if self ._sync_task :
190191 self ._sync_task .cancel ()
191- try :
192+ with contextlib . suppress ( asyncio . CancelledError ) :
192193 await self ._sync_task
193- except asyncio .CancelledError :
194- pass
195194 self ._sync_task = None
196195 # Be careful to wait for graceful closure of all connections, so we
197196 # process all pending buffers.
198- futs = []
199- for conn in self ._conns .values ():
200- futs . append ( conn . close ( reason = CloseReason . SHUTDOWN ))
197+ futs = [
198+ conn . close ( reason = CloseReason . SHUTDOWN ) for conn in self ._conns .values ()
199+ ]
201200 if futs :
202201 await asyncio .gather (* futs )
203202
@@ -231,7 +230,7 @@ async def bootstrap(self):
231230 sasl_mechanism = self ._sasl_mechanism ,
232231 sasl_plain_username = self ._sasl_plain_username ,
233232 sasl_plain_password = self ._sasl_plain_password ,
234- sasl_kerberos_service_name = self ._sasl_kerberos_service_name , # noqa: E501
233+ sasl_kerberos_service_name = self ._sasl_kerberos_service_name ,
235234 sasl_kerberos_domain_name = self ._sasl_kerberos_domain_name ,
236235 sasl_oauth_token_provider = self ._sasl_oauth_token_provider ,
237236 version_hint = version_hint ,
@@ -370,7 +369,7 @@ def force_metadata_update(self):
370369 if not self ._md_update_waiter .done ():
371370 self ._md_update_waiter .set_result (None )
372371 self ._md_update_fut = self ._loop .create_future ()
373- # Metadata will be updated in the background by syncronizer
372+ # Metadata will be updated in the background by synchronizer
374373 return asyncio .shield (self ._md_update_fut )
375374
376375 async def fetch_all_metadata (self ):
@@ -413,10 +412,7 @@ def _on_connection_closed(self, conn, reason):
413412 """Callback called when connection is closed"""
414413 # Connection failures imply that our metadata is stale, so let's
415414 # refresh
416- if (
417- reason == CloseReason .CONNECTION_BROKEN
418- or reason == CloseReason .CONNECTION_TIMEOUT
419- ):
415+ if reason in [CloseReason .CONNECTION_BROKEN , CloseReason .CONNECTION_TIMEOUT ]:
420416 self .force_metadata_update ()
421417
422418 async def _get_conn (self , node_id , * , group = ConnectionGroup .DEFAULT , no_hint = False ):
@@ -471,7 +467,7 @@ async def _get_conn(self, node_id, *, group=ConnectionGroup.DEFAULT, no_hint=Fal
471467 sasl_mechanism = self ._sasl_mechanism ,
472468 sasl_plain_username = self ._sasl_plain_username ,
473469 sasl_plain_password = self ._sasl_plain_password ,
474- sasl_kerberos_service_name = self ._sasl_kerberos_service_name , # noqa: E501
470+ sasl_kerberos_service_name = self ._sasl_kerberos_service_name ,
475471 sasl_kerberos_domain_name = self ._sasl_kerberos_domain_name ,
476472 sasl_oauth_token_provider = self ._sasl_oauth_token_provider ,
477473 version_hint = version_hint ,
@@ -511,7 +507,7 @@ async def send(self, node_id, request, *, group=ConnectionGroup.DEFAULT):
511507 if not (await self .ready (node_id , group = group )):
512508 raise NodeNotReadyError (
513509 "Attempt to send a request to node"
514- " which is not ready (node id {})." . format ( node_id )
510+ f " which is not ready (node id { node_id } )."
515511 )
516512
517513 # Every request gets a response, except one special case:
@@ -524,10 +520,10 @@ async def send(self, node_id, request, *, group=ConnectionGroup.DEFAULT):
524520 )
525521 try :
526522 result = await future
527- except asyncio .TimeoutError :
523+ except asyncio .TimeoutError as exc :
528524 # close connection so it is renewed in next request
529525 self ._conns [(node_id , group )].close (reason = CloseReason .CONNECTION_TIMEOUT )
530- raise RequestTimedOutError ()
526+ raise RequestTimedOutError () from exc
531527 else :
532528 return result
533529
@@ -536,14 +532,14 @@ async def check_version(self, node_id=None):
536532 if node_id is None :
537533 default_group_conns = [
538534 n_id
539- for (n_id , group ) in self ._conns . keys ()
535+ for (n_id , group ) in self ._conns
540536 if group == ConnectionGroup .DEFAULT
541537 ]
542538 if default_group_conns :
543539 node_id = default_group_conns [0 ]
544540 else :
545541 assert self .cluster .brokers (), "no brokers in metadata"
546- node_id = list ( self .cluster .brokers ())[ 0 ] .nodeId
542+ node_id = next ( iter ( self .cluster .brokers ())) .nodeId
547543
548544 from aiokafka .protocol .admin import ApiVersionRequest_v0 , ListGroupsRequest_v0
549545 from aiokafka .protocol .commit import (
@@ -577,14 +573,12 @@ async def check_version(self, node_id=None):
577573 # so we send metadata request and wait response
578574 task = create_task (conn .send (request ))
579575 await asyncio .wait ([task ], timeout = 0.1 )
580- try :
576+ # metadata request can be cancelled in case
577+ # of invalid correlationIds order
578+ with contextlib .suppress (KafkaError ):
581579 await conn .send (MetadataRequest_v0 ([]))
582- except KafkaError :
583- # metadata request can be cancelled in case
584- # of invalid correlationIds order
585- pass
586580 response = await task
587- except KafkaError :
581+ except KafkaError : # noqa: PERF203
588582 continue
589583 else :
590584 # To avoid having a connection in undefined state
@@ -593,7 +587,7 @@ async def check_version(self, node_id=None):
593587 if isinstance (request , ApiVersionRequest_v0 ):
594588 # Starting from 0.10 kafka broker we determine version
595589 # by looking at ApiVersionResponse
596- version = self ._check_api_version_response (response )
590+ return self ._check_api_version_response (response )
597591 return version
598592
599593 raise UnrecognizedBrokerVersion ()
0 commit comments