@@ -63,10 +63,10 @@ async def execute_batch(
6363 Args:
6464 topic: Target topic for the batch
6565 batch_messages: List of prepared messages with callbacks assigned
66- partition: Target partition (-1 = RD_KAFKA_PARTITION_UA)
66+ partition: Target partition for the batch (-1 = RD_KAFKA_PARTITION_UA)
6767
6868 Returns:
69- Result from producer.poll() indicating # of delivery reports processed
69+ Result from producer.poll() indicating number of delivery reports processed
7070
7171 Raises:
7272 Exception: Any exception from the batch operation is propagated
@@ -75,9 +75,9 @@ def _produce_batch_and_poll() -> int:
7575 """Helper function to run in thread pool
7676
7777 This function encapsulates all the blocking Kafka operations:
78- - Call produce_batch with specific partition & individual callbacks
78+ - Call produce_batch with specific partition and individual message callbacks
7979 - Handle partial batch failures for messages that fail immediately
80- - Poll for delivery reports to trigger callbacks for successful msgs
80+ - Poll for delivery reports to trigger callbacks for successful messages
8181 """
8282 # Call produce_batch with specific partition and individual callbacks
8383 # Convert tuple to list since produce_batch expects a list
@@ -88,8 +88,7 @@ def _produce_batch_and_poll() -> int:
8888 )
8989
9090 # Use the provided partition for the entire batch
91- # This enables proper partition control while working around
92- # librdkafka limitations
91+ # This enables proper partition control while working around librdkafka limitations
9392 self ._producer .produce_batch (topic , messages_list , partition = partition )
9493
9594 # Handle partial batch failures: Check for messages that failed
@@ -98,7 +97,7 @@ def _produce_batch_and_poll() -> int:
9897 # so we need to manually invoke their callbacks
9998 self ._handle_partial_failures (messages_list )
10099
101- # Immediately poll to process delivery callbacks for successful msgs
100+ # Immediately poll to process delivery callbacks for successful messages
102101 poll_result = self ._producer .poll (0 )
103102
104103 return poll_result
@@ -120,7 +119,7 @@ def _handle_partial_failures(
120119 manually invoke the simple future-resolving callbacks.
121120
122121 Args:
123- batch_messages: List of message dicts passed to produce_batch
122+ batch_messages: List of message dictionaries that were passed to produce_batch
124123 """
125124 for msg_dict in batch_messages :
126125 if '_error' in msg_dict :
@@ -131,7 +130,7 @@ def _handle_partial_failures(
131130 # Extract the error from the message dict (set by Producer.c)
132131 error = msg_dict ['_error' ]
133132 # Manually invoke the callback with the error
134- # Note: msg is None since message failed before being queued
133+ # Note: msg is None since the message failed before being queued
135134 try :
136135 callback (error , None )
137136 except Exception :
0 commit comments