Skip to content

Commit be3d733

Browse files
committed
group pods by metadata.labels.pod-template-hash during rolling update
1 parent d3c1a73 commit be3d733

File tree

7 files changed

+598
-32
lines changed

7 files changed

+598
-32
lines changed

src/main/java/org/jgroups/protocols/kubernetes/Client.java

Lines changed: 42 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,12 @@
66
import org.jgroups.util.Util;
77

88
import java.io.InputStream;
9-
import java.util.*;
9+
import java.util.ArrayList;
10+
import java.util.Collections;
11+
import java.util.List;
12+
import java.util.Map;
13+
import java.util.Optional;
14+
import java.util.TreeMap;
1015

1116
import static org.jgroups.protocols.kubernetes.Utils.openStream;
1217
import static org.jgroups.protocols.kubernetes.Utils.urlencode;
@@ -92,6 +97,37 @@ public List<Pod> getPods(String namespace, String labels, boolean dump_requests)
9297
return parseJsonResult(result, namespace, labels);
9398
}
9499

100+
/**
101+
* Determines the group a pod belongs to during a rolling update.
102+
* @param pod - json returned by k8s
103+
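* @return the value of the pod-template-hash label, or of the "deployment" label if that is absent; null when neither label is found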
104+
*/
105+
String getPodGroup(Json pod) {
106+
Json meta = Optional.ofNullable(pod.at("metadata")).orElse(null);
107+
Json labels = Optional.ofNullable(meta)
108+
.map(podMetadata -> podMetadata.at("labels"))
109+
.orElse(null);
110+
String group = Optional.ofNullable(labels)
111+
.map(l -> l.at("pod-template-hash"))
112+
.map(Json::asString)
113+
.orElse(null);
114+
115+
if (group == null) {
116+
log.warn("metadata.labels.pod-template-hash not found in pod json. Impossible to reliably determine pod group during Rolling Update");
117+
// keep backward-compatible behavior
118+
group = Optional.ofNullable(labels)
119+
.map(l -> l.at("deployment"))
120+
.map(Json::asString)
121+
.orElse(null);
122+
}
123+
124+
log.debug("pod %s, group %s", Optional.ofNullable(meta)
125+
.map(m -> m.at("name"))
126+
.map(Json::asString)
127+
.orElse(null), group);
128+
return group;
129+
}
130+
95131
protected List<Pod> parseJsonResult(String input, String namespace, String labels) {
96132
if(input == null)
97133
return Collections.emptyList();
@@ -109,11 +145,8 @@ protected List<Pod> parseJsonResult(String input, String namespace, String label
109145
List<Json> items=json.at("items").asJsonList();
110146
List<Pod> pods=new ArrayList<>();
111147
for(Json obj: items) {
112-
String parentDeployment = Optional.ofNullable(obj.at("metadata"))
113-
.map(podMetadata -> podMetadata.at("labels"))
114-
.map(podLabels -> podLabels.at("deployment"))
115-
.map(Json::asString)
116-
.orElse(null);
148+
String parentDeployment = getPodGroup(obj);
149+
117150
String name = Optional.ofNullable(obj.at("metadata"))
118151
.map(podMetadata -> podMetadata.at("name"))
119152
.map(Json::asString)
@@ -126,10 +159,10 @@ protected List<Pod> parseJsonResult(String input, String namespace, String label
126159
.orElse(null);
127160
}
128161
boolean running = podRunning(podStatus);
129-
if(podIP == null || !running) {
130-
log.trace("Skipping pod %s since it's IP is %s or running is %s", name, podIP, Boolean.toString(running));
162+
if(podIP == null) {
163+
log.trace("Skipping pod %s since it's IP is %s", name, podIP);
131164
} else {
132-
pods.add(new Pod(name, podIP, parentDeployment));
165+
pods.add(new Pod(name, podIP, parentDeployment, running));
133166
}
134167
}
135168
log.trace("getPods(%s, %s) = %s", namespace, labels, pods);

src/main/java/org/jgroups/protocols/kubernetes/KUBE_PING.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ public class KUBE_PING extends Discovery {
108108
" 'old' and 'new' during that process")
109109
protected boolean split_clusters_during_rolling_update;
110110

111+
111112
protected Client client;
112113

113114
protected int tp_bind_port;
@@ -194,15 +195,18 @@ private boolean isPropertyDefined(String property_name) {
194195
super.destroy();
195196
}
196197

198+
private PhysicalAddress getCurrentPhysicalAddress(Address addr) {
199+
return (PhysicalAddress)down(new Event(Event.GET_PHYSICAL_ADDRESS, addr));
200+
}
201+
197202
public void findMembers(List<Address> members, boolean initial_discovery, Responses responses) {
198203
List<Pod> hosts=readAll();
199204
List<PhysicalAddress> cluster_members=new ArrayList<>(hosts != null? hosts.size() : 16);
200205
PhysicalAddress physical_addr=null;
201206
PingData data=null;
202207

203208
if(!use_ip_addrs || !initial_discovery) {
204-
physical_addr=(PhysicalAddress)down(new Event(Event.GET_PHYSICAL_ADDRESS, local_addr));
205-
209+
physical_addr = getCurrentPhysicalAddress(local_addr);
206210
// https://issues.jboss.org/browse/JGRP-1670
207211
data=new PingData(local_addr, false, NameCache.get(local_addr), physical_addr);
208212
if(members != null && members.size() <= max_members_in_discovery_request)
@@ -213,6 +217,8 @@ public void findMembers(List<Address> members, boolean initial_discovery, Respon
213217
if(log.isTraceEnabled())
214218
log.trace("%s: hosts fetched from Kubernetes: %s", local_addr, hosts);
215219
for(Pod host: hosts) {
220+
if (!host.isReady())
221+
continue;
216222
for(int i=0; i <= port_range; i++) {
217223
try {
218224
IpAddress addr=new IpAddress(host.getIp(), tp_bind_port + i);
@@ -236,19 +242,21 @@ public void findMembers(List<Address> members, boolean initial_discovery, Respon
236242
if (split_clusters_during_rolling_update) {
237243
if(physical_addr != null) {
238244
String senderIp = ((IpAddress)physical_addr).getIpAddress().getHostAddress();
239-
String senderParentDeployment = hosts.stream()
245+
// Note that we search for the sender's pod group among all pods, even those that are not ready, because JGroups discovery is performed
246+
// before WildFly can respond to the HTTP liveness probe.
247+
String senderPodGroup = hosts.stream()
240248
.filter(pod -> senderIp.contains(pod.getIp()))
241-
.map(Pod::getParentDeployment)
249+
.map(Pod::getPodGroup)
242250
.findFirst().orElse(null);
243-
if(senderParentDeployment != null) {
251+
if(senderPodGroup != null) {
244252
Set<String> allowedAddresses = hosts.stream()
245-
.filter(pod -> senderParentDeployment.equals(pod.getParentDeployment()))
253+
.filter(pod -> senderPodGroup.equals(pod.getPodGroup()))
246254
.map(Pod::getIp)
247255
.collect(Collectors.toSet());
248256
for(Iterator<PhysicalAddress> memberIterator = cluster_members.iterator(); memberIterator.hasNext();) {
249257
IpAddress podAddress = (IpAddress) memberIterator.next();
250258
if(!allowedAddresses.contains(podAddress.getIpAddress().getHostAddress())) {
251-
log.trace("removing pod %s from cluster members list since its parent domain is different than senders (%s). Allowed hosts: %s", podAddress, senderParentDeployment, allowedAddresses);
259+
log.trace("removing pod %s from cluster members list since its parent domain is different than senders (%s). Allowed hosts: %s", podAddress, senderPodGroup, allowedAddresses);
252260
memberIterator.remove();
253261
}
254262
}

src/main/java/org/jgroups/protocols/kubernetes/Pod.java

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,19 @@ public class Pod {
44

55
private final String name;
66
private final String ip;
7-
private final String parentDeployment;
7+
private final String podGroup; // name of the group this pod belongs to during a rolling update. There are two groups: new pods and old pods
8+
private final boolean isReady;
89

910

10-
public Pod(String name, String ip, String parentDeployment) {
11+
public Pod(String name, String ip, String podGroup, boolean isReady) {
1112
this.name = name;
1213
this.ip = ip;
13-
this.parentDeployment = parentDeployment;
14+
this.podGroup = podGroup;
15+
this.isReady = isReady;
16+
}
17+
18+
public Pod(String name, String ip, String podGroup) {
19+
this(name, ip, podGroup, false);
1420
}
1521

1622
public String getName() {
@@ -21,16 +27,20 @@ public String getIp() {
2127
return ip;
2228
}
2329

24-
public String getParentDeployment() {
25-
return parentDeployment;
30+
public String getPodGroup() {
31+
return podGroup;
32+
}
33+
34+
public boolean isReady() {
35+
return isReady;
2636
}
2737

2838
@Override
2939
public String toString() {
3040
return "Pod{" +
3141
"name='" + name + '\'' +
3242
", ip='" + ip + '\'' +
33-
", parentDeployment='" + parentDeployment + '\'' +
43+
", podGroup='" + podGroup + '\'' +
3444
'}';
3545
}
3646

@@ -43,14 +53,14 @@ public boolean equals(Object o) {
4353

4454
if (name != null ? !name.equals(pod.name) : pod.name != null) return false;
4555
if (ip != null ? !ip.equals(pod.ip) : pod.ip != null) return false;
46-
return parentDeployment != null ? parentDeployment.equals(pod.parentDeployment) : pod.parentDeployment == null;
56+
return podGroup != null ? podGroup.equals(pod.podGroup) : pod.podGroup == null;
4757
}
4858

4959
@Override
5060
public int hashCode() {
5161
int result = name != null ? name.hashCode() : 0;
5262
result = 31 * result + (ip != null ? ip.hashCode() : 0);
53-
result = 31 * result + (parentDeployment != null ? parentDeployment.hashCode() : 0);
63+
result = 31 * result + (podGroup != null ? podGroup.hashCode() : 0);
5464
return result;
5565
}
5666
}

src/test/java/org/jgroups/ping/kube/test/ClientTest.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,29 @@ public void testParsingPortWithoutNames() throws Exception {
3737
assertEquals(2, numberOfPods);
3838
}
3939

40+
@Test
41+
public void testParsingPodGroupPodTemplateHash() throws Exception {
42+
//given
43+
Client client = new TestClient("/pods_without_ports.json");
44+
45+
//when
46+
String podGroup = client.getPods(null, null, false).get(0).getPodGroup();
47+
48+
//then
49+
assertEquals("infinispan-simple-tutorials-kubernetes-5", podGroup);
50+
}
51+
52+
@Test
53+
public void testParsingPodGroupOpenshift() throws Exception {
54+
//given
55+
Client client = new TestClient("/replicaset_rolling_update.json");
56+
57+
//when
58+
String podGroup = client.getPods(null, null, false).get(0).getPodGroup();
59+
60+
//then
61+
assertEquals("6569c544b", podGroup);
62+
}
63+
64+
4065
}

src/test/java/org/jgroups/ping/kube/test/RollingUpdateTest.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,22 +48,34 @@ public void testPuttingAllNodesInTheSameClusterDuringRollingUpdate() throws Exce
4848
}
4949

5050
@Test
51-
public void testPutOnlyNodesWithTheSameParentDuringRollingUpdate() throws Exception {
51+
public void testPutOnlyNodesWithTheSameParentDuringRollingUpdateOpenShift() throws Exception {
5252
//given
5353
KUBE_PING_FOR_TESTING testedProtocol = new KUBE_PING_FOR_TESTING("/openshift_rolling_update.json");
5454
testedProtocol.setValue("split_clusters_during_rolling_update", true);
5555

56+
testPutOnlyNodesWithTheSameParentDuringRollingUpdate(testedProtocol);
57+
}
58+
59+
@Test
60+
public void testPutOnlyNodesWithTheSameParentDuringRollingUpdateReplicaSet() throws Exception {
61+
//given
62+
KUBE_PING_FOR_TESTING testedProtocol = new KUBE_PING_FOR_TESTING("/replicaset_rolling_update.json");
63+
testedProtocol.setValue("split_clusters_during_rolling_update", true);
64+
testPutOnlyNodesWithTheSameParentDuringRollingUpdate(testedProtocol);
65+
}
66+
67+
private void testPutOnlyNodesWithTheSameParentDuringRollingUpdate(KUBE_PING_FOR_TESTING testedProtocol) throws Exception {
5668
//when
5769
sendInitialDiscovery(testedProtocol);
5870
String senderParentDeployment = testedProtocol.getPods().stream()
5971
.filter(pod -> "127.0.0.1".equals(pod.getIp()))
60-
.map(Pod::getParentDeployment)
72+
.map(Pod::getPodGroup)
6173
.findFirst().get();
6274
Set<String> membersUsedForDiscovery = testedProtocol.getCollectedMessages().stream()
63-
.map(e -> ((IpAddress)e.getDest()).getIpAddress().getHostAddress())
64-
.collect(Collectors.toSet());
75+
.map(e -> ((IpAddress)e.getDest()).getIpAddress().getHostAddress())
76+
.collect(Collectors.toSet());
6577
List<String> allowedPodsFromKubernetesApi = testedProtocol.getPods().stream()
66-
.filter(pod -> senderParentDeployment.equals(pod.getParentDeployment()))
78+
.filter(pod -> senderParentDeployment.equals(pod.getPodGroup()))
6779
.map(Pod::getIp)
6880
.collect(Collectors.toList());
6981

src/test/java/org/jgroups/ping/kube/test/StatusTest.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
package org.jgroups.ping.kube.test;
22

3-
import static org.junit.Assert.assertEquals;
4-
53
import java.util.List;
64

75
import org.jgroups.protocols.kubernetes.Client;
86
import org.jgroups.protocols.kubernetes.Pod;
97
import org.junit.Assert;
108
import org.junit.Test;
119

10+
import static org.junit.Assert.*;
11+
1212
/**
1313
* @author <a href="mailto:[email protected]">Ulrich Romahn</a>
1414
*/
@@ -30,7 +30,10 @@ public void testOnePodNotRunning() throws Exception {
3030
Client client = new TestClient(jsonFile);
3131
List<Pod> pods = client.getPods(null, null, false);
3232
Assert.assertNotNull(pods);
33-
assertEquals(2, pods.size());
33+
assertEquals(3, pods.size());
34+
assertTrue(pods.get(0).isReady());
35+
assertTrue(pods.get(1).isReady());
36+
assertFalse(pods.get(2).isReady());
3437
String pod = pods.get(0).getIp();
3538
Assert.assertNotNull(pod);
3639
}

0 commit comments

Comments
 (0)