33from collections import defaultdict
44from collections .abc import Sequence
55from ssl import SSLContext
6- from typing import Any , Optional , Union
6+ from typing import Any
77
88import async_timeout
99
@@ -89,21 +89,21 @@ def __init__(
8989 self ,
9090 * ,
9191 loop = None ,
92- bootstrap_servers : Union [ str , list [str ] ] = "localhost" ,
92+ bootstrap_servers : str | list [str ] = "localhost" ,
9393 client_id : str = "aiokafka-" + __version__ ,
9494 request_timeout_ms : int = 40000 ,
9595 connections_max_idle_ms : int = 540000 ,
9696 retry_backoff_ms : int = 100 ,
9797 metadata_max_age_ms : int = 300000 ,
9898 security_protocol : str = "PLAINTEXT" ,
99- ssl_context : Optional [ SSLContext ] = None ,
99+ ssl_context : SSLContext | None = None ,
100100 api_version : str = "auto" ,
101101 sasl_mechanism : str = "PLAIN" ,
102- sasl_plain_username : Optional [ str ] = None ,
103- sasl_plain_password : Optional [ str ] = None ,
102+ sasl_plain_username : str | None = None ,
103+ sasl_plain_password : str | None = None ,
104104 sasl_kerberos_service_name : str = "kafka" ,
105- sasl_kerberos_domain_name : Optional [ str ] = None ,
106- sasl_oauth_token_provider : Optional [ str ] = None ,
105+ sasl_kerberos_domain_name : str | None = None ,
106+ sasl_oauth_token_provider : str | None = None ,
107107 ):
108108 self ._closed = False
109109 self ._started = False
@@ -141,7 +141,7 @@ async def close(self):
141141 async def _send_request (
142142 self ,
143143 request : Request ,
144- node_id : Optional [ int ] = None ,
144+ node_id : int | None = None ,
145145 ) -> Response :
146146 if node_id is None :
147147 node_id = self ._client .get_random_node ()
@@ -227,7 +227,7 @@ def _convert_new_topic_request(new_topic):
227227 async def create_topics (
228228 self ,
229229 new_topics : list [NewTopic ],
230- timeout_ms : Optional [ int ] = None ,
230+ timeout_ms : int | None = None ,
231231 validate_only : bool = False ,
232232 ) -> Response :
233233 """Create new topics in the cluster.
@@ -269,7 +269,7 @@ async def create_topics(
269269 async def delete_topics (
270270 self ,
271271 topics : list [str ],
272- timeout_ms : Optional [ int ] = None ,
272+ timeout_ms : int | None = None ,
273273 ) -> Response :
274274 """Delete topics from the cluster.
275275
@@ -285,7 +285,7 @@ async def delete_topics(
285285
286286 async def _get_cluster_metadata (
287287 self ,
288- topics : Optional [ list [str ]] = None ,
288+ topics : list [str ] | None = None ,
289289 ) -> Response :
290290 """
291291 Retrieve cluster metadata
@@ -303,7 +303,7 @@ async def list_topics(self) -> list[str]:
303303
304304 async def describe_topics (
305305 self ,
306- topics : Optional [ list [str ]] = None ,
306+ topics : list [str ] | None = None ,
307307 ) -> list [Any ]:
308308 metadata = await self ._get_cluster_metadata (topics = topics )
309309 obj = metadata .to_object ()
@@ -426,7 +426,7 @@ def _convert_topic_partitions(topic_partitions: dict[str, NewPartitions]):
426426 async def create_partitions (
427427 self ,
428428 topic_partitions : dict [str , NewPartitions ],
429- timeout_ms : Optional [ int ] = None ,
429+ timeout_ms : int | None = None ,
430430 validate_only : bool = False ,
431431 ) -> Response :
432432 """Create additional partitions for an existing topic.
@@ -457,7 +457,7 @@ async def create_partitions(
457457 async def describe_consumer_groups (
458458 self ,
459459 group_ids : list [str ],
460- group_coordinator_id : Optional [ int ] = None ,
460+ group_coordinator_id : int | None = None ,
461461 include_authorized_operations : bool = False ,
462462 ) -> list [Response ]:
463463 """Describe a set of consumer groups.
@@ -509,7 +509,7 @@ async def describe_consumer_groups(
509509
510510 async def list_consumer_groups (
511511 self ,
512- broker_ids : Optional [ list [int ]] = None ,
512+ broker_ids : list [int ] | None = None ,
513513 ) -> list [tuple [Any , ...]]:
514514 """List all consumer groups known to the cluster.
515515
@@ -578,8 +578,8 @@ async def find_coordinator(self, group_id: str, coordinator_type: int = 0) -> in
578578 async def list_consumer_group_offsets (
579579 self ,
580580 group_id : str ,
581- group_coordinator_id : Optional [ int ] = None ,
582- partitions : Optional [ list [TopicPartition ]] = None ,
581+ group_coordinator_id : int | None = None ,
582+ partitions : list [TopicPartition ] | None = None ,
583583 ) -> dict [TopicPartition , OffsetAndMetadata ]:
584584 """Fetch Consumer Offsets for a single consumer group.
585585
@@ -638,7 +638,7 @@ async def list_consumer_group_offsets(
638638 async def delete_records (
639639 self ,
640640 records_to_delete : dict [TopicPartition , RecordsToDelete ],
641- timeout_ms : Optional [ int ] = None ,
641+ timeout_ms : int | None = None ,
642642 ) -> dict [TopicPartition , int ]:
643643 """Delete records from partitions.
644644
0 commit comments