Skip to content

Commit df40d5d

Browse files
committed
fix(datastream): reader close, writer goroutine leak
1 parent 411a594 commit df40d5d

File tree

2 files changed

+45
-11
lines changed

2 files changed

+45
-11
lines changed

room.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -638,10 +638,25 @@ func (r *Room) cleanup() {
638638
r.engine.Close()
639639
r.LocalParticipant.closeTracks()
640640
r.setSid("", true)
641+
641642
r.byteStreamHandlers.Clear()
643+
r.byteStreamReaders.Range(func(key, value any) bool {
644+
if reader, ok := value.(*ByteStreamReader); ok {
645+
reader.close()
646+
}
647+
return true
648+
})
642649
r.byteStreamReaders.Clear()
650+
643651
r.textStreamHandlers.Clear()
652+
r.textStreamReaders.Range(func(key, value any) bool {
653+
if reader, ok := value.(*TextStreamReader); ok {
654+
reader.close()
655+
}
656+
return true
657+
})
644658
r.textStreamReaders.Clear()
659+
645660
r.rpcHandlers.Clear()
646661
r.LocalParticipant.cleanup()
647662
}

streams.go

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"sync"
77
"sync/atomic"
88

9+
"github.com/frostbyte73/core"
910
protocol "github.com/livekit/protocol/livekit"
1011
)
1112

@@ -104,7 +105,7 @@ type baseStreamWriter[T any] struct {
104105
onProgress func(progress float64)
105106

106107
chunkIndex uint64
107-
closed atomic.Bool
108+
closed core.Fuse
108109
lock sync.Mutex
109110

110111
writeQueue chan writeTask
@@ -128,50 +129,68 @@ func newBaseStreamWriter[T any](engine *RTCEngine, header *protocol.DataStream_H
128129

129130
// processes write queue asynchronously
130131
func (w *baseStreamWriter[T]) processWriteQueue() {
131-
for task := range w.writeQueue {
132-
w.writeStreamBytes(task.chunks, task.onDone)
132+
for {
133+
select {
134+
case task := <-w.writeQueue:
135+
w.writeStreamBytes(task.chunks, task.onDone)
136+
case <-w.closed.Watch():
137+
// Drain any pending tasks - we don't want to send data after the trailer
138+
for {
139+
select {
140+
case <-w.writeQueue:
141+
default:
142+
return
143+
}
144+
}
145+
}
133146
}
134147
}
135148

136149
// Write data to the stream, data can be a byte slice or a string
137150
// depending on the type of the stream writer
138151
// onDone is a callback function that will be called when the data provided is written to the stream
139152
func (w *baseStreamWriter[T]) Write(data T, onDone *func()) {
140-
if w.closed.Load() {
153+
if w.closed.IsBroken() {
141154
return
142155
}
143156

157+
var task writeTask
144158
switch v := any(data).(type) {
145159
case []byte:
146-
w.writeQueue <- writeTask{
160+
task = writeTask{
147161
chunks: chunkBytes(v),
148162
onDone: onDone,
149163
}
150164
case string:
151-
w.writeQueue <- writeTask{
165+
task = writeTask{
152166
chunks: chunkUtf8String(v),
153167
onDone: onDone,
154168
}
169+
default:
170+
return
171+
}
172+
173+
select {
174+
case w.writeQueue <- task:
175+
case <-w.closed.Watch():
155176
}
156177
}
157178

158179
// Close the stream, this will send a stream trailer to notify the receiver that the stream is closed
159180
func (w *baseStreamWriter[T]) Close() {
160-
if !w.closed.Load() {
161-
w.closed.Store(true)
162-
181+
w.closed.Once(func() {
163182
w.lock.Lock()
164183
w.engine.publishStreamTrailer(w.streamId, w.destinationIdentities)
165184
w.lock.Unlock()
166-
}
185+
})
167186
}
168187

169188
// writes a list of chunks to the stream
170189
func (w *baseStreamWriter[T]) writeStreamBytes(chunks [][]byte, onDone *func()) {
171190
w.lock.Lock()
172191
chunkIndex := w.chunkIndex
173192

174-
for i := 0; i < len(chunks) && !w.closed.Load(); i++ {
193+
for i := 0; i < len(chunks) && !w.closed.IsBroken(); i++ {
175194
chunk := chunks[i]
176195

177196
w.engine.waitForBufferStatusLow(protocol.DataPacket_RELIABLE)

0 commit comments

Comments
 (0)