Skip to content

Commit 9087235

Browse files
committed
MOD: Complete DBZ end-to-end integration
1 parent 2f494ce commit 9087235

18 files changed

+150
-254
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ The library is fully compatible with the latest distribution of Anaconda 3.7 and
3131
The minimum dependencies as found in the `requirements.txt` are also listed below:
3232
- Python (>=3.7)
3333
- aiohttp (>=3.7.2)
34+
- dbz-lib (>=0.1.1)
3435
- numpy (>=1.17.0)
3536
- pandas (>=1.1.3)
3637
- requests (>=2.24.0)

databento/common/bento.py

Lines changed: 78 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,7 @@
55
import numpy as np
66
import pandas as pd
77
import zstandard
8-
from databento.common.data import (
9-
CSV_HEADERS,
10-
DBZ_COLUMNS,
11-
DBZ_STRUCT_MAP,
12-
DERIV_SCHEMAS,
13-
)
8+
from databento.common.data import DBZ_COLUMNS, DBZ_STRUCT_MAP, DERIV_SCHEMAS
149
from databento.common.enums import Compression, Encoding, Schema, SType
1510
from databento.common.logging import log_debug
1611
from databento.common.metadata import MetadataDecoder
@@ -65,19 +60,18 @@ def source_metadata(self) -> Dict[str, Any]:
6560
"""
6661
log_debug("Decoding metadata...")
6762
metadata_initial: bytes = self.reader().read(8)
68-
69-
if not metadata_initial.startswith(b"Q*M\x18"):
70-
return {}
71-
7263
magic_bin = metadata_initial[:4]
7364
frame_size_bin = metadata_initial[4:]
7465

66+
if not metadata_initial.startswith(b"P*M\x18"):
67+
return {}
68+
7569
metadata_magic = int.from_bytes(bytes=magic_bin, byteorder="little")
7670
metadata_frame_size = int.from_bytes(bytes=frame_size_bin, byteorder="little")
7771
log_debug(f"magic={metadata_magic}, frame_size={metadata_frame_size}")
7872

7973
metadata_raw = self.reader().read(8 + metadata_frame_size)
80-
return MetadataDecoder.decode_to_json(metadata_raw[8:])
74+
return MetadataDecoder.decode_to_json(metadata_raw)
8175

8276
def set_metadata(self, metadata: Dict[str, Any]) -> None:
8377
"""
@@ -322,22 +316,6 @@ def limit(self) -> Optional[int]:
322316

323317
return self._limit
324318

325-
@property
326-
def encoding(self) -> Encoding:
327-
"""
328-
Return the data encoding.
329-
330-
Returns
331-
-------
332-
Encoding
333-
334-
"""
335-
if self._encoding is None:
336-
self._check_metadata()
337-
self._encoding = Encoding(self._metadata["encoding"])
338-
339-
return self._encoding
340-
341319
@property
342320
def compression(self) -> Compression:
343321
"""
@@ -367,13 +345,9 @@ def shape(self) -> Tuple:
367345
"""
368346
if self._shape is None:
369347
self._check_metadata()
370-
if self.encoding == Encoding.DBZ:
371-
ncols = len(DBZ_STRUCT_MAP[self.schema])
372-
else:
373-
ncols = len(CSV_HEADERS[self.schema])
374348
self._shape = (
375349
self._metadata["record_count"],
376-
ncols,
350+
len(DBZ_STRUCT_MAP[self.schema]),
377351
)
378352

379353
return self._shape
@@ -395,10 +369,7 @@ def mappings(self) -> List[Dict[str, List[Dict[str, str]]]]:
395369
@property
396370
def symbology(self) -> Dict[str, Any]:
397371
"""
398-
Return the symbology resolution response information for the query.
399-
400-
This JSON representable object should exactly match a `symbology.resolve`
401-
request using the same query parameters.
372+
Return the symbology resolution information for the query.
402373
403374
Returns
404375
-------
@@ -407,10 +378,12 @@ def symbology(self) -> Dict[str, Any]:
407378
"""
408379
self._check_metadata()
409380

