Skip to content

Commit 82c1f22

Browse files
committed
plumbed GC through Queue
Queue will GC with the min LSN for all subscribers of a topic
1 parent 600a92f commit 82c1f22

File tree

3 files changed

+91
-5
lines changed

3 files changed

+91
-5
lines changed

clusterhq/queue/src/main/java/us/hxbc/clusterhq/queue/Api.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import org.glassfish.grizzly.http.server.Request;
44

5-
import javax.ws.rs.ClientErrorException;
65
import javax.ws.rs.DELETE;
76
import javax.ws.rs.GET;
87
import javax.ws.rs.POST;
@@ -45,6 +44,7 @@ private Queue ensureTopic(@PathParam("topic") String topic) throws IOException {
4544
Files.createDirectory(p);
4645
q = new Queue(p, CHUNK_SIZE);
4746
topics.put(topic, q);
47+
q.spawnGCThread();
4848
}
4949
}
5050
return q;

clusterhq/queue/src/main/java/us/hxbc/clusterhq/queue/Queue.java

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import java.io.IOException;
66
import java.io.InputStream;
77
import java.io.OutputStream;
8+
import java.io.StreamCorruptedException;
89
import java.nio.ByteBuffer;
910
import java.nio.channels.FileChannel;
1011
import java.nio.file.Files;
@@ -26,6 +27,8 @@ public class Queue {
2627
private final Path dataDir, subscriptionDir;
2728
private final Map<String, Subscriber> subscriptions = new HashMap<>();
2829
private final DataStore dataStore;
30+
private final Thread gcThread;
31+
private long minLSN = 0;
2932

3033
public Queue(Path dir, long chunkSize) throws IOException {
3134
requireNonNull(dir);
@@ -38,6 +41,16 @@ public Queue(Path dir, long chunkSize) throws IOException {
3841
subscriptionDir = dir.resolve("subscriptions");
3942
Files.createDirectory(subscriptionDir);
4043
dataStore = new DataStore(dataDir, chunkSize);
44+
45+
gcThread = new Thread(() -> {
46+
while (true) {
47+
gcNow();
48+
try {
49+
Thread.sleep(10000);
50+
} catch (InterruptedException e) {
51+
}
52+
}
53+
});
4154
}
4255

4356
public void subscribe(String user) throws IOException {
@@ -73,9 +86,13 @@ public DataStore.Message get(String user) throws IOException {
7386
}
7487
}
7588

76-
synchronized (subscriber) {
77-
DataStore.Message m = dataStore.get(subscriber.nextLSN);
78-
if (m.in != null) {
89+
DataStore.Message m = dataStore.get(subscriber.nextLSN);
90+
if (m.in != null) {
91+
if (m.nextLSN <= subscriber.nextLSN) {
92+
throw new StreamCorruptedException(m.nextLSN + " <= " + subscriber.nextLSN);
93+
}
94+
95+
synchronized (subscriber) {
7996
Path p = subscriptionDir.resolve(user);
8097
try (FileChannel out = FileChannel.open(p,
8198
StandardOpenOption.WRITE,
@@ -87,8 +104,9 @@ public DataStore.Message get(String user) throws IOException {
87104
}
88105
subscriber.nextLSN = m.nextLSN;
89106
}
90-
return m;
91107
}
108+
109+
return m;
92110
}
93111

94112
public void unsubscribe(String user) throws IOException {
@@ -101,6 +119,29 @@ public void unsubscribe(String user) throws IOException {
101119
}
102120
}
103121

122+
void spawnGCThread() {
123+
gcThread.start();
124+
}
125+
126+
synchronized void gcNow() {
127+
long curMinLSN = Long.MAX_VALUE;
128+
129+
synchronized (subscriptions) {
130+
for (Subscriber s : subscriptions.values()) {
131+
synchronized (s) {
132+
if (s.nextLSN < curMinLSN) {
133+
curMinLSN = s.nextLSN;
134+
}
135+
}
136+
}
137+
}
138+
139+
if (curMinLSN > minLSN) {
140+
dataStore.gc(curMinLSN);
141+
minLSN = curMinLSN;
142+
}
143+
}
144+
104145
static class Subscriber {
105146
final String name;
106147
long nextLSN;

clusterhq/queue/src/test/java/us/hxbc/clusterhq/queue/QueueTest.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,4 +128,49 @@ public void testUnsubscribe() throws Exception {
128128
Path p = dir.resolve("subscriptions").resolve("foo");
129129
assertThat(Files.exists(p)).isFalse();
130130
}
131+
132+
@Test
133+
public void testGCNone() throws Exception {
134+
queue.subscribe("foo");
135+
queue.post(string2Stream("hello"));
136+
queue.gcNow();
137+
assertThat(stream2String(queue.get("foo").in)).isEqualTo("hello");
138+
}
139+
140+
@Test
141+
public void testGCUnsubscribed() throws Exception {
142+
queue.subscribe("foo");
143+
queue.post(string2Stream("hello"));
144+
queue.post(string2Stream("world"));
145+
queue.unsubscribe("foo");
146+
assertThat(Files.list(dir.resolve("data")).count()).isEqualTo(1);
147+
queue.gcNow();
148+
assertThat(Files.list(dir.resolve("data")).count()).isEqualTo(0);
149+
}
150+
151+
@Test
152+
public void testGCMsgGot() throws Exception {
153+
queue.subscribe("foo");
154+
queue.post(string2Stream("hello"));
155+
queue.post(string2Stream("world"));
156+
assertThat(stream2String(queue.get("foo").in)).isEqualTo("hello");
157+
assertThat(stream2String(queue.get("foo").in)).isEqualTo("world");
158+
assertThat(Files.list(dir.resolve("data")).count()).isEqualTo(1);
159+
queue.gcNow();
160+
assertThat(Files.list(dir.resolve("data")).count()).isEqualTo(0);
161+
}
162+
163+
@Test
164+
public void testGcNone2Users() throws Exception {
165+
queue.subscribe("foo");
166+
queue.subscribe("bar");
167+
queue.post(string2Stream("hello"));
168+
queue.post(string2Stream("world"));
169+
assertThat(stream2String(queue.get("foo").in)).isEqualTo("hello");
170+
assertThat(stream2String(queue.get("foo").in)).isEqualTo("world");
171+
assertThat(stream2String(queue.get("bar").in)).isEqualTo("hello");
172+
assertThat(Files.list(dir.resolve("data")).count()).isEqualTo(1);
173+
queue.gcNow();
174+
assertThat(Files.list(dir.resolve("data")).count()).isEqualTo(1);
175+
}
131176
}

0 commit comments

Comments
 (0)