Skip to content

Connection closed by the server is not caught when using uvloop #1038

@dimastbk

Description

@dimastbk

Describe the bug
We are using aiokafka with uvloop. When the server closes the connection and aiokafka then attempts to send something over it, a RuntimeError is raised (but only OSError is caught).

Traceback
Traceback (most recent call last):

  File "/opt/pysetup/.venv/lib/python3.11/site-packages/aiokafka/consumer/group_coordinator.py", line 554, in _coordination_routine
    await self.__coordination_routine()

  File "/opt/pysetup/.venv/lib/python3.11/site-packages/aiokafka/consumer/group_coordinator.py", line 612, in __coordination_routine
    wait_timeout = await self._maybe_do_autocommit(assignment)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File "/opt/pysetup/.venv/lib/python3.11/site-packages/aiokafka/consumer/group_coordinator.py", line 907, in _maybe_do_autocommit
    await self._do_commit_offsets(

  File "/opt/pysetup/.venv/lib/python3.11/site-packages/aiokafka/consumer/group_coordinator.py", line 995, in _do_commit_offsets
    response = await self._send_req(request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File "/opt/pysetup/.venv/lib/python3.11/site-packages/aiokafka/consumer/group_coordinator.py", line 284, in _send_req
    resp = await self._client.send(
           ^^^^^^^^^^^^^^^^^^^^^^^^

  File "/opt/pysetup/.venv/lib/python3.11/site-packages/aiokafka/client.py", line 502, in send
    future = self._conns[(node_id, group)].send(
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File "/opt/pysetup/.venv/lib/python3.11/site-packages/aiokafka/conn.py", line 439, in send
    self._writer.write(size + message)

  File "/usr/local/lib/python3.11/asyncio/streams.py", line 346, in write
    self._transport.write(data)

  File "uvloop/handles/stream.pyx", line 674, in uvloop.loop.UVStream.write

  File "uvloop/handles/handle.pyx", line 159, in uvloop.loop.UVHandle._ensure_alive

RuntimeError: unable to perform operation on <TCPTransport closed=True reading=False 0x55e23b4f8540>; the handler is closed


During handling of the above exception, another exception occurred:


Traceback (most recent call last):

  File "/usr/src/consumers/kafka/consumer.py", line 68, in run
    async for message in self.consumer:

  File "/opt/pysetup/.venv/lib/python3.11/site-packages/opentelemetry/instrumentation/aiokafka/utils.py", line 207, in _traced_next
    record = await func(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File "/opt/pysetup/.venv/lib/python3.11/site-packages/aiokafka/consumer/consumer.py", line 1263, in __anext__
    return (await self.getone())
            ^^^^^^^^^^^^^^^^^^^

  File "/opt/pysetup/.venv/lib/python3.11/site-packages/aiokafka/consumer/consumer.py", line 1146, in getone
    self._coordinator.check_errors()

  File "/opt/pysetup/.venv/lib/python3.11/site-packages/aiokafka/consumer/group_coordinator.py", line 299, in check_errors
    self._coordination_task.result()

aiokafka.errors.KafkaError: KafkaError: Unexpected error during coordination RuntimeError('unable to perform operation on <TCPTransport closed=True reading=False 0x55e23b4f8540>; the handler is closed

Expected behaviour
aiokafka catches this error and recreates the connection.

Environment (please complete the following information):

  • aiokafka version (python -c "import aiokafka; print(aiokafka.__version__)"): 0.10.0
  • uvloop: 0.19.0
  • Kafka Broker version (kafka-topics.sh --version): 3.5

Reproducible example

class TestClosedSocket:
    """Reproduce a send() over a connection whose server side has gone away.

    Every test in this class runs once per event-loop policy listed in the
    ``event_loop`` fixture (stock asyncio and uvloop), so the behaviour of the
    two transport implementations can be compared side by side.
    """

    # One parametrization per loop policy; the tuple order fixes test IDs.
    @pytest.fixture(
        params=(
            asyncio.DefaultEventLoopPolicy(),
            uvloop.EventLoopPolicy(),
        ),
    )
    def event_loop(
        self, request: pytest.FixtureRequest
    ) -> Iterable[asyncio.AbstractEventLoop]:
        """Yield a new loop built by the parametrized policy; close it after."""
        policy = request.param
        fresh_loop: asyncio.AbstractEventLoop = policy.new_event_loop()
        yield fresh_loop
        fresh_loop.close()

    @pytest.fixture()
    def server(
        self, unused_tcp_port: int
    ) -> Iterable[Tuple[str, int, socket.socket]]:
        """Yield ``(host, port, listening_socket)`` for a bare IPv4 listener.

        The socket listens but the fixture never calls ``accept()``, so a
        client can connect while no Kafka protocol is ever spoken back. The
        listener is closed on teardown.
        """
        address = "localhost"
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        listener.bind((address, unused_tcp_port))
        listener.listen(8)
        listener.setblocking(False)

        yield address, unused_tcp_port, listener

        listener.close()

    @pytest_asyncio.fixture()
    async def conn(
        self, server: Tuple[str, int, socket.socket]
    ) -> AsyncIterable[AIOKafkaConnection]:
        """Yield an ``AIOKafkaConnection`` aimed at the ``server`` listener.

        ``_create_reader_task`` is swapped for a mock so no reader task is
        started. Presumably this keeps the connection from noticing the
        server-side close by itself, so the failure has to surface in
        ``send()``; confirm against aiokafka's ``conn.py``. On teardown the
        connection is closed, and the object ``close()`` returns is awaited
        when it is truthy.
        """
        host, port, _listener = server

        connection = AIOKafkaConnection(
            host=host, port=port, request_timeout_ms=1000
        )
        connection._create_reader_task = mock.Mock()

        yield connection

        closing = connection.close()
        if closing:
            await closing

    @pytest.mark.asyncio
    async def test_send_to_closed_socket(
        self, server: Tuple[str, int, socket.socket], conn: AIOKafkaConnection
    ) -> None:
        """``send()`` should raise ``KafkaConnectionError`` once the peer is gone.

        Steps:
        1. Send before ``connect()``: expected to fail with
           ``KafkaConnectionError``.
        2. Connect, then close the server's listening socket and give the
           loop a moment to process the closure.
        3. Send again: expected to fail with ``KafkaConnectionError``. The
           issue reports that under uvloop a ``RuntimeError`` escapes instead.
        """
        _host, _port, listener = server
        metadata_request = MetadataRequest([])

        # Not connected yet, so the request cannot go out.
        with pytest.raises(KafkaConnectionError):
            await conn.send(metadata_request)

        await conn.connect()

        # Tear down the server side, then yield to the loop so the client
        # transport has a chance to observe it before the next write.
        listener.close()
        await asyncio.sleep(0.1)

        # The write over the dead connection must surface as a Kafka
        # connection error rather than a transport-level exception.
        with pytest.raises(KafkaConnectionError):
            await conn.send(metadata_request)

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions