Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions pkg/controllers/proxy/hardening_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package proxy

import (
"net"
"sync"
"testing"

"github.com/stretchr/testify/assert"
)

func TestShuffleDoesNotPanicOnEmptySlice(t *testing.T) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please move this into network_services_controller_test.go?

// shuffle should handle empty and single-element slices safely
tests := []struct {
name string
input []endpointSliceInfo
}{
{
name: "empty slice",
input: []endpointSliceInfo{},
},
{
name: "single element",
input: []endpointSliceInfo{{ip: "10.0.0.1", port: 80}},
},
{
name: "multiple elements",
input: []endpointSliceInfo{
{ip: "10.0.0.1", port: 80},
{ip: "10.0.0.2", port: 80},
{ip: "10.0.0.3", port: 80},
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Should not panic
result := shuffle(tt.input)
assert.Equal(t, len(tt.input), len(result), "shuffle should preserve slice length")
})
}
}

func TestNodePortHealthCheckConcurrentAccess(t *testing.T) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please add a nodeport_healthcheck_test.go file and add this function there?

// Verify that concurrent reads and writes to the healthcheck controller
// do not cause a data race (this test is meaningful with -race flag)
nphc := NewNodePortHealthCheck()

svcMap := serviceInfoMap{
"test-svc": &serviceInfo{
healthCheckNodePort: 0, // no actual listener needed
},
}
epMap := endpointSliceInfoMap{
"test-svc": {
{ip: "10.0.0.1", port: 80, isLocal: true},
},
}

var wg sync.WaitGroup
// Concurrent writes
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 100; i++ {
_ = nphc.UpdateServicesInfo(svcMap, epMap)
}
}()

// Concurrent reads (simulating what the HTTP handler does)
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 100; i++ {
nphc.mu.RLock()
_ = nphc.endpointsInfoMap["test-svc"]
nphc.mu.RUnlock()
}
}()

wg.Wait()
}

func TestSetupMangleTableRuleRejectsInvalidIP(t *testing.T) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please move this into network_services_controller_test.go?

// Verify that net.ParseIP returning nil is handled gracefully
// rather than causing a nil pointer dereference on .To4()
tests := []struct {
name string
ip string
}{
{name: "empty string", ip: ""},
{name: "garbage", ip: "not-an-ip"},
{name: "incomplete", ip: "192.168.1"},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
parsedIP := net.ParseIP(tt.ip)
assert.Nil(t, parsedIP, "net.ParseIP should return nil for invalid IP %q", tt.ip)
})
}
}
19 changes: 16 additions & 3 deletions pkg/controllers/proxy/network_services_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,15 +383,21 @@ func (nsc *NetworkServicesController) Run(healthChan chan<- *healthcheck.Control
// and we don't want to duplicate the effort, so this is a slimmer version of doSync()
klog.V(1).Info("Performing requested sync of ipvs services")
nsc.mu.Lock()
var syncErrors bool
err = nsc.syncIpvsServices(nsc.getServiceMap(), nsc.endpointsMap)
if err != nil {
klog.Errorf("error during ipvs sync in network service controller. Error: %v", err)
syncErrors = true
}
err = nsc.syncHairpinIptablesRules()
if err != nil {
klog.Errorf("error syncing hairpin iptables rules: %v", err)
syncErrors = true
}
nsc.mu.Unlock()
if syncErrors {
err = fmt.Errorf("one or more errors during ipvs sync")
}
}
if err == nil {
healthcheck.SendHeartBeat(healthChan, healthcheck.NetworkServicesController)
Expand Down Expand Up @@ -482,9 +488,9 @@ func (nsc *NetworkServicesController) setupIpvsFirewall() error {
return fmt.Errorf("failed to run iptables command: %s", err.Error())
}

// config.IpvsPermitAll: true then create INPUT/KUBE-ROUTER-SERVICE Chain creation else return
// config.IpvsPermitAll: true then create INPUT/KUBE-ROUTER-SERVICE Chain creation else skip
if !nsc.ipvsPermitAll {
return nil
continue
}

var comment string
Expand Down Expand Up @@ -981,10 +987,11 @@ func parseSchedFlags(value string) schedFlags {
func shuffle(endPoints []endpointSliceInfo) []endpointSliceInfo {
for index1 := range endPoints {
randBitInt, err := rand.Int(rand.Reader, big.NewInt(int64(index1+1)))
index2 := randBitInt.Int64()
if err != nil {
klog.Warningf("unable to get a random int: %v", err)
continue
}
index2 := randBitInt.Int64()
endPoints[index1], endPoints[index2] = endPoints[index2], endPoints[index1]
}
return endPoints
Expand Down Expand Up @@ -1561,6 +1568,9 @@ func (nsc *NetworkServicesController) setupMangleTableRule(ip string, protocol s
var iptablesCmdHandler utils.IPTablesHandler
tcpMSS := nsc.mtu
parsedIP := net.ParseIP(ip)
if parsedIP == nil {
return fmt.Errorf("invalid IP address: %s", ip)
}
if parsedIP.To4() != nil {
iptablesCmdHandler = nsc.iptablesCmdHandlers[v1.IPv4Protocol]
tcpMSS -= 2*ipv4.HeaderLen + tcpHeaderMinLen
Expand Down Expand Up @@ -1669,6 +1679,9 @@ func (nsc *NetworkServicesController) cleanupMangleTableRule(ip string, protocol
var iptablesCmdHandler utils.IPTablesHandler
tcpMSS := nsc.mtu
parsedIP := net.ParseIP(ip)
if parsedIP == nil {
return fmt.Errorf("invalid IP address: %s", ip)
}
if parsedIP.To4() != nil {
iptablesCmdHandler = nsc.iptablesCmdHandlers[v1.IPv4Protocol]
tcpMSS -= 2*ipv4.HeaderLen + tcpHeaderMinLen
Expand Down
5 changes: 5 additions & 0 deletions pkg/controllers/proxy/nodeport_healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type serviceHealthCheck struct {
}

type nphcServicesInfo struct {
mu sync.RWMutex
serviceInfoMap serviceInfoMap
endpointsInfoMap endpointSliceInfoMap
}
Expand All @@ -36,8 +37,10 @@ type nphcHandler struct {
func (nphc *nodePortHealthCheckController) UpdateServicesInfo(serviceInfoMap serviceInfoMap,
endpointsInfoMap endpointSliceInfoMap) error {
klog.V(1).Info("Running UpdateServicesInfo for NodePort health check")
nphc.mu.Lock()
nphc.serviceInfoMap = serviceInfoMap
nphc.endpointsInfoMap = endpointsInfoMap
nphc.mu.Unlock()

newActiveServices := make(map[int]bool)

Expand Down Expand Up @@ -141,7 +144,9 @@ func (nphc *nodePortHealthCheckController) stopHealthCheck(nodePort int) error {
}

func (npHandler *nphcHandler) Handler(w http.ResponseWriter, r *http.Request) {
npHandler.nphc.mu.RLock()
eps := npHandler.nphc.endpointsInfoMap[npHandler.svcHC.serviceID]
npHandler.nphc.mu.RUnlock()
endpointsOnNode := hasActiveEndpoints(eps)

var numActiveEndpoints int8
Expand Down
Loading