Skip to content

Commit 2a711c7

Browse files
authored
Scaling based on a number of active calls fixed and filtering by sipTrunkId (#598)
* Scaling based on a number of active calls * Filtering by SipTrunkIds added to scaling method * Code coverage improvement * Casting between float64 and float32 fixed
1 parent a6a886e commit 2a711c7

File tree

3 files changed

+193
-2
lines changed

3 files changed

+193
-2
lines changed

pkg/config/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ type Config struct {
8787
Logging logger.Config `yaml:"logging"`
8888
ClusterID string `yaml:"cluster_id"` // cluster this instance belongs to
8989
MaxCpuUtilization float64 `yaml:"max_cpu_utilization"`
90+
MaxActiveCalls int `yaml:"max_active_calls"` // if set, used for affinity-based routing
91+
SIPTrunkIds []string `yaml:"sip_trunk_ids"` // if set, only accept calls for these trunk IDs
9092

9193
UseExternalIP bool `yaml:"use_external_ip"`
9294
LocalNet string `yaml:"local_net"` // local IP net to use, e.g. 192.168.0.0/24

pkg/sip/service.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"net"
2525
"net/netip"
2626
"os"
27+
"slices"
2728
"strings"
2829
"sync"
2930
"sync/atomic"
@@ -312,8 +313,17 @@ func (s *Service) CreateSIPParticipant(ctx context.Context, req *rpc.InternalCre
312313
}
313314

314315
func (s *Service) CreateSIPParticipantAffinity(ctx context.Context, req *rpc.InternalCreateSIPParticipantRequest) float32 {
315-
// TODO: scale affinity based on a number or active calls?
316-
return 0.5
316+
if len(s.conf.SIPTrunkIds) > 0 && !slices.Contains(s.conf.SIPTrunkIds, req.GetSipTrunkId()) {
317+
return 0
318+
}
319+
active := float32(s.ActiveCalls().Total())
320+
if max := float32(s.conf.MaxActiveCalls); max > 0 {
321+
if active >= max {
322+
return 0
323+
}
324+
return 1 - active/max
325+
}
326+
return 1 / (1 + active)
317327
}
318328

319329
func (s *Service) TransferSIPParticipant(ctx context.Context, req *rpc.InternalTransferSIPParticipantRequest) (*emptypb.Empty, error) {

pkg/sip/service_test.go

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -949,3 +949,182 @@ func TestSameCallIDForAuthFlow(t *testing.T) {
949949
t.Logf("First call ID: %s", capturedCallIDs[0])
950950
t.Logf("Second call ID: %s", capturedCallIDs[1])
951951
}
952+
953+
// newServiceForAffinity creates a minimal Service with initialized client/server maps
954+
// suitable for testing CreateSIPParticipantAffinity without network setup.
955+
func newServiceForAffinity(conf *config.Config) *Service {
956+
cli := &Client{
957+
conf: conf,
958+
activeCalls: make(map[LocalTag]*outboundCall),
959+
}
960+
srv := &Server{
961+
conf: conf,
962+
byRemoteTag: make(map[RemoteTag]*inboundCall),
963+
}
964+
return &Service{
965+
conf: conf,
966+
cli: cli,
967+
srv: srv,
968+
}
969+
}
970+
971+
func TestCreateSIPParticipantAffinity_NoConfig_NoCalls(t *testing.T) {
972+
s := newServiceForAffinity(&config.Config{})
973+
got := s.CreateSIPParticipantAffinity(context.Background(), nil)
974+
// 1 / (1 + 0) = 1.0
975+
require.InDelta(t, float32(1.0), got, 0.001)
976+
}
977+
978+
func TestCreateSIPParticipantAffinity_NoConfig_WithCalls(t *testing.T) {
979+
s := newServiceForAffinity(&config.Config{})
980+
981+
// Add 4 outbound calls
982+
for i := 0; i < 4; i++ {
983+
s.cli.activeCalls[LocalTag(fmt.Sprintf("out-%d", i))] = &outboundCall{}
984+
}
985+
// Add 5 inbound calls
986+
for i := 0; i < 5; i++ {
987+
s.srv.byRemoteTag[RemoteTag(fmt.Sprintf("in-%d", i))] = &inboundCall{}
988+
}
989+
990+
got := s.CreateSIPParticipantAffinity(context.Background(), nil)
991+
// 1 / (1 + 9) = 0.1
992+
require.InDelta(t, float32(0.1), got, 0.001)
993+
}
994+
995+
func TestCreateSIPParticipantAffinity_WithMaxCalls(t *testing.T) {
996+
s := newServiceForAffinity(&config.Config{MaxActiveCalls: 100})
997+
998+
got := s.CreateSIPParticipantAffinity(context.Background(), nil)
999+
// 0 active, max 100 => 1 - 0/100 = 1.0
1000+
require.InDelta(t, float32(1.0), got, 0.001)
1001+
}
1002+
1003+
func TestCreateSIPParticipantAffinity_WithMaxCalls_PartialLoad(t *testing.T) {
1004+
s := newServiceForAffinity(&config.Config{MaxActiveCalls: 100})
1005+
1006+
// Add 25 outbound calls before first measurement
1007+
for i := 0; i < 25; i++ {
1008+
s.cli.activeCalls[LocalTag(fmt.Sprintf("out-%d", i))] = &outboundCall{}
1009+
}
1010+
got := s.CreateSIPParticipantAffinity(context.Background(), nil)
1011+
// 25 active, max 100 => 1 - 25/100 = 0.75
1012+
require.InDelta(t, float32(0.75), got, 0.001)
1013+
1014+
// Add 25 more (50 total)
1015+
for i := 25; i < 50; i++ {
1016+
s.cli.activeCalls[LocalTag(fmt.Sprintf("out-%d", i))] = &outboundCall{}
1017+
}
1018+
got = s.CreateSIPParticipantAffinity(context.Background(), nil)
1019+
// 50 active, max 100 => 1 - 50/100 = 0.5
1020+
require.InDelta(t, float32(0.5), got, 0.001)
1021+
1022+
// Add 49 more (99 total, just under capacity)
1023+
for i := 50; i < 99; i++ {
1024+
s.cli.activeCalls[LocalTag(fmt.Sprintf("out-%d", i))] = &outboundCall{}
1025+
}
1026+
got = s.CreateSIPParticipantAffinity(context.Background(), nil)
1027+
// 99 active, max 100 => 1 - 99/100 = 0.01
1028+
require.InDelta(t, float32(0.01), got, 0.001)
1029+
}
1030+
1031+
func TestCreateSIPParticipantAffinity_AtCapacity(t *testing.T) {
1032+
s := newServiceForAffinity(&config.Config{MaxActiveCalls: 10})
1033+
1034+
for i := 0; i < 10; i++ {
1035+
s.cli.activeCalls[LocalTag(fmt.Sprintf("out-%d", i))] = &outboundCall{}
1036+
}
1037+
1038+
got := s.CreateSIPParticipantAffinity(context.Background(), nil)
1039+
require.Equal(t, float32(0), got)
1040+
}
1041+
1042+
func TestCreateSIPParticipantAffinity_OverCapacity(t *testing.T) {
1043+
s := newServiceForAffinity(&config.Config{MaxActiveCalls: 10})
1044+
1045+
for i := 0; i < 15; i++ {
1046+
s.cli.activeCalls[LocalTag(fmt.Sprintf("out-%d", i))] = &outboundCall{}
1047+
}
1048+
1049+
got := s.CreateSIPParticipantAffinity(context.Background(), nil)
1050+
require.Equal(t, float32(0), got)
1051+
}
1052+
1053+
func TestCreateSIPParticipantAffinity_MixedInboundOutbound(t *testing.T) {
1054+
s := newServiceForAffinity(&config.Config{MaxActiveCalls: 20})
1055+
1056+
// 6 outbound + 4 inbound = 10 total
1057+
for i := 0; i < 6; i++ {
1058+
s.cli.activeCalls[LocalTag(fmt.Sprintf("out-%d", i))] = &outboundCall{}
1059+
}
1060+
for i := 0; i < 4; i++ {
1061+
s.srv.byRemoteTag[RemoteTag(fmt.Sprintf("in-%d", i))] = &inboundCall{}
1062+
}
1063+
1064+
got := s.CreateSIPParticipantAffinity(context.Background(), nil)
1065+
// 10 active, max 20 => 1 - 10/20 = 0.5
1066+
require.InDelta(t, float32(0.5), got, 0.001)
1067+
}
1068+
1069+
func TestCreateSIPParticipantAffinity_TrunkWhitelist_Allowed(t *testing.T) {
1070+
s := newServiceForAffinity(&config.Config{
1071+
SIPTrunkIds: []string{"trunk-a", "trunk-b"},
1072+
})
1073+
1074+
req := &rpc.InternalCreateSIPParticipantRequest{SipTrunkId: "trunk-a"}
1075+
got := s.CreateSIPParticipantAffinity(context.Background(), req)
1076+
// Trunk is whitelisted, 0 active calls, no max => 1/(1+0) = 1.0
1077+
require.InDelta(t, float32(1.0), got, 0.001)
1078+
}
1079+
1080+
func TestCreateSIPParticipantAffinity_TrunkWhitelist_Rejected(t *testing.T) {
1081+
s := newServiceForAffinity(&config.Config{
1082+
SIPTrunkIds: []string{"trunk-a", "trunk-b"},
1083+
})
1084+
1085+
req := &rpc.InternalCreateSIPParticipantRequest{SipTrunkId: "trunk-c"}
1086+
got := s.CreateSIPParticipantAffinity(context.Background(), req)
1087+
require.Equal(t, float32(0), got)
1088+
}
1089+
1090+
func TestCreateSIPParticipantAffinity_TrunkWhitelist_EmptyTrunkId(t *testing.T) {
1091+
s := newServiceForAffinity(&config.Config{
1092+
SIPTrunkIds: []string{"trunk-a"},
1093+
})
1094+
1095+
req := &rpc.InternalCreateSIPParticipantRequest{}
1096+
got := s.CreateSIPParticipantAffinity(context.Background(), req)
1097+
// Empty trunk ID is not in the whitelist
1098+
require.Equal(t, float32(0), got)
1099+
}
1100+
1101+
func TestCreateSIPParticipantAffinity_TrunkWhitelist_EmptyList(t *testing.T) {
1102+
s := newServiceForAffinity(&config.Config{})
1103+
1104+
// No whitelist configured, any trunk ID should work
1105+
req := &rpc.InternalCreateSIPParticipantRequest{SipTrunkId: "any-trunk"}
1106+
got := s.CreateSIPParticipantAffinity(context.Background(), req)
1107+
require.InDelta(t, float32(1.0), got, 0.001)
1108+
}
1109+
1110+
func TestCreateSIPParticipantAffinity_TrunkWhitelist_WithMaxCalls(t *testing.T) {
1111+
s := newServiceForAffinity(&config.Config{
1112+
SIPTrunkIds: []string{"trunk-a"},
1113+
MaxActiveCalls: 100,
1114+
})
1115+
1116+
// Add 50 calls
1117+
for i := 0; i < 50; i++ {
1118+
s.cli.activeCalls[LocalTag(fmt.Sprintf("out-%d", i))] = &outboundCall{}
1119+
}
1120+
1121+
// Whitelisted trunk: should get normal affinity
1122+
req := &rpc.InternalCreateSIPParticipantRequest{SipTrunkId: "trunk-a"}
1123+
got := s.CreateSIPParticipantAffinity(context.Background(), req)
1124+
require.InDelta(t, float32(0.5), got, 0.001)
1125+
1126+
// Non-whitelisted trunk: 0 regardless of load
1127+
req = &rpc.InternalCreateSIPParticipantRequest{SipTrunkId: "trunk-x"}
1128+
got = s.CreateSIPParticipantAffinity(context.Background(), req)
1129+
require.Equal(t, float32(0), got)
1130+
}

0 commit comments

Comments
 (0)