33import logging
44import queue
55import struct
6- import threading
76from typing import IO , Callable , Iterable , List , Optional , Set , Union
87
98import databento_dbn
109
11- from databento .common .enums import Dataset , Schema , SType
10+ from databento .common .enums import Dataset
11+ from databento .common .enums import Schema
12+ from databento .common .enums import SType
1213from databento .common .error import BentoError
1314from databento .common .symbology import ALL_SYMBOLS
14- from databento .live import AUTH_TIMEOUT_SECONDS , CONNECT_TIMEOUT_SECONDS , DBNRecord
15+ from databento .live import AUTH_TIMEOUT_SECONDS
16+ from databento .live import CONNECT_TIMEOUT_SECONDS
17+ from databento .live import DBNRecord
1518from databento .live .protocol import DatabentoLiveProtocol
1619
20+
1721logger = logging .getLogger (__name__ )
1822
1923
@@ -258,7 +262,6 @@ def __init__(
258262 self ._ts_out = ts_out
259263 self ._protocol_factory = protocol_factory
260264
261- self ._lock = threading .Lock ()
262265 self ._transport : Optional [asyncio .Transport ] = None
263266 self ._protocol : Optional [_SessionProtocol ] = None
264267
@@ -276,12 +279,11 @@ def is_authenticated(self) -> bool:
276279 """
277280 if self ._protocol is None :
278281 return False
279- with self ._lock :
280- try :
281- self ._protocol .authenticated .result ()
282- except (asyncio .InvalidStateError , asyncio .CancelledError , BentoError ):
283- return False
284- return True
282+ try :
283+ self ._protocol .authenticated .result ()
284+ except (asyncio .InvalidStateError , asyncio .CancelledError , BentoError ):
285+ return False
286+ return True
285287
286288 def is_disconnected (self ) -> bool :
287289 """
@@ -294,12 +296,7 @@ def is_disconnected(self) -> bool:
294296 """
295297 if self ._protocol is None :
296298 return True
297- with self ._lock :
298- try :
299- self ._protocol .disconnected .result ()
300- except (asyncio .InvalidStateError , asyncio .CancelledError , Exception ):
301- return False
302- return True
299+ return self ._protocol .disconnected .done ()
303300
304301 def is_reading (self ) -> bool :
305302 """
@@ -312,8 +309,7 @@ def is_reading(self) -> bool:
312309 """
313310 if self ._transport is None :
314311 return False
315- with self ._lock :
316- return self ._transport .is_reading ()
312+ return self ._transport .is_reading ()
317313
318314 def is_started (self ) -> bool :
319315 """
@@ -326,8 +322,7 @@ def is_started(self) -> bool:
326322 """
327323 if self ._protocol is None :
328324 return False
329- with self ._lock :
330- return self ._protocol .started .is_set ()
325+ return self ._protocol .started .is_set ()
331326
332327 @property
333328 def metadata (self ) -> Optional [databento_dbn .Metadata ]:
@@ -356,9 +351,8 @@ def abort(self) -> None:
356351 """
357352 if self ._transport is None :
358353 return
359- with self ._lock :
360- self ._transport .abort ()
361- self ._protocol = None
354+ self ._transport .abort ()
355+ self ._protocol = None
362356
363357 def close (self ) -> None :
364358 """
@@ -367,10 +361,9 @@ def close(self) -> None:
367361 """
368362 if self ._transport is None :
369363 return
370- with self ._lock :
371- if self ._transport .can_write_eof ():
372- self ._transport .write_eof ()
373- self ._transport .close ()
364+ if self ._transport .can_write_eof ():
365+ self ._loop .call_soon_threadsafe (self ._transport .write_eof )
366+ self ._loop .call_soon_threadsafe (self ._transport .close )
374367
375368 def subscribe (
376369 self ,
@@ -421,8 +414,7 @@ def resume_reading(self) -> None:
421414 """
422415 if self ._transport is None :
423416 return
424- with self ._lock :
425- self ._transport .resume_reading ()
417+ self ._loop .call_soon_threadsafe (self ._transport .resume_reading )
426418
427419 def start (self ) -> None :
428420 """
@@ -445,10 +437,16 @@ async def wait_for_close(self) -> None:
445437 """
446438 if self ._protocol is None :
447439 return
440+
448441 await self ._protocol .disconnected
442+ disconnect_exc = self ._protocol .disconnected .exception ()
443+
449444 await self ._protocol .wait_for_processing ()
450445 self ._protocol = self ._transport = None
451446
447+ if disconnect_exc is not None :
448+ raise BentoError (disconnect_exc )
449+
452450 def _connect (
453451 self ,
454452 dataset : Union [Dataset , str ],
@@ -521,6 +519,5 @@ async def _connect_task(
521519 "authentication with remote gateway completed" ,
522520 )
523521
524- with self ._lock :
525- self ._transport = transport
526- self ._protocol = protocol
522+ self ._transport = transport
523+ self ._protocol = protocol
0 commit comments