Skip to content

Commit 84186ad

Browse files
committed
MOD: Decode Live client DBN streams before write
1 parent dc652df commit 84186ad

File tree

66 files changed

+65
-38
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

66 files changed

+65
-38
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions

databento/live/session.py

Lines changed: 56 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,14 @@
44
import dataclasses
55
import logging
66
import queue
7+
import struct
78
import threading
89
from collections.abc import Iterable
910
from typing import IO
1011
from typing import Callable
1112
from typing import Final
1213

1314
import databento_dbn
14-
from databento_dbn import Metadata
1515
from databento_dbn import Schema
1616
from databento_dbn import SType
1717

@@ -196,33 +196,37 @@ def __init__(
196196
self._user_callbacks = user_callbacks
197197
self._user_streams = user_streams
198198

199-
def _process_dbn(self, data: bytes) -> None:
200-
start_index = 0
201-
if data.startswith(b"DBN") and self._metadata:
202-
# We have already received metata for the stream
203-
# Set start index to metadata length
204-
start_index = int.from_bytes(data[4:8], byteorder="little") + 8
205-
self._metadata.check(Metadata.decode(bytes(data[:start_index])))
206-
for stream, exc_callback in self._user_streams.items():
207-
try:
208-
stream.write(data[start_index:])
209-
except Exception as exc:
210-
stream_name = getattr(stream, "name", str(stream))
211-
logger.error(
212-
"error writing %d bytes to `%s` stream",
213-
len(data[start_index:]),
214-
stream_name,
215-
exc_info=exc,
216-
)
217-
if exc_callback is not None:
218-
exc_callback(exc)
219-
return super()._process_dbn(data)
220-
221199
def received_metadata(self, metadata: databento_dbn.Metadata) -> None:
222-
self._metadata.data = metadata
200+
if self._metadata:
201+
self._metadata.check(metadata)
202+
else:
203+
metadata_bytes = metadata.encode()
204+
for stream, exc_callback in self._user_streams.items():
205+
try:
206+
stream.write(metadata_bytes)
207+
except Exception as exc:
208+
stream_name = getattr(stream, "name", str(stream))
209+
logger.error(
210+
"error writing %d bytes to `%s` stream",
211+
len(metadata_bytes),
212+
stream_name,
213+
exc_info=exc,
214+
)
215+
if exc_callback is not None:
216+
exc_callback(exc)
217+
218+
self._metadata.data = metadata
223219
return super().received_metadata(metadata)
224220

225221
def received_record(self, record: DBNRecord) -> None:
222+
self._dispatch_writes(record)
223+
self._dispatch_callbacks(record)
224+
if self._dbn_queue.is_enabled():
225+
self._queue_for_iteration(record)
226+
227+
return super().received_record(record)
228+
229+
def _dispatch_callbacks(self, record: DBNRecord) -> None:
226230
for callback, exc_callback in self._user_callbacks.items():
227231
try:
228232
callback(record)
@@ -236,18 +240,37 @@ def received_record(self, record: DBNRecord) -> None:
236240
if exc_callback is not None:
237241
exc_callback(exc)
238242

239-
if self._dbn_queue.is_enabled():
240-
self._dbn_queue.put(record)
243+
def _dispatch_writes(self, record: DBNRecord) -> None:
244+
if hasattr(record, "ts_out"):
245+
ts_out_bytes = struct.pack("Q", record.ts_out)
246+
else:
247+
ts_out_bytes = b""
241248

242-
# DBNQueue has no max size; so check if it's above capacity, and if so, pause reading
243-
if self._dbn_queue.is_full():
244-
logger.warning(
245-
"record queue is full; %d record(s) to be processed",
246-
self._dbn_queue.qsize(),
249+
record_bytes = bytes(record) + ts_out_bytes
250+
251+
for stream, exc_callback in self._user_streams.items():
252+
try:
253+
stream.write(record_bytes)
254+
except Exception as exc:
255+
stream_name = getattr(stream, "name", str(stream))
256+
logger.error(
257+
"error writing %d bytes to `%s` stream",
258+
len(record_bytes),
259+
stream_name,
260+
exc_info=exc,
247261
)
248-
self.transport.pause_reading()
262+
if exc_callback is not None:
263+
exc_callback(exc)
249264

250-
return super().received_record(record)
265+
def _queue_for_iteration(self, record: DBNRecord) -> None:
266+
self._dbn_queue.put(record)
267+
# DBNQueue has no max size; so check if it's above capacity, and if so, pause reading
268+
if self._dbn_queue.is_full():
269+
logger.warning(
270+
"record queue is full; %d record(s) to be processed",
271+
self._dbn_queue.qsize(),
272+
)
273+
self.transport.pause_reading()
251274

252275

253276
class Session:
-22 Bytes
Binary file not shown.
-9 Bytes
Binary file not shown.
-18 Bytes
Binary file not shown.
-20 Bytes
Binary file not shown.
-16 Bytes
Binary file not shown.
-14 Bytes
Binary file not shown.
-14 Bytes
Binary file not shown.
-3 Bytes
Binary file not shown.

0 commit comments

Comments
 (0)