AIOKafkaProducer appears to hang forever if Kafka is down for send_and_wait and stop() #1101

@MrCreosote

Describe the bug
If the Kafka server is down, calling send() and awaiting the returned future has two effects:

  1. A log message is emitted every retry_backoff_ms
    • This is pretty spammy at the default 100ms
  2. The await hangs, apparently forever
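
As a stopgap for the log spam only, repeated messages can be throttled on the application side. The sketch below is not part of aiokafka: `ThrottleFilter` is a hypothetical helper, and it assumes the client's loggers live under the `aiokafka` logger name. It does nothing about the hang itself.

```python
import logging
import time


class ThrottleFilter(logging.Filter):
    """Let each distinct message template through at most once per interval."""

    def __init__(self, interval_s=5.0, clock=time.monotonic):
        super().__init__()
        self.interval_s = interval_s
        self.clock = clock  # Injectable so the filter can be tested without sleeping.
        self._last_emitted = {}

    def filter(self, record):
        # Key on the unformatted template so retries of the same error collapse.
        key = (record.name, record.levelno, str(record.msg))
        now = self.clock()
        last = self._last_emitted.get(key)
        if last is not None and now - last < self.interval_s:
            return False  # Same message seen too recently: drop it.
        self._last_emitted[key] = now
        return True


# Attach the filter to a handler rather than to a logger: logger-level filters
# are not applied to records propagated up from child loggers.
handler = logging.StreamHandler()
handler.addFilter(ThrottleFilter(interval_s=5.0))
# Assumption: aiokafka's module loggers are children of "aiokafka".
logging.getLogger("aiokafka").addHandler(handler)
```

If the root logger also has handlers configured, the records still propagate there unthrottled, so the filter would need to be added to those handlers as well.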

If the result is wrapped in asyncio.wait_for() with a timeout so that the coroutine can eventually continue, and stop() is then called on the producer:

  • The log messages continue to be emitted
  • The stop() call hangs, apparently forever
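
For anyone hitting the same thing, a minimal sketch of bounding both calls from the caller's side follows. The helper names `send_with_deadline` and `stop_with_deadline` are hypothetical; the only assumption about the producer is that it exposes `send_and_wait()` and `stop()` as AIOKafkaProducer does. Whether abandoning `stop()` this way leaves background tasks or sockets open has not been verified, and the output below suggests the client keeps retrying after the wait is cancelled.

```python
import asyncio


async def send_with_deadline(producer, topic, value, timeout_s):
    """Send one message and wait at most timeout_s seconds for the result.

    Returns the awaited result, or None if the deadline passed first.
    """
    try:
        return await asyncio.wait_for(
            producer.send_and_wait(topic, value), timeout_s
        )
    except asyncio.TimeoutError:
        # The wait was cancelled; this says nothing about eventual delivery.
        return None


async def stop_with_deadline(producer, timeout_s):
    """Call producer.stop() but give up after timeout_s seconds.

    Returns True if stop() completed, False if the timeout cancelled it.
    """
    try:
        await asyncio.wait_for(producer.stop(), timeout_s)
        return True
    except asyncio.TimeoutError:
        return False
```

This only unblocks the caller. It is a workaround for the symptom, not the timeout behaviour requested in this issue.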

Expected behaviour
I would expect that both awaiting the result of a send() call (or awaiting a send_and_wait() call) and a stop() call eventually time out.

Environment (please complete the following information):

  • aiokafka version: 0.12.0
  • Kafka Broker version: Tested with 3.9.0 and 4.0.0
  • Other information: n/a

Reproducible example

"""
Test what happens when kafka is down and the notifier sends a message.
"""

import asyncio
from aiokafka import AIOKafkaProducer
import traceback


async def main():
    ###
    # Kafka is running here
    ###
    kp = AIOKafkaProducer(
        bootstrap_servers="localhost:9092",
        enable_idempotence=True,
        acks='all',
        request_timeout_ms=5000,
        retry_backoff_ms=1000
    )
    await kp.start()
    print("sending 1st message")
    future = await kp.send("mytopic", b"foo1")
    print("done")
    await future

    ###
    # Kafka is stopped here
    ###
    print("sending 2nd message")
    future = await kp.send("mytopic", b"foo2")
    print("done")
    try:
        await asyncio.wait_for(future, 6)
    except TimeoutError as e:
        traceback.print_exception(e)
    print("stopping client")
    await kp.stop()
    print("done")


if __name__ == "__main__":
    asyncio.run(main())

The above results in the output:

sending 1st message
Topic mytopic not found in cluster metadata
done
sending 2nd message
done
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Got error produce response: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1).
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Unable to update metadata from [1]
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Got error produce response: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1).
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Unable to update metadata from [1]
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Got error produce response: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1).
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Unable to update metadata from [1]
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Got error produce response: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1).
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Unable to update metadata from [1]
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Got error produce response: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1).
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Unable to update metadata from [1]
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Got error produce response: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1).
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Unable to update metadata from [1]
Traceback (most recent call last):
  File "/usr/lib/python3.11/asyncio/tasks.py", line 490, in wait_for
    return fut.result()
           ^^^^^^^^^^^^
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/<user>/test_manual/aiokafka_test.py", line 36, in main
    await asyncio.wait_for(future, 6)
  File "/usr/lib/python3.11/asyncio/tasks.py", line 492, in wait_for
    raise exceptions.TimeoutError() from exc
TimeoutError
stopping client
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Got error produce response: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1).
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Unable to update metadata from [1]
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Got error produce response: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1).
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)

*** continues seemingly forever ***
