Skip to content

Commit 6746df7

Browse files
authored
Merge pull request kubernetes#127153 from Anurag252/master
Add a static NodePort allocator for e2e tests
2 parents 281bba8 + a9b6414 commit 6746df7

File tree

2 files changed

+157
-10
lines changed

2 files changed

+157
-10
lines changed

test/e2e/framework/service/jig.go

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,11 @@ import (
2020
"context"
2121
"errors"
2222
"fmt"
23+
"math/rand"
2324
"net"
2425
"strconv"
2526
"strings"
27+
"sync"
2628
"time"
2729

2830
"github.com/onsi/ginkgo/v2"
@@ -58,6 +60,33 @@ var NodePortRange = utilnet.PortRange{Base: 30000, Size: 2768}
5860
// It is copied from "k8s.io/kubernetes/pkg/registry/core/service/portallocator"
5961
var errAllocated = errors.New("provided port is already allocated")
6062

63+
// staticPortRange implements port allocation model described here
64+
// https://github.com/kubernetes/enhancements/tree/master/keps/sig-network/3668-reserved-service-nodeport-range
65+
type staticPortRange struct {
66+
sync.Mutex
67+
baseport int32
68+
length int32
69+
reservedPorts sets.Set[int32]
70+
}
71+
72+
func calculateRange(size int32) int32 {
73+
var minPort int32 = 16
74+
var step int32 = 32
75+
var maxPort int32 = 128
76+
return min(max(minPort, size/step), maxPort)
77+
}
78+
79+
var staticPortAllocator *staticPortRange
80+
81+
// Initialize only once per test
82+
func init() {
83+
staticPortAllocator = &staticPortRange{
84+
baseport: int32(NodePortRange.Base),
85+
length: calculateRange(int32(NodePortRange.Size)),
86+
reservedPorts: sets.New[int32](),
87+
}
88+
}
89+
6190
// TestJig is a test jig to help service testing.
6291
type TestJig struct {
6392
Client clientset.Interface
@@ -82,6 +111,73 @@ func NewTestJig(client clientset.Interface, namespace, name string) *TestJig {
82111
return j
83112
}
84113

114+
// reservePort reserves the port provided as input.
115+
// If an invalid port was provided or if the port is already reserved, it returns false
116+
func (s *staticPortRange) reservePort(port int32) bool {
117+
s.Lock()
118+
defer s.Unlock()
119+
if port < s.baseport || port > s.baseport+s.length || s.reservedPorts.Has(port) {
120+
return false
121+
}
122+
s.reservedPorts.Insert(port)
123+
return true
124+
}
125+
126+
// getUnusedPort returns a free port from the range and returns its number and nil value
127+
// the port is not allocated so the consumer should allocate it explicitly calling allocatePort()
128+
// if none is available then it returns -1 and error
129+
func (s *staticPortRange) getUnusedPort() (int32, error) {
130+
s.Lock()
131+
defer s.Unlock()
132+
// start in a random offset
133+
start := rand.Int31n(s.length)
134+
for i := int32(0); i < s.length; i++ {
135+
port := s.baseport + (start+i)%(s.length)
136+
if !s.reservedPorts.Has(port) {
137+
return port, nil
138+
}
139+
}
140+
return -1, fmt.Errorf("no free ports were found")
141+
}
142+
143+
// releasePort releases the port passed as an argument
144+
func (s *staticPortRange) releasePort(port int32) {
145+
s.Lock()
146+
defer s.Unlock()
147+
s.reservedPorts.Delete(port)
148+
}
149+
150+
// GetUnusedStaticNodePort returns a free port in static range and a nil value
151+
// If no port in static range is available it returns -1 and an error value
152+
// Note that it is not guaranteed that the returned port is actually available on the apiserver;
153+
// You must allocate a port, then attempt to create the service, then call
154+
// ReserveStaticNodePort.
155+
func GetUnusedStaticNodePort() (int32, error) {
156+
return staticPortAllocator.getUnusedPort()
157+
}
158+
159+
// ReserveStaticNodePort reserves the port provided as input. It is guaranteed
160+
// that no other test will receive this port from GetUnusedStaticNodePort until
161+
// after you call ReleaseStaticNodePort.
162+
//
163+
// port must have been previously allocated by GetUnusedStaticNodePort, and
164+
// then successfully used as a NodePort or HealthCheckNodePort when creating
165+
// a service. Trying to reserve a port that was not allocated by
166+
// GetUnusedStaticNodePort, or reserving it before creating the associated service
167+
// may cause other e2e tests to fail.
168+
//
169+
// If an invalid port was provided or if the port is already reserved, it returns false
170+
func ReserveStaticNodePort(port int32) bool {
171+
return staticPortAllocator.reservePort(port)
172+
}
173+
174+
// ReleaseStaticNodePort releases the specified port.
175+
// The corresponding service should have already been deleted, to ensure that the
176+
// port allocator doesn't try to reuse it before the apiserver considers it available.
177+
func ReleaseStaticNodePort(port int32) {
178+
staticPortAllocator.releasePort(port)
179+
}
180+
85181
// newServiceTemplate returns the default v1.Service template for this j, but
86182
// does not actually create the Service. The default Service has the same name
87183
// as the j and exposes the given port.

test/e2e/network/service.go

Lines changed: 61 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1682,10 +1682,37 @@ var _ = common.SIGDescribe("Services", func() {
16821682

16831683
service := t.BuildServiceSpec()
16841684
service.Spec.Type = v1.ServiceTypeNodePort
1685-
1685+
numberOfRetries := 5
16861686
ginkgo.By("creating service " + serviceName + " with type NodePort in namespace " + ns)
1687-
service, err := t.CreateService(service)
1688-
framework.ExpectNoError(err, "failed to create service: %s in namespace: %s", serviceName, ns)
1687+
var err error
1688+
for i := 0; i < numberOfRetries; i++ {
1689+
port, err := e2eservice.GetUnusedStaticNodePort()
1690+
framework.ExpectNoError(err, "Static node port allocator was not able to find a free nodeport.")
1691+
service.Spec.Ports[0].NodePort = port
1692+
service, err = t.CreateService(service)
1693+
// We will later delete this service and then recreate it with same nodeport. We need to ensure that
1694+
// another e2e test doesn't start listening on our old nodeport and conflicts re-creation of service
1695+
// hence we use ReserveStaticNodePort.
1696+
if err == nil {
1697+
nodePort := service.Spec.Ports[0].NodePort
1698+
ok := e2eservice.ReserveStaticNodePort(nodePort)
1699+
if !ok {
1700+
// We could not reserve the allocated port which means the port was either invalid or was reserved by another test.
1701+
// This indicates a problem in code and we have a log message to debug it.
1702+
framework.Failf("Static node port allocator was not able to reserve nodeport: %d", nodePort)
1703+
}
1704+
break
1705+
}
1706+
if apierrors.IsConflict(err) {
1707+
framework.Logf("node port %d is already allocated to other service, retrying ... : %v", port, err)
1708+
continue
1709+
}
1710+
framework.ExpectNoError(err, "failed to create service: %s in namespace: %s", serviceName, ns)
1711+
1712+
}
1713+
1714+
nodePort := service.Spec.Ports[0].NodePort
1715+
defer e2eservice.ReleaseStaticNodePort(nodePort)
16891716

16901717
if service.Spec.Type != v1.ServiceTypeNodePort {
16911718
framework.Failf("got unexpected Spec.Type for new service: %v", service)
@@ -1700,7 +1727,6 @@ var _ = common.SIGDescribe("Services", func() {
17001727
if !e2eservice.NodePortRange.Contains(int(port.NodePort)) {
17011728
framework.Failf("got unexpected (out-of-range) port for new service: %v", service)
17021729
}
1703-
nodePort := port.NodePort
17041730

17051731
ginkgo.By("deleting original service " + serviceName)
17061732
err = t.DeleteService(serviceName)
@@ -3931,10 +3957,37 @@ var _ = common.SIGDescribe("Services", func() {
39313957
}
39323958

39333959
ginkgo.By("creating the service")
3934-
svc, err := jig.CreateLoadBalancerServiceWaitForClusterIPOnly(func(svc *v1.Service) {
3935-
svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal
3936-
})
3937-
framework.ExpectNoError(err, "creating the service")
3960+
var svc *v1.Service
3961+
numberOfRetries := 5
3962+
for i := 0; i < numberOfRetries; i++ {
3963+
port, err := e2eservice.GetUnusedStaticNodePort()
3964+
framework.ExpectNoError(err, "Static node port allocator was not able to find a free nodeport.")
3965+
svc, err = jig.CreateLoadBalancerServiceWaitForClusterIPOnly(func(svc *v1.Service) {
3966+
svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal
3967+
svc.Spec.HealthCheckNodePort = port
3968+
})
3969+
// We will later convert this service to Cluster traffic policy, but we need to ensure that
3970+
// another e2e test doesn't start listening on our old HealthCheckNodePort when we
3971+
// do that, so we use ReserveStaticNodePort.
3972+
if err == nil {
3973+
staticHealthCheckPort := svc.Spec.HealthCheckNodePort
3974+
ok := e2eservice.ReserveStaticNodePort(staticHealthCheckPort)
3975+
if !ok {
3976+
// We could not reserve the allocated port which means the port was either invalid or was reserved by another test.
3977+
// This indicates a problem in code and we have a log message to debug it.
3978+
framework.Failf("Static node port allocator was not able to reserve healthcheck nodeport: %d", staticHealthCheckPort)
3979+
}
3980+
break
3981+
}
3982+
if apierrors.IsConflict(err) {
3983+
framework.Logf("node port %d is already allocated to other service, retrying ... : %v", port, err)
3984+
continue
3985+
}
3986+
framework.ExpectNoError(err, "failed to create service: %s in namespace: %s", serviceName, namespace)
3987+
3988+
}
3989+
3990+
defer e2eservice.ReleaseStaticNodePort(svc.Spec.HealthCheckNodePort)
39383991
nodePortStr := fmt.Sprintf("%d", svc.Spec.Ports[0].NodePort)
39393992
hcNodePortStr := fmt.Sprintf("%d", svc.Spec.HealthCheckNodePort)
39403993
framework.Logf("NodePort is %s, HealthCheckNodePort is %s", nodePortStr, hcNodePortStr)
@@ -4043,7 +4096,6 @@ var _ = common.SIGDescribe("Services", func() {
40434096
}
40444097
deadline = time.Now().Add(e2eservice.KubeProxyEndpointLagTimeout)
40454098

4046-
// FIXME: this is racy; we need to use a reserved HCNP here.
40474099
ginkgo.By("ensuring that the HealthCheckNodePort no longer responds on the endpoint node when ExternalTrafficPolicy is Cluster")
40484100
checkOneHealthCheck(endpointNodeIP, false, "", deadline)
40494101
ginkgo.By("ensuring that the HealthCheckNodePort no longer responds on the execpod node when ExternalTrafficPolicy is Cluster")
@@ -4062,7 +4114,6 @@ var _ = common.SIGDescribe("Services", func() {
40624114
_, err = jig.UpdateService(ctx, func(svc *v1.Service) {
40634115
svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal
40644116
// Request the same healthCheckNodePort as before, to test the user-requested allocation path
4065-
// FIXME: we need to use a reserved HCNP here.
40664117
svc.Spec.HealthCheckNodePort = oldHealthCheckNodePort
40674118
})
40684119
framework.ExpectNoError(err, "updating ExternalTrafficPolicy and HealthCheckNodePort")

0 commit comments

Comments
 (0)