Skip to content

Commit 7df173d

Browse files
committed
MOD: Change struct field names and exposed data columns
1 parent b402617 commit 7df173d

File tree

4 files changed

+108
-56
lines changed

4 files changed

+108
-56
lines changed

databento/common/bento.py

Lines changed: 7 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -6,7 +6,7 @@
66
import numpy as np
77
import pandas as pd
88
import zstandard
9-
from databento.common.data import DBZ_COLUMNS, DBZ_STRUCT_MAP, DERIV_SCHEMAS
9+
from databento.common.data import COLUMNS, DERIV_SCHEMAS, STRUCT_MAP
1010
from databento.common.enums import Compression, Encoding, Schema, SType
1111
from databento.common.logging import log_debug
1212
from databento.common.metadata import MetadataDecoder
@@ -159,7 +159,7 @@ def dtype(self) -> np.dtype:
159159
"""
160160
if self._dtype is None:
161161
self._check_metadata()
162-
self._dtype = np.dtype(DBZ_STRUCT_MAP[self.schema])
162+
self._dtype = np.dtype(STRUCT_MAP[self.schema])
163163

164164
return self._dtype
165165

@@ -404,7 +404,7 @@ def to_ndarray(self) -> np.ndarray:
404404
405405
"""
406406
data: bytes = self.reader(decompress=True).read()
407-
return np.frombuffer(data, dtype=DBZ_STRUCT_MAP[self.schema])
407+
return np.frombuffer(data, dtype=STRUCT_MAP[self.schema])
408408

