Skip to content

Commit e9b6dc9

Browse files
committed
initial datastore implementation
1 parent 58cbda6 commit e9b6dc9

File tree

4 files changed

+310
-0
lines changed

4 files changed

+310
-0
lines changed

clusterhq/queue/.gitignore

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
.idea/
2+
target/
3+
*~
4+
# below is default github .ignore for java
5+
*.class
6+
# Mobile Tools for Java (J2ME)
7+
.mtj.tmp/
8+
# Package Files #
9+
*.jar
10+
*.war
11+
*.ear
12+
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
13+
hs_err_pid*
14+

clusterhq/queue/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,11 @@
6565
<version>6.8.8</version>
6666
<scope>test</scope>
6767
</dependency>
68+
<dependency>
69+
<groupId>com.google.guava</groupId>
70+
<artifactId>guava</artifactId>
71+
<version>18.0</version>
72+
</dependency>
6873
</dependencies>
6974

7075
<build>
Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
package us.hxbc.clusterhq.queue;
2+
3+
import com.google.common.io.ByteStreams;
4+
import org.slf4j.Logger;
5+
import org.slf4j.LoggerFactory;
6+
7+
import javax.ws.rs.ClientErrorException;
8+
import javax.ws.rs.core.Response;
9+
import java.io.IOException;
10+
import java.io.InputStream;
11+
import java.io.StreamCorruptedException;
12+
import java.nio.BufferUnderflowException;
13+
import java.nio.ByteBuffer;
14+
import java.nio.channels.Channels;
15+
import java.nio.channels.FileChannel;
16+
import java.nio.file.Files;
17+
import java.nio.file.Path;
18+
import java.nio.file.StandardOpenOption;
19+
20+
import static java.util.Objects.requireNonNull;
21+
22+
public class DataStore {
23+
private Logger logger = LoggerFactory.getLogger(getClass());
24+
private Path dir;
25+
private final long CHUNK_SIZE;
26+
private long nextLSN;
27+
28+
DataStore(Path dir, long chunkSize) {
29+
this.dir = requireNonNull(dir);
30+
if ((chunkSize & (chunkSize - 1)) != 0) {
31+
throw new IllegalArgumentException(chunkSize + " is not power of 2");
32+
}
33+
this.CHUNK_SIZE = chunkSize;
34+
}
35+
36+
long getBaseLSN(long lsn) {
37+
return lsn & ~(CHUNK_SIZE - 1);
38+
}
39+
40+
Path getChunkPath(long baseLSN) {
41+
String name = Long.toHexString(baseLSN);
42+
return dir.resolve(name);
43+
}
44+
45+
public synchronized long post(InputStream data) throws IOException {
46+
long baseLSN = getBaseLSN(nextLSN);
47+
Path chunk = getChunkPath(baseLSN);
48+
49+
logger.debug("Writing to {}", chunk);
50+
long origSize = -1;
51+
FileChannel out = null;
52+
try {
53+
out = FileChannel.open(chunk, StandardOpenOption.CREATE,
54+
StandardOpenOption.WRITE);
55+
origSize = out.size();
56+
out.position(origSize);
57+
int nread;
58+
long total = 0;
59+
ByteBuffer buf = ByteBuffer.allocate(32 * 1024);
60+
buf.mark();
61+
buf.putLong(0); // placeholder for size of this message
62+
buf.limit(buf.position());
63+
buf.reset();
64+
int nwritten = out.write(buf);
65+
if (nwritten != 8) {
66+
throw new IllegalArgumentException(nwritten + " != 8");
67+
}
68+
buf.reset();
69+
buf.limit(buf.capacity());
70+
while ((nread = data.read(buf.array())) > 0) {
71+
buf.limit(nread);
72+
nwritten = out.write(buf);
73+
if (nwritten != nread) {
74+
throw new IllegalArgumentException(nwritten + " != " + nread);
75+
}
76+
buf.reset();
77+
total += nread;
78+
}
79+
buf.reset();
80+
buf.limit(buf.capacity());
81+
buf.putLong(total);
82+
buf.limit(buf.position());
83+
buf.reset();
84+
nwritten = out.write(buf, origSize); // write the real size
85+
if (nwritten != 8) {
86+
throw new IllegalArgumentException(nwritten + " != 8");
87+
}
88+
out.force(true);
89+
nextLSN += total + 8;
90+
if (getBaseLSN(nextLSN) != baseLSN) {
91+
// we exceeded this chunk, round this up to the next chunk
92+
nextLSN = getBaseLSN(nextLSN) + CHUNK_SIZE;
93+
}
94+
} catch (IOException e) {
95+
// truncate the file back to the original size
96+
if (out != null && origSize != -1) {
97+
out.truncate(origSize);
98+
out.force(true);
99+
}
100+
throw e;
101+
} finally {
102+
if (out != null) {
103+
out.close();
104+
}
105+
}
106+
107+
return nextLSN;
108+
}
109+
110+
public static class Message {
111+
public final InputStream in;
112+
public final long nextLSN;
113+
114+
Message(InputStream in, long nextLSN) {
115+
this.in = requireNonNull(in);
116+
this.nextLSN = nextLSN;
117+
}
118+
}
119+
120+
public Message get(long lsn) throws IOException {
121+
long baseLSN = getBaseLSN(lsn);
122+
Path chunk = getChunkPath(baseLSN);
123+
long relativeLSN = lsn - baseLSN;
124+
long chunkSize;
125+
if (Files.notExists(chunk) || (chunkSize = Files.size(chunk)) < relativeLSN) {
126+
throw new ClientErrorException(Response.Status.NOT_FOUND);
127+
}
128+
129+
FileChannel in = null;
130+
try {
131+
in = FileChannel.open(chunk, StandardOpenOption.READ);
132+
logger.debug("seeking to {}/{} in {}", relativeLSN, baseLSN, in.size());
133+
if (in.size() < relativeLSN) {
134+
in.close();
135+
throw new ClientErrorException(Response.Status.NOT_FOUND);
136+
}
137+
in.position(relativeLSN);
138+
ByteBuffer buf = ByteBuffer.allocate(8);
139+
buf.mark();
140+
in.read(buf);
141+
buf.reset();
142+
long messageSize = buf.getLong();
143+
logger.debug("message is {} bytes", messageSize);
144+
if (chunkSize < relativeLSN + 8 + messageSize) {
145+
in.close();
146+
throw new StreamCorruptedException(
147+
String.format("%s/%s is %s bytes but chunk is %s bytes", relativeLSN, lsn, messageSize, chunkSize));
148+
}
149+
150+
long nextLSN = lsn + 8 + messageSize;
151+
if (getBaseLSN(nextLSN) != baseLSN) {
152+
// we exceeded this chunk, round this up to the next chunk
153+
nextLSN = getBaseLSN(nextLSN) + CHUNK_SIZE;
154+
}
155+
return new Message(ByteStreams.limit(Channels.newInputStream(in), messageSize), nextLSN);
156+
} catch (BufferUnderflowException e) {
157+
throw new IOException(e);
158+
} catch (IOException e) {
159+
if (in != null) {
160+
in.close();
161+
}
162+
throw e;
163+
} catch (RuntimeException e) {
164+
if (in != null) {
165+
in.close();
166+
}
167+
throw e;
168+
}
169+
}
170+
}
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
package us.hxbc.clusterhq.queue;
2+
3+
import com.google.common.io.ByteStreams;
4+
import org.junit.Before;
5+
import org.junit.Rule;
6+
import org.junit.Test;
7+
import org.junit.rules.TemporaryFolder;
8+
import org.slf4j.Logger;
9+
import org.slf4j.LoggerFactory;
10+
11+
import javax.ws.rs.ClientErrorException;
12+
import java.io.ByteArrayInputStream;
13+
import java.io.IOException;
14+
import java.io.InputStream;
15+
import java.nio.file.Files;
16+
import java.nio.file.Path;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
20+
public class DataStoreTest {
21+
@Rule
22+
public TemporaryFolder folder= new TemporaryFolder();
23+
private Logger logger = LoggerFactory.getLogger(getClass());
24+
private Path dir;
25+
private DataStore ds;
26+
27+
@Before
28+
public void setup() throws Exception {
29+
dir = folder.newFolder().toPath();
30+
ds = new DataStore(dir, 16);
31+
}
32+
33+
@Test
34+
public void testGetBaseLSN() throws Exception {
35+
assertThat(ds.getBaseLSN(0)).isEqualTo(0);
36+
assertThat(ds.getBaseLSN(1)).isEqualTo(0);
37+
assertThat(ds.getBaseLSN(4097)).isEqualTo(4096);
38+
}
39+
40+
@Test
41+
public void testGetChunkPath() throws Exception {
42+
assertThat(ds.getChunkPath(0).getFileName().toString()).isEqualTo("0");
43+
assertThat(ds.getChunkPath(256).getFileName().toString()).isEqualTo("100");
44+
}
45+
46+
@Test
47+
public void testPostEmpty() throws Exception {
48+
post1(new byte[0], 0);
49+
}
50+
51+
@Test
52+
public void testPost1() throws Exception {
53+
post1(new byte[]{9}, 0);
54+
}
55+
56+
@Test
57+
public void testPost2() throws Exception {
58+
long lsn = 0;
59+
lsn = post1(new byte[]{9}, lsn);
60+
lsn = post1(new byte[]{8, 9}, lsn);
61+
}
62+
63+
@Test
64+
public void testPost2Chunks() throws Exception {
65+
long lsn = 0;
66+
lsn = post1(new byte[]{9}, lsn);
67+
lsn = post1(new byte[]{8, 9}, lsn);
68+
69+
byte[] payload = new byte[] { 7 };
70+
ds.post(new ByteArrayInputStream(payload));
71+
72+
Path p = ds.getChunkPath(ds.getBaseLSN(lsn));
73+
assertThat(Files.size(p)).isEqualTo(8 + payload.length);
74+
try (InputStream in = ds.get(lsn).in) {
75+
assertThat(ByteStreams.toByteArray(in)).isEqualTo(payload);
76+
}
77+
}
78+
79+
@Test
80+
public void testPostMany() throws Exception {
81+
long lsn = 0;
82+
for (int i = 0; i < 10; i++) {
83+
lsn = post1(new byte[]{(byte) i}, lsn);
84+
}
85+
86+
lsn = 0;
87+
int i = 0;
88+
try {
89+
while (true) {
90+
logger.info("retrieving lsn {}", lsn);
91+
DataStore.Message m = ds.get(lsn);
92+
m.in.close();
93+
lsn = m.nextLSN;
94+
i++;
95+
}
96+
} catch (ClientErrorException e) {
97+
assertThat(i).isEqualTo(10);
98+
}
99+
}
100+
101+
private void dumpFile(Path p) throws IOException {
102+
byte[] bytes = ByteStreams.toByteArray(Files.newInputStream(p));
103+
for (int i = 0; i < bytes.length; i++) {
104+
logger.info("b: {}", bytes[i]);
105+
}
106+
}
107+
108+
private long post1(byte[] payload, long lsn) throws IOException {
109+
Path p = ds.getChunkPath(ds.getBaseLSN(lsn));
110+
long size = Files.exists(p) ? Files.size(p) : 0;
111+
long next = ds.post(new ByteArrayInputStream(payload));
112+
logger.info("lsn is now {}", next);
113+
assertThat(Files.size(p)).isEqualTo(size + 8 + payload.length);
114+
115+
try (InputStream in = ds.get(lsn).in) {
116+
assertThat(ByteStreams.toByteArray(in)).isEqualTo(payload);
117+
}
118+
119+
return next;
120+
}
121+
}

0 commit comments

Comments
 (0)