Skip to content

Commit e244bd4

Browse files
authored
VER: Release 0.31.1
See release notes.
2 parents 5ce951f + 2e70139 commit e244bd4

File tree

10 files changed

+151
-80
lines changed

10 files changed

+151
-80
lines changed

.github/PULL_REQUEST_TEMPLATE.md

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
## Description
22

3-
Please include a summary of the change and which issue is fixed. Please also include relevant motivation and context. List any dependencies that are required for this change.
3+
Please include a summary of the change and which issue is fixed.
4+
Please also include relevant motivation and context.
5+
List any dependencies that are required for this change.
46

57
Fixes # (issue)
68

@@ -13,18 +15,24 @@ Please delete options that are not relevant.
1315
- [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected)
1416
- [ ] This change requires a documentation update
1517

16-
### How Has This Been Tested?
18+
### How has this been tested?
1719

18-
Please describe the tests that you ran to verify your changes. Provide instructions so we can reproduce. Please also list any relevant details for your test configuration
20+
Please describe the tests that you ran to verify your changes.
21+
Provide instructions so we can reproduce.
22+
Please also list any relevant details for your test configuration.
1923

2024
- [ ] Test A
2125
- [ ] Test B
2226

27+
## Checklist
2328

24-
### Checklist:
25-
26-
- [ ] My code follows the style guidelines of this project
29+
- [ ] My code builds locally with no new warnings (`scripts/build.sh`)
30+
- [ ] My code follows the style guidelines (`scripts/lint.sh`)
31+
- [ ] New and existing unit tests pass locally with my changes (`scripts/test.sh`)
2732
- [ ] I have made corresponding changes to the documentation
28-
- [ ] My changes generate no new warnings
2933
- [ ] I have added tests that prove my fix is effective or that my feature works
30-
- [ ] New and existing unit tests pass locally with my changes
34+
35+
### Declaration
36+
37+
I confirm this contribution is made under an Apache 2.0 license and that I have the authority
38+
necessary to make this contribution on behalf of its copyright owner.

.github/workflows/test.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
name: test
22

33
on:
4+
pull_request:
45
push:
5-
branches: [main, dev]
66

77
jobs:
88
test:
@@ -30,13 +30,13 @@ jobs:
3030
shell: bash
3131

3232
- name: Test (release)
33-
timeout-minutes: 5
33+
timeout-minutes: 2
3434
if: ${{ github.ref == 'refs/heads/main' }}
3535
run: scripts/test.sh --release
3636
shell: bash
3737

3838
- name: Test
39-
timeout-minutes: 5
39+
timeout-minutes: 2
4040
if: ${{ github.ref != 'refs/heads/main' }}
4141
run: scripts/test.sh
4242
shell: bash

CHANGELOG.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,14 @@
11
# Changelog
22

3+
## 0.31.1 - 2024-03-20
4+
5+
#### Enhancements
6+
- Increase `Live` session connection and authentication timeouts
7+
- Added new `F_TOB` and `F_MAYBE_BAD_BOOK` variants to `RecordFlags`
8+
9+
#### Bug fixes
10+
- Fixed an issue where calling `Live.subscribe` from a `Live` client callback would cause a deadlock
11+
312
## 0.31.0 - 2024-03-05
413

514
#### Enhancements

