Skip to content

Commit 5b122ca

Browse files
Enable Inheritance, Pass Self in Callbacks, Make Async/Sync Client StartUp Same
1 parent edfd3b6 commit 5b122ca

File tree

8 files changed

+173
-86
lines changed

8 files changed

+173
-86
lines changed

deepgram/client.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
AsyncPreRecordedClient,
1515
PrerecordedOptions,
1616
LiveOptions,
17+
LiveTranscriptionEvents,
1718
)
1819
from .clients.onprem.client import OnPremClient
1920
from .clients.onprem.v1.async_client import AsyncOnPremClient

deepgram/clients/listen.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,15 @@
1212
AsyncPreRecordedClient,
1313
PrerecordedOptions,
1414
)
15-
from .live.client import LiveClient, AsyncLiveClient, LiveOptions
15+
from .live.client import (
16+
LiveClient,
17+
AsyncLiveClient,
18+
LiveOptions,
19+
LiveResultResponse,
20+
MetadataResponse,
21+
ErrorResponse,
22+
LiveTranscriptionEvents,
23+
)
1624
from .errors import DeepgramModuleError
1725

1826

deepgram/clients/live/client.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
from .v1.client import LiveClient as LiveClientLatest
66
from .v1.async_client import AsyncLiveClient as AsyncLiveClientLatest
77
from .v1.options import LiveOptions as LiveOptionsLatest
8+
from .enums import LiveTranscriptionEvents
9+
from .v1.response import LiveResultResponse, MetadataResponse, ErrorResponse
810

911
"""
1012
The vX/client.py points to the current supported version in the SDK.

deepgram/clients/live/v1/async_client.py

Lines changed: 40 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from ..helpers import convert_to_websocket_url, append_query_params
1212
from ..errors import DeepgramError
1313

14+
from .response import LiveResultResponse, MetadataResponse, ErrorResponse
1415
from .options import LiveOptions
1516

1617

@@ -38,14 +39,14 @@ def __init__(self, config: DeepgramClientOptions):
3839
self._event_handlers = {event: [] for event in LiveTranscriptionEvents}
3940
self.websocket_url = convert_to_websocket_url(self.config.url, self.endpoint)
4041

41-
async def __call__(self, options: LiveOptions = None):
42-
"""
43-
Establishes a WebSocket connection for live transcription.
44-
"""
45-
self.logger.debug("AsyncLiveClient.__call__ ENTER")
46-
self.logger.info("options: %s", options)
42+
async def start(self, options: LiveOptions = None, **kwargs):
43+
self.logger.debug("AsyncLiveClient.start ENTER")
44+
self.logger.info("kwargs: %s", options)
45+
self.logger.info("options: %s", kwargs)
4746

4847
self.options = options
48+
self.kwargs = kwargs
49+
4950
if isinstance(options, LiveOptions):
5051
self.logger.info("LiveOptions switching class -> json")
5152
self.options = self.options.to_dict()
@@ -55,13 +56,13 @@ async def __call__(self, options: LiveOptions = None):
5556
self._socket = await _socket_connect(url_with_params, self.config.headers)
5657
asyncio.create_task(self._start())
5758

58-
self.logger.notice("__call__ succeeded")
59-
self.logger.debug("AsyncLiveClient.__call__ LEAVE")
59+
self.logger.notice("start succeeded")
60+
self.logger.debug("AsyncLiveClient.start LEAVE")
6061
return self
6162
except websockets.ConnectionClosed as e:
6263
await self._emit(LiveTranscriptionEvents.Close, e.code)
6364
self.logger.notice("exception: websockets.ConnectionClosed")
64-
self.logger.debug("AsyncLiveClient.__call__ LEAVE")
65+
self.logger.debug("AsyncLiveClient.start LEAVE")
6566

6667
def on(self, event, handler):
6768
"""
@@ -74,7 +75,7 @@ async def _emit(
7475
self, event, *args, **kwargs
7576
): # triggers the registered event handlers for a specific event
7677
for handler in self._event_handlers[event]:
77-
handler(*args, **kwargs)
78+
handler(self, *args, **kwargs)
7879

