Conversation
🦋 Changeset detectedLatest commit: 61763fe The changes in this PR will be included in the next version bump. This PR includes changesets to release 6 packages
Not sure what this means? Click here to learn what changesets are. Click here if you're a maintainer who wants to add another changeset to this PR |
davidzhao
left a comment
There was a problem hiding this comment.
nice find! this makes sense.
could you do me a favor and:
- indicate in the PR description how this leaks, and share any test results
- also fix the same for AudioStream
|
I added some log statements that helped me debug the issue. For future reference. This is wrong we should be adding Frame 6 to the queue instead of passing it to the resolver that we created for Frame 5 |
|
@davidzhao I wonder if it's cleaner to try and abstract away the Async Queue part of the logic into a separate class. Something like export class VideoStream implements AsyncIterableIterator<VideoFrameEvent> {
private asyncQueue = new AsyncQueue<VideoFrameEvent | null>();
constructor(track: Track) {
// Setup code...
FfiClient.instance.on(FfiClientEvent.FfiEvent, (ev) => {
if (ev.message.case === 'videoStreamEvent' &&
ev.message.value.streamHandle === this.ffiHandle.handle) {
if (ev.message.value.message.case === 'frameReceived') {
// Process frame and add to queue
const frameEvent = this.processFrame(ev.message.value.message.value);
this.asyncQueue.push(frameEvent);
} else if (ev.message.value.message.case === 'eos') {
this.asyncQueue.push(null); // Signal end of stream
FfiClient.instance.off(FfiClientEvent.FfiEvent, this.onEvent);
}
}
});
}
async next(): Promise<IteratorResult<VideoFrameEvent>> {
const item = await this.asyncQueue.next();
return item === null ?
{ done: true, value: undefined } :
{ done: false, value: item };
}
} |
yeah! I think that's a good idea.. this pattern will come up elsewhere too |
cool I'll do in a follow up PR. |
Issue
A memory leak was identified in the
VideoStream(andAudioStream) class where the Promise resolver for the async iterator was not being properly reset after use. This occurred in theonEventmethod when processing incoming video frames from the LiveKit server. When a frame is received and a queueResolve function exists (indicating a waiting iterator consumer), the resolver was being called but the reference was not cleared afterward. If a new Frame arrives before we callnext(), we call resolver on the stale reference learning to:next()are not actually being stored in the queue. There being tossed into a stale resolver.We can solve this by just simply clearing the reference to the resolver after it is called.
Testing
Wrote a test script that consume video from a LK server.
2210MBusage after 2 mins818MBusage after 2 mins