@@ -449,6 +449,7 @@ private static final class InnerSubscriber<T> extends Subscriber<T> {
449
449
private boolean mayNeedToDrain = false ;
450
450
/* protected by emitLock */
451
451
int emitted = 0 ;
452
+ final int THRESHOLD = (int ) (q .capacity () * 0.7 );
452
453
453
454
public InnerSubscriber (MergeSubscriber <T > parent , MergeProducer <T > producer ) {
454
455
this .parentSubscriber = parent ;
@@ -553,8 +554,25 @@ private void emit(T t, boolean complete) {
553
554
} finally {
554
555
drain = parentSubscriber .releaseEmitLock ();
555
556
}
556
- if (emitted > 256 ) {
557
+ if (emitted > THRESHOLD ) {
557
558
// this is for batching requests when we're in a use case that isn't queueing, always fast-pathing the onNext
559
+ /**
560
+ * <pre> {@code
561
+ * Without this batching:
562
+ *
563
+ * Benchmark (size) Mode Samples Score Score error Units
564
+ * r.o.OperatorMergePerf.merge1SyncStreamOfN 1 thrpt 5 5060743.715 100445.513 ops/s
565
+ * r.o.OperatorMergePerf.merge1SyncStreamOfN 1000 thrpt 5 36606.582 1610.582 ops/s
566
+ * r.o.OperatorMergePerf.merge1SyncStreamOfN 1000000 thrpt 5 38.476 0.973 ops/s
567
+ *
568
+ * With this batching:
569
+ *
570
+ * Benchmark (size) Mode Samples Score Score error Units
571
+ * r.o.OperatorMergePerf.merge1SyncStreamOfN 1 thrpt 5 5367945.738 262740.137 ops/s
572
+ * r.o.OperatorMergePerf.merge1SyncStreamOfN 1000 thrpt 5 62703.930 8496.036 ops/s
573
+ * r.o.OperatorMergePerf.merge1SyncStreamOfN 1000000 thrpt 5 72.711 3.746 ops/s
574
+ *} </pre>
575
+ */
558
576
request (emitted );
559
577
// we are modifying this outside of the emit lock ... but this can be considered a "lazySet"
560
578
// and it will be flushed before anything else touches it because the emitLock will be obtained
0 commit comments