Describe the bug
As also mentioned in #3383, we have a memory leak (a consistent linear increase in memory utilization) when we use `AIOKafkaConsumer` in our FastAPI code. The source appears to be an endless growth of objects allocated from `python3.13/site-packages/opentelemetry/instrumentation/asyncio/__init__.py`.
Using `getone()` inside a `while` loop instead of `__anext__` slows the leak but does not eliminate it. Some of our checks showed that setting `enable_auto_commit` to `False` fixes the issue, but we do want to keep auto-committing enabled.
Expected behaviour
Stable memory utilization when following aiokafka's best practices.
Environment (please complete the following information):
- aiokafka version (`python -c "import aiokafka; print(aiokafka.__version__)"`): 0.12.0
- Kafka Broker version (`kafka-topics.sh --version`): 3.6-IV2
Reproducible example
```python
from aiokafka import AIOKafkaConsumer
import asyncio

async def consume():
    consumer = AIOKafkaConsumer(
        'my_topic', 'my_other_topic',
        bootstrap_servers='localhost:9092',
        group_id="my-group")
    # Get cluster layout and join group `my-group`
    await consumer.start()
    try:
        # Consume messages
        async for msg in consumer:
            print("consumed: ", msg.topic, msg.partition, msg.offset,
                  msg.key, msg.value, msg.timestamp)
    finally:
        # Will leave consumer group; perform autocommit if enabled.
        await consumer.stop()

asyncio.run(consume())
```