Skip to content

Commit 5073274

Browse files
authored
Guard maps in legacy handler (#9)
## Overview While doing some local stress testing I actually hit the case where concurrent map writes caused us to crash out: ``` [neko] fatal error: concurrent map writes [neko] goroutine 5642 [running]: [neko] internal/runtime/maps.fatal({0x13955ef?, 0x0?}) /usr/local/go/src/runtime/panic.go:1046 +0x20 [neko] internal/runtime/maps.(*Map).Delete(0x40001157a0, 0x1260ce0, 0x40021b7438) /usr/local/go/src/internal/runtime/maps/map.go:652 +0x48 [neko] github.com/m1k1o/neko/server/internal/http/legacy.(*session).destroy(0x40032ea5a0) /src/internal/http/legacy/session.go:207 +0xd0 [neko] github.com/m1k1o/neko/server/internal/http/legacy.(*LegacyHandler).Route.func1({0xffff4dcc1c60, 0x400213b8c0}, 0x4002860140) /src/internal/http/legacy/handler.go:212 +0x794 ``` Fixing for reliability! I also had gpt-5 scan the `internal` implementation for any similar cases, resulting in c667c74 ## Testing CI
1 parent 560fac5 commit 5073274

File tree

3 files changed

+18
-2
lines changed

3 files changed

+18
-2
lines changed

server/internal/capture/streamsink.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,9 @@ func (manager *StreamSinkManagerCtx) ID() string {
143143
}
144144

145145
func (manager *StreamSinkManagerCtx) Bitrate() uint64 {
146+
manager.listenersMu.Lock()
147+
defer manager.listenersMu.Unlock()
148+
146149
return manager.bitrate
147150
}
148151

@@ -151,7 +154,7 @@ func (manager *StreamSinkManagerCtx) Codec() codec.RTPCodec {
151154
}
152155

153156
func (manager *StreamSinkManagerCtx) start() error {
154-
if len(manager.listeners)+len(manager.listenersKf) == 0 {
157+
if manager.ListenersCount() == 0 {
155158
err := manager.CreatePipeline()
156159
if err != nil && !errors.Is(err, types.ErrCapturePipelineAlreadyExists) {
157160
return err
@@ -164,7 +167,7 @@ func (manager *StreamSinkManagerCtx) start() error {
164167
}
165168

166169
func (manager *StreamSinkManagerCtx) stop() {
167-
if len(manager.listeners)+len(manager.listenersKf) == 0 {
170+
if manager.ListenersCount() == 0 {
168171
manager.DestroyPipeline()
169172
manager.logger.Info().Msgf("last listener, stopping")
170173
}
@@ -408,6 +411,8 @@ func (manager *StreamSinkManagerCtx) DestroyPipeline() {
408411

409412
manager.pipelinesActive.Set(0)
410413

414+
manager.listenersMu.Lock()
415+
defer manager.listenersMu.Unlock()
411416
manager.brBuckets = make(map[int]float64)
412417
manager.bitrate = 0
413418
}

server/internal/http/legacy/handler.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"net/http"
1010
"net/url"
1111
"strings"
12+
"sync"
1213
"time"
1314

1415
"github.com/m1k1o/neko/server/internal/api"
@@ -44,6 +45,7 @@ type LegacyHandler struct {
4445
bannedIPs map[string]struct{}
4546
sessionIPs map[string]string
4647
wsDialer *websocket.Dialer
48+
mu sync.Mutex
4749
}
4850

4951
func New(serverAddr string) *LegacyHandler {
@@ -393,6 +395,8 @@ func (h *LegacyHandler) Route(r types.Router) {
393395

394396
func (h *LegacyHandler) ban(sessionId string) error {
395397
// find session by id
398+
h.mu.Lock()
399+
defer h.mu.Unlock()
396400
ip, ok := h.sessionIPs[sessionId]
397401
if !ok {
398402
return fmt.Errorf("session not found")
@@ -404,6 +408,9 @@ func (h *LegacyHandler) ban(sessionId string) error {
404408

405409
func (h *LegacyHandler) isBanned(r *http.Request) bool {
406410
ip := getIp(r)
411+
h.mu.Lock()
412+
defer h.mu.Unlock()
413+
407414
_, ok := h.bannedIPs[ip]
408415
return ok
409416
}

server/internal/http/legacy/session.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,8 @@ func (s *session) create(username, password string) error {
181181
}
182182

183183
s.id, s.ip = data.ID, getIp(s.r)
184+
s.h.mu.Lock()
185+
defer s.h.mu.Unlock()
184186
s.h.sessionIPs[s.id] = s.ip // save session ip by id
185187
s.token = data.Token
186188
s.name = data.Profile.Name
@@ -204,5 +206,7 @@ func (s *session) destroy() {
204206
}
205207

206208
// remove session id from ip map
209+
s.h.mu.Lock()
210+
defer s.h.mu.Unlock()
207211
delete(s.h.sessionIPs, s.id)
208212
}

0 commit comments

Comments
 (0)