Commit 0d4a5d5

FIX: Fix DBNStore.to_df() when empty
1 parent a61dd1a commit 0d4a5d5

6 files changed: +69 -82 lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
@@ -1,5 +1,8 @@
 # Changelog
 
+## 0.14.1 - TBD
+- Fixed issue where `DBNStore.to_df()` would raise an exception if no records were present
+
 ## 0.14.0 - 2023-06-14
 - Added support for reusing a `Live` client to reconnect
 - Added `metadata` property to `Live`
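In practice the fix means that asking for a DataFrame of a schema with no matching records now yields an empty frame instead of raising. A minimal sketch of the fixed behavior, mirroring the new tests further below; the import path and the file name are assumptions:

from databento import DBNStore, Schema

# Hypothetical uncompressed DBN file that contains only TRADES records.
with open("trades.dbn", "rb") as f:
    store = DBNStore.from_bytes(data=f.read())

df = store.to_df(schema=Schema.MBO)  # no MBO records in the stream
assert df.empty                      # prior to 0.14.1 this call raised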

databento/common/data.py

Lines changed: 3 additions & 12 deletions
@@ -28,15 +28,6 @@ def get_deriv_ba_types(level: int) -> list[tuple[str, type | str]]:
     Schema.TRADES,
 )
 
-
-OHLCV_SCHEMAS = (
-    Schema.OHLCV_1S,
-    Schema.OHLCV_1M,
-    Schema.OHLCV_1H,
-    Schema.OHLCV_1D,
-)
-
-
 RECORD_HEADER: list[tuple[str, type | str]] = [
     ("length", np.uint8),
     ("rtype", np.uint8),
@@ -265,6 +256,7 @@ def get_deriv_ba_fields(level: int) -> list[str]:
 
 
 DERIV_HEADER_COLUMNS = [
+    "ts_recv",
     "ts_event",
     "ts_in_delta",
     "publisher_id",
@@ -279,6 +271,7 @@ def get_deriv_ba_fields(level: int) -> list[str]:
 ]
 
 OHLCV_HEADER_COLUMNS = [
+    "ts_event",
     "publisher_id",
     "instrument_id",
     "open",
@@ -289,7 +282,6 @@ def get_deriv_ba_fields(level: int) -> list[str]:
 ]
 
 DEFINITION_DROP_COLUMNS = [
-    "ts_recv",
     "length",
     "rtype",
     "reserved1",
@@ -299,14 +291,12 @@ def get_deriv_ba_fields(level: int) -> list[str]:
 ]
 
 IMBALANCE_DROP_COLUMNS = [
-    "ts_recv",
     "length",
     "rtype",
     "dummy",
 ]
 
 STATISTICS_DROP_COLUMNS = [
-    "ts_recv",
     "length",
     "rtype",
     "dummy",
@@ -330,6 +320,7 @@ def get_deriv_ba_fields(level: int) -> list[str]:
 
 COLUMNS = {
     Schema.MBO: [
+        "ts_recv",
         "ts_event",
         "ts_in_delta",
         "publisher_id",
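The column-list changes above follow from the new construction order in `to_df()`: the frame is now built with explicit `columns=COLUMNS[schema]` first and indexed afterwards, so the index column (`ts_recv`, or `ts_event` for OHLCV) has to be present among those columns, which appears to be why it was added to the header lists and removed from the drop lists. A small sketch of why that matters for the empty case, using a stand-in column name rather than the real DBN dtypes:

import numpy as np
import pandas as pd

empty = np.empty((0, 1))

# Without explicit column names, an empty array yields a frame whose only
# column label is the integer 0, so set_index("ts_recv") raises KeyError:
# pd.DataFrame(empty).set_index("ts_recv")  # -> KeyError

# Naming the columns up front (index column included) keeps set_index
# working even with zero rows.
df = pd.DataFrame(empty, columns=["ts_recv"]).set_index("ts_recv")
assert df.empty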

databento/common/dbnstore.py

Lines changed: 13 additions & 13 deletions
@@ -2,6 +2,7 @@
 
 import abc
 import datetime as dt
+import functools
 import logging
 from collections.abc import Generator
 from io import BytesIO
@@ -453,10 +454,6 @@ def _prepare_dataframe(
         df: pd.DataFrame,
         schema: Schema,
     ) -> pd.DataFrame:
-        # Setup column ordering and index
-        df.set_index(self._get_index_column(schema), inplace=True)
-        df = df.reindex(columns=COLUMNS[schema])
-
         if schema == Schema.MBO or schema in DERIV_SCHEMAS:
             df["flags"] = df["flags"] & 0xFF  # Apply bitmask
             df["side"] = df["side"].str.decode("utf-8")
@@ -941,7 +938,12 @@ def to_df(
             raise ValueError("a schema must be specified for mixed DBN data")
         schema = self.schema
 
-        df = pd.DataFrame(self.to_ndarray(schema=schema))
+        df = pd.DataFrame(
+            self.to_ndarray(schema),
+            columns=COLUMNS[schema],
+        )
+        df.set_index(self._get_index_column(schema), inplace=True)
+
         df = self._prepare_dataframe(df, schema)
 
         if pretty_ts:
@@ -1049,12 +1051,10 @@ def to_ndarray(
                 self,
             )
 
-        result = []
-        for record in schema_records:
-            np_rec = np.frombuffer(
-                bytes(record),
-                dtype=STRUCT_MAP[schema],
-            )
-            result.append(np_rec[0])
+        decoder = functools.partial(np.frombuffer, dtype=STRUCT_MAP[schema])
+        result = tuple(map(decoder, map(bytes, schema_records)))
+
+        if not result:
+            return np.empty(shape=(0, 1), dtype=STRUCT_MAP[schema])
 
-        return np.asarray(result)
+        return np.ravel(result)
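A standalone sketch of the new decode path in `to_ndarray()`, with a made-up two-field record dtype standing in for `STRUCT_MAP[schema]`: each raw record is decoded with `np.frombuffer`, and an empty record set now short-circuits to a zero-length array that keeps the schema dtype, where the old `np.asarray([])` fallthrough lost it.

import functools

import numpy as np

# Made-up record dtype standing in for STRUCT_MAP[schema].
DTYPE = np.dtype([("ts_event", np.uint64), ("price", np.int64)])

def decode(raw_records: list[bytes]) -> np.ndarray:
    decoder = functools.partial(np.frombuffer, dtype=DTYPE)
    result = tuple(map(decoder, raw_records))
    if not result:
        # No records of the requested schema: keep the dtype so the caller
        # can still build an empty DataFrame with the expected columns.
        return np.empty(shape=(0, 1), dtype=DTYPE)
    return np.ravel(result)

assert len(decode([])) == 0
record = np.array([(1609160400000000000, 3722750000000)], dtype=DTYPE).tobytes()
assert decode([record])["price"][0] == 3722750000000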

databento/version.py

Lines changed: 1 addition & 1 deletion
@@ -1 +1 @@
-__version__ = "0.14.0"
+__version__ = "0.14.1"

pyproject.toml

Lines changed: 1 addition & 1 deletion
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "databento"
-version = "0.14.0"
+version = "0.14.1"
 description = "Official Python client library for Databento"
 authors = [
     "Databento <[email protected]>",

tests/test_historical_bento.py

Lines changed: 48 additions & 55 deletions
@@ -413,17 +413,7 @@ def test_to_df_with_pretty_px_with_various_schemas_converts_prices_as_expected(
 
 @pytest.mark.parametrize(
     "expected_schema",
-    [
-        Schema.MBO,
-        Schema.MBP_1,
-        Schema.MBP_10,
-        Schema.TBBO,
-        Schema.TRADES,
-        Schema.OHLCV_1S,
-        Schema.OHLCV_1M,
-        Schema.OHLCV_1H,
-        Schema.OHLCV_1D,
-    ],
+    [pytest.param(schema, id=str(schema)) for schema in Schema],
 )
 def test_from_file_given_various_paths_returns_expected_metadata(
     test_data_path: Callable[[Schema], Path],
@@ -474,7 +464,7 @@ def test_mbo_to_csv_writes_expected_file_to_disk(
     written = open(path, mode="rb").read()
     assert path.exists()
     expected = (
-        b"ts_recv,ts_event,ts_in_delta,publisher_id,channel_id,instrument_id,order_id,act"  # noqa
+        b"ts_recv,ts_event,ts_in_delta,publisher_id,channel_id,instrument_id,order_id,act"  # noqa
         b"ion,side,flags,price,size,sequence\n1609160400000704060,16091604000004298"  # noqa
         b"31,22993,1,0,5482,647784973705,C,A,128,3722750000000,1,1170352\n160916040"  # noqa
         b"0000711344,1609160400000431665,19621,1,0,5482,647784973631,C,A,128,372300000"  # noqa
@@ -718,16 +708,7 @@ def test_mbp_1_to_json_with_all_options_writes_expected_file_to_disk(
 
 @pytest.mark.parametrize(
     "schema",
-    [
-        s
-        for s in Schema
-        if s
-        not in (
-            Schema.OHLCV_1H,
-            Schema.OHLCV_1D,
-            Schema.DEFINITION,
-        )
-    ],
+    [pytest.param(schema, id=str(schema)) for schema in Schema],
 )
 def test_dbnstore_repr(
     test_data: Callable[[Schema], bytes],
@@ -820,17 +801,7 @@ def test_dbnstore_iterable_parallel(
 
 @pytest.mark.parametrize(
     "schema",
-    [
-        Schema.MBO,
-        Schema.MBP_1,
-        Schema.MBP_10,
-        Schema.OHLCV_1D,
-        Schema.OHLCV_1H,
-        Schema.OHLCV_1M,
-        Schema.OHLCV_1S,
-        Schema.TBBO,
-        Schema.TRADES,
-    ],
+    [pytest.param(schema, id=str(schema)) for schema in Schema],
 )
 def test_dbnstore_compression_equality(
     test_data: Callable[[Schema], bytes],
@@ -923,17 +894,7 @@ def test_dbnstore_buffer_long(
 
 @pytest.mark.parametrize(
     "schema",
-    [
-        Schema.MBO,
-        Schema.MBP_1,
-        Schema.MBP_10,
-        Schema.OHLCV_1D,
-        Schema.OHLCV_1H,
-        Schema.OHLCV_1M,
-        Schema.OHLCV_1S,
-        Schema.TBBO,
-        Schema.TRADES,
-    ],
+    [pytest.param(schema, id=str(schema)) for schema in Schema],
 )
 def test_dbnstore_to_ndarray_with_schema(
     schema: Schema,
@@ -957,19 +918,30 @@ def test_dbnstore_to_ndarray_with_schema(
         assert row == expected[i]
 
 
+def test_dbnstore_to_ndarray_with_schema_empty(
+    test_data: Callable[[Schema], bytes],
+) -> None:
+    """
+    Test that calling to_ndarray on a DBNStore that contains no data of the
+    specified schema returns an empty array.
+    """
+    # Arrange
+    dbn_stub_data = (
+        zstandard.ZstdDecompressor().stream_reader(test_data(Schema.TRADES)).read()
+    )
+
+    # Act
+    dbnstore = DBNStore.from_bytes(data=dbn_stub_data)
+
+    array = dbnstore.to_ndarray(schema=Schema.MBO)
+
+    # Assert
+    assert len(array) == 0
+
+
 @pytest.mark.parametrize(
     "schema",
-    [
-        Schema.MBO,
-        Schema.MBP_1,
-        Schema.MBP_10,
-        Schema.OHLCV_1D,
-        Schema.OHLCV_1H,
-        Schema.OHLCV_1M,
-        Schema.OHLCV_1S,
-        Schema.TBBO,
-        Schema.TRADES,
-    ],
+    [pytest.param(schema, id=str(schema)) for schema in Schema],
 )
 def test_dbnstore_to_df_with_schema(
     schema: Schema,
@@ -990,3 +962,24 @@ def test_dbnstore_to_df_with_schema(
 
     # Assert
     assert actual.equals(expected)
+
+
+def test_dbnstore_to_df_with_schema_empty(
+    test_data: Callable[[Schema], bytes],
+) -> None:
+    """
+    Test that calling to_df on a DBNStore that contains no data of the
+    specified schema returns an empty DataFrame.
+    """
+    # Arrange
+    dbn_stub_data = (
+        zstandard.ZstdDecompressor().stream_reader(test_data(Schema.TRADES)).read()
+    )
+
+    # Act
+    dbnstore = DBNStore.from_bytes(data=dbn_stub_data)
+
+    df = dbnstore.to_df(schema=Schema.MBO)
+
+    # Assert
+    assert df.empty
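The repeated hard-coded schema lists in the parametrize decorators are collapsed into a single comprehension over the `Schema` enum, which also gives each generated case a readable id. A minimal sketch of the same pattern with a toy enum; the enum members here are illustrative, not databento's:

import enum

import pytest

class Schema(str, enum.Enum):  # stand-in for databento's Schema enum
    MBO = "mbo"
    TRADES = "trades"

    def __str__(self) -> str:
        return self.value

@pytest.mark.parametrize(
    "schema",
    [pytest.param(schema, id=str(schema)) for schema in Schema],
)
def test_schema_has_a_value(schema: Schema) -> None:
    # pytest runs this once per enum member, with ids "mbo" and "trades"
    # rather than opaque positional ids.
    assert schema.value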
