Skip to content

Commit 7612538

Browse files
committed
Add stream_id and sender_identity to stream_text params
1 parent a766014 commit 7612538

File tree

2 files changed

+14
-4
lines changed

2 files changed

+14
-4
lines changed

livekit-rtc/livekit/rtc/data_stream.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
if TYPE_CHECKING:
3131
from .participant import LocalParticipant
3232

33-
3433
STREAM_CHUNK_SIZE = 15_000
3534

3635

@@ -146,6 +145,7 @@ def __init__(
146145
total_size: int | None = None,
147146
mime_type: str = "",
148147
destination_identities: Optional[List[str]] = None,
148+
sender_identity: str | None = None,
149149
):
150150
self._local_participant = local_participant
151151
if stream_id is None:
@@ -161,14 +161,15 @@ def __init__(
161161
)
162162
self._next_chunk_index: int = 0
163163
self._destination_identities = destination_identities
164+
self._sender_identity = sender_identity or self._local_participant.identity
164165

165166
async def _send_header(self):
166167
req = proto_ffi.FfiRequest(
167168
send_stream_header=proto_room.SendStreamHeaderRequest(
168169
header=self._header,
169170
local_participant_handle=self._local_participant._ffi_handle.handle,
170171
destination_identities=self._destination_identities,
171-
sender_identity=self._local_participant.identity,
172+
sender_identity=self._sender_identity,
172173
)
173174
)
174175

@@ -230,10 +231,12 @@ async def _send_trailer(self, trailer: proto_DataStream.Trailer):
230231
if cb.send_stream_chunk.error:
231232
raise ConnectionError(cb.send_stream_trailer.error)
232233

233-
async def aclose(self):
234+
async def aclose(
235+
self, *, reason: str = "", attributes: Optional[Dict[str, str]] = None
236+
):
234237
await self._send_trailer(
235238
trailer=proto_DataStream.Trailer(
236-
stream_id=self._header.stream_id, reason=""
239+
stream_id=self._header.stream_id, reason=reason, attributes=attributes
237240
)
238241
)
239242

@@ -249,6 +252,7 @@ def __init__(
249252
total_size: int | None = None,
250253
reply_to_id: str | None = None,
251254
destination_identities: Optional[List[str]] = None,
255+
sender_identity: str | None = None,
252256
) -> None:
253257
super().__init__(
254258
local_participant,
@@ -258,6 +262,7 @@ def __init__(
258262
total_size,
259263
mime_type="text/plain",
260264
destination_identities=destination_identities,
265+
sender_identity=sender_identity,
261266
)
262267
self._header.text_header.operation_type = proto_DataStream.OperationType.CREATE
263268
if reply_to_id:
@@ -276,6 +281,7 @@ def __init__(
276281
async def write(self, text: str):
277282
async with self._write_lock:
278283
for chunk in split_utf8(text, STREAM_CHUNK_SIZE):
284+
logger.info("Sending chunk: %s", chunk)
279285
content = chunk.encode()
280286
chunk_index = self._next_chunk_index
281287
self._next_chunk_index += 1

livekit-rtc/livekit/rtc/participant.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -585,8 +585,10 @@ async def stream_text(
585585
destination_identities: Optional[List[str]] = None,
586586
topic: str = "",
587587
attributes: Optional[Dict[str, str]] = None,
588+
stream_id: str | None = None,
588589
reply_to_id: str | None = None,
589590
total_size: int | None = None,
591+
sender_identity: str | None = None,
590592
) -> TextStreamWriter:
591593
"""
592594
Returns a TextStreamWriter that allows to write individual chunks of text to a text stream.
@@ -599,6 +601,8 @@ async def stream_text(
599601
reply_to_id=reply_to_id,
600602
destination_identities=destination_identities,
601603
total_size=total_size,
604+
stream_id=stream_id,
605+
sender_identity=sender_identity,
602606
)
603607

604608
await writer._send_header()

0 commit comments

Comments
 (0)