-
Notifications
You must be signed in to change notification settings - Fork 151
Expand file tree
/
Copy pathtextStream.ts
More file actions
106 lines (89 loc) · 3.4 KB
/
textStream.ts
File metadata and controls
106 lines (89 loc) · 3.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import { RoomEvent, type Room } from 'livekit-client';
import type { TextStreamInfo } from 'livekit-client/dist/src/room/types';
import { from, scan, Subject, type Observable } from 'rxjs';
import { share } from 'rxjs/operators';
/** One text stream received on a topic, together with the text accumulated for it so far. */
export interface TextStreamData {
/** Concatenation of every chunk read from the stream up to the latest emission. */
text: string;
/** Participant info handed to the room's text stream handler for this stream. */
participantInfo: { identity: string }; // TODO: replace with the correct type from livekit-client
/** The reader's `info` for this stream; its `id` is used to match later chunks to this entry. */
streamInfo: TextStreamInfo;
}
// Backing storage for the lazily created module-level singletons.
// Both start as null and are created on first access through their getter functions.
// Cache of shared observables, keyed by the `<roomId>:<topic>` string built by getCacheKey.
let observableCacheInstance: Map<string, Observable<TextStreamData[]>> | null = null;
// Maps each Room object to its generated id; a WeakMap, so an entry does not keep its Room alive.
let roomInstanceMapInstance: WeakMap<Room, string> | null = null;
// Counter used to mint the next `room_<n>` id; incremented each time a new Room is seen.
let nextRoomId = 0;
/**
 * Returns the module-wide cache of shared text-stream observables, creating it on first use.
 *
 * @returns The single `Map` instance used to look up an observable by its cache key.
 */
function getObservableCache(): Map<string, Observable<TextStreamData[]>> {
  const cache = observableCacheInstance ?? new Map<string, Observable<TextStreamData[]>>();
  observableCacheInstance = cache;
  return cache;
}
/**
 * Returns the module-wide map from `Room` objects to their generated ids, creating it on
 * first use.
 *
 * @returns The single `WeakMap` instance shared by every caller.
 */
function getRoomInstanceMap(): WeakMap<Room, string> {
  const roomIds = roomInstanceMapInstance ?? new WeakMap<Room, string>();
  roomInstanceMapInstance = roomIds;
  return roomIds;
}
/**
 * Builds the cache key for a room/topic pair.
 *
 * Every distinct `Room` object is given an id of the form `room_<n>` the first time it is
 * seen; the key is that id and the topic joined by a colon.
 *
 * @param room - Room instance the key is for.
 * @param topic - Text stream topic the key is for.
 * @returns A string of the form `<roomId>:<topic>`.
 */
function getCacheKey(room: Room, topic: string): string {
  const roomIds = getRoomInstanceMap();

  // A room that was seen before keeps the id it was given then.
  const knownId = roomIds.get(room);
  if (knownId) {
    return `${knownId}:${topic}`;
  }

  // First sighting of this room instance: mint the next id and remember it.
  const freshId = `room_${nextRoomId++}`;
  roomIds.set(room, freshId);
  return `${freshId}:${topic}`;
}
/**
 * Returns a shared observable of every text stream received on `topic` in `room`.
 *
 * The first call for a given room instance and topic registers a text stream handler on the
 * room and caches the resulting observable; later calls return the cached observable. Each
 * emission is a fresh copy of the full list of streams seen so far, where `text` holds the
 * chunks of each stream concatenated in arrival order.
 *
 * The observable is multicast with `share()`, so nothing is replayed: a subscriber only sees
 * the arrays emitted after it subscribed.
 *
 * When the room emits `RoomEvent.Disconnected`, the handler is unregistered, readers that are
 * still being consumed are released, the observable completes and its cache entry is removed,
 * so a later call sets everything up again.
 *
 * @param room - Room whose incoming text streams are observed.
 * @param topic - Topic the text stream handler is registered for.
 * @returns Observable emitting the current list of streams each time a chunk arrives.
 */
export function setupTextStream(room: Room, topic: string): Observable<TextStreamData[]> {
  const cacheKey = getCacheKey(room, topic);
  const observableCache = getObservableCache();

  // Reuse the observable already created for this room instance and topic.
  const existingObservable = observableCache.get(cacheKey);
  if (existingObservable) {
    return existingObservable;
  }

  const textStreamsSubject = new Subject<TextStreamData[]>();
  const textStreams: TextStreamData[] = [];
  // Per-reader subscriptions that are still running, so they can be released on disconnect.
  const activeSubscriptions = new Set<{ unsubscribe: () => void }>();

  room.registerTextStreamHandler(topic, async (reader, participantInfo) => {
    // Turn the reader into an observable of the text accumulated so far, one value per chunk.
    const streamObservable = from(reader).pipe(
      scan((acc: string, chunk: string) => acc + chunk, ''),
    );

    const subscription = streamObservable.subscribe({
      next: (accumulatedText) => {
        const index = textStreams.findIndex((stream) => stream.streamInfo.id === reader.info.id);
        if (index === -1) {
          // First chunk of a stream that has not been seen yet.
          textStreams.push({
            text: accumulatedText,
            participantInfo,
            streamInfo: reader.info,
          });
        } else {
          // Replace the entry instead of mutating it, so arrays emitted earlier keep their values.
          textStreams[index] = {
            ...textStreams[index],
            text: accumulatedText,
          };
        }
        // Emit a copy so subscribers never hold a reference to the internal array.
        textStreamsSubject.next([...textStreams]);
      },
      error: (error: unknown) => {
        // Without an error callback RxJS reports a failing reader as an unhandled error.
        // A failure only concerns this one stream: the text received so far stays in the
        // list and the other streams keep flowing.
        console.warn(`Text stream "${reader.info.id}" on topic "${topic}" failed`, error);
      },
    });

    activeSubscriptions.add(subscription);
    // Stop tracking the subscription once it ends (completion, error or unsubscribe). If it
    // has already ended, `add` runs the teardown immediately.
    subscription.add(() => {
      activeSubscriptions.delete(subscription);
    });
  });

  // Create the shared observable and store it in the cache.
  const sharedObservable = textStreamsSubject.asObservable().pipe(share());
  observableCache.set(cacheKey, sharedObservable);

  // Tear everything down when the room disconnects.
  room.once(RoomEvent.Disconnected, () => {
    room.unregisterTextStreamHandler(topic);
    // Release readers that are still mid-stream; their output could no longer be delivered
    // once the subject has completed.
    for (const subscription of activeSubscriptions) {
      subscription.unsubscribe();
    }
    activeSubscriptions.clear();
    textStreamsSubject.complete();
    getObservableCache().delete(cacheKey);
  });

  return sharedObservable;
}