-
Notifications
You must be signed in to change notification settings - Fork 151
Expand file tree
/
Copy pathdataChannel.ts
More file actions
104 lines (90 loc) · 2.99 KB
/
dataChannel.ts
File metadata and controls
104 lines (90 loc) · 2.99 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import {
type ChatMessage,
type DataPublishOptions,
type LocalParticipant,
type Participant,
type Room,
type SendTextOptions,
} from 'livekit-client';
import type { Subscriber } from 'rxjs';
import { Observable, filter, map } from 'rxjs';
import { createChatObserver, createDataObserver } from './room';
import { ReceivedChatMessage } from '../components/chat';
/**
 * Topic strings used to label data messages.
 *
 * Declared `as const` so each value keeps its literal type.
 */
export const DataTopic = {
  /** Topic string registered under the `CHAT` key. */
  CHAT: 'lk.chat',
  /** Topic string registered under the `TRANSCRIPTION` key. */
  TRANSCRIPTION: 'lk.transcription',
} as const;
/**
 * Older topic strings, kept as literal-typed constants.
 *
 * @deprecated NOTE(review): presumably superseded by `DataTopic` — confirm before removing.
 */
export const LegacyDataTopic = {
  /** Topic string registered under the `CHAT` key. */
  CHAT: 'lk-chat-topic',
} as const;
/**
 * Publish a raw data payload from the given local participant.
 *
 * Only `reliable`, `destinationIdentities` and `topic` are forwarded to
 * `publishData`; any other field present on `options` is dropped.
 *
 * @param localParticipant - Participant whose `publishData` method is called.
 * @param payload - Bytes to publish.
 * @param options - Publish options; defaults to an empty object.
 * @returns A promise that settles once `publishData` has settled.
 */
export async function sendMessage(
  localParticipant: LocalParticipant,
  payload: Uint8Array,
  options: DataPublishOptions = {},
) {
  // Read `reliable` first so the property-read order matches a
  // `{ reliable, destinationIdentities, topic }` destructuring.
  const reliable = options.reliable;
  const publishOptions: DataPublishOptions = {
    destinationIdentities: options.destinationIdentities,
    topic: options.topic,
    reliable,
  };
  await localParticipant.publishData(payload, publishOptions);
}
/**
 * Minimal shape of a data message: raw bytes plus an optional topic.
 *
 * @typeParam T - Type of the topic: a string type (e.g. a literal union) or `undefined`.
 */
export interface BaseDataMessage<T extends string | undefined> {
  /** Topic the message is labelled with, if any. */
  topic?: T;
  /** Raw message bytes. */
  payload: Uint8Array;
}
/**
 * A data message as seen by a receiver: the base message plus an optional sender.
 *
 * @typeParam T - Type of the topic; defaults to `string`.
 */
export interface ReceivedDataMessage<T extends string | undefined = string>
  extends BaseDataMessage<T> {
  /** Participant the message came from; optional, so it may be absent. */
  from?: Participant;
}
/**
 * Set up receiving and sending of data messages for a room, optionally scoped to topics.
 *
 * @param room - Room whose data messages are observed and whose local participant publishes.
 * @param topic - A single topic, or a non-empty list of topics, to accept. When omitted,
 *   every message passes the filter regardless of its topic.
 * @param onMessage - Optional callback invoked from the `map` stage of `messageObservable`,
 *   i.e. once per accepted message per subscription.
 * @returns An object with:
 *   - `messageObservable`: the accepted messages as `ReceivedDataMessage<T>`;
 *   - `isSendingObservable`: emits `true` when a `send` starts and `false` when it settles
 *     (whether it succeeded or threw);
 *   - `send`: publishes a payload, defaulting its topic to the first requested topic.
 */
export function setupDataMessageHandler<T extends string>(
  room: Room,
  topic?: T | [T, ...T[]],
  onMessage?: (msg: ReceivedDataMessage<T>) => void,
) {
  const topics = Array.isArray(topic) ? topic : [topic];

  /** Set up an Observable that returns all data messages belonging to a topic. */
  const messageObservable = createDataObserver(room).pipe(
    filter(
      ([, , , messageTopic]) =>
        topic === undefined || (messageTopic !== undefined && topics.includes(messageTopic as T)),
    ),
    map(([payload, participant, , messageTopic]) => {
      const msg = {
        payload,
        topic: messageTopic as T,
        from: participant,
      } satisfies ReceivedDataMessage<T>;
      onMessage?.(msg);
      return msg;
    }),
  );

  // Track every live subscriber instead of a single mutable reference, so that:
  //  - `send` works even when nobody has subscribed to `isSendingObservable` yet,
  //  - earlier subscribers keep receiving updates after a later one subscribes,
  //  - unsubscribed subscribers are released via the teardown function.
  const isSendingSubscribers = new Set<Subscriber<boolean>>();
  const isSendingObservable = new Observable<boolean>((subscriber) => {
    isSendingSubscribers.add(subscriber);
    return () => {
      isSendingSubscribers.delete(subscriber);
    };
  });

  /** Broadcast the current sending state to all live subscribers. */
  const emitIsSending = (isSending: boolean) => {
    // Iterate over a snapshot so a subscriber that (un)subscribes synchronously inside
    // `next` cannot affect the current round of notifications.
    for (const subscriber of [...isSendingSubscribers]) {
      subscriber.next(isSending);
    }
  };

  const send = async (payload: Uint8Array, options: DataPublishOptions = {}) => {
    emitIsSending(true);
    try {
      // The first requested topic is the default; a `topic` key in `options` overrides it.
      await sendMessage(room.localParticipant, payload, { topic: topics[0], ...options });
    } finally {
      // Always reset the flag, including when publishing throws.
      emitIsSending(false);
    }
  };

  return { messageObservable, isSendingObservable, send };
}
/**
 * Set up chat handling for a room.
 *
 * @param room - Room whose chat messages are observed and whose local participant sends/edits.
 * @returns An object with:
 *   - `chatObservable`: the result of `createChatObserver(room)`;
 *   - `send`: sends a text and resolves to the sent message, tagged with the local participant;
 *   - `edit`: edits a previously sent chat message.
 */
export function setupChatMessageHandler(room: Room) {
  const chatObservable = createChatObserver(room);

  /**
   * Send `text` as a chat message and resolve to the message enriched with the sender
   * (`from`) and the attachments taken from `options` (`attachedFiles`).
   */
  const send = async (text: string, options: SendTextOptions): Promise<ReceivedChatMessage> => {
    const chatMessage = await room.localParticipant.sendChatMessage(text, options);
    // The text is published through both APIs, one after the other.
    // NOTE(review): presumably so clients on either chat mechanism receive it — confirm.
    await room.localParticipant.sendText(text, options);

    const received: ReceivedChatMessage = {
      ...chatMessage,
      from: room.localParticipant,
      attachedFiles: options.attachments,
    };
    return received;
  };

  /** Replace the text of `originalMsg` and resolve to what `editChatMessage` returns. */
  const edit = async (text: string, originalMsg: ChatMessage) => {
    return await room.localParticipant.editChatMessage(text, originalMsg);
  };

  return { chatObservable, send, edit };
}