@@ -21,14 +21,14 @@ import (
21
21
"fmt"
22
22
"net"
23
23
"reflect"
24
- "strconv"
25
24
"sync"
26
25
"time"
27
26
28
27
"k8s.io/api/core/v1"
29
28
"k8s.io/apimachinery/pkg/types"
30
29
"k8s.io/klog"
31
30
"k8s.io/kubernetes/pkg/proxy"
31
+ "k8s.io/kubernetes/pkg/proxy/util"
32
32
"k8s.io/kubernetes/pkg/util/slice"
33
33
)
34
34
@@ -178,28 +178,6 @@ func (lb *LoadBalancerRR) NextEndpoint(svcPort proxy.ServicePortName, srcAddr ne
178
178
return endpoint , nil
179
179
}
180
180
181
- type hostPortPair struct {
182
- host string
183
- port int
184
- }
185
-
186
- func isValidEndpoint (hpp * hostPortPair ) bool {
187
- return hpp .host != "" && hpp .port > 0
188
- }
189
-
190
- func flattenValidEndpoints (endpoints []hostPortPair ) []string {
191
- // Convert Endpoint objects into strings for easier use later. Ignore
192
- // the protocol field - we'll get that from the Service objects.
193
- var result []string
194
- for i := range endpoints {
195
- hpp := & endpoints [i ]
196
- if isValidEndpoint (hpp ) {
197
- result = append (result , net .JoinHostPort (hpp .host , strconv .Itoa (hpp .port )))
198
- }
199
- }
200
- return result
201
- }
202
-
203
181
// Remove any session affinity records associated to a particular endpoint (for example when a pod goes down).
204
182
func removeSessionAffinityByEndpoint (state * balancerState , svcPort proxy.ServicePortName , endpoint string ) {
205
183
for _ , affinity := range state .affinity .affinityMap {
@@ -233,33 +211,15 @@ func (lb *LoadBalancerRR) updateAffinityMap(svcPort proxy.ServicePortName, newEn
233
211
}
234
212
}
235
213
236
- // buildPortsToEndpointsMap builds a map of portname -> all ip:ports for that
237
- // portname. Explode Endpoints.Subsets[*] into this structure.
238
- func buildPortsToEndpointsMap (endpoints * v1.Endpoints ) map [string ][]hostPortPair {
239
- portsToEndpoints := map [string ][]hostPortPair {}
240
- for i := range endpoints .Subsets {
241
- ss := & endpoints .Subsets [i ]
242
- for i := range ss .Ports {
243
- port := & ss .Ports [i ]
244
- for i := range ss .Addresses {
245
- addr := & ss .Addresses [i ]
246
- portsToEndpoints [port .Name ] = append (portsToEndpoints [port .Name ], hostPortPair {addr .IP , int (port .Port )})
247
- // Ignore the protocol field - we'll get that from the Service objects.
248
- }
249
- }
250
- }
251
- return portsToEndpoints
252
- }
253
-
254
214
func (lb * LoadBalancerRR ) OnEndpointsAdd (endpoints * v1.Endpoints ) {
255
- portsToEndpoints := buildPortsToEndpointsMap (endpoints )
215
+ portsToEndpoints := util . BuildPortsToEndpointsMap (endpoints )
256
216
257
217
lb .lock .Lock ()
258
218
defer lb .lock .Unlock ()
259
219
260
220
for portname := range portsToEndpoints {
261
221
svcPort := proxy.ServicePortName {NamespacedName : types.NamespacedName {Namespace : endpoints .Namespace , Name : endpoints .Name }, Port : portname }
262
- newEndpoints := flattenValidEndpoints ( portsToEndpoints [portname ])
222
+ newEndpoints := portsToEndpoints [portname ]
263
223
state , exists := lb .services [svcPort ]
264
224
265
225
if ! exists || state == nil || len (newEndpoints ) > 0 {
@@ -279,16 +239,16 @@ func (lb *LoadBalancerRR) OnEndpointsAdd(endpoints *v1.Endpoints) {
279
239
}
280
240
281
241
func (lb * LoadBalancerRR ) OnEndpointsUpdate (oldEndpoints , endpoints * v1.Endpoints ) {
282
- portsToEndpoints := buildPortsToEndpointsMap (endpoints )
283
- oldPortsToEndpoints := buildPortsToEndpointsMap (oldEndpoints )
242
+ portsToEndpoints := util . BuildPortsToEndpointsMap (endpoints )
243
+ oldPortsToEndpoints := util . BuildPortsToEndpointsMap (oldEndpoints )
284
244
registeredEndpoints := make (map [proxy.ServicePortName ]bool )
285
245
286
246
lb .lock .Lock ()
287
247
defer lb .lock .Unlock ()
288
248
289
249
for portname := range portsToEndpoints {
290
250
svcPort := proxy.ServicePortName {NamespacedName : types.NamespacedName {Namespace : endpoints .Namespace , Name : endpoints .Name }, Port : portname }
291
- newEndpoints := flattenValidEndpoints ( portsToEndpoints [portname ])
251
+ newEndpoints := portsToEndpoints [portname ]
292
252
state , exists := lb .services [svcPort ]
293
253
294
254
curEndpoints := []string {}
@@ -326,7 +286,7 @@ func (lb *LoadBalancerRR) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoint
326
286
}
327
287
328
288
func (lb * LoadBalancerRR ) OnEndpointsDelete (endpoints * v1.Endpoints ) {
329
- portsToEndpoints := buildPortsToEndpointsMap (endpoints )
289
+ portsToEndpoints := util . BuildPortsToEndpointsMap (endpoints )
330
290
331
291
lb .lock .Lock ()
332
292
defer lb .lock .Unlock ()
0 commit comments