Skip to content

Commit 673ee68

Browse files
authored
Add support for datastream based chat (#1096)
1 parent c43f9ee commit 673ee68

File tree

20 files changed

+599
-217
lines changed

20 files changed

+599
-217
lines changed
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
"@livekit/components-core": patch
3+
"@livekit/components-react": minor
4+
"@livekit/components-styles": patch
5+
---
6+
7+
Add support for datastream based chat

package.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,13 @@
3434
"@livekit/changesets-changelog-github": "^0.0.4",
3535
"@rushstack/heft": "^0.68.0",
3636
"glob": "^11.0.0",
37+
"globals": "^15.14.0",
3738
"husky": "^8.0.3",
3839
"nodemon": "^3.0.3",
3940
"prettier": "^3.2.5",
4041
"turbo": "^2.1.1",
41-
"typescript": "5.7.3"
42+
"typescript": "5.7.3",
43+
"typescript-eslint": "^8.24.0"
4244
},
4345
"engines": {
4446
"node": ">=18"

packages/core/etc/components-core.api.md

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import { ChatMessage } from 'livekit-client';
1010
import { ConnectionQuality } from 'livekit-client';
1111
import { ConnectionState } from 'livekit-client';
1212
import { DataPacket_Kind } from 'livekit-client';
13-
import type { DataPublishOptions } from 'livekit-client';
13+
import { DataPublishOptions } from 'livekit-client';
1414
import { LocalAudioTrack } from 'livekit-client';
1515
import { LocalParticipant } from 'livekit-client';
1616
import { LocalVideoTrack } from 'livekit-client';
@@ -27,7 +27,9 @@ import { Room } from 'livekit-client';
2727
import { RoomEvent } from 'livekit-client';
2828
import type { RoomEventCallbacks } from 'livekit-client/dist/src/room/Room';
2929
import type { ScreenShareCaptureOptions } from 'livekit-client';
30+
import { SendTextOptions } from 'livekit-client';
3031
import { setLogLevel as setLogLevel_2 } from 'livekit-client';
32+
import type { TextStreamInfo } from 'livekit-client/dist/src/room/types';
3133
import { Track } from 'livekit-client';
3234
import { TrackEvent as TrackEvent_2 } from 'livekit-client';
3335
import { TrackPublication } from 'livekit-client';
@@ -156,8 +158,7 @@ export const cssPrefix = "lk";
156158

157159
// @public (undocumented)
158160
export const DataTopic: {
159-
readonly CHAT: "lk-chat-topic";
160-
readonly CHAT_UPDATE: "lk-chat-update-topic";
161+
readonly CHAT: "lk.chat";
161162
};
162163

163164
// @public (undocumented)
@@ -262,13 +263,18 @@ export function isWeb(): boolean;
262263
// @public (undocumented)
263264
export interface LegacyChatMessage extends ChatMessage {
264265
// (undocumented)
265-
ignore?: boolean;
266+
ignoreLegacy?: boolean;
266267
}
267268

269+
// @public @deprecated (undocumented)
270+
export const LegacyDataTopic: {
271+
readonly CHAT: "lk-chat-topic";
272+
};
273+
268274
// @public (undocumented)
269275
export interface LegacyReceivedChatMessage extends ReceivedChatMessage {
270276
// (undocumented)
271-
ignore?: boolean;
277+
ignoreLegacy?: boolean;
272278
}
273279

274280
// @alpha
@@ -497,24 +503,19 @@ export type SetMediaDeviceOptions = {
497503
export function setupChat(room: Room, options?: ChatOptions): {
498504
messageObservable: Observable<ReceivedChatMessage[]>;
499505
isSendingObservable: BehaviorSubject<boolean>;
500-
send: (message: string) => Promise<ChatMessage>;
501-
update: (message: string, originalMessageOrId: string | ChatMessage) => Promise<{
502-
readonly message: string;
503-
readonly editTimestamp: number;
504-
readonly id: string;
505-
readonly timestamp: number;
506-
}>;
506+
send: (message: string, options?: SendTextOptions) => Promise<ReceivedChatMessage>;
507507
};
508508

509509
// @public (undocumented)
510510
export function setupChatMessageHandler(room: Room): {
511511
chatObservable: Observable<[message: ChatMessage, participant?: LocalParticipant | RemoteParticipant | undefined]>;
512-
send: (text: string) => Promise<ChatMessage>;
512+
send: (text: string, options: SendTextOptions) => Promise<ReceivedChatMessage>;
513513
edit: (text: string, originalMsg: ChatMessage) => Promise<{
514514
readonly message: string;
515515
readonly editTimestamp: number;
516516
readonly id: string;
517517
readonly timestamp: number;
518+
readonly attachedFiles?: Array<File>;
518519
}>;
519520
};
520521

@@ -620,6 +621,9 @@ export function setupStartVideo(): {
620621
handleStartVideoPlayback: (room: Room) => Promise<void>;
621622
};
622623

624+
// @public (undocumented)
625+
export function setupTextStream(room: Room, topic: string): Observable<TextStreamData[]>;
626+
623627
// @public (undocumented)
624628
export function setupTrackMutedIndicator(trackRef: TrackReferenceOrPlaceholder): {
625629
className: string;
@@ -638,6 +642,18 @@ export type SourcesArray = Track.Source[] | TrackSourceWithOptions[];
638642
// @public
639643
export function supportsScreenSharing(): boolean;
640644

645+
// @public (undocumented)
646+
export interface TextStreamData {
647+
// (undocumented)
648+
participantInfo: {
649+
identity: string;
650+
};
651+
// (undocumented)
652+
streamInfo: TextStreamInfo;
653+
// (undocumented)
654+
text: string;
655+
}
656+
641657
// @public (undocumented)
642658
export type ToggleSource = Exclude<Track.Source, Track.Source.ScreenShareAudio | Track.Source.Unknown>;
643659

Lines changed: 84 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
/* eslint-disable camelcase */
2-
import type { Participant, Room, ChatMessage } from 'livekit-client';
2+
import type { Participant, Room, ChatMessage, SendTextOptions } from 'livekit-client';
33
import { compareVersions, RoomEvent } from 'livekit-client';
4-
import { BehaviorSubject, Subject, scan, map, takeUntil, merge } from 'rxjs';
4+
import { BehaviorSubject, Subject, scan, map, takeUntil, from, filter } from 'rxjs';
55
import {
66
DataTopic,
7+
LegacyDataTopic,
78
sendMessage,
8-
setupChatMessageHandler,
99
setupDataMessageHandler,
1010
} from '../observables/dataChannel';
1111

@@ -18,11 +18,11 @@ export interface ReceivedChatMessage extends ChatMessage {
1818
}
1919

2020
export interface LegacyChatMessage extends ChatMessage {
21-
ignore?: boolean;
21+
ignoreLegacy?: boolean;
2222
}
2323

2424
export interface LegacyReceivedChatMessage extends ReceivedChatMessage {
25-
ignore?: boolean;
25+
ignoreLegacy?: boolean;
2626
}
2727

2828
/**
@@ -41,83 +41,86 @@ export type ChatOptions = {
4141
messageEncoder?: (message: LegacyChatMessage) => Uint8Array;
4242
/** @deprecated the new chat API doesn't rely on encoders and decoders anymore and uses a dedicated chat API instead */
4343
messageDecoder?: (message: Uint8Array) => LegacyReceivedChatMessage;
44-
/** @deprecated the new chat API doesn't rely on topics anymore and uses a dedicated chat API instead */
4544
channelTopic?: string;
46-
/** @deprecated the new chat API doesn't rely on topics anymore and uses a dedicated chat API instead */
45+
/** @deprecated the new chat API doesn't rely on update topics anymore and uses a dedicated chat API instead */
4746
updateChannelTopic?: string;
4847
};
4948

50-
type RawMessage = {
51-
payload: Uint8Array;
52-
topic: string | undefined;
53-
from: Participant | undefined;
54-
};
55-
56-
const encoder = new TextEncoder();
57-
const decoder = new TextDecoder();
49+
const topicSubjectMap: WeakMap<Room, Map<string, Subject<ReceivedChatMessage>>> = new WeakMap();
5850

59-
const topicSubjectMap: Map<Room, Map<string, Subject<RawMessage>>> = new Map();
51+
function isIgnorableChatMessage(msg: ReceivedChatMessage | LegacyReceivedChatMessage) {
52+
return (msg as LegacyChatMessage).ignoreLegacy == true;
53+
}
6054

61-
const encode = (message: LegacyReceivedChatMessage) => encoder.encode(JSON.stringify(message));
55+
const decodeLegacyMsg = (message: Uint8Array) =>
56+
JSON.parse(new TextDecoder().decode(message)) as LegacyReceivedChatMessage | ReceivedChatMessage;
6257

63-
const decode = (message: Uint8Array) =>
64-
JSON.parse(decoder.decode(message)) as LegacyReceivedChatMessage | ReceivedChatMessage;
58+
const encodeLegacyMsg = (message: LegacyReceivedChatMessage) =>
59+
new TextEncoder().encode(JSON.stringify(message));
6560

6661
export function setupChat(room: Room, options?: ChatOptions) {
67-
const onDestroyObservable = new Subject<void>();
68-
69-
const serverSupportsChatApi = () =>
62+
const serverSupportsDataStreams = () =>
7063
room.serverInfo?.edition === 1 ||
71-
(!!room.serverInfo?.version && compareVersions(room.serverInfo?.version, '1.17.2') > 0);
72-
73-
const { messageDecoder, messageEncoder, channelTopic, updateChannelTopic } = options ?? {};
64+
(!!room.serverInfo?.version && compareVersions(room.serverInfo?.version, '1.8.2') > 0);
7465

75-
const topic = channelTopic ?? DataTopic.CHAT;
66+
const onDestroyObservable = new Subject<void>();
7667

77-
const updateTopic = updateChannelTopic ?? DataTopic.CHAT_UPDATE;
68+
const topic = options?.channelTopic ?? DataTopic.CHAT;
69+
const legacyTopic = options?.channelTopic ?? LegacyDataTopic.CHAT;
7870

7971
let needsSetup = false;
8072
if (!topicSubjectMap.has(room)) {
8173
needsSetup = true;
8274
}
83-
const topicMap = topicSubjectMap.get(room) ?? new Map<string, Subject<RawMessage>>();
84-
const messageSubject = topicMap.get(topic) ?? new Subject<RawMessage>();
75+
const topicMap = topicSubjectMap.get(room) ?? new Map<string, Subject<ReceivedChatMessage>>();
76+
const messageSubject = topicMap.get(topic) ?? new Subject<ReceivedChatMessage>();
8577
topicMap.set(topic, messageSubject);
8678
topicSubjectMap.set(room, topicMap);
8779

80+
const finalMessageDecoder = options?.messageDecoder ?? decodeLegacyMsg;
8881
if (needsSetup) {
89-
/** Subscribe to all appropriate messages sent over the wire. */
90-
const { messageObservable } = setupDataMessageHandler(room, [topic, updateTopic]);
91-
messageObservable.pipe(takeUntil(onDestroyObservable)).subscribe(messageSubject);
82+
room.registerTextStreamHandler(topic, async (reader, participantInfo) => {
83+
const { id, timestamp } = reader.info;
84+
const streamObservable = from(reader).pipe(
85+
scan((acc: string, chunk: string) => {
86+
return acc + chunk;
87+
}),
88+
map((chunk: string) => {
89+
return {
90+
id,
91+
timestamp,
92+
message: chunk,
93+
from: room.getParticipantByIdentity(participantInfo.identity),
94+
// editTimestamp: type === 'update' ? timestamp : undefined,
95+
} as ReceivedChatMessage;
96+
}),
97+
);
98+
streamObservable.subscribe({
99+
next: (value) => messageSubject.next(value),
100+
});
101+
});
102+
103+
/** legacy chat protocol handling */
104+
const { messageObservable } = setupDataMessageHandler(room, [legacyTopic]);
105+
messageObservable
106+
.pipe(
107+
map((msg) => {
108+
const parsedMessage = finalMessageDecoder(msg.payload);
109+
if (isIgnorableChatMessage(parsedMessage)) {
110+
return undefined;
111+
}
112+
const newMessage: ReceivedChatMessage = { ...parsedMessage, from: msg.from };
113+
return newMessage;
114+
}),
115+
filter((msg) => !!msg),
116+
takeUntil(onDestroyObservable),
117+
)
118+
.subscribe(messageSubject);
92119
}
93-
const { chatObservable, send: sendChatMessage } = setupChatMessageHandler(room);
94-
95-
const finalMessageDecoder = messageDecoder ?? decode;
96120

97121
/** Build up the message array over time. */
98-
const messagesObservable = merge(
99-
messageSubject.pipe(
100-
map((msg) => {
101-
const parsedMessage = finalMessageDecoder(msg.payload);
102-
const newMessage = { ...parsedMessage, from: msg.from };
103-
if (isIgnorableChatMessage(newMessage)) {
104-
return undefined;
105-
}
106-
return newMessage;
107-
}),
108-
),
109-
chatObservable.pipe(
110-
map(([msg, participant]) => {
111-
return { ...msg, from: participant };
112-
}),
113-
),
114-
).pipe(
115-
scan<ReceivedChatMessage | undefined, ReceivedChatMessage[]>((acc, value) => {
116-
// ignore legacy message updates
117-
if (!value) {
118-
return acc;
119-
}
120-
// handle message updates
122+
const messagesObservable = messageSubject.pipe(
123+
scan<ReceivedChatMessage, ReceivedChatMessage[]>((acc, value) => {
121124
if (
122125
'id' in value &&
123126
acc.find((msg) => msg.from?.identity === value.from?.identity && msg.id === value.id)
@@ -128,10 +131,9 @@ export function setupChat(room: Room, options?: ChatOptions) {
128131
acc[replaceIndex] = {
129132
...value,
130133
timestamp: originalMsg.timestamp,
131-
editTimestamp: value.editTimestamp ?? value.timestamp,
134+
editTimestamp: value.timestamp,
132135
};
133136
}
134-
135137
return [...acc];
136138
}
137139
return [...acc, value];
@@ -140,42 +142,34 @@ export function setupChat(room: Room, options?: ChatOptions) {
140142
);
141143

142144
const isSending$ = new BehaviorSubject<boolean>(false);
145+
const finalMessageEncoder = options?.messageEncoder ?? encodeLegacyMsg;
143146

144-
const finalMessageEncoder = messageEncoder ?? encode;
145-
146-
const send = async (message: string) => {
147+
const send = async (message: string, options?: SendTextOptions) => {
148+
if (!options) {
149+
options = {};
150+
}
151+
options.topic ??= topic;
147152
isSending$.next(true);
153+
148154
try {
149-
const chatMessage = await sendChatMessage(message);
155+
const info = await room.localParticipant.sendText(message, options);
156+
const chatMsg: ReceivedChatMessage = {
157+
id: info.id,
158+
timestamp: Date.now(),
159+
message,
160+
from: room.localParticipant,
161+
attachedFiles: options.attachments,
162+
};
163+
messageSubject.next(chatMsg);
150164
const encodedLegacyMsg = finalMessageEncoder({
151-
...chatMessage,
152-
ignore: serverSupportsChatApi(),
165+
...chatMsg,
166+
ignoreLegacy: serverSupportsDataStreams(),
153167
});
154168
await sendMessage(room.localParticipant, encodedLegacyMsg, {
155169
reliable: true,
156-
topic,
170+
topic: legacyTopic,
157171
});
158-
return chatMessage;
159-
} finally {
160-
isSending$.next(false);
161-
}
162-
};
163-
164-
const update = async (message: string, originalMessageOrId: string | ChatMessage) => {
165-
const timestamp = Date.now();
166-
const originalMessage: ChatMessage =
167-
typeof originalMessageOrId === 'string'
168-
? { id: originalMessageOrId, message: '', timestamp }
169-
: originalMessageOrId;
170-
isSending$.next(true);
171-
try {
172-
const editedMessage = await room.localParticipant.editChatMessage(message, originalMessage);
173-
const encodedLegacyMessage = finalMessageEncoder(editedMessage);
174-
await sendMessage(room.localParticipant, encodedLegacyMessage, {
175-
topic: updateTopic,
176-
reliable: true,
177-
});
178-
return editedMessage;
172+
return chatMsg;
179173
} finally {
180174
isSending$.next(false);
181175
}
@@ -184,20 +178,15 @@ export function setupChat(room: Room, options?: ChatOptions) {
184178
function destroy() {
185179
onDestroyObservable.next();
186180
onDestroyObservable.complete();
181+
messageSubject.complete();
187182
topicSubjectMap.delete(room);
183+
room.unregisterTextStreamHandler(topic);
188184
}
189185
room.once(RoomEvent.Disconnected, destroy);
190186

191187
return {
192188
messageObservable: messagesObservable,
193189
isSendingObservable: isSending$,
194190
send,
195-
update,
196191
};
197192
}
198-
199-
function isIgnorableChatMessage(
200-
msg: ReceivedChatMessage | LegacyReceivedChatMessage,
201-
): msg is ReceivedChatMessage {
202-
return (msg as LegacyChatMessage).ignore == true;
203-
}

0 commit comments

Comments
 (0)