Skip to content

Commit fff2d0e

Browse files
Merge pull request #448 from dvonthenen/fix-httpx-streaming
Fix HTTPX Streaming
2 parents 91fc36d + f903063 commit fff2d0e

File tree

4 files changed

+109
-5
lines changed

4 files changed

+109
-5
lines changed

deepgram/clients/abstract_async_client.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,6 @@ async def _handle_request_raw(
348348
client = httpx.AsyncClient(timeout=timeout, transport=transport)
349349
if transport:
350350
kwargs.pop("transport")
351-
kwargs.pop("transport")
352351
req = client.build_request(method, _url, headers=_headers, **kwargs)
353352
return await client.send(req, stream=True)
354353

deepgram/clients/abstract_sync_client.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -337,9 +337,9 @@ def _handle_request_raw(
337337

338338
try:
339339
transport = kwargs.get("transport")
340-
with httpx.Client(timeout=timeout, transport=transport) as client:
341-
if transport:
342-
kwargs.pop("transport")
340+
client = httpx.Client(timeout=timeout, transport=transport)
341+
if transport:
342+
kwargs.pop("transport")
343343
req = client.build_request(method, _url, headers=_headers, **kwargs)
344344
return client.send(req, stream=True)
345345

examples/requirements-examples.txt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,6 @@
44
python-dotenv
55

66
# streaming libs
7-
pyaudio
7+
pyaudio
8+
sounddevice==0.4.7
9+
numpy==2.0.1
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
# Copyright 2024 Deepgram SDK contributors. All Rights Reserved.
2+
# Use of this source code is governed by a MIT license that can be found in the LICENSE file.
3+
# SPDX-License-Identifier: MIT
4+
5+
import sounddevice as sd
6+
import numpy as np
7+
import queue
8+
import threading
9+
10+
from deepgram import (
11+
DeepgramClient,
12+
SpeakOptions,
13+
)
14+
15+
SPEAK_TEXT = {"text": "Hello world!"}
16+
17+
18+
# Define a queue to manage audio data
19+
audio_queue = queue.Queue(maxsize=20) # Adjust size as needed
20+
21+
element_size = np.dtype(np.int16).itemsize # Element size for np.int16 (16-bit integer)
22+
CHUNK_SIZE = 32768 # Desired size of each audio chunk in bytes
23+
24+
25+
def fetch_audio(response):
26+
try:
27+
buffer = bytearray() # Buffer to accumulate data
28+
for data in response.iter_bytes():
29+
buffer.extend(data) # Add incoming data to buffer
30+
while len(buffer) >= CHUNK_SIZE:
31+
# Extract a chunk of the desired size
32+
chunk = buffer[:CHUNK_SIZE]
33+
buffer = buffer[CHUNK_SIZE:] # Remove the chunk from the buffer
34+
35+
# Ensure the chunk is aligned to the element size
36+
buffer_size = len(chunk) - (len(chunk) % element_size)
37+
38+
if buffer_size > 0:
39+
audio_data = np.frombuffer(chunk[:buffer_size], dtype=np.int16)
40+
audio_queue.put(audio_data)
41+
print(
42+
f"Queued audio data of size: {audio_data.size * element_size} bytes"
43+
)
44+
45+
# Process any remaining data in the buffer
46+
if buffer:
47+
audio_data = np.frombuffer(buffer, dtype=np.int16)
48+
audio_queue.put(audio_data)
49+
print(
50+
f"Queued remaining audio data of size: {audio_data.size * element_size} bytes"
51+
)
52+
53+
# Signal the end of the stream
54+
audio_queue.put(None)
55+
print("End of audio stream.")
56+
except Exception as e:
57+
print(f"Fetch audio exception: {e}")
58+
59+
60+
def main():
61+
try:
62+
# STEP 1: Create a Deepgram client using the API key from environment variables
63+
deepgram: DeepgramClient = DeepgramClient()
64+
65+
# STEP 2: Call the save method on the speak property
66+
options = SpeakOptions(
67+
model="aura-asteria-en",
68+
encoding="linear16",
69+
container="none",
70+
sample_rate=48000,
71+
)
72+
73+
response = deepgram.speak.rest.v("1").stream_raw(SPEAK_TEXT, options)
74+
75+
# Display response headers
76+
print("Response headers:")
77+
for header in response.headers:
78+
print(f"{header}: {response.headers[header]}")
79+
80+
# Create and start a separate thread for fetching audio
81+
fetch_thread = threading.Thread(target=fetch_audio, args=(response,))
82+
fetch_thread.start()
83+
84+
# Play audio data from the queue
85+
while True:
86+
audio_data = audio_queue.get()
87+
if audio_data is None:
88+
break # End of stream
89+
90+
# Play audio data using sounddevice
91+
sd.play(audio_data, samplerate=48000)
92+
sd.wait() # Wait for the audio to finish playing
93+
94+
fetch_thread.join()
95+
96+
print("Audio playback finished.")
97+
98+
except Exception as e:
99+
print(f"Exception: {e}")
100+
101+
102+
if __name__ == "__main__":
103+
main()

0 commit comments

Comments
 (0)