Skip to content

Commit 430854a

Browse files
committed
add test for issue
1 parent 6c61d5f commit 430854a

File tree

3 files changed

+64
-1
lines changed

3 files changed

+64
-1
lines changed

tests/conftest.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,26 @@ async def topic2_path(driver, topic_consumer, database) -> str:
176176
return topic_path
177177

178178

179+
@pytest.fixture()
180+
@pytest.mark.asyncio()
181+
async def topic_with_two_partitions_path(driver, topic_consumer, database) -> str:
182+
topic_path = database + "/test-topic-two-partitions"
183+
184+
try:
185+
await driver.topic_client.drop_topic(topic_path)
186+
except issues.SchemeError:
187+
pass
188+
189+
await driver.topic_client.create_topic(
190+
path=topic_path,
191+
consumers=[topic_consumer],
192+
min_active_partitions=2,
193+
partition_count_limit=2,
194+
)
195+
196+
return topic_path
197+
198+
179199
@pytest.fixture()
180200
@pytest.mark.asyncio()
181201
async def topic_with_messages(driver, topic_consumer, database):

tests/topics/test_topic_reader.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import asyncio
2+
13
import pytest
24

35
import ydb
@@ -161,3 +163,44 @@ def decode(b: bytes):
161163
with driver_sync.topic_client.reader(topic_path, topic_consumer, decoders={codec: decode}) as reader:
162164
batch = reader.receive_batch()
163165
assert batch.messages[0].data.decode() == "123"
166+
167+
168+
@pytest.mark.asyncio
169+
class TestBugFixesAsync:
170+
@pytest.mark.skip("LOGBROKER-8319")
171+
async def test_issue_297_bad_handle_stop_partition(self, driver, topic_consumer, topic_with_two_partitions_path: str):
172+
173+
async def wait(fut):
174+
return await asyncio.wait_for(fut, timeout=10)
175+
176+
topic = topic_with_two_partitions_path # type: str
177+
178+
async with driver.topic_client.writer(topic, partition_id=0) as writer:
179+
await writer.write_with_ack("01")
180+
181+
async with driver.topic_client.writer(topic, partition_id=1) as writer:
182+
await writer.write_with_ack("1")
183+
184+
# Start first reader and receive messages from both partitions
185+
reader0 = driver.topic_client.reader(topic, consumer=topic_consumer)
186+
await wait(reader0.receive_message())
187+
await wait(reader0.receive_message())
188+
189+
# Start second reader for same topic, same consumer, partition 1
190+
reader1 = driver.topic_client.reader(ydb.TopicReaderSelector(
191+
path=topic,
192+
partitions=1,
193+
), consumer=topic_consumer)
194+
195+
await asyncio.sleep(0.1)
196+
197+
# receive uncommited message from partition 1
198+
msg = await wait(reader1.receive_message())
199+
assert msg.data.decode() == "1"
200+
201+
# write message to partition 0 - for reader 0
202+
# async with driver.topic_client.writer(topic, partition_id=0) as writer:
203+
# await writer.write_with_ack("02")
204+
205+
msg = await wait(reader0.receive_message())
206+
assert msg.data.decode() == "02"

ydb/_topic_reader/topic_reader_asyncio.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -427,7 +427,7 @@ async def _read_messages_loop(self):
427427
self._state_changed.set()
428428
except Exception as e:
429429
self._set_first_error(e)
430-
raise
430+
return
431431

432432
async def _update_token_loop(self):
433433
while True:

0 commit comments

Comments
 (0)