Skip to content

Commit 933b754

Browse files
committed
Fix tests and move _loop upper in broker init methid
1 parent b879371 commit 933b754

File tree

3 files changed

+32
-55
lines changed

3 files changed

+32
-55
lines changed

taskiq_aio_kafka/broker.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@ def __init__( # noqa: WPS211
7878

7979
self._bootstrap_servers: Optional[Union[str, List[str]]] = bootstrap_servers
8080

81+
self._loop: Optional[asyncio.AbstractEventLoop] = loop
82+
8183
self._kafka_topic: NewTopic = kafka_topic or NewTopic(
8284
name="taskiq_topic",
8385
num_partitions=1,
@@ -108,7 +110,6 @@ def __init__( # noqa: WPS211
108110
client_id="kafka-python-taskiq",
109111
)
110112
)
111-
self._loop: Optional[asyncio.AbstractEventLoop] = loop
112113

113114
self._delete_topic_on_shutdown: bool = delete_topic_on_shutdown
114115

tests/conftest.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,11 @@ def kafka_url() -> str:
3232

3333
@pytest.fixture()
3434
def base_topic_name() -> str:
35-
"""Generate topic name.
35+
"""Return topic name.
3636
37-
:returns: random topic name.
37+
:returns: topic name.
3838
"""
39-
return uuid4().hex
39+
return "taskiq_topic"
4040

4141

4242
@pytest.fixture()

tests/test_broker.py

Lines changed: 27 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,15 @@ async def test_kick_success(broker: AioKafkaBroker) -> None:
4444

4545
await broker.kick(message_to_send)
4646

47-
received_message = await asyncio.wait_for(get_first_task(broker), timeout=1)
47+
received_message_bytes: bytes = await asyncio.wait_for(
48+
get_first_task(broker),
49+
timeout=1,
50+
)
51+
assert pickle.dumps(message_to_send) == received_message_bytes
52+
53+
received_message: BrokerMessage = pickle.loads(
54+
received_message_bytes,
55+
)
4856

4957
assert message_to_send == received_message
5058

@@ -94,62 +102,30 @@ async def test_listen(
94102
message: bytes = pickle.dumps(uuid4().hex)
95103
labels: Dict[str, str] = {"test_label": "123"}
96104

105+
message_to_send: BrokerMessage = BrokerMessage(
106+
task_id=task_id,
107+
task_name=task_name,
108+
message=message,
109+
labels=labels,
110+
)
111+
97112
await test_kafka_producer.send(
98113
topic=base_topic_name,
99-
value=pickle.dumps(
100-
BrokerMessage(
101-
task_id=task_id,
102-
task_name=task_name,
103-
message=message,
104-
labels=labels,
105-
),
106-
),
114+
value=pickle.dumps(message_to_send),
107115
)
108116

109-
broker_message: BrokerMessage = pickle.loads( # noqa: S301
110-
await asyncio.wait_for(
111-
get_first_task(broker),
112-
timeout=1,
113-
),
117+
received_message_bytes: bytes = await asyncio.wait_for(
118+
get_first_task(broker),
119+
timeout=1,
114120
)
115121

116-
assert broker_message.message == message
117-
assert broker_message.labels == labels
118-
assert broker_message.task_id == task_id
119-
assert broker_message.task_name == task_name
120-
121-
122-
@pytest.mark.anyio
123-
async def test_delayed_message(
124-
broker: AioKafkaBroker,
125-
) -> None:
126-
"""Test that delayed messages are delivered correctly.
122+
assert pickle.dumps(message_to_send) == received_message_bytes
127123

128-
This test send message with delay label,
129-
checks that this message will appear in main topic
130-
only after delay time.
131-
132-
:param broker: current broker.
133-
"""
134-
broker_msg = BrokerMessage(
135-
task_id="1",
136-
task_name="name",
137-
message=pickle.dumps("message"),
138-
labels={"delay": "3"},
124+
received_message: BrokerMessage = pickle.loads(
125+
received_message_bytes,
139126
)
140127

141-
await broker.kick(broker_msg)
142-
143-
with pytest.raises(Exception):
144-
await asyncio.wait_for(
145-
get_first_task(broker),
146-
timeout=1,
147-
)
148-
149-
await asyncio.sleep(3)
150-
151-
received_message = await asyncio.wait_for(
152-
get_first_task(broker),
153-
timeout=1,
154-
)
155-
assert received_message == broker_msg
128+
assert received_message.message == message
129+
assert received_message.labels == labels
130+
assert received_message.task_id == task_id
131+
assert received_message.task_name == task_name

0 commit comments

Comments
 (0)