Skip to content

Commit 008872f

Browse files
committed
FIX: Live client backpressure test flakiness
1 parent 252830d commit 008872f

File tree

1 file changed

+25
-16
lines changed

1 file changed

+25
-16
lines changed

tests/test_live_client.py

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
"""Unit tests for the Live client."""
2+
import asyncio
23
import pathlib
34
import platform
45
from io import BytesIO
@@ -10,10 +11,17 @@
1011
import zstandard
1112
from databento.common.cram import BUCKET_ID_LENGTH
1213
from databento.common.dbnstore import DBNStore
13-
from databento.common.enums import Dataset, Encoding, Schema, SType
14+
from databento.common.enums import Dataset
15+
from databento.common.enums import Encoding
16+
from databento.common.enums import Schema
17+
from databento.common.enums import SType
1418
from databento.common.error import BentoError
1519
from databento.common.symbology import ALL_SYMBOLS
16-
from databento.live import DBNRecord, client, gateway, protocol, session
20+
from databento.live import DBNRecord
21+
from databento.live import client
22+
from databento.live import gateway
23+
from databento.live import protocol
24+
from databento.live import session
1725

1826
from tests.mock_live_server import MockLiveServer
1927

@@ -128,7 +136,6 @@ def test_live_connection_cram_failure(
128136
Test that a failed auth message due to an incorrect CRAM
129137
raies a BentoError.
130138
"""
131-
132139
# Dork up the API key in the mock client to fail CRAM
133140
bucket_id = test_api_key[-BUCKET_ID_LENGTH:]
134141
invalid_key = "db-invalidkey00000000000000FFFFF"
@@ -500,7 +507,6 @@ def test_live_add_stream(
500507
"""
501508
Test that calling add_stream adds that stream to the client.
502509
"""
503-
504510
stream = BytesIO()
505511

506512
live_client.add_stream(stream)
@@ -578,19 +584,19 @@ async def test_live_async_iteration_backpressure(
578584
symbols="TEST",
579585
)
580586

587+
monkeypatch.setattr(live_client._session._transport, "pause_reading", pause_mock:=MagicMock())
588+
581589
live_client.start()
590+
it = live_client.__iter__()
591+
await live_client.wait_for_close()
582592

583-
records: List[DBNRecord] = []
584-
async for record in live_client:
585-
records.append(record)
593+
assert pause_mock.called
586594

595+
records = list(it)
587596
assert len(records) == 4
588-
assert isinstance(records[0], databento_dbn.MBOMsg)
589-
assert isinstance(records[1], databento_dbn.MBOMsg)
590-
assert isinstance(records[2], databento_dbn.MBOMsg)
591-
assert isinstance(records[3], databento_dbn.MBOMsg)
592597
assert live_client._dbn_queue.empty()
593598

599+
594600
@pytest.mark.skipif(platform.system() == "Windows", reason="flaky on windows runner")
595601
async def test_live_async_iteration_dropped(
596602
monkeypatch: pytest.MonkeyPatch,
@@ -616,16 +622,18 @@ async def test_live_async_iteration_dropped(
616622
symbols="TEST",
617623
)
618624

625+
monkeypatch.setattr(live_client._session._transport, "pause_reading", pause_mock:=MagicMock())
626+
619627
live_client.start()
628+
it = live_client.__iter__()
629+
await live_client.wait_for_close()
620630

621-
records: List[DBNRecord] = []
622-
async for record in live_client:
623-
records.append(record)
631+
assert pause_mock.called
624632

625-
assert len(records) < 4
633+
records = list(it)
634+
assert len(records) == 1
626635
assert live_client._dbn_queue.empty()
627636

628-
629637
@pytest.mark.skipif(platform.system() == "Windows", reason="flaky on windows runner")
630638
async def test_live_async_iteration_stop(
631639
live_client: client.Live,
@@ -651,6 +659,7 @@ async def test_live_async_iteration_stop(
651659
assert len(records) > 1
652660
assert live_client._dbn_queue.empty()
653661

662+
654663
@pytest.mark.skipif(platform.system() == "Windows", reason="flaky on windows runner")
655664
def test_live_sync_iteration(
656665
live_client: client.Live,

0 commit comments

Comments
 (0)