Skip to content

Commit 026f547

Browse files
WS Protocol Ping & Some Clean Up
1 parent 2cecfb8 commit 026f547

File tree

3 files changed

+43
-22
lines changed

3 files changed

+43
-22
lines changed

deepgram/clients/live/v1/async_client.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,8 @@ async def finish(self):
162162
await self._socket.wait_closed()
163163
self.logger.notice("socket.wait_closed succeeded")
164164

165+
self._socket = None
166+
165167
self.logger.notice("finish succeeded")
166168
self.logger.debug("AsyncLiveClient.finish LEAVE")
167169

deepgram/clients/live/v1/client.py

Lines changed: 37 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -81,11 +81,8 @@ def start(self, options: LiveOptions = None, addons: dict = None, **kwargs):
8181
self.listening.start()
8282

8383
# keepalive thread
84-
self.processing = None
85-
if self.config.options.get("keepalive") == "true":
86-
self.logger.info("KeepAlive enabled")
87-
self.processing = threading.Thread(target=self._processing)
88-
self.processing.start()
84+
self.processing = threading.Thread(target=self._processing)
85+
self.processing.start()
8986

9087
self.logger.notice("start succeeded")
9188
self.logger.debug("LiveClient.start LEAVE")
@@ -111,7 +108,7 @@ def _listening(self) -> None:
111108
myExit = self.exit
112109
self.lock_exit.release()
113110
if myExit:
114-
self.logger.notice("exiting gracefully")
111+
self.logger.notice("_listening exiting gracefully")
115112
self.logger.debug("LiveClient._listening LEAVE")
116113
return
117114

@@ -156,19 +153,15 @@ def _listening(self) -> None:
156153
**dict(self.kwargs),
157154
)
158155
case _:
159-
self.logger.error(
160-
"response_type: %s, data: %s", response_type, data
161-
)
162-
error = ErrorResponse(
163-
type="UnhandledMessage",
164-
description="Unknown message type",
165-
message=f"Unhandle message type: {response_type}",
156+
self.logger.warning(
157+
"Unknown Message: response_type: %s, data: %s",
158+
response_type,
159+
data,
166160
)
167-
self._emit(LiveTranscriptionEvents.Error, error=error)
168161

169162
except Exception as e:
170163
if e.code == 1000:
171-
self.logger.notice("exiting thread gracefully")
164+
self.logger.notice("_listening(1000) exiting gracefully")
172165
self.logger.debug("LiveClient._listening LEAVE")
173166
return
174167

@@ -186,25 +179,34 @@ def _listening(self) -> None:
186179
def _processing(self) -> None:
187180
self.logger.debug("LiveClient._processing ENTER")
188181

182+
counter = 0
183+
189184
while True:
190185
try:
191186
time.sleep(PING_INTERVAL)
187+
counter += 1
192188

193189
self.lock_exit.acquire()
194190
myExit = self.exit
195191
self.lock_exit.release()
196192
if myExit:
197-
self.logger.notice("exiting gracefully")
193+
self.logger.notice("_processing exiting gracefully")
198194
self.logger.debug("LiveClient._processing LEAVE")
199195
return
200196

201197
# deepgram keepalive
202-
self.logger.debug("Sending KeepAlive...")
203-
self.send(json.dumps({"type": "KeepAlive"}))
198+
if self.config.options.get("keepalive") == "true":
199+
self.logger.debug("Sending KeepAlive...")
200+
self.send(json.dumps({"type": "KeepAlive"}))
201+
202+
# websocket keepalive
203+
if counter % 4 == 0:
204+
self.logger.debug("Sending Ping...")
205+
self.send_ping()
204206

205207
except Exception as e:
206208
if e.code == 1000:
207-
self.logger.notice("exiting thread gracefully")
209+
self.logger.notice("_processing(1000) exiting gracefully")
208210
self.logger.debug("LiveClient._processing LEAVE")
209211
return
210212

@@ -239,6 +241,19 @@ def send(self, data) -> int:
239241
self.logger.spam("LiveClient.send LEAVE")
240242
return 0
241243

244+
def send_ping(self) -> None:
245+
"""
246+
Sends a ping over the WebSocket connection.
247+
"""
248+
self.logger.spam("LiveClient.send_ping ENTER")
249+
250+
if self._socket:
251+
self.lock_send.acquire()
252+
self._socket.ping()
253+
self.lock_send.release()
254+
255+
self.logger.spam("LiveClient.send_ping LEAVE")
256+
242257
def finish(self):
243258
"""
244259
Closes the WebSocket connection gracefully.
@@ -247,8 +262,8 @@ def finish(self):
247262

248263
if self._socket:
249264
self.logger.notice("sending CloseStream...")
250-
self._socket.send(json.dumps({"type": "CloseStream"}))
251-
time.sleep(1)
265+
self.send(json.dumps({"type": "CloseStream"}))
266+
time.sleep(0.5)
252267

253268
self.lock_exit.acquire()
254269
self.logger.notice("signal exit")
@@ -271,6 +286,7 @@ def finish(self):
271286

272287
self._socket = None
273288
self.lock_exit = None
289+
self.lock_send = None
274290

275291
self.logger.notice("finish succeeded")
276292
self.logger.spam("LiveClient.finish LEAVE")

examples/streaming/microphone/main.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ def main():
2222
try:
2323
# example of setting up a client config. logging values: WARNING, VERBOSE, DEBUG, SPAM
2424
# config = DeepgramClientOptions(
25-
# verbose=logging.SPAM, options={"keepalive": "true"}
25+
# verbose=logging.DEBUG,
26+
# options={"keepalive": "true"}
2627
# )
2728
# deepgram: DeepgramClient = DeepgramClient("", config)
2829
# otherwise, use default config
@@ -89,6 +90,8 @@ def on_error(self, error, **kwargs):
8990
dg_connection.finish()
9091

9192
print("Finished")
93+
# sleep(30) # wait 30 seconds to see if there is any additional socket activity
94+
# print("Really done!")
9295

9396
except Exception as e:
9497
print(f"Could not open socket: {e}")

0 commit comments

Comments
 (0)