1- from typing import AsyncIterator , Optional
21import asyncio
32import numpy as np
43import contextlib
54from dataclasses import dataclass
5+ from typing import AsyncIterator , Optional
66from .audio_frame import AudioFrame
77from .log import logger
88
9-
109_Stream = AsyncIterator [AudioFrame ]
1110
1211
@@ -58,7 +57,9 @@ def __init__(
5857 self ._chunk_size : int = blocksize if blocksize > 0 else int (sample_rate // 10 )
5958 self ._stream_timeout_ms : int = stream_timeout_ms
6059 self ._queue : asyncio .Queue [Optional [AudioFrame ]] = asyncio .Queue (maxsize = capacity )
61- self ._closed : bool = False
60+ # _ending signals that no new streams will be added,
61+ # but we continue processing until all streams are exhausted.
62+ self ._ending : bool = False
6263 self ._mixer_task : asyncio .Task = asyncio .create_task (self ._mixer ())
6364
6465 def add_stream (self , stream : AsyncIterator [AudioFrame ]) -> None :
@@ -87,25 +88,10 @@ def remove_stream(self, stream: AsyncIterator[AudioFrame]) -> None:
8788 self ._streams .discard (stream )
8889 self ._buffers .pop (stream , None )
8990
90- async def __aiter__ (self ) -> "AudioMixer" :
91- """
92- Return the async iterator interface for the mixer.
93-
94- Returns:
95- AudioMixer: The mixer itself, as it implements the async iterator protocol.
96- """
91+ def __aiter__ (self ) -> "AudioMixer" :
9792 return self
9893
9994 async def __anext__ (self ) -> AudioFrame :
100- """
101- Retrieve the next mixed AudioFrame from the output queue.
102-
103- Returns:
104- AudioFrame: The next mixed audio frame.
105-
106- Raises:
107- StopAsyncIteration: When the mixer is closed and no more frames are available.
108- """
10995 item = await self ._queue .get ()
11096 if item is None :
11197 raise StopAsyncIteration
@@ -117,7 +103,7 @@ async def aclose(self) -> None:
117103
118104 This cancels the mixing task, and any unconsumed output in the queue may be dropped.
119105 """
120- self ._closed = True
106+ self ._ending = True
121107 self ._mixer_task .cancel ()
122108 with contextlib .suppress (asyncio .CancelledError ):
123109 await self ._mixer_task
@@ -129,13 +115,18 @@ def end_input(self) -> None:
129115 This method marks the mixer as closed so that it flushes any remaining buffered output before ending.
130116 Note that existing streams will still be processed until exhausted.
131117 """
132- self ._closed = True
118+ self ._ending = True
133119
134120 async def _mixer (self ) -> None :
135- while not self ._closed :
121+ while True :
122+ # If we're in ending mode and there are no more streams, exit.
123+ if self ._ending and not self ._streams :
124+ break
125+
136126 if not self ._streams :
137127 await asyncio .sleep (0.01 )
138128 continue
129+
139130 tasks = [
140131 self ._get_contribution (
141132 stream ,
@@ -185,7 +176,7 @@ async def _get_contribution(
185176 stream .__anext__ (), timeout = self ._stream_timeout_ms / 1000
186177 )
187178 except asyncio .TimeoutError :
188- logger .warning (f"AudioMixer: stream { stream } timeout, ignoring` " )
179+ logger .warning (f"AudioMixer: stream { stream } timeout, ignoring" )
189180 break
190181 except StopAsyncIteration :
191182 exhausted = True
0 commit comments