Skip to content

Commit 99bbe3d

Browse files
authored
Drop all maps other than byLocalTag (#614)
1 parent 2a711c7 commit 99bbe3d

File tree

9 files changed

+232
-63
lines changed

9 files changed

+232
-63
lines changed

pkg/sip/client.go

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ type Client struct {
6464
closing core.Fuse
6565
cmu sync.Mutex
6666
activeCalls map[LocalTag]*outboundCall
67-
byRemote map[RemoteTag]*outboundCall
6867

6968
handler Handler
7069
getIOClient GetIOInfoClient
@@ -103,7 +102,6 @@ func NewClient(region string, conf *config.Config, log logger.Logger, mon *stats
103102
getSipClient: DefaultGetSipClientFunc,
104103
getRoom: DefaultGetRoomFunc,
105104
activeCalls: make(map[LocalTag]*outboundCall),
106-
byRemote: make(map[RemoteTag]*outboundCall),
107105
}
108106
for _, option := range options {
109107
option(c)
@@ -146,7 +144,6 @@ func (c *Client) Stop() {
146144
c.cmu.Lock()
147145
calls := maps.Values(c.activeCalls)
148146
c.activeCalls = make(map[LocalTag]*outboundCall)
149-
c.byRemote = make(map[RemoteTag]*outboundCall)
150147
c.cmu.Unlock()
151148
for _, call := range calls {
152149
call.Close(ctx)
@@ -331,9 +328,9 @@ func (c *Client) onBye(req *sip.Request, tx sip.ServerTransaction) bool {
331328
ctx := context.Background()
332329
ctx, span := Tracer.Start(ctx, "sip.Client.onBye")
333330
defer span.End()
334-
tag, _ := getFromTag(req)
331+
tag, _ := GetLocalTagUAS(req)
335332
c.cmu.Lock()
336-
call := c.byRemote[tag]
333+
call := c.activeCalls[tag]
337334
c.cmu.Unlock()
338335
if call == nil {
339336
if tag != "" {
@@ -351,9 +348,9 @@ func (c *Client) onBye(req *sip.Request, tx sip.ServerTransaction) bool {
351348
}
352349

353350
func (c *Client) onNotify(req *sip.Request, tx sip.ServerTransaction) bool {
354-
tag, _ := getFromTag(req)
351+
tag, _ := GetLocalTagUAS(req)
355352
c.cmu.Lock()
356-
call := c.byRemote[tag]
353+
call := c.activeCalls[tag]
357354
c.cmu.Unlock()
358355
if call == nil {
359356
return false

pkg/sip/inbound.go

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,10 @@ func hashPassword(password string) string {
8181
return hex.EncodeToString(hash[:8]) // Use first 8 bytes for shorter hash
8282
}
8383

84+
func generateNonce(sipCallID string) string {
85+
return fmt.Sprintf("%d-%s", time.Now().UnixMicro(), sipCallID)
86+
}
87+
8488
type inboundCallInfo struct {
8589
sync.Mutex
8690
cseq uint32
@@ -122,19 +126,19 @@ func (c *inboundCallInfo) countInvite(log logger.Logger, req *sip.Request) {
122126
}
123127
}
124128

125-
func (s *Server) getCallInfo(id string) *inboundCallInfo {
126-
c, _ := s.infos.byCallID.Get(id)
129+
func (s *Server) getCallInfo(id LocalTag) *inboundCallInfo {
130+
c, _ := s.infos.byLocalTag.Get(id)
127131
if c != nil {
128132
return c
129133
}
130134
s.infos.Lock()
131135
defer s.infos.Unlock()
132-
c, _ = s.infos.byCallID.Get(id)
136+
c, _ = s.infos.byLocalTag.Get(id)
133137
if c != nil {
134138
return c
135139
}
136140
c = &inboundCallInfo{}
137-
s.infos.byCallID.Add(id, c)
141+
s.infos.byLocalTag.Add(id, c)
138142
return c
139143
}
140144

@@ -187,7 +191,7 @@ func (s *Server) handleInviteAuth(tid traceid.ID, log logger.Logger, req *sip.Re
187191
if h == nil {
188192
inviteState.challenge = digest.Challenge{
189193
Realm: UserAgent,
190-
Nonce: fmt.Sprintf("%d", time.Now().UnixMicro()),
194+
Nonce: generateNonce(sipCallID),
191195
Algorithm: "MD5",
192196
}
193197

@@ -331,7 +335,7 @@ func (s *Server) processInvite(req *sip.Request, tx sip.ServerTransaction) (retE
331335
log.Infow("processing invite")
332336

333337
s.cmu.RLock()
334-
existing := s.byCallID[cc.SIPCallID()]
338+
existing := s.byLocalTag[cc.ID()]
335339
s.cmu.RUnlock()
336340
if existing != nil && existing.cc.InviteCSeq() < cc.InviteCSeq() {
337341
log.Infow("accepting reinvite", "content-type", req.ContentType(), "content-length", req.ContentLength())
@@ -437,7 +441,7 @@ func (s *Server) processInvite(req *sip.Request, tx sip.ServerTransaction) (retE
437441
// We will send password request anyway, so might as well signal that the progress is made.
438442
cc.Processing()
439443
}
440-
s.getCallInfo(cc.SIPCallID()).countInvite(log, req)
444+
s.getCallInfo(cc.ID()).countInvite(log, req)
441445
if !s.handleInviteAuth(tid, log, req, tx, from.User, r.Username, r.Password) {
442446
// Store (call-ID + from tag) to (to tag) mapping
443447
s.cmu.Lock()
@@ -449,7 +453,7 @@ func (s *Server) processInvite(req *sip.Request, tx sip.ServerTransaction) (retE
449453
}
450454
// ok
451455
case AuthAccept:
452-
s.getCallInfo(cc.SIPCallID()).countInvite(log, req)
456+
s.getCallInfo(cc.ID()).countInvite(log, req)
453457
// ok
454458
}
455459

@@ -464,12 +468,12 @@ func (s *Server) onOptions(log *slog.Logger, req *sip.Request, tx sip.ServerTran
464468
}
465469

466470
func (s *Server) onAck(log *slog.Logger, req *sip.Request, tx sip.ServerTransaction) {
467-
tag, err := getFromTag(req)
471+
tag, err := GetLocalTagUAS(req)
468472
if err != nil {
469473
return
470474
}
471475
s.cmu.RLock()
472-
c := s.byRemoteTag[tag]
476+
c := s.byLocalTag[tag]
473477
s.cmu.RUnlock()
474478
if c == nil {
475479
return
@@ -479,14 +483,14 @@ func (s *Server) onAck(log *slog.Logger, req *sip.Request, tx sip.ServerTransact
479483
}
480484

481485
func (s *Server) onBye(log *slog.Logger, req *sip.Request, tx sip.ServerTransaction) {
482-
tag, err := getFromTag(req)
486+
tag, err := GetLocalTagUAS(req)
483487
if err != nil {
484488
_ = tx.Respond(sip.NewResponseFromRequest(req, sip.StatusBadRequest, "", nil))
485489
return
486490
}
487491

488492
s.cmu.RLock()
489-
c := s.byRemoteTag[tag]
493+
c := s.byLocalTag[tag]
490494
s.cmu.RUnlock()
491495
if c != nil {
492496
c.cc.AcceptBye(req, tx)
@@ -542,14 +546,14 @@ func (s *Server) OnNoRoute(log *slog.Logger, req *sip.Request, tx sip.ServerTran
542546
}
543547

544548
func (s *Server) onNotify(log *slog.Logger, req *sip.Request, tx sip.ServerTransaction) {
545-
tag, err := getFromTag(req)
549+
tag, err := GetLocalTagUAS(req)
546550
if err != nil {
547551
_ = tx.Respond(sip.NewResponseFromRequest(req, sip.StatusBadRequest, "", nil))
548552
return
549553
}
550554

551555
s.cmu.RLock()
552-
c := s.byRemoteTag[tag]
556+
c := s.byLocalTag[tag]
553557
s.cmu.RUnlock()
554558
if c != nil {
555559
c.log().Infow("NOTIFY")
@@ -631,9 +635,7 @@ func (s *Server) newInboundCall(
631635
c.lkRoom = s.getRoom(c.log(), &c.stats.Room)
632636
c.ctx, c.cancel = context.WithCancel(ctx)
633637
s.cmu.Lock()
634-
s.byRemoteTag[cc.Tag()] = c
635638
s.byLocalTag[cc.ID()] = c
636-
s.byCallID[cc.SIPCallID()] = c
637639
s.cmu.Unlock()
638640
return c
639641
}
@@ -1189,9 +1191,7 @@ func (c *inboundCall) close(ctx context.Context, error bool, status CallStatus,
11891191
c.callDur()
11901192
}
11911193
c.s.cmu.Lock()
1192-
delete(c.s.byRemoteTag, c.cc.Tag())
11931194
delete(c.s.byLocalTag, c.cc.ID())
1194-
delete(c.s.byCallID, c.cc.SIPCallID())
11951195
c.s.cmu.Unlock()
11961196

11971197
c.s.DeregisterTransferSIPParticipant(c.cc.ID())
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
package sip
2+
3+
import (
4+
"log/slog"
5+
"math/rand"
6+
"net/netip"
7+
"testing"
8+
"time"
9+
10+
"github.com/stretchr/testify/require"
11+
12+
"github.com/livekit/media-sdk/sdp"
13+
"github.com/livekit/mediatransportutil/pkg/rtcconfig"
14+
"github.com/livekit/protocol/logger"
15+
"github.com/livekit/protocol/rpc"
16+
"github.com/livekit/sipgo"
17+
"github.com/livekit/sipgo/sip"
18+
19+
"github.com/livekit/sip/pkg/config"
20+
"github.com/livekit/sip/pkg/stats"
21+
)
22+
23+
type InboundTest struct {
24+
Server *Server
25+
Handler Handler
26+
Client *sipgo.Client
27+
addr netip.AddrPort
28+
}
29+
30+
func (it *InboundTest) NewInvite(t *testing.T, callID string, cseq uint32, offer []byte) (*sip.Request, []byte) {
31+
if offer == nil {
32+
sdpOffer, err := sdp.NewOffer(it.addr.Addr(), 0xB0B, sdp.EncryptionNone)
33+
require.NoError(t, err)
34+
offer, err = sdpOffer.SDP.Marshal()
35+
require.NoError(t, err)
36+
}
37+
38+
inviteReq := sip.NewRequest(sip.INVITE, sip.Uri{User: "to", Host: it.addr.String()})
39+
fromTag := sip.GenerateTagN(16)
40+
inviteReq.AppendHeader(&sip.FromHeader{
41+
Address: sip.Uri{
42+
Scheme: "sip",
43+
User: "caller",
44+
Host: it.addr.String(),
45+
},
46+
Params: sip.HeaderParams{
47+
"tag": fromTag,
48+
},
49+
})
50+
inviteReq.AppendHeader(&sip.ToHeader{
51+
Address: sip.Uri{
52+
Scheme: "sip",
53+
User: "callee",
54+
Host: it.addr.String(),
55+
},
56+
})
57+
inviteReq.SetDestination(it.addr.String())
58+
inviteReq.SetBody(offer)
59+
inviteReq.AppendHeader(sip.NewHeader("Content-Type", "application/sdp"))
60+
callIDHdr := sip.CallIDHeader(callID)
61+
inviteReq.AppendHeader(&callIDHdr)
62+
inviteReq.AppendHeader(&sip.CSeqHeader{SeqNo: cseq, MethodName: sip.INVITE})
63+
return inviteReq, offer
64+
}
65+
66+
func (it *InboundTest) TransactionRequest(t *testing.T, req *sip.Request) *sip.Response {
67+
tx, err := it.Client.TransactionRequest(req)
68+
require.NoError(t, err)
69+
defer tx.Terminate()
70+
71+
resp := getFinalResponseOrFail(t, tx, req)
72+
if resp.StatusCode < 300 { // Need to send ACK for 2xx, sipgo sends ACK for 3xx+
73+
ack := sip.NewAckRequest(req, resp, nil)
74+
err = it.Client.WriteRequest(ack)
75+
require.NoError(t, err)
76+
}
77+
return resp
78+
}
79+
80+
func (it *InboundTest) Address() netip.AddrPort {
81+
return it.addr
82+
}
83+
84+
func NewInboundTest(t *testing.T) *InboundTest {
85+
t.Helper()
86+
87+
sipPort := rand.Intn(testPortSIPMax-testPortSIPMin) + testPortSIPMin
88+
loopback := netip.MustParseAddr("127.0.0.1")
89+
90+
conf := &config.Config{
91+
MaxCpuUtilization: 0.9,
92+
SIPPort: sipPort,
93+
SIPPortListen: sipPort,
94+
RTPPort: rtcconfig.PortRange{Start: testPortRTPMin, End: testPortRTPMax},
95+
SIPRingingInterval: time.Second,
96+
}
97+
mon, err := stats.NewMonitor(conf)
98+
require.NoError(t, err)
99+
require.NoError(t, mon.Start(conf), "start monitor so metrics (e.g. inviteReqRaw) are registered")
100+
101+
log := logger.NewTestLogger(t)
102+
103+
srv := NewServer(
104+
"",
105+
conf,
106+
log,
107+
mon,
108+
func(projectID string) rpc.IOInfoClient { return &MockIOInfoClient{} },
109+
WithGetRoomServer(newTestRoom),
110+
)
111+
require.NotNil(t, srv)
112+
113+
sconf := &ServiceConfig{
114+
SignalingIP: loopback,
115+
SignalingIPLocal: loopback,
116+
MediaIP: loopback,
117+
}
118+
119+
err = srv.Start(nil, sconf, nil, nil)
120+
require.NoError(t, err)
121+
t.Cleanup(srv.Stop)
122+
123+
handler := &TestHandler{}
124+
srv.SetHandler(handler)
125+
126+
addr := netip.AddrPortFrom(loopback, uint16(sipPort))
127+
128+
ua, err := sipgo.NewUA(
129+
sipgo.WithUserAgent("from@test"),
130+
sipgo.WithUserAgentLogger(slog.New(logger.ToSlogHandler(srv.log))),
131+
)
132+
require.NoError(t, err)
133+
134+
client, err := sipgo.NewClient(ua)
135+
require.NoError(t, err)
136+
t.Cleanup(func() { client.Close() })
137+
t.Cleanup(func() { ua.Close() })
138+
139+
return &InboundTest{Server: srv, Handler: handler, Client: client, addr: addr}
140+
}
141+
142+
func TestProcessInvite_Reinvite(t *testing.T) {
143+
it := NewInboundTest(t)
144+
145+
cseq := uint32(2)
146+
callID := "reinvite-new@test"
147+
origInviteReq, _ := it.NewInvite(t, callID, cseq, nil)
148+
firstResp := it.TransactionRequest(t, origInviteReq.Clone())
149+
require.Equal(t, sip.StatusCode(200), firstResp.StatusCode, "200 OK")
150+
answer := string(firstResp.Body())
151+
152+
// Test prev CSeq
153+
req2 := origInviteReq.Clone()
154+
req2.CSeq().SeqNo = cseq - 1
155+
resp2 := it.TransactionRequest(t, req2)
156+
require.Equal(t, sip.StatusCode(200), resp2.StatusCode, "200 OK")
157+
require.NotEqual(t, answer, string(resp2.Body()), "answer should not be the same as original answer")
158+
159+
// Test next CSeq
160+
req3 := origInviteReq.Clone()
161+
req3.CSeq().SeqNo = cseq + 1
162+
req3.ReplaceHeader(sip.HeaderClone(firstResp.To()))
163+
resp3 := it.TransactionRequest(t, req3)
164+
require.Equal(t, sip.StatusCode(200), resp3.StatusCode, "200 OK")
165+
require.Equal(t, answer, string(resp3.Body()), "answer should be the same")
166+
require.NotEqual(t, resp2.To().Params["tag"], resp3.To().Params["tag"], "to tag should not be the same")
167+
}

pkg/sip/outbound.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -343,9 +343,6 @@ func (c *outboundCall) close(ctx context.Context, err error, status CallStatus,
343343

344344
c.c.cmu.Lock()
345345
delete(c.c.activeCalls, c.cc.ID())
346-
if tag := c.cc.Tag(); tag != "" {
347-
delete(c.c.byRemote, tag)
348-
}
349346
c.c.cmu.Unlock()
350347

351348
c.c.DeregisterTransferSIPParticipant(string(c.cc.ID()))
@@ -653,10 +650,6 @@ func (c *outboundCall) sipSignal(ctx context.Context, tid traceid.ID) error {
653650
return err
654651
}
655652

656-
c.c.cmu.Lock()
657-
c.c.byRemote[c.cc.Tag()] = c
658-
c.c.cmu.Unlock()
659-
660653
c.mon.InviteAccept()
661654
c.media.EnableOut()
662655
c.media.EnableTimeout(true)

pkg/sip/outbound_utilities_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ func newTestRoom(log logger.Logger, st *RoomStats) RoomInterface {
132132

133133
// Set ready immediately (skip connection)
134134
room.ready.Break()
135+
room.subscribed.Break()
135136
resolve.Resolve()
136137

137138
room.room.OnRoomUpdate(&livekit.Room{ // Set metadata, and specifically Sid

0 commit comments

Comments
 (0)