diff --git a/pkg/config/config.go b/pkg/config/config.go index 3d2845b7..712bdba1 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -87,6 +87,8 @@ type Config struct { Logging logger.Config `yaml:"logging"` ClusterID string `yaml:"cluster_id"` // cluster this instance belongs to MaxCpuUtilization float64 `yaml:"max_cpu_utilization"` + MaxActiveCalls int `yaml:"max_active_calls"` // if set, used for affinity-based routing + SIPTrunkIds []string `yaml:"sip_trunk_ids"` // if set, only accept calls for these trunk IDs UseExternalIP bool `yaml:"use_external_ip"` LocalNet string `yaml:"local_net"` // local IP net to use, e.g. 192.168.0.0/24 diff --git a/pkg/sip/service.go b/pkg/sip/service.go index 9a734f9c..9d7e7024 100644 --- a/pkg/sip/service.go +++ b/pkg/sip/service.go @@ -24,6 +24,7 @@ import ( "net" "net/netip" "os" + "slices" "strings" "sync" "sync/atomic" @@ -312,8 +313,17 @@ func (s *Service) CreateSIPParticipant(ctx context.Context, req *rpc.InternalCre } func (s *Service) CreateSIPParticipantAffinity(ctx context.Context, req *rpc.InternalCreateSIPParticipantRequest) float32 { - // TODO: scale affinity based on a number or active calls? - return 0.5 + if len(s.conf.SIPTrunkIds) > 0 && !slices.Contains(s.conf.SIPTrunkIds, req.GetSipTrunkId()) { + return 0 + } + active := float32(s.ActiveCalls().Total()) + if max := float32(s.conf.MaxActiveCalls); max > 0 { + if active >= max { + return 0 + } + return 1 - active/max + } + return 1 / (1 + active) } func (s *Service) TransferSIPParticipant(ctx context.Context, req *rpc.InternalTransferSIPParticipantRequest) (*emptypb.Empty, error) { diff --git a/pkg/sip/service_test.go b/pkg/sip/service_test.go index 7b3742fe..93aee69e 100644 --- a/pkg/sip/service_test.go +++ b/pkg/sip/service_test.go @@ -949,3 +949,182 @@ func TestSameCallIDForAuthFlow(t *testing.T) { t.Logf("First call ID: %s", capturedCallIDs[0]) t.Logf("Second call ID: %s", capturedCallIDs[1]) } + +// newServiceForAffinity creates a minimal Service with initialized client/server maps +// suitable for testing CreateSIPParticipantAffinity without network setup. +func newServiceForAffinity(conf *config.Config) *Service { + cli := &Client{ + conf: conf, + activeCalls: make(map[LocalTag]*outboundCall), + } + srv := &Server{ + conf: conf, + byRemoteTag: make(map[RemoteTag]*inboundCall), + } + return &Service{ + conf: conf, + cli: cli, + srv: srv, + } +} + +func TestCreateSIPParticipantAffinity_NoConfig_NoCalls(t *testing.T) { + s := newServiceForAffinity(&config.Config{}) + got := s.CreateSIPParticipantAffinity(context.Background(), nil) + // 1 / (1 + 0) = 1.0 + require.InDelta(t, float32(1.0), got, 0.001) +} + +func TestCreateSIPParticipantAffinity_NoConfig_WithCalls(t *testing.T) { + s := newServiceForAffinity(&config.Config{}) + + // Add 4 outbound calls + for i := 0; i < 4; i++ { + s.cli.activeCalls[LocalTag(fmt.Sprintf("out-%d", i))] = &outboundCall{} + } + // Add 5 inbound calls + for i := 0; i < 5; i++ { + s.srv.byRemoteTag[RemoteTag(fmt.Sprintf("in-%d", i))] = &inboundCall{} + } + + got := s.CreateSIPParticipantAffinity(context.Background(), nil) + // 1 / (1 + 9) = 0.1 + require.InDelta(t, float32(0.1), got, 0.001) +} + +func TestCreateSIPParticipantAffinity_WithMaxCalls(t *testing.T) { + s := newServiceForAffinity(&config.Config{MaxActiveCalls: 100}) + + got := s.CreateSIPParticipantAffinity(context.Background(), nil) + // 0 active, max 100 => 1 - 0/100 = 1.0 + require.InDelta(t, float32(1.0), got, 0.001) +} + +func TestCreateSIPParticipantAffinity_WithMaxCalls_PartialLoad(t *testing.T) { + s := newServiceForAffinity(&config.Config{MaxActiveCalls: 100}) + + // Add 25 outbound calls before first measurement + for i := 0; i < 25; i++ { + s.cli.activeCalls[LocalTag(fmt.Sprintf("out-%d", i))] = &outboundCall{} + } + got := s.CreateSIPParticipantAffinity(context.Background(), nil) + // 25 active, max 100 => 1 - 25/100 = 0.75 + require.InDelta(t, float32(0.75), got, 0.001) + + // Add 25 more (50 total) + for i := 25; i < 50; i++ { + s.cli.activeCalls[LocalTag(fmt.Sprintf("out-%d", i))] = &outboundCall{} + } + got = s.CreateSIPParticipantAffinity(context.Background(), nil) + // 50 active, max 100 => 1 - 50/100 = 0.5 + require.InDelta(t, float32(0.5), got, 0.001) + + // Add 49 more (99 total, just under capacity) + for i := 50; i < 99; i++ { + s.cli.activeCalls[LocalTag(fmt.Sprintf("out-%d", i))] = &outboundCall{} + } + got = s.CreateSIPParticipantAffinity(context.Background(), nil) + // 99 active, max 100 => 1 - 99/100 = 0.01 + require.InDelta(t, float32(0.01), got, 0.001) +} + +func TestCreateSIPParticipantAffinity_AtCapacity(t *testing.T) { + s := newServiceForAffinity(&config.Config{MaxActiveCalls: 10}) + + for i := 0; i < 10; i++ { + s.cli.activeCalls[LocalTag(fmt.Sprintf("out-%d", i))] = &outboundCall{} + } + + got := s.CreateSIPParticipantAffinity(context.Background(), nil) + require.Equal(t, float32(0), got) +} + +func TestCreateSIPParticipantAffinity_OverCapacity(t *testing.T) { + s := newServiceForAffinity(&config.Config{MaxActiveCalls: 10}) + + for i := 0; i < 15; i++ { + s.cli.activeCalls[LocalTag(fmt.Sprintf("out-%d", i))] = &outboundCall{} + } + + got := s.CreateSIPParticipantAffinity(context.Background(), nil) + require.Equal(t, float32(0), got) +} + +func TestCreateSIPParticipantAffinity_MixedInboundOutbound(t *testing.T) { + s := newServiceForAffinity(&config.Config{MaxActiveCalls: 20}) + + // 6 outbound + 4 inbound = 10 total + for i := 0; i < 6; i++ { + s.cli.activeCalls[LocalTag(fmt.Sprintf("out-%d", i))] = &outboundCall{} + } + for i := 0; i < 4; i++ { + s.srv.byRemoteTag[RemoteTag(fmt.Sprintf("in-%d", i))] = &inboundCall{} + } + + got := s.CreateSIPParticipantAffinity(context.Background(), nil) + // 10 active, max 20 => 1 - 10/20 = 0.5 + require.InDelta(t, float32(0.5), got, 0.001) +} + +func TestCreateSIPParticipantAffinity_TrunkWhitelist_Allowed(t *testing.T) { + s := newServiceForAffinity(&config.Config{ + SIPTrunkIds: []string{"trunk-a", "trunk-b"}, + }) + + req := &rpc.InternalCreateSIPParticipantRequest{SipTrunkId: "trunk-a"} + got := s.CreateSIPParticipantAffinity(context.Background(), req) + // Trunk is whitelisted, 0 active calls, no max => 1/(1+0) = 1.0 + require.InDelta(t, float32(1.0), got, 0.001) +} + +func TestCreateSIPParticipantAffinity_TrunkWhitelist_Rejected(t *testing.T) { + s := newServiceForAffinity(&config.Config{ + SIPTrunkIds: []string{"trunk-a", "trunk-b"}, + }) + + req := &rpc.InternalCreateSIPParticipantRequest{SipTrunkId: "trunk-c"} + got := s.CreateSIPParticipantAffinity(context.Background(), req) + require.Equal(t, float32(0), got) +} + +func TestCreateSIPParticipantAffinity_TrunkWhitelist_EmptyTrunkId(t *testing.T) { + s := newServiceForAffinity(&config.Config{ + SIPTrunkIds: []string{"trunk-a"}, + }) + + req := &rpc.InternalCreateSIPParticipantRequest{} + got := s.CreateSIPParticipantAffinity(context.Background(), req) + // Empty trunk ID is not in the whitelist + require.Equal(t, float32(0), got) +} + +func TestCreateSIPParticipantAffinity_TrunkWhitelist_EmptyList(t *testing.T) { + s := newServiceForAffinity(&config.Config{}) + + // No whitelist configured, any trunk ID should work + req := &rpc.InternalCreateSIPParticipantRequest{SipTrunkId: "any-trunk"} + got := s.CreateSIPParticipantAffinity(context.Background(), req) + require.InDelta(t, float32(1.0), got, 0.001) +} + +func TestCreateSIPParticipantAffinity_TrunkWhitelist_WithMaxCalls(t *testing.T) { + s := newServiceForAffinity(&config.Config{ + SIPTrunkIds: []string{"trunk-a"}, + MaxActiveCalls: 100, + }) + + // Add 50 calls + for i := 0; i < 50; i++ { + s.cli.activeCalls[LocalTag(fmt.Sprintf("out-%d", i))] = &outboundCall{} + } + + // Whitelisted trunk: should get normal affinity + req := &rpc.InternalCreateSIPParticipantRequest{SipTrunkId: "trunk-a"} + got := s.CreateSIPParticipantAffinity(context.Background(), req) + require.InDelta(t, float32(0.5), got, 0.001) + + // Non-whitelisted trunk: 0 regardless of load + req = &rpc.InternalCreateSIPParticipantRequest{SipTrunkId: "trunk-x"} + got = s.CreateSIPParticipantAffinity(context.Background(), req) + require.Equal(t, float32(0), got) +}