Skip to content

Commit 7c64d15

Browse files
authored
Merge pull request #1373 from yue9944882/feat/network-loadbalance
Feat: Utilities for doing client-side load-balancing based-on endpoints
2 parents a43fa93 + a812284 commit 7c64d15

File tree

7 files changed

+412
-0
lines changed

7 files changed

+412
-0
lines changed
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
Copyright 2020 The Kubernetes Authors.
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
package io.kubernetes.client.extended.network;
14+
15+
import io.kubernetes.client.openapi.models.V1Endpoints;
16+
import java.util.List;
17+
import java.util.function.Supplier;
18+
import java.util.stream.Collectors;
19+
20+
/**
21+
* EndpointsLoadBalancer does client-side load-balancing by reading available IP addresses from
22+
* endpoint instance.
23+
*
24+
* <p>For example:
25+
*
26+
* <p>> new EndpointsLoadBalancer( > () -> endpoints, new YourLoadBalanceStrategy()) > );
27+
*/
28+
public class EndpointsLoadBalancer implements LoadBalancer {
29+
30+
protected final Supplier<V1Endpoints> endpointsSupplier;
31+
protected final LoadBalanceStrategy strategy;
32+
33+
public EndpointsLoadBalancer(
34+
Supplier<V1Endpoints> endpointsSupplier, LoadBalanceStrategy strategy) {
35+
this.endpointsSupplier = endpointsSupplier;
36+
this.strategy = strategy;
37+
}
38+
39+
@Override
40+
public String getTargetIP() {
41+
V1Endpoints ep = endpointsSupplier.get();
42+
List<String> availableIPs =
43+
ep.getSubsets().stream()
44+
.flatMap(subset -> subset.getAddresses().stream().map(addr -> addr.getIp()))
45+
.collect(Collectors.toList());
46+
return this.strategy.chooseIP(availableIPs);
47+
}
48+
49+
@Override
50+
public String getTargetIP(int port) {
51+
V1Endpoints ep = endpointsSupplier.get();
52+
List<String> availableIPs =
53+
ep.getSubsets().stream()
54+
.filter(
55+
subset ->
56+
subset.getPorts().stream()
57+
.anyMatch(epPort -> Integer.valueOf(port).equals(epPort.getPort())))
58+
.flatMap(subset -> subset.getAddresses().stream().map(addr -> addr.getIp()))
59+
.collect(Collectors.toList());
60+
return this.strategy.chooseIP(availableIPs);
61+
}
62+
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
Copyright 2020 The Kubernetes Authors.
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
package io.kubernetes.client.extended.network;
14+
15+
import com.google.common.util.concurrent.ThreadFactoryBuilder;
16+
import io.kubernetes.client.informer.cache.Lister;
17+
import io.kubernetes.client.openapi.ApiClient;
18+
import io.kubernetes.client.openapi.ApiException;
19+
import io.kubernetes.client.openapi.apis.CoreV1Api;
20+
import io.kubernetes.client.openapi.models.V1Endpoints;
21+
import java.time.Duration;
22+
import java.util.concurrent.Executors;
23+
import java.util.concurrent.ScheduledExecutorService;
24+
import java.util.concurrent.TimeUnit;
25+
import java.util.concurrent.atomic.AtomicReference;
26+
import java.util.function.Supplier;
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
30+
public class EndpointsUtils {
31+
32+
private static final Logger LOGGER = LoggerFactory.getLogger(EndpointsUtils.class);
33+
34+
public static Supplier<V1Endpoints> newPollingEndpointsReader(
35+
String namespace, String name, Duration pollingInterval, ApiClient apiClient)
36+
throws ApiException {
37+
return newPollingEndpointsReader(
38+
namespace,
39+
name,
40+
pollingInterval,
41+
Executors.newSingleThreadScheduledExecutor(
42+
new ThreadFactoryBuilder()
43+
.setDaemon(true)
44+
.setNameFormat("EndpointPoller[" + namespace + "/" + name + "]")
45+
.build()),
46+
apiClient);
47+
}
48+
49+
public static Supplier<V1Endpoints> newPollingEndpointsReader(
50+
String namespace,
51+
String name,
52+
Duration pollingInterval,
53+
ScheduledExecutorService pollerDaemon,
54+
ApiClient apiClient)
55+
throws ApiException {
56+
if (pollingInterval.isZero() || pollingInterval.isNegative()) {
57+
throw new IllegalArgumentException(
58+
"polling interval must be positive: " + pollingInterval.toString());
59+
}
60+
CoreV1Api coreV1Api = new CoreV1Api(apiClient);
61+
V1Endpoints endpoints = coreV1Api.readNamespacedEndpoints(name, namespace, null, null, null);
62+
AtomicReference<V1Endpoints> ref = new AtomicReference<>(endpoints);
63+
pollerDaemon.scheduleAtFixedRate(
64+
() -> {
65+
try {
66+
V1Endpoints refreshingEndpoint =
67+
coreV1Api.readNamespacedEndpoints(name, namespace, null, null, null);
68+
ref.set(refreshingEndpoint);
69+
} catch (ApiException e) {
70+
LOGGER.error("failed polling endpoint {}/{}", namespace, name, e);
71+
}
72+
},
73+
0,
74+
pollingInterval.toMillis(),
75+
TimeUnit.MILLISECONDS);
76+
return ref::get;
77+
}
78+
79+
public static Supplier<V1Endpoints> newCachedEndpointsReader(
80+
String namespace, String name, Lister<V1Endpoints> endpointsLister) {
81+
return () -> endpointsLister.namespace(namespace).get(name);
82+
}
83+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
Copyright 2020 The Kubernetes Authors.
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
package io.kubernetes.client.extended.network;
14+
15+
import java.util.List;
16+
17+
public interface LoadBalanceStrategy {
18+
19+
String chooseIP(List<String> ipCandidates);
20+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
Copyright 2020 The Kubernetes Authors.
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
package io.kubernetes.client.extended.network;
14+
15+
/** LoadBalancer provides IP address for L4 client-side load-balancing. */
16+
public interface LoadBalancer {
17+
18+
String getTargetIP();
19+
20+
String getTargetIP(int port);
21+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
Copyright 2020 The Kubernetes Authors.
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
package io.kubernetes.client.extended.network;
14+
15+
import java.util.List;
16+
import java.util.concurrent.atomic.AtomicInteger;
17+
18+
public class RoundRobinLoadBalanceStrategy implements LoadBalanceStrategy {
19+
20+
private final AtomicInteger robinIndex = new AtomicInteger(0);
21+
22+
@Override
23+
public String chooseIP(List<String> availableIPs) {
24+
if (availableIPs == null || availableIPs.size() == 0) {
25+
throw new IllegalArgumentException("failed choosing IP target: empty candidates");
26+
}
27+
int len = availableIPs.size();
28+
int idx = robinIndex.getAndIncrement() % len;
29+
return availableIPs.get(idx);
30+
}
31+
}
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
Copyright 2020 The Kubernetes Authors.
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
package io.kubernetes.client.extended.network;
14+
15+
import static org.junit.Assert.*;
16+
17+
import io.kubernetes.client.openapi.models.V1EndpointAddress;
18+
import io.kubernetes.client.openapi.models.V1EndpointPort;
19+
import io.kubernetes.client.openapi.models.V1EndpointSubset;
20+
import io.kubernetes.client.openapi.models.V1Endpoints;
21+
import java.util.Arrays;
22+
import java.util.List;
23+
import java.util.concurrent.atomic.AtomicReference;
24+
import org.junit.Test;
25+
26+
public class EndpointsLoadBalancerTests {
27+
28+
private final V1Endpoints onePortTwoHostEp =
29+
new V1Endpoints()
30+
.addSubsetsItem(
31+
new V1EndpointSubset()
32+
.addAddressesItem(new V1EndpointAddress().ip("127.0.0.1"))
33+
.addAddressesItem(new V1EndpointAddress().ip("127.0.0.2"))
34+
.addPortsItem(new V1EndpointPort().port(8080)));
35+
36+
private final V1Endpoints twoPortTwoHostEp =
37+
new V1Endpoints()
38+
.addSubsetsItem(
39+
new V1EndpointSubset()
40+
.addAddressesItem(new V1EndpointAddress().ip("127.0.0.1"))
41+
.addAddressesItem(new V1EndpointAddress().ip("127.0.0.2"))
42+
.addPortsItem(new V1EndpointPort().port(8080))
43+
.addPortsItem(new V1EndpointPort().port(8081)));
44+
45+
private final V1Endpoints twoSubsetTwoPortTwoHostEp =
46+
new V1Endpoints()
47+
.addSubsetsItem(
48+
new V1EndpointSubset()
49+
.addAddressesItem(new V1EndpointAddress().ip("127.0.0.1"))
50+
.addAddressesItem(new V1EndpointAddress().ip("127.0.0.2"))
51+
.addPortsItem(new V1EndpointPort().port(8080))
52+
.addPortsItem(new V1EndpointPort().port(8081)))
53+
.addSubsetsItem(
54+
new V1EndpointSubset()
55+
.addAddressesItem(new V1EndpointAddress().ip("127.0.0.3"))
56+
.addPortsItem(new V1EndpointPort().port(8082)));
57+
58+
@Test
59+
public void testGetTargetIP1() {
60+
AtomicReference<List<String>> receivingAvailableIPs = new AtomicReference<>();
61+
EndpointsLoadBalancer endpointsLoadBalancer =
62+
new EndpointsLoadBalancer(
63+
() -> onePortTwoHostEp,
64+
(ips) -> {
65+
receivingAvailableIPs.set(ips);
66+
return null;
67+
});
68+
endpointsLoadBalancer.getTargetIP();
69+
assertEquals(Arrays.asList("127.0.0.1", "127.0.0.2"), receivingAvailableIPs.get());
70+
}
71+
72+
@Test
73+
public void testGetTargetIP2() {
74+
AtomicReference<List<String>> receivingAvailableIPs = new AtomicReference<>();
75+
EndpointsLoadBalancer endpointsLoadBalancer =
76+
new EndpointsLoadBalancer(
77+
() -> twoPortTwoHostEp,
78+
(ips) -> {
79+
receivingAvailableIPs.set(ips);
80+
return null;
81+
});
82+
endpointsLoadBalancer.getTargetIP();
83+
assertEquals(Arrays.asList("127.0.0.1", "127.0.0.2"), receivingAvailableIPs.get());
84+
}
85+
86+
@Test
87+
public void testGetTargetIP3() {
88+
AtomicReference<List<String>> receivingAvailableIPs = new AtomicReference<>();
89+
EndpointsLoadBalancer endpointsLoadBalancer =
90+
new EndpointsLoadBalancer(
91+
() -> twoSubsetTwoPortTwoHostEp,
92+
(ips) -> {
93+
receivingAvailableIPs.set(ips);
94+
return null;
95+
});
96+
endpointsLoadBalancer.getTargetIP(8082);
97+
assertEquals(Arrays.asList("127.0.0.3"), receivingAvailableIPs.get());
98+
}
99+
}

0 commit comments

Comments
 (0)