409409
def to_df(
410410
self,
@@ -437,20 +437,12 @@ def to_df(
437437
df.set_index(self._get_index_column(), inplace=True)
438438

439439
# Cleanup dataframe
440-
if self.schema == Schema.MBO:
441-
df.drop("channel_id", axis=1, inplace=True)
442-
df = df.reindex(columns=DBZ_COLUMNS[self.schema])
440+
df.drop(["length", "rtype"], axis=1, inplace=True)
441+
if self.schema == Schema.MBO or self.schema in DERIV_SCHEMAS:
442+
df = df.reindex(columns=COLUMNS[self.schema])
443443
df["flags"] = df["flags"] & 0xFF # Apply bitmask
444444
df["side"] = df["side"].str.decode("utf-8")
445445
df["action"] = df["action"].str.decode("utf-8")
446-
elif self.schema in DERIV_SCHEMAS:
447-
df.drop(["nwords", "type", "depth"], axis=1, inplace=True)
448-
df = df.reindex(columns=DBZ_COLUMNS[self.schema])
449-
df["flags"] = df["flags"] & 0xFF # Apply bitmask
450-
df["side"] = df["side"].str.decode("utf-8")
451-
df["action"] = df["action"].str.decode("utf-8")
452-
else:
453-
df.drop(["nwords", "type"], axis=1, inplace=True)
454446

455447
if pretty_ts:
456448
df.index = pd.to_datetime(df.index, utc=True)
@@ -493,7 +485,7 @@ def replay(self, callback: Callable[[Any], None]) -> None:
493485
The callback to the data handler.
494486
495487
"""
496-
dtype = DBZ_STRUCT_MAP[self.schema]
488+
dtype = STRUCT_MAP[self.schema]
497489
reader: BinaryIO = self.reader(decompress=True)
498490
while True:
499491
raw: bytes = reader.read(self.struct_size)

databento/common/data.py

Lines changed: 27 additions & 24 deletions
Original file line number | Diff line number | Diff line change
@@ -36,16 +36,16 @@ def get_deriv_ba_types(level: int) -> List[Tuple[str, Union[type, str]]]:
3636
)
3737

3838

39-
DBZ_COMMON_HEADER: List[Tuple[str, Union[type, str]]] = [
40-
("nwords", np.uint8),
41-
("type", np.uint8),
39+
RECORD_HEADER: List[Tuple[str, Union[type, str]]] = [
40+
("length", np.uint8),
41+
("rtype", np.uint8),
4242
("publisher_id", np.uint16),
4343
("product_id", np.uint32),
4444
("ts_event", np.uint64),
4545
]
4646

4747

48-
DBZ_MBP_MSG: List[Tuple[str, Union[type, str]]] = [
48+
MBP_MSG: List[Tuple[str, Union[type, str]]] = [
4949
("price", np.int64),
5050
("size", np.uint32),
5151
("action", "S1"), # 1 byte chararray
@@ -58,7 +58,7 @@ def get_deriv_ba_types(level: int) -> List[Tuple[str, Union[type, str]]]:
5858
]
5959

6060

61-
DBZ_OHLCV_MSG: List[Tuple[str, Union[type, str]]] = [
61+
OHLCV_MSG: List[Tuple[str, Union[type, str]]] = [
6262
("open", np.int64),
6363
("high", np.int64),
6464
("low", np.int64),
@@ -67,8 +67,8 @@ def get_deriv_ba_types(level: int) -> List[Tuple[str, Union[type, str]]]:
6767
]
6868

6969

70-
DBZ_STRUCT_MAP: Dict[Schema, List[Tuple[str, Union[type, str]]]] = {
71-
Schema.MBO: DBZ_COMMON_HEADER
70+
STRUCT_MAP: Dict[Schema, List[Tuple[str, Union[type, str]]]] = {
71+
Schema.MBO: RECORD_HEADER
7272
+ [
7373
("order_id", np.uint64),
7474
("price", np.int64),
@@ -81,9 +81,9 @@ def get_deriv_ba_types(level: int) -> List[Tuple[str, Union[type, str]]]:
8181
("ts_in_delta", np.int32),
8282
("sequence", np.uint32),
8383
],
84-
Schema.MBP_1: DBZ_COMMON_HEADER + DBZ_MBP_MSG + get_deriv_ba_types(0), # 1
85-
Schema.MBP_10: DBZ_COMMON_HEADER
86-
+ DBZ_MBP_MSG
84+
Schema.MBP_1: RECORD_HEADER + MBP_MSG + get_deriv_ba_types(0), # 1
85+
Schema.MBP_10: RECORD_HEADER
86+
+ MBP_MSG
8787
+ get_deriv_ba_types(0) # 1
8888
+ get_deriv_ba_types(1) # 2
8989
+ get_deriv_ba_types(2) # 3
@@ -94,21 +94,21 @@ def get_deriv_ba_types(level: int) -> List[Tuple[str, Union[type, str]]]:
9494
+ get_deriv_ba_types(7) # 8
9595
+ get_deriv_ba_types(8) # 9
9696
+ get_deriv_ba_types(9), # 10
97-
Schema.TBBO: DBZ_COMMON_HEADER + DBZ_MBP_MSG + get_deriv_ba_types(0),
98-
Schema.TRADES: DBZ_COMMON_HEADER + DBZ_MBP_MSG,
99-
Schema.OHLCV_1S: DBZ_COMMON_HEADER + DBZ_OHLCV_MSG,
100-
Schema.OHLCV_1M: DBZ_COMMON_HEADER + DBZ_OHLCV_MSG,
101-
Schema.OHLCV_1H: DBZ_COMMON_HEADER + DBZ_OHLCV_MSG,
102-
Schema.OHLCV_1D: DBZ_COMMON_HEADER + DBZ_OHLCV_MSG,
103-
Schema.STATUS: DBZ_COMMON_HEADER
97+
Schema.TBBO: RECORD_HEADER + MBP_MSG + get_deriv_ba_types(0),
98+
Schema.TRADES: RECORD_HEADER + MBP_MSG,
99+
Schema.OHLCV_1S: RECORD_HEADER + OHLCV_MSG,
100+
Schema.OHLCV_1M: RECORD_HEADER + OHLCV_MSG,
101+
Schema.OHLCV_1H: RECORD_HEADER + OHLCV_MSG,
102+
Schema.OHLCV_1D: RECORD_HEADER + OHLCV_MSG,
103+
Schema.STATUS: RECORD_HEADER
104104
+ [
105105
("ts_recv", np.uint64),
106106
("group", "S1"), # 1 byte chararray
107107
("trading_status", np.uint8),
108108
("halt_reason", np.uint8),
109109
("trading_event", np.uint8),
110110
],
111-
Schema.DEFINITION: DBZ_COMMON_HEADER
111+
Schema.DEFINITION: RECORD_HEADER
112112
+ [
113113
("ts_recv", np.uint64),
114114
("min_price_increment", np.int64),
@@ -191,24 +191,27 @@ def get_deriv_ba_fields(level: int) -> List[str]:
191191
]
192192

193193

194-
DBZ_DERIV_HEADER_FIELDS = [
194+
DERIV_HEADER_FIELDS = [
195195
"ts_event",
196196
"ts_in_delta",
197197
"publisher_id",
198+
"channel_id",
198199
"product_id",
199200
"action",
200201
"side",
202+
"depth",
201203
"flags",
202204
"price",
203205
"size",
204206
"sequence",
205207
]
206208

207-
DBZ_COLUMNS = {
209+
COLUMNS = {
208210
Schema.MBO: [
209211
"ts_event",
210212
"ts_in_delta",
211213
"publisher_id",
214+
"channel_id",
212215
"product_id",
213216
"order_id",
214217
"action",
@@ -218,8 +221,8 @@ def get_deriv_ba_fields(level: int) -> List[str]:
218221
"size",
219222
"sequence",
220223
],
221-
Schema.MBP_1: DBZ_DERIV_HEADER_FIELDS + get_deriv_ba_fields(0),
222-
Schema.MBP_10: DBZ_DERIV_HEADER_FIELDS
224+
Schema.MBP_1: DERIV_HEADER_FIELDS + get_deriv_ba_fields(0),
225+
Schema.MBP_10: DERIV_HEADER_FIELDS
223226
+ get_deriv_ba_fields(0)
224227
+ get_deriv_ba_fields(1)
225228
+ get_deriv_ba_fields(2)
@@ -230,6 +233,6 @@ def get_deriv_ba_fields(level: int) -> List[str]:
230233
+ get_deriv_ba_fields(7)
231234
+ get_deriv_ba_fields(8)
232235
+ get_deriv_ba_fields(9),
233-
Schema.TBBO: DBZ_DERIV_HEADER_FIELDS + get_deriv_ba_fields(0),
234-
Schema.TRADES: DBZ_DERIV_HEADER_FIELDS,
236+
Schema.TBBO: DERIV_HEADER_FIELDS + get_deriv_ba_fields(0),
237+
Schema.TRADES: DERIV_HEADER_FIELDS,
235238
}

notebooks/quickstart.ipynb

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -749,7 +749,7 @@
749749
{
750750
"data": {
751751
"text/plain": [
752-
"dtype([('nwords', 'u1'), ('type', 'u1'), ('publisher_id', '<u2'), ('product_id', '<u4'), ('ts_event', '<u8'), ('order_id', '<u8'), ('price', '<i8'), ('size', '<u4'), ('flags', 'i1'), ('channel_id', 'u1'), ('action', 'S1'), ('side', 'S1'), ('ts_recv', '<u8'), ('ts_in_delta', '<i4'), ('sequence', '<u4')])"
752+
"dtype([('length', 'u1'), ('rtype', 'u1'), ('publisher_id', '<u2'), ('product_id', '<u4'), ('ts_event', '<u8'), ('order_id', '<u8'), ('price', '<i8'), ('size', '<u4'), ('flags', 'i1'), ('channel_id', 'u1'), ('action', 'S1'), ('side', 'S1'), ('ts_recv', '<u8'), ('ts_in_delta', '<i4'), ('sequence', '<u4')])"
753753
]
754754
},
755755
"execution_count": 32,
@@ -1424,7 +1424,7 @@
14241424
" (14, 32, 1, 5482, 1609099225061045683, 647570749727, 321025000000000, 1, 0, 0, b'B', b'A', 1609099225250461359, 92701, 1098),\n",
14251425
" (14, 32, 1, 5482, 1609099225061045683, 647570749776, 320925000000000, 1, 0, 0, b'B', b'A', 1609099225250461359, 92701, 1098),\n",
14261426
" (14, 32, 1, 5482, 1609099225061045683, 647570749868, 320825000000000, 1, 0, 0, b'B', b'A', 1609099225250461359, 92701, 1098)],\n",
1427-
" dtype=[('nwords', 'u1'), ('type', 'u1'), ('publisher_id', '<u2'), ('product_id', '<u4'), ('ts_event', '<u8'), ('order_id', '<u8'), ('price', '<i8'), ('size', '<u4'), ('flags', 'i1'), ('channel_id', 'u1'), ('action', 'S1'), ('side', 'S1'), ('ts_recv', '<u8'), ('ts_in_delta', '<i4'), ('sequence', '<u4')])"
1427+
" dtype=[('length', 'u1'), ('rtype', 'u1'), ('publisher_id', '<u2'), ('product_id', '<u4'), ('ts_event', '<u8'), ('order_id', '<u8'), ('price', '<i8'), ('size', '<u4'), ('flags', 'i1'), ('channel_id', 'u1'), ('action', 'S1'), ('side', 'S1'), ('ts_recv', '<u8'), ('ts_in_delta', '<i4'), ('sequence', '<u4')])"
14281428
]
14291429
},
14301430
"execution_count": 31,

tests/test_historical_bento.py

Lines changed: 72 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -98,8 +98,8 @@ def test_bento_given_initial_nbytes_returns_expected_metadata(self) -> None:
9898
# Assert
9999
assert data.dtype == np.dtype(
100100
[
101-
("nwords", "u1"),
102-
("type", "u1"),
101+
("length", "u1"),
102+
("rtype", "u1"),
103103
("publisher_id", "<u2"),
104104
("product_id", "<u4"),
105105
("ts_event", "<u8"),
@@ -260,7 +260,7 @@ def test_to_df_with_mbo_data_returns_expected_record(self) -> None:
260260
assert df.iloc[0].action == "C"
261261
assert df.iloc[0].side == "A"
262262
assert df.iloc[0].price == 3722750000000
263-
assert df.iloc[0].size == 11
263+
assert df.iloc[0].size == 12
264264
assert df.iloc[0].sequence == 1170352
265265

266266
def test_to_df_with_stub_ohlcv_data_returns_expected_record(self) -> None:
@@ -380,7 +380,7 @@ def test_from_file_given_various_paths_returns_expected_metadata(
380380
assert data.schema == expected_schema
381381
assert data.compression == expected_compression
382382

383-
def test_to_csv_writes_expected_file_to_disk(self) -> None:
383+
def test_mbo_to_csv_writes_expected_file_to_disk(self) -> None:
384384
# Arrange
385385
test_data_path = get_test_data_path(schema=Schema.MBO)
386386
data = FileBento(path=test_data_path)
@@ -394,10 +394,11 @@ def test_to_csv_writes_expected_file_to_disk(self) -> None:
394394
written = open(path, mode="rb").read()
395395
assert os.path.isfile(path)
396396
expected = (
397-
b"ts_recv,ts_event,ts_in_delta,publisher_id,product_id,order_id,action,side,flags,pr" # noqa
398-
b"ice,size,sequence\n1609160400000704060,1609160400000429831,22993,1,5482,6" # noqa
399-
b"47784973705,C,A,128,3722750000000,1,1170352\n1609160400000711344,160916" # noqa
400-
b"0400000431665,19621,1,5482,647784973631,C,A,128,3723000000000,1,1170353\n" # noqa
397+
b"ts_recv,ts_event,ts_in_delta,publisher_id,channel_id,product_id,order_id,act" # noqa
398+
b"ion,side,flags,price,size,sequence\n1609160400000704060,16091604000004298" # noqa
399+
b"31,22993,1,0,5482,647784973705,C,A,128,3722750000000,1,1170352\n160916040" # noqa
400+
b"0000711344,1609160400000431665,19621,1,0,5482,647784973631,C,A,128,372300000" # noqa
401+
b"0000,1,1170353\n"
401402
)
402403
if sys.platform == "win32":
403404
expected = expected.replace(b"\n", b"\r\n")
@@ -406,7 +407,35 @@ def test_to_csv_writes_expected_file_to_disk(self) -> None:
406407
# Cleanup
407408
os.remove(path)
408409

409-
def test_to_json_writes_expected_file_to_disk(self) -> None:
410+
def test_mbp_1_to_csv_writes_expected_file_to_disk(self) -> None:
411+
# Arrange
412+
test_data_path = get_test_data_path(schema=Schema.MBP_1)
413+
data = FileBento(path=test_data_path)
414+
415+
path = "test.my_mbo.csv"
416+
417+
# Act
418+
data.to_csv(path)
419+
420+
# Assert
421+
written = open(path, mode="rb").read()
422+
assert os.path.isfile(path)
423+
expected = (
424+
b"ts_recv,ts_event,ts_in_delta,publisher_id,channel_id,product_id,action,side," # noqa
425+
b"depth,flags,price,size,sequence,bid_px_00,ask_px_00,bid_sz_00,ask_sz_00,bid_" # noqa
426+
b"oq_00,ask_oq_00\n1609160400006136329,1609160400006001487,17214,1,,5482,A," # noqa
427+
b"A,0,128,3720500000000,1,1170362,3720250000000,3720500000000,24,11,15,9\n1" # noqa
428+
b"609160400006246513,1609160400006146661,18858,1,,5482,A,A,0,128,3720500000000" # noqa
429+
b",1,1170364,3720250000000,3720500000000,24,12,15,10\n"
430+
)
431+
if sys.platform == "win32":
432+
expected = expected.replace(b"\n", b"\r\n")
433+
assert written == expected
434+
435+
# Cleanup
436+
os.remove(path)
437+
438+
def test_mbo_to_json_writes_expected_file_to_disk(self) -> None:
410439
# Arrange
411440
test_data_path = get_test_data_path(schema=Schema.MBO)
412441
data = FileBento(path=test_data_path)
@@ -420,12 +449,40 @@ def test_to_json_writes_expected_file_to_disk(self) -> None:
420449
written = open(path, mode="rb").read()
421450
assert os.path.isfile(path)
422451
assert written == (
423-
b'{"ts_event":1609160400000429831,"ts_in_delta":22993,"publisher_id":1,"product_id":' # noqa
424-
b'5482,"order_id":647784973705,"action":"C","side":"A","flags":128,"price":372' # noqa
425-
b'2750000000,"size":1,"sequence":1170352}\n{"ts_event":160916040000043166' # noqa
426-
b'5,"ts_in_delta":19621,"publisher_id":1,"product_id":5482,"order_id":647784973631,"' # noqa
427-
b'action":"C","side":"A","flags":128,"price":3723000000000,"size":1,"sequenc' # noqa
428-
b'e":1170353}\n'
452+
b'{"ts_event":1609160400000429831,"ts_in_delta":22993,"publisher_id":1,"channe' # noqa
453+
b'l_id":0,"product_id":5482,"order_id":647784973705,"action":"C","side":"A","f' # noqa
454+
b'lags":128,"price":3722750000000,"size":1,"sequence":1170352}\n{"ts_event"' # noqa
455+
b':1609160400000431665,"ts_in_delta":19621,"publisher_id":1,"channel_id":0,"pr' # noqa
456+
b'oduct_id":5482,"order_id":647784973631,"action":"C","side":"A","flags":128,"' # noqa
457+
b'price":3723000000000,"size":1,"sequence":1170353}\n'
458+
)
459+
460+
# Cleanup
461+
os.remove(path)
462+
463+
def test_mbp_1_to_json_writes_expected_file_to_disk(self) -> None:
464+
# Arrange
465+
test_data_path = get_test_data_path(schema=Schema.MBP_1)
466+
data = FileBento(path=test_data_path)
467+
468+
path = "test.my_mbo.json"
469+
470+
# Act
471+
data.to_json(path)
472+
473+
# Assert
474+
written = open(path, mode="rb").read()
475+
assert os.path.isfile(path)
476+
assert written == (
477+
b'{"ts_event":1609160400006001487,"ts_in_delta":17214,"publisher_id":1,"channe' # noqa
478+
b'l_id":null,"product_id":5482,"action":"A","side":"A","depth":0,"flags":128,"' # noqa
479+
b'price":3720500000000,"size":1,"sequence":1170362,"bid_px_00":3720250000000,"' # noqa
480+
b'ask_px_00":3720500000000,"bid_sz_00":24,"ask_sz_00":11,"bid_oq_00":15,"ask_o' # noqa
481+
b'q_00":9}\n{"ts_event":1609160400006146661,"ts_in_delta":18858,"publisher_' # noqa
482+
b'id":1,"channel_id":null,"product_id":5482,"action":"A","side":"A","depth":0,' # noqa
483+
b'"flags":128,"price":3720500000000,"size":1,"sequence":1170364,"bid_px_00":37' # noqa
484+
b'20250000000,"ask_px_00":3720500000000,"bid_sz_00":24,"ask_sz_00":12,"bid_oq_' # noqa
485+
b'00":15,"ask_oq_00":10}\n'
429486
)
430487

431488
# Cleanup

0 commit comments

Comments
 (0)