Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions aiokafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,12 @@
f"No connection to broker at {self._host}:{self._port}"
)

if self._writer.is_closing():
self.close(reason=CloseReason.CONNECTION_BROKEN)
raise Errors.KafkaConnectionError(

Check warning on line 463 in aiokafka/conn.py

View check run for this annotation

Codecov / codecov/patch

aiokafka/conn.py#L462-L463

Added lines #L462 - L463 were not covered by tests
f"Connection at {self._host}:{self._port} is closing"
)

correlation_id = self._next_correlation_id()
header = request.build_request_header(
correlation_id=correlation_id, client_id=self._client_id
Expand Down
1 change: 1 addition & 0 deletions requirements-ci.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ Pygments==2.18.0
gssapi==1.9.0
async-timeout==4.0.3
cramjam==2.9.0
uvloop==0.19.0
89 changes: 84 additions & 5 deletions tests/test_conn.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import asyncio
import gc
import socket
import struct
import sys
from collections.abc import AsyncIterable, Iterable
from typing import Any
from unittest import mock

import pytest
import pytest_asyncio

from aiokafka.conn import AIOKafkaConnection, VersionInfo, create_conn
from aiokafka.errors import (
Expand Down Expand Up @@ -144,7 +148,7 @@
with self.assertRaises(KafkaConnectionError):
await conn.send(request)

conn._writer = mock.MagicMock()
conn._writer = mock.MagicMock(is_closing=mock.Mock(return_value=False))

Check warning on line 151 in tests/test_conn.py

View check run for this annotation

Codecov / codecov/patch

tests/test_conn.py#L151

Added line #L151 was not covered by tests
conn._writer.write.side_effect = OSError("mocked writer is closed")

with self.assertRaises(KafkaConnectionError):
Expand Down Expand Up @@ -173,7 +177,7 @@
return resp

reader.readexactly.side_effect = [first_resp(), second_resp()]
writer = mock.MagicMock()
writer = mock.MagicMock(is_closing=mock.Mock(return_value=False))

Check warning on line 180 in tests/test_conn.py

View check run for this annotation

Codecov / codecov/patch

tests/test_conn.py#L180

Added line #L180 was not covered by tests

conn._reader = reader
conn._writer = writer
Expand Down Expand Up @@ -208,7 +212,7 @@
return resp

reader.readexactly.side_effect = [first_resp(), second_resp()]
writer = mock.MagicMock()
writer = mock.MagicMock(is_closing=mock.Mock(return_value=False))

Check warning on line 215 in tests/test_conn.py

View check run for this annotation

Codecov / codecov/patch

tests/test_conn.py#L215

Added line #L215 was not covered by tests

conn._reader = reader
conn._writer = writer
Expand Down Expand Up @@ -237,7 +241,7 @@
# setup reader
reader = mock.MagicMock()
reader.readexactly.return_value = invoke_osserror()
writer = mock.MagicMock()
writer = mock.MagicMock(is_closing=mock.Mock(return_value=False))

Check warning on line 244 in tests/test_conn.py

View check run for this annotation

Codecov / codecov/patch

tests/test_conn.py#L244

Added line #L244 was not covered by tests

conn._reader = reader
conn._writer = writer
Expand Down Expand Up @@ -394,7 +398,7 @@
# setup connection with mocked transport and protocol
conn = AIOKafkaConnection(host="", port=9999)
conn.close = mock.MagicMock()
conn._writer = mock.MagicMock()
conn._writer = mock.MagicMock(is_closing=mock.Mock(return_value=False))

Check warning on line 401 in tests/test_conn.py

View check run for this annotation

Codecov / codecov/patch

tests/test_conn.py#L401

Added line #L401 was not covered by tests
out_buffer = []
conn._writer.write = mock.Mock(side_effect=out_buffer.append)
conn._reader = mock.MagicMock()
Expand Down Expand Up @@ -424,3 +428,78 @@
conn._send_sasl_token(b"Super data")
# We don't need to close 2ce
self.assertEqual(conn.close.call_count, 1)


@pytest.mark.skipif(sys.platform == "win32", reason="Uvloop doesn't support Windows")
class TestClosedSocket:
@pytest.fixture(
params=(
pytest.param("asyncio", id="asyncio"),
pytest.param("uvloop", id="uvloop"),
),
)
def event_loop_policy(
self, request: pytest.FixtureRequest
) -> Iterable[asyncio.AbstractEventLoopPolicy]:
if request.param == "asyncio":
policy = asyncio.DefaultEventLoopPolicy()

Check warning on line 445 in tests/test_conn.py

View check run for this annotation

Codecov / codecov/patch

tests/test_conn.py#L445

Added line #L445 was not covered by tests
elif request.param == "uvloop":
import uvloop

Check warning on line 447 in tests/test_conn.py

View check run for this annotation

Codecov / codecov/patch

tests/test_conn.py#L447

Added line #L447 was not covered by tests

policy = uvloop.EventLoopPolicy()

Check warning on line 449 in tests/test_conn.py

View check run for this annotation

Codecov / codecov/patch

tests/test_conn.py#L449

Added line #L449 was not covered by tests
else:
raise ValueError(f"loop {request.param} is not supported")

Check warning on line 451 in tests/test_conn.py

View check run for this annotation

Codecov / codecov/patch

tests/test_conn.py#L451

Added line #L451 was not covered by tests

yield policy

Check warning on line 453 in tests/test_conn.py

View check run for this annotation

Codecov / codecov/patch

tests/test_conn.py#L453

Added line #L453 was not covered by tests

@pytest.fixture()
def server(self, unused_tcp_port: int) -> Iterable[tuple[str, int, socket.socket]]:
host = "localhost"
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind((host, unused_tcp_port))
sock.listen(8)
sock.setblocking(False)

Check warning on line 461 in tests/test_conn.py

View check run for this annotation

Codecov / codecov/patch

tests/test_conn.py#L457-L461

Added lines #L457 - L461 were not covered by tests

yield host, unused_tcp_port, sock

Check warning on line 463 in tests/test_conn.py

View check run for this annotation

Codecov / codecov/patch

tests/test_conn.py#L463

Added line #L463 was not covered by tests

sock.close()

Check warning on line 465 in tests/test_conn.py

View check run for this annotation

Codecov / codecov/patch

tests/test_conn.py#L465

Added line #L465 was not covered by tests

@pytest_asyncio.fixture()
async def conn(
self, server: tuple[str, int, socket.socket]
) -> AsyncIterable[AIOKafkaConnection]:
host, port, _ = server

Check warning on line 471 in tests/test_conn.py

View check run for this annotation

Codecov / codecov/patch

tests/test_conn.py#L471

Added line #L471 was not covered by tests

conn = AIOKafkaConnection(host=host, port=port, request_timeout_ms=1000)
conn._create_reader_task = mock.Mock()

Check warning on line 474 in tests/test_conn.py

View check run for this annotation

Codecov / codecov/patch

tests/test_conn.py#L473-L474

Added lines #L473 - L474 were not covered by tests

yield conn

Check warning on line 476 in tests/test_conn.py

View check run for this annotation

Codecov / codecov/patch

tests/test_conn.py#L476

Added line #L476 was not covered by tests

fut = conn.close()

Check warning on line 478 in tests/test_conn.py

View check run for this annotation

Codecov / codecov/patch

tests/test_conn.py#L478

Added line #L478 was not covered by tests
if fut:
await fut

Check warning on line 480 in tests/test_conn.py

View check run for this annotation

Codecov / codecov/patch

tests/test_conn.py#L480

Added line #L480 was not covered by tests

Check notice

Code scanning / CodeQL

Statement has no effect Note test

This statement has no effect.

@pytest.mark.asyncio
async def test_send_to_closed_socket(
self, server: tuple[str, int, socket.socket], conn: AIOKafkaConnection
) -> None:
host, port, sock = server

Check warning on line 486 in tests/test_conn.py

View check run for this annotation

Codecov / codecov/patch

tests/test_conn.py#L486

Added line #L486 was not covered by tests

request = MetadataRequest([])

Check warning on line 488 in tests/test_conn.py

View check run for this annotation

Codecov / codecov/patch

tests/test_conn.py#L488

Added line #L488 was not covered by tests

with pytest.raises(

Check warning on line 490 in tests/test_conn.py

View check run for this annotation

Codecov / codecov/patch

tests/test_conn.py#L490

Added line #L490 was not covered by tests
KafkaConnectionError,
match=f"KafkaConnectionError: No connection to broker at {host}:{port}",
):
await conn.send(request)

Check warning on line 494 in tests/test_conn.py

View check run for this annotation

Codecov / codecov/patch

tests/test_conn.py#L494

Added line #L494 was not covered by tests

await conn.connect()

Check warning on line 496 in tests/test_conn.py

View check run for this annotation

Codecov / codecov/patch

tests/test_conn.py#L496

Added line #L496 was not covered by tests

sock.close()
await asyncio.sleep(0.1)

Check warning on line 499 in tests/test_conn.py

View check run for this annotation

Codecov / codecov/patch

tests/test_conn.py#L498-L499

Added lines #L498 - L499 were not covered by tests

with pytest.raises(

Check warning on line 501 in tests/test_conn.py

View check run for this annotation

Codecov / codecov/patch

tests/test_conn.py#L501

Added line #L501 was not covered by tests
KafkaConnectionError,
match=f"KafkaConnectionError: Connection at {host}:{port} is closing",
):
await conn.send(request)

Check warning on line 505 in tests/test_conn.py

View check run for this annotation

Codecov / codecov/patch

tests/test_conn.py#L505

Added line #L505 was not covered by tests
Loading