Skip to content

Commit 657fcfe

Browse files
committed
pq: add support for Batch#unread
1 parent d5cc8d6 commit 657fcfe

File tree

5 files changed

+130
-10
lines changed

5 files changed

+130
-10
lines changed

logstash-core/src/main/java/org/logstash/ackedqueue/Batch.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,12 @@ public void close() throws IOException {
6565
}
6666
}
6767

68+
public void unread() throws IOException {
69+
if (firstSeqNum >= 0L) {
70+
this.queue.unread(firstSeqNum, elements.size());
71+
}
72+
}
73+
6874
public int size() {
6975
return elements.size();
7076
}
@@ -77,6 +83,14 @@ public Queue getQueue() {
7783
return queue;
7884
}
7985

86+
long firstSeqNum() {
87+
return firstSeqNum;
88+
}
89+
90+
long lastSeqNum() {
91+
return firstSeqNum + elements.size();
92+
}
93+
8094
/**
8195
*
8296
* @param serialized Collection of serialized elements

logstash-core/src/main/java/org/logstash/ackedqueue/Page.java

Lines changed: 41 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ public final class Page implements Closeable {
3636
final int pageNum;
3737
private long minSeqNum; // TODO: see if we can make it final?
3838
private int elementCount;
39-
private long firstUnreadSeqNum;
4039
private final Queue queue;
4140
private final PageIO pageIO;
4241
private boolean writable;
@@ -45,41 +44,62 @@ public final class Page implements Closeable {
4544
// TODO: go steal LocalCheckpointService in feature/seq_no from ES
4645
// TODO: https://github.com/elastic/elasticsearch/blob/feature/seq_no/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java
4746
private final BitSet ackedSeqNums;
47+
private final BitSet readSeqNums;
4848
private Checkpoint lastCheckpoint;
4949

50+
// Use {@link PageFactory}
5051
Page(int pageNum, Queue queue, long minSeqNum, int elementCount, long firstUnreadSeqNum, BitSet ackedSeqNums, @NotNull PageIO pageIO, boolean writable) {
5152
this.pageNum = pageNum;
5253
this.queue = queue;
5354

5455
this.minSeqNum = minSeqNum;
5556
this.elementCount = elementCount;
56-
this.firstUnreadSeqNum = firstUnreadSeqNum;
5757
this.ackedSeqNums = ackedSeqNums;
58+
this.readSeqNums = readSeqNums(minSeqNum, firstUnreadSeqNum, elementCount);
5859
this.lastCheckpoint = new Checkpoint(0, 0, 0, 0, 0);
5960
this.pageIO = pageIO;
6061
this.writable = writable;
6162

6263
assert this.pageIO != null : "invalid null pageIO";
6364
}
6465

66+
private static BitSet readSeqNums(final long minSeqNum, final long firstUnreadSeqNum, final int elementCount) {
67+
final BitSet bitSet = new BitSet(elementCount);
68+
if (firstUnreadSeqNum > minSeqNum) {
69+
bitSet.set(0, (int) (firstUnreadSeqNum - minSeqNum));
70+
}
71+
return bitSet;
72+
}
73+
6574
public String toString() {
66-
return "pageNum=" + this.pageNum + ", minSeqNum=" + this.minSeqNum + ", elementCount=" + this.elementCount + ", firstUnreadSeqNum=" + this.firstUnreadSeqNum;
75+
return "pageNum=" + this.pageNum +
76+
", minSeqNum=" + this.minSeqNum +
77+
", elementCount=" + this.elementCount +
78+
", firstUnreadSeqNum=" + (this.minSeqNum + this.readSeqNums.nextClearBit(0));
6779
}
6880

6981
/**
70-
* @param limit the maximum number of elements to read, actual number readcan be smaller
82+
* @param limit the maximum number of elements to read, actual number read can be smaller
7183
* @return {@link SequencedList} collection of serialized elements read
7284
* @throws IOException if an IO error occurs
7385
*/
7486
public SequencedList<byte[]> read(int limit) throws IOException {
7587
// first make sure this page is activated, activating previously activated is harmless
7688
this.pageIO.activate();
7789

78-
SequencedList<byte[]> serialized = this.pageIO.read(this.firstUnreadSeqNum, limit);
79-
assert serialized.getSeqNums().get(0) == this.firstUnreadSeqNum :
80-
String.format("firstUnreadSeqNum=%d != first result seqNum=%d", this.firstUnreadSeqNum, serialized.getSeqNums().get(0));
90+
// determine where to begin our read
91+
int firstUnreadOffset = this.readSeqNums.nextClearBit(0);
92+
long firstUnreadSeqNum = this.minSeqNum + firstUnreadOffset;
93+
94+
// determine how many contiguous events we can read without re-emitting any already-read events
95+
int nextReadOffset = this.readSeqNums.nextSetBit(firstUnreadOffset);
96+
int effectiveLimit = (nextReadOffset < 0) ? limit : Math.min(limit, nextReadOffset - firstUnreadOffset);
97+
98+
SequencedList<byte[]> serialized = this.pageIO.read(firstUnreadSeqNum, effectiveLimit);
99+
assert serialized.getSeqNums().get(0) == firstUnreadSeqNum :
100+
String.format("firstUnreadSeqNum=%d != first result seqNum=%d", firstUnreadSeqNum, serialized.getSeqNums().get(0));
81101

82-
this.firstUnreadSeqNum += serialized.getElements().size();
102+
this.readSeqNums.set(firstUnreadOffset, firstUnreadOffset + serialized.getElements().size());
83103

84104
return serialized;
85105
}
@@ -93,7 +113,6 @@ public void write(byte[] bytes, long seqNum, int checkpointMaxWrites) throws IOE
93113

