Skip to content

Commit 5dde5b8

Browse files
committed
ADD: Add configurable heartbeat support to clients
1 parent 5243ae8 commit 5dde5b8

File tree

6 files changed

+24
-4
lines changed

6 files changed

+24
-4
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
## 0.35.0 - TBD
44

55
#### Enhancements
6+
- Added optional `heartbeat_interval_s` parameter to `Live` client for configuring the
7+
interval at which the gateway will send heartbeat records
68
- Upgraded `databento-dbn` to 0.18.0
79

810
#### Breaking changes

databento/live/client.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@ class Live:
5050
ts_out: bool, default False
5151
If set, DBN records will be timestamped when they are sent by the
5252
gateway.
53+
heartbeat_interval_s: int, optional
54+
The interval in seconds at which the gateway will send heartbeat records if no
55+
other data records are sent.
5356
5457
"""
5558

@@ -66,6 +69,7 @@ def __init__(
6669
gateway: str | None = None,
6770
port: int = DEFAULT_REMOTE_PORT,
6871
ts_out: bool = False,
72+
heartbeat_interval_s: int | None = None,
6973
) -> None:
7074
if key is None:
7175
key = os.environ.get("DATABENTO_API_KEY")
@@ -83,6 +87,7 @@ def __init__(
8387

8488
self._dataset: Dataset | str = ""
8589
self._ts_out = ts_out
90+
self._heartbeat_interval_s = heartbeat_interval_s
8691

8792
self._dbn_queue: DBNQueue = DBNQueue()
8893
self._metadata: SessionMetadata = SessionMetadata()
@@ -102,6 +107,7 @@ def factory() -> _SessionProtocol:
102107
loop=self._loop,
103108
metadata=self._metadata,
104109
ts_out=self._ts_out,
110+
heartbeat_interval_s=self._heartbeat_interval_s,
105111
)
106112

107113
self._session: Session = Session(

databento/live/gateway.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ class AuthenticationRequest(GatewayControl):
108108
encoding: Encoding = Encoding.DBN
109109
details: str | None = None
110110
ts_out: str = "0"
111+
heartbeat_interval_s: int | None = None
111112
client: str = USER_AGENT
112113

113114

databento/live/protocol.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@ class DatabentoLiveProtocol(asyncio.BufferedProtocol):
5555
The dataset for authentication.
5656
ts_out : bool, default False
5757
Flag for requesting `ts_out` to be appending to all records in the session.
58+
heartbeat_interval_s: int, optional
59+
The interval in seconds at which the gateway will send heartbeat records if no
60+
other data records are sent.
5861
5962
See Also
6063
--------
@@ -67,13 +70,15 @@ def __init__(
6770
api_key: str,
6871
dataset: Dataset | str,
6972
ts_out: bool = False,
73+
heartbeat_interval_s: int | None = None,
7074
) -> None:
7175
self.__api_key = api_key
7276
self.__transport: asyncio.Transport | None = None
7377
self.__buffer: bytearray = bytearray(RECV_BUFFER_SIZE)
7478

7579
self._dataset = validate_semantic_string(dataset, "dataset")
7680
self._ts_out = ts_out
81+
self._heartbeat_interval_s = heartbeat_interval_s
7782

7883
self._dbn_decoder = databento_dbn.DBNDecoder(
7984
upgrade_policy=VersionUpgradePolicy.UPGRADE,
@@ -392,6 +397,7 @@ def _(self, message: ChallengeRequest) -> None:
392397
auth=response,
393398
dataset=self._dataset,
394399
ts_out=str(int(self._ts_out)),
400+
heartbeat_interval_s=self._heartbeat_interval_s,
395401
)
396402
logger.debug("sending CRAM challenge response: %s", str(auth_request).strip())
397403
self.transport.write(bytes(auth_request))

databento/live/session.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,7 @@ def __init__(
186186
loop: asyncio.AbstractEventLoop,
187187
metadata: SessionMetadata,
188188
ts_out: bool = False,
189+
heartbeat_interval_s: int | None = None,
189190
):
190191
super().__init__(api_key, dataset, ts_out)
191192

tests/test_live_gateway_messages.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,16 +30,17 @@
3030
[
3131
pytest.param(
3232
"auth=abcd1234|dataset=GLBX.MDP3|encoding=json\n",
33-
("abcd1234", "GLBX.MDP3", "json", None, "0"),
33+
("abcd1234", "GLBX.MDP3", "json", None, "0", None),
3434
),
3535
pytest.param(
36-
"auth=abcd1234|dataset=GLBX.MDP3|ts_out=1\n",
36+
"auth=abcd1234|dataset=GLBX.MDP3|heartbeat_interval_s=10|ts_out=1\n",
3737
(
3838
"abcd1234",
3939
"GLBX.MDP3",
4040
str(Encoding.DBN),
4141
None,
4242
"1",
43+
"10",
4344
),
4445
),
4546
pytest.param(
@@ -50,10 +51,11 @@
5051
str(Encoding.DBN),
5152
None,
5253
"0",
54+
None,
5355
),
5456
),
5557
pytest.param(
56-
"auth=abcd1234|dataset=GLBX.MDP3|ts_out=1|encoding=csv|extra=key\n",
58+
"auth=abcd1234|dataset=GLBX.MDP3|ts_out=1|encoding=csv|heartbeat_interval_s=5|extra=key\n",
5759
ValueError,
5860
),
5961
],
@@ -74,6 +76,7 @@ def test_parse_authentication_request(
7476
msg.encoding,
7577
msg.details,
7678
msg.ts_out,
79+
msg.heartbeat_interval_s,
7780
) == expected
7881
else:
7982
with pytest.raises(expected):
@@ -97,8 +100,9 @@ def test_parse_authentication_request(
97100
dataset=Dataset.XNAS_ITCH,
98101
ts_out="1",
99102
client="unittest",
103+
heartbeat_interval_s=35,
100104
),
101-
b"auth=abcd1234|dataset=XNAS.ITCH|encoding=dbn|ts_out=1|client=unittest\n",
105+
b"auth=abcd1234|dataset=XNAS.ITCH|encoding=dbn|ts_out=1|heartbeat_interval_s=35|client=unittest\n",
102106
),
103107
],
104108
)

0 commit comments

Comments
 (0)