Skip to content

Commit 02d3cc6

Browse files
committed
end-to-end testing
1 parent b02e170 commit 02d3cc6

File tree

5 files changed

+123
-1
lines changed

5 files changed

+123
-1
lines changed

clusterhq/queue/src/main/java/us/hxbc/clusterhq/queue/Api.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,12 @@ private void init() throws IOException {
4545
});
4646
}
4747

48+
public void stop() {
49+
synchronized (topics) {
50+
topics.values().forEach(q -> q.stop());
51+
}
52+
}
53+
4854
@Path("/{topic}/{username}")
4955
@POST
5056
public Response subscribe(@PathParam("topic") String topic,

clusterhq/queue/src/main/java/us/hxbc/clusterhq/queue/DataStore.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,7 @@ public void gc(long needLSN) {
182182
long lsn = Long.parseLong(name, 16);
183183
if (lsn < baseLSN) {
184184
try {
185+
logger.info("gc deleting {}", p);
185186
Files.delete(p);
186187
} catch (IOException e) {
187188
e.printStackTrace();

clusterhq/queue/src/main/java/us/hxbc/clusterhq/queue/Main.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,12 @@
1616
public class Main {
1717
private static Logger logger = LoggerFactory.getLogger(Main.class);
1818
final HttpServer server;
19+
final Api api;
1920

2021
Main(int port, Path dir) throws IOException {
2122
ResourceConfig rc = new ResourceConfig();
22-
rc.registerInstances(new Api(dir, 4096));
23+
api = new Api(dir, 4096);
24+
rc.registerInstances(api);
2325
if (logger.isDebugEnabled()) {
2426
rc.register(new LoggingFilter(java.util.logging.Logger.getGlobal(), false));
2527
}
@@ -30,6 +32,15 @@ void start() throws IOException {
3032
server.start();
3133
}
3234

35+
public int getPort() {
36+
return server.getListeners().stream().findAny().map(n -> n.getPort()).orElse(0);
37+
}
38+
39+
void stop() {
40+
api.stop();
41+
server.shutdownNow();
42+
}
43+
3344
public static void main(String[] args) throws IOException {
3445
if (args.length != 2) {
3546
System.err.println("Usage: Main <port> <dir>");

clusterhq/queue/src/main/java/us/hxbc/clusterhq/queue/Queue.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ void spawnGCThread() {
172172
void stop() {
173173
shutdown = true;
174174
try {
175+
gcThread.interrupt();
175176
gcThread.join();
176177
} catch (InterruptedException e) {
177178
e.printStackTrace();
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
package us.hxbc.clusterhq.queue;
2+
3+
import org.junit.After;
4+
import org.junit.Before;
5+
import org.junit.Rule;
6+
import org.junit.Test;
7+
import org.junit.rules.TemporaryFolder;
8+
9+
import javax.ws.rs.client.Client;
10+
import javax.ws.rs.client.ClientBuilder;
11+
import javax.ws.rs.client.WebTarget;
12+
import javax.ws.rs.core.Response;
13+
import java.nio.file.Path;
14+
15+
import static org.assertj.core.api.Assertions.assertThat;
16+
17+
public class MainTest {
18+
@Rule
19+
public TemporaryFolder folder= new TemporaryFolder();
20+
private Path dir;
21+
private Main main;
22+
private int port;
23+
private WebTarget target;
24+
private Client c;
25+
26+
@Before
27+
public void setUp() throws Exception {
28+
dir = folder.newFolder().toPath();
29+
initClient();
30+
}
31+
32+
private void initClient() throws Exception {
33+
main = new Main(0, dir);
34+
main.start();
35+
port = main.getPort();
36+
if (c != null) {
37+
c.close();
38+
}
39+
c = ClientBuilder.newClient();
40+
target = c.target("http://127.0.0.1:" + port);
41+
}
42+
43+
@After
44+
public void tearDown() throws Exception {
45+
main.stop();
46+
}
47+
48+
@Test
49+
public void testSubscribe() throws Exception {
50+
Response resp = target.path("/topic1/user1").request().post(null);
51+
assertThat(resp.getStatus()).isEqualTo(200);
52+
resp = target.path("/topic1/user1").request().post(null);
53+
assertThat(resp.getStatus()).isEqualTo(200);
54+
}
55+
56+
@Test
57+
public void testUnsubscribe() throws Exception {
58+
Response resp;
59+
// unsubscribe without subscribing
60+
resp = target.path("/topic1/user1").request().delete();
61+
assertThat(resp.getStatus()).isEqualTo(404);
62+
63+
// unsubscribe after subscribing
64+
resp = target.path("/topic1/user1").request().post(null);
65+
assertThat(resp.getStatus()).isEqualTo(200);
66+
resp = target.path("/topic1/user1").request().delete();
67+
assertThat(resp.getStatus()).isEqualTo(200);
68+
69+
// unsubscribe again
70+
resp = target.path("/topic1/user1").request().delete();
71+
assertThat(resp.getStatus()).isEqualTo(404);
72+
}
73+
74+
@Test
75+
public void testMessage() throws Exception {
76+
Response resp;
77+
// subscription doesn't exist
78+
resp = target.path("/topic1/user1").request().get();
79+
assertThat(resp.getStatus()).isEqualTo(404);
80+
81+
// no message available
82+
resp = target.path("/topic1/user1").request().post(null);
83+
assertThat(resp.getStatus()).isEqualTo(200);
84+
resp = target.path("/topic1/user1").request().get();
85+
assertThat(resp.getStatus()).isEqualTo(204);
86+
87+
resp = target.path("/topic1").request().post(null);
88+
assertThat(resp.getStatus()).isEqualTo(200);
89+
resp = target.path("/topic1/user1").request().get();
90+
assertThat(resp.getStatus()).isEqualTo(200);
91+
}
92+
93+
@Test
94+
public void testRestart() throws Exception {
95+
Response resp;
96+
resp = target.path("/topic1/user1").request().post(null);
97+
assertThat(resp.getStatus()).isEqualTo(200);
98+
main.stop();
99+
initClient();
100+
resp = target.path("/topic1/user1").request().get();
101+
assertThat(resp.getStatus()).isEqualTo(204);
102+
}
103+
}

0 commit comments

Comments
 (0)