Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions logstash-core/src/main/java/org/logstash/ackedqueue/Batch.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ public void close() throws IOException {
}
}

public void unread() throws IOException {
if (firstSeqNum >= 0L) {
this.queue.unread(firstSeqNum, elements.size());
}
}

public int size() {
return elements.size();
}
Expand All @@ -77,6 +83,14 @@ public Queue getQueue() {
return queue;
}

long firstSeqNum() {
return firstSeqNum;
}

long lastSeqNum() {
return firstSeqNum + elements.size();
}

/**
*
* @param serialized Collection of serialized elements
Expand Down
75 changes: 57 additions & 18 deletions logstash-core/src/main/java/org/logstash/ackedqueue/Page.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,53 +33,73 @@
* the status of the page (writeable or not).
* */
public final class Page implements Closeable {
protected final int pageNum;
protected long minSeqNum; // TODO: see if we can make it final?
protected int elementCount;
protected long firstUnreadSeqNum;
protected final Queue queue;
protected PageIO pageIO;
final int pageNum;
private long minSeqNum; // TODO: see if we can make it final?
private int elementCount;
private final Queue queue;
private final PageIO pageIO;
private boolean writable;

// bit 0 is minSeqNum
// TODO: go steal LocalCheckpointService in feature/seq_no from ES
// TODO: https://github.com/elastic/elasticsearch/blob/feature/seq_no/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java
protected BitSet ackedSeqNums;
protected Checkpoint lastCheckpoint;
private final BitSet ackedSeqNums;
private final BitSet readSeqNums;
private Checkpoint lastCheckpoint;

public Page(int pageNum, Queue queue, long minSeqNum, int elementCount, long firstUnreadSeqNum, BitSet ackedSeqNums, @NotNull PageIO pageIO, boolean writable) {
// Use {@link PageFactory}
Page(int pageNum, Queue queue, long minSeqNum, int elementCount, long firstUnreadSeqNum, BitSet ackedSeqNums, @NotNull PageIO pageIO, boolean writable) {
this.pageNum = pageNum;
this.queue = queue;

this.minSeqNum = minSeqNum;
this.elementCount = elementCount;
this.firstUnreadSeqNum = firstUnreadSeqNum;
this.ackedSeqNums = ackedSeqNums;
this.readSeqNums = readSeqNums(minSeqNum, firstUnreadSeqNum, elementCount);
this.lastCheckpoint = new Checkpoint(0, 0, 0, 0, 0);
this.pageIO = pageIO;
this.writable = writable;

assert this.pageIO != null : "invalid null pageIO";
}

private static BitSet readSeqNums(final long minSeqNum, final long firstUnreadSeqNum, final int elementCount) {
final BitSet bitSet = new BitSet(elementCount);
if (firstUnreadSeqNum > minSeqNum) {
bitSet.set(0, (int) (firstUnreadSeqNum - minSeqNum));
}
return bitSet;
}

public String toString() {
return "pageNum=" + this.pageNum + ", minSeqNum=" + this.minSeqNum + ", elementCount=" + this.elementCount + ", firstUnreadSeqNum=" + this.firstUnreadSeqNum;
return "pageNum=" + this.pageNum +
", minSeqNum=" + this.minSeqNum +
", elementCount=" + this.elementCount +
", firstUnreadSeqNum=" + (this.minSeqNum + this.readSeqNums.nextClearBit(0));
}

/**
* @param limit the maximum number of elements to read, actual number readcan be smaller
* @param limit the maximum number of elements to read, actual number read can be smaller
* @return {@link SequencedList} collection of serialized elements read
* @throws IOException if an IO error occurs
*/
public SequencedList<byte[]> read(int limit) throws IOException {
// first make sure this page is activated, activating previously activated is harmless
this.pageIO.activate();

SequencedList<byte[]> serialized = this.pageIO.read(this.firstUnreadSeqNum, limit);
assert serialized.getSeqNums().get(0) == this.firstUnreadSeqNum :
String.format("firstUnreadSeqNum=%d != first result seqNum=%d", this.firstUnreadSeqNum, serialized.getSeqNums().get(0));
// determine where to begin our read
int firstUnreadOffset = this.readSeqNums.nextClearBit(0);
long firstUnreadSeqNum = this.minSeqNum + firstUnreadOffset;

// determine how many contiguous events we can read without re-emitting any already-read events
int nextReadOffset = this.readSeqNums.nextSetBit(firstUnreadOffset);
int effectiveLimit = (nextReadOffset < 0) ? limit : Math.min(limit, nextReadOffset - firstUnreadOffset);

this.firstUnreadSeqNum += serialized.getElements().size();
SequencedList<byte[]> serialized = this.pageIO.read(firstUnreadSeqNum, effectiveLimit);
assert serialized.getSeqNums().get(0) == firstUnreadSeqNum :
String.format("firstUnreadSeqNum=%d != first result seqNum=%d", firstUnreadSeqNum, serialized.getSeqNums().get(0));

this.readSeqNums.set(firstUnreadOffset, firstUnreadOffset + serialized.getElements().size());

return serialized;
}
Expand All @@ -93,7 +113,6 @@ public void write(byte[] bytes, long seqNum, int checkpointMaxWrites) throws IOE

if (this.minSeqNum <= 0) {
this.minSeqNum = seqNum;
this.firstUnreadSeqNum = seqNum;
}
this.elementCount++;

Expand Down Expand Up @@ -121,14 +140,26 @@ public boolean isFullyRead() {
// return this.elementCount <= 0 || this.firstUnreadSeqNum > maxSeqNum();
}

public int getAckedCount() {
return ackedSeqNums.cardinality();
}

public long getUnackedCount() {
return this.elementCount - ackedSeqNums.cardinality();
}

public boolean isFullyAcked() {
final int cardinality = ackedSeqNums.cardinality();
return elementCount > 0 && cardinality == ackedSeqNums.length()
&& cardinality == elementCount;
}

public long unreadCount() {
return this.elementCount <= 0 ? 0 : Math.max(0, (maxSeqNum() - this.firstUnreadSeqNum) + 1);
if (this.elementCount <= 0) {
return 0;
}

return this.elementCount - this.readSeqNums.cardinality();
}

/**
Expand Down Expand Up @@ -179,6 +210,14 @@ public boolean ack(long firstSeqNum, int count, int checkpointMaxAcks) throws IO
return done;
}

public boolean unread(long firstSeqNum, int count) {
final boolean wasFullyRead = isFullyRead();
int unreadChunkFirstOffset = Ints.checkedCast(Math.subtractExact(firstSeqNum, this.minSeqNum));
this.readSeqNums.clear(unreadChunkFirstOffset, unreadChunkFirstOffset+count);

return wasFullyRead;
}

public void checkpoint() throws IOException {
if (this.writable) {
headPageCheckpoint();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import java.io.IOException;
import java.util.BitSet;
import java.util.Objects;

class PageFactory {

Expand All @@ -36,7 +37,7 @@ class PageFactory {
* @return {@link Page} the new head page
*/
public static Page newHeadPage(int pageNum, Queue queue, PageIO pageIO) {
return new Page(pageNum, queue, 0, 0, 0, new BitSet(), pageIO, true);
return new Page(pageNum, queue, 0, 0, 0, ackedSeqNums(null), pageIO, true);
}

/**
Expand All @@ -48,30 +49,19 @@ public static Page newHeadPage(int pageNum, Queue queue, PageIO pageIO) {
* @return {@link Page} the new head page
*/
public static Page newHeadPage(Checkpoint checkpoint, Queue queue, PageIO pageIO) throws IOException {
final Page p = new Page(
assert checkpoint.getMinSeqNum() == pageIO.getMinSeqNum() && checkpoint.getElementCount() == pageIO.getElementCount() :
String.format("checkpoint minSeqNum=%d or elementCount=%d is different than pageIO minSeqNum=%d or elementCount=%d", checkpoint.getMinSeqNum(), checkpoint.getElementCount(), pageIO.getMinSeqNum(), pageIO.getElementCount());

return new Page(
checkpoint.getPageNum(),
queue,
checkpoint.getMinSeqNum(),
checkpoint.getElementCount(),
checkpoint.getFirstUnackedSeqNum(),
new BitSet(),
ackedSeqNums(checkpoint),
pageIO,
true
);
try {
assert checkpoint.getMinSeqNum() == pageIO.getMinSeqNum() && checkpoint.getElementCount() == pageIO.getElementCount() :
String.format("checkpoint minSeqNum=%d or elementCount=%d is different than pageIO minSeqNum=%d or elementCount=%d", checkpoint.getMinSeqNum(), checkpoint.getElementCount(), pageIO.getMinSeqNum(), pageIO.getElementCount());

// this page ackedSeqNums bitset is a new empty bitset, if we have some acked elements, set them in the bitset
if (checkpoint.getFirstUnackedSeqNum() > checkpoint.getMinSeqNum()) {
p.ackedSeqNums.flip(0, (int) (checkpoint.getFirstUnackedSeqNum() - checkpoint.getMinSeqNum()));
}

return p;
} catch (Exception e) {
p.close();
throw e;
}
}

/**
Expand All @@ -83,28 +73,25 @@ public static Page newHeadPage(Checkpoint checkpoint, Queue queue, PageIO pageIO
* @return {@link Page} the new tail page
*/
public static Page newTailPage(Checkpoint checkpoint, Queue queue, PageIO pageIO) throws IOException {
final Page p = new Page(
return new Page(
checkpoint.getPageNum(),
queue,
checkpoint.getMinSeqNum(),
checkpoint.getElementCount(),
checkpoint.getFirstUnackedSeqNum(),
new BitSet(),
ackedSeqNums(checkpoint),
pageIO,
false
);
}

try {
// this page ackedSeqNums bitset is a new empty bitset, if we have some acked elements, set them in the bitset
if (checkpoint.getFirstUnackedSeqNum() > checkpoint.getMinSeqNum()) {
p.ackedSeqNums.flip(0, (int) (checkpoint.getFirstUnackedSeqNum() - checkpoint.getMinSeqNum()));
}
private static BitSet ackedSeqNums(final Checkpoint checkpoint) {
final BitSet ackedSeqNums = new BitSet();

return p;
} catch (Exception e) {
p.close();
throw e;
if (Objects.nonNull(checkpoint) && checkpoint.getFirstUnackedSeqNum() > checkpoint.getMinSeqNum()) {
ackedSeqNums.set(0, (int) (checkpoint.getFirstUnackedSeqNum() - checkpoint.getMinSeqNum()));
}
}

return ackedSeqNums;
}
}
Loading