Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions internal/hls/hls.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,71 @@ func Init() {
api.HandleFunc("api/hls/segment.m4s", handlerSegmentMP4)

ws.HandleFunc("hls", handlerWSHLS)

// Start session cleanup goroutine
go sessionCleanup()
}

var log zerolog.Logger

const keepalive = 5 * time.Second

// MaxSessions limits total concurrent HLS sessions to prevent memory exhaustion.
// Enforced in handlerStream before a new session is created; sessionCleanup
// warns once the live count passes half this value.
const MaxSessions = 100

// sessions holds all live HLS sessions keyed by their random session ID.
// Guarded by sessionsMu: segment handlers read concurrently while
// handlerStream and the cleanup goroutine add/remove entries. An unguarded
// map previously caused sporadic 404s on MP4 segment requests.
var sessions = map[string]*Session{}
var sessionsMu sync.RWMutex

// sessionCleanup periodically checks for and removes stale sessions
func sessionCleanup() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()

for range ticker.C {
now := time.Now()
var staleIDs []string

sessionsMu.RLock()
count := len(sessions)
for id, session := range sessions {
// Check if session has exceeded maximum age
if now.Sub(session.createdAt) > MaxSessionAge {
staleIDs = append(staleIDs, id)
}
}
sessionsMu.RUnlock()

// Remove stale sessions
if len(staleIDs) > 0 {
sessionsMu.Lock()
for _, id := range staleIDs {
if session, ok := sessions[id]; ok {
// Stop the alive timer to prevent double cleanup
if session.alive != nil {
session.alive.Stop()
}
delete(sessions, id)
log.Info().Str("id", id).Dur("age", now.Sub(session.createdAt)).
Msg("[hls] cleaned up stale session")
}
}
sessionsMu.Unlock()
}

if count > 0 {
log.Debug().Int("sessions", count).Int("stale_removed", len(staleIDs)).
Msg("[hls] active sessions")
}

// If we have too many sessions, log a warning
if count > MaxSessions/2 {
log.Warn().Int("sessions", count).Int("max", MaxSessions).
Msg("[hls] high session count - possible leak")
}
}
}

