Skip to content

Commit b02e170

Browse files
committed
restart support
1 parent 2997ff9 commit b02e170

File tree

6 files changed

+126
-9
lines changed

6 files changed

+126
-9
lines changed

clusterhq/queue/src/main/java/us/hxbc/clusterhq/queue/Api.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import java.util.HashMap;
1515
import java.util.Map;
1616

17+
import static com.google.common.base.Throwables.propagate;
1718
import static java.util.Objects.requireNonNull;
1819

1920
@Path("/")
@@ -22,9 +23,26 @@ public class Api {
2223
private final long CHUNK_SIZE;
2324
private Map<String, Queue> topics = new HashMap<>();
2425

25-
Api(java.nio.file.Path dir, long chunkSize) {
26+
Api(java.nio.file.Path dir, long chunkSize) throws IOException {
2627
this.dir = requireNonNull(dir);
2728
this.CHUNK_SIZE = chunkSize;
29+
init();
30+
}
31+
32+
private void init() throws IOException {
33+
Files.list(dir)
34+
.filter(p -> Files.isDirectory(p.resolve("data")) &&
35+
Files.isDirectory(p.resolve("subscriptions")))
36+
.forEach(p -> {
37+
try {
38+
String name = p.getFileName().toString();
39+
Queue q = new Queue(p, CHUNK_SIZE);
40+
topics.put(name, q);
41+
q.spawnGCThread();
42+
} catch (IOException e) {
43+
throw propagate(e);
44+
}
45+
});
2846
}
2947

3048
@Path("/{topic}/{username}")

clusterhq/queue/src/main/java/us/hxbc/clusterhq/queue/DataStore.java

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@
44
import org.slf4j.Logger;
55
import org.slf4j.LoggerFactory;
66

7-
import javax.ws.rs.ClientErrorException;
8-
import javax.ws.rs.core.Response;
97
import java.io.IOException;
108
import java.io.InputStream;
119
import java.io.StreamCorruptedException;
@@ -16,6 +14,7 @@
1614
import java.nio.file.Files;
1715
import java.nio.file.Path;
1816
import java.nio.file.StandardOpenOption;
17+
import java.util.OptionalLong;
1918

2019
import static java.util.Objects.requireNonNull;
2120

@@ -46,12 +45,43 @@ public class DataStore {
4645
private final long CHUNK_SIZE;
4746
private long nextLSN;
4847

49-
DataStore(Path dir, long chunkSize) {
48+
DataStore(Path dir, long chunkSize) throws IOException {
49+
this(dir, chunkSize, 0);
50+
}
51+
52+
DataStore(Path dir, long chunkSize, long baseLSN) throws IOException {
5053
this.dir = requireNonNull(dir);
5154
if ((chunkSize & (chunkSize - 1)) != 0) {
5255
throw new IllegalArgumentException(chunkSize + " is not power of 2");
5356
}
5457
this.CHUNK_SIZE = chunkSize;
58+
this.nextLSN = baseLSN;
59+
init();
60+
}
61+
62+
private void init() throws IOException {
63+
OptionalLong lsn = Files.list(dir)
64+
.map(p -> p.getFileName().toString())
65+
.mapToLong(s -> Long.parseLong(s, 16))
66+
.max();
67+
if (lsn.isPresent()) {
68+
Path p = dir.resolve(Long.toHexString(lsn.getAsLong()));
69+
long size = Files.size(p);
70+
long foundLSN = lsn.getAsLong() + size;
71+
if (foundLSN < nextLSN) {
72+
throw new StreamCorruptedException(foundLSN + " < " + nextLSN);
73+
}
74+
if (size > CHUNK_SIZE) {
75+
foundLSN = getBaseLSN(foundLSN) + CHUNK_SIZE;
76+
}
77+
nextLSN = foundLSN;
78+
} else {
79+
if (nextLSN % CHUNK_SIZE != 0) {
80+
nextLSN = getBaseLSN(nextLSN) + CHUNK_SIZE;
81+
}
82+
}
83+
84+
logger.info("discovered LSN {}", nextLSN);
5585
}
5686

5787
long getBaseLSN(long lsn) {

clusterhq/queue/src/main/java/us/hxbc/clusterhq/queue/Main.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ public class Main {
1717
private static Logger logger = LoggerFactory.getLogger(Main.class);
1818
final HttpServer server;
1919

20-
Main(int port, Path dir) {
20+
Main(int port, Path dir) throws IOException {
2121
ResourceConfig rc = new ResourceConfig();
2222
rc.registerInstances(new Api(dir, 4096));
2323
if (logger.isDebugEnabled()) {

clusterhq/queue/src/main/java/us/hxbc/clusterhq/queue/Queue.java

Lines changed: 55 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import java.util.HashMap;
1818
import java.util.Map;
1919

20+
import static com.google.common.base.Throwables.propagate;
2021
import static java.util.Objects.requireNonNull;
2122

2223
/**
@@ -33,6 +34,7 @@ public class Queue {
3334
private final DataStore dataStore;
3435
private final Thread gcThread;
3536
private long minLSN = 0;
37+
private boolean shutdown = false;
3638

3739
public Queue(Path dir, long chunkSize) throws IOException {
3840
requireNonNull(dir);
@@ -41,20 +43,59 @@ public Queue(Path dir, long chunkSize) throws IOException {
4143
}
4244

4345
dataDir = dir.resolve("data");
44-
Files.createDirectory(dataDir);
46+
if (!Files.isDirectory(dataDir)) {
47+
Files.createDirectory(dataDir);
48+
}
4549
subscriptionDir = dir.resolve("subscriptions");
46-
Files.createDirectory(subscriptionDir);
47-
dataStore = new DataStore(dataDir, chunkSize);
48-
50+
if (!Files.isDirectory(subscriptionDir)) {
51+
Files.createDirectory(subscriptionDir);
52+
}
4953
gcThread = new Thread(() -> {
5054
while (true) {
55+
synchronized (this) {
56+
if (shutdown) {
57+
break;
58+
}
59+
}
60+
5161
gcNow();
5262
try {
5363
Thread.sleep(10000);
5464
} catch (InterruptedException e) {
5565
}
5666
}
5767
});
68+
69+
init();
70+
long maxLSN = 0;
71+
synchronized (subscriptions) {
72+
for (Subscriber s : subscriptions.values()) {
73+
synchronized (s) {
74+
if (s.nextLSN > maxLSN) {
75+
maxLSN = s.nextLSN;
76+
}
77+
}
78+
}
79+
}
80+
81+
dataStore = new DataStore(dataDir, chunkSize, maxLSN);
82+
}
83+
84+
private void init() throws IOException {
85+
Files.list(subscriptionDir).forEach(p -> {
86+
String name = p.getFileName().toString();
87+
try (FileChannel in = FileChannel.open(p,
88+
StandardOpenOption.READ)) {
89+
ByteBuffer buf = ByteBuffer.allocate(8);
90+
in.read(buf);
91+
buf.position(0);
92+
long nextLSN = buf.getLong();
93+
subscriptions.put(name, new Subscriber(name, nextLSN));
94+
logger.info("discovered subscriber {} @ {}", name, nextLSN);
95+
} catch (IOException e) {
96+
throw propagate(e);
97+
}
98+
});
5899
}
59100

60101
public void subscribe(String user) throws IOException {
@@ -124,9 +165,19 @@ public void unsubscribe(String user) throws IOException {
124165
}
125166

126167
void spawnGCThread() {
168+
shutdown = false;
127169
gcThread.start();
128170
}
129171

172+
void stop() {
173+
shutdown = true;
174+
try {
175+
gcThread.join();
176+
} catch (InterruptedException e) {
177+
e.printStackTrace();
178+
}
179+
}
180+
130181
synchronized void gcNow() {
131182
long curMinLSN = dataStore.getNextLSN();
132183

clusterhq/queue/src/test/java/us/hxbc/clusterhq/queue/DataStoreTest.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,13 @@ public void testGCNone() throws Exception {
137137
assertThat(Files.list(dir).count()).isEqualTo(1);
138138
}
139139

140+
@Test
141+
public void testRestart() throws Exception {
142+
long lsn = post1(new byte[]{9}, 0);
143+
ds = new DataStore(dir, 16);
144+
assertThat(ds.getNextLSN()).isEqualTo(lsn);
145+
}
146+
140147
private void dumpFile(Path p) throws IOException {
141148
byte[] bytes = ByteStreams.toByteArray(Files.newInputStream(p));
142149
for (int i = 0; i < bytes.length; i++) {

clusterhq/queue/src/test/java/us/hxbc/clusterhq/queue/QueueTest.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,4 +183,15 @@ public void testGcAgain() throws Exception {
183183
queue.gcNow();
184184
assertThat(Files.list(dir.resolve("data")).count()).isEqualTo(0);
185185
}
186+
187+
@Test
188+
public void testRestart() throws Exception {
189+
queue.subscribe("foo");
190+
queue.post(string2Stream("hello"));
191+
queue.post(string2Stream("world"));
192+
assertThat(stream2String(queue.get("foo").in)).isEqualTo("hello");
193+
queue = new Queue(dir, 16);
194+
assertThat(stream2String(queue.get("foo").in)).isEqualTo("world");
195+
queue.unsubscribe("foo");
196+
}
186197
}

0 commit comments

Comments
 (0)