Skip to content

Commit 2bb3eea

Browse files
authored
fix(datastream): reader close, writer goroutine leak (#818)
* fix(datastream): reader close, writer goroutine leak * go mod tidy * update comment
1 parent 411a594 commit 2bb3eea

File tree

3 files changed

+46
-12
lines changed

3 files changed

+46
-12
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ require (
100100
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
101101
github.com/dennwc/iters v1.2.2 // indirect
102102
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
103-
github.com/frostbyte73/core v0.1.1 // indirect
103+
github.com/frostbyte73/core v0.1.1
104104
github.com/fsnotify/fsnotify v1.9.0 // indirect
105105
github.com/gammazero/deque v1.2.0
106106
github.com/go-jose/go-jose/v3 v3.0.4 // indirect

room.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -638,10 +638,25 @@ func (r *Room) cleanup() {
638638
r.engine.Close()
639639
r.LocalParticipant.closeTracks()
640640
r.setSid("", true)
641+
641642
r.byteStreamHandlers.Clear()
643+
r.byteStreamReaders.Range(func(key, value any) bool {
644+
if reader, ok := value.(*ByteStreamReader); ok {
645+
reader.close()
646+
}
647+
return true
648+
})
642649
r.byteStreamReaders.Clear()
650+
643651
r.textStreamHandlers.Clear()
652+
r.textStreamReaders.Range(func(key, value any) bool {
653+
if reader, ok := value.(*TextStreamReader); ok {
654+
reader.close()
655+
}
656+
return true
657+
})
644658
r.textStreamReaders.Clear()
659+
645660
r.rpcHandlers.Clear()
646661
r.LocalParticipant.cleanup()
647662
}

streams.go

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"sync"
77
"sync/atomic"
88

9+
"github.com/frostbyte73/core"
910
protocol "github.com/livekit/protocol/livekit"
1011
)
1112

@@ -104,7 +105,7 @@ type baseStreamWriter[T any] struct {
104105
onProgress func(progress float64)
105106

106107
chunkIndex uint64
107-
closed atomic.Bool
108+
closed core.Fuse
108109
lock sync.Mutex
109110

110111
writeQueue chan writeTask
@@ -128,50 +129,68 @@ func newBaseStreamWriter[T any](engine *RTCEngine, header *protocol.DataStream_H
128129

129130
// processes write queue asynchronously
130131
func (w *baseStreamWriter[T]) processWriteQueue() {
131-
for task := range w.writeQueue {
132-
w.writeStreamBytes(task.chunks, task.onDone)
132+
for {
133+
select {
134+
case task := <-w.writeQueue:
135+
w.writeStreamBytes(task.chunks, task.onDone)
136+
case <-w.closed.Watch():
137+
// Drain any pending tasks - close sends a trailer and no data should be sent after that
138+
for {
139+
select {
140+
case <-w.writeQueue:
141+
default:
142+
return
143+
}
144+
}
145+
}
133146
}
134147
}
135148

136149
// Write data to the stream, data can be a byte slice or a string
137150
// depending on the type of the stream writer
138151
// onDone is a callback function that will be called when the data provided is written to the stream
139152
func (w *baseStreamWriter[T]) Write(data T, onDone *func()) {
140-
if w.closed.Load() {
153+
if w.closed.IsBroken() {
141154
return
142155
}
143156

157+
var task writeTask
144158
switch v := any(data).(type) {
145159
case []byte:
146-
w.writeQueue <- writeTask{
160+
task = writeTask{
147161
chunks: chunkBytes(v),
148162
onDone: onDone,
149163
}
150164
case string:
151-
w.writeQueue <- writeTask{
165+
task = writeTask{
152166
chunks: chunkUtf8String(v),
153167
onDone: onDone,
154168
}
169+
default:
170+
return
171+
}
172+
173+
select {
174+
case w.writeQueue <- task:
175+
case <-w.closed.Watch():
155176
}
156177
}
157178

158179
// Close the stream, this will send a stream trailer to notify the receiver that the stream is closed
159180
func (w *baseStreamWriter[T]) Close() {
160-
if !w.closed.Load() {
161-
w.closed.Store(true)
162-
181+
w.closed.Once(func() {
163182
w.lock.Lock()
164183
w.engine.publishStreamTrailer(w.streamId, w.destinationIdentities)
165184
w.lock.Unlock()
166-
}
185+
})
167186
}
168187

169188
// writes a list of chunks to the stream
170189
func (w *baseStreamWriter[T]) writeStreamBytes(chunks [][]byte, onDone *func()) {
171190
w.lock.Lock()
172191
chunkIndex := w.chunkIndex
173192

174-
for i := 0; i < len(chunks) && !w.closed.Load(); i++ {
193+
for i := 0; i < len(chunks) && !w.closed.IsBroken(); i++ {
175194
chunk := chunks[i]
176195

177196
w.engine.waitForBufferStatusLow(protocol.DataPacket_RELIABLE)

0 commit comments

Comments
 (0)