Skip to content

Commit 47e78f3

Browse files
authored
Merge pull request kubernetes#81378 from tedyu/ports-2-endpt
buildPortsToEndpointsMap should use flattened value type
2 parents 424d951 + 2f67134 commit 47e78f3

File tree

7 files changed

+107
-172
lines changed

7 files changed

+107
-172
lines changed

pkg/proxy/userspace/roundrobin.go

Lines changed: 7 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,14 @@ import (
2121
"fmt"
2222
"net"
2323
"reflect"
24-
"strconv"
2524
"sync"
2625
"time"
2726

2827
"k8s.io/api/core/v1"
2928
"k8s.io/apimachinery/pkg/types"
3029
"k8s.io/klog"
3130
"k8s.io/kubernetes/pkg/proxy"
31+
"k8s.io/kubernetes/pkg/proxy/util"
3232
"k8s.io/kubernetes/pkg/util/slice"
3333
)
3434

@@ -188,28 +188,6 @@ func (lb *LoadBalancerRR) NextEndpoint(svcPort proxy.ServicePortName, srcAddr ne
188188
return endpoint, nil
189189
}
190190

191-
type hostPortPair struct {
192-
host string
193-
port int
194-
}
195-
196-
func isValidEndpoint(hpp *hostPortPair) bool {
197-
return hpp.host != "" && hpp.port > 0
198-
}
199-
200-
func flattenValidEndpoints(endpoints []hostPortPair) []string {
201-
// Convert Endpoint objects into strings for easier use later. Ignore
202-
// the protocol field - we'll get that from the Service objects.
203-
var result []string
204-
for i := range endpoints {
205-
hpp := &endpoints[i]
206-
if isValidEndpoint(hpp) {
207-
result = append(result, net.JoinHostPort(hpp.host, strconv.Itoa(hpp.port)))
208-
}
209-
}
210-
return result
211-
}
212-
213191
// Remove any session affinity records associated to a particular endpoint (for example when a pod goes down).
214192
func removeSessionAffinityByEndpoint(state *balancerState, svcPort proxy.ServicePortName, endpoint string) {
215193
for _, affinity := range state.affinity.affinityMap {
@@ -243,33 +221,15 @@ func (lb *LoadBalancerRR) updateAffinityMap(svcPort proxy.ServicePortName, newEn
243221
}
244222
}
245223

246-
// buildPortsToEndpointsMap builds a map of portname -> all ip:ports for that
247-
// portname. Expode Endpoints.Subsets[*] into this structure.
248-
func buildPortsToEndpointsMap(endpoints *v1.Endpoints) map[string][]hostPortPair {
249-
portsToEndpoints := map[string][]hostPortPair{}
250-
for i := range endpoints.Subsets {
251-
ss := &endpoints.Subsets[i]
252-
for i := range ss.Ports {
253-
port := &ss.Ports[i]
254-
for i := range ss.Addresses {
255-
addr := &ss.Addresses[i]
256-
portsToEndpoints[port.Name] = append(portsToEndpoints[port.Name], hostPortPair{addr.IP, int(port.Port)})
257-
// Ignore the protocol field - we'll get that from the Service objects.
258-
}
259-
}
260-
}
261-
return portsToEndpoints
262-
}
263-
264224
func (lb *LoadBalancerRR) OnEndpointsAdd(endpoints *v1.Endpoints) {
265-
portsToEndpoints := buildPortsToEndpointsMap(endpoints)
225+
portsToEndpoints := util.BuildPortsToEndpointsMap(endpoints)
266226

267227
lb.lock.Lock()
268228
defer lb.lock.Unlock()
269229

270230
for portname := range portsToEndpoints {
271231
svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, Port: portname}
272-
newEndpoints := flattenValidEndpoints(portsToEndpoints[portname])
232+
newEndpoints := portsToEndpoints[portname]
273233
state, exists := lb.services[svcPort]
274234

275235
if !exists || state == nil || len(newEndpoints) > 0 {
@@ -289,16 +249,16 @@ func (lb *LoadBalancerRR) OnEndpointsAdd(endpoints *v1.Endpoints) {
289249
}
290250

291251
func (lb *LoadBalancerRR) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {
292-
portsToEndpoints := buildPortsToEndpointsMap(endpoints)
293-
oldPortsToEndpoints := buildPortsToEndpointsMap(oldEndpoints)
252+
portsToEndpoints := util.BuildPortsToEndpointsMap(endpoints)
253+
oldPortsToEndpoints := util.BuildPortsToEndpointsMap(oldEndpoints)
294254
registeredEndpoints := make(map[proxy.ServicePortName]bool)
295255

296256
lb.lock.Lock()
297257
defer lb.lock.Unlock()
298258

299259
for portname := range portsToEndpoints {
300260
svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, Port: portname}
301-
newEndpoints := flattenValidEndpoints(portsToEndpoints[portname])
261+
newEndpoints := portsToEndpoints[portname]
302262
state, exists := lb.services[svcPort]
303263

304264
curEndpoints := []string{}
@@ -344,7 +304,7 @@ func (lb *LoadBalancerRR) resetService(svcPort proxy.ServicePortName) {
344304
}
345305

346306
func (lb *LoadBalancerRR) OnEndpointsDelete(endpoints *v1.Endpoints) {
347-
portsToEndpoints := buildPortsToEndpointsMap(endpoints)
307+
portsToEndpoints := util.BuildPortsToEndpointsMap(endpoints)
348308

349309
lb.lock.Lock()
350310
defer lb.lock.Unlock()

pkg/proxy/userspace/roundrobin_test.go

Lines changed: 0 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -26,45 +26,6 @@ import (
2626
"k8s.io/kubernetes/pkg/proxy"
2727
)
2828

29-
func TestValidateWorks(t *testing.T) {
30-
if isValidEndpoint(&hostPortPair{}) {
31-
t.Errorf("Didn't fail for empty set")
32-
}
33-
if isValidEndpoint(&hostPortPair{host: "foobar"}) {
34-
t.Errorf("Didn't fail with invalid port")
35-
}
36-
if isValidEndpoint(&hostPortPair{host: "foobar", port: -1}) {
37-
t.Errorf("Didn't fail with a negative port")
38-
}
39-
if !isValidEndpoint(&hostPortPair{host: "foobar", port: 8080}) {
40-
t.Errorf("Failed a valid config.")
41-
}
42-
}
43-
44-
func TestFilterWorks(t *testing.T) {
45-
endpoints := []hostPortPair{
46-
{host: "foobar", port: 1},
47-
{host: "foobar", port: 2},
48-
{host: "foobar", port: -1},
49-
{host: "foobar", port: 3},
50-
{host: "foobar", port: -2},
51-
}
52-
filtered := flattenValidEndpoints(endpoints)
53-
54-
if len(filtered) != 3 {
55-
t.Errorf("Failed to filter to the correct size")
56-
}
57-
if filtered[0] != "foobar:1" {
58-
t.Errorf("Index zero is not foobar:1")
59-
}
60-
if filtered[1] != "foobar:2" {
61-
t.Errorf("Index one is not foobar:2")
62-
}
63-
if filtered[2] != "foobar:3" {
64-
t.Errorf("Index two is not foobar:3")
65-
}
66-
}
67-
6829
func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) {
6930
loadBalancer := NewLoadBalancerRR()
7031
service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "does-not-exist"}

pkg/proxy/util/utils.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"errors"
2222
"fmt"
2323
"net"
24+
"strconv"
2425

2526
"k8s.io/api/core/v1"
2627
"k8s.io/apimachinery/pkg/types"
@@ -48,6 +49,30 @@ var (
4849
ErrNoAddresses = errors.New("No addresses for hostname")
4950
)
5051

52+
// isValidEndpoint checks that the given host / port pair are valid endpoint
53+
func isValidEndpoint(host string, port int) bool {
54+
return host != "" && port > 0
55+
}
56+
57+
// BuildPortsToEndpointsMap builds a map of portname -> all ip:ports for that
58+
// portname. Explode Endpoints.Subsets[*] into this structure.
59+
func BuildPortsToEndpointsMap(endpoints *v1.Endpoints) map[string][]string {
60+
portsToEndpoints := map[string][]string{}
61+
for i := range endpoints.Subsets {
62+
ss := &endpoints.Subsets[i]
63+
for i := range ss.Ports {
64+
port := &ss.Ports[i]
65+
for i := range ss.Addresses {
66+
addr := &ss.Addresses[i]
67+
if isValidEndpoint(addr.IP, int(port.Port)) {
68+
portsToEndpoints[port.Name] = append(portsToEndpoints[port.Name], net.JoinHostPort(addr.IP, strconv.Itoa(int(port.Port))))
69+
}
70+
}
71+
}
72+
}
73+
return portsToEndpoints
74+
}
75+
5176
// IsZeroCIDR checks whether the input CIDR string is either
5277
// the IPv4 or IPv6 zero CIDR
5378
func IsZeroCIDR(cidr string) bool {

pkg/proxy/util/utils_test.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package util
1919
import (
2020
"context"
2121
"net"
22+
"reflect"
2223
"testing"
2324

2425
"k8s.io/api/core/v1"
@@ -28,6 +29,72 @@ import (
2829
fake "k8s.io/kubernetes/pkg/proxy/util/testing"
2930
)
3031

32+
func TestValidateWorks(t *testing.T) {
33+
if isValidEndpoint("", 0) {
34+
t.Errorf("Didn't fail for empty set")
35+
}
36+
if isValidEndpoint("foobar", 0) {
37+
t.Errorf("Didn't fail with invalid port")
38+
}
39+
if isValidEndpoint("foobar", -1) {
40+
t.Errorf("Didn't fail with a negative port")
41+
}
42+
if !isValidEndpoint("foobar", 8080) {
43+
t.Errorf("Failed a valid config.")
44+
}
45+
}
46+
47+
func TestBuildPortsToEndpointsMap(t *testing.T) {
48+
endpoints := &v1.Endpoints{
49+
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "testnamespace"},
50+
Subsets: []v1.EndpointSubset{
51+
{
52+
Addresses: []v1.EndpointAddress{
53+
{IP: "10.0.0.1"},
54+
{IP: "10.0.0.2"},
55+
},
56+
Ports: []v1.EndpointPort{
57+
{Name: "http", Port: 80},
58+
{Name: "https", Port: 443},
59+
},
60+
},
61+
{
62+
Addresses: []v1.EndpointAddress{
63+
{IP: "10.0.0.1"},
64+
{IP: "10.0.0.3"},
65+
},
66+
Ports: []v1.EndpointPort{
67+
{Name: "http", Port: 8080},
68+
{Name: "dns", Port: 53},
69+
},
70+
},
71+
{
72+
Addresses: []v1.EndpointAddress{},
73+
Ports: []v1.EndpointPort{
74+
{Name: "http", Port: 8888},
75+
{Name: "ssh", Port: 22},
76+
},
77+
},
78+
{
79+
Addresses: []v1.EndpointAddress{
80+
{IP: "10.0.0.1"},
81+
},
82+
Ports: []v1.EndpointPort{},
83+
},
84+
},
85+
}
86+
expectedPortsToEndpoints := map[string][]string{
87+
"http": {"10.0.0.1:80", "10.0.0.2:80", "10.0.0.1:8080", "10.0.0.3:8080"},
88+
"https": {"10.0.0.1:443", "10.0.0.2:443"},
89+
"dns": {"10.0.0.1:53", "10.0.0.3:53"},
90+
}
91+
92+
portsToEndpoints := BuildPortsToEndpointsMap(endpoints)
93+
if !reflect.DeepEqual(expectedPortsToEndpoints, portsToEndpoints) {
94+
t.Errorf("expected ports to endpoints not seen")
95+
}
96+
}
97+
3198
func TestIsProxyableIP(t *testing.T) {
3299
testCases := []struct {
33100
ip string

pkg/proxy/winuserspace/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ go_library(
2020
"//pkg/apis/core/v1/helper:go_default_library",
2121
"//pkg/proxy:go_default_library",
2222
"//pkg/proxy/config:go_default_library",
23+
"//pkg/proxy/util:go_default_library",
2324
"//pkg/util/ipconfig:go_default_library",
2425
"//pkg/util/netsh:go_default_library",
2526
"//pkg/util/slice:go_default_library",

pkg/proxy/winuserspace/roundrobin.go

Lines changed: 7 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,14 @@ import (
2121
"fmt"
2222
"net"
2323
"reflect"
24-
"strconv"
2524
"sync"
2625
"time"
2726

2827
"k8s.io/api/core/v1"
2928
"k8s.io/apimachinery/pkg/types"
3029
"k8s.io/klog"
3130
"k8s.io/kubernetes/pkg/proxy"
31+
"k8s.io/kubernetes/pkg/proxy/util"
3232
"k8s.io/kubernetes/pkg/util/slice"
3333
)
3434

@@ -178,28 +178,6 @@ func (lb *LoadBalancerRR) NextEndpoint(svcPort proxy.ServicePortName, srcAddr ne
178178
return endpoint, nil
179179
}
180180

181-
type hostPortPair struct {
182-
host string
183-
port int
184-
}
185-
186-
func isValidEndpoint(hpp *hostPortPair) bool {
187-
return hpp.host != "" && hpp.port > 0
188-
}
189-
190-
func flattenValidEndpoints(endpoints []hostPortPair) []string {
191-
// Convert Endpoint objects into strings for easier use later. Ignore
192-
// the protocol field - we'll get that from the Service objects.
193-
var result []string
194-
for i := range endpoints {
195-
hpp := &endpoints[i]
196-
if isValidEndpoint(hpp) {
197-
result = append(result, net.JoinHostPort(hpp.host, strconv.Itoa(hpp.port)))
198-
}
199-
}
200-
return result
201-
}
202-
203181
// Remove any session affinity records associated to a particular endpoint (for example when a pod goes down).
204182
func removeSessionAffinityByEndpoint(state *balancerState, svcPort proxy.ServicePortName, endpoint string) {
205183
for _, affinity := range state.affinity.affinityMap {
@@ -233,33 +211,15 @@ func (lb *LoadBalancerRR) updateAffinityMap(svcPort proxy.ServicePortName, newEn
233211
}
234212
}
235213

236-
// buildPortsToEndpointsMap builds a map of portname -> all ip:ports for that
237-
// portname. Explode Endpoints.Subsets[*] into this structure.
238-
func buildPortsToEndpointsMap(endpoints *v1.Endpoints) map[string][]hostPortPair {
239-
portsToEndpoints := map[string][]hostPortPair{}
240-
for i := range endpoints.Subsets {
241-
ss := &endpoints.Subsets[i]
242-
for i := range ss.Ports {
243-
port := &ss.Ports[i]
244-
for i := range ss.Addresses {
245-
addr := &ss.Addresses[i]
246-
portsToEndpoints[port.Name] = append(portsToEndpoints[port.Name], hostPortPair{addr.IP, int(port.Port)})
247-
// Ignore the protocol field - we'll get that from the Service objects.
248-
}
249-
}
250-
}
251-
return portsToEndpoints
252-
}
253-
254214
func (lb *LoadBalancerRR) OnEndpointsAdd(endpoints *v1.Endpoints) {
255-
portsToEndpoints := buildPortsToEndpointsMap(endpoints)
215+
portsToEndpoints := util.BuildPortsToEndpointsMap(endpoints)
256216

257217
lb.lock.Lock()
258218
defer lb.lock.Unlock()
259219

260220
for portname := range portsToEndpoints {
261221
svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, Port: portname}
262-
newEndpoints := flattenValidEndpoints(portsToEndpoints[portname])
222+
newEndpoints := portsToEndpoints[portname]
263223
state, exists := lb.services[svcPort]
264224

265225
if !exists || state == nil || len(newEndpoints) > 0 {
@@ -279,16 +239,16 @@ func (lb *LoadBalancerRR) OnEndpointsAdd(endpoints *v1.Endpoints) {
279239
}
280240

281241
func (lb *LoadBalancerRR) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {
282-
portsToEndpoints := buildPortsToEndpointsMap(endpoints)
283-
oldPortsToEndpoints := buildPortsToEndpointsMap(oldEndpoints)
242+
portsToEndpoints := util.BuildPortsToEndpointsMap(endpoints)
243+
oldPortsToEndpoints := util.BuildPortsToEndpointsMap(oldEndpoints)
284244
registeredEndpoints := make(map[proxy.ServicePortName]bool)
285245

286246
lb.lock.Lock()
287247
defer lb.lock.Unlock()
288248

289249
for portname := range portsToEndpoints {
290250
svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, Port: portname}
291-
newEndpoints := flattenValidEndpoints(portsToEndpoints[portname])
251+
newEndpoints := portsToEndpoints[portname]
292252
state, exists := lb.services[svcPort]
293253

294254
curEndpoints := []string{}
@@ -326,7 +286,7 @@ func (lb *LoadBalancerRR) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoint
326286
}
327287

328288
func (lb *LoadBalancerRR) OnEndpointsDelete(endpoints *v1.Endpoints) {
329-
portsToEndpoints := buildPortsToEndpointsMap(endpoints)
289+
portsToEndpoints := util.BuildPortsToEndpointsMap(endpoints)
330290

331291
lb.lock.Lock()
332292
defer lb.lock.Unlock()

0 commit comments

Comments
 (0)