11import asyncio
2- from asyncio import AbstractEventLoop
2+ from queue import Queue
3+ from asyncio import AbstractEventLoop , Queue as AsyncQueue
34from numpy import ndarray , vstack as npvstack , zeros as npzeros
45from sounddevice import OutputStream
56from typing_extensions import Any , Optional , NoReturn , Callable , deprecated
67from .._types import AudioSamplerate , AudioChannels , AudioDType
78from ..base import AsyncSoundDeviceStreamerBase , SoundDeviceStreamerBase , StreamerState
8- from ..queues import AsyncQueue , Queue
99
1010# ! Main Class
1111class CallbackSoundDeviceStreamer (SoundDeviceStreamerBase ):
@@ -18,7 +18,7 @@ def __init__(
1818 dtype : Optional [AudioDType ]= None ,
1919 closefd : bool = True ,
2020 device : Optional [int ]= None ,
21- precallback : Optional [Callable [[int ], bool ]]= None
21+ precallback : Optional [Callable [[int ], Any ]]= None
2222 ) -> None :
2323 super ().__init__ (samplerate , channels , dtype , closefd , device )
2424 self .queue : Queue [ndarray ] = Queue (1 )
@@ -33,21 +33,20 @@ def __init__(
3333 self .buffer : Optional [ndarray ] = None
3434
3535 def __callback__ (self , outdata : ndarray , frames : int , time , status ):
36- if not self .precallback (frames ):
37- return
36+ self .precallback (frames )
3837 if self .buffer is None :
3938 try :
40- d = self .queue .get ()
39+ d = self .queue .get_nowait ()
4140 except :
4241 return
4342 wdata = d [:frames ]
4443 self .buffer = d [frames :]
4544 elif len (self .buffer ) >= frames :
4645 wdata = self .buffer [:frames ]
4746 self .buffer = self .buffer [frames :]
48- elif (len (self .buffer ) < frames ) and (not self .queue .qsize () == 0 ):
47+ elif (len (self .buffer ) < frames ) and (self .queue .qsize () >= 1 ):
4948 try :
50- d = self .queue .get ()
49+ d = self .queue .get_nowait ()
5150 except :
5251 return
5352 wdata = self .buffer .copy ()
@@ -59,10 +58,10 @@ def __callback__(self, outdata: ndarray, frames: int, time, status):
5958 wdata = self .buffer .copy ()
6059 self .buffer = None
6160 needed = frames - len (wdata )
62- wdata = npvstack ([wdata , npzeros ((needed , 2 ), dtype = outdata .dtype )])
61+ wdata = npvstack ([wdata , npzeros ((needed , self . channels ), dtype = outdata .dtype )])
6362 else :
6463 self .buffer = None
65- return
64+ wdata = npzeros (( frames , self . channels ), dtype = outdata . dtype )
6665 outdata [:] = wdata
6766
6867 def is_busy (self ) -> bool :
@@ -82,14 +81,14 @@ def stop(self) -> None:
8281 if StreamerState .RUNNING in self .state :
8382 self .state &= ~ StreamerState .RUNNING
8483 self .state |= StreamerState .LOCKED
85- try : self .queue .abort ()
86- except : pass
84+ try : self .queue .task_done ()
85+ except ValueError : pass
8786 self .stream .stop ()
8887
8988 def abort (self ):
9089 self .state |= StreamerState .LOCKED
91- try : self .queue .abort ()
92- except : pass
90+ try : self .queue .task_done ()
91+ except ValueError : pass
9392 self .stream .abort ()
9493 self .state &= ~ StreamerState .LOCKED
9594
@@ -125,21 +124,20 @@ def __init__(
125124 self .buffer : Optional [ndarray ] = None
126125
127126 def __callback__ (self , outdata : ndarray , frames : int , time , status ):
128- if not self .precallback (frames ):
129- return
127+ self .precallback (frames )
130128 if self .buffer is None :
131129 try :
132- d = asyncio .run_coroutine_threadsafe (self .queue .get (), self .loop ).result ()
130+ d = asyncio .run_coroutine_threadsafe (self .queue .get_nowait (), self .loop ).result ()
133131 except :
134132 return
135133 wdata = d [:frames ]
136134 self .buffer = d [frames :]
137135 elif len (self .buffer ) >= frames :
138136 wdata = self .buffer [:frames ]
139137 self .buffer = self .buffer [frames :]
140- elif (len (self .buffer ) < frames ) and (not self .queue .qsize () == 0 ):
138+ elif (len (self .buffer ) < frames ) and (self .queue .qsize () >= 1 ):
141139 try :
142- d = asyncio .run_coroutine_threadsafe (self .queue .get (), self .loop ).result ()
140+ d = asyncio .run_coroutine_threadsafe (self .queue .get_nowait (), self .loop ).result ()
143141 except :
144142 return
145143 wdata = self .buffer .copy ()
@@ -151,10 +149,10 @@ def __callback__(self, outdata: ndarray, frames: int, time, status):
151149 wdata = self .buffer .copy ()
152150 self .buffer = None
153151 needed = frames - len (wdata )
154- wdata = npvstack ([wdata , npzeros ((needed , 2 ), dtype = outdata .dtype )])
152+ wdata = npvstack ([wdata , npzeros ((needed , self . channels ), dtype = outdata .dtype )])
155153 else :
156154 self .buffer = None
157- return
155+ wdata = npzeros (( frames , self . channels ), dtype = outdata . dtype )
158156 outdata [:] = wdata
159157
160158 async def is_busy (self ) -> bool :
@@ -174,14 +172,14 @@ async def stop(self) -> None:
174172 if StreamerState .RUNNING in self .state :
175173 self .state &= ~ StreamerState .RUNNING
176174 self .state |= StreamerState .LOCKED
177- try : await self .queue .abort ()
178- except : pass
175+ try : self .queue .task_done ()
176+ except ValueError : pass
179177 self .stream .stop ()
180178
181179 async def abort (self ):
182180 self .state |= StreamerState .LOCKED
183- try : await self .queue .abort ()
184- except : pass
181+ try : self .queue .task_done ()
182+ except ValueError : pass
185183 self .stream .abort ()
186184 self .state &= ~ StreamerState .LOCKED
187185
0 commit comments