
Commit 9c99020

add types for AIO module
1 parent 8f789a0

File tree: 4 files changed, +23 -34 lines changed


src/confluent_kafka/experimental/aio/_AIOConsumer.py

Lines changed: 3 additions & 2 deletions
```diff
@@ -14,17 +14,18 @@
 
 import asyncio
 import concurrent.futures
-from typing import Any, Callable, Dict, Optional, Tuple
+from typing import Any, Callable, Optional, Tuple
 
 import confluent_kafka
 
 from . import _common as _common
+from ..._types import ConfigDict
 
 
 class AIOConsumer:
     def __init__(
         self,
-        consumer_conf: Dict[str, Any],
+        consumer_conf: ConfigDict,
         max_workers: int = 2,
         executor: Optional[concurrent.futures.Executor] = None
     ) -> None:
```
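The `ConfigDict` annotation that replaces `Dict[str, Any]` above is imported from a shared `_types` module that this commit does not show. As a minimal sketch, such an alias presumably looks like the following (hypothetical; the actual definition in `confluent_kafka/_types.py` may differ):

```python
# Hypothetical sketch of confluent_kafka/_types.py (not part of this diff):
# a shared alias for librdkafka-style configuration dictionaries, e.g.
# {"bootstrap.servers": "localhost:9092", "group.id": "my-group"}.
from typing import Any, Dict

ConfigDict = Dict[str, Any]
```

A named alias keeps the consumer and producer signatures consistent and gives one place to tighten the type later (for example, to a `TypedDict`) without touching every call site.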

src/confluent_kafka/experimental/aio/_common.py

Lines changed: 3 additions & 1 deletion
```diff
@@ -18,6 +18,8 @@
 import concurrent.futures
 from typing import Any, Callable, Dict, Optional, Tuple, TypeVar
 
+from ..._types import ConfigDict
+
 T = TypeVar('T')
 
 
@@ -32,7 +34,7 @@ def __init__(
         self.logger = logger
 
     def log(self, *args: Any, **kwargs: Any) -> None:
-        self.loop.call_soon_threadsafe(lambda: self.logger.log(*args, **kwargs))
+        self.loop.call_soon_threadsafe(self.logger.log, *args, **kwargs)
 
 
 def wrap_callback(
```
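The `log()` change above hands the logger method and its positional arguments directly to `call_soon_threadsafe` instead of capturing them in a lambda. One subtlety of that asyncio API: `loop.call_soon_threadsafe(callback, *args)` forwards positional arguments only (its sole keyword parameter is `context`), so keyword arguments have to be bound up front, for example with `functools.partial`. A minimal, self-contained sketch (logger name and messages are illustrative):

```python
import asyncio
import functools
import logging
import threading

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("aio-sketch")  # hypothetical logger name

async def main() -> None:
    loop = asyncio.get_running_loop()

    def worker() -> None:
        # Positional arguments are forwarded by call_soon_threadsafe itself,
        # as in the rewritten log() above.
        loop.call_soon_threadsafe(logger.log, logging.INFO, "hello from %s", "worker")
        # Keyword arguments must be pre-bound, because call_soon_threadsafe
        # reserves keywords for its own use (only `context`).
        loop.call_soon_threadsafe(
            functools.partial(logger.log, logging.ERROR, "failed", exc_info=False)
        )

    threading.Thread(target=worker).start()
    await asyncio.sleep(0.1)  # give the scheduled callbacks a chance to run

asyncio.run(main())
```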

src/confluent_kafka/experimental/aio/producer/_AIOProducer.py

Lines changed: 4 additions & 3 deletions
```diff
@@ -15,14 +15,15 @@
 import asyncio
 import concurrent.futures
 import logging
-from typing import Any, Callable, Dict, Optional
+from typing import Any, Callable, Optional
 
 import confluent_kafka
 
 from .. import _common as _common
 from ._producer_batch_processor import ProducerBatchManager
 from ._kafka_batch_executor import ProducerBatchExecutor
 from ._buffer_timeout_manager import BufferTimeoutManager
+from ..._types import ConfigDict
 
 
 logger = logging.getLogger(__name__)
@@ -36,7 +37,7 @@ class AIOProducer:
 
     def __init__(
         self,
-        producer_conf: Dict[str, Any],
+        producer_conf: ConfigDict,
         max_workers: int = 4,
         executor: Optional[concurrent.futures.Executor] = None,
         batch_size: int = 1000,
@@ -224,7 +225,7 @@ async def flush(self, *args: Any, **kwargs: Any) -> Any:
         # Update buffer activity since we just flushed
         self._buffer_timeout_manager.mark_activity()
 
-        # Then flush the underlying producer and wait for delivery confirmation
+        # Then flush underlying producer and wait for delivery confirmation
         return await self._call(self._producer.flush, *args, **kwargs)
 
     async def purge(self, *args: Any, **kwargs: Any) -> Any:
```
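For context, a hedged usage sketch of the constructor signature above. The import path and the awaitable `produce()` call are assumptions based on this diff's module layout (`producer/_AIOProducer.py`), not confirmed by it:

```python
import asyncio

# Assumed import; the class is defined in
# src/confluent_kafka/experimental/aio/producer/_AIOProducer.py per this diff.
from confluent_kafka.experimental.aio.producer._AIOProducer import AIOProducer

async def main() -> None:
    conf = {"bootstrap.servers": "localhost:9092"}  # a ConfigDict
    producer = AIOProducer(conf, max_workers=4, batch_size=1000)
    # produce() is assumed to be awaitable in this experimental wrapper.
    await producer.produce("my-topic", value=b"hello")
    # flush() awaits delivery confirmation, per the comment in the diff above.
    await producer.flush()

asyncio.run(main())
```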

src/confluent_kafka/experimental/aio/producer/_kafka_batch_executor.py

Lines changed: 13 additions & 28 deletions
```diff
@@ -19,8 +19,6 @@
 
 import confluent_kafka
 
-from .. import _common
-
 logger = logging.getLogger(__name__)
 
 
@@ -65,10 +63,10 @@ async def execute_batch(
         Args:
             topic: Target topic for the batch
             batch_messages: List of prepared messages with callbacks assigned
-            partition: Target partition for the batch (-1 = RD_KAFKA_PARTITION_UA)
+            partition: Target partition (-1 = RD_KAFKA_PARTITION_UA)
 
         Returns:
-            Result from producer.poll() indicating number of delivery reports processed
+            Result from producer.poll() indicating # of delivery reports processed
 
         Raises:
             Exception: Any exception from the batch operation is propagated
@@ -77,9 +75,9 @@ def _produce_batch_and_poll() -> int:
             """Helper function to run in thread pool
 
             This function encapsulates all the blocking Kafka operations:
-            - Call produce_batch with specific partition and individual message callbacks
+            - Call produce_batch with specific partition & individual callbacks
             - Handle partial batch failures for messages that fail immediately
-            - Poll for delivery reports to trigger callbacks for successful messages
+            - Poll for delivery reports to trigger callbacks for successful msgs
             """
             # Call produce_batch with specific partition and individual callbacks
             # Convert tuple to list since produce_batch expects a list
@@ -90,7 +88,8 @@ def _produce_batch_and_poll() -> int:
             )
 
             # Use the provided partition for the entire batch
-            # This enables proper partition control while working around librdkafka limitations
+            # This enables proper partition control while working around
+            # librdkafka limitations
             self._producer.produce_batch(topic, messages_list, partition=partition)
 
             # Handle partial batch failures: Check for messages that failed
@@ -99,7 +98,7 @@ def _produce_batch_and_poll() -> int:
             # so we need to manually invoke their callbacks
             self._handle_partial_failures(messages_list)
 
-            # Immediately poll to process delivery callbacks for successful messages
+            # Immediately poll to process delivery callbacks for successful msgs
             poll_result = self._producer.poll(0)
 
             return poll_result
@@ -108,24 +107,10 @@ def _produce_batch_and_poll() -> int:
         loop = asyncio.get_running_loop()
         return await loop.run_in_executor(self._executor, _produce_batch_and_poll)
 
-    async def flush_librdkafka_queue(self, timeout=-1):
-        """Flush the librdkafka queue and wait for all messages to be delivered
-
-        This method awaits until all outstanding produce requests are completed
-        or the timeout is reached, unless the timeout is set to 0 (non-blocking).
-
-        Args:
-            timeout: Maximum time to wait in seconds:
-                - -1 = wait indefinitely (default)
-                - 0 = non-blocking, return immediately
-                - >0 = wait up to timeout seconds
-
-        Returns:
-            Number of messages still in queue after flush attempt
-        """
-        return await _common.async_call(self._executor, self._producer.flush, timeout)
-
-    def _handle_partial_failures(self, batch_messages: List[Dict[str, Any]]) -> None:
+    def _handle_partial_failures(
+        self,
+        batch_messages: List[Dict[str, Any]]
+    ) -> None:
         """Handle messages that failed during produce_batch
 
         When produce_batch encounters messages that fail immediately (e.g.,
@@ -135,7 +120,7 @@ def _handle_partial_failures(self, batch_messages: List[Dict[str, Any]]) -> None
         manually invoke the simple future-resolving callbacks.
 
         Args:
-            batch_messages: List of message dictionaries that were passed to produce_batch
+            batch_messages: List of message dicts passed to produce_batch
         """
         for msg_dict in batch_messages:
             if '_error' in msg_dict:
@@ -146,7 +131,7 @@ def _handle_partial_failures(self, batch_messages: List[Dict[str, Any]]) -> None
             # Extract the error from the message dict (set by Producer.c)
             error = msg_dict['_error']
             # Manually invoke the callback with the error
-            # Note: msg is None since the message failed before being queued
+            # Note: msg is None since message failed before being queued
             try:
                 callback(error, None)
             except Exception:
```
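With `flush_librdkafka_queue` (and the now-unused `_common` import) removed, all blocking work in this module flows through the `loop.run_in_executor` call in `execute_batch`. A minimal, self-contained sketch of that offloading pattern, with a stand-in blocking function instead of the real producer:

```python
import asyncio
import concurrent.futures
import time

def blocking_produce_and_poll() -> int:
    # Stand-in for the produce_batch() + poll(0) pair: blocking librdkafka
    # calls must run off the event loop thread.
    time.sleep(0.05)
    return 1  # e.g. number of delivery reports processed

async def main() -> None:
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        # Same shape as execute_batch() above: hand the blocking closure to
        # the pool and await its result without blocking the loop.
        result = await loop.run_in_executor(executor, blocking_produce_and_poll)
        print("delivery reports processed:", result)

asyncio.run(main())
```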
