Skip to content

Commit c84a06e

Browse files
committed
DEL: Remove DBN tasks from Live client
1 parent 6256848 commit c84a06e

File tree

1 file changed

+38
-66
lines changed

1 file changed

+38
-66
lines changed

databento/live/session.py

Lines changed: 38 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -134,37 +134,59 @@ def __init__(
134134
self._dbn_queue = dbn_queue
135135
self._loop = loop
136136
self._metadata: SessionMetadata = metadata
137-
self._tasks: set[asyncio.Task[None]] = set()
138137
self._user_callbacks = user_callbacks
139138
self._user_streams = user_streams
140139

141140
def received_metadata(self, metadata: databento_dbn.Metadata) -> None:
142141
if not self._metadata:
143142
self._metadata.data = metadata
144143
for stream, exc_callback in self._user_streams.items():
145-
task = self._loop.create_task(
146-
self._stream_task(stream, metadata, exc_callback),
147-
)
148-
task.add_done_callback(self._tasks.remove)
149-
self._tasks.add(task)
144+
try:
145+
stream.write(bytes(metadata))
146+
except Exception as exc:
147+
stream_name = getattr(stream, "name", str(stream))
148+
logger.error(
149+
"error writing %s to `%s` stream",
150+
type(metadata).__name__,
151+
stream_name,
152+
exc_info=exc,
153+
)
154+
if exc_callback is not None:
155+
exc_callback(exc)
150156
else:
151157
self._metadata.check(metadata)
152158
return super().received_metadata(metadata)
153159

154160
def received_record(self, record: DBNRecord) -> None:
155161
for callback, exc_callback in self._user_callbacks.items():
156-
task = self._loop.create_task(
157-
self._callback_task(callback, record, exc_callback),
158-
)
159-
task.add_done_callback(self._tasks.remove)
160-
self._tasks.add(task)
162+
try:
163+
callback(record)
164+
except Exception as exc:
165+
logger.error(
166+
"error dispatching %s to `%s` callback",
167+
type(record).__name__,
168+
getattr(callback, "__name__", str(callback)),
169+
exc_info=exc,
170+
)
171+
if exc_callback is not None:
172+
exc_callback(exc)
161173

174+
has_ts_out = self._metadata.data and self._metadata.data.ts_out
162175
for stream, exc_callback in self._user_streams.items():
163-
task = self._loop.create_task(
164-
self._stream_task(stream, record, exc_callback),
165-
)
166-
task.add_done_callback(self._tasks.remove)
167-
self._tasks.add(task)
176+
try:
177+
stream.write(bytes(record))
178+
if not isinstance(record, databento_dbn.Metadata) and has_ts_out:
179+
stream.write(struct.pack("Q", record.ts_out))
180+
except Exception as exc:
181+
stream_name = getattr(stream, "name", str(stream))
182+
logger.error(
183+
"error writing %s to `%s` stream",
184+
type(record).__name__,
185+
stream_name,
186+
exc_info=exc,
187+
)
188+
if exc_callback is not None:
189+
exc_callback(exc)
168190

169191
if self._dbn_queue.enabled:
170192
try:
@@ -185,55 +207,6 @@ def received_record(self, record: DBNRecord) -> None:
185207

186208
return super().received_record(record)
187209

188-
async def _callback_task(
189-
self,
190-
record_callback: RecordCallback,
191-
record: DBNRecord,
192-
exception_callback: ExceptionCallback | None,
193-
) -> None:
194-
try:
195-
record_callback(record)
196-
except Exception as exc:
197-
logger.error(
198-
"error dispatching %s to `%s` callback",
199-
type(record).__name__,
200-
getattr(record_callback, "__name__", str(record_callback)),
201-
exc_info=exc,
202-
)
203-
if exception_callback is not None:
204-
self._loop.call_soon_threadsafe(exception_callback, exc)
205-
206-
async def _stream_task(
207-
self,
208-
stream: IO[bytes],
209-
record: databento_dbn.Metadata | DBNRecord,
210-
exc_callback: ExceptionCallback | None,
211-
) -> None:
212-
has_ts_out = self._metadata.data and self._metadata.data.ts_out
213-
try:
214-
stream.write(bytes(record))
215-
if not isinstance(record, databento_dbn.Metadata) and has_ts_out:
216-
stream.write(struct.pack("Q", record.ts_out))
217-
except Exception as exc:
218-
stream_name = getattr(stream, "name", str(stream))
219-
logger.error(
220-
"error writing %s to `%s` stream",
221-
type(record).__name__,
222-
stream_name,
223-
exc_info=exc,
224-
)
225-
if exc_callback is not None:
226-
self._loop.call_soon_threadsafe(exc_callback, exc)
227-
228-
async def wait_for_processing(self) -> None:
229-
while self._tasks:
230-
logger.info(
231-
"waiting for %d record(s) to process",
232-
len(self._tasks),
233-
)
234-
await asyncio.gather(*self._tasks)
235-
236-
237210
class Session:
238211
"""
239212
Parameters
@@ -446,7 +419,6 @@ async def wait_for_close(self) -> None:
446419

447420
await self._protocol.authenticated
448421
await self._protocol.disconnected
449-
await self._protocol.wait_for_processing()
450422

451423
try:
452424
self._protocol.authenticated.result()

0 commit comments

Comments
 (0)