@@ -152,12 +152,11 @@ def api_version(self):
152152 def hosts (self ):
153153 return collect_hosts (self ._bootstrap_servers )
154154
155- @asyncio .coroutine
156- def close (self ):
155+ async def close (self ):
157156 if self ._sync_task :
158157 self ._sync_task .cancel ()
159158 try :
160- yield from self ._sync_task
159+ await self ._sync_task
161160 except asyncio .CancelledError :
162161 pass
163162 self ._sync_task = None
@@ -167,10 +166,9 @@ def close(self):
167166 for conn in self ._conns .values ():
168167 futs .append (conn .close (reason = CloseReason .SHUTDOWN ))
169168 if futs :
170- yield from asyncio .gather (* futs , loop = self ._loop )
169+ await asyncio .gather (* futs , loop = self ._loop )
171170
172- @asyncio .coroutine
173- def bootstrap (self ):
171+ async def bootstrap (self ):
174172 """Try to to bootstrap initial cluster metadata"""
175173 # using request v0 for bootstap if not sure v1 is available
176174 if self ._api_version == "auto" or self ._api_version < (0 , 10 ):
@@ -186,7 +184,7 @@ def bootstrap(self):
186184 log .debug ("Attempting to bootstrap via node at %s:%s" , host , port )
187185
188186 try :
189- bootstrap_conn = yield from create_conn (
187+ bootstrap_conn = await create_conn (
190188 host , port , loop = self ._loop , client_id = self ._client_id ,
191189 request_timeout_ms = self ._request_timeout_ms ,
192190 ssl_context = self ._ssl_context ,
@@ -203,7 +201,7 @@ def bootstrap(self):
203201 continue
204202
205203 try :
206- metadata = yield from bootstrap_conn .send (metadata_request )
204+ metadata = await bootstrap_conn .send (metadata_request )
207205 except KafkaError as err :
208206 log .warning ('Unable to request metadata from "%s:%s": %s' ,
209207 host , port , err )
@@ -229,27 +227,26 @@ def bootstrap(self):
229227
230228 # detect api version if need
231229 if self ._api_version == 'auto' :
232- self ._api_version = yield from self .check_version ()
230+ self ._api_version = await self .check_version ()
233231
234232 if self ._sync_task is None :
235233 # starting metadata synchronizer task
236234 self ._sync_task = ensure_future (
237235 self ._md_synchronizer (), loop = self ._loop )
238236
239- @asyncio .coroutine
240- def _md_synchronizer (self ):
237+ async def _md_synchronizer (self ):
241238 """routine (async task) for synchronize cluster metadata every
242239 `metadata_max_age_ms` milliseconds"""
243240 while True :
244- yield from asyncio .wait (
241+ await asyncio .wait (
245242 [self ._md_update_waiter ],
246243 timeout = self ._metadata_max_age_ms / 1000 ,
247244 loop = self ._loop )
248245
249246 topics = self ._topics
250247 if self ._md_update_fut is None :
251248 self ._md_update_fut = create_future (loop = self ._loop )
252- ret = yield from self ._metadata_update (self .cluster , topics )
249+ ret = await self ._metadata_update (self .cluster , topics )
253250 # If list of topics changed during metadata update we must update
254251 # it again right away.
255252 if topics != self ._topics :
@@ -273,8 +270,7 @@ def get_random_node(self):
273270 return None
274271 return random .choice (nodeids )
275272
276- @asyncio .coroutine
277- def _metadata_update (self , cluster_metadata , topics ):
273+ async def _metadata_update (self , cluster_metadata , topics ):
278274 assert isinstance (cluster_metadata , ClusterMetadata )
279275 topics = list (topics )
280276 version_id = 0 if self .api_version < (0 , 10 ) else 1
@@ -287,15 +283,15 @@ def _metadata_update(self, cluster_metadata, topics):
287283 nodeids .append ('bootstrap' )
288284 random .shuffle (nodeids )
289285 for node_id in nodeids :
290- conn = yield from self ._get_conn (node_id )
286+ conn = await self ._get_conn (node_id )
291287
292288 if conn is None :
293289 continue
294290 log .debug ("Sending metadata request %s to node %s" ,
295291 metadata_request , node_id )
296292
297293 try :
298- metadata = yield from conn .send (metadata_request )
294+ metadata = await conn .send (metadata_request )
299295 except KafkaError as err :
300296 log .error (
301297 'Unable to request metadata from node with id %s: %s' ,
@@ -337,11 +333,10 @@ def force_metadata_update(self):
337333 # Metadata will be updated in the background by syncronizer
338334 return asyncio .shield (self ._md_update_fut , loop = self ._loop )
339335
340- @asyncio .coroutine
341- def fetch_all_metadata (self ):
336+ async def fetch_all_metadata (self ):
342337 cluster_md = ClusterMetadata (
343338 metadata_max_age_ms = self ._metadata_max_age_ms )
344- updated = yield from self ._metadata_update (cluster_md , [])
339+ updated = await self ._metadata_update (cluster_md , [])
345340 if not updated :
346341 raise KafkaError (
347342 'Unable to get cluster metadata over all known brokers' )
@@ -385,9 +380,10 @@ def _on_connection_closed(self, conn, reason):
385380 reason == CloseReason .CONNECTION_TIMEOUT :
386381 self .force_metadata_update ()
387382
388- @asyncio .coroutine
389- def _get_conn (self , node_id , * , group = ConnectionGroup .DEFAULT ,
390- no_hint = False ):
383+ async def _get_conn (
384+ self , node_id , * , group = ConnectionGroup .DEFAULT ,
385+ no_hint = False
386+ ):
391387 "Get or create a connection to a broker using host and port"
392388 conn_id = (node_id , group )
393389 if conn_id in self ._conns :
@@ -414,15 +410,15 @@ def _get_conn(self, node_id, *, group=ConnectionGroup.DEFAULT,
414410 log .debug ("Initiating connection to node %s at %s:%s" ,
415411 node_id , broker .host , broker .port )
416412
417- with ( yield from self ._get_conn_lock ) :
413+ async with self ._get_conn_lock :
418414 if conn_id in self ._conns :
419415 return self ._conns [conn_id ]
420416
421417 version_hint = self ._api_version
422418 if version_hint == "auto" or no_hint :
423419 version_hint = None
424420
425- self ._conns [conn_id ] = yield from create_conn (
421+ self ._conns [conn_id ] = await create_conn (
426422 broker .host , broker .port , loop = self ._loop ,
427423 client_id = self ._client_id ,
428424 request_timeout_ms = self ._request_timeout_ms ,
@@ -447,15 +443,13 @@ def _get_conn(self, node_id, *, group=ConnectionGroup.DEFAULT,
447443 else :
448444 return self ._conns [conn_id ]
449445
450- @asyncio .coroutine
451- def ready (self , node_id , * , group = ConnectionGroup .DEFAULT ):
452- conn = yield from self ._get_conn (node_id , group = group )
446+ async def ready (self , node_id , * , group = ConnectionGroup .DEFAULT ):
447+ conn = await self ._get_conn (node_id , group = group )
453448 if conn is None :
454449 return False
455450 return True
456451
457- @asyncio .coroutine
458- def send (self , node_id , request , * , group = ConnectionGroup .DEFAULT ):
452+ async def send (self , node_id , request , * , group = ConnectionGroup .DEFAULT ):
459453 """Send a request to a specific node.
460454
461455 Arguments:
@@ -471,7 +465,7 @@ def send(self, node_id, request, *, group=ConnectionGroup.DEFAULT):
471465 Returns:
472466 Future: resolves to Response struct
473467 """
474- if not (yield from self .ready (node_id , group = group )):
468+ if not (await self .ready (node_id , group = group )):
475469 raise NodeNotReadyError (
476470 "Attempt to send a request to node"
477471 " which is not ready (node id {})." .format (node_id ))
@@ -485,7 +479,7 @@ def send(self, node_id, request, *, group=ConnectionGroup.DEFAULT):
485479 future = self ._conns [(node_id , group )].send (
486480 request , expect_response = expect_response )
487481 try :
488- result = yield from future
482+ result = await future
489483 except asyncio .TimeoutError :
490484 # close connection so it is renewed in next request
491485 self ._conns [(node_id , group )].close (
@@ -494,8 +488,7 @@ def send(self, node_id, request, *, group=ConnectionGroup.DEFAULT):
494488 else :
495489 return result
496490
497- @asyncio .coroutine
498- def check_version (self , node_id = None ):
491+ async def check_version (self , node_id = None ):
499492 """Attempt to guess the broker version"""
500493 if node_id is None :
501494 default_group_conns = [
@@ -526,26 +519,26 @@ def check_version(self, node_id=None):
526519 # vanilla MetadataRequest. If the server did not recognize the first
527520 # request, both will be failed with a ConnectionError that wraps
528521 # socket.error (32, 54, or 104)
529- conn = yield from self ._get_conn (node_id , no_hint = True )
522+ conn = await self ._get_conn (node_id , no_hint = True )
530523 if conn is None :
531524 raise ConnectionError (
532525 "No connection to node with id {}" .format (node_id ))
533526 for version , request in test_cases :
534527 try :
535528 if not conn .connected ():
536- yield from conn .connect ()
529+ await conn .connect ()
537530 assert conn , 'no connection to node with id {}' .format (node_id )
538531 # request can be ignored by Kafka broker,
539532 # so we send metadata request and wait response
540533 task = self ._loop .create_task (conn .send (request ))
541- yield from asyncio .wait ([task ], timeout = 0.1 , loop = self ._loop )
534+ await asyncio .wait ([task ], timeout = 0.1 , loop = self ._loop )
542535 try :
543- yield from conn .send (MetadataRequest_v0 ([]))
536+ await conn .send (MetadataRequest_v0 ([]))
544537 except KafkaError :
545538 # metadata request can be cancelled in case
546539 # of invalid correlationIds order
547540 pass
548- response = yield from task
541+ response = await task
549542 except KafkaError :
550543 continue
551544 else :
@@ -588,8 +581,7 @@ def _check_api_version_response(self, response):
588581 # so if all else fails, choose that
589582 return (0 , 10 , 0 )
590583
591- @asyncio .coroutine
592- def _wait_on_metadata (self , topic ):
584+ async def _wait_on_metadata (self , topic ):
593585 """
594586 Wait for cluster metadata including partitions for the given topic to
595587 be available.
@@ -612,25 +604,23 @@ def _wait_on_metadata(self, topic):
612604
613605 t0 = self ._loop .time ()
614606 while True :
615- yield from self .force_metadata_update ()
607+ await self .force_metadata_update ()
616608 if topic in self .cluster .topics ():
617609 break
618610 if (self ._loop .time () - t0 ) > (self ._request_timeout_ms / 1000 ):
619611 raise UnknownTopicOrPartitionError ()
620612 if topic in self .cluster .unauthorized_topics :
621613 raise Errors .TopicAuthorizationFailedError (topic )
622- yield from asyncio .sleep (self ._retry_backoff , loop = self ._loop )
614+ await asyncio .sleep (self ._retry_backoff , loop = self ._loop )
623615
624616 return self .cluster .partitions_for_topic (topic )
625617
626- @asyncio .coroutine
627- def _maybe_wait_metadata (self ):
618+ async def _maybe_wait_metadata (self ):
628619 if self ._md_update_fut is not None :
629- yield from asyncio .shield (
620+ await asyncio .shield (
630621 self ._md_update_fut , loop = self ._loop )
631622
632- @asyncio .coroutine
633- def coordinator_lookup (self , coordinator_type , coordinator_key ):
623+ async def coordinator_lookup (self , coordinator_type , coordinator_key ):
634624 """ Lookup which node in the cluster is the coordinator for a certain
635625 role (Transaction coordinator or Group coordinator atm.)
636626 NOTE: Client keeps track of all coordination nodes separately, as they
@@ -653,7 +643,7 @@ def coordinator_lookup(self, coordinator_type, coordinator_key):
653643 "No transactions for older brokers"
654644 request = FindCoordinatorRequest [0 ](coordinator_key )
655645
656- resp = yield from self .send (node_id , request )
646+ resp = await self .send (node_id , request )
657647 log .debug ("Received group coordinator response %s" , resp )
658648 error_type = Errors .for_code (resp .error_code )
659649 if error_type is not Errors .NoError :
0 commit comments