Skip to content

Commit f40ba89

Browse files
authored
Adds in support for attachments and attributes fields into data streams (#1283)
1 parent 5e174e1 commit f40ba89

File tree

5 files changed

+1422
-36
lines changed

5 files changed

+1422
-36
lines changed

.changeset/deep-pugs-lick.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@livekit/components-core': patch
3+
---
4+
5+
Adds in support for attachments and attributes fields into data streams

packages/core/src/components/chat.ts

Lines changed: 85 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,18 @@
11
/* eslint-disable camelcase */
22
import type { Room, SendTextOptions } from 'livekit-client';
33
import { compareVersions, RoomEvent } from 'livekit-client';
4-
import { BehaviorSubject, Subject, scan, map, takeUntil, from, filter } from 'rxjs';
4+
import {
5+
BehaviorSubject,
6+
Subject,
7+
scan,
8+
map,
9+
takeUntil,
10+
from,
11+
filter,
12+
mergeMap,
13+
finalize,
14+
of,
15+
} from 'rxjs';
516
import {
617
DataTopic,
718
LegacyDataTopic,
@@ -10,6 +21,7 @@ import {
1021
} from '../observables/dataChannel';
1122
import { log } from '../logger';
1223
import { ChatMessage, ReceivedChatMessage } from '../messages/types';
24+
import { Future } from '../helper/future';
1325

1426
/** @public */
1527
export type { ChatMessage, ReceivedChatMessage };
@@ -44,6 +56,19 @@ export type ChatOptions = {
4456
};
4557

4658
const topicSubjectMap: WeakMap<Room, Map<string, Subject<ReceivedChatMessage>>> = new WeakMap();
59+
const streamIdToAttachments = new Map<
60+
string /* stream id */,
61+
Map<
62+
string /* attachment id */,
63+
Future<
64+
{
65+
fileName: string;
66+
buffer: Array<Uint8Array>;
67+
},
68+
never
69+
>
70+
>
71+
>();
4772

4873
function isIgnorableChatMessage(msg: ReceivedChatMessage | LegacyReceivedChatMessage) {
4974
return (msg as LegacyChatMessage).ignoreLegacy == true;
@@ -79,26 +104,82 @@ export function setupChat(room: Room, options?: ChatOptions) {
79104
const finalMessageDecoder = options?.messageDecoder ?? decodeLegacyMsg;
80105
if (needsSetup) {
81106
room.registerTextStreamHandler(topic, async (reader, participantInfo) => {
82-
const { id, timestamp } = reader.info;
107+
const { id, timestamp, attributes, attachedStreamIds } = reader.info;
108+
109+
// Store a future for each attachment to be later resolved once the corresponding file data
110+
// stream completes.
111+
const attachments = new Map(
112+
(attachedStreamIds ?? []).map((id) => [
113+
id,
114+
new Future<{ fileName: string; buffer: Array<Uint8Array> }, never>(),
115+
]),
116+
);
117+
streamIdToAttachments.set(id, attachments);
118+
83119
const streamObservable = from(reader).pipe(
84120
scan((acc: string, chunk: string) => {
85121
return acc + chunk;
86122
}),
87-
map((chunk: string) => {
123+
mergeMap((chunk: string) => {
124+
if (attachments.size === 0) {
125+
return of({ chunk, attachedFiles: [] });
126+
} else {
127+
// Aggregate all attachments into memory and transform them into a list of files
128+
return from(attachments.values()).pipe(
129+
mergeMap((attachment) => from(attachment.promise)),
130+
scan(
131+
(acc, attachment) => [...acc, new File(attachment.buffer, attachment.fileName)],
132+
[] as Array<File>,
133+
),
134+
map((attachedFiles) => ({ chunk, attachedFiles })),
135+
);
136+
}
137+
}),
138+
map(({ chunk, attachedFiles }) => {
88139
return {
89140
id,
90141
timestamp,
91142
message: chunk,
92143
from: room.getParticipantByIdentity(participantInfo.identity),
93144
type: 'chatMessage',
145+
attributes,
146+
attachedFiles,
94147
// editTimestamp: type === 'update' ? timestamp : undefined,
95148
} satisfies ReceivedChatMessage;
96149
}),
150+
finalize(() => streamIdToAttachments.delete(id)),
97151
);
98152
streamObservable.subscribe({
99153
next: (value) => messageSubject.next(value),
100154
});
101155
});
156+
// NOTE: Attachment byte streams are guaranteed to arrive after their parent text stream
157+
// has initialized the attachment map (per client SDK sending implementation)
158+
room.registerByteStreamHandler(topic, async (reader) => {
159+
const { id: attachmentStreamId } = reader.info;
160+
const foundStreamAttachmentPair = Array.from(streamIdToAttachments).find(([, attachments]) =>
161+
attachments.has(attachmentStreamId),
162+
);
163+
if (!foundStreamAttachmentPair) {
164+
return;
165+
}
166+
const streamId = foundStreamAttachmentPair[0];
167+
168+
const bufferList = [];
169+
for await (const buffer of reader) {
170+
bufferList.push(buffer);
171+
}
172+
173+
const attachment = streamIdToAttachments.get(streamId)?.get(attachmentStreamId);
174+
if (!attachment) {
175+
return;
176+
}
177+
178+
attachment.resolve?.({
179+
fileName: reader.info.name,
180+
buffer: bufferList,
181+
});
182+
});
102183

103184
/** legacy chat protocol handling */
104185
const { messageObservable } = setupDataMessageHandler(room, [legacyTopic]);
@@ -204,6 +285,7 @@ export function setupChat(room: Room, options?: ChatOptions) {
204285
messageSubject.complete();
205286
topicSubjectMap.delete(room);
206287
room.unregisterTextStreamHandler(topic);
288+
room.unregisterByteStreamHandler(topic);
207289
}
208290
room.once(RoomEvent.Disconnected, destroy);
209291

packages/core/src/helper/future.ts

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
export class Future<T, E extends Error> {
2+
promise: Promise<T>;
3+
4+
resolve?: (arg: T) => void;
5+
6+
reject?: (e: E) => void;
7+
8+
onFinally?: () => void;
9+
10+
get isResolved(): boolean {
11+
return this._isResolved;
12+
}
13+
14+
private _isResolved: boolean = false;
15+
16+
constructor(
17+
futureBase?: (resolve: (arg: T) => void, reject: (e: E) => void) => void,
18+
onFinally?: () => void,
19+
) {
20+
this.onFinally = onFinally;
21+
this.promise = new Promise<T>(async (resolve, reject) => {
22+
this.resolve = resolve;
23+
this.reject = reject;
24+
if (futureBase) {
25+
await futureBase(resolve, reject);
26+
}
27+
}).finally(() => {
28+
this._isResolved = true;
29+
this.onFinally?.();
30+
});
31+
}
32+
}

0 commit comments

Comments
 (0)