Skip to content

Commit f0568c5

Browse files
committed
CLOUD-974 JGroups 4 support
fixes #18
1 parent dae1dd8 commit f0568c5

File tree

11 files changed

+187
-32
lines changed

11 files changed

+187
-32
lines changed

common/src/main/java/org/jgroups/ping/common/OpenshiftPing.java

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,20 +22,24 @@
2222
import java.io.DataInputStream;
2323
import java.io.DataOutputStream;
2424
import java.io.InputStream;
25+
import java.lang.reflect.Method;
2526
import java.net.HttpURLConnection;
2627
import java.net.InetSocketAddress;
2728
import java.net.URL;
2829
import java.util.Collections;
2930
import java.util.List;
3031
import java.util.concurrent.TimeUnit;
3132

33+
import org.jgroups.Address;
3234
import org.jgroups.Event;
3335
import org.jgroups.Message;
3436
import org.jgroups.annotations.Property;
35-
import org.jgroups.protocols.PING;
3637
import org.jgroups.ping.common.server.Server;
3738
import org.jgroups.ping.common.server.ServerFactory;
3839
import org.jgroups.ping.common.server.Servers;
40+
import org.jgroups.protocols.PING;
41+
import org.openshift.ping.common.compatibility.CompatibilityException;
42+
import org.openshift.ping.common.compatibility.CompatibilityUtils;
3943