func handlerStream(w http.ResponseWriter, r *http.Request) {
// CORS important for Chromecast
w.Header().Set("Access-Control-Allow-Origin", "*")
Expand All @@ -49,6 +104,17 @@ func handlerStream(w http.ResponseWriter, r *http.Request) {
return
}

// Check session limit before creating new session
sessionsMu.RLock()
sessionCount := len(sessions)
sessionsMu.RUnlock()

if sessionCount >= MaxSessions {
log.Warn().Int("sessions", sessionCount).Msg("[hls] max sessions reached, rejecting new request")
http.Error(w, "too many HLS sessions", http.StatusServiceUnavailable)
return
}

src := r.URL.Query().Get("src")
stream := streams.Get(src)
if stream == nil {
Expand Down Expand Up @@ -84,6 +150,7 @@ func handlerStream(w http.ResponseWriter, r *http.Request) {
sessionsMu.Unlock()

stream.RemoveConsumer(cons)
log.Debug().Str("id", session.id).Msg("[hls] session expired and cleaned up")
})

sessionsMu.Lock()
Expand All @@ -92,6 +159,8 @@ func handlerStream(w http.ResponseWriter, r *http.Request) {

go session.Run()

log.Debug().Str("id", session.id).Str("src", src).Msg("[hls] new session created")

if _, err := w.Write(session.Main()); err != nil {
log.Error().Err(err).Caller().Send()
}
Expand Down
59 changes: 46 additions & 13 deletions internal/hls/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,33 @@ import (
"github.com/AlexxIT/go2rtc/pkg/mp4"
)

// MaxBufferSize limits the HLS segment buffer to prevent memory leaks
// when clients don't fetch segments. 16MB should be enough for ~30 seconds
// of high-quality video at typical bitrates.
const MaxBufferSize = 16 * 1024 * 1024

// MaxSessionAge is the maximum time a session can exist before being
// forcefully cleaned up by the sessionCleanup goroutine. This prevents
// orphaned sessions from leaking memory indefinitely.
const MaxSessionAge = 24 * time.Hour

type Session struct {
cons core.Consumer
id string
template string
init []byte
buffer []byte
seq int
alive *time.Timer
mu sync.Mutex
cons core.Consumer
id string
template string
init []byte
buffer []byte
seq int
alive *time.Timer
mu sync.Mutex
dropped int // count of dropped writes due to buffer overflow
createdAt time.Time // when session was created, for cleanup
}

func NewSession(cons core.Consumer) *Session {
s := &Session{
id: core.RandString(8, 62),
cons: cons,
id: core.RandString(8, 62),
cons: cons,
createdAt: time.Now(),
}

// two segments important for Chromecast
Expand Down Expand Up @@ -55,12 +67,33 @@ segment.ts?id=` + s.id + `&n=%d`

func (s *Session) Write(p []byte) (n int, err error) {
s.mu.Lock()
defer s.mu.Unlock()

if s.init == nil {
s.init = p
} else {
s.buffer = append(s.buffer, p...)
return len(p), nil
}
s.mu.Unlock()

// Check if adding this data would exceed the buffer limit
if len(s.buffer)+len(p) > MaxBufferSize {
// Buffer is full - drop old data to make room
// This prevents unbounded memory growth when clients don't consume segments
s.dropped++

// If buffer is way too big, reset it entirely
if len(s.buffer) > MaxBufferSize {
s.buffer = nil
} else {
// Trim the beginning of the buffer to make room
trimSize := len(p)
if trimSize > len(s.buffer) {
trimSize = len(s.buffer)
}
s.buffer = s.buffer[trimSize:]
}
}

s.buffer = append(s.buffer, p...)
return len(p), nil
}

Expand Down
28 changes: 25 additions & 3 deletions pkg/core/writebuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ import (
"sync"
)

// MaxWriteBufferSize limits the internal buffer size to prevent memory leaks
// when data is written but not yet consumed by a reader. 8MB should be
// enough headroom for normal buffering between Write and WriteTo.
const MaxWriteBufferSize = 8 * 1024 * 1024

// WriteBuffer by defaul Write(s) to bytes.Buffer.
// But after WriteTo to new io.Writer - calls Reset.
// Reset will flush current buffer data to new writer and starts to Write to new io.Writer
Expand All @@ -28,15 +32,33 @@ func NewWriteBuffer(wr io.Writer) *WriteBuffer {

func (w *WriteBuffer) Write(p []byte) (n int, err error) {
w.mu.Lock()
defer w.mu.Unlock()

if w.err != nil {
err = w.err
} else if n, err = w.Writer.Write(p); err != nil {
return 0, w.err
}

// Check if we're writing to an internal buffer and enforce size limit
if buf, ok := w.Writer.(*bytes.Buffer); ok {
if buf.Len()+len(p) > MaxWriteBufferSize {
// Buffer is too large - truncate old data to make room
// This prevents unbounded memory growth
if buf.Len() > MaxWriteBufferSize/2 {
// Keep only the second half of the buffer
data := buf.Bytes()
buf.Reset()
buf.Write(data[len(data)/2:])
}
}
}

n, err = w.Writer.Write(p)
if err != nil {
w.err = err
w.done()
} else if f, ok := w.Writer.(http.Flusher); ok {
f.Flush()
}
w.mu.Unlock()
return
}

Expand Down
23 changes: 20 additions & 3 deletions pkg/rtsp/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,15 @@ import (
"github.com/pion/rtp"
)

// udpBufferPool reduces per-read allocations in handleUDPData by reusing
// read buffers. The 10240-byte size accommodates the TP-Link Tapo camera's
// unusually large ~10000-byte packets. Buffers are stored as *[]byte so Put
// does not re-box the slice header on every call. Callers must copy out any
// data that needs to outlive the corresponding Put.
var udpBufferPool = sync.Pool{
	New: func() interface{} {
		buf := make([]byte, 10240)
		return &buf
	},
}

type Conn struct {
core.Connection
core.Listener
Expand Down Expand Up @@ -171,15 +180,23 @@ func (c *Conn) handleUDPData(channel byte) {
conn := c.udpConn[channel]

for {
// TP-Link Tapo camera has crazy 10000 bytes packet size
buf := make([]byte, 10240)
// Use buffer pool to reduce allocations
bufPtr := udpBufferPool.Get().(*[]byte)
buf := *bufPtr

n, _, err := conn.ReadFromUDP(buf)
if err != nil {
udpBufferPool.Put(bufPtr)
return
}

if err = c.handleRawPacket(channel, buf[:n]); err != nil {
// Copy the data before returning buffer to pool
// This is necessary because handleRawPacket may hold references
data := make([]byte, n)
copy(data, buf[:n])
udpBufferPool.Put(bufPtr)

if err = c.handleRawPacket(channel, data); err != nil {
return
}
}
Expand Down