Skip to content

Commit 828a7fd

Browse files
committed
FIX: Live client error handling on connect
1 parent e180a58 commit 828a7fd

File tree

4 files changed

+18
-10
lines changed

4 files changed

+18
-10
lines changed

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,13 @@
11
# Changelog
22

3+
## 0.27.0 - TBD
4+
5+
#### Breaking changes
6+
- Renamed `DatatbentoLiveProtocol.started` to `DatatbentoLiveProtocol.is_started` which now returns a bool instead of an `asyncio.Event`
7+
8+
#### Bug fixes
9+
- Fixed an issue where an error message from the live gateway would not properly raise an exception if the connection closed before `Live.start` was called
10+
311
## 0.26.0 - 2024-01-16
412

513
This release adds support for transcoding DBN data into Apache parquet.

databento/live/protocol.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ def __init__(
8282

8383
self._authenticated: asyncio.Future[int] = asyncio.Future()
8484
self._disconnected: asyncio.Future[None] = asyncio.Future()
85-
self._started = asyncio.Event()
85+
self._started: bool = False
8686

8787
@property
8888
def authenticated(self) -> asyncio.Future[int]:
@@ -118,14 +118,14 @@ def disconnected(self) -> asyncio.Future[None]:
118118
return self._disconnected
119119

120120
@property
121-
def started(self) -> asyncio.Event:
121+
def is_started(self) -> bool:
122122
"""
123-
Event that is set when the session has started streaming. This occurs
124-
when the SessionStart message is sent to the gateway.
123+
True if the session has started streaming. This occurs when the
124+
SessionStart message is sent to the gateway.
125125
126126
Returns
127127
-------
128-
asyncio.Event
128+
bool
129129
130130
"""
131131
return self._started
@@ -219,7 +219,7 @@ def buffer_updated(self, nbytes: int) -> None:
219219
logger.debug("read %d bytes from remote gateway", nbytes)
220220
data = self.__buffer[:nbytes]
221221

222-
if self.started.is_set():
222+
if self.authenticated.done():
223223
self._process_dbn(data)
224224
else:
225225
self._process_gateway(data)
@@ -303,7 +303,7 @@ def start(
303303
"""
304304
logger.debug("sending start")
305305
message = SessionStart()
306-
self.started.set()
306+
self._started = True
307307
self.transport.write(bytes(message))
308308

309309
def _process_dbn(self, data: bytes) -> None:

databento/live/session.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,8 @@ def is_reading(self) -> bool:
276276

277277
def is_started(self) -> bool:
278278
"""
279-
Return true if the session's connection has started streaming.
279+
Return true if the session's connection has started streaming, false
280+
otherwise.
280281
281282
Returns
282283
-------
@@ -286,7 +287,7 @@ def is_started(self) -> bool:
286287
with self._lock:
287288
if self._protocol is None:
288289
return False
289-
return self._protocol.started.is_set()
290+
return self._protocol.is_started
290291

291292
@property
292293
def metadata(self) -> databento_dbn.Metadata | None:

tests/test_live_protocol.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,6 @@ async def test_protocol_connection_streaming(
9090
)
9191

9292
protocol.start()
93-
await asyncio.wait_for(protocol.started.wait(), timeout=1)
9493
await asyncio.wait_for(protocol.disconnected, timeout=1)
9594

9695
# Assert

0 commit comments

Comments
 (0)