Description
I just tried the sample code for AIOKafkaConsumer from your GitHub page on a topic with 1 partition. On each iteration I print msg.offset, and the offsets appear in order. So how is this library even asynchronous? I don't understand what's asynchronous about it.
Here's the code that I ran, with the IP and topics omitted:
```python
from aiokafka import AIOKafkaConsumer
import asyncio


async def consume():
    consumer = AIOKafkaConsumer(
        'my_topic', 'my_other_topic',
        bootstrap_servers='localhost:9092',
        group_id="my-group")
    # Get cluster layout and join group `my-group`
    await consumer.start()
    try:
        # Consume messages
        async for msg in consumer:
            print(msg.offset)
    finally:
        # Will leave consumer group; perform autocommit if enabled.
        await consumer.stop()


asyncio.run(consume())
```
I am not an advanced Python user (I mostly use Scala), but from what I understand, each iteration of the for loop has to create an async task for reading a Kafka message.
Now imagine that the message at offset 0 needs 1 hour to download and the message at offset 1 takes 1 second. The callback for message 1 should then run before the callback for message 0, so the messages are supposed to come out of order. But why do they come in order?
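To make the question concrete, here is a minimal sketch that uses only asyncio, with no Kafka involved. The names `fake_messages`, `handle`, `sequential`, `concurrent` and `run_demo` are hypothetical stand-ins, not part of aiokafka. It assumes that an `async for` loop awaits one item at a time, so the loop body sees items in the order the iterator yields them, and that out-of-order completion only appears when the handling of each item is explicitly scheduled as its own task.

```python
import asyncio


async def fake_messages():
    # Hypothetical stand-in for a consumer: yields (offset, work_seconds)
    # in offset order, the way a single partition is read.
    for offset, delay in enumerate([0.2, 0.01, 0.05]):
        yield offset, delay


async def handle(offset, delay, done):
    # Simulated slow processing; other coroutines may run during the sleep.
    await asyncio.sleep(delay)
    done.append(offset)


async def sequential():
    # Each message is fully handled before the next one is requested,
    # so completion order equals offset order.
    done = []
    async for offset, delay in fake_messages():
        await handle(offset, delay, done)
    return done


async def concurrent():
    # Each message is handed to its own task, so fast messages can
    # finish before slow ones that arrived earlier.
    done = []
    tasks = []
    async for offset, delay in fake_messages():
        tasks.append(asyncio.create_task(handle(offset, delay, done)))
    await asyncio.gather(*tasks)
    return done


def run_demo():
    return asyncio.run(sequential()), asyncio.run(concurrent())


if __name__ == "__main__":
    seq, conc = run_demo()
    print("sequential:", seq)
    print("concurrent:", conc)
```

In this sketch the sequential loop finishes the offsets in order, while the task-based variant finishes them by processing time. Under that reading, "asynchronous" would mean that waiting for the next message does not block the event loop for other coroutines, not that messages from one partition are handed to the loop out of order.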