Skip to content

Commit d2c0981

Browse files
committed
FIX: Filter live data by RType in Python client
1 parent a23cc2f commit d2c0981

File tree

5 files changed

+185
-23
lines changed

5 files changed

+185
-23
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ This release adds support for DBN v2 as well as Python v3.12.
77
#### Enhancements
88
- Added support for Python 3.12
99
- Improved the performance for stream writes in the `Live` client
10+
- Improved the performance of `DBNStore.to_ndarray` and `DBNStore.to_df` for heterogeneous DBN data
1011
- Upgraded `databento-dbn` to 0.14.2
1112
- Added `databento.common.types` module to hold common type annotations
1213

databento/common/dbnstore.py

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,12 @@
3232
from databento_dbn import InstrumentDefMsg
3333
from databento_dbn import InstrumentDefMsgV1
3434
from databento_dbn import Metadata
35+
from databento_dbn import RType
3536
from databento_dbn import Schema
3637
from databento_dbn import SType
3738
from databento_dbn import Transcoder
3839
from databento_dbn import VersionUpgradePolicy
40+
from pandas.io.common import os
3941

4042
from databento.common.constants import DEFINITION_TYPE_MAX_MAP
4143
from databento.common.constants import INT64_NULL
@@ -1082,11 +1084,14 @@ def to_ndarray(
10821084
raise ValueError("a schema must be specified for mixed DBN data")
10831085

10841086
schema_struct = self._schema_struct_map[schema]
1087+
schema_rtype = RType.from_schema(schema)
10851088
schema_dtype = schema_struct._dtypes
1086-
schema_filter = filter(lambda r: isinstance(r, schema_struct), self)
10871089

1090+
reader = self.reader
1091+
reader.seek(self._metadata_length)
10881092
ndarray_iter = NDArrayBytesIterator(
1089-
records=map(bytes, schema_filter),
1093+
stream=reader,
1094+
rtype=schema_rtype,
10901095
dtype=schema_dtype,
10911096
count=count,
10921097
)
@@ -1229,31 +1234,45 @@ class NDArrayBytesIterator(NDArrayIterator):
12291234

12301235
def __init__(
    self,
    stream: IO[bytes],
    rtype: RType,
    dtype: list[tuple[str, str]],
    count: int | None,
) -> None:
    """
    Iterator of DBN records decoded into numpy arrays from a byte stream.

    Parameters
    ----------
    stream : IO[bytes]
        Readable, seekable stream of DBN records; the caller positions it
        at the first record (past the metadata header).
    rtype : RType
        The record type to select; records with any other rtype are
        skipped during iteration.
    dtype : list[tuple[str, str]]
        The numpy structured dtype used to decode the selected records.
    count : int | None
        Maximum number of records per yielded array; ``None`` reads all
        remaining records into a single array.
    """
    self._stream = stream
    self._rtype = rtype
    self._dtype = dtype
    self._count = count
    # True until the first __next__ call; lets an empty stream yield one
    # empty array before StopIteration is raised.
    self._first_next = True
12401247

12411248
def __iter__(self) -> NDArrayIterator:
    """Return this object itself, per the iterator protocol."""
    return self
12431250

1251+
def __iter_rtype__(self) -> Generator[bytes, None, None]:
    """
    Yield the raw bytes of every record whose rtype matches the target.

    Reads two-byte DBN record headers from the stream: the first byte is
    the record length in 4-byte units and the second is the rtype.
    Records with a non-matching rtype are skipped in place with a
    relative seek instead of being read into memory.

    Yields
    ------
    bytes
        The complete record (header plus body) for each matching record.

    Raises
    ------
    ValueError
        If the stream ends in the middle of a record header.
    """
    while header := self._stream.read(2):
        if len(header) < 2:
            # A lone trailing byte means the stream is truncated; fail
            # loudly instead of raising an opaque unpack error.
            raise ValueError("DBN stream is truncated mid record header")
        length, rtype = header
        # Total record size is length * 4 bytes; the 2 header bytes have
        # already been consumed.
        read_size = length * 4 - 2
        if rtype == self._rtype:
            yield header + self._stream.read(read_size)
        else:
            # NOTE(review): `os` reaches this module via the accidental
            # `from pandas.io.common import os` import; prefer a plain
            # `import os` at module level.
            self._stream.seek(read_size, os.SEEK_CUR)
1260+
12441261
def __next__(self) -> np.ndarray[Any, Any]:
12451262
record_bytes = BytesIO()
12461263
num_records = 0
1247-
for record in itertools.islice(self._records, self._count):
1264+
1265+
for record in itertools.islice(self.__iter_rtype__(), self._count):
12481266
num_records += 1
12491267
record_bytes.write(record)
12501268

1251-
if num_records == 0:
1252-
if self._first_next:
1269+
if self._first_next:
1270+
self._first_next = False
1271+
if num_records == 0:
12531272
return np.empty([0, 1], dtype=self._dtype)
1254-
raise StopIteration
12551273

1256-
self._first_next = False
1274+
if num_records == 0:
1275+
raise StopIteration
12571276

12581277
try:
12591278
return np.frombuffer(

tests/conftest.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,23 @@ def pytest_collection_modifyitems(
8989
item.add_marker(skip_release)
9090

9191

92+
@pytest.fixture(name="live_test_data_path")
def fixture_live_test_data_path() -> pathlib.Path:
    """
    Fixture to retrieve the path to the live stub data file.

    Returns
    -------
    pathlib.Path
        Path to the zstd-compressed live DBN stub data.

    See Also
    --------
    live_test_data

    """
    return TESTS_ROOT / "data" / "LIVE" / "test_data.live.dbn.zst"
107+
108+
92109
@pytest.fixture(name="test_data_path")
93110
def fixture_test_data_path() -> Callable[[Dataset, Schema], pathlib.Path]:
94111
"""
@@ -120,6 +137,25 @@ def func(dataset: Dataset, schema: Schema) -> pathlib.Path:
120137
return func
121138

122139

140+
@pytest.fixture(name="live_test_data")
def fixture_live_test_data(
    live_test_data_path: pathlib.Path,
) -> bytes:
    """
    Fixture to retrieve the live stub test data.

    Returns
    -------
    bytes
        The raw (still zstd-compressed) contents of the live stub data
        file.

    See Also
    --------
    live_test_data_path

    """
    return live_test_data_path.read_bytes()
157+
158+
123159
@pytest.fixture(name="test_data")
124160
def fixture_test_data(
125161
test_data_path: Callable[[Dataset, Schema], pathlib.Path],
1.05 KB
Binary file not shown.

tests/test_historical_bento.py

Lines changed: 120 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -937,6 +937,57 @@ def test_dbnstore_to_ndarray_with_count(
937937
assert np.array_equal(expected, np.concatenate(aggregator))
938938

939939

940+
@pytest.mark.parametrize(
    "schema",
    [
        Schema.MBO,
        Schema.MBP_1,
        Schema.MBP_10,
        Schema.TRADES,
        Schema.OHLCV_1S,
        Schema.OHLCV_1M,
        Schema.OHLCV_1H,
        Schema.OHLCV_1D,
        Schema.DEFINITION,
        Schema.STATISTICS,
    ],
)
@pytest.mark.parametrize(
    "count",
    [
        1,
        2,
        3,
    ],
)
def test_dbnstore_to_ndarray_with_count_live(
    schema: Schema,
    live_test_data: bytes,
    count: int,
) -> None:
    """
    Test that iterating to_ndarray with a count yields batches that
    concatenate to the same array as calling it without a count.
    """
    # Arrange
    dbn_stub_data = zstandard.ZstdDecompressor().stream_reader(live_test_data).read()

    # Act
    dbnstore = DBNStore.from_bytes(data=dbn_stub_data)

    expected = dbnstore.to_ndarray(schema=schema)
    nd_iter = dbnstore.to_ndarray(schema=schema, count=count)

    # Assert
    aggregator: list[np.ndarray[Any, Any]] = []

    for batch in nd_iter:
        # Each yielded batch is capped at `count` records.
        assert len(batch) <= count
        aggregator.append(batch)

    assert np.array_equal(expected, np.concatenate(aggregator))
989+
990+
940991
@pytest.mark.parametrize(
941992
"schema",
942993
[pytest.param(schema, id=str(schema)) for schema in Schema.variants()],
@@ -993,6 +1044,38 @@ def test_dbnstore_to_ndarray_with_count_empty(
9931044
assert len(next(nd_iter)) == 0
9941045

9951046

1047+
@pytest.mark.parametrize(
    "schema, expected_count",
    [
        (Schema.MBO, 5),
        (Schema.MBP_1, 2),
        (Schema.MBP_10, 2),
        (Schema.TRADES, 2),
        (Schema.OHLCV_1S, 2),
        (Schema.OHLCV_1M, 2),
        (Schema.OHLCV_1H, 0),
        (Schema.OHLCV_1D, 0),
        (Schema.DEFINITION, 2),
        (Schema.STATISTICS, 9),
    ],
)
def test_dbnstore_to_ndarray_with_schema_live(
    live_test_data: bytes,
    schema: Schema,
    expected_count: int,
) -> None:
    """
    Test that to_ndarray on mixed-schema live data returns only the
    records matching the requested schema.
    """
    # Arrange
    dbn_stub_data = zstandard.ZstdDecompressor().stream_reader(live_test_data).read()

    # Act
    dbnstore = DBNStore.from_bytes(data=dbn_stub_data)

    array = dbnstore.to_ndarray(schema=schema)

    # Assert
    assert len(array) == expected_count
1077+
1078+
9961079
def test_dbnstore_to_ndarray_with_schema_empty(
9971080
test_data: Callable[[Dataset, Schema], bytes],
9981081
) -> None:
@@ -1016,6 +1099,23 @@ def test_dbnstore_to_ndarray_with_schema_empty(
10161099
assert len(array) == 0
10171100

10181101

1102+
def test_dbnstore_to_ndarray_with_schema_empty_live(
    live_test_data: bytes,
) -> None:
    """
    Test that a schema must be specified when converting live (mixed
    schema) data to an ndarray.
    """
    # Arrange
    dbn_stub_data = zstandard.ZstdDecompressor().stream_reader(live_test_data).read()

    # Act
    dbnstore = DBNStore.from_bytes(data=dbn_stub_data)

    # Assert
    # Pin the specific failure: mixed DBN data without a schema.
    with pytest.raises(ValueError, match="a schema must be specified"):
        dbnstore.to_ndarray()
1117+
1118+
10191119
@pytest.mark.parametrize(
10201120
"schema",
10211121
[pytest.param(schema, id=str(schema)) for schema in Schema.variants()],
@@ -1063,32 +1163,38 @@ def test_dbnstore_to_df_with_count(
10631163

10641164

10651165
@pytest.mark.parametrize(
    "schema, expected_count",
    [
        (Schema.MBO, 5),
        (Schema.MBP_1, 2),
        (Schema.MBP_10, 2),
        (Schema.TRADES, 2),
        (Schema.OHLCV_1S, 2),
        (Schema.OHLCV_1M, 2),
        (Schema.OHLCV_1H, 0),
        (Schema.OHLCV_1D, 0),
        (Schema.DEFINITION, 2),
        (Schema.STATISTICS, 9),
    ],
)
def test_dbnstore_to_df_with_schema_live(
    schema: Schema,
    live_test_data: bytes,
    expected_count: int,
) -> None:
    """
    Test that calling to_df with a schema on live (mixed) data produces a
    DataFrame containing only the records of that schema.
    """
    # Arrange
    dbn_stub_data = zstandard.ZstdDecompressor().stream_reader(live_test_data).read()

    # Act
    dbnstore = DBNStore.from_bytes(data=dbn_stub_data)

    df = dbnstore.to_df(schema=schema)

    # Assert
    assert len(df) == expected_count
10921198

10931199

10941200
def test_dbnstore_to_df_with_schema_empty(

0 commit comments

Comments
 (0)