Skip to content

Commit ee4e132

Browse files
allow concurrent Batch deserialization (#17050) (#17106)
Currently the deserialization is behind the readBatch's lock, so any large batch will take time deserializing, causing any other Queue writer (e.g. netty executor threads) and any other Queue reader (pipeline worker) to block. This commit moves the deserialization out of the lock, allowing multiple pipeline workers to deserialize batches concurrently. - add intermediate batch-holder from `Queue` methods - make the intermediate batch-holder a private inner class of `Queue` with a descriptive name `SerializedBatchHolder` Co-authored-by: Ry Biesemeyer <[email protected]> (cherry picked from commit 637f447) Co-authored-by: João Duarte <[email protected]>
1 parent bb6c2fe commit ee4e132

File tree

1 file changed

+28
-5
lines changed
  • logstash-core/src/main/java/org/logstash/ackedqueue

1 file changed

+28
-5
lines changed

logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -590,13 +590,18 @@ public void ensurePersistedUpto(long seqNum) throws IOException{
590590
* @throws IOException if an IO error occurs
591591
*/
592592
public synchronized Batch nonBlockReadBatch(int limit) throws IOException {
593+
final SerializedBatchHolder serializedBatchHolder;
593594
lock.lock();
594595
try {
595596
Page p = nextReadPage();
596-
return (isHeadPage(p) && p.isFullyRead()) ? null : readPageBatch(p, limit, 0L);
597+
if (isHeadPage(p) && p.isFullyRead()) {
598+
return null;
599+
}
600+
serializedBatchHolder = readPageBatch(p, limit, 0L);
597601
} finally {
598602
lock.unlock();
599603
}
604+
return serializedBatchHolder.deserialize();
600605
}
601606

602607
/**
@@ -607,7 +612,11 @@ public synchronized Batch nonBlockReadBatch(int limit) throws IOException {
607612
* @throws QueueRuntimeException if queue is closed
608613
* @throws IOException if an IO error occurs
609614
*/
610-
public synchronized Batch readBatch(int limit, long timeout) throws IOException {
615+
public Batch readBatch(int limit, long timeout) throws IOException {
616+
return readSerializedBatch(limit, timeout).deserialize();
617+
}
618+
619+
private synchronized SerializedBatchHolder readSerializedBatch(int limit, long timeout) throws IOException {
611620
lock.lock();
612621

613622
try {
@@ -618,15 +627,15 @@ public synchronized Batch readBatch(int limit, long timeout) throws IOException
618627
}
619628

620629
/**
621-
* read a {@link Batch} from the given {@link Page}. If the page is a head page, try to maximize the
630+
* read a {@link SerializedBatchHolder} from the given {@link Page}. If the page is a head page, try to maximize the
622631
* batch size by waiting for writes.
623632
* @param p the {@link Page} to read from.
624633
* @param limit size limit of the batch to read.
625634
* @param timeout the maximum time to wait in milliseconds on write operations.
626635
* @return {@link Batch} with read elements or null if nothing was read
627636
* @throws IOException if an IO error occurs
628637
*/
629-
private Batch readPageBatch(Page p, int limit, long timeout) throws IOException {
638+
private SerializedBatchHolder readPageBatch(Page p, int limit, long timeout) throws IOException {
630639
int left = limit;
631640
final List<byte[]> elements = new ArrayList<>(limit);
632641

@@ -678,7 +687,7 @@ private Batch readPageBatch(Page p, int limit, long timeout) throws IOException
678687
removeUnreadPage(p);
679688
}
680689

681-
return new Batch(elements, firstSeqNum, this);
690+
return new SerializedBatchHolder(elements, firstSeqNum);
682691
}
683692

684693
/**
@@ -894,4 +903,18 @@ private static boolean containsSeq(final Page page, final long seqNum) {
894903
final long pMaxSeq = pMinSeq + (long) page.getElementCount();
895904
return seqNum >= pMinSeq && seqNum < pMaxSeq;
896905
}
906+
907+
class SerializedBatchHolder {
908+
private final List<byte[]> elements;
909+
private final long firstSeqNum;
910+
911+
private SerializedBatchHolder(List<byte[]> elements, long firstSeqNum) {
912+
this.elements = elements;
913+
this.firstSeqNum = firstSeqNum;
914+
}
915+
916+
private Batch deserialize() {
917+
return new Batch(elements, firstSeqNum, Queue.this);
918+
}
919+
}
897920
}

0 commit comments

Comments
 (0)