4044
public abstract class OpenshiftPing extends PING {
4145

@@ -63,9 +67,20 @@ public abstract class OpenshiftPing extends PING {
6367
private Server _server;
6468
private String _serverName;
6569

70+
private static Method sendMethod; //handled via reflection due to JGroups 3/4 incompatibility
71+
6672
public OpenshiftPing(String systemEnvPrefix) {
6773
super();
6874
_systemEnvPrefix = trimToNull(systemEnvPrefix);
75+
try {
76+
if(CompatibilityUtils.isJGroups4()) {
77+
sendMethod = this.getClass().getMethod("up", Message.class);
78+
} else {
79+
sendMethod = this.getClass().getMethod("up", Event.class);
80+
}
81+
} catch (Exception e) {
82+
throw new CompatibilityException("Could not find suitable 'up' method.", e);
83+
}
6984
}
7085

7186
protected final String getSystemEnvName(String systemEnvSuffix) {
@@ -180,25 +195,49 @@ protected void sendMcastDiscoveryRequest(Message msg) {
180195
return;
181196
}
182197
if (msg.getSrc() == null) {
183-
msg.setSrc(local_addr);
198+
setMessageSourceAddress(msg);
199+
184200
}
185201
for (InetSocketAddress node : nodes) {
186202
// forward the request to each node
187203
timer.execute(new SendDiscoveryRequest(node, msg));
188204
}
189205
}
190206

207+
private void setMessageSourceAddress(Message msg) {
208+
//public void setSrc(Address new_src) {src_addr=new_src;} - JGroups 3.6.13
209+
//public Message setSrc(Address new_src) {src_addr=new_src; return this;} - JGroups 4.0.1
210+
//unfortunately need to use reflections
211+
try {
212+
msg.getClass().getMethod("setSrc", Address.class).invoke(msg, local_addr);
213+
} catch (Exception e) {
214+
log.warn("Setting local address for discovery failed.");
215+
}
216+
}
217+
191218
public void handlePingRequest(InputStream stream) throws Exception {
192219
DataInputStream dataInput = new DataInputStream(stream);
193220
Message msg = new Message();
194221
msg.readFrom(dataInput);
195222
try {
196-
up(new Event(Event.MSG, msg));
223+
sendUp(msg);
197224
} catch (Exception e) {
198225
log.error("Error processing GET_MBRS_REQ.", e);
199226
}
200227
}
201228

229+
private void sendUp(Message msg) {
230+
try {
231+
if(CompatibilityUtils.isJGroups4()) {
232+
sendMethod.invoke(this, msg);
233+
} else {
234+
sendMethod.invoke(this, new Event(1, msg));
235+
}
236+
} catch (Exception e) {
237+
throw new CompatibilityException("Could not invoke 'up' method.", e);
238+
}
239+
}
240+
202241
private List<InetSocketAddress> readAll() {
203242
if (isClusteringEnabled()) {
204243
return doReadAll(clusterName);

common/src/main/java/org/jgroups/ping/common/server/AbstractServer.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import java.util.HashMap;
2222
import java.util.Map;
2323

24-
import org.jgroups.Channel;
2524
import org.jgroups.JChannel;
2625
import org.jgroups.ping.common.OpenshiftPing;
2726

@@ -31,13 +30,13 @@
3130
public abstract class AbstractServer implements Server {
3231

3332
protected final int port;
34-
protected final Map<String, Channel> CHANNELS = new HashMap<String, Channel>();
33+
protected final Map<String, JChannel> CHANNELS = new HashMap<>();
3534

3635
protected AbstractServer(int port) {
3736
this.port = port;
3837
}
3938

40-
public final Channel getChannel(String clusterName) {
39+
public final JChannel getChannel(String clusterName) {
4140
if (clusterName != null) {
4241
synchronized (CHANNELS) {
4342
return CHANNELS.get(clusterName);
@@ -46,7 +45,7 @@ public final Channel getChannel(String clusterName) {
4645
return null;
4746
}
4847

49-
protected final void addChannel(Channel channel) {
48+
protected final void addChannel(JChannel channel) {
5049
String clusterName = getClusterName(channel);
5150
if (clusterName != null) {
5251
synchronized (CHANNELS) {
@@ -55,7 +54,7 @@ protected final void addChannel(Channel channel) {
5554
}
5655
}
5756

58-
protected final void removeChannel(Channel channel) {
57+
protected final void removeChannel(JChannel channel) {
5958
String clusterName = getClusterName(channel);
6059
if (clusterName != null) {
6160
synchronized (CHANNELS) {
@@ -70,11 +69,11 @@ protected final boolean hasChannels() {
7069
}
7170
}
7271

73-
private String getClusterName(final Channel channel) {
72+
private String getClusterName(final JChannel channel) {
7473
if (channel != null) {
7574
String clusterName = channel.getClusterName();
7675
// clusterName will be null if the Channel is not yet connected, but we still need it!
77-
if (clusterName == null && channel instanceof JChannel) {
76+
if (clusterName == null) {
7877
try {
7978
Field field = JChannel.class.getDeclaredField("cluster_name");
8079
field.setAccessible(true);
@@ -85,7 +84,7 @@ private String getClusterName(final Channel channel) {
8584
return null;
8685
}
8786

88-
protected final void handlePingRequest(Channel channel, InputStream stream) throws Exception {
87+
protected final void handlePingRequest(JChannel channel, InputStream stream) throws Exception {
8988
if (channel != null) {
9089
OpenshiftPing handler = (OpenshiftPing) channel.getProtocolStack().findProtocol(OpenshiftPing.class);
9190
handler.handlePingRequest(stream);

common/src/main/java/org/jgroups/ping/common/server/JDKServer.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import java.net.InetSocketAddress;
2222
import java.util.concurrent.Executors;
2323

24-
import org.jgroups.Channel;
24+
import org.jgroups.JChannel;
2525

2626
import com.sun.net.httpserver.HttpExchange;
2727
import com.sun.net.httpserver.HttpHandler;
@@ -38,7 +38,7 @@ public JDKServer(int port) {
3838
super(port);
3939
}
4040

41-
public synchronized boolean start(Channel channel) throws Exception {
41+
public synchronized boolean start(JChannel channel) throws Exception {
4242
boolean started = false;
4343
if (server == null) {
4444
try {
@@ -57,7 +57,7 @@ public synchronized boolean start(Channel channel) throws Exception {
5757
return started;
5858
}
5959

60-
public synchronized boolean stop(Channel channel) {
60+
public synchronized boolean stop(JChannel channel) {
6161
boolean stopped = false;
6262
removeChannel(channel);
6363
if (server != null && !hasChannels()) {
@@ -82,7 +82,7 @@ public void handle(HttpExchange exchange) throws IOException {
8282
exchange.sendResponseHeaders(200, 0);
8383
try {
8484
String clusterName = exchange.getRequestHeaders().getFirst(CLUSTER_NAME);
85-
Channel channel = server.getChannel(clusterName);
85+
JChannel channel = server.getChannel(clusterName);
8686
try (InputStream stream = exchange.getRequestBody()) {
8787
handlePingRequest(channel, stream);
8888
}

common/src/main/java/org/jgroups/ping/common/server/Server.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,14 @@
1616

1717
package org.jgroups.ping.common.server;
1818

19-
import org.jgroups.Channel;
19+
import org.jgroups.JChannel;
2020

2121
/**
2222
* @author <a href="mailto:[email protected]">Ales Justin</a>
2323
*/
2424
public interface Server {
2525
public static final String CLUSTER_NAME = "CLUSTER_NAME";
26-
public boolean start(Channel channel) throws Exception;
27-
public boolean stop(Channel channel);
28-
public Channel getChannel(String clusterName);
26+
public boolean start(JChannel channel) throws Exception;
27+
public boolean stop(JChannel channel);
28+
public JChannel getChannel(String clusterName);
2929
}

common/src/main/java/org/jgroups/ping/common/server/UndertowServer.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,14 @@
1616

1717
package org.jgroups.ping.common.server;
1818

19+
import java.io.InputStream;
20+
21+
import org.jgroups.JChannel;
22+
1923
import io.undertow.Undertow;
2024
import io.undertow.server.HttpHandler;
2125
import io.undertow.server.HttpServerExchange;
2226

23-
import java.io.InputStream;
24-
25-
import org.jgroups.Channel;
26-
2727
/**
2828
* @author <a href="mailto:[email protected]">Ales Justin</a>
2929
*/
@@ -34,7 +34,7 @@ public UndertowServer(int port) {
3434
super(port);
3535
}
3636

37-
public synchronized boolean start(Channel channel) throws Exception {
37+
public synchronized boolean start(JChannel channel) throws Exception {
3838
boolean started = false;
3939
if (server == null) {
4040
try {
@@ -53,7 +53,7 @@ public synchronized boolean start(Channel channel) throws Exception {
5353
return started;
5454
}
5555

56-
public synchronized boolean stop(Channel channel) {
56+
public synchronized boolean stop(JChannel channel) {
5757
boolean stopped = false;
5858
removeChannel(channel);
5959
if (server != null && !hasChannels()) {
@@ -82,7 +82,7 @@ public void handleRequest(HttpServerExchange exchange) throws Exception {
8282

8383
exchange.startBlocking();
8484
String clusterName = exchange.getRequestHeaders().getFirst(CLUSTER_NAME);
85-
Channel channel = server.getChannel(clusterName);
85+
JChannel channel = server.getChannel(clusterName);
8686
try (InputStream stream = exchange.getInputStream()) {
8787
handlePingRequest(channel, stream);
8888
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package org.openshift.ping.common.compatibility;
2+
3+
/**
4+
* Thrown on incompatibility errors
5+
*
6+
* @author Sebastian Łaskawiec
7+
*/
8+
public class CompatibilityException extends RuntimeException {
9+
10+
public CompatibilityException() {
11+
}
12+
13+
public CompatibilityException(String message) {
14+
super(message);
15+
}
16+
17+
public CompatibilityException(String message, Throwable cause) {
18+
super(message, cause);
19+
}
20+
21+
public CompatibilityException(Throwable cause) {
22+
super(cause);
23+
}
24+
25+
public CompatibilityException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
26+
super(message, cause, enableSuppression, writableStackTrace);
27+
}
28+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package org.openshift.ping.common.compatibility;
2+
3+
import org.jgroups.Version;
4+
5+
/**
6+
* A small set of compatibility checking utils
7+
*
8+
* @author Sebastian Łaskawiec
9+
*/
10+
public class CompatibilityUtils {
11+
12+
private CompatibilityUtils() {
13+
}
14+
15+
/**
16+
* @return <code>true</code> when JGroups 4 is on the classpath. <code>false</code> otherwise.
17+
*/
18+
public static boolean isJGroups4() {
19+
return Version.major == 4;
20+
}
21+
}

kube/src/test/java/org/jgroups/ping/kube/test/PingTestBase.java

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,11 @@
1818

1919
import java.util.ArrayList;
2020
import java.util.List;
21+
import java.util.Objects;
22+
import java.util.concurrent.TimeoutException;
2123

2224
import org.jgroups.JChannel;
25+
import org.jgroups.View;
2326
import org.jgroups.util.Util;
2427
import org.junit.Assert;
2528
import org.junit.Test;
@@ -45,7 +48,7 @@ public void testRestart() throws Exception {
4548
}
4649

4750
protected void doTestCluster() throws Exception {
48-
Util.waitUntilAllChannelsHaveSameSize(10000, 1000, channels);
51+
waitUntilAllChannelsHaveSameView(10000, 1000, channels);
4952

5053
// Tests unicasts from the first to the last
5154
JChannel first = channels[0], last = channels[getNum() - 1];
@@ -96,4 +99,38 @@ protected void doTestCluster() throws Exception {
9699
clearReceivers();
97100
}
98101

102+
/**
103+
* This method has been copied from JGroups. It changed name a couple of times, so we should
104+
* have a safety copy here...
105+
*/
106+
public static void waitUntilAllChannelsHaveSameView(long timeout, long interval, JChannel... channels) throws TimeoutException {
107+
if(interval >= timeout || timeout <= 0)
108+
throw new IllegalArgumentException("interval needs to be smaller than timeout or timeout needs to be > 0");
109+
long target_time=System.currentTimeMillis() + timeout;
110+
while(System.currentTimeMillis() <= target_time) {
111+
boolean all_channels_have_correct_view=true;
112+
View first=channels[0].getView();
113+
for(JChannel ch : channels) {
114+
View view=ch.getView();
115+
if(!Objects.equals(view, first) || view.size() != channels.length) {
116+
all_channels_have_correct_view=false;
117+
break;
118+
}
119+
}
120+
if(all_channels_have_correct_view)
121+
return;
122+
Util.sleep(interval);
123+
}
124+
View[] views=new View[channels.length];
125+
StringBuilder sb=new StringBuilder();
126+
for(int i=0; i < channels.length; i++) {
127+
views[i]=channels[i].getView();
128+
sb.append(channels[i].getName()).append(": ").append(views[i]).append("\n");
129+
}
130+
View first=channels[0].getView();
131+
for(View view : views)
132+
if(!Objects.equals(view, first))
133+
throw new TimeoutException("Timeout " + timeout + " kicked in, views are:\n" + sb);
134+
}
135+
99136
}

0 commit comments

Comments
 (0)