Skip to content

Commit df9dd18

Browse files
committed
ADD: Add client support for is_last
1 parent 1ef8c34 commit df9dd18

File tree

5 files changed

+22
-5
lines changed

5 files changed

+22
-5
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
# Changelog
22

3+
## 0.55.0 - TBD
4+
5+
#### Enhancements
6+
- Added `is_last` field to live subscription requests which will be used to improve
7+
the handling of split subscription requests
8+
39
## 0.54.0 - 2025-05-13
410

511
#### Enhancements

databento/live/gateway.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ class SubscriptionRequest(GatewayControl):
133133
start: int | None = None
134134
snapshot: int = 0
135135
id: int | None = None
136+
is_last: int = 1
136137

137138

138139
@dataclasses.dataclass

databento/live/protocol.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,8 @@ def subscribe(
325325

326326
subscriptions: list[SubscriptionRequest] = []
327327
chunked_symbols = list(chunk(symbols_list, SYMBOL_LIST_BATCH_SIZE))
328-
for batch in chunked_symbols:
328+
last_chunk_idx = len(chunked_symbols) - 1
329+
for i, batch in enumerate(chunked_symbols):
329330
batch_str = ",".join(batch)
330331
message = SubscriptionRequest(
331332
schema=validate_enum(schema, Schema, "schema"),
@@ -334,6 +335,7 @@ def subscribe(
334335
start=optional_datetime_to_unix_nanoseconds(start),
335336
snapshot=int(snapshot),
336337
id=subscription_id,
338+
is_last=int(i == last_chunk_idx),
337339
)
338340
subscriptions.append(message)
339341

tests/test_live_client.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -581,10 +581,11 @@ async def test_live_subscribe_large_symbol_list(
581581
)
582582

583583
reconstructed: list[str] = []
584-
for _ in range(8):
584+
for i in range(8):
585585
message = await mock_live_server.wait_for_message_of_type(
586586
message_type=gateway.SubscriptionRequest,
587587
)
588+
assert int(message.is_last) == int(i == 7)
588589
reconstructed.extend(message.symbols.split(","))
589590

590591
# Assert

tests/test_live_gateway_messages.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -352,7 +352,11 @@ def test_parse_subscription_request(
352352
stype_in=SType.INSTRUMENT_ID,
353353
symbols="1234,5678,90",
354354
),
355-
b"schema=mbo|" b"stype_in=instrument_id|" b"symbols=1234,5678,90|" b"snapshot=0\n",
355+
b"schema=mbo|"
356+
b"stype_in=instrument_id|"
357+
b"symbols=1234,5678,90|"
358+
b"snapshot=0|"
359+
b"is_last=1\n",
356360
),
357361
pytest.param(
358362
SubscriptionRequest(
@@ -361,12 +365,14 @@ def test_parse_subscription_request(
361365
symbols="UNI,TTE,ST",
362366
start=1671717080706865759,
363367
snapshot=0,
368+
is_last=0,
364369
),
365370
b"schema=mbo|"
366371
b"stype_in=raw_symbol|"
367372
b"symbols=UNI,TTE,ST|"
368373
b"start=1671717080706865759|"
369-
b"snapshot=0\n",
374+
b"snapshot=0|"
375+
b"is_last=0\n",
370376
),
371377
pytest.param(
372378
SubscriptionRequest(
@@ -381,7 +387,8 @@ def test_parse_subscription_request(
381387
b"stype_in=instrument_id|"
382388
b"symbols=1234,5678,90|"
383389
b"snapshot=1|"
384-
b"id=5\n",
390+
b"id=5|"
391+
b"is_last=1\n",
385392
),
386393
],
387394
)

0 commit comments

Comments
 (0)