Skip to content

Commit 20d277a

Browse files
slaskawi authored and belaban committed
Introduced split_clusters_during_rolling_update
- Fixes #33 (rolling upgrades) - Set version to 1.0.3-SNAPSHOT, included Seb's changes (issue #33) - Fix to #33
1 parent 733b022 commit 20d277a

File tree

9 files changed

+1786
-31
lines changed

9 files changed

+1786
-31
lines changed

README.adoc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,8 @@ will be thrown
167167

168168
| dump_requests | Dumps all discovery requests and responses to the Kubernetes server to stdout when true
169169

170+
| split_clusters_during_rolling_update | During a rolling update, prevents all Pods from being put into a single cluster when true
171+
170172
|===============
171173

172174

@@ -312,4 +314,4 @@ The commands for running on https://cloud.google.com/container-engine/docs/[Goog
312314
as when running locally in https://github.com/kubernetes/minikube[minikube].
313315

314316
The only difference is that on GKE, contrary to minikube, IP multicasting is not available. This means that the `probe.sh`
315-
command has to be run as `probe.sh -addr localhost` instead of simply running `probe.sh`.
317+
command has to be run as `probe.sh -addr localhost` instead of simply running `probe.sh`.

pom.xml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
<groupId>org.jgroups.kubernetes</groupId>
55
<artifactId>jgroups-kubernetes</artifactId>
6-
<version>1.0.2-SNAPSHOT</version>
6+
<version>1.0.4-SNAPSHOT</version>
77
<packaging>jar</packaging>
88
<name>KUBE_PING</name>
99
<url>https://github.com/jgroups-extras/jgroups-kubernetes</url>
@@ -56,6 +56,7 @@
5656
<version.jgroups>4.0.4.Final</version.jgroups>
5757
<version.oauth>20100527</version.oauth>
5858
<version.junit>4.12</version.junit>
59+
<version.assertj>3.8.0</version.assertj>
5960
<maven.compiler.source>1.8</maven.compiler.source>
6061
<maven.compiler.target>1.8</maven.compiler.target>
6162

@@ -146,6 +147,12 @@
146147
<version>${version.junit}</version>
147148
<scope>test</scope>
148149
</dependency>
150+
<dependency>
151+
<groupId>org.assertj</groupId>
152+
<artifactId>assertj-core</artifactId>
153+
<version>${version.assertj}</version>
154+
<scope>test</scope>
155+
</dependency>
149156
</dependencies>
150157

151158
<build>

