Skip to content

Commit 8ec2ec2

Browse files
committed
ADD: Add Live client symbology mapping
1 parent 36f5e76 commit 8ec2ec2

File tree

7 files changed

+65
-16
lines changed

7 files changed

+65
-16
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
# Changelog
22

3+
## 0.15.0 - TBD
4+
5+
### Enhancements
6+
- Added `symbology_map` property to `Live` client
7+
38
## 0.14.1 - 2023-06-16
49

510
- Fixed issue where `DBNStore.to_df()` would raise an exception if no records were present

databento/live/client.py

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,8 @@ def __init__(
8888

8989
self._dbn_queue: DBNQueue = DBNQueue(maxsize=DEFAULT_QUEUE_SIZE)
9090
self._metadata: SessionMetadata = SessionMetadata()
91-
self._user_callbacks: list[UserCallback] = []
91+
self._symbology_map: dict[int, str | int] = {}
92+
self._user_callbacks: list[UserCallback] = [self._map_symbol]
9293
self._user_streams: list[IO[bytes]] = []
9394

9495
def factory() -> _SessionProtocol:
@@ -237,6 +238,23 @@ def port(self) -> int:
237238
"""
238239
return self._port
239240

241+
@property
242+
def symbology_map(self) -> dict[int, str | int]:
243+
"""
244+
Return the symbology map for this client session. A symbol mapping is
245+
added when the client receives a SymbolMappingMsg.
246+
247+
This can be used to transform an `instrument_id` in a DBN record
248+
to the input symbology.
249+
250+
Returns
251+
-------
252+
dict[int, str | int]
253+
A mapping of the exchange's instrument_id to the subscription symbology.
254+
255+
"""
256+
return self._symbology_map
257+
240258
@property
241259
def ts_out(self) -> bool:
242260
"""
@@ -548,3 +566,11 @@ async def _shutdown(self) -> None:
548566
if self._session is None:
549567
return
550568
await self._session.wait_for_close()
569+
self._symbology_map.clear()
570+
571+
def _map_symbol(self, record: DBNRecord) -> None:
572+
if isinstance(record, databento_dbn.SymbolMappingMsg):
573+
out_symbol = record.stype_out_symbol
574+
instrument_id = record.instrument_id
575+
self._symbology_map[instrument_id] = record.stype_out_symbol
576+
logger.info("added symbology mapping %s to %d", out_symbol, instrument_id)

databento/live/protocol.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -312,8 +312,22 @@ def _process_dbn(self, data: bytes) -> None:
312312
logger.debug("dispatching %s", type(record).__name__)
313313
if isinstance(record, databento_dbn.Metadata):
314314
self.received_metadata(record)
315-
else:
316-
self.received_record(record)
315+
continue
316+
317+
if isinstance(record, databento_dbn.ErrorMsg):
318+
logger.error(
319+
"gateway error: %s",
320+
record.err,
321+
)
322+
if isinstance(record, databento_dbn.SystemMsg):
323+
if record.is_heartbeat:
324+
logger.debug("gateway heartbeat")
325+
else:
326+
logger.info(
327+
"gateway message: %s",
328+
record.msg,
329+
)
330+
self.received_record(record)
317331

318332
def _process_gateway(self, data: bytes) -> None:
319333
try:

databento/live/session.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -470,15 +470,15 @@ async def _connect_task(
470470
),
471471
timeout=CONNECT_TIMEOUT_SECONDS,
472472
)
473-
except asyncio.TimeoutError:
473+
except asyncio.TimeoutError as exc:
474474
raise BentoError(
475475
f"Connection to {gateway}:{port} timed out after "
476476
f"{CONNECT_TIMEOUT_SECONDS} second(s).",
477-
)
478-
except OSError:
477+
) from exc
478+
except OSError as exc:
479479
raise BentoError(
480480
f"Connection to {gateway}:{port} failed.",
481-
)
481+
) from exc
482482

483483
logger.debug(
484484
"connected to %s:%d",
@@ -491,13 +491,13 @@ async def _connect_task(
491491
protocol.authenticated,
492492
timeout=AUTH_TIMEOUT_SECONDS,
493493
)
494-
except asyncio.TimeoutError:
494+
except asyncio.TimeoutError as exc:
495495
raise BentoError(
496496
f"Authentication with {gateway}:{port} timed out after "
497497
f"{AUTH_TIMEOUT_SECONDS} second(s).",
498-
)
498+
) from exc
499499
except ValueError as exc:
500-
raise BentoError(f"User authentication failed: {str(exc)}")
500+
raise BentoError(f"User authentication failed: {str(exc)}") from exc
501501

502502
logger.info(
503503
"authentication with remote gateway completed",

databento/version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = "0.14.1"
1+
__version__ = "0.15.0"

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "databento"
3-
version = "0.14.1"
3+
version = "0.15.0"
44
description = "Official Python client library for Databento"
55
authors = [
66
"Databento <[email protected]>",

tests/test_live_client.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -496,7 +496,7 @@ def callback(_: object) -> None:
496496
pass
497497

498498
live_client.add_callback(callback)
499-
assert live_client._user_callbacks == [callback]
499+
assert callback in live_client._user_callbacks
500500
assert live_client._user_streams == []
501501

502502

@@ -509,7 +509,6 @@ def test_live_add_stream(
509509
stream = BytesIO()
510510

511511
live_client.add_stream(stream)
512-
assert live_client._user_callbacks == []
513512
assert live_client._user_streams == [stream]
514513

515514

@@ -581,7 +580,9 @@ async def test_live_async_iteration_backpressure(
581580
symbols="TEST",
582581
)
583582

584-
monkeypatch.setattr(live_client._session._transport, "pause_reading", pause_mock:=MagicMock())
583+
monkeypatch.setattr(
584+
live_client._session._transport, "pause_reading", pause_mock := MagicMock(),
585+
)
585586

586587
live_client.start()
587588
it = live_client.__iter__()
@@ -618,7 +619,9 @@ async def test_live_async_iteration_dropped(
618619
symbols="TEST",
619620
)
620621

621-
monkeypatch.setattr(live_client._session._transport, "pause_reading", pause_mock:=MagicMock())
622+
monkeypatch.setattr(
623+
live_client._session._transport, "pause_reading", pause_mock := MagicMock(),
624+
)
622625

623626
live_client.start()
624627
it = live_client.__iter__()
@@ -630,6 +633,7 @@ async def test_live_async_iteration_dropped(
630633
assert len(records) == 1
631634
assert live_client._dbn_queue.empty()
632635

636+
633637
@pytest.mark.skipif(platform.system() == "Windows", reason="flaky on windows runner")
634638
async def test_live_async_iteration_stop(
635639
live_client: client.Live,

0 commit comments

Comments
 (0)