Skip to content

Commit 69300d3

Browse files
committed
Merge branch 'test-297' into 297-stop-partition
2 parents c62147d + 430854a commit 69300d3

File tree

3 files changed

+64
-1
lines changed

3 files changed

+64
-1
lines changed

tests/conftest.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,26 @@ async def topic2_path(driver, topic_consumer, database) -> str:
176176
return topic_path
177177

178178

179+
@pytest.fixture()
180+
@pytest.mark.asyncio()
181+
async def topic_with_two_partitions_path(driver, topic_consumer, database) -> str:
182+
topic_path = database + "/test-topic-two-partitions"
183+
184+
try:
185+
await driver.topic_client.drop_topic(topic_path)
186+
except issues.SchemeError:
187+
pass
188+
189+
await driver.topic_client.create_topic(
190+
path=topic_path,
191+
consumers=[topic_consumer],
192+
min_active_partitions=2,
193+
partition_count_limit=2,
194+
)
195+
196+
return topic_path
197+
198+
179199
@pytest.fixture()
180200
@pytest.mark.asyncio()
181201
async def topic_with_messages(driver, topic_consumer, database):

tests/topics/test_topic_reader.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import asyncio
2+
13
import pytest
24

35
import ydb
@@ -161,3 +163,44 @@ def decode(b: bytes):
161163
with driver_sync.topic_client.reader(topic_path, topic_consumer, decoders={codec: decode}) as reader:
162164
batch = reader.receive_batch()
163165
assert batch.messages[0].data.decode() == "123"
166+
167+
168+
@pytest.mark.asyncio
169+
class TestBugFixesAsync:
170+
@pytest.mark.skip("LOGBROKER-8319")
171+
async def test_issue_297_bad_handle_stop_partition(self, driver, topic_consumer, topic_with_two_partitions_path: str):
172+
173+
async def wait(fut):
174+
return await asyncio.wait_for(fut, timeout=10)
175+
176+
topic = topic_with_two_partitions_path # type: str
177+
178+
async with driver.topic_client.writer(topic, partition_id=0) as writer:
179+
await writer.write_with_ack("01")
180+
181+
async with driver.topic_client.writer(topic, partition_id=1) as writer:
182+
await writer.write_with_ack("1")
183+
184+
# Start first reader and receive messages from both partitions
185+
reader0 = driver.topic_client.reader(topic, consumer=topic_consumer)
186+
await wait(reader0.receive_message())
187+
await wait(reader0.receive_message())
188+
189+
# Start second reader for same topic, same consumer, partition 1
190+
reader1 = driver.topic_client.reader(ydb.TopicReaderSelector(
191+
path=topic,
192+
partitions=1,
193+
), consumer=topic_consumer)
194+
195+
await asyncio.sleep(0.1)
196+
197+
# receive uncommited message from partition 1
198+
msg = await wait(reader1.receive_message())
199+
assert msg.data.decode() == "1"
200+
201+
# write message to partition 0 - for reader 0
202+
# async with driver.topic_client.writer(topic, partition_id=0) as writer:
203+
# await writer.write_with_ack("02")
204+
205+
msg = await wait(reader0.receive_message())
206+
assert msg.data.decode() == "02"

ydb/_topic_reader/topic_reader_asyncio.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -451,7 +451,7 @@ async def _read_messages_loop(self):
451451
self._state_changed.set()
452452
except Exception as e:
453453
self._set_first_error(e)
454-
raise
454+
return
455455

456456
async def _update_token_loop(self):
457457
while True:

0 commit comments

Comments
 (0)