94114
if (this.minSeqNum <= 0) {
95115
this.minSeqNum = seqNum;
96-
this.firstUnreadSeqNum = seqNum;
97116
}
98117
this.elementCount++;
99118

@@ -136,7 +155,11 @@ public boolean isFullyAcked() {
136155
}
137156

138157
public long unreadCount() {
139-
return this.elementCount <= 0 ? 0 : Math.max(0, (maxSeqNum() - this.firstUnreadSeqNum) + 1);
158+
if (this.elementCount <= 0) {
159+
return 0;
160+
}
161+
162+
return this.elementCount - this.readSeqNums.cardinality();
140163
}
141164

142165
/**
@@ -187,6 +210,14 @@ public boolean ack(long firstSeqNum, int count, int checkpointMaxAcks) throws IO
187210
return done;
188211
}
189212

213+
public boolean unread(long firstSeqNum, int count) {
214+
final boolean wasFullyRead = isFullyRead();
215+
int unreadChunkFirstOffset = Ints.checkedCast(Math.subtractExact(firstSeqNum, this.minSeqNum));
216+
this.readSeqNums.clear(unreadChunkFirstOffset, unreadChunkFirstOffset+count);
217+
218+
return wasFullyRead;
219+
}
220+
190221
public void checkpoint() throws IOException {
191222
if (this.writable) {
192223
headPageCheckpoint();

logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -758,6 +758,25 @@ public void ack(final long firstAckSeqNum, final int ackCount) throws IOExceptio
758758
}
759759
}
760760

761+
public void unread(final long firstUnreadSeqNum, final int unreadCount) throws IOException {
762+
lock.lock();
763+
try {
764+
if (containsSeq(headPage, firstUnreadSeqNum)) {
765+
this.headPage.unread(firstUnreadSeqNum, unreadCount);
766+
} else {
767+
final int resultIndex = binaryFindPageForSeqnum(firstUnreadSeqNum);
768+
final Page relevantTailPage = this.tailPages.get(resultIndex);
769+
if (relevantTailPage.unread(firstUnreadSeqNum, unreadCount)) {
770+
this.unreadTailPages.add(0, relevantTailPage);
771+
}
772+
notEmpty.signalAll();
773+
}
774+
this.unreadCount -= unreadCount;
775+
} finally {
776+
lock.unlock();
777+
}
778+
}
779+
761780
public CheckpointIO getCheckpointIO() {
762781
return this.checkpointIO;
763782
}
@@ -870,6 +889,10 @@ public long getUnackedCount() {
870889
return sumPages(Page::getUnackedCount);
871890
}
872891

892+
long getPageCount() {
893+
return sumPages((page) -> 1L);
894+
}
895+
873896
private long sumPages(final ToLongFunction<Page> valueGetter) {
874897
lock.lock();
875898
try {

logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@
3939
import java.util.concurrent.TimeUnit;
4040
import java.util.concurrent.TimeoutException;
4141
import java.util.concurrent.atomic.AtomicInteger;
42+
import java.util.concurrent.atomic.AtomicLong;
43+
import java.util.function.Supplier;
4244

4345
import org.junit.After;
4446
import org.junit.Before;
@@ -168,6 +170,52 @@ public void writeWhenPageEqualsQueueSize() throws IOException {
168170
}
169171
}
170172

173+
@Test
174+
public void unreadEvents() throws IOException {
175+
final Settings queueSettings = TestSettings.fileSettingsBuilder(dataPath)
176+
.compressionCodecFactory(CompressionCodec.fromConfigValue("disabled"))
177+
.capacity(1024)
178+
.build();
179+
try (Queue q = new Queue(TestSettings.fileSettingsBuilder(dataPath).capacity(1024).build())) {
180+
q.open();
181+
182+
final AtomicLong atomicLong = new AtomicLong();
183+
Supplier<String> tenByteStringSupplier = () -> String.format("%010d", atomicLong.incrementAndGet());
184+
185+
// insert enough events to fill more than two pages
186+
while (q.getPageCount() < 3) {
187+
q.write(new StringElement(tenByteStringSupplier.get()));
188+
}
189+
190+
// read batches until there is only one page in the quick-pick unreadTailPages
191+
final ArrayList<Batch> batches = new ArrayList<>();
192+
while (q.unreadTailPages.size() > 1) {
193+
batches.add(q.readBatch(2, 500L));
194+
}
195+
196+
// ack the first batch
197+
final Batch ackBatch = batches.remove(0);
198+
ackBatch.close();
199+
200+
// unread the next batch, validate that the page got put back on unreadTailPages
201+
final Batch unreadBatch = batches.remove(0);
202+
unreadBatch.unread();
203+
assertThat(q.unreadTailPages.size(), is(2));
204+
205+
// read another batch, ensure it is the same as the one we previously unread
206+
final long unreadBatchFirstSeqNum = unreadBatch.firstSeqNum();
207+
final Batch nextReadBatch = q.readBatch(3, 500L); // ask for more than the known 2-event unread region
208+
assertThat(q.unreadTailPages.size(), is(1));
209+
210+
assertThat(nextReadBatch.firstSeqNum(), is(unreadBatchFirstSeqNum));
211+
assertThat(nextReadBatch.size(), is(2)); // even though we asked for more, we are getting the entire contiguously-unread region
212+
213+
// now read a new batch, and make sure it's newer than the previous batches
214+
final Batch lastBatch = q.readBatch(2, 500L);
215+
assertThat(lastBatch.firstSeqNum(), is(1 + batches.get(batches.size() - 1).firstSeqNum()));
216+
}
217+
}
218+
171219
@Test
172220
public void singleWriteMultiRead() throws IOException {
173221
try (Queue q = new Queue(TestSettings.persistedQueueSettings(100, dataPath))) {

logstash-core/src/test/java/org/logstash/ackedqueue/TestSettings.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,8 @@ public static Settings persistedQueueSettings(int capacity, long size, String fo
3131
return SettingsImpl.fileSettingsBuilder(folder).capacity(capacity)
3232
.queueMaxBytes(size).elementClass(StringElement.class).build();
3333
}
34+
35+
public static Settings.Builder fileSettingsBuilder(String folder) {
36+
return SettingsImpl.fileSettingsBuilder(folder).elementClass(StringElement.class);
37+
}
3438
}

0 commit comments

Comments
 (0)