3030import java .nio .file .Path ;
3131import java .nio .file .Paths ;
3232import java .util .ArrayList ;
33+ import java .util .Date ;
3334import java .util .List ;
34- import java .util .concurrent . TimeUnit ;
35+ import java .util .Objects ;
3536import java .util .concurrent .atomic .AtomicBoolean ;
3637import java .util .concurrent .locks .Condition ;
3738import java .util .concurrent .locks .ReentrantLock ;
39+
3840import org .apache .logging .log4j .LogManager ;
3941import org .apache .logging .log4j .Logger ;
4042import org .logstash .FileLockFactory ;
@@ -64,6 +66,10 @@ public final class Queue implements Closeable {
6466
6567 protected volatile long unreadCount ;
6668
69+ // the readDemand is a record of the currently-waiting-reader's demand and expiry
70+ // it *MUST ONLY* be accessed when `lock.isHeldByCurrentThread() == true`
71+ private ReadDemand readDemand ;
72+
6773 private final CheckpointIO checkpointIO ;
6874 private final int pageCapacity ;
6975 private final long maxBytes ;
@@ -428,6 +434,10 @@ public long write(Queueable element) throws IOException {
428434 throw new QueueRuntimeException (QueueExceptionMessages .BIGGER_DATA_THAN_PAGE_SIZE );
429435 }
430436
437+ // since a reader's batch cannot span multiple pages,
438+ // we flag a force-flush when changing the head page.
439+ boolean needsForceFlush = false ;
440+
431441 // create a new head page if the current does not have sufficient space left for data to be written
432442 if (!this .headPage .hasSpace (data .length )) {
433443
@@ -446,13 +456,14 @@ public long write(Queueable element) throws IOException {
446456
447457 // create new head page
448458 newCheckpointedHeadpage (newHeadPageNum );
459+ needsForceFlush = true ;
449460 }
450461
451462 long seqNum = this .seqNum += 1 ;
452463 this .headPage .write (data , seqNum , this .checkpointMaxWrites );
453464 this .unreadCount ++;
454465
455- notEmpty . signal ( );
466+ maybeSignalReadDemand ( needsForceFlush );
456467
457468 // now check if we reached a queue full state and block here until it is not full
458469 // for the next write or the queue was closed.
@@ -647,7 +658,7 @@ private SerializedBatchHolder readPageBatch(Page p, int limit, long timeout) thr
647658 boolean elapsed ;
648659 // a head page is fully read but can be written to so let's wait for more data
649660 try {
650- elapsed = !notEmpty . await (timeout , TimeUnit . MILLISECONDS );
661+ elapsed = !awaitReadDemand (timeout , left );
651662 } catch (InterruptedException e ) {
652663 // set back the interrupted flag
653664 Thread .currentThread ().interrupt ();
@@ -917,4 +928,39 @@ private Batch deserialize() {
917928 return new Batch (elements , firstSeqNum , Queue .this );
918929 }
919930 }
931+
932+ private boolean awaitReadDemand (final long timeoutMillis , final int elementsNeeded ) throws InterruptedException {
933+ assert this .lock .isHeldByCurrentThread ();
934+
935+ final long deadlineMillis = Math .addExact (System .currentTimeMillis (), timeoutMillis );
936+ this .readDemand = new ReadDemand (deadlineMillis , elementsNeeded );
937+
938+ boolean unElapsed = this .notEmpty .awaitUntil (new Date (deadlineMillis ));
939+ this .readDemand = null ;
940+ return unElapsed ;
941+ }
942+
943+ private void maybeSignalReadDemand (boolean forceSignal ) {
944+ assert this .lock .isHeldByCurrentThread ();
945+
946+ // if we're not forcing, and if the current read demand has
947+ // neither been met nor expired, this method becomes a no-op.
948+ if (!forceSignal && Objects .nonNull (readDemand )) {
949+ if (unreadCount < readDemand .elementsNeeded && System .currentTimeMillis () < readDemand .deadlineMillis ) {
950+ return ;
951+ }
952+ }
953+
954+ this .notEmpty .signal ();
955+ }
956+
957+ private static class ReadDemand {
958+ final long deadlineMillis ;
959+ final int elementsNeeded ;
960+
961+ ReadDemand (long deadlineMillis , int elementsNeeded ) {
962+ this .deadlineMillis = deadlineMillis ;
963+ this .elementsNeeded = elementsNeeded ;
964+ }
965+ }
920966}
0 commit comments