410-
status = self._metadata["status"]
411-
if status == 1:
381+
status = 0
382+
if self._metadata["partial"]:
383+
status = 1
412384
message = "Partially resolved"
413-
elif status == 2:
385+
elif self._metadata["not_found"]:
386+
status = 2
414387
message = "Not found"
415388
else:
416389
message = "OK"
@@ -603,6 +576,72 @@ def to_json(self, path: str) -> None:
603576
"""
604577
self.to_df().to_json(path, orient="records", lines=True)
605578

579+
def request_symbology(self, client) -> Dict[str, Dict[str, Any]]:
580+
"""
581+
Request symbology resolution based on the metadata properties.
582+
583+
Makes a `GET /symbology.resolve` HTTP request.
584+
585+
Current symbology mappings from the metadata are also available by
586+
calling the `.symbology` or `.mappings` properties.
587+
588+
Parameters
589+
----------
590+
client : Historical
591+
The historical client to use for the request.
592+
593+
Returns
594+
-------
595+
Dict[str, Dict[str, Any]]
596+
A map of input symbol to output symbol across the date range.
597+
598+
"""
599+
return client.symbology.resolve(
600+
dataset=self.dataset,
601+
symbols=self.symbols,
602+
stype_in=self.stype_in,
603+
stype_out=self.stype_out,
604+
start_date=self.start.date(),
605+
end_date=self.end.date(),
606+
)
607+
608+
def request_full_definitions(
609+
self,
610+
client,
611+
path: Optional[str] = None,
612+
) -> "Bento":
613+
"""
614+
Request full instrument definitions based on the metadata properties.
615+
616+
Makes a `GET /timeseries.stream` HTTP request.
617+
618+
Parameters
619+
----------
620+
client : Historical
621+
The historical client to use for the request.
622+
path : str, optional
623+
The file path to write to on disk (if provided).
624+
625+
Returns
626+
-------
627+
Bento
628+
629+
Warnings
630+
--------
631+
Calling this method will incur a cost.
632+
633+
"""
634+
return client.timeseries.stream(
635+
dataset=self.dataset,
636+
symbols=self.symbols,
637+
schema=Schema.DEFINITION,
638+
start=self.start,
639+
end=self.end,
640+
stype_in=self.stype_in,
641+
stype_out=self.stype_out,
642+
path=path,
643+
)
644+
606645

607646
class MemoryBento(Bento):
608647
"""

databento/common/data.py

Lines changed: 0 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -233,45 +233,3 @@ def get_deriv_ba_fields(level: int) -> List[str]:
233233
Schema.TBBO: DBZ_DERIV_HEADER_FIELDS + get_deriv_ba_fields(0),
234234
Schema.TRADES: DBZ_DERIV_HEADER_FIELDS,
235235
}
236-
237-
238-
################################################################################
239-
# CSV headers
240-
################################################################################
241-
242-
CSV_DERIV_HEADER = b"ts_recv,ts_event,ts_in_delta,publisher_id,product_id,action,side,flags,price,size,sequence" # noqa
243-
CSV_OHLCV_HEADER = b"ts_event,publisher_id,product_id,open,high,low,close,volume"
244-
245-
246-
CSV_HEADERS = {
247-
Schema.MBO: b"ts_recv,ts_event,ts_in_delta,publisher_id,product_id,order_id,action,side,flags,price,size,sequence", # noqa
248-
Schema.MBP_1: CSV_DERIV_HEADER + b"," + ",".join(get_deriv_ba_fields(0)).encode(),
249-
Schema.MBP_10: CSV_DERIV_HEADER
250-
+ b","
251-
+ ",".join(get_deriv_ba_fields(0)).encode()
252-
+ b","
253-
+ ",".join(get_deriv_ba_fields(1)).encode()
254-
+ b","
255-
+ ",".join(get_deriv_ba_fields(2)).encode()
256-
+ b","
257-
+ ",".join(get_deriv_ba_fields(3)).encode()
258-
+ b","
259-
+ ",".join(get_deriv_ba_fields(4)).encode()
260-
+ b","
261-
+ ",".join(get_deriv_ba_fields(5)).encode()
262-
+ b","
263-
+ ",".join(get_deriv_ba_fields(6)).encode()
264-
+ b","
265-
+ ",".join(get_deriv_ba_fields(7)).encode()
266-
+ b","
267-
+ ",".join(get_deriv_ba_fields(8)).encode()
268-
+ b","
269-
+ ",".join(get_deriv_ba_fields(9)).encode(),
270-
Schema.TBBO: CSV_DERIV_HEADER + b"," + ",".join(get_deriv_ba_fields(0)).encode(),
271-
Schema.TRADES: CSV_DERIV_HEADER,
272-
Schema.OHLCV_1S: CSV_OHLCV_HEADER,
273-
Schema.OHLCV_1M: CSV_OHLCV_HEADER,
274-
Schema.OHLCV_1H: CSV_OHLCV_HEADER,
275-
Schema.OHLCV_1D: CSV_OHLCV_HEADER,
276-
# TODO(cs) Complete headers
277-
}

databento/common/metadata.py

Lines changed: 21 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -1,91 +1,42 @@
1-
import json
2-
import struct
3-
from typing import Any, Dict, Optional
1+
from typing import Any
42

5-
import zstandard
6-
from databento.common.enums import Compression, Encoding, Schema, SType
7-
from databento.common.parsing import (
8-
int_to_compression,
9-
int_to_encoding,
10-
int_to_schema,
11-
int_to_stype,
12-
)
3+
from databento.common.parsing import int_to_compression, int_to_schema, int_to_stype
4+
from dbz_lib import decode_metadata
135

146

