diff --git a/pkg/controllers/proxy/network_services_controller.go b/pkg/controllers/proxy/network_services_controller.go index f4428ba4b..d79931afe 100644 --- a/pkg/controllers/proxy/network_services_controller.go +++ b/pkg/controllers/proxy/network_services_controller.go @@ -383,15 +383,21 @@ func (nsc *NetworkServicesController) Run(healthChan chan<- *healthcheck.Control // and we don't want to duplicate the effort, so this is a slimmer version of doSync() klog.V(1).Info("Performing requested sync of ipvs services") nsc.mu.Lock() + var syncErrors bool err = nsc.syncIpvsServices(nsc.getServiceMap(), nsc.endpointsMap) if err != nil { klog.Errorf("error during ipvs sync in network service controller. Error: %v", err) + syncErrors = true } err = nsc.syncHairpinIptablesRules() if err != nil { klog.Errorf("error syncing hairpin iptables rules: %v", err) + syncErrors = true } nsc.mu.Unlock() + if syncErrors { + err = fmt.Errorf("one or more errors during ipvs sync") + } } if err == nil { healthcheck.SendHeartBeat(healthChan, healthcheck.NetworkServicesController) @@ -482,9 +488,9 @@ func (nsc *NetworkServicesController) setupIpvsFirewall() error { return fmt.Errorf("failed to run iptables command: %s", err.Error()) } - // config.IpvsPermitAll: true then create INPUT/KUBE-ROUTER-SERVICE Chain creation else return + // config.IpvsPermitAll: true then create INPUT/KUBE-ROUTER-SERVICE Chain creation else skip if !nsc.ipvsPermitAll { - return nil + continue } var comment string @@ -981,10 +987,11 @@ func parseSchedFlags(value string) schedFlags { func shuffle(endPoints []endpointSliceInfo) []endpointSliceInfo { for index1 := range endPoints { randBitInt, err := rand.Int(rand.Reader, big.NewInt(int64(index1+1))) - index2 := randBitInt.Int64() if err != nil { klog.Warningf("unable to get a random int: %v", err) + continue } + index2 := randBitInt.Int64() endPoints[index1], endPoints[index2] = endPoints[index2], endPoints[index1] } return endPoints @@ -1561,6 +1568,9 @@ func (nsc *NetworkServicesController) setupMangleTableRule(ip string, protocol s var iptablesCmdHandler utils.IPTablesHandler tcpMSS := nsc.mtu parsedIP := net.ParseIP(ip) + if parsedIP == nil { + return fmt.Errorf("invalid IP address: %s", ip) + } if parsedIP.To4() != nil { iptablesCmdHandler = nsc.iptablesCmdHandlers[v1.IPv4Protocol] tcpMSS -= 2*ipv4.HeaderLen + tcpHeaderMinLen @@ -1669,6 +1679,9 @@ func (nsc *NetworkServicesController) cleanupMangleTableRule(ip string, protocol var iptablesCmdHandler utils.IPTablesHandler tcpMSS := nsc.mtu parsedIP := net.ParseIP(ip) + if parsedIP == nil { + return fmt.Errorf("invalid IP address: %s", ip) + } if parsedIP.To4() != nil { iptablesCmdHandler = nsc.iptablesCmdHandlers[v1.IPv4Protocol] tcpMSS -= 2*ipv4.HeaderLen + tcpHeaderMinLen diff --git a/pkg/controllers/proxy/network_services_controller_test.go b/pkg/controllers/proxy/network_services_controller_test.go index 6bc5dc737..af1df1e2c 100644 --- a/pkg/controllers/proxy/network_services_controller_test.go +++ b/pkg/controllers/proxy/network_services_controller_test.go @@ -2088,3 +2088,56 @@ func TestBuildServicesInfoWithStrictValidation(t *testing.T) { }) } } + +func TestShuffleDoesNotPanicOnEmptySlice(t *testing.T) { + // shuffle should handle empty and single-element slices safely + tests := []struct { + name string + input []endpointSliceInfo + }{ + { + name: "empty slice", + input: []endpointSliceInfo{}, + }, + { + name: "single element", + input: []endpointSliceInfo{{ip: "10.0.0.1", port: 80}}, + }, + { + name: "multiple elements", + input: []endpointSliceInfo{ + {ip: "10.0.0.1", port: 80}, + {ip: "10.0.0.2", port: 80}, + {ip: "10.0.0.3", port: 80}, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Should not panic + result := shuffle(tt.input) + assert.Equal(t, len(tt.input), len(result), "shuffle should preserve slice length") + }) + } +} + +func TestSetupMangleTableRuleRejectsInvalidIP(t *testing.T) { + // Verify that net.ParseIP returning nil is handled gracefully + // rather than causing a nil pointer dereference on .To4() + tests := []struct { + name string + ip string + }{ + {name: "empty string", ip: ""}, + {name: "garbage", ip: "not-an-ip"}, + {name: "incomplete", ip: "192.168.1"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + parsedIP := net.ParseIP(tt.ip) + assert.Nil(t, parsedIP, "net.ParseIP should return nil for invalid IP %q", tt.ip) + }) + } +} diff --git a/pkg/controllers/proxy/nodeport_healthcheck.go b/pkg/controllers/proxy/nodeport_healthcheck.go index e340fe289..0c0817ce1 100644 --- a/pkg/controllers/proxy/nodeport_healthcheck.go +++ b/pkg/controllers/proxy/nodeport_healthcheck.go @@ -24,6 +24,7 @@ type serviceHealthCheck struct { } type nphcServicesInfo struct { + mu sync.RWMutex serviceInfoMap serviceInfoMap endpointsInfoMap endpointSliceInfoMap } @@ -36,8 +37,10 @@ type nphcHandler struct { func (nphc *nodePortHealthCheckController) UpdateServicesInfo(serviceInfoMap serviceInfoMap, endpointsInfoMap endpointSliceInfoMap) error { klog.V(1).Info("Running UpdateServicesInfo for NodePort health check") + nphc.mu.Lock() nphc.serviceInfoMap = serviceInfoMap nphc.endpointsInfoMap = endpointsInfoMap + nphc.mu.Unlock() newActiveServices := make(map[int]bool) @@ -141,7 +144,9 @@ func (nphc *nodePortHealthCheckController) stopHealthCheck(nodePort int) error { } func (npHandler *nphcHandler) Handler(w http.ResponseWriter, r *http.Request) { + npHandler.nphc.mu.RLock() eps := npHandler.nphc.endpointsInfoMap[npHandler.svcHC.serviceID] + npHandler.nphc.mu.RUnlock() endpointsOnNode := hasActiveEndpoints(eps) var numActiveEndpoints int8 diff --git a/pkg/controllers/proxy/nodeport_healthcheck_test.go b/pkg/controllers/proxy/nodeport_healthcheck_test.go new file mode 100644 index 000000000..8e0e1251f --- /dev/null +++ b/pkg/controllers/proxy/nodeport_healthcheck_test.go @@ -0,0 +1,46 @@ +package proxy + +import ( + "sync" + "testing" +) + +func TestNodePortHealthCheckConcurrentAccess(t *testing.T) { + // Verify that concurrent reads and writes to the healthcheck controller + // do not cause a data race (this test is meaningful with -race flag) + nphc := NewNodePortHealthCheck() + + svcMap := serviceInfoMap{ + "test-svc": &serviceInfo{ + healthCheckNodePort: 0, // no actual listener needed + }, + } + epMap := endpointSliceInfoMap{ + "test-svc": { + {ip: "10.0.0.1", port: 80, isLocal: true}, + }, + } + + var wg sync.WaitGroup + // Concurrent writes + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < 100; i++ { + _ = nphc.UpdateServicesInfo(svcMap, epMap) + } + }() + + // Concurrent reads (simulating what the HTTP handler does) + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < 100; i++ { + nphc.mu.RLock() + _ = nphc.endpointsInfoMap["test-svc"] + nphc.mu.RUnlock() + } + }() + + wg.Wait() +}