Skip to content

Commit ca3f975

Browse files
authored
Merge pull request #24 from slaskawi/CLOUD-974/JGroups_4
CLOUD-974 JGroups 4 support
2 parents af6260c + 7a87fab commit ca3f975

File tree

13 files changed

+160
-34
lines changed

13 files changed

+160
-34
lines changed

common/src/main/java/org/openshift/ping/common/OpenshiftPing.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.io.DataInputStream;
2323
import java.io.DataOutputStream;
2424
import java.io.InputStream;
25+
import java.lang.reflect.Method;
2526
import java.net.HttpURLConnection;
2627
import java.net.InetSocketAddress;
2728
import java.net.URL;
@@ -33,6 +34,8 @@
3334
import org.jgroups.Message;
3435
import org.jgroups.annotations.Property;
3536
import org.jgroups.protocols.PING;
37+
import org.openshift.ping.common.compatibility.CompatibilityException;
38+
import org.openshift.ping.common.compatibility.CompatibilityUtils;
3639
import org.openshift.ping.common.server.Server;
3740
import org.openshift.ping.common.server.ServerFactory;
3841
import org.openshift.ping.common.server.Servers;
@@ -63,9 +66,20 @@ public abstract class OpenshiftPing extends PING {
6366
private Server _server;
6467
private String _serverName;
6568

69+
private static Method sendMethod; //handled via reflection due to JGroups 3/4 incompatibility
70+
6671
public OpenshiftPing(String systemEnvPrefix) {
6772
super();
6873
_systemEnvPrefix = trimToNull(systemEnvPrefix);
74+
try {
75+
if(CompatibilityUtils.isJGroups4()) {
76+
sendMethod = this.getClass().getMethod("up", Message.class);
77+
} else {
78+
sendMethod = this.getClass().getMethod("up", Event.class);
79+
}
80+
} catch (Exception e) {
81+
throw new CompatibilityException("Could not find suitable 'up' method.", e);
82+
}
6983
}
7084

7185
protected final String getSystemEnvName(String systemEnvSuffix) {
@@ -193,12 +207,24 @@ public void handlePingRequest(InputStream stream) throws Exception {
193207
Message msg = new Message();
194208
msg.readFrom(dataInput);
195209
try {
196-
up(new Event(Event.MSG, msg));
210+
sendUp(msg);
197211
} catch (Exception e) {
198212
log.error("Error processing GET_MBRS_REQ.", e);
199213
}
200214
}
201215

216+
private void sendUp(Message msg) {
217+
try {
218+
if(CompatibilityUtils.isJGroups4()) {
219+
sendMethod.invoke(this, msg);
220+
} else {
221+
sendMethod.invoke(this, new Event(1, msg));
222+
}
223+
} catch (Exception e) {
224+
throw new CompatibilityException("Could not invoke 'up' method.", e);
225+
}
226+
}
227+
202228
private List<InetSocketAddress> readAll() {
203229
if (isClusteringEnabled()) {
204230
return doReadAll(clusterName);
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package org.openshift.ping.common.compatibility;
2+
3+
/**
4+
* Thrown on incompatibility errors
5+
*
6+
* @author Sebastian Łaskawiec
7+
*/
8+
public class CompatibilityException extends RuntimeException {
9+
10+
public CompatibilityException() {
11+
}
12+
13+
public CompatibilityException(String message) {
14+
super(message);
15+
}
16+
17+
public CompatibilityException(String message, Throwable cause) {
18+
super(message, cause);
19+
}
20+
21+
public CompatibilityException(Throwable cause) {
22+
super(cause);
23+
}
24+
25+
public CompatibilityException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
26+
super(message, cause, enableSuppression, writableStackTrace);
27+
}
28+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package org.openshift.ping.common.compatibility;
2+
3+
import org.jgroups.Version;
4+
5+
/**
6+
* A small set of compatibility checking utils
7+
*
8+
* @author Sebastian Łaskawiec
9+
*/
10+
public class CompatibilityUtils {
11+
12+
private CompatibilityUtils() {
13+
}
14+
15+
/**
16+
* @return <code>true</code> when JGroups 4 is on the classpath. <code>false</code> otherwise.
17+
*/
18+
public static boolean isJGroups4() {
19+
return Version.major == 4;
20+
}
21+
}

common/src/main/java/org/openshift/ping/common/server/AbstractServer.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import java.util.HashMap;
2222
import java.util.Map;
2323

24-
import org.jgroups.Channel;
2524
import org.jgroups.JChannel;
2625
import org.openshift.ping.common.OpenshiftPing;
2726

@@ -31,13 +30,13 @@
3130
public abstract class AbstractServer implements Server {
3231

3332
protected final int port;
34-
protected final Map<String, Channel> CHANNELS = new HashMap<String, Channel>();
33+
protected final Map<String, JChannel> CHANNELS = new HashMap<>();
3534

3635
protected AbstractServer(int port) {
3736
this.port = port;
3837
}
3938

40-
public final Channel getChannel(String clusterName) {
39+
public final JChannel getChannel(String clusterName) {
4140
if (clusterName != null) {
4241
synchronized (CHANNELS) {
4342
return CHANNELS.get(clusterName);
@@ -46,7 +45,7 @@ public final Channel getChannel(String clusterName) {
4645
return null;
4746
}
4847

49-
protected final void addChannel(Channel channel) {
48+
protected final void addChannel(JChannel channel) {
5049
String clusterName = getClusterName(channel);
5150
if (clusterName != null) {
5251
synchronized (CHANNELS) {
@@ -55,7 +54,7 @@ protected final void addChannel(Channel channel) {
5554
}
5655
}
5756

58-
protected final void removeChannel(Channel channel) {
57+
protected final void removeChannel(JChannel channel) {
5958
String clusterName = getClusterName(channel);
6059
if (clusterName != null) {
6160
synchronized (CHANNELS) {
@@ -70,11 +69,11 @@ protected final boolean hasChannels() {
7069
}
7170
}
7271

73-
private String getClusterName(final Channel channel) {
72+
private String getClusterName(final JChannel channel) {
7473
if (channel != null) {
7574
String clusterName = channel.getClusterName();
7675
// clusterName will be null if the Channel is not yet connected, but we still need it!
77-
if (clusterName == null && channel instanceof JChannel) {
76+
if (clusterName == null) {
7877
try {
7978
Field field = JChannel.class.getDeclaredField("cluster_name");
8079
field.setAccessible(true);
@@ -85,7 +84,7 @@ private String getClusterName(final Channel channel) {
8584
return null;
8685
}
8786

88-
protected final void handlePingRequest(Channel channel, InputStream stream) throws Exception {
87+
protected final void handlePingRequest(JChannel channel, InputStream stream) throws Exception {
8988
if (channel != null) {
9089
OpenshiftPing handler = (OpenshiftPing) channel.getProtocolStack().findProtocol(OpenshiftPing.class);
9190
handler.handlePingRequest(stream);

common/src/main/java/org/openshift/ping/common/server/JBossServer.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import org.jboss.com.sun.net.httpserver.HttpExchange;
2525
import org.jboss.com.sun.net.httpserver.HttpHandler;
2626
import org.jboss.com.sun.net.httpserver.HttpServer;
27-
import org.jgroups.Channel;
27+
import org.jgroups.JChannel;
2828

2929
/**
3030
* @author <a href="mailto:[email protected]">Ales Justin</a>
@@ -38,7 +38,7 @@ public JBossServer(int port) {
3838
super(port);
3939
}
4040

41-
public synchronized boolean start(Channel channel) throws Exception {
41+
public synchronized boolean start(JChannel channel) throws Exception {
4242
boolean started = false;
4343
if (server == null) {
4444
try {
@@ -57,7 +57,7 @@ public synchronized boolean start(Channel channel) throws Exception {
5757
return started;
5858
}
5959

60-
public synchronized boolean stop(Channel channel) {
60+
public synchronized boolean stop(JChannel channel) {
6161
boolean stopped = false;
6262
removeChannel(channel);
6363
if (server != null && !hasChannels()) {
@@ -82,7 +82,7 @@ public void handle(HttpExchange exchange) throws IOException {
8282
try {
8383
try {
8484
String clusterName = exchange.getRequestHeaders().getFirst(CLUSTER_NAME);
85-
Channel channel = server.getChannel(clusterName);
85+
JChannel channel = server.getChannel(clusterName);
8686
try (InputStream stream = exchange.getRequestBody()) {
8787
handlePingRequest(channel, stream);
8888
}

common/src/main/java/org/openshift/ping/common/server/JDKServer.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import java.net.InetSocketAddress;
2222
import java.util.concurrent.Executors;
2323

24-
import org.jgroups.Channel;
24+
import org.jgroups.JChannel;
2525

2626
import com.sun.net.httpserver.HttpExchange;
2727
import com.sun.net.httpserver.HttpHandler;
@@ -38,7 +38,7 @@ public JDKServer(int port) {
3838
super(port);
3939
}
4040

41-
public synchronized boolean start(Channel channel) throws Exception {
41+
public synchronized boolean start(JChannel channel) throws Exception {
4242
boolean started = false;
4343
if (server == null) {
4444
try {
@@ -57,7 +57,7 @@ public synchronized boolean start(Channel channel) throws Exception {
5757
return started;
5858
}
5959

60-
public synchronized boolean stop(Channel channel) {
60+
public synchronized boolean stop(JChannel channel) {
6161
boolean stopped = false;
6262
removeChannel(channel);
6363
if (server != null && !hasChannels()) {
@@ -82,7 +82,7 @@ public void handle(HttpExchange exchange) throws IOException {
8282
exchange.sendResponseHeaders(200, 0);
8383
try {
8484
String clusterName = exchange.getRequestHeaders().getFirst(CLUSTER_NAME);
85-
Channel channel = server.getChannel(clusterName);
85+
JChannel channel = server.getChannel(clusterName);
8686
try (InputStream stream = exchange.getRequestBody()) {
8787
handlePingRequest(channel, stream);
8888
}

common/src/main/java/org/openshift/ping/common/server/Server.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,14 @@
1616

1717
package org.openshift.ping.common.server;
1818

19-
import org.jgroups.Channel;
19+
import org.jgroups.JChannel;
2020

2121
/**
2222
* @author <a href="mailto:[email protected]">Ales Justin</a>
2323
*/
2424
public interface Server {
2525
public static final String CLUSTER_NAME = "CLUSTER_NAME";
26-
public boolean start(Channel channel) throws Exception;
27-
public boolean stop(Channel channel);
28-
public Channel getChannel(String clusterName);
26+
public boolean start(JChannel channel) throws Exception;
27+
public boolean stop(JChannel channel);
28+
public JChannel getChannel(String clusterName);
2929
}

common/src/main/java/org/openshift/ping/common/server/UndertowServer.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,14 @@
1616

1717
package org.openshift.ping.common.server;
1818

19+
import java.io.InputStream;
20+
21+
import org.jgroups.JChannel;
22+
1923
import io.undertow.Undertow;
2024
import io.undertow.server.HttpHandler;
2125
import io.undertow.server.HttpServerExchange;
2226

23-
import java.io.InputStream;
24-
25-
import org.jgroups.Channel;
26-
2727
/**
2828
* @author <a href="mailto:[email protected]">Ales Justin</a>
2929
*/
@@ -34,7 +34,7 @@ public UndertowServer(int port) {
3434
super(port);
3535
}
3636

37-
public synchronized boolean start(Channel channel) throws Exception {
37+
public synchronized boolean start(JChannel channel) throws Exception {
3838
boolean started = false;
3939
if (server == null) {
4040
try {
@@ -53,7 +53,7 @@ public synchronized boolean start(Channel channel) throws Exception {
5353
return started;
5454
}
5555

56-
public synchronized boolean stop(Channel channel) {
56+
public synchronized boolean stop(JChannel channel) {
5757
boolean stopped = false;
5858
removeChannel(channel);
5959
if (server != null && !hasChannels()) {
@@ -82,7 +82,7 @@ public void handleRequest(HttpServerExchange exchange) throws Exception {
8282

8383
exchange.startBlocking();
8484
String clusterName = exchange.getRequestHeaders().getFirst(CLUSTER_NAME);
85-
Channel channel = server.getChannel(clusterName);
85+
JChannel channel = server.getChannel(clusterName);
8686
try (InputStream stream = exchange.getInputStream()) {
8787
handlePingRequest(channel, stream);
8888
}

kube/src/test/java/org/openshift/ping/kube/test/PingTestBase.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,38 @@
1616

1717
package org.openshift.ping.kube.test;
1818

19+
import java.lang.reflect.Method;
1920
import java.util.ArrayList;
2021
import java.util.List;
2122

23+
import org.jgroups.Channel;
2224
import org.jgroups.JChannel;
2325
import org.jgroups.util.Util;
2426
import org.junit.Assert;
2527
import org.junit.Test;
28+
import org.openshift.ping.common.compatibility.CompatibilityException;
29+
import org.openshift.ping.common.compatibility.CompatibilityUtils;
2630

2731
/**
2832
* @author <a href="mailto:[email protected]">Ales Justin</a>
2933
*/
3034
public abstract class PingTestBase extends TestBase {
3135

36+
//handles via reflection because of JGroups 3/4 incompatibility.
37+
private static final Method waitForViewMethod;
38+
39+
static {
40+
try {
41+
if (CompatibilityUtils.isJGroups4()) {
42+
waitForViewMethod = Util.class.getMethod("waitUntilAllChannelsHaveSameView", long.class, long.class, JChannel[].class);
43+
} else {
44+
waitForViewMethod = Util.class.getMethod("waitUntilAllChannelsHaveSameSize", long.class, long.class, Channel[].class);
45+
}
46+
} catch (NoSuchMethodException e) {
47+
throw new CompatibilityException("Could not find proper 'waitUntilAllChannelsHaveSame*' method.", e);
48+
}
49+
}
50+
3251
@Test
3352
public void testCluster() throws Exception {
3453
doTestCluster();
@@ -45,7 +64,7 @@ public void testRestart() throws Exception {
4564
}
4665

4766
protected void doTestCluster() throws Exception {
48-
Util.waitUntilAllChannelsHaveSameSize(10000, 1000, channels);
67+
waitForViewMethod.invoke(null, 10000, 1000, channels);
4968

5069
// Tests unicasts from the first to the last
5170
JChannel first = channels[0], last = channels[getNum() - 1];

0 commit comments

Comments
 (0)