Skip to content

Commit 25e6410

Browse files
committed
Add support for datastream based chat
1 parent fe1d072 commit 25e6410

File tree

1 file changed

+45
-7
lines changed

1 file changed

+45
-7
lines changed

livekit-rtc/livekit/rtc/chat.py

Lines changed: 45 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,15 @@
1717
import json
1818
import logging
1919
from typing import Any, Dict, Literal, Optional
20+
import asyncio
2021

21-
from .room import Room, Participant, DataPacket
22+
from .room import Room, Participant, DataPacket, TextStreamReader, RemoteParticipant, LocalParticipant
2223
from .event_emitter import EventEmitter
2324
from ._utils import generate_random_base62
2425

25-
_CHAT_TOPIC = "lk-chat-topic"
26-
_CHAT_UPDATE_TOPIC = "lk-chat-update-topic"
26+
_LEGACY_CHAT_TOPIC = "lk-chat-topic"
27+
_LEGACY_CHAT_UPDATE_TOPIC = "lk-chat-update-topic"
28+
_DS_CHAT_TOPIC = "lk.chat"
2729

2830
EventTypes = Literal["message_received",]
2931

@@ -38,11 +40,17 @@ def __init__(self, room: Room):
3840
super().__init__()
3941
self._lp = room.local_participant
4042
self._room = room
43+
self._tasks: list[asyncio.Task] = []
4144

4245
room.on("data_received", self._on_data_received)
46+
room.unregister_text_stream_handler(_DS_CHAT_TOPIC)
47+
room.register_text_stream_handler(_DS_CHAT_TOPIC, self._on_text_stream_received)
4348

4449
def close(self):
4550
self._room.off("data_received", self._on_data_received)
51+
self._room.unregister_text_stream_handler(_DS_CHAT_TOPIC)
52+
for task in self._tasks:
53+
task.cancel()
4654

4755
async def send_message(self, message: str) -> "ChatMessage":
4856
"""Send a chat message to the end user using LiveKit Chat Protocol.
@@ -58,9 +66,12 @@ async def send_message(self, message: str) -> "ChatMessage":
5866
is_local=True,
5967
participant=self._lp,
6068
)
69+
msg_dict = msg.asjsondict()
70+
msg_dict["ignoreLegacy"] = True
71+
await self._lp.send_text(message, topic=_DS_CHAT_TOPIC)
6172
await self._lp.publish_data(
62-
payload=json.dumps(msg.asjsondict()),
63-
topic=_CHAT_TOPIC,
73+
payload=json.dumps(msg_dict),
74+
topic=_LEGACY_CHAT_TOPIC,
6475
)
6576
return msg
6677

@@ -72,21 +83,38 @@ async def update_message(self, message: "ChatMessage"):
7283
"""
7384
await self._lp.publish_data(
7485
payload=json.dumps(message.asjsondict()),
75-
topic=_CHAT_UPDATE_TOPIC,
86+
topic=_LEGACY_CHAT_UPDATE_TOPIC,
7687
)
7788

7889
def _on_data_received(self, dp: DataPacket):
7990
# handle both new and updates the same way, as long as the ID is in there
8091
# the user can decide how to replace the previous message
81-
if dp.topic == _CHAT_TOPIC or dp.topic == _CHAT_UPDATE_TOPIC:
92+
if dp.topic == _LEGACY_CHAT_TOPIC or dp.topic == _LEGACY_CHAT_UPDATE_TOPIC:
8293
try:
8394
parsed = json.loads(dp.data)
95+
# if the message is marked as ignoreLegacy, we'll skip it
96+
if parsed.get("ignoreLegacy"):
97+
return
8498
msg = ChatMessage.from_jsondict(parsed)
8599
if dp.participant:
86100
msg.participant = dp.participant
87101
self.emit("message_received", msg)
88102
except Exception as e:
89103
logging.warning("failed to parse chat message: %s", e, exc_info=e)
104+
105+
def _on_text_stream_received(self, stream: TextStreamReader, participant_identity: str):
106+
task = asyncio.create_task(self._handle_text_stream(stream, participant_identity))
107+
self._tasks.append(task)
108+
task.add_done_callback(self._tasks.remove)
109+
110+
async def _handle_text_stream(self, stream: TextStreamReader, participant_identity: str):
111+
msg = await ChatMessage.from_text_stream(stream)
112+
participant: RemoteParticipant | LocalParticipant | None = self._room._remote_participants.get(participant_identity)
113+
if participant is None and self._room.local_participant.identity == participant_identity:
114+
participant = self._room.local_participant
115+
msg.is_local = True
116+
msg.participant = participant
117+
self.emit("message_received", msg)
90118

91119

92120
@dataclass
@@ -129,3 +157,13 @@ def asjsondict(self):
129157
if self.deleted:
130158
d["deleted"] = True
131159
return d
160+
161+
@classmethod
162+
async def from_text_stream(cls, stream: TextStreamReader):
163+
message_text = await stream.read_all()
164+
timestamp = datetime.fromtimestamp(stream.info.timestamp / 1000.0)
165+
return cls(
166+
message=message_text,
167+
timestamp=timestamp,
168+
id=stream.info.stream_id,
169+
)

0 commit comments

Comments
 (0)