Skip to content

Commit f1d076c

Browse files
author
Valeriya Popova
committed
update pytest version to prevent warning
1 parent 8ea665f commit f1d076c

File tree

6 files changed

+38
-22
lines changed

6 files changed

+38
-22
lines changed

test-requirements.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ pycparser==2.20
2626
PyNaCl==1.4.0
2727
pyparsing==2.4.7
2828
pyrsistent==0.18.0
29-
pytest==6.2.4
30-
pytest-asyncio==0.15.1
29+
pytest==7.2.2
30+
pytest-asyncio==0.21.0
3131
pytest-docker-compose==3.2.1
3232
python-dotenv==0.18.0
3333
PyYAML==5.4.1

ydb/_topic_reader/datatypes_test.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ def add_notify(future, notified_offset):
175175
),
176176
],
177177
)
178-
def test_add_waiter(
178+
async def test_add_waiter(
179179
self,
180180
session,
181181
original: List[PartitionSession.CommitAckWaiter],
@@ -188,13 +188,13 @@ def test_add_waiter(
188188
assert result == session._ack_waiters
189189
assert res.future.done() == is_done
190190

191-
def test_close_notify_waiters(self, session):
191+
async def test_close_notify_waiters(self, session):
192192
waiter = session.add_waiter(session.committed_offset + 1)
193193
session.close()
194194

195195
with pytest.raises(topic_reader_asyncio.TopicReaderCommitToExpiredPartition):
196196
waiter.future.result()
197197

198-
def test_close_twice(self, session):
198+
async def test_close_twice(self, session):
199199
session.close()
200200
session.close()

ydb/_topic_reader/topic_reader_asyncio.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -606,7 +606,8 @@ async def flush(self):
606606
for session in self._partition_sessions.values():
607607
futures.extend(w.future for w in session._ack_waiters)
608608

609-
await asyncio.gather(*futures)
609+
if futures:
610+
await asyncio.wait(futures)
610611

611612
async def close(self):
612613
if self._closed:
@@ -622,4 +623,6 @@ async def close(self):
622623

623624
for task in self._background_tasks:
624625
task.cancel()
625-
await asyncio.wait(self._background_tasks)
626+
627+
if self._background_tasks:
628+
await asyncio.wait(self._background_tasks)

ydb/_topic_reader/topic_reader_asyncio_test.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -411,6 +411,8 @@ async def test_commit_ranges_for_received_messages(
411411
received = stream_reader_started.receive_batch_nowait().messages
412412
assert received == [m2]
413413

414+
await stream_reader_started.close()
415+
414416
# noinspection PyTypeChecker
415417
@pytest.mark.parametrize(
416418
"batch,data_out",

ydb/_topic_writer/topic_writer_asyncio.py

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -384,17 +384,21 @@ async def _connection_loop(self):
384384
await stream_writer.close()
385385
for task in tasks:
386386
task.cancel()
387-
await asyncio.wait(tasks)
387+
if tasks:
388+
await asyncio.wait(tasks)
388389

389390
async def _encode_loop(self):
390-
while True:
391-
messages = await self._messages_for_encode.get()
392-
while not self._messages_for_encode.empty():
393-
messages.extend(self._messages_for_encode.get_nowait())
391+
try:
392+
while True:
393+
messages = await self._messages_for_encode.get()
394+
while not self._messages_for_encode.empty():
395+
messages.extend(self._messages_for_encode.get_nowait())
394396

395-
batch_codec = await self._codec_selector(messages)
396-
await self._encode_data_inplace(batch_codec, messages)
397-
self._add_messages_to_send_queue(messages)
397+
batch_codec = await self._codec_selector(messages)
398+
await self._encode_data_inplace(batch_codec, messages)
399+
self._add_messages_to_send_queue(messages)
400+
except BaseException as err:
401+
self._stop(err)
398402

399403
async def _encode_data_inplace(self, codec: PublicCodec, messages: List[InternalMessage]):
400404
if codec == PublicCodec.RAW:
@@ -531,10 +535,9 @@ async def _send_loop(self, writer: "WriterAsyncIOStream"):
531535
writer.write([m])
532536
except Exception as e:
533537
self._stop(e)
534-
finally:
535-
pass
538+
raise
536539

537-
def _stop(self, reason: Exception):
540+
def _stop(self, reason: BaseException):
538541
if reason is None:
539542
raise Exception("writer stop reason can not be None")
540543

@@ -554,7 +557,7 @@ async def flush(self):
554557
return
555558

556559
# wait last message
557-
await asyncio.wait((self._messages_future[-1],))
560+
await asyncio.wait(self._messages_future)
558561

559562

560563
class WriterAsyncIOStream:
@@ -647,7 +650,7 @@ async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamWriteMes
647650
@staticmethod
648651
def _ensure_ok(message: WriterMessagesFromServerToClient):
649652
if not message.status.is_success():
650-
raise TopicWriterError("status error from server in writer: %s", message.status)
653+
raise TopicWriterError(f"status error from server in writer: {message.status}")
651654

652655
def write(self, messages: List[InternalMessage]):
653656
if self._closed:

ydb/_topic_writer/topic_writer_asyncio_test.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,8 @@ async def test_update_token(self, stream: StreamMock):
183183
receive_task.cancel()
184184
await asyncio.wait([receive_task])
185185

186+
await writer.close()
187+
186188

187189
@pytest.mark.asyncio
188190
class TestWriterAsyncIOReconnector:
@@ -439,8 +441,11 @@ async def test_auto_seq_no(self, default_driver, default_settings, get_stream_wr
439441

440442
await reconnector.close(flush=False)
441443

442-
async def test_deny_double_seqno(self, reconnector: WriterAsyncIOReconnector):
444+
async def test_deny_double_seqno(self, reconnector: WriterAsyncIOReconnector, get_stream_writer):
445+
writer = get_stream_writer()
446+
443447
await reconnector.write_with_ack_future([PublicMessage(seqno=10, data="123")])
448+
writer.from_server.put_nowait(self.make_default_ack_message(seq_no=10))
444449

445450
with pytest.raises(TopicWriterError):
446451
await reconnector.write_with_ack_future([PublicMessage(seqno=9, data="123")])
@@ -449,8 +454,9 @@ async def test_deny_double_seqno(self, reconnector: WriterAsyncIOReconnector):
449454
await reconnector.write_with_ack_future([PublicMessage(seqno=10, data="123")])
450455

451456
await reconnector.write_with_ack_future([PublicMessage(seqno=11, data="123")])
457+
writer.from_server.put_nowait(self.make_default_ack_message(seq_no=11))
452458

453-
await reconnector.close(flush=False)
459+
await reconnector.close(flush=True)
454460

455461
@freezegun.freeze_time("2022-01-13 20:50:00", tz_offset=0)
456462
async def test_auto_created_at(self, default_driver, default_settings, get_stream_writer):
@@ -571,6 +577,8 @@ async def test_encode_data_inplace(
571577
assert mess.codec == codec
572578
assert mess.get_bytes() == expected_datas[index]
573579

580+
await reconnector.close(flush=True)
581+
574582
async def test_custom_encoder(self, default_driver, default_settings, get_stream_writer):
575583
codec = 10001
576584

0 commit comments

Comments
 (0)