Skip to content

Commit 6009154

Browse files
committed
initial queue implementation
1 parent e9b6dc9 commit 6009154

File tree

5 files changed

+264
-2
lines changed

5 files changed

+264
-2
lines changed

clusterhq/queue/queue.iml

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
<?xml version="1.0" encoding="UTF-8"?>
22
<module org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4">
3-
<component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_5" inherit-compiler-output="false">
3+
<component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_8" inherit-compiler-output="false">
44
<output url="file://$MODULE_DIR$/target/classes" />
55
<output-test url="file://$MODULE_DIR$/target/test-classes" />
66
<content url="file://$MODULE_DIR$">
@@ -11,5 +11,42 @@
1111
</content>
1212
<orderEntry type="inheritedJdk" />
1313
<orderEntry type="sourceFolder" forTests="false" />
14+
<orderEntry type="library" name="Maven: org.glassfish.jersey.containers:jersey-container-grizzly2-http:2.18" level="project" />
15+
<orderEntry type="library" name="Maven: org.glassfish.hk2.external:javax.inject:2.4.0-b12" level="project" />
16+
<orderEntry type="library" name="Maven: org.glassfish.grizzly:grizzly-http-server:2.3.19" level="project" />
17+
<orderEntry type="library" name="Maven: org.glassfish.grizzly:grizzly-http:2.3.19" level="project" />
18+
<orderEntry type="library" name="Maven: org.glassfish.grizzly:grizzly-framework:2.3.19" level="project" />
19+
<orderEntry type="library" name="Maven: org.glassfish.jersey.core:jersey-common:2.18" level="project" />
20+
<orderEntry type="library" name="Maven: javax.annotation:javax.annotation-api:1.2" level="project" />
21+
<orderEntry type="library" name="Maven: org.glassfish.jersey.bundles.repackaged:jersey-guava:2.18" level="project" />
22+
<orderEntry type="library" name="Maven: org.glassfish.hk2:hk2-api:2.4.0-b12" level="project" />
23+
<orderEntry type="library" name="Maven: org.glassfish.hk2:hk2-utils:2.4.0-b12" level="project" />
24+
<orderEntry type="library" name="Maven: org.glassfish.hk2.external:aopalliance-repackaged:2.4.0-b12" level="project" />
25+
<orderEntry type="library" name="Maven: org.glassfish.hk2:hk2-locator:2.4.0-b12" level="project" />
26+
<orderEntry type="library" name="Maven: org.javassist:javassist:3.18.1-GA" level="project" />
27+
<orderEntry type="library" name="Maven: org.glassfish.hk2:osgi-resource-locator:1.0.1" level="project" />
28+
<orderEntry type="library" name="Maven: org.glassfish.jersey.core:jersey-server:2.18" level="project" />
29+
<orderEntry type="library" name="Maven: org.glassfish.jersey.core:jersey-client:2.18" level="project" />
30+
<orderEntry type="library" name="Maven: org.glassfish.jersey.media:jersey-media-jaxb:2.18" level="project" />
31+
<orderEntry type="library" name="Maven: javax.validation:validation-api:1.1.0.Final" level="project" />
32+
<orderEntry type="library" name="Maven: javax.ws.rs:javax.ws.rs-api:2.0.1" level="project" />
33+
<orderEntry type="library" name="Maven: org.glassfish.jersey.media:jersey-media-json-jackson:2.18" level="project" />
34+
<orderEntry type="library" name="Maven: org.glassfish.jersey.ext:jersey-entity-filtering:2.18" level="project" />
35+
<orderEntry type="library" name="Maven: com.fasterxml.jackson.jaxrs:jackson-jaxrs-base:2.5.1" level="project" />
36+
<orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-core:2.5.1" level="project" />
37+
<orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-databind:2.5.1" level="project" />
38+
<orderEntry type="library" name="Maven: com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:2.5.1" level="project" />
39+
<orderEntry type="library" name="Maven: com.fasterxml.jackson.module:jackson-module-jaxb-annotations:2.5.1" level="project" />
40+
<orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-annotations:2.5.1" level="project" />
41+
<orderEntry type="library" scope="TEST" name="Maven: junit:junit:4.12" level="project" />
42+
<orderEntry type="library" scope="TEST" name="Maven: org.hamcrest:hamcrest-core:1.3" level="project" />
43+
<orderEntry type="library" name="Maven: ch.qos.logback:logback-classic:1.1.3" level="project" />
44+
<orderEntry type="library" name="Maven: ch.qos.logback:logback-core:1.1.3" level="project" />
45+
<orderEntry type="library" name="Maven: org.slf4j:slf4j-api:1.7.7" level="project" />
46+
<orderEntry type="library" scope="TEST" name="Maven: org.assertj:assertj-core:3.2.0" level="project" />
47+
<orderEntry type="library" scope="TEST" name="Maven: org.testng:testng:6.8.8" level="project" />
48+
<orderEntry type="library" scope="TEST" name="Maven: org.beanshell:bsh:2.0b4" level="project" />
49+
<orderEntry type="library" scope="TEST" name="Maven: com.beust:jcommander:1.27" level="project" />
50+
<orderEntry type="library" name="Maven: com.google.guava:guava:18.0" level="project" />
1451
</component>
1552
</module>

