Skip to content

Commit c5e6c3c

Browse files
committed
replace RingQueue with asyncio.Queue
1 parent 0989149 commit c5e6c3c

File tree

1 file changed

+16
-12
lines changed

1 file changed

+16
-12
lines changed

livekit-rtc/livekit/rtc/data_stream.py

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
from dataclasses import dataclass
2222
from typing import AsyncIterator, Optional, TypedDict, Dict, List
2323
from ._proto.room_pb2 import DataStream as proto_DataStream
24-
from ._utils import RingQueue
2524
from ._proto import ffi_pb2 as proto_ffi
2625
from ._proto import room_pb2 as proto_room
2726
from ._ffi_client import FfiClient
@@ -59,7 +58,10 @@ class TextStreamUpdate:
5958

6059

6160
class TextStreamReader:
62-
def __init__(self, header: proto_DataStream.Header, capacity: int = 0) -> None:
61+
def __init__(
62+
self,
63+
header: proto_DataStream.Header,
64+
) -> None:
6365
self._header = header
6466
self._info = TextStreamInfo(
6567
stream_id=header.stream_id,
@@ -70,14 +72,14 @@ def __init__(self, header: proto_DataStream.Header, capacity: int = 0) -> None:
7072
attributes=dict(header.attributes),
7173
attachments=list(header.text_header.attached_stream_ids),
7274
)
73-
self._queue: RingQueue[proto_DataStream.Chunk | None] = RingQueue(capacity)
75+
self._queue: asyncio.Queue[proto_DataStream.Chunk | None] = asyncio.Queue()
7476
self._chunks: Dict[int, proto_DataStream.Chunk] = {}
7577

76-
def _on_chunk_update(self, chunk: proto_DataStream.Chunk):
77-
self._queue.put(chunk)
78+
async def _on_chunk_update(self, chunk: proto_DataStream.Chunk):
79+
await self._queue.put(chunk)
7880

79-
def _on_stream_close(self, trailer: proto_DataStream.Trailer):
80-
self._queue.put(None)
81+
async def _on_stream_close(self, trailer: proto_DataStream.Trailer):
82+
await self._queue.put(None)
8183

8284
def __aiter__(self) -> AsyncIterator[TextStreamUpdate]:
8385
return self
@@ -125,13 +127,15 @@ def __init__(self, header: proto_DataStream.Header, capacity: int = 0) -> None:
125127
attributes=dict(header.attributes),
126128
name=header.byte_header.name,
127129
)
128-
self._queue: RingQueue[proto_DataStream.Chunk | None] = RingQueue(capacity)
130+
self._queue: asyncio.Queue[proto_DataStream.Chunk | None] = asyncio.Queue(
131+
capacity
132+
)
129133

130-
def _on_chunk_update(self, chunk: proto_DataStream.Chunk):
131-
self._queue.put(chunk)
134+
async def _on_chunk_update(self, chunk: proto_DataStream.Chunk):
135+
await self._queue.put(chunk)
132136

133-
def _on_stream_close(self, trailer: proto_DataStream.Trailer):
134-
self._queue.put(None)
137+
async def _on_stream_close(self, trailer: proto_DataStream.Trailer):
138+
await self._queue.put(None)
135139

136140
def __aiter__(self) -> AsyncIterator[bytes]:
137141
return self

0 commit comments

Comments
 (0)