Skip to content

Commit 7e0c421

Browse files
committed
workaround specific partition bug LOGBROKER-8319
1 parent 438bde3 commit 7e0c421

File tree

1 file changed

+17
-20
lines changed

1 file changed

+17
-20
lines changed

tests/topics/test_topic_reader.py

Lines changed: 17 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -167,44 +167,41 @@ def decode(b: bytes):
167167

168168
@pytest.mark.asyncio
169169
class TestBugFixesAsync:
170-
@pytest.mark.skip("LOGBROKER-8319")
171170
async def test_issue_297_bad_handle_stop_partition(
172-
self, driver, topic_consumer, topic_with_two_partitions_path: str
171+
self, driver, topic_consumer, topic_with_two_partitions_path: str
173172
):
174173
async def wait(fut):
175174
return await asyncio.wait_for(fut, timeout=10)
176175

177176
topic = topic_with_two_partitions_path # type: str
178177

179178
async with driver.topic_client.writer(topic, partition_id=0) as writer:
180-
await writer.write_with_ack("01")
179+
await writer.write_with_ack("00")
181180

182181
async with driver.topic_client.writer(topic, partition_id=1) as writer:
183-
await writer.write_with_ack("1")
182+
await writer.write_with_ack("01")
184183

185184
# Start first reader and receive messages from both partitions
186185
reader0 = driver.topic_client.reader(topic, consumer=topic_consumer)
187186
await wait(reader0.receive_message())
188187
await wait(reader0.receive_message())
189188

190189
# Start second reader for same topic, same consumer, partition 1
191-
reader1 = driver.topic_client.reader(
192-
ydb.TopicReaderSelector(
193-
path=topic,
194-
partitions=1,
195-
),
196-
consumer=topic_consumer,
197-
)
190+
reader1 = driver.topic_client.reader(topic, consumer=topic_consumer)
198191

199-
await asyncio.sleep(0.1)
192+
# receive uncommited message
193+
await reader1.receive_message()
194+
195+
# write one message for every partition
196+
async with driver.topic_client.writer(topic, partition_id=0) as writer:
197+
await writer.write_with_ack("10")
198+
async with driver.topic_client.writer(topic, partition_id=0) as writer:
199+
await writer.write_with_ack("11")
200200

201-
# receive uncommited message from partition 1
202-
msg = await wait(reader1.receive_message())
203-
assert msg.data.decode() == "1"
201+
msg0 = await wait(reader0.receive_message())
202+
msg1 = await wait(reader1.receive_message())
204203

205-
# write message to partition 0 - for reader 0
206-
# async with driver.topic_client.writer(topic, partition_id=0) as writer:
207-
# await writer.write_with_ack("02")
204+
datas = [msg0.data.decode(), msg1.data.decode]
205+
datas.sort()
208206

209-
msg = await wait(reader0.receive_message())
210-
assert msg.data.decode() == "02"
207+
assert datas == ["10", "11"]

0 commit comments

Comments
 (0)