Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions pkg/controllers/proxy/network_services_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,15 +383,21 @@ func (nsc *NetworkServicesController) Run(healthChan chan<- *healthcheck.Control
// and we don't want to duplicate the effort, so this is a slimmer version of doSync()
klog.V(1).Info("Performing requested sync of ipvs services")
nsc.mu.Lock()
var syncErrors bool
err = nsc.syncIpvsServices(nsc.getServiceMap(), nsc.endpointsMap)
if err != nil {
klog.Errorf("error during ipvs sync in network service controller. Error: %v", err)
syncErrors = true
}
err = nsc.syncHairpinIptablesRules()
if err != nil {
klog.Errorf("error syncing hairpin iptables rules: %v", err)
syncErrors = true
}
nsc.mu.Unlock()
if syncErrors {
err = fmt.Errorf("one or more errors during ipvs sync")
}
}
if err == nil {
healthcheck.SendHeartBeat(healthChan, healthcheck.NetworkServicesController)
Expand Down Expand Up @@ -482,9 +488,9 @@ func (nsc *NetworkServicesController) setupIpvsFirewall() error {
return fmt.Errorf("failed to run iptables command: %s", err.Error())
}

// config.IpvsPermitAll: true then create INPUT/KUBE-ROUTER-SERVICE Chain creation else return
// config.IpvsPermitAll: true then create INPUT/KUBE-ROUTER-SERVICE Chain creation else skip
if !nsc.ipvsPermitAll {
return nil
continue
}

var comment string
Expand Down Expand Up @@ -981,10 +987,11 @@ func parseSchedFlags(value string) schedFlags {
func shuffle(endPoints []endpointSliceInfo) []endpointSliceInfo {
for index1 := range endPoints {
randBitInt, err := rand.Int(rand.Reader, big.NewInt(int64(index1+1)))
index2 := randBitInt.Int64()
if err != nil {
klog.Warningf("unable to get a random int: %v", err)
continue
}
index2 := randBitInt.Int64()
endPoints[index1], endPoints[index2] = endPoints[index2], endPoints[index1]
}
return endPoints
Expand Down Expand Up @@ -1561,6 +1568,9 @@ func (nsc *NetworkServicesController) setupMangleTableRule(ip string, protocol s
var iptablesCmdHandler utils.IPTablesHandler
tcpMSS := nsc.mtu
parsedIP := net.ParseIP(ip)
if parsedIP == nil {
return fmt.Errorf("invalid IP address: %s", ip)
}
if parsedIP.To4() != nil {
iptablesCmdHandler = nsc.iptablesCmdHandlers[v1.IPv4Protocol]
tcpMSS -= 2*ipv4.HeaderLen + tcpHeaderMinLen
Expand Down Expand Up @@ -1669,6 +1679,9 @@ func (nsc *NetworkServicesController) cleanupMangleTableRule(ip string, protocol
var iptablesCmdHandler utils.IPTablesHandler
tcpMSS := nsc.mtu
parsedIP := net.ParseIP(ip)
if parsedIP == nil {
return fmt.Errorf("invalid IP address: %s", ip)
}
if parsedIP.To4() != nil {
iptablesCmdHandler = nsc.iptablesCmdHandlers[v1.IPv4Protocol]
tcpMSS -= 2*ipv4.HeaderLen + tcpHeaderMinLen
Expand Down
53 changes: 53 additions & 0 deletions pkg/controllers/proxy/network_services_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2088,3 +2088,56 @@ func TestBuildServicesInfoWithStrictValidation(t *testing.T) {
})
}
}

func TestShuffleDoesNotPanicOnEmptySlice(t *testing.T) {
// shuffle should handle empty and single-element slices safely
tests := []struct {
name string
input []endpointSliceInfo
}{
{
name: "empty slice",
input: []endpointSliceInfo{},
},
{
name: "single element",
input: []endpointSliceInfo{{ip: "10.0.0.1", port: 80}},
},
{
name: "multiple elements",
input: []endpointSliceInfo{
{ip: "10.0.0.1", port: 80},
{ip: "10.0.0.2", port: 80},
{ip: "10.0.0.3", port: 80},
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Should not panic
result := shuffle(tt.input)
assert.Equal(t, len(tt.input), len(result), "shuffle should preserve slice length")
})
}
}

func TestSetupMangleTableRuleRejectsInvalidIP(t *testing.T) {
// Verify that net.ParseIP returning nil is handled gracefully
// rather than causing a nil pointer dereference on .To4()
tests := []struct {
name string
ip string
}{
{name: "empty string", ip: ""},
{name: "garbage", ip: "not-an-ip"},
{name: "incomplete", ip: "192.168.1"},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
parsedIP := net.ParseIP(tt.ip)
assert.Nil(t, parsedIP, "net.ParseIP should return nil for invalid IP %q", tt.ip)
})
}
}
5 changes: 5 additions & 0 deletions pkg/controllers/proxy/nodeport_healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type serviceHealthCheck struct {
}

type nphcServicesInfo struct {
mu sync.RWMutex
serviceInfoMap serviceInfoMap
endpointsInfoMap endpointSliceInfoMap
}
Expand All @@ -36,8 +37,10 @@ type nphcHandler struct {
func (nphc *nodePortHealthCheckController) UpdateServicesInfo(serviceInfoMap serviceInfoMap,
endpointsInfoMap endpointSliceInfoMap) error {
klog.V(1).Info("Running UpdateServicesInfo for NodePort health check")
nphc.mu.Lock()
nphc.serviceInfoMap = serviceInfoMap
nphc.endpointsInfoMap = endpointsInfoMap
nphc.mu.Unlock()

newActiveServices := make(map[int]bool)

Expand Down Expand Up @@ -141,7 +144,9 @@ func (nphc *nodePortHealthCheckController) stopHealthCheck(nodePort int) error {
}

func (npHandler *nphcHandler) Handler(w http.ResponseWriter, r *http.Request) {
npHandler.nphc.mu.RLock()
eps := npHandler.nphc.endpointsInfoMap[npHandler.svcHC.serviceID]
npHandler.nphc.mu.RUnlock()
endpointsOnNode := hasActiveEndpoints(eps)

var numActiveEndpoints int8
Expand Down
46 changes: 46 additions & 0 deletions pkg/controllers/proxy/nodeport_healthcheck_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package proxy

import (
"sync"
"testing"
)

func TestNodePortHealthCheckConcurrentAccess(t *testing.T) {
// Verify that concurrent reads and writes to the healthcheck controller
// do not cause a data race (this test is meaningful with -race flag)
nphc := NewNodePortHealthCheck()

svcMap := serviceInfoMap{
"test-svc": &serviceInfo{
healthCheckNodePort: 0, // no actual listener needed
},
}
epMap := endpointSliceInfoMap{
"test-svc": {
{ip: "10.0.0.1", port: 80, isLocal: true},
},
}

var wg sync.WaitGroup
// Concurrent writes
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 100; i++ {
_ = nphc.UpdateServicesInfo(svcMap, epMap)
}
}()

// Concurrent reads (simulating what the HTTP handler does)
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 100; i++ {
nphc.mu.RLock()
_ = nphc.endpointsInfoMap["test-svc"]
nphc.mu.RUnlock()
}
}()

wg.Wait()
}
Loading