157
class MetadataDecoder:
168
"""
17-
Provides a decoder for Databento metadata headers.
18-
19-
References
20-
----------
21-
https://github.com/facebook/zstd/wiki
22-
https://github.com/facebook/zstd/blob/dev/doc/zstd_compression_format.md#skippable-frames
9+
Provides a decoder for DBZ metadata headers.
2310
"""
2411

25-
# 4 Bytes, little-endian ordering. Value : 0x184D2A5?, which means any value
26-
# from 0x184D2A50 to 0x184D2A5F. All 16 values are valid to identify a
27-
# skippable frame. This specification doesn't detail any specific tagging
28-
# for skippable frames.
29-
ZSTD_FIRST_MAGIC = 0x184D2A50 # 407710288
30-
METADATA_STRUCT_FMT = "<B16sBBBQQQBBQH40x"
31-
METADATA_STRUCT_SIZE = struct.calcsize(METADATA_STRUCT_FMT)
32-
3312
@staticmethod
34-
def decode_to_json(metadata: bytes) -> Dict[str, Any]:
13+
def decode_to_json(raw_metadata: bytes) -> dict[str, Any]:
3514
"""
3615
Decode the given metadata into a JSON object (as a Python dict).
3716
3817
Parameters
3918
----------
40-
metadata : bytes
19+
raw_metadata : bytes
4120
The metadata to decode.
4221
4322
Returns
4423
-------
45-
Dict[str, Any]
24+
dict[str, Any]
4625
4726
"""
48-
fixed_fmt: str = MetadataDecoder.METADATA_STRUCT_FMT
49-
fixed_buffer: bytes = metadata[: MetadataDecoder.METADATA_STRUCT_SIZE]
50-
fixed_values = struct.unpack(fixed_fmt, fixed_buffer)
51-
52-
# Decode fixed values
53-
version: int = fixed_values[0]
54-
dataset: str = fixed_values[1].replace(b"\x00", b"").decode("ascii")
55-
schema: Schema = int_to_schema(fixed_values[2])
56-
stype_in: SType = int_to_stype(fixed_values[3])
57-
stype_out: SType = int_to_stype(fixed_values[4])
58-
start: int = fixed_values[5] # UNIX nanoseconds
59-
end: int = fixed_values[6] # UNIX nanoseconds
60-
61-
limit_int: int = fixed_values[7]
62-
limit: Optional[int] = None if limit_int == 0 else limit_int
63-
64-
encoding: Encoding = int_to_encoding(fixed_values[8])
65-
compression: Compression = int_to_compression(fixed_values[9])
6627

67-
nrows: int = fixed_values[10]
68-
ncols: int = fixed_values[11]
69-
70-
var_buffer: bytes = metadata[MetadataDecoder.METADATA_STRUCT_SIZE :]
71-
var_decompressed: bytes = zstandard.decompress(var_buffer)
72-
var_json: Dict[str, Any] = json.loads(var_decompressed)
73-
74-
json_obj = {
75-
"version": version,
76-
"dataset": dataset,
77-
"schema": schema.value,
78-
"stype_in": stype_in.value,
79-
"stype_out": stype_out.value,
80-
"start": start,
81-
"end": end,
82-
"limit": limit,
83-
"encoding": encoding.value,
84-
"compression": compression.value,
85-
"nrows": nrows,
86-
"ncols": ncols,
28+
def enum_value(fn):
29+
return lambda x: fn(x).value
30+
31+
metadata = decode_metadata(raw_metadata)
32+
conversion_mapping = {
33+
"compression": enum_value(int_to_compression),
34+
"limit": lambda lim: None if lim == 0 else lim,
35+
"mappings": lambda m: {i["native"]: i["intervals"] for i in m},
36+
"schema": enum_value(int_to_schema),
37+
"stype_in": enum_value(int_to_stype),
38+
"stype_out": enum_value(int_to_stype),
8739
}
88-
89-
json_obj.update(var_json)
90-
91-
return json_obj
40+
for key, conv_fn in conversion_mapping.items():
41+
metadata[key] = conv_fn(metadata[key])
42+
return metadata

databento/historical/api/timeseries.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
import pandas as pd
66
from databento.common.bento import Bento
7-
from databento.common.enums import Compression, Dataset, Encoding, Schema, SType
7+
from databento.common.enums import Dataset, Encoding, Schema, SType
88
from databento.common.validation import validate_enum
99
from databento.historical.api import API_VERSION
1010
from databento.historical.http import BentoHttpAPI
@@ -191,7 +191,6 @@ async def stream_async(
191191
)
192192

193193
params.append(("encoding", Encoding.DBZ.value)) # Always requests DBZ
194-
params.append(("compression", Compression.ZSTD.value)) # Always requests ZSTD
195194

196195
self._pre_check_data_size(
197196
symbols=symbols,

0 commit comments

Comments
 (0)