src/main/java/org/jgroups/protocols/kubernetes/Client.java

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -71,17 +71,18 @@ protected String fetchFromKubernetes(String op, String namespace, String labels,
7171

7272

7373

74-
public List<String> getPods(String namespace, String labels, boolean dump_requests) throws Exception {
74+
public List<Pod> getPods(String namespace, String labels, boolean dump_requests) throws Exception {
7575
String result=fetchFromKubernetes("pods", namespace, labels, dump_requests);
7676
if(result == null)
7777
return Collections.emptyList();
7878
return parseJsonResult(result, namespace, labels);
7979
}
8080

81-
protected List<String> parseJsonResult(String input, String namespace, String labels) {
81+
protected List<Pod> parseJsonResult(String input, String namespace, String labels) {
8282
if(input == null)
8383
return Collections.emptyList();
8484
Json json=Json.read(input);
85+
8586
if(json == null || !json.isObject()) {
8687
log.error("JSON is not a map: %s", json);
8788
return Collections.emptyList();
@@ -92,22 +93,28 @@ protected List<String> parseJsonResult(String input, String namespace, String la
9293
return Collections.emptyList();
9394
}
9495
List<Json> items=json.at("items").asJsonList();
95-
List<String> pods=new ArrayList<>();
96+
List<Pod> pods=new ArrayList<>();
9697
for(Json obj: items) {
97-
if(obj.isObject() && obj.has("status")) {
98-
Json status=obj.at("status");
99-
if(status.isObject() && status.has("podIP")) {
100-
String podIP=status.at("podIP").asString();
101-
if(status.has("phase")) {
102-
Json phase=status.at("phase");
103-
if(phase != null && phase.isString() && !"Running".equals(phase.asString())) {
104-
log.trace("skipped pod with IP=%s as it is not running (%s)", podIP, phase);
105-
continue;
106-
}
107-
}
108-
if(!pods.contains(podIP))
109-
pods.add(podIP);
110-
}
98+
String parentDeployment = Optional.ofNullable(obj.at("metadata"))
99+
.map(podMetadata -> podMetadata.at("labels"))
100+
.map(podLabels -> podLabels.at("deployment"))
101+
.map(Json::asString)
102+
.orElse(null);
103+
String name = Optional.ofNullable(obj.at("metadata"))
104+
.map(podMetadata -> podMetadata.at("name"))
105+
.map(Json::asString)
106+
.orElse(null);
107+
String podIP = Optional.ofNullable(obj.at("status"))
108+
.map(podStatus -> podStatus.at("podIP"))
109+
.map(Json::asString)
110+
.orElse(null);
111+
if(podIP == null) {
112+
//Previously we did checks on phase. But from my observations, it is extremely rare to have a container
113+
//listed by Kubernetes API with any other status but Running (I might imagine it will hang in scheduled).
114+
//However in both cases, its IP address will be null. So it is much better to stick to that.
115+
log.trace("Skipping pod %s since it has no IP %s", name, podIP);
116+
} else {
117+
pods.add(new Pod(name, podIP, parentDeployment));
111118
}
112119
}
113120
log.trace("getPods(%s, %s) = %s", namespace, labels, pods);

src/main/java/org/jgroups/protocols/kubernetes/KUBE_PING.java

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,11 @@ public class KUBE_PING extends Discovery {
101101
@Property(description="Dumps all discovery requests and responses to the Kubernetes server to stdout when true")
102102
protected boolean dump_requests;
103103

104+
@Property(description="The standard behavior during Rolling Update is to put all Pods in the same cluster. In" +
105+
" cases (application level incompatibility) this causes problems. One might decide to split clusters to" +
106+
" 'old' and 'new' during that process")
107+
protected boolean split_clusters_during_rolling_update;
108+
104109
protected Client client;
105110

106111
protected int tp_bind_port;
@@ -168,7 +173,7 @@ public void init() throws Exception {
168173
}
169174

170175
public void findMembers(List<Address> members, boolean initial_discovery, Responses responses) {
171-
List<String> hosts=readAll();
176+
List<Pod> hosts=readAll();
172177
List<PhysicalAddress> cluster_members=new ArrayList<>(hosts != null? hosts.size() : 16);
173178
PhysicalAddress physical_addr=null;
174179
PingData data=null;
@@ -185,10 +190,10 @@ public void findMembers(List<Address> members, boolean initial_discovery, Respon
185190
if(hosts != null) {
186191
if(log.isTraceEnabled())
187192
log.trace("%s: hosts fetched from Kubernetes: %s", local_addr, hosts);
188-
for(String host: hosts) {
193+
for(Pod host: hosts) {
189194
for(int i=0; i <= port_range; i++) {
190195
try {
191-
IpAddress addr=new IpAddress(host, tp_bind_port + i);
196+
IpAddress addr=new IpAddress(host.getIp(), tp_bind_port + i);
192197
if(!cluster_members.contains(addr))
193198
cluster_members.add(addr);
194199
}
@@ -206,6 +211,33 @@ public void findMembers(List<Address> members, boolean initial_discovery, Respon
206211
list.stream().filter(phys_addr -> !cluster_members.contains(phys_addr)).forEach(cluster_members::add);
207212
}
208213

214+
if (split_clusters_during_rolling_update) {
215+
if(physical_addr != null) {
216+
String senderIp = ((IpAddress)physical_addr).getIpAddress().getHostAddress();
217+
String senderParentDeployment = hosts.stream()
218+
.filter(pod -> senderIp.contains(pod.getIp()))
219+
.map(Pod::getParentDeployment)
220+
.findFirst().orElse(null);
221+
if(senderParentDeployment != null) {
222+
Set<String> allowedAddresses = hosts.stream()
223+
.filter(pod -> senderParentDeployment.equals(pod.getParentDeployment()))
224+
.map(Pod::getIp)
225+
.collect(Collectors.toSet());
226+
for(Iterator<PhysicalAddress> memberIterator = cluster_members.iterator(); memberIterator.hasNext();) {
227+
IpAddress podAddress = (IpAddress) memberIterator.next();
228+
if(!allowedAddresses.contains(podAddress.getIpAddress().getHostAddress())) {
229+
log.trace("removing pod %s from cluster members list since its parent domain is different than senders (%s). Allowed hosts: %s", podAddress, senderParentDeployment, allowedAddresses);
230+
memberIterator.remove();
231+
}
232+
}
233+
} else {
234+
log.warn("split_clusters_during_rolling_update is set to 'true' but can't obtain local node parent deployment. All nodes will be placed in the same cluster.");
235+
}
236+
} else {
237+
log.warn("split_clusters_during_rolling_update is set to 'true' but can't obtain local node IP address. All nodes will be placed in the same cluster.");
238+
}
239+
}
240+
209241
if(log.isTraceEnabled())
210242
log.trace("%s: sending discovery requests to %s", local_addr, cluster_members);
211243
PingHeader hdr=new PingHeader(PingHeader.GET_MBRS_REQ).clusterName(cluster_name).initialDiscovery(initial_discovery);
@@ -229,12 +261,12 @@ public void findMembers(List<Address> members, boolean initial_discovery, Respon
229261

230262
@ManagedOperation(description="Asks Kubernetes for the IP addresses of all pods")
231263
public String fetchFromKube() {
232-
List<String> list=readAll();
233-
return list.stream().collect(Collectors.joining(", "));
264+
List<Pod> list=readAll();
265+
return list.toString();
234266
}
235267

236268

237-
protected List<String> readAll() {
269+
protected List<Pod> readAll() {
238270
if(isClusteringEnabled() && client != null) {
239271
try {
240272
return client.getPods(namespace, labels, dump_requests);
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package org.jgroups.protocols.kubernetes;
2+
3+
public class Pod {
4+
5+
private final String name;
6+
private final String ip;
7+
private final String parentDeployment;
8+
9+
10+
public Pod(String name, String ip, String parentDeployment) {
11+
this.name = name;
12+
this.ip = ip;
13+
this.parentDeployment = parentDeployment;
14+
}
15+
16+
public String getName() {
17+
return name;
18+
}
19+
20+
public String getIp() {
21+
return ip;
22+
}
23+
24+
public String getParentDeployment() {
25+
return parentDeployment;
26+
}
27+
28+
@Override
29+
public String toString() {
30+
return "Pod{" +
31+
"name='" + name + '\'' +
32+
", ip='" + ip + '\'' +
33+
", parentDeployment='" + parentDeployment + '\'' +
34+
'}';
35+
}
36+
37+
@Override
38+
public boolean equals(Object o) {
39+
if (this == o) return true;
40+
if (o == null || getClass() != o.getClass()) return false;
41+
42+
Pod pod = (Pod) o;
43+
44+
if (name != null ? !name.equals(pod.name) : pod.name != null) return false;
45+
if (ip != null ? !ip.equals(pod.ip) : pod.ip != null) return false;
46+
return parentDeployment != null ? parentDeployment.equals(pod.parentDeployment) : pod.parentDeployment == null;
47+
}
48+
49+
@Override
50+
public int hashCode() {
51+
int result = name != null ? name.hashCode() : 0;
52+
result = 31 * result + (ip != null ? ip.hashCode() : 0);
53+
result = 31 * result + (parentDeployment != null ? parentDeployment.hashCode() : 0);
54+
return result;
55+
}
56+
}

src/test/java/org/jgroups/ping/kube/test/ClientTest.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11

22
package org.jgroups.ping.kube.test;
33

4-
import org.jgroups.protocols.kubernetes.Client;
5-
import org.junit.Assert;
6-
import org.junit.Test;
4+
import static org.junit.Assert.assertEquals;
75

86
import java.util.List;
97

10-
import static org.junit.Assert.assertEquals;
8+
import org.jgroups.protocols.kubernetes.Client;
9+
import org.jgroups.protocols.kubernetes.Pod;
10+
import org.junit.Assert;
11+
import org.junit.Test;
1112

1213
/**
1314
* @author <a href="mailto:[email protected]">Ales Justin</a>
@@ -17,10 +18,10 @@ public class ClientTest {
1718
@Test
1819
public void testPods() throws Exception {
1920
Client client = new TestClient();
20-
List<String> pods = client.getPods(null, null, false);
21+
List<Pod> pods = client.getPods(null, null, false);
2122
Assert.assertNotNull(pods);
2223
assertEquals(2, pods.size());
23-
String pod = pods.get(0);
24+
String pod = pods.get(0).getIp();
2425
Assert.assertNotNull(pod);
2526
}
2627

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
package org.jgroups.ping.kube.test;
2+
3+
import static org.jgroups.ping.kube.test.util.FreePortFinder.findFreePort;
4+
5+
import java.net.InetAddress;
6+
import java.util.ArrayList;
7+
import java.util.List;
8+
import java.util.Set;
9+
import java.util.stream.Collectors;
10+
11+
import org.assertj.core.api.Assertions;
12+
import org.jgroups.JChannel;
13+
import org.jgroups.Message;
14+
import org.jgroups.protocols.TCP;
15+
import org.jgroups.protocols.kubernetes.KUBE_PING;
16+
import org.jgroups.protocols.kubernetes.Pod;
17+
import org.jgroups.protocols.pbcast.GMS;
18+
import org.jgroups.protocols.pbcast.NAKACK2;
19+
import org.jgroups.stack.IpAddress;
20+
import org.junit.Test;
21+
22+
/**
23+
* Tests Rolling update scenarios.
24+
*
25+
* <p>
26+
* The idea of this tests is to mock the Kubernetes API response and check which hosts were queried during
27+
* initial discovery. This way we will know which hosts where put into the cluster.
28+
* </p>
29+
*/
30+
public class RollingUpdateTest {
31+
32+
@Test
33+
public void testPuttingAllNodesInTheSameClusterDuringRollingUpdate() throws Exception {
34+
//given
35+
KUBE_PING_FOR_TESTING testedProtocol = new KUBE_PING_FOR_TESTING("/openshift_rolling_update.json");
36+
37+
//when
38+
sendInitialDiscovery(testedProtocol);
39+
Set<String> membersUsedForDiscovery = testedProtocol.getCollectedMessages().stream()
40+
.map(e -> ((IpAddress)e.getDest()).getIpAddress().getHostAddress())
41+
.collect(Collectors.toSet());
42+
List<String> allPodsFromKubernetesApi = testedProtocol.getPods().stream()
43+
.map(pod -> pod.getIp())
44+
.collect(Collectors.toList());
45+
46+
//then
47+
Assertions.assertThat(membersUsedForDiscovery).hasSameElementsAs(allPodsFromKubernetesApi);
48+
}
49+
50+
@Test
51+
public void testPutOnlyNodesWithTheSameParentDuringRollingUpdate() throws Exception {
52+
//given
53+
KUBE_PING_FOR_TESTING testedProtocol = new KUBE_PING_FOR_TESTING("/openshift_rolling_update.json");
54+
testedProtocol.setValue("split_clusters_during_rolling_update", true);
55+
56+
//when
57+
sendInitialDiscovery(testedProtocol);
58+
String senderParentDeployment = testedProtocol.getPods().stream()
59+
.filter(pod -> "127.0.0.1".equals(pod.getIp()))
60+
.map(pod -> pod.getParentDeployment())
61+
.findFirst().get();
62+
Set<String> membersUsedForDiscovery = testedProtocol.getCollectedMessages().stream()
63+
.map(e -> ((IpAddress)e.getDest()).getIpAddress().getHostAddress())
64+
.collect(Collectors.toSet());
65+
List<String> allowedPodsFromKubernetesApi = testedProtocol.getPods().stream()
66+
.filter(pod -> senderParentDeployment.equals(pod.getParentDeployment()))
67+
.map(pod -> pod.getIp())
68+
.collect(Collectors.toList());
69+
70+
//then
71+
Assertions.assertThat(allowedPodsFromKubernetesApi).containsAll(membersUsedForDiscovery);
72+
}
73+
74+
private void sendInitialDiscovery(KUBE_PING kubePingProtocol) throws Exception {
75+
new JChannel(
76+
new TCP().setValue("bind_addr", InetAddress.getLoopbackAddress()).setValue("bind_port", findFreePort()),
77+
kubePingProtocol,
78+
new NAKACK2(),
79+
new GMS().setValue("join_timeout", 1)
80+
).connect("RollingUpdateTest").disconnect();
81+
}
82+
83+
static class KUBE_PING_FOR_TESTING extends KUBE_PING {
84+
85+
private final String resourceFile;
86+
private List<Message> collectedMessages = new ArrayList<>();
87+
private List<Pod> pods;
88+
89+
public KUBE_PING_FOR_TESTING(String resourceFile) {
90+
this.resourceFile = resourceFile;
91+
}
92+
93+
@Override
94+
public void init() throws Exception {
95+
super.init();
96+
client = new TestClient(resourceFile);
97+
pods = client.getPods(namespace, labels, true);
98+
}
99+
100+
@Override
101+
protected void sendDiscoveryRequest(Message req) {
102+
collectedMessages.add(req);
103+
}
104+
105+
public List<Message> getCollectedMessages() {
106+
return collectedMessages;
107+
}
108+
109+
public List<Pod> getPods() {
110+
return pods;
111+
}
112+
}
113+
114+
}

0 commit comments

Comments
 (0)