Skip to content

Commit eac1a43

Browse files
author
Juraj Veverka
committed
implemented websocket server and client support
1 parent 4064936 commit eac1a43

File tree

12 files changed

+341
-14
lines changed

12 files changed

+341
-14
lines changed

hazelcast-cluster/it-tests/build.gradle

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ targetCompatibility = 10
2020
dependencies {
2121
compile 'org.slf4j:slf4j-api:1.8.0-beta2'
2222
compile 'org.slf4j:slf4j-simple:1.8.0-beta2'
23+
24+
compile 'org.eclipse.jetty.websocket:websocket-client:9.4.11.v20180605'
25+
2326
testCompile 'org.testng:testng:6.14.3'
2427
}
2528

hazelcast-cluster/it-tests/src/main/java/itx/hazelcast/cluster/client/ClientApp.java

Lines changed: 0 additions & 4 deletions
This file was deleted.
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package itx.hazelcast.cluster.client;
2+
3+
import itx.hazelcast.cluster.client.wsclient.SessionListener;
4+
import itx.hazelcast.cluster.client.wsclient.WsClient;
5+
6+
import java.util.concurrent.CountDownLatch;
7+
import java.util.concurrent.TimeUnit;
8+
9+
public class MainClient {
10+
11+
public static void main(String[] args) throws Exception {
12+
13+
CountDownLatch cl = new CountDownLatch(1);
14+
SessionListener sessionListener = new SessionListener() {
15+
16+
@Override
17+
public void onSessionReady() {
18+
cl.countDown();
19+
}
20+
21+
@Override
22+
public void onTextMessage(String message) {
23+
24+
}
25+
26+
@Override
27+
public void onByteMessage(byte[] message) {
28+
29+
}
30+
31+
@Override
32+
public void onSessionClose() {
33+
34+
}
35+
36+
};
37+
38+
WsClient clientApp = new WsClient("ws://localhost:8080/data/websocket", sessionListener);
39+
clientApp.start();
40+
cl.await(10, TimeUnit.SECONDS);
41+
clientApp.sendTextMessage("hello world");
42+
clientApp.sendByteMessage("hi there".getBytes());
43+
clientApp.stop();
44+
}
45+
46+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package itx.hazelcast.cluster.client.wsclient;
2+
3+
public interface SessionListener {
4+
5+
void onSessionReady();
6+
7+
void onTextMessage(String message);
8+
9+
void onByteMessage(byte[] message);
10+
11+
void onSessionClose();
12+
13+
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package itx.hazelcast.cluster.client.wsclient;
2+
3+
import org.eclipse.jetty.websocket.api.Session;
4+
import org.eclipse.jetty.websocket.api.StatusCode;
5+
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
6+
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
7+
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
8+
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
9+
import org.slf4j.Logger;
10+
import org.slf4j.LoggerFactory;
11+
12+
import java.io.IOException;
13+
import java.io.InputStream;
14+
import java.nio.ByteBuffer;
15+
import java.util.concurrent.CountDownLatch;
16+
import java.util.concurrent.TimeUnit;
17+
18+
@WebSocket(maxTextMessageSize = 64 * 1024)
19+
public class SimpleWebSocket {
20+
21+
final private static Logger LOG = LoggerFactory.getLogger(SimpleWebSocket.class);
22+
23+
private final CountDownLatch openLatch;
24+
private final CountDownLatch closeLatch;
25+
private Session session;
26+
private SessionListener sessionListener;
27+
28+
public SimpleWebSocket(SessionListener sessionListener) {
29+
this.openLatch = new CountDownLatch(1);
30+
this.closeLatch = new CountDownLatch(1);
31+
this.sessionListener = sessionListener;
32+
}
33+
34+
public boolean awaitClose(int duration, TimeUnit unit) throws InterruptedException {
35+
return this.closeLatch.await(duration,unit);
36+
}
37+
38+
public boolean awaitOpen(int duration, TimeUnit unit) throws InterruptedException {
39+
return this.openLatch.await(duration,unit);
40+
}
41+
42+
@OnWebSocketClose
43+
public void onClose(int statusCode, String reason) {
44+
LOG.info("Connection closed: {} - {}", statusCode, reason);
45+
this.session = null;
46+
this.closeLatch.countDown(); // trigger latch
47+
this.sessionListener.onSessionClose();
48+
}
49+
50+
@OnWebSocketConnect
51+
public void onConnect(Session session) {
52+
LOG.info("Got connect: {}",session);
53+
this.session = session;
54+
this.sessionListener.onSessionReady();
55+
this.openLatch.countDown();
56+
}
57+
58+
@OnWebSocketMessage
59+
public void onMessage(String msg) {
60+
LOG.info("Got msg: {}",msg);
61+
this.sessionListener.onTextMessage(msg);
62+
}
63+
64+
@OnWebSocketMessage
65+
public void onMessage(InputStream stream) {
66+
try {
67+
this.sessionListener.onByteMessage(stream.readAllBytes());
68+
} catch (IOException e) {
69+
LOG.error("Error: ", e);
70+
}
71+
}
72+
73+
public void close() {
74+
if (session != null) {
75+
this.session.close(StatusCode.NORMAL, "Closing");
76+
}
77+
}
78+
79+
public void sendTextMessage(String message) throws IOException {
80+
session.getRemote().sendString(message);
81+
}
82+
83+
public void sendByteMessage(byte[] message) throws IOException {
84+
session.getRemote().sendBytes(ByteBuffer.wrap(message));
85+
}
86+
87+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package itx.hazelcast.cluster.client.wsclient;
2+
3+
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
4+
import org.eclipse.jetty.websocket.client.WebSocketClient;
5+
6+
import java.io.IOException;
7+
import java.net.URI;
8+
import java.util.concurrent.TimeUnit;
9+
10+
public class WsClient {
11+
12+
private WebSocketClient client;
13+
private String destUri;
14+
private SessionListener sessionListener;
15+
private SimpleWebSocket socket;
16+
17+
public WsClient(String destUri, SessionListener sessionListener) {
18+
this.destUri = destUri;
19+
this.sessionListener = sessionListener;
20+
}
21+
22+
public void start() throws Exception {
23+
client = new WebSocketClient();
24+
client.start();
25+
URI echoUri = new URI(destUri);
26+
ClientUpgradeRequest request = new ClientUpgradeRequest();
27+
socket = new SimpleWebSocket(sessionListener);
28+
client.connect(socket, echoUri, request);
29+
socket.awaitOpen(10, TimeUnit.SECONDS);
30+
}
31+
32+
public void sendTextMessage(String message) throws IOException {
33+
socket.sendTextMessage(message);
34+
}
35+
36+
public void sendByteMessage(byte[] message) throws IOException {
37+
socket.sendByteMessage(message);
38+
}
39+
40+
public void stop() throws Exception {
41+
socket.close();
42+
socket.awaitClose(10, TimeUnit.SECONDS);
43+
client.stop();
44+
}
45+
46+
}

hazelcast-cluster/server-node/build.gradle

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ dependencies {
2020
compile "org.eclipse.jetty:jetty-server:${jettyVersion}"
2121
compile "org.eclipse.jetty:jetty-servlet:${jettyVersion}"
2222

23+
compile "org.eclipse.jetty.websocket:websocket-api:${jettyVersion}"
24+
compile "org.eclipse.jetty.websocket:websocket-server:${jettyVersion}"
25+
compile "org.eclipse.jetty.websocket:websocket-client:${jettyVersion}"
26+
2327
compile "com.fasterxml.jackson.core:jackson-databind:2.9.6"
2428
compile "javax.xml.bind:jaxb-api:2.3.0"
2529

hazelcast-cluster/server-node/src/main/java/itx/hazelcast/cluster/server/hazelcast/ServerApp.java

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import itx.hazelcast.cluster.server.rest.RestApplication;
1515
import itx.hazelcast.cluster.server.services.DataService;
1616
import itx.hazelcast.cluster.server.services.DataServiceImpl;
17+
import itx.hazelcast.cluster.server.websocket.WsServlet;
1718
import org.eclipse.jetty.server.HttpConfiguration;
1819
import org.eclipse.jetty.server.HttpConnectionFactory;
1920
import org.eclipse.jetty.server.Server;
@@ -45,22 +46,23 @@ public ServerApp() {
4546
public void startServer() throws Exception {
4647
LOG.info("starting hazelcast ...");
4748
executorService = Executors.newSingleThreadExecutor();
48-
49+
// Register serializers in hazelcast configuration
4950
SerializerConfig sc = new SerializerConfig()
5051
.setImplementation(new InstanceInfoSerializer())
5152
.setTypeClass(InstanceInfo.class);
5253
Config config = new Config();
5354
config.getSerializationConfig().addSerializerConfig(sc);
54-
55+
// Create hazelcast instance
5556
hazelcastInstance = Hazelcast.newHazelcastInstance(config);
5657
MembershipListener membershipListener = new MembershipListenerImpl();
58+
// listen on cluster events
5759
Cluster cluster = hazelcastInstance.getCluster();
5860
cluster.addMembershipListener(membershipListener);
59-
61+
// populate clusterInfo map
6062
Map<String, InstanceInfo> clusterInfo = hazelcastInstance.getMap( "clusterInfo" );
6163
InstanceInfo instanceInfo = createInstanceInfo(cluster.getLocalMember(), clusterInfo);
6264
clusterInfo.put(instanceInfo.getId(), instanceInfo);
63-
65+
// register leadership listener
6466
GateKeepingListener gateKeepingListener = new GateKeepingListenerImpl();
6567
GateKeeperRunnable gateKeeperRunnable = new GateKeeperRunnable(executorService, hazelcastInstance, gateKeepingListener);
6668
executorService.submit(gateKeeperRunnable);
@@ -76,13 +78,15 @@ public void startServer() throws Exception {
7678
ServletContainer restServletContainer = new ServletContainer(resourceConfig);
7779
ServletHolder restServletHolder = new ServletHolder(restServletContainer);
7880
context.addServlet(restServletHolder, "/rest");
79-
81+
// Register websocket handlers
82+
ServletHolder webSocketHolder = new ServletHolder(new WsServlet());
83+
context.addServlet(webSocketHolder, "/websocket");
84+
// Setup http connectors
8085
HttpConfiguration httpConfig = new HttpConfiguration();
8186
HttpConnectionFactory httpConnectionFactory = new HttpConnectionFactory(httpConfig);
8287
ServerConnector http = new ServerConnector(server, httpConnectionFactory);
8388
http.setPort(instanceInfo.getWebServerPort());
8489
server.addConnector(http);
85-
8690
server.start();
8791
LOG.info("init sequence done.");
8892
}
@@ -93,20 +97,22 @@ public void stopServer() throws Exception {
9397
server.stop();
9498
}
9599
if (hazelcastInstance != null) {
96-
ISemaphore semaphore = hazelcastInstance.getSemaphore("gatekeeper");
97-
semaphore.release();
100+
//ISemaphore semaphore = hazelcastInstance.getSemaphore("gatekeeper");
101+
//semaphore.release();
98102
//hazelcastInstance.shutdown();
99103
}
100104
executorService.shutdown();
101105
}
102106

103107
private InstanceInfo createInstanceInfo(Member member, Map<String, InstanceInfo> clusterinfo) throws UnknownHostException {
104108
LOG.info("clusterinfo size {}", clusterinfo.size());
105-
int webServerPort = 8080;
109+
int webServerPort = 8080; //let's start with 8080 and check if this port is free
106110
InetSocketAddress inetSocketAddress = member.getAddress().getInetSocketAddress();
107111
for (InstanceInfo i: clusterinfo.values()) {
108112
if (inetSocketAddress.getHostName().equals(i.getAddress().getHostName())) {
109-
webServerPort++;
113+
if (webServerPort == i.getWebServerPort()) {
114+
webServerPort = i.getWebServerPort() + 1;
115+
}
110116
}
111117
}
112118

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package itx.hazelcast.cluster.server.websocket;
2+
3+
import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
4+
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
5+
import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
6+
import org.slf4j.Logger;
7+
import org.slf4j.LoggerFactory;
8+
9+
import java.util.Map;
10+
import java.util.concurrent.ConcurrentHashMap;
11+
import java.util.concurrent.atomic.AtomicLong;
12+
13+
public class WebSocketCreatorImpl implements WebSocketCreator, WebSocketUnregisterService {
14+
15+
private static final Logger LOG = LoggerFactory.getLogger(WebSocketCreatorImpl.class);
16+
17+
private final Map<Long, WsEndpoint> wsEndPoints;
18+
private final AtomicLong atomicLong;
19+
20+
public WebSocketCreatorImpl() {
21+
this.wsEndPoints = new ConcurrentHashMap<>();
22+
this.atomicLong = new AtomicLong(0);
23+
}
24+
25+
@Override
26+
public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp) {
27+
LOG.info("createWebSocket ...");
28+
Long id = atomicLong.getAndIncrement();
29+
WsEndpoint wsEndpoint = new WsEndpoint(id, this);
30+
wsEndPoints.put(id, wsEndpoint);
31+
return wsEndpoint;
32+
}
33+
34+
@Override
35+
public void unregister(Long id) {
36+
wsEndPoints.remove(id);
37+
}
38+
39+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package itx.hazelcast.cluster.server.websocket;
2+
3+
public interface WebSocketUnregisterService {
4+
5+
void unregister(Long id);
6+
7+
}

0 commit comments

Comments
 (0)