File tree Expand file tree Collapse file tree 4 files changed +50
-1
lines changed
main/java/us/hxbc/clusterhq/queue
test/java/us/hxbc/clusterhq/queue Expand file tree Collapse file tree 4 files changed +50
-1
lines changed Original file line number Diff line number Diff line change 128128 <jersey .version>2.18</jersey .version>
129129 <project .build.sourceEncoding>UTF-8</project .build.sourceEncoding>
130130 </properties >
131- </project >
131+ </project >
Original file line number Diff line number Diff line change 1919
2020import static java .util .Objects .requireNonNull ;
2121
22+ /**
23+ * DataStore stores messages for our queue. Logically it's a log structured
24+ * append only data store. Logs are chunked into different files so old
25+ * messages can be garbage collected relatively easily.
26+ *
27+ * each message has a LSN (log sequence number) which is a 64 bit always
28+ * incrementing number. LSN is also the logical address of the position
29+ * of the message.
30+ *
31+ * LSN is translated to physical files base on the configured chunk size.
32+ * Suppose chunk size is 4, then LSN 0 would be in a file called "0", LSN
33+ * 4 would be in a file called "1", and so on. The file names are hex encoded.
34+ *
35+ * LSN marks the beginning of the message, so if LSN 0 is a message that's
36+ * 5 bytes, it will still be in a file called "0". The next message will get
37+ * LSN 8 so that it will begin at offset 0 in the file "2".
38+ */
2239public class DataStore {
2340 private Logger logger = LoggerFactory .getLogger (getClass ());
2441 private Path dir ;
Original file line number Diff line number Diff line change 1515
1616import static java .util .Objects .requireNonNull ;
1717
18+ /**
19+ * Queue manages subscriptions. It uses DataStore to store the actual
20+ * messages. Each subscription contains the next LSN to start
21+ * retrieving messages at. This LSN is updated each time a message is
22+ * retrieved.
23+ */
1824public class Queue {
1925 private static final long CHUNK_SIZE = 4 * 1024 ; // 4KB
2026 private final Path dataDir , subscriptionDir ;
Original file line number Diff line number Diff line change @@ -95,6 +95,32 @@ public void testGet2() throws Exception {
9595 assertThatThrownBy (() -> queue .get ("foo" )).isInstanceOf (ClientErrorException .class );
9696 }
9797
98+ @ Test
99+ public void testGetMany () throws Exception {
100+ queue .subscribe ("foo" );
101+ for (int i = 0 ; i < 10 ; i ++) {
102+ queue .post (string2Stream ("hello, " ));
103+ queue .post (string2Stream ("world!" ));
104+ }
105+ DataStore .Message m ;
106+ for (int i = 0 ; i < 10 ; i ++) {
107+ m = queue .get ("foo" );
108+ assertThat (stream2String (m .in )).isEqualTo ("hello, " );
109+ m = queue .get ("foo" );
110+ assertThat (stream2String (m .in )).isEqualTo ("world!" );
111+ }
112+ assertThatThrownBy (() -> queue .get ("foo" )).isInstanceOf (ClientErrorException .class );
113+ }
114+
115+ @ Test
116+ public void testResubscribeGet () throws Exception {
117+ queue .subscribe ("foo" );
118+ queue .unsubscribe ("foo" );
119+ queue .post (string2Stream ("hello" ));
120+ queue .subscribe ("foo" );
121+ assertThatThrownBy (() -> queue .get ("foo" )).isInstanceOf (ClientErrorException .class );
122+ }
123+
98124 @ Test
99125 public void testUnsubscribe () throws Exception {
100126 queue .subscribe ("foo" );
You can’t perform that action at this time.
0 commit comments