7980
async def _start(self) -> None:
8081
self.logger.debug("AsyncLiveClient._start ENTER")
@@ -85,25 +86,45 @@ async def _start(self) -> None:
8586
response_type = data.get("type")
8687
match response_type:
8788
case LiveTranscriptionEvents.Transcript.value:
88-
self.logger.verbose(
89+
self.logger.debug(
8990
"response_type: %s, data: %s", response_type, data
9091
)
91-
await self._emit(LiveTranscriptionEvents.Transcript, data)
92-
case LiveTranscriptionEvents.Error.value:
93-
self.logger.verbose(
94-
"response_type: %s, data: %s", response_type, data
92+
result = LiveResultResponse.from_json(message)
93+
await self._emit(
94+
LiveTranscriptionEvents.Transcript,
95+
result=result,
96+
kwargs=self.kwargs,
9597
)
96-
await self._emit(LiveTranscriptionEvents.Error, data)
9798
case LiveTranscriptionEvents.Metadata.value:
98-
self.logger.verbose(
99+
self.logger.debug(
99100
"response_type: %s, data: %s", response_type, data
100101
)
101-
await self._emit(LiveTranscriptionEvents.Metadata, data)
102+
result = ErrorResponse.from_json(message)
103+
await self._emit(
104+
LiveTranscriptionEvents.Metadata,
105+
metadata=result,
106+
kwargs=self.kwargs,
107+
)
108+
case LiveTranscriptionEvents.Error.value:
109+
self.logger.debug(
110+
"response_type: %s, data: %s", response_type, data
111+
)
112+
result = MetadataResponse.from_json(message)
113+
await self._emit(
114+
LiveTranscriptionEvents.Error,
115+
error=result,
116+
kwargs=self.kwargs,
117+
)
102118
case _:
103119
self.logger.error(
104120
"response_type: %s, data: %s", response_type, data
105121
)
106-
await self._emit(LiveTranscriptionEvents.Error, data)
122+
error = ErrorResponse(
123+
type="UnhandledMessage",
124+
description="Unknown message type",
125+
message=f"Unhandle message type: {response_type}",
126+
)
127+
await self._emit(LiveTranscriptionEvents.Error, error=error)
107128
except json.JSONDecodeError as e:
108129
await self._emit(LiveTranscriptionEvents.Error, e.code)
109130
self.logger.error("exception: json.JSONDecodeError: %s", str(e))

deepgram/clients/live/v1/client.py

Lines changed: 40 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,17 @@ def __init__(self, config: DeepgramClientOptions):
4343
self._event_handlers = {event: [] for event in LiveTranscriptionEvents}
4444
self.websocket_url = convert_to_websocket_url(self.config.url, self.endpoint)
4545

46-
def start(self, options: LiveOptions = None):
46+
def start(self, options: LiveOptions = None, **kwargs):
4747
"""
4848
Starts the WebSocket connection for live transcription.
4949
"""
5050
self.logger.debug("LiveClient.start ENTER")
51-
self.logger.info("options: %s", options)
51+
self.logger.info("kwargs: %s", options)
52+
self.logger.info("options: %s", kwargs)
5253

5354
self.options = options
55+
self.kwargs = kwargs
56+
5457
if isinstance(options, LiveOptions):
5558
self.logger.info("LiveOptions switching class -> json")
5659
self.options = self.options.to_dict()
@@ -89,11 +92,9 @@ def on(self, event, handler):
8992
if event in LiveTranscriptionEvents and callable(handler):
9093
self._event_handlers[event].append(handler)
9194

92-
def _emit(
93-
self, event, *args, **kwargs
94-
):
95+
def _emit(self, event, *args, **kwargs):
9596
for handler in self._event_handlers[event]:
96-
handler(*args, **kwargs)
97+
handler(self, *args, **kwargs)
9798

9899
def _listening(self) -> None:
99100
self.logger.debug("LiveClient._listening ENTER")
@@ -119,22 +120,45 @@ def _listening(self) -> None:
119120

120121
match response_type:
121122
case LiveTranscriptionEvents.Transcript.value:
123+
self.logger.debug(
124+
"response_type: %s, data: %s", response_type, data
125+
)
122126
result = LiveResultResponse.from_json(message)
123-
self._emit(LiveTranscriptionEvents.Transcript, result=result)
127+
self._emit(
128+
LiveTranscriptionEvents.Transcript,
129+
result=result,
130+
kwargs=self.kwargs,
131+
)
124132
case LiveTranscriptionEvents.Metadata.value:
133+
self.logger.debug(
134+
"response_type: %s, data: %s", response_type, data
135+
)
125136
result = MetadataResponse.from_json(message)
126-
self._emit(LiveTranscriptionEvents.Metadata, metadata=result)
137+
self._emit(
138+
LiveTranscriptionEvents.Metadata,
139+
metadata=result,
140+
kwargs=self.kwargs,
141+
)
127142
case LiveTranscriptionEvents.Error.value:
143+
self.logger.debug(
144+
"response_type: %s, data: %s", response_type, data
145+
)
128146
result = ErrorResponse.from_json(message)
129-
self._emit(LiveTranscriptionEvents.Error, error=result)
147+
self._emit(
148+
LiveTranscriptionEvents.Error,
149+
error=result,
150+
kwargs=self.kwargs,
151+
)
130152
case _:
131-
error: ErrorResponse = {
132-
"type": "UnhandledMessage",
133-
"description": "Unknown message type",
134-
"message": f"Unhandle message type: {response_type}",
135-
"variant": "",
136-
}
137-
self._emit(LiveTranscriptionEvents.Error, error)
153+
self.logger.error(
154+
"response_type: %s, data: %s", response_type, data
155+
)
156+
error = ErrorResponse(
157+
type="UnhandledMessage",
158+
description="Unknown message type",
159+
message=f"Unhandle message type: {response_type}",
160+
)
161+
self._emit(LiveTranscriptionEvents.Error, error=error)
138162

139163
except Exception as e:
140164
if e.code == 1000:

examples/streaming/async_http/main.py

Lines changed: 41 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -20,44 +20,55 @@
2020
# URL for the realtime streaming audio you would like to transcribe
2121
URL = "http://stream.live.vc.bbcmedia.co.uk/bbc_world_service"
2222

23-
deepgram_api_key = os.getenv("DG_API_KEY")
24-
2523

2624
async def main():
27-
deepgram = DeepgramClient(deepgram_api_key)
25+
deepgram = DeepgramClient()
2826

2927
# Create a websocket connection to Deepgram
3028
try:
31-
dg_connection = await deepgram.listen.asynclive.v("1")(options)
29+
dg_connection = deepgram.listen.asynclive.v("1")
30+
31+
def on_message(self, result, **kwargs):
32+
if result is None:
33+
return
34+
sentence = result.channel.alternatives[0].transcript
35+
if len(sentence) == 0:
36+
return
37+
print(f"speaker: {sentence}")
38+
39+
def on_metadata(self, metadata, **kwargs):
40+
if metadata is None:
41+
return
42+
print(f"\n\n{metadata}\n\n")
43+
44+
def on_error(self, error, **kwargs):
45+
if error is None:
46+
return
47+
print(f"\n\n{error}\n\n")
48+
49+
dg_connection.on(LiveTranscriptionEvents.Transcript, on_message)
50+
dg_connection.on(LiveTranscriptionEvents.Metadata, on_metadata)
51+
dg_connection.on(LiveTranscriptionEvents.Error, on_error)
52+
53+
# connect to websocket
54+
await dg_connection.start(options)
55+
56+
# Send streaming audio from the URL to Deepgram
57+
async with aiohttp.ClientSession() as session:
58+
async with session.get(URL) as audio:
59+
while True:
60+
data = await audio.content.readany()
61+
# send audio data through the socket
62+
await dg_connection.send(data)
63+
# If no data is being sent from the live stream, then break out of the loop.
64+
if not data:
65+
break
66+
67+
# Indicate that we've finished sending data by sending the {"type": "CloseStream"}
68+
await dg_connection.finish()
3269
except Exception as e:
3370
print(f"Could not open socket: {e}")
3471
return
3572

36-
# Listen for transcripts received from Deepgram and write them to the console
37-
dg_connection.on(LiveTranscriptionEvents.Transcript, print)
38-
39-
# Listen for metadata received from Deepgram and write to the console
40-
dg_connection.on(LiveTranscriptionEvents.Metadata, print)
41-
42-
# Listen for the connection to close
43-
dg_connection.on(
44-
LiveTranscriptionEvents.Close,
45-
lambda c: print(f"Connection closed with code {c}."),
46-
)
47-
48-
# Send streaming audio from the URL to Deepgram
49-
async with aiohttp.ClientSession() as session:
50-
async with session.get(URL) as audio:
51-
while True:
52-
data = await audio.content.readany()
53-
# send audio data through the socket
54-
await dg_connection.send(data)
55-
# If no data is being sent from the live stream, then break out of the loop.
56-
if not data:
57-
break
58-
59-
# Indicate that we've finished sending data by sending the {"type": "CloseStream"}
60-
await dg_connection.finish()
61-
6273

6374
asyncio.run(main())

examples/streaming/http/main.py

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,10 @@
1111

1212
load_dotenv()
1313

14-
options = LiveOptions(model="nova", interim_results=False, language="en-US")
15-
1614
# URL for the realtime streaming audio you would like to transcribe
1715
URL = "http://stream.live.vc.bbcmedia.co.uk/bbc_world_service"
1816

17+
1918
def main():
2019
try:
2120
deepgram = DeepgramClient()
@@ -40,12 +39,33 @@ def on_error(error=None):
4039

4140
# Create a websocket connection to Deepgram
4241
dg_connection = deepgram.listen.live.v("1")
43-
dg_connection.start(options)
42+
43+
def on_message(self, result, **kwargs):
44+
if result is None:
45+
return
46+
sentence = result.channel.alternatives[0].transcript
47+
if len(sentence) == 0:
48+
return
49+
print(f"speaker: {sentence}")
50+
51+
def on_metadata(self, metadata, **kwargs):
52+
if metadata is None:
53+
return
54+
print(f"\n\n{metadata}\n\n")
55+
56+
def on_error(self, error, **kwargs):
57+
if error is None:
58+
return
59+
print(f"\n\n{error}\n\n")
4460

4561
dg_connection.on(LiveTranscriptionEvents.Transcript, on_message)
4662
dg_connection.on(LiveTranscriptionEvents.Metadata, on_metadata)
4763
dg_connection.on(LiveTranscriptionEvents.Error, on_error)
4864

65+
# connect to websocket
66+
options = LiveOptions(model="nova", interim_results=False, language="en-US")
67+
dg_connection.start(options)
68+
4969
lock_exit = threading.Lock()
5070
exit = False
5171

0 commit comments

Comments
 (0)