CONTRIBUTING.md

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1-
We welcome feedback through discussions and issues on GitHub, however we don't currently accept pull requests due to the open-soruce repository being a downstream mirror of our internal Python client library codebase.
2-
3-
Please direct email feedback to [email protected] or [email protected].
1+
Thank you for taking the time to contribute to our project.
2+
We welcome feedback through discussions and issues on GitHub, as well as our [community Slack](https://databento.com/support).
3+
While we don't merge pull requests directly due to the open-source repository being a downstream
4+
mirror of our internal codebase, we can commit the changes upstream with the original author.

databento/common/enums.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,18 +199,24 @@ class RecordFlags(StringyMixin, IntFlag): # type: ignore
199199
200200
F_LAST
201201
Last message in the packet from the venue for a given `instrument_id`.
202+
F_TOB
203+
Indicates a top-of-book message, not an individual order.
202204
F_SNAPSHOT
203205
Message sourced from a replay, such as a snapshot server.
204206
F_MBP
205207
Aggregated price level message, not an individual order.
206208
F_BAD_TS_RECV
207209
The `ts_recv` value is inaccurate (clock issues or reordering).
210+
F_MAYBE_BAD_BOOK
211+
Indicates an unrecoverable gap was detected in the channel.
208212
209213
Other bits are reserved and have no current meaning.
210214
211215
"""
212216

213217
F_LAST = 128
218+
F_TOB = 64
214219
F_SNAPSHOT = 32
215220
F_MBP = 16
216221
F_BAD_TS_RECV = 8
222+
F_MAYBE_BAD_BOOK = 4

databento/live/session.py

Lines changed: 4 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@
2525

2626
logger = logging.getLogger(__name__)
2727

28-
AUTH_TIMEOUT_SECONDS: Final = 2.0
29-
CONNECT_TIMEOUT_SECONDS: Final = 5.0
28+
AUTH_TIMEOUT_SECONDS: Final = 30.0
29+
CONNECT_TIMEOUT_SECONDS: Final = 10.0
3030
DBN_QUEUE_CAPACITY: Final = 2**20
3131
DEFAULT_REMOTE_PORT: Final = 13000
3232

@@ -434,15 +434,12 @@ def subscribe(
434434
loop=self._loop,
435435
)
436436

437-
asyncio.run_coroutine_threadsafe(
438-
self._subscribe_task(
437+
self._protocol.subscribe(
439438
schema=schema,
440439
symbols=symbols,
441440
stype_in=stype_in,
442441
start=start,
443-
),
444-
loop=self._loop,
445-
).result()
442+
)
446443

447444
def resume_reading(self) -> None:
448445
"""
@@ -565,21 +562,3 @@ async def _connect_task(
565562
)
566563

567564
return transport, protocol
568-
569-
async def _subscribe_task(
570-
self,
571-
schema: Schema | str,
572-
symbols: Iterable[str | int] | str | int = ALL_SYMBOLS,
573-
stype_in: SType | str = SType.RAW_SYMBOL,
574-
start: str | int | None = None,
575-
) -> None:
576-
with self._lock:
577-
if self._protocol is None:
578-
return
579-
580-
self._protocol.subscribe(
581-
schema=schema,
582-
symbols=symbols,
583-
stype_in=stype_in,
584-
start=start,
585-
)

databento/version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = "0.31.0"
1+
__version__ = "0.31.1"

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "databento"
3-
version = "0.31.0"
3+
version = "0.31.1"
44
description = "Official Python client library for Databento"
55
authors = [
66
"Databento <[email protected]>",

tests/mock_live_server.py

Lines changed: 58 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
from functools import singledispatchmethod
1717
from io import BytesIO
1818
from os import PathLike
19+
from typing import Any
1920
from typing import Callable
2021
from typing import NewType
2122
from typing import TypeVar
@@ -32,9 +33,7 @@
3233
from databento.live.gateway import SessionStart
3334
from databento.live.gateway import SubscriptionRequest
3435
from databento.live.gateway import parse_gateway_message
35-
from databento_dbn import Metadata
3636
from databento_dbn import Schema
37-
from databento_dbn import SType
3837

3938

4039
LIVE_SERVER_VERSION: str = "1.0.0"
@@ -100,7 +99,8 @@ def __init__(
10099
self._version: str = version
101100
self._is_authenticated: bool = False
102101
self._is_streaming: bool = False
103-
self._repeater_tasks: set[asyncio.Task[None]] = set()
102+
self._subscriptions: list[SubscriptionRequest] = []
103+
self._tasks: set[asyncio.Task[None]] = set()
104104

105105
self._dbn_path = pathlib.Path(dbn_path)
106106
self._user_api_keys = user_api_keys
@@ -155,6 +155,18 @@ def is_streaming(self) -> bool:
155155
"""
156156
return self._is_streaming
157157

158+
@property
159+
def dataset_path(self) -> pathlib.Path:
160+
"""
161+
The path to the DBN files for serving.
162+
163+
Returns
164+
-------
165+
Path
166+
167+
"""
168+
return self._dbn_path / (self._dataset or "")
169+
158170
@property
159171
def mode(self) -> MockLiveMode:
160172
"""
@@ -205,6 +217,18 @@ def session_id(self) -> str:
205217
"""
206218
return str(hash(self))
207219

220+
@property
221+
def subscriptions(self) -> tuple[SubscriptionRequest, ...]:
222+
"""
223+
The received subscriptions.
224+
225+
Returns
226+
-------
227+
tuple[SubscriptionRequest, ...]
228+
229+
"""
230+
return tuple(self._subscriptions)
231+
208232
@property
209233
def version(self) -> str:
210234
"""
@@ -353,11 +377,11 @@ def _(self, message: AuthenticationRequest) -> None:
353377
logger.info("received CRAM response: %s", message.auth)
354378
if self.is_authenticated:
355379
logger.error("authentication request sent when already authenticated")
356-
self.__transport.close()
380+
self.__transport.write_eof()
357381
return
358382
if self.is_streaming:
359383
logger.error("authentication request sent while streaming")
360-
self.__transport.close()
384+
self.__transport.write_eof()
361385
return
362386

363387
_, bucket_id = message.auth.split("-")
@@ -400,54 +424,49 @@ def _(self, message: SubscriptionRequest) -> None:
400424
logger.info("received subscription request: %s", str(message).strip())
401425
if not self.is_authenticated:
402426
logger.error("subscription request sent while unauthenticated")
403-
self.__transport.close()
427+
self.__transport.write_eof()
404428

405-
if self.is_streaming:
406-
logger.error("subscription request sent while streaming")
407-
self.__transport.close()
429+
self._subscriptions.append(message)
408430

409-
self._schemas.append(Schema(message.schema))
431+
if self.is_streaming:
432+
self.create_server_task(message)
410433

411434
@handle_client_message.register(SessionStart)
412435
def _(self, message: SessionStart) -> None:
413436
logger.info("received session start request: %s", str(message).strip())
414437
self._is_streaming = True
415438

416-
dataset_path = self._dbn_path / (self._dataset or "")
439+
for sub in self.subscriptions:
440+
self.create_server_task(sub)
441+
442+
def create_server_task(self, message: SubscriptionRequest) -> None:
417443
if self.mode is MockLiveMode.REPLAY:
418-
for schema in self._schemas:
419-
for test_data_path in dataset_path.glob(f"*{schema}.dbn.zst"):
420-
decompressor = zstandard.ZstdDecompressor().stream_reader(
421-
test_data_path.read_bytes(),
422-
)
423-
logger.info(
424-
"streaming %s for %s schema",
425-
test_data_path.name,
426-
schema,
427-
)
428-
self.__transport.write(decompressor.readall())
444+
task = asyncio.create_task(self.replay_task(schema=Schema(message.schema)))
445+
else:
446+
task = asyncio.create_task(self.repeater_task(schema=Schema(message.schema)))
429447

430-
logger.info(
431-
"data streaming for %d schema(s) completed",
432-
len(self._schemas),
433-
)
448+
self._tasks.add(task)
449+
task.add_done_callback(self._tasks.remove)
450+
task.add_done_callback(self.check_done)
434451

452+
def check_done(self, _: Any) -> None:
453+
if not self._tasks:
454+
logger.info("streaming tasks completed")
435455
self.__transport.write_eof()
436-
self.__transport.close()
437456

438-
elif self.mode is MockLiveMode.REPEAT:
439-
metadata = Metadata("UNIT.TEST", 0, SType.RAW_SYMBOL, [], [], [], [])
440-
self.__transport.write(bytes(metadata))
441-
442-
loop = asyncio.get_event_loop()
443-
for schema in self._schemas:
444-
task = loop.create_task(self.repeater(schema))
445-
self._repeater_tasks.add(task)
446-
task.add_done_callback(self._repeater_tasks.remove)
447-
else:
448-
raise ValueError(f"unsupported mode {MockLiveMode.REPEAT}")
457+
async def replay_task(self, schema: Schema) -> None:
458+
for test_data_path in self.dataset_path.glob(f"*{schema}.dbn.zst"):
459+
decompressor = zstandard.ZstdDecompressor().stream_reader(
460+
test_data_path.read_bytes(),
461+
)
462+
logger.info(
463+
"streaming %s for %s schema",
464+
test_data_path.name,
465+
schema,
466+
)
467+
self.__transport.write(decompressor.readall())
449468

450-
async def repeater(self, schema: Schema) -> None:
469+
async def repeater_task(self, schema: Schema) -> None:
451470
struct = SCHEMA_STRUCT_MAP[schema]
452471
repeated = bytes(struct(*[0] * 12)) # for now we only support MBP_1
453472

0 commit comments

Comments
 (0)