55"""
66
77import logging
8+ import struct
89from abc import ABC , abstractmethod
910from enum import Enum
10- from typing import Optional
11+ from typing import List , Optional
1112
1213import numpy as np
1314import numpy .typing as npt
15+ import usb
1416
1517
1618class AudioBackend (Enum ):
@@ -24,12 +26,26 @@ class AudioBase(ABC):
2426 """Abstract class for opening and managing audio devices."""
2527
2628 SAMPLE_RATE = 16000 # respeaker samplerate
29+ TIMEOUT = 100000
30+ PARAMETERS = {
31+ "VERSION" : (48 , 0 , 4 , "ro" , "uint8" ),
32+ "AEC_AZIMUTH_VALUES" : (33 , 75 , 16 + 1 , "ro" , "radians" ),
33+ "DOA_VALUE" : (20 , 18 , 4 + 1 , "ro" , "uint16" ),
34+ "DOA_VALUE_RADIANS" : (20 , 19 , 8 + 1 , "ro" , "radians" ),
35+ }
2736
2837 def __init__ (self , backend : AudioBackend , log_level : str = "INFO" ) -> None :
2938 """Initialize the audio device."""
3039 self .logger = logging .getLogger (__name__ )
3140 self .logger .setLevel (log_level )
3241 self .backend = backend
42+ self ._respeaker = self ._init_respeaker_usb ()
43+ # name, resid, cmdid, length, type
44+
45+ def __del__ (self ) -> None :
46+ """Destructor to ensure resources are released."""
47+ if self ._respeaker :
48+ usb .util .dispose_resources (self ._respeaker )
3349
3450 @abstractmethod
3551 def start_recording (self ) -> None :
@@ -70,3 +86,72 @@ def play_sound(self, sound_file: str) -> None:
7086
7187 """
7288 pass
89+
90+ def _init_respeaker_usb (self ) -> Optional [usb .core .Device ]:
91+ dev = usb .core .find (idVendor = 0x2886 , idProduct = 0x001A )
92+ if not dev :
93+ return None
94+
95+ return dev
96+
97+ def _read_usb (self , name : str ) -> Optional [List [int ] | List [float ]]:
98+ try :
99+ data = self .PARAMETERS [name ]
100+ except KeyError :
101+ self .logger .error (f"Unknown parameter: { name } " )
102+ return None
103+
104+ if not self ._respeaker :
105+ self .logger .warning ("ReSpeaker device not found." )
106+ return None
107+
108+ resid = data [0 ]
109+ cmdid = 0x80 | data [1 ]
110+ length = data [2 ]
111+
112+ response = self ._respeaker .ctrl_transfer (
113+ usb .util .CTRL_IN
114+ | usb .util .CTRL_TYPE_VENDOR
115+ | usb .util .CTRL_RECIPIENT_DEVICE ,
116+ 0 ,
117+ cmdid ,
118+ resid ,
119+ length ,
120+ self .TIMEOUT ,
121+ )
122+
123+ self .logger .debug (f"Response for { name } : { response } " )
124+
125+ result : Optional [List [float ] | List [int ]] = None
126+ if data [4 ] == "uint8" :
127+ result = response .tolist ()
128+ elif data [4 ] == "radians" :
129+ byte_data = response .tobytes ()
130+ num_values = (data [2 ] - 1 ) / 4
131+ match_str = "<"
132+ for i in range (int (num_values )):
133+ match_str += "f"
134+ result = [
135+ float (x ) for x in struct .unpack (match_str , byte_data [1 : data [2 ]])
136+ ]
137+ elif data [4 ] == "uint16" :
138+ result = response .tolist ()
139+
140+ return result
141+
142+ def get_DoA (self ) -> tuple [float , bool ] | None :
143+ """Get the Direction of Arrival (DoA) value from the ReSpeaker device.
144+
145+ 0° is left, 90° is front/back, 180° is right
146+
147+ Returns:
148+ tuple: A tuple containing the DoA value as an integer and the speech detection, or None if the device is not found.
149+
150+ """
151+ if not self ._respeaker :
152+ self .logger .warning ("ReSpeaker device not found." )
153+ return None
154+ result = self ._read_usb ("DOA_VALUE_RADIANS" )
155+ if result is None :
156+ return None
157+ return float (result [0 ]), bool (result [1 ])
0 commit comments