99 Awaitable ,
1010 Callable ,
1111 Dict ,
12- List ,
12+ MutableSequence ,
1313 Optional ,
1414 Protocol ,
15+ Sequence ,
1516 Tuple ,
16- Union ,
1717)
1818
1919import aiokafka
@@ -49,8 +49,8 @@ class AIOKafkaSendProto(Protocol):
4949 async def __call__ (
5050 self ,
5151 topic : str ,
52- value : Any | None = None ,
53- key : Any | None = None ,
52+ value : object | None = None ,
53+ key : object | None = None ,
5454 partition : int | None = None ,
5555 timestamp_ms : int | None = None ,
5656 headers : HeadersT | None = None ,
@@ -72,14 +72,14 @@ async def __call__(
7272 ]
7373]
7474
75- HeadersT = List [Tuple [str , Optional [bytes ]]]
75+ HeadersT = Sequence [Tuple [str , Optional [bytes ]]]
7676
7777_LOG = getLogger (__name__ )
7878
7979
8080def _extract_bootstrap_servers (
8181 client : aiokafka .AIOKafkaClient ,
82- ) -> Union [ str , List [str ] ]:
82+ ) -> str | list [str ]:
8383 return client ._bootstrap_servers
8484
8585
@@ -89,51 +89,53 @@ def _extract_client_id(client: aiokafka.AIOKafkaClient) -> str:
8989
9090def _extract_consumer_group (
9191 consumer : aiokafka .AIOKafkaConsumer ,
92- ) -> Optional [ str ] :
92+ ) -> str | None :
9393 return consumer ._group_id
9494
9595
9696def _extract_argument (
9797 key : str ,
9898 position : int ,
9999 default_value : Any ,
100- args : Tuple [Any ],
101- kwargs : Dict [str , Any ],
100+ args : tuple [Any , ... ],
101+ kwargs : dict [str , Any ],
102102) -> Any :
103103 if len (args ) > position :
104104 return args [position ]
105105 return kwargs .get (key , default_value )
106106
107107
108- def _extract_send_topic (args : Tuple [Any ], kwargs : Dict [str , Any ]) -> str :
108+ def _extract_send_topic (args : tuple [Any , ... ], kwargs : dict [str , Any ]) -> str :
109109 """extract topic from `send` method arguments in AIOKafkaProducer class"""
110110 return _extract_argument ("topic" , 0 , "unknown" , args , kwargs )
111111
112112
113113def _extract_send_value (
114- args : Tuple [Any ], kwargs : Dict [str , Any ]
115- ) -> Optional [ Any ] :
114+ args : tuple [Any , ... ], kwargs : dict [str , Any ]
115+ ) -> object | None :
116116 """extract value from `send` method arguments in AIOKafkaProducer class"""
117117 return _extract_argument ("value" , 1 , None , args , kwargs )
118118
119119
120120def _extract_send_key (
121- args : Tuple [Any ], kwargs : Dict [str , Any ]
122- ) -> Optional [ Any ] :
121+ args : tuple [Any , ... ], kwargs : dict [str , Any ]
122+ ) -> object | None :
123123 """extract key from `send` method arguments in AIOKafkaProducer class"""
124124 return _extract_argument ("key" , 2 , None , args , kwargs )
125125
126126
127- def _extract_send_headers (args : Tuple [Any ], kwargs : Dict [str , Any ]):
127+ def _extract_send_headers (
128+ args : tuple [Any , ...], kwargs : dict [str , Any ]
129+ ) -> HeadersT | None :
128130 """extract headers from `send` method arguments in AIOKafkaProducer class"""
129131 return _extract_argument ("headers" , 5 , None , args , kwargs )
130132
131133
132134async def _extract_send_partition (
133135 instance : aiokafka .AIOKafkaProducer ,
134- args : Tuple [Any ],
135- kwargs : Dict [str , Any ],
136- ) -> Optional [ int ] :
136+ args : tuple [Any , ... ],
137+ kwargs : dict [str , Any ],
138+ ) -> int | None :
137139 """extract partition `send` method arguments, using the `_partition` method in AIOKafkaProducer class"""
138140 try :
139141 topic = _extract_send_topic (args , kwargs )
@@ -159,7 +161,7 @@ async def _extract_send_partition(
159161
160162
161163class AIOKafkaContextGetter (textmap .Getter [HeadersT ]):
162- def get (self , carrier : HeadersT , key : str ) -> Optional [ List [ str ]] :
164+ def get (self , carrier : HeadersT , key : str ) -> list [ str ] | None :
163165 if carrier is None :
164166 return None
165167
@@ -169,19 +171,25 @@ def get(self, carrier: HeadersT, key: str) -> Optional[List[str]]:
169171 return [value .decode ()]
170172 return None
171173
172- def keys (self , carrier : HeadersT ) -> List [str ]:
174+ def keys (self , carrier : HeadersT ) -> list [str ]:
173175 if carrier is None :
174176 return []
175177 return [key for (key , value ) in carrier ]
176178
177179
178180class AIOKafkaContextSetter (textmap .Setter [HeadersT ]):
179181 def set (
180- self , carrier : HeadersT , key : Optional [ str ] , value : Optional [ str ]
182+ self , carrier : HeadersT , key : str | None , value : str | None
181183 ) -> None :
182184 if carrier is None or key is None :
183185 return
184186
187+ if not isinstance (carrier , MutableSequence ):
188+ _LOG .warning (
189+ "Unable to set context in headers. Headers is immutable"
190+ )
191+ return
192+
185193 if value is not None :
186194 carrier .append ((key , value .encode ()))
187195 else :
@@ -195,11 +203,11 @@ def set(
195203def _enrich_base_span (
196204 span : Span ,
197205 * ,
198- bootstrap_servers : Union [ str , List [str ] ],
206+ bootstrap_servers : str | list [str ],
199207 client_id : str ,
200208 topic : str ,
201- partition : Optional [ int ] ,
202- key : Optional [ Any ] ,
209+ partition : int | None ,
210+ key : object | None ,
203211) -> None :
204212 span .set_attribute (
205213 messaging_attributes .MESSAGING_SYSTEM ,
@@ -219,18 +227,19 @@ def _enrich_base_span(
219227
220228 if key is not None :
221229 span .set_attribute (
222- messaging_attributes .MESSAGING_KAFKA_MESSAGE_KEY , key
230+ messaging_attributes .MESSAGING_KAFKA_MESSAGE_KEY ,
231+ key , # FIXME: serialize key to str?
223232 )
224233
225234
226235def _enrich_send_span (
227236 span : Span ,
228237 * ,
229- bootstrap_servers : Union [ str , List [str ] ],
238+ bootstrap_servers : str | list [str ],
230239 client_id : str ,
231240 topic : str ,
232- partition : Optional [ int ] ,
233- key : Optional [ str ] ,
241+ partition : int | None ,
242+ key : object | None ,
234243) -> None :
235244 if not span .is_recording ():
236245 return
@@ -254,12 +263,12 @@ def _enrich_send_span(
254263def _enrich_getone_span (
255264 span : Span ,
256265 * ,
257- bootstrap_servers : Union [ str , List [str ] ],
266+ bootstrap_servers : str | list [str ],
258267 client_id : str ,
259- consumer_group : Optional [ str ] ,
268+ consumer_group : str | None ,
260269 topic : str ,
261- partition : Optional [ int ] ,
262- key : Optional [ str ] ,
270+ partition : int | None ,
271+ key : object | None ,
263272 offset : int ,
264273) -> None :
265274 if not span .is_recording ():
@@ -303,9 +312,9 @@ def _enrich_getone_span(
303312def _enrich_getmany_poll_span (
304313 span : Span ,
305314 * ,
306- bootstrap_servers : Union [ str , List [str ] ],
315+ bootstrap_servers : str | list [str ],
307316 client_id : str ,
308- consumer_group : Optional [ str ] ,
317+ consumer_group : str | None ,
309318 message_count : int ,
310319) -> None :
311320 if not span .is_recording ():
@@ -339,9 +348,9 @@ def _enrich_getmany_poll_span(
339348def _enrich_getmany_topic_span (
340349 span : Span ,
341350 * ,
342- bootstrap_servers : Union [ str , List [str ] ],
351+ bootstrap_servers : str | list [str ],
343352 client_id : str ,
344- consumer_group : Optional [ str ] ,
353+ consumer_group : str | None ,
345354 topic : str ,
346355 partition : int ,
347356 message_count : int ,
@@ -384,10 +393,10 @@ def _wrap_send(
384393 async def _traced_send (
385394 func : AIOKafkaSendProto ,
386395 instance : aiokafka .AIOKafkaProducer ,
387- args : Tuple [Any ],
388- kwargs : Dict [str , Any ],
396+ args : tuple [Any , ... ],
397+ kwargs : dict [str , Any ],
389398 ) -> asyncio .Future [RecordMetadata ]:
390- headers = _extract_send_headers (args , kwargs )
399+ headers : HeadersT | None = _extract_send_headers (args , kwargs )
391400 if headers is None :
392401 headers = []
393402 kwargs ["headers" ] = headers
@@ -430,11 +439,11 @@ async def _create_consumer_span(
430439 async_consume_hook : ConsumeHookT ,
431440 record : aiokafka .ConsumerRecord [object , object ],
432441 extracted_context : Context ,
433- bootstrap_servers : Union [ str , List [str ] ],
442+ bootstrap_servers : str | list [str ],
434443 client_id : str ,
435- consumer_group : Optional [ str ] ,
436- args : Tuple [ Any ],
437- kwargs : Dict [str , Any ],
444+ consumer_group : str | None ,
445+ args : tuple [ aiokafka . TopicPartition , ... ],
446+ kwargs : dict [str , Any ],
438447) -> trace .Span :
439448 span_name = _get_span_name ("receive" , record .topic )
440449 with tracer .start_as_current_span (
@@ -470,8 +479,8 @@ def _wrap_getone(
470479 async def _traced_getone (
471480 func : AIOKafkaGetOneProto ,
472481 instance : aiokafka .AIOKafkaConsumer ,
473- args : Tuple [ Any ],
474- kwargs : Dict [str , Any ],
482+ args : tuple [ aiokafka . TopicPartition , ... ],
483+ kwargs : dict [str , Any ],
475484 ) -> aiokafka .ConsumerRecord [object , object ]:
476485 record = await func (* args , ** kwargs )
477486
@@ -513,8 +522,8 @@ def _wrap_getmany(
513522 async def _traced_getmany (
514523 func : AIOKafkaGetManyProto ,
515524 instance : aiokafka .AIOKafkaConsumer ,
516- args : Tuple [ Any ],
517- kwargs : Dict [str , Any ],
525+ args : tuple [ aiokafka . TopicPartition , ... ],
526+ kwargs : dict [str , Any ],
518527 ) -> dict [
519528 aiokafka .TopicPartition , list [aiokafka .ConsumerRecord [object , object ]]
520529 ]:
@@ -527,9 +536,7 @@ async def _traced_getmany(
527536
528537 span_name = _get_span_name (
529538 "poll" ,
530- ", " .join (
531- sorted (set (topic .topic for topic in records .keys ()))
532- ),
539+ ", " .join (sorted ({topic .topic for topic in records .keys ()})),
533540 )
534541 with tracer .start_as_current_span (
535542 span_name , kind = trace .SpanKind .CLIENT
0 commit comments