Skip to content

Commit 55f0c4e

Browse files
author
Juraj Veverka
committed
implemented integration test using websocket client
1 parent eac1a43 commit 55f0c4e

33 files changed

+837
-175
lines changed

hazelcast-cluster/README.md

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,17 @@ First server-node started will use port 8080, second server-node in cluster will
2222
third will use 8082 an so on.
2323

2424
## Run integration tests
25-
TODO
25+
In order to run integration tests, JDK 11 and tmux has to be installed.
26+
1. Build project and distribution.
27+
```
28+
gradle clean build installDist
29+
```
30+
2. Start 3-node cluster in new terminal using script below.
31+
Wait till cluster is formed.
32+
```
33+
./start-3-node-cluster.sh
34+
```
35+
3. While cluster is running, run integration tests in new terminal window.
36+
```
37+
gradle clean test -Dtest.profile=integration
38+
```

hazelcast-cluster/common/build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ repositories {
2121
sourceCompatibility = 11
2222
targetCompatibility = 11
2323

24-
def grpcVersion = '1.13.2'
24+
def grpcVersion = '1.14.0'
2525
mainClassName = 'itx.examples.grpc.service.Main'
2626

2727
dependencies {
@@ -42,7 +42,7 @@ dependencies {
4242

4343
protobuf {
4444
protoc {
45-
artifact = 'com.google.protobuf:protoc:3.3.0'
45+
artifact = 'com.google.protobuf:protoc:3.6.1'
4646
}
4747
plugins {
4848
grpc {

hazelcast-cluster/common/src/main/proto/messages.proto

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,44 @@ message Address {
1414
string hostName = 1;
1515
int32 port = 2;
1616
}
17+
18+
message SubscribeToTopicRequest {
19+
string topicId = 1;
20+
int64 requestId = 2;
21+
}
22+
23+
message SubscribeToTopicResponse {
24+
int64 requestId = 2;
25+
string subscriptionId = 3;
26+
}
27+
28+
message UnsubscribeFromTopicRequest {
29+
int64 requestId = 1;
30+
string subscriptionId = 2;
31+
}
32+
33+
message UnsubscribeFromTopicResponse {
34+
int64 requestId = 1;
35+
}
36+
37+
message DataMessage {
38+
string topicId = 1;
39+
string message = 2;
40+
}
41+
42+
message DataMessageEvent {
43+
string topicId = 1;
44+
string message = 2;
45+
string subscriptionId = 3;
46+
}
47+
48+
message MessageWrapper {
49+
oneof payload {
50+
SubscribeToTopicRequest subscribeToTopicRequest = 1;
51+
SubscribeToTopicResponse subscribeToTopicResponse = 2;
52+
UnsubscribeFromTopicRequest unsubscribeFromTopicRequest = 3;
53+
UnsubscribeFromTopicResponse unsubscribeFromTopicResponse = 4;
54+
DataMessage dataMessage = 5;
55+
DataMessageEvent dataMessageEvent = 6;
56+
}
57+
}

hazelcast-cluster/it-tests/build.gradle

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@ test {
1212
testLogging {
1313
events "passed", "skipped", "failed"
1414
}
15+
16+
if (System.properties['test.profile'] != 'integration') {
17+
exclude '**/*ITTest*'
18+
}
1519
}
1620

1721
sourceCompatibility = 10

hazelcast-cluster/it-tests/src/main/java/itx/hazelcast/cluster/client/MainClient.java

Lines changed: 0 additions & 46 deletions
This file was deleted.

hazelcast-cluster/it-tests/src/main/java/itx/hazelcast/cluster/client/wsclient/SessionListener.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
package itx.hazelcast.cluster.client.wsclient;
22

3+
import itx.hazelcast.cluster.dto.MessageWrapper;
4+
35
public interface SessionListener {
46

57
void onSessionReady();
68

79
void onTextMessage(String message);
810

9-
void onByteMessage(byte[] message);
11+
void onMessageWrapper(MessageWrapper messageWrapper);
1012

1113
void onSessionClose();
1214

hazelcast-cluster/it-tests/src/main/java/itx/hazelcast/cluster/client/wsclient/SimpleWebSocket.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package itx.hazelcast.cluster.client.wsclient;
22

3+
import itx.hazelcast.cluster.dto.MessageWrapper;
34
import org.eclipse.jetty.websocket.api.Session;
45
import org.eclipse.jetty.websocket.api.StatusCode;
56
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
@@ -49,10 +50,10 @@ public void onClose(int statusCode, String reason) {
4950

5051
@OnWebSocketConnect
5152
public void onConnect(Session session) {
52-
LOG.info("Got connect: {}",session);
53+
LOG.info("onConnect");
5354
this.session = session;
54-
this.sessionListener.onSessionReady();
5555
this.openLatch.countDown();
56+
this.sessionListener.onSessionReady();
5657
}
5758

5859
@OnWebSocketMessage
@@ -62,9 +63,10 @@ public void onMessage(String msg) {
6263
}
6364

6465
@OnWebSocketMessage
65-
public void onMessage(InputStream stream) {
66+
public void onMessage(InputStream inputStream) {
6667
try {
67-
this.sessionListener.onByteMessage(stream.readAllBytes());
68+
MessageWrapper messageWrapper = MessageWrapper.parseFrom(inputStream.readAllBytes());
69+
this.sessionListener.onMessageWrapper(messageWrapper);
6870
} catch (IOException e) {
6971
LOG.error("Error: ", e);
7072
}

hazelcast-cluster/it-tests/src/main/java/itx/hazelcast/cluster/client/wsclient/WsClient.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package itx.hazelcast.cluster.client.wsclient;
22

3+
import itx.hazelcast.cluster.dto.MessageWrapper;
34
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
45
import org.eclipse.jetty.websocket.client.WebSocketClient;
56

@@ -33,8 +34,8 @@ public void sendTextMessage(String message) throws IOException {
3334
socket.sendTextMessage(message);
3435
}
3536

36-
public void sendByteMessage(byte[] message) throws IOException {
37-
socket.sendByteMessage(message);
37+
public void sendMessageWrapper(MessageWrapper messageWrapper) throws IOException {
38+
socket.sendByteMessage(messageWrapper.toByteArray());
3839
}
3940

4041
public void stop() throws Exception {
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package itx.hazelcast.cluster.client.tests;
2+
3+
import itx.hazelcast.cluster.client.wsclient.SessionListener;
4+
import itx.hazelcast.cluster.dto.MessageWrapper;
5+
6+
import java.util.ArrayList;
7+
import java.util.Collections;
8+
import java.util.List;
9+
import java.util.concurrent.CountDownLatch;
10+
import java.util.concurrent.TimeUnit;
11+
12+
public class BlockingSessionListener implements SessionListener {
13+
14+
private final CountDownLatch clStart;
15+
private CountDownLatch clMessageWrapper;
16+
private final List<MessageWrapper> messageBuffer;
17+
18+
public BlockingSessionListener() {
19+
this.clStart = new CountDownLatch(1);
20+
this.messageBuffer = new ArrayList<>();
21+
}
22+
23+
@Override
24+
public void onSessionReady() {
25+
clStart.countDown();
26+
}
27+
28+
public void awaitForSessionReady(long time, TimeUnit timeUnit) throws InterruptedException {
29+
clStart.await(time, timeUnit);
30+
}
31+
32+
public void resetMessageBuffer(int expectedMessageCount) {
33+
clMessageWrapper = new CountDownLatch(expectedMessageCount);
34+
messageBuffer.clear();
35+
}
36+
37+
public List<MessageWrapper> awaitMessages(long time, TimeUnit timeUnit) throws InterruptedException {
38+
clMessageWrapper.await(time, timeUnit);
39+
return Collections.unmodifiableList(messageBuffer);
40+
}
41+
42+
@Override
43+
public void onTextMessage(String message) {
44+
}
45+
46+
@Override
47+
public void onMessageWrapper(MessageWrapper messageWrapper) {
48+
messageBuffer.add(messageWrapper);
49+
clMessageWrapper.countDown();
50+
}
51+
52+
@Override
53+
public void onSessionClose() {
54+
55+
}
56+
57+
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package itx.hazelcast.cluster.client.tests;
2+
3+
import itx.hazelcast.cluster.client.wsclient.WsClient;
4+
import itx.hazelcast.cluster.dto.DataMessage;
5+
import itx.hazelcast.cluster.dto.DataMessageEvent;
6+
import itx.hazelcast.cluster.dto.MessageWrapper;
7+
import itx.hazelcast.cluster.dto.SubscribeToTopicRequest;
8+
import itx.hazelcast.cluster.dto.SubscribeToTopicResponse;
9+
import itx.hazelcast.cluster.dto.UnsubscribeFromTopicRequest;
10+
import itx.hazelcast.cluster.dto.UnsubscribeFromTopicResponse;
11+
import org.testng.Assert;
12+
import org.testng.annotations.Test;
13+
14+
import java.util.List;
15+
import java.util.concurrent.TimeUnit;
16+
17+
public class SingleNodeITTest {
18+
19+
@Test
20+
public void testSingleNodeClientMessage() throws Exception {
21+
String topic = "topic1";
22+
BlockingSessionListener sessionListener = new BlockingSessionListener();
23+
24+
// start client
25+
WsClient client = new WsClient("ws://localhost:8080/data/websocket", sessionListener);
26+
client.start();
27+
sessionListener.awaitForSessionReady(10, TimeUnit.SECONDS);
28+
29+
// subscribe to topic and wait for subscription
30+
sessionListener.resetMessageBuffer(1);
31+
SubscribeToTopicRequest subscribeToTopicRequest = SubscribeToTopicRequest.newBuilder()
32+
.setRequestId(1)
33+
.setTopicId(topic)
34+
.build();
35+
MessageWrapper messageWrapper = MessageWrapper.newBuilder()
36+
.setSubscribeToTopicRequest(subscribeToTopicRequest)
37+
.build();
38+
39+
client.sendMessageWrapper(messageWrapper);
40+
41+
List<MessageWrapper> messageWrappers = sessionListener.awaitMessages(10, TimeUnit.SECONDS);
42+
SubscribeToTopicResponse subscribeToTopicResponse = messageWrappers.get(0).getSubscribeToTopicResponse();
43+
Assert.assertNotNull(subscribeToTopicResponse);
44+
Assert.assertEquals(subscribeToTopicResponse.getRequestId(), subscribeToTopicRequest.getRequestId());
45+
Assert.assertNotNull(subscribeToTopicResponse.getSubscriptionId());
46+
47+
// send data message to topic and wait for message event from same topic
48+
sessionListener.resetMessageBuffer(1);
49+
DataMessage dataMessage = DataMessage.newBuilder()
50+
.setTopicId(topic)
51+
.setMessage("hello world")
52+
.build();
53+
messageWrapper = MessageWrapper.newBuilder()
54+
.setDataMessage(dataMessage)
55+
.build();
56+
client.sendMessageWrapper(messageWrapper);
57+
58+
messageWrappers = sessionListener.awaitMessages(10, TimeUnit.SECONDS);
59+
DataMessageEvent dataMessageEvent = messageWrappers.get(0).getDataMessageEvent();
60+
Assert.assertNotNull(dataMessageEvent);
61+
Assert.assertEquals(dataMessage.getMessage(), dataMessageEvent.getMessage());
62+
Assert.assertEquals(dataMessage.getTopicId(), dataMessageEvent.getTopicId());
63+
Assert.assertEquals(subscribeToTopicResponse.getSubscriptionId(), dataMessageEvent.getSubscriptionId());
64+
65+
// unsubscribe from topic and wait for confirmation
66+
sessionListener.resetMessageBuffer(1);
67+
UnsubscribeFromTopicRequest unsubscribeFromTopicRequest = UnsubscribeFromTopicRequest.newBuilder()
68+
.setRequestId(2)
69+
.setSubscriptionId(subscribeToTopicResponse.getSubscriptionId())
70+
.build();
71+
messageWrapper = MessageWrapper.newBuilder()
72+
.setUnsubscribeFromTopicRequest(unsubscribeFromTopicRequest)
73+
.build();
74+
client.sendMessageWrapper(messageWrapper);
75+
messageWrappers = sessionListener.awaitMessages(10, TimeUnit.SECONDS);
76+
UnsubscribeFromTopicResponse unsubscribeFromTopicResponse = messageWrappers.get(0).getUnsubscribeFromTopicResponse();
77+
Assert.assertNotNull(unsubscribeFromTopicResponse);
78+
Assert.assertEquals(unsubscribeFromTopicRequest.getRequestId(), unsubscribeFromTopicResponse.getRequestId());
79+
80+
// stop client
81+
client.stop();
82+
}
83+
84+
}

0 commit comments

Comments
 (0)