clusterhq/queue/src/main/java/us/hxbc/clusterhq/queue/DataStore.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ Path getChunkPath(long baseLSN) {
4242
return dir.resolve(name);
4343
}
4444

45+
public long getNextLSN() {
46+
return nextLSN;
47+
}
48+
4549
public synchronized long post(InputStream data) throws IOException {
4650
long baseLSN = getBaseLSN(nextLSN);
4751
Path chunk = getChunkPath(baseLSN);
@@ -130,7 +134,7 @@ public Message get(long lsn) throws IOException {
130134
try {
131135
in = FileChannel.open(chunk, StandardOpenOption.READ);
132136
logger.debug("seeking to {}/{} in {}", relativeLSN, baseLSN, in.size());
133-
if (in.size() < relativeLSN) {
137+
if (in.size() <= relativeLSN) {
134138
in.close();
135139
throw new ClientErrorException(Response.Status.NOT_FOUND);
136140
}
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
package us.hxbc.clusterhq.queue;
2+
3+
import javax.ws.rs.ClientErrorException;
4+
import javax.ws.rs.core.Response;
5+
import java.io.IOException;
6+
import java.io.InputStream;
7+
import java.io.OutputStream;
8+
import java.nio.ByteBuffer;
9+
import java.nio.channels.FileChannel;
10+
import java.nio.file.Files;
11+
import java.nio.file.Path;
12+
import java.nio.file.StandardOpenOption;
13+
import java.util.HashMap;
14+
import java.util.Map;
15+
16+
import static java.util.Objects.requireNonNull;
17+
18+
public class Queue {
19+
private static final long CHUNK_SIZE = 4 * 1024; // 4KB
20+
private final Path dataDir, subscriptionDir;
21+
private final Map<String, Subscriber> subscriptions = new HashMap<>();
22+
private final DataStore dataStore;
23+
24+
public Queue(Path dir, long chunkSize) throws IOException {
25+
requireNonNull(dir);
26+
if (!Files.isDirectory(dir)) {
27+
throw new IllegalArgumentException(dir + " is not a directory");
28+
}
29+
30+
dataDir = dir.resolve("data");
31+
Files.createDirectory(dataDir);
32+
subscriptionDir = dir.resolve("subscriptions");
33+
Files.createDirectory(subscriptionDir);
34+
dataStore = new DataStore(dataDir, chunkSize);
35+
}
36+
37+
public void subscribe(String user) throws IOException {
38+
synchronized (subscriptions) {
39+
if (subscriptions.containsKey(user)) {
40+
return;
41+
}
42+
Path p = subscriptionDir.resolve(user);
43+
long nextLSN = dataStore.getNextLSN();
44+
try (FileChannel out = FileChannel.open(p,
45+
StandardOpenOption.CREATE,
46+
StandardOpenOption.WRITE,
47+
StandardOpenOption.TRUNCATE_EXISTING)) {
48+
ByteBuffer buf = ByteBuffer.allocate(8).putLong(nextLSN);
49+
buf.position(0);
50+
out.write(buf);
51+
out.force(true);
52+
}
53+
subscriptions.put(user, new Subscriber(user, nextLSN));
54+
}
55+
}
56+
57+
public void post(InputStream data) throws IOException {
58+
dataStore.post(data);
59+
}
60+
61+
public DataStore.Message get(String user) throws IOException {
62+
Subscriber subscriber;
63+
synchronized (subscriptions) {
64+
subscriber = subscriptions.get(user);
65+
if (subscriber == null) {
66+
throw new ClientErrorException(Response.Status.NOT_FOUND);
67+
}
68+
}
69+
70+
synchronized (subscriber) {
71+
DataStore.Message m = dataStore.get(subscriber.nextLSN);
72+
Path p = subscriptionDir.resolve(user);
73+
try (FileChannel out = FileChannel.open(p,
74+
StandardOpenOption.WRITE,
75+
StandardOpenOption.TRUNCATE_EXISTING)) {
76+
ByteBuffer buf = ByteBuffer.allocate(8).putLong(m.nextLSN);
77+
buf.position(0);
78+
out.write(buf);
79+
out.force(true);
80+
}
81+
subscriber.nextLSN = m.nextLSN;
82+
return m;
83+
}
84+
}
85+
86+
public void unsubscribe(String user) throws IOException {
87+
synchronized (subscriptions) {
88+
Path p = subscriptionDir.resolve(user);
89+
if (!Files.deleteIfExists(p)) {
90+
throw new ClientErrorException(Response.Status.NOT_FOUND);
91+
}
92+
subscriptions.remove(user);
93+
}
94+
}
95+
96+
static class Subscriber {
97+
final String name;
98+
long nextLSN;
99+
100+
Subscriber(String name, long nextLSN) {
101+
this.name = requireNonNull(name);
102+
this.nextLSN = nextLSN;
103+
}
104+
}
105+
}

clusterhq/queue/src/test/java/us/hxbc/clusterhq/queue/DataStoreTest.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import java.nio.file.Path;
1717

1818
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
1920

2021
public class DataStoreTest {
2122
@Rule
@@ -46,18 +47,23 @@ public void testGetChunkPath() throws Exception {
4647
@Test
4748
public void testPostEmpty() throws Exception {
4849
post1(new byte[0], 0);
50+
assertThat(ds.getNextLSN()).isEqualTo(8);
4951
}
5052

5153
@Test
5254
public void testPost1() throws Exception {
5355
post1(new byte[]{9}, 0);
56+
assertThat(ds.getNextLSN()).isEqualTo(9);
5457
}
5558

5659
@Test
5760
public void testPost2() throws Exception {
5861
long lsn = 0;
5962
lsn = post1(new byte[]{9}, lsn);
6063
lsn = post1(new byte[]{8, 9}, lsn);
64+
65+
// should wrap to next chunk
66+
assertThat(ds.getNextLSN()).isEqualTo(32);
6167
}
6268

6369
@Test
@@ -76,6 +82,11 @@ public void testPost2Chunks() throws Exception {
7682
}
7783
}
7884

85+
@Test
86+
public void testGetNothing() throws Exception {
87+
assertThatThrownBy(() -> ds.get(0)).isInstanceOf(ClientErrorException.class);
88+
}
89+
7990
@Test
8091
public void testPostMany() throws Exception {
8192
long lsn = 0;
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
package us.hxbc.clusterhq.queue;
2+
3+
import com.google.common.io.ByteStreams;
4+
import org.junit.Before;
5+
import org.junit.Rule;
6+
import org.junit.Test;
7+
import org.junit.rules.TemporaryFolder;
8+
import org.slf4j.Logger;
9+
import org.slf4j.LoggerFactory;
10+
11+
import javax.ws.rs.ClientErrorException;
12+
import java.io.ByteArrayInputStream;
13+
import java.io.IOException;
14+
import java.io.InputStream;
15+
import java.nio.file.Files;
16+
import java.nio.file.Path;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
20+
21+
public class QueueTest {
22+
@Rule
23+
public TemporaryFolder folder= new TemporaryFolder();
24+
private Logger logger = LoggerFactory.getLogger(getClass());
25+
private Path dir;
26+
private Queue queue;
27+
28+
@Before
29+
public void setup() throws Exception {
30+
dir = folder.newFolder().toPath();
31+
queue = new Queue(dir, 16);
32+
}
33+
34+
@Test
35+
public void testUnSubscribeNotSubscribed() throws Exception {
36+
assertThatThrownBy(() -> queue.unsubscribe("foo")).isInstanceOf(ClientErrorException.class);
37+
}
38+
39+
@Test
40+
public void testSubscribe() throws Exception {
41+
queue.subscribe("foo");
42+
Path p = dir.resolve("subscriptions").resolve("foo");
43+
assertThat(Files.exists(p)).isTrue();
44+
}
45+
46+
private InputStream string2Stream(String str) {
47+
return new ByteArrayInputStream(str.getBytes());
48+
}
49+
50+
private String stream2String(InputStream in) throws IOException {
51+
try {
52+
return new String(ByteStreams.toByteArray(in));
53+
} finally {
54+
in.close();
55+
}
56+
}
57+
58+
@Test
59+
public void testGetNotSubscribed() throws Exception {
60+
queue.post(string2Stream("hello"));
61+
assertThatThrownBy(() -> queue.get("foo")).isInstanceOf(ClientErrorException.class);
62+
}
63+
64+
@Test
65+
public void testGetNothing() throws Exception {
66+
queue.subscribe("foo");
67+
assertThatThrownBy(() -> queue.get("foo")).isInstanceOf(ClientErrorException.class);
68+
}
69+
70+
@Test
71+
public void testGetAfterPost() throws Exception {
72+
queue.post(string2Stream("hello"));
73+
queue.subscribe("foo");
74+
assertThatThrownBy(() -> queue.get("foo")).isInstanceOf(ClientErrorException.class);
75+
}
76+
77+
@Test
78+
public void testGet() throws Exception {
79+
queue.subscribe("foo");
80+
queue.post(string2Stream("hello"));
81+
DataStore.Message m = queue.get("foo");
82+
assertThat(stream2String(m.in)).isEqualTo("hello");
83+
assertThatThrownBy(() -> queue.get("foo")).isInstanceOf(ClientErrorException.class);
84+
}
85+
86+
@Test
87+
public void testGet2() throws Exception {
88+
queue.subscribe("foo");
89+
queue.post(string2Stream("hello"));
90+
queue.post(string2Stream("world"));
91+
DataStore.Message m = queue.get("foo");
92+
assertThat(stream2String(m.in)).isEqualTo("hello");
93+
m = queue.get("foo");
94+
assertThat(stream2String(m.in)).isEqualTo("world");
95+
assertThatThrownBy(() -> queue.get("foo")).isInstanceOf(ClientErrorException.class);
96+
}
97+
98+
@Test
99+
public void testUnsubscribe() throws Exception {
100+
queue.subscribe("foo");
101+
queue.unsubscribe("foo");
102+
Path p = dir.resolve("subscriptions").resolve("foo");
103+
assertThat(Files.exists(p)).isFalse();
104+
}
105+
}

0 commit comments

Comments
 (0)