Skip to content

Commit b2cba08

Browse files
authored
Merge pull request kubernetes#93979 from dcbw/userspace-proxy-test-waitgroups
proxy/userspace: use waitgroups instead of sketchy atomic ops in testcases
2 parents b46612a + 0cb5e55 commit b2cba08

File tree

2 files changed

+254
-374
lines changed

2 files changed

+254
-374
lines changed

pkg/proxy/userspace/proxier.go

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,29 @@ type ServiceInfo struct {
6868
stickyMaxAgeSeconds int
6969
// Deprecated, but required for back-compat (including e2e)
7070
externalIPs []string
71+
72+
// isStartedAtomic is set to non-zero when the service's socket begins
73+
// accepting requests. Used in testcases. Only access this with atomic ops.
74+
isStartedAtomic int32
75+
// isFinishedAtomic is set to non-zero when the service's socket shuts
76+
// down. Used in testcases. Only access this with atomic ops.
77+
isFinishedAtomic int32
78+
}
79+
80+
func (info *ServiceInfo) setStarted() {
81+
atomic.StoreInt32(&info.isStartedAtomic, 1)
82+
}
83+
84+
func (info *ServiceInfo) IsStarted() bool {
85+
return atomic.LoadInt32(&info.isStartedAtomic) != 0
86+
}
87+
88+
func (info *ServiceInfo) setFinished() {
89+
atomic.StoreInt32(&info.isFinishedAtomic, 1)
90+
}
91+
92+
func (info *ServiceInfo) IsFinished() bool {
93+
return atomic.LoadInt32(&info.isFinishedAtomic) != 0
7194
}
7295

7396
func (info *ServiceInfo) setAlive(b bool) {
@@ -124,7 +147,6 @@ type Proxier struct {
124147
udpIdleTimeout time.Duration
125148
portMapMutex sync.Mutex
126149
portMap map[portMapKey]*portMapValue
127-
numProxyLoops int32 // use atomic ops to access this; mostly for testing
128150
listenIP net.IP
129151
iptables iptables.Interface
130152
hostIP net.IP
@@ -427,14 +449,6 @@ func (proxier *Proxier) getServiceInfo(service proxy.ServicePortName) (*ServiceI
427449
return info, ok
428450
}
429451

430-
// addServiceOnPort lockes the proxy before calling addServiceOnPortInternal.
431-
// Used from testcases.
432-
func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol v1.Protocol, proxyPort int, timeout time.Duration) (*ServiceInfo, error) {
433-
proxier.mu.Lock()
434-
defer proxier.mu.Unlock()
435-
return proxier.addServiceOnPortInternal(service, protocol, proxyPort, timeout)
436-
}
437-
438452
// addServiceOnPortInternal starts listening for a new service, returning the ServiceInfo.
439453
// Pass proxyPort=0 to allocate a random port. The timeout only applies to UDP
440454
// connections, for now.
@@ -465,12 +479,10 @@ func (proxier *Proxier) addServiceOnPortInternal(service proxy.ServicePortName,
465479
proxier.serviceMap[service] = si
466480

467481
klog.V(2).Infof("Proxying for service %q on %s port %d", service, protocol, portNum)
468-
go func(service proxy.ServicePortName, proxier *Proxier) {
482+
go func() {
469483
defer runtime.HandleCrash()
470-
atomic.AddInt32(&proxier.numProxyLoops, 1)
471484
sock.ProxyLoop(service, si, proxier.loadBalancer)
472-
atomic.AddInt32(&proxier.numProxyLoops, -1)
473-
}(service, proxier)
485+
}()
474486

475487
return si, nil
476488
}
@@ -509,6 +521,7 @@ func (proxier *Proxier) mergeService(service *v1.Service) sets.String {
509521
if err := proxier.cleanupPortalAndProxy(serviceName, info); err != nil {
510522
klog.Error(err)
511523
}
524+
info.setFinished()
512525
}
513526
proxyPort, err := proxier.proxyPorts.AllocateNext()
514527
if err != nil {
@@ -541,6 +554,8 @@ func (proxier *Proxier) mergeService(service *v1.Service) sets.String {
541554
klog.Errorf("Failed to open portal for %q: %v", serviceName, err)
542555
}
543556
proxier.loadBalancer.NewService(serviceName, info.sessionAffinityType, info.stickyMaxAgeSeconds)
557+
558+
info.setStarted()
544559
}
545560

546561
return existingPorts
@@ -578,6 +593,7 @@ func (proxier *Proxier) unmergeService(service *v1.Service, existingPorts sets.S
578593
klog.Error(err)
579594
}
580595
proxier.loadBalancer.DeleteService(serviceName)
596+
info.setFinished()
581597
}
582598
for _, svcIP := range staleUDPServices.UnsortedList() {
583599
if err := conntrack.ClearEntriesForIP(proxier.exec, svcIP, v1.ProtocolUDP); err != nil {

0 commit comments

Comments
 (0)