Skip to content

Commit 26cd00b

Browse files
committed
rest api implementation
also changed api to return null InputStream when there's no message
1 parent b223fa3 commit 26cd00b

File tree

6 files changed

+148
-26
lines changed

6 files changed

+148
-26
lines changed
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package us.hxbc.clusterhq.queue;
2+
3+
import org.glassfish.grizzly.http.server.Request;
4+
5+
import javax.ws.rs.ClientErrorException;
6+
import javax.ws.rs.DELETE;
7+
import javax.ws.rs.GET;
8+
import javax.ws.rs.POST;
9+
import javax.ws.rs.Path;
10+
import javax.ws.rs.PathParam;
11+
import javax.ws.rs.core.Context;
12+
import javax.ws.rs.core.Response;
13+
import java.io.IOException;
14+
import java.util.HashMap;
15+
import java.util.Map;
16+
17+
import static java.util.Objects.requireNonNull;
18+
19+
public class Api {
20+
private final java.nio.file.Path dir;
21+
private final long CHUNK_SIZE;
22+
private Map<String, Queue> topics = new HashMap<>();
23+
24+
Api(java.nio.file.Path dir, long chunkSize) {
25+
this.dir = requireNonNull(dir);
26+
this.CHUNK_SIZE = chunkSize;
27+
}
28+
29+
@Path("/{topic}/{username}")
30+
@POST
31+
public void subscribe(@PathParam("topic") String topic,
32+
@PathParam("username") String username) throws IOException {
33+
ensureTopic(topic).subscribe(username);
34+
}
35+
36+
private Queue ensureTopic(@PathParam("topic") String topic) throws IOException {
37+
Queue q;
38+
synchronized (topics) {
39+
q = topics.get(topic);
40+
if (q == null) {
41+
q = new Queue(dir, CHUNK_SIZE);
42+
topics.put(topic, q);
43+
}
44+
}
45+
return q;
46+
}
47+
48+
@Path("/{topic}/{username}")
49+
@DELETE
50+
public void unsubscribe(@PathParam("topic") String topic,
51+
@PathParam("username") String username) throws IOException {
52+
Queue q;
53+
synchronized (topics) {
54+
q = topics.get(topic);
55+
}
56+
57+
if (q == null) {
58+
throw new ClientErrorException(Response.Status.NOT_FOUND);
59+
} else {
60+
q.unsubscribe(username);
61+
}
62+
}
63+
64+
@Path("/{topic}")
65+
@POST
66+
public void publish(@PathParam("topic") String topic,
67+
@Context Request request) throws IOException {
68+
ensureTopic(topic).post(request.getInputStream());
69+
request.getInputStream().close();
70+
}
71+
72+
@Path("/{topic}/{username}")
73+
@GET
74+
public Response get(@PathParam("topic") String topic,
75+
@PathParam("username") String username) throws IOException {
76+
Queue q;
77+
synchronized (topics) {
78+
q = topics.get(topic);
79+
}
80+
81+
if (q == null) {
82+
return Response.status(Response.Status.NOT_FOUND).build();
83+
} else {
84+
DataStore.Message m = q.get(username);
85+
if (m.in == null) {
86+
return Response.noContent().build();
87+
} else {
88+
return Response.ok(m.in).build();
89+
}
90+
}
91+
}
92+
}

clusterhq/queue/src/main/java/us/hxbc/clusterhq/queue/DataStore.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ public static class Message {
133133
public final long nextLSN;
134134

135135
Message(InputStream in, long nextLSN) {
136-
this.in = requireNonNull(in);
136+
this.in = in;
137137
this.nextLSN = nextLSN;
138138
}
139139
}
@@ -144,7 +144,7 @@ public Message get(long lsn) throws IOException {
144144
long relativeLSN = lsn - baseLSN;
145145
long chunkSize;
146146
if (Files.notExists(chunk) || (chunkSize = Files.size(chunk)) < relativeLSN) {
147-
throw new ClientErrorException(Response.Status.NOT_FOUND);
147+
return new Message(null, nextLSN);
148148
}
149149

150150
FileChannel in = null;
@@ -153,7 +153,7 @@ public Message get(long lsn) throws IOException {
153153
logger.debug("seeking to {}/{} in {}", relativeLSN, baseLSN, in.size());
154154
if (in.size() <= relativeLSN) {
155155
in.close();
156-
throw new ClientErrorException(Response.Status.NOT_FOUND);
156+
return new Message(null, nextLSN);
157157
}
158158
in.position(relativeLSN);
159159
ByteBuffer buf = ByteBuffer.allocate(8);
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,35 @@
11
package us.hxbc.clusterhq.queue;
22

3+
import org.glassfish.grizzly.http.server.HttpServer;
4+
import org.glassfish.jersey.grizzly2.httpserver.GrizzlyHttpServerFactory;
5+
import org.glassfish.jersey.server.ResourceConfig;
6+
import org.slf4j.Logger;
7+
import org.slf4j.LoggerFactory;
8+
9+
import java.net.URI;
10+
import java.nio.file.Files;
11+
import java.nio.file.Path;
12+
import java.nio.file.Paths;
13+
314
public class Main {
15+
private static Logger logger = LoggerFactory.getLogger(Main.class);
16+
417
public static void main(String[] args) {
18+
if (args.length != 2) {
19+
System.err.println("Usage: Main <port> <dir>");
20+
System.exit(1);
21+
}
22+
23+
int port = Integer.parseInt(args[0]);
24+
Path dir = Paths.get(args[1]);
25+
if (!Files.isDirectory(dir)) {
26+
System.err.format("%s is not a directory\n", dir);
27+
System.exit(1);
28+
}
529

30+
HttpServer server;
31+
ResourceConfig rc = new ResourceConfig();
32+
rc.register(new Api(dir, 4096));
33+
GrizzlyHttpServerFactory.createHttpServer(URI.create("http://0.0.0.0:" + port), rc);
634
}
735
}

clusterhq/queue/src/main/java/us/hxbc/clusterhq/queue/Queue.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -75,16 +75,18 @@ public DataStore.Message get(String user) throws IOException {
7575

7676
synchronized (subscriber) {
7777
DataStore.Message m = dataStore.get(subscriber.nextLSN);
78-
Path p = subscriptionDir.resolve(user);
79-
try (FileChannel out = FileChannel.open(p,
80-
StandardOpenOption.WRITE,
81-
StandardOpenOption.TRUNCATE_EXISTING)) {
82-
ByteBuffer buf = ByteBuffer.allocate(8).putLong(m.nextLSN);
83-
buf.position(0);
84-
out.write(buf);
85-
out.force(true);
78+
if (m.in != null) {
79+
Path p = subscriptionDir.resolve(user);
80+
try (FileChannel out = FileChannel.open(p,
81+
StandardOpenOption.WRITE,
82+
StandardOpenOption.TRUNCATE_EXISTING)) {
83+
ByteBuffer buf = ByteBuffer.allocate(8).putLong(m.nextLSN);
84+
buf.position(0);
85+
out.write(buf);
86+
out.force(true);
87+
}
88+
subscriber.nextLSN = m.nextLSN;
8689
}
87-
subscriber.nextLSN = m.nextLSN;
8890
return m;
8991
}
9092
}

clusterhq/queue/src/test/java/us/hxbc/clusterhq/queue/DataStoreTest.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import java.nio.file.Path;
1717

1818
import static org.assertj.core.api.Assertions.assertThat;
19-
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2019

2120
public class DataStoreTest {
2221
@Rule
@@ -84,7 +83,7 @@ public void testPost2Chunks() throws Exception {
8483

8584
@Test
8685
public void testGetNothing() throws Exception {
87-
assertThatThrownBy(() -> ds.get(0)).isInstanceOf(ClientErrorException.class);
86+
assertThat(ds.get(0).in).isNull();
8887
}
8988

9089
@Test
@@ -96,17 +95,18 @@ public void testPostMany() throws Exception {
9695

9796
lsn = 0;
9897
int i = 0;
99-
try {
100-
while (true) {
101-
logger.info("retrieving lsn {}", lsn);
102-
DataStore.Message m = ds.get(lsn);
98+
while (true) {
99+
logger.info("retrieving lsn {}", lsn);
100+
DataStore.Message m = ds.get(lsn);
101+
if (m.in != null) {
103102
m.in.close();
104103
lsn = m.nextLSN;
105104
i++;
105+
} else {
106+
break;
106107
}
107-
} catch (ClientErrorException e) {
108-
assertThat(i).isEqualTo(10);
109108
}
109+
assertThat(i).isEqualTo(10);
110110
}
111111

112112
private void dumpFile(Path p) throws IOException {

clusterhq/queue/src/test/java/us/hxbc/clusterhq/queue/QueueTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -64,14 +64,14 @@ public void testGetNotSubscribed() throws Exception {
6464
@Test
6565
public void testGetNothing() throws Exception {
6666
queue.subscribe("foo");
67-
assertThatThrownBy(() -> queue.get("foo")).isInstanceOf(ClientErrorException.class);
67+
assertThat(queue.get("foo").in).isNull();
6868
}
6969

7070
@Test
7171
public void testGetAfterPost() throws Exception {
7272
queue.post(string2Stream("hello"));
7373
queue.subscribe("foo");
74-
assertThatThrownBy(() -> queue.get("foo")).isInstanceOf(ClientErrorException.class);
74+
assertThat(queue.get("foo").in).isNull();
7575
}
7676

7777
@Test
@@ -80,7 +80,7 @@ public void testGet() throws Exception {
8080
queue.post(string2Stream("hello"));
8181
DataStore.Message m = queue.get("foo");
8282
assertThat(stream2String(m.in)).isEqualTo("hello");
83-
assertThatThrownBy(() -> queue.get("foo")).isInstanceOf(ClientErrorException.class);
83+
assertThat(queue.get("foo").in).isNull();
8484
}
8585

8686
@Test
@@ -92,7 +92,7 @@ public void testGet2() throws Exception {
9292
assertThat(stream2String(m.in)).isEqualTo("hello");
9393
m = queue.get("foo");
9494
assertThat(stream2String(m.in)).isEqualTo("world");
95-
assertThatThrownBy(() -> queue.get("foo")).isInstanceOf(ClientErrorException.class);
95+
assertThat(queue.get("foo").in).isNull();
9696
}
9797

9898
@Test
@@ -109,7 +109,7 @@ public void testGetMany() throws Exception {
109109
m = queue.get("foo");
110110
assertThat(stream2String(m.in)).isEqualTo("world!");
111111
}
112-
assertThatThrownBy(() -> queue.get("foo")).isInstanceOf(ClientErrorException.class);
112+
assertThat(queue.get("foo").in).isNull();
113113
}
114114

115115
@Test
@@ -118,7 +118,7 @@ public void testResubscribeGet() throws Exception {
118118
queue.unsubscribe("foo");
119119
queue.post(string2Stream("hello"));
120120
queue.subscribe("foo");
121-
assertThatThrownBy(() -> queue.get("foo")).isInstanceOf(ClientErrorException.class);
121+
assertThat(queue.get("foo").in).isNull();
122122
}
123123

124124
@Test

0 commit comments

Comments
 (0)