11import asyncio
22import logging
33from collections import defaultdict
4+ from collections .abc import Sequence
45from ssl import SSLContext
5- from typing import Any , Dict , List , Optional , Sequence , Tuple , Type , Union
6+ from typing import Any , Optional , Union
67
78import async_timeout
89
@@ -88,7 +89,7 @@ def __init__(
8889 self ,
8990 * ,
9091 loop = None ,
91- bootstrap_servers : Union [str , List [str ]] = "localhost" ,
92+ bootstrap_servers : Union [str , list [str ]] = "localhost" ,
9293 client_id : str = "aiokafka-" + __version__ ,
9394 request_timeout_ms : int = 40000 ,
9495 connections_max_idle_ms : int = 540000 ,
@@ -159,7 +160,7 @@ async def start(self):
159160 log .debug ("AIOKafkaAdminClient started" )
160161 self ._started = True
161162
162- def _matching_api_version (self , operation : Sequence [Type [Request ]]) -> int :
163+ def _matching_api_version (self , operation : Sequence [type [Request ]]) -> int :
163164 """Find the latest version of the protocol operation
164165 supported by both this library and the broker.
165166
@@ -225,7 +226,7 @@ def _convert_new_topic_request(new_topic):
225226
226227 async def create_topics (
227228 self ,
228- new_topics : List [NewTopic ],
229+ new_topics : list [NewTopic ],
229230 timeout_ms : Optional [int ] = None ,
230231 validate_only : bool = False ,
231232 ) -> Response :
@@ -267,7 +268,7 @@ async def create_topics(
267268
268269 async def delete_topics (
269270 self ,
270- topics : List [str ],
271+ topics : list [str ],
271272 timeout_ms : Optional [int ] = None ,
272273 ) -> Response :
273274 """Delete topics from the cluster.
@@ -284,7 +285,7 @@ async def delete_topics(
284285
285286 async def _get_cluster_metadata (
286287 self ,
287- topics : Optional [List [str ]] = None ,
288+ topics : Optional [list [str ]] = None ,
288289 ) -> Response :
289290 """
290291 Retrieve cluster metadata
@@ -295,30 +296,30 @@ async def _get_cluster_metadata(
295296 request = req_cls (topics = topics )
296297 return await self ._send_request (request )
297298
298- async def list_topics (self ) -> List [str ]:
299+ async def list_topics (self ) -> list [str ]:
299300 metadata = await self ._get_cluster_metadata (topics = None )
300301 obj = metadata .to_object ()
301302 return [t ["topic" ] for t in obj ["topics" ]]
302303
303304 async def describe_topics (
304305 self ,
305- topics : Optional [List [str ]] = None ,
306- ) -> List [Any ]:
306+ topics : Optional [list [str ]] = None ,
307+ ) -> list [Any ]:
307308 metadata = await self ._get_cluster_metadata (topics = topics )
308309 obj = metadata .to_object ()
309310 return obj ["topics" ]
310311
311- async def describe_cluster (self ) -> Dict [str , Any ]:
312+ async def describe_cluster (self ) -> dict [str , Any ]:
312313 metadata = await self ._get_cluster_metadata ()
313314 obj = metadata .to_object ()
314315 obj .pop ("topics" ) # We have 'describe_topics' for this
315316 return obj
316317
317318 async def describe_configs (
318319 self ,
319- config_resources : List [ConfigResource ],
320+ config_resources : list [ConfigResource ],
320321 include_synonyms : bool = False ,
321- ) -> List [Response ]:
322+ ) -> list [Response ]:
322323 """Fetch configuration parameters for one or more Kafka resources.
323324
324325 :param config_resources: An list of ConfigResource objects.
@@ -360,8 +361,8 @@ async def describe_configs(
360361 return await asyncio .gather (* futures )
361362
362363 async def alter_configs (
363- self , config_resources : List [ConfigResource ]
364- ) -> List [Response ]:
364+ self , config_resources : list [ConfigResource ]
365+ ) -> list [Response ]:
365366 """Alter configuration parameters of one or more Kafka resources.
366367 :param config_resources: A list of ConfigResource objects.
367368 :return: Appropriate version of AlterConfigsResponse class.
@@ -398,9 +399,9 @@ def _convert_alter_config_resource_request(config_resource):
398399 @classmethod
399400 def _convert_config_resources (
400401 cls ,
401- config_resources : List [ConfigResource ],
402+ config_resources : list [ConfigResource ],
402403 op_type : str = "describe" ,
403- ) -> Tuple [ Dict [int , Any ], List [Any ]]:
404+ ) -> tuple [ dict [int , Any ], list [Any ]]:
404405 broker_resources = defaultdict (list )
405406 topic_resources = []
406407 if op_type == "describe" :
@@ -416,15 +417,15 @@ def _convert_config_resources(
416417 return broker_resources , topic_resources
417418
418419 @staticmethod
419- def _convert_topic_partitions (topic_partitions : Dict [str , NewPartitions ]):
420+ def _convert_topic_partitions (topic_partitions : dict [str , NewPartitions ]):
420421 return [
421422 (topic_name , (new_part .total_count , new_part .new_assignments ))
422423 for topic_name , new_part in topic_partitions .items ()
423424 ]
424425
425426 async def create_partitions (
426427 self ,
427- topic_partitions : Dict [str , NewPartitions ],
428+ topic_partitions : dict [str , NewPartitions ],
428429 timeout_ms : Optional [int ] = None ,
429430 validate_only : bool = False ,
430431 ) -> Response :
@@ -455,10 +456,10 @@ async def create_partitions(
455456
456457 async def describe_consumer_groups (
457458 self ,
458- group_ids : List [str ],
459+ group_ids : list [str ],
459460 group_coordinator_id : Optional [int ] = None ,
460461 include_authorized_operations : bool = False ,
461- ) -> List [Response ]:
462+ ) -> list [Response ]:
462463 """Describe a set of consumer groups.
463464
464465 Any errors are immediately raised.
@@ -508,8 +509,8 @@ async def describe_consumer_groups(
508509
509510 async def list_consumer_groups (
510511 self ,
511- broker_ids : Optional [List [int ]] = None ,
512- ) -> List [ Tuple [Any , ...]]:
512+ broker_ids : Optional [list [int ]] = None ,
513+ ) -> list [ tuple [Any , ...]]:
513514 """List all consumer groups known to the cluster.
514515
515516 This returns a list of Consumer Group tuples. The tuples are
@@ -578,8 +579,8 @@ async def list_consumer_group_offsets(
578579 self ,
579580 group_id : str ,
580581 group_coordinator_id : Optional [int ] = None ,
581- partitions : Optional [List [TopicPartition ]] = None ,
582- ) -> Dict [TopicPartition , OffsetAndMetadata ]:
582+ partitions : Optional [list [TopicPartition ]] = None ,
583+ ) -> dict [TopicPartition , OffsetAndMetadata ]:
583584 """Fetch Consumer Offsets for a single consumer group.
584585
585586 Note:
@@ -636,9 +637,9 @@ async def list_consumer_group_offsets(
636637
637638 async def delete_records (
638639 self ,
639- records_to_delete : Dict [TopicPartition , RecordsToDelete ],
640+ records_to_delete : dict [TopicPartition , RecordsToDelete ],
640641 timeout_ms : Optional [int ] = None ,
641- ) -> Dict [TopicPartition , int ]:
642+ ) -> dict [TopicPartition , int ]:
642643 """Delete records from partitions.
643644
644645 :param records_to_delete: A map of RecordsToDelete for each TopicPartition
@@ -681,7 +682,7 @@ async def delete_records(
681682
682683 @staticmethod
683684 def _convert_records_to_delete (
684- records_to_delete : Dict [str , List [ Tuple [int , RecordsToDelete ]]],
685+ records_to_delete : dict [str , list [ tuple [int , RecordsToDelete ]]],
685686 ):
686687 return [
687688 (topic , [(partition , rec .before_offset ) for partition , rec in records ])
0 commit comments