@@ -87,15 +87,18 @@ async def run_producer():
8787 # AsyncIO Pattern: Batched async produce with concurrent futures
8888 # Creates 100 concurrent produce operations, each returning a Future
8989 # that resolves when the message is delivered or fails
90- produce_futures = [asyncio . create_task (
91- producer . produce ( topic = topic ,
90+ produce_futures = [await producer . produce (
91+ topic = topic ,
9292 key = f'testkey{ i } ' ,
93- value = f'testvalue{ i } ' ))
94- for i in range (100 )]
95- # Wait for all produce operations to complete concurrently
96- results = await asyncio .gather (* produce_futures )
93+ value = f'testvalue{ i } ' )
94+ for i in range (10 )]
9795
98- for msg in results :
96+ logger .info (f"Produced { len (produce_futures )} messages" )
97+ # Force a flush of the local buffer to ensure messages will be in flight before awaiting their delivery
98+ # TODO: this shouldn't be strictly necessary in the future
99+ await producer .flush ()
100+ # Wait for all produce operations to complete concurrently
101+ for msg in await asyncio .gather (* produce_futures ):
99102 logger .info (
100103 'Produced to: {} [{}] @ {}' .format (msg .topic (),
101104 msg .partition (),
@@ -105,6 +108,7 @@ async def run_producer():
105108 await producer .commit_transaction ()
106109 transaction_active = False
107110 # Use asyncio.sleep() instead of time.sleep() to yield control to event loop
111+ # Change this to sleep(0) in a real application as this is mimicing doing external work on the event loop
108112 await asyncio .sleep (1 )
109113 except Exception as e :
110114 logger .error (e )
@@ -113,7 +117,7 @@ async def run_producer():
113117 # Always clean up resources asynchronously to avoid blocking the event loop
114118 if transaction_active :
115119 await producer .abort_transaction ()
116- await producer .stop () # Stops background tasks and closes connections
120+ await producer .close () # Stops background tasks and closes connections
117121 logger .info ('Closed producer' )
118122
119123
0 commit comments