Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ type Config struct {
Logging logger.Config `yaml:"logging"`
ClusterID string `yaml:"cluster_id"` // cluster this instance belongs to
MaxCpuUtilization float64 `yaml:"max_cpu_utilization"`
MaxActiveCalls int `yaml:"max_active_calls"` // if set, used for affinity-based routing
SIPTrunkIds []string `yaml:"sip_trunk_ids"` // if set, only accept calls for these trunk IDs

UseExternalIP bool `yaml:"use_external_ip"`
LocalNet string `yaml:"local_net"` // local IP net to use, e.g. 192.168.0.0/24
Expand Down
14 changes: 12 additions & 2 deletions pkg/sip/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"net"
"net/netip"
"os"
"slices"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -312,8 +313,17 @@ func (s *Service) CreateSIPParticipant(ctx context.Context, req *rpc.InternalCre
}

func (s *Service) CreateSIPParticipantAffinity(ctx context.Context, req *rpc.InternalCreateSIPParticipantRequest) float32 {
// TODO: scale affinity based on a number or active calls?
return 0.5
if len(s.conf.SIPTrunkIds) > 0 && !slices.Contains(s.conf.SIPTrunkIds, req.GetSipTrunkId()) {
return 0
}
active := float32(s.ActiveCalls().Total())
if max := float32(s.conf.MaxActiveCalls); max > 0 {
if active >= max {
return 0
}
return 1 - active/max
}
return 1 / (1 + active)
}

func (s *Service) TransferSIPParticipant(ctx context.Context, req *rpc.InternalTransferSIPParticipantRequest) (*emptypb.Empty, error) {
Expand Down
179 changes: 179 additions & 0 deletions pkg/sip/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -949,3 +949,182 @@ func TestSameCallIDForAuthFlow(t *testing.T) {
t.Logf("First call ID: %s", capturedCallIDs[0])
t.Logf("Second call ID: %s", capturedCallIDs[1])
}

// newServiceForAffinity creates a minimal Service with initialized client/server maps
// suitable for testing CreateSIPParticipantAffinity without network setup.
func newServiceForAffinity(conf *config.Config) *Service {
cli := &Client{
conf: conf,
activeCalls: make(map[LocalTag]*outboundCall),
}
srv := &Server{
conf: conf,
byRemoteTag: make(map[RemoteTag]*inboundCall),
}
return &Service{
conf: conf,
cli: cli,
srv: srv,
}
}

func TestCreateSIPParticipantAffinity_NoConfig_NoCalls(t *testing.T) {
s := newServiceForAffinity(&config.Config{})
got := s.CreateSIPParticipantAffinity(context.Background(), nil)
// 1 / (1 + 0) = 1.0
require.InDelta(t, float32(1.0), got, 0.001)
}

func TestCreateSIPParticipantAffinity_NoConfig_WithCalls(t *testing.T) {
s := newServiceForAffinity(&config.Config{})

// Add 4 outbound calls
for i := 0; i < 4; i++ {
s.cli.activeCalls[LocalTag(fmt.Sprintf("out-%d", i))] = &outboundCall{}
}
// Add 5 inbound calls
for i := 0; i < 5; i++ {
s.srv.byRemoteTag[RemoteTag(fmt.Sprintf("in-%d", i))] = &inboundCall{}
}

got := s.CreateSIPParticipantAffinity(context.Background(), nil)
// 1 / (1 + 9) = 0.1
require.InDelta(t, float32(0.1), got, 0.001)
}

func TestCreateSIPParticipantAffinity_WithMaxCalls(t *testing.T) {
s := newServiceForAffinity(&config.Config{MaxActiveCalls: 100})

got := s.CreateSIPParticipantAffinity(context.Background(), nil)
// 0 active, max 100 => 1 - 0/100 = 1.0
require.InDelta(t, float32(1.0), got, 0.001)
}

func TestCreateSIPParticipantAffinity_WithMaxCalls_PartialLoad(t *testing.T) {
s := newServiceForAffinity(&config.Config{MaxActiveCalls: 100})

// Add 25 outbound calls before first measurement
for i := 0; i < 25; i++ {
s.cli.activeCalls[LocalTag(fmt.Sprintf("out-%d", i))] = &outboundCall{}
}
got := s.CreateSIPParticipantAffinity(context.Background(), nil)
// 25 active, max 100 => 1 - 25/100 = 0.75
require.InDelta(t, float32(0.75), got, 0.001)

// Add 25 more (50 total)
for i := 25; i < 50; i++ {
s.cli.activeCalls[LocalTag(fmt.Sprintf("out-%d", i))] = &outboundCall{}
}
got = s.CreateSIPParticipantAffinity(context.Background(), nil)
// 50 active, max 100 => 1 - 50/100 = 0.5
require.InDelta(t, float32(0.5), got, 0.001)

// Add 49 more (99 total, just under capacity)
for i := 50; i < 99; i++ {
s.cli.activeCalls[LocalTag(fmt.Sprintf("out-%d", i))] = &outboundCall{}
}
got = s.CreateSIPParticipantAffinity(context.Background(), nil)
// 99 active, max 100 => 1 - 99/100 = 0.01
require.InDelta(t, float32(0.01), got, 0.001)
}

func TestCreateSIPParticipantAffinity_AtCapacity(t *testing.T) {
s := newServiceForAffinity(&config.Config{MaxActiveCalls: 10})

for i := 0; i < 10; i++ {
s.cli.activeCalls[LocalTag(fmt.Sprintf("out-%d", i))] = &outboundCall{}
}

got := s.CreateSIPParticipantAffinity(context.Background(), nil)
require.Equal(t, float32(0), got)
}

func TestCreateSIPParticipantAffinity_OverCapacity(t *testing.T) {
s := newServiceForAffinity(&config.Config{MaxActiveCalls: 10})

for i := 0; i < 15; i++ {
s.cli.activeCalls[LocalTag(fmt.Sprintf("out-%d", i))] = &outboundCall{}
}

got := s.CreateSIPParticipantAffinity(context.Background(), nil)
require.Equal(t, float32(0), got)
}

func TestCreateSIPParticipantAffinity_MixedInboundOutbound(t *testing.T) {
s := newServiceForAffinity(&config.Config{MaxActiveCalls: 20})

// 6 outbound + 4 inbound = 10 total
for i := 0; i < 6; i++ {
s.cli.activeCalls[LocalTag(fmt.Sprintf("out-%d", i))] = &outboundCall{}
}
for i := 0; i < 4; i++ {
s.srv.byRemoteTag[RemoteTag(fmt.Sprintf("in-%d", i))] = &inboundCall{}
}

got := s.CreateSIPParticipantAffinity(context.Background(), nil)
// 10 active, max 20 => 1 - 10/20 = 0.5
require.InDelta(t, float32(0.5), got, 0.001)
}

func TestCreateSIPParticipantAffinity_TrunkWhitelist_Allowed(t *testing.T) {
s := newServiceForAffinity(&config.Config{
SIPTrunkIds: []string{"trunk-a", "trunk-b"},
})

req := &rpc.InternalCreateSIPParticipantRequest{SipTrunkId: "trunk-a"}
got := s.CreateSIPParticipantAffinity(context.Background(), req)
// Trunk is whitelisted, 0 active calls, no max => 1/(1+0) = 1.0
require.InDelta(t, float32(1.0), got, 0.001)
}

func TestCreateSIPParticipantAffinity_TrunkWhitelist_Rejected(t *testing.T) {
s := newServiceForAffinity(&config.Config{
SIPTrunkIds: []string{"trunk-a", "trunk-b"},
})

req := &rpc.InternalCreateSIPParticipantRequest{SipTrunkId: "trunk-c"}
got := s.CreateSIPParticipantAffinity(context.Background(), req)
require.Equal(t, float32(0), got)
}

func TestCreateSIPParticipantAffinity_TrunkWhitelist_EmptyTrunkId(t *testing.T) {
s := newServiceForAffinity(&config.Config{
SIPTrunkIds: []string{"trunk-a"},
})

req := &rpc.InternalCreateSIPParticipantRequest{}
got := s.CreateSIPParticipantAffinity(context.Background(), req)
// Empty trunk ID is not in the whitelist
require.Equal(t, float32(0), got)
}

func TestCreateSIPParticipantAffinity_TrunkWhitelist_EmptyList(t *testing.T) {
s := newServiceForAffinity(&config.Config{})

// No whitelist configured, any trunk ID should work
req := &rpc.InternalCreateSIPParticipantRequest{SipTrunkId: "any-trunk"}
got := s.CreateSIPParticipantAffinity(context.Background(), req)
require.InDelta(t, float32(1.0), got, 0.001)
}

func TestCreateSIPParticipantAffinity_TrunkWhitelist_WithMaxCalls(t *testing.T) {
s := newServiceForAffinity(&config.Config{
SIPTrunkIds: []string{"trunk-a"},
MaxActiveCalls: 100,
})

// Add 50 calls
for i := 0; i < 50; i++ {
s.cli.activeCalls[LocalTag(fmt.Sprintf("out-%d", i))] = &outboundCall{}
}

// Whitelisted trunk: should get normal affinity
req := &rpc.InternalCreateSIPParticipantRequest{SipTrunkId: "trunk-a"}
got := s.CreateSIPParticipantAffinity(context.Background(), req)
require.InDelta(t, float32(0.5), got, 0.001)

// Non-whitelisted trunk: 0 regardless of load
req = &rpc.InternalCreateSIPParticipantRequest{SipTrunkId: "trunk-x"}
got = s.CreateSIPParticipantAffinity(context.Background(), req)
require.Equal(t, float32(0), got)
}