Skip to content

Commit 252830d

Browse files
committed
FIX: Live test flakiness
1 parent 9f064e9 commit 252830d

17 files changed

+139
-104
lines changed

databento/live/client.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -121,14 +121,14 @@ async def __anext__(self) -> DBNRecord:
121121

122122
def __iter__(self) -> "Live":
123123
logger.debug("starting iteration")
124-
self._dbn_queue._enabled = True
124+
self._dbn_queue._enabled.set()
125125
return self
126126

127127
def __next__(self) -> DBNRecord:
128128
if self._dbn_queue is None:
129129
raise ValueError("iteration has not started")
130130

131-
while not self._session.is_disconnected() or self._dbn_queue.qsize() > 0:
131+
while not self._session.is_disconnected() or self._dbn_queue._qsize() > 0:
132132
try:
133133
record = self._dbn_queue.get(block=False)
134134
except queue.Empty:
@@ -141,14 +141,14 @@ def __next__(self) -> DBNRecord:
141141
self._dbn_queue.task_done()
142142
return record
143143
finally:
144-
if not self._dbn_queue.full() and not self._session.is_reading():
144+
if not self._dbn_queue.half_full() and not self._session.is_reading():
145145
logger.debug(
146146
"resuming reading with %d pending records",
147-
self._dbn_queue.qsize(),
147+
self._dbn_queue._qsize(),
148148
)
149149
self._session.resume_reading()
150150

151-
self._dbn_queue._enabled = False
151+
self._dbn_queue._enabled.clear()
152152
raise StopIteration
153153

154154
def __repr__(self) -> str:

databento/live/session.py

Lines changed: 8 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import logging
44
import queue
55
import struct
6+
import threading
67
from typing import IO, Callable, Iterable, List, Optional, Set, Union
78

89
import databento_dbn
@@ -38,40 +39,23 @@ class DBNQueue(queue.Queue): # type: ignore [type-arg]
3839

3940
def __init__(self, maxsize: int) -> None:
4041
super().__init__(maxsize)
41-
self._enabled: bool = False
42-
43-
def put(
44-
self,
45-
item: DBNRecord,
46-
block: bool = True,
47-
timeout: Optional[float] = None,
48-
) -> None:
49-
if self._enabled:
50-
return super().put(item, block, timeout)
51-
raise ValueError("queue is not being iterated")
52-
53-
def put_nowait(
54-
self,
55-
item: DBNRecord,
56-
) -> None:
57-
if self._enabled:
58-
return super().put_nowait(item)
59-
raise ValueError("queue is not being iterated")
42+
self._enabled = threading.Event()
6043

6144
@property
6245
def enabled(self) -> bool:
6346
"""
6447
True if the Queue will allow pushing.
6548
A queue should only be enabled when it has a consumer.
6649
"""
67-
return self._enabled
50+
return self._enabled.is_set()
6851

69-
def full(self) -> bool:
52+
def half_full(self) -> bool:
7053
"""
7154
Implementation which reports the queue as full when it
7255
has reached half capacity.
7356
"""
74-
return self.qsize() > self.maxsize // 2
57+
with self.mutex:
58+
return self._qsize() > self.maxsize // 2
7559

7660

7761
@dataclasses.dataclass
@@ -180,10 +164,10 @@ def received_record(self, record: DBNRecord) -> None:
180164
record.ts_event,
181165
)
182166
else:
183-
if self._dbn_queue.full():
167+
if self._dbn_queue.half_full():
184168
logger.warning(
185169
"record queue is full; %d record(s) to be processed",
186-
self._dbn_queue.qsize(),
170+
self._dbn_queue._qsize(),
187171
)
188172
self.transport.pause_reading()
189173

tests/data/generator.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,21 @@
1111
print(schema.value)
1212
path = f"test_data.{schema.value}.dbn.zst"
1313

14+
if schema == Schema.IMBALANCE:
15+
dataset = "XNAS.ITCH"
16+
symbol = "NVDA"
17+
else:
18+
dataset = "GLBX.MDP3"
19+
symbol = "ESH1"
20+
1421
# Execute request through client
1522
data: DBNStore = client.timeseries.get_range(
16-
dataset="GLBX.MDP3",
17-
symbols=["ESH1"],
23+
dataset=dataset,
24+
symbols=[symbol],
1825
schema=schema,
1926
start="2020-12-28T13:00",
2027
end="2020-12-29T13:00",
21-
limit=2, # <-- limiting response to 2 records only (for test cases)
28+
limit=4, # <-- limiting response to 4 records only (for test cases)
2229
path=path,
2330
) # -> DBNStore
2431

17 Bytes
Binary file not shown.
22 Bytes
Binary file not shown.

tests/data/test_data.mbo.dbn.zst

50 Bytes
Binary file not shown.

tests/data/test_data.mbp-1.dbn.zst

71 Bytes
Binary file not shown.
55 Bytes
Binary file not shown.
0 Bytes
Binary file not shown.
68 Bytes
Binary file not shown.

0 commit comments

Comments
 (0)