// High-resolution timing for latency measurement and random payload generation.
const { hrtime } = require('process');
const { randomBytes } = require('crypto');

// Percentiles reported for the end-to-end latency distributions.
const PERCENTILES = [50, 75, 90, 95, 99, 99.9, 99.99, 100];

// Time (ms) before the run is force-terminated; overridable via env var.
// Defaults to 10 minutes.
const TERMINATE_TIMEOUT_MS = process.env.TERMINATE_TIMEOUT_MS ? +process.env.TERMINATE_TIMEOUT_MS : 600000;
// Kept as a string ('true'/'false') — presumably forwarded to consumer config; verify against usage.
const AUTO_COMMIT = process.env.AUTO_COMMIT || 'false';
@@ -58,8 +59,84 @@ function genericProduceToTopic(producer, topic, messages) {
5859 } ) ;
5960}
6061
62+
// We use a simple log-bucketed count sketch for latency percentiles to avoid
// storing all individual latencies in memory, because we're also measuring the
// memory usage of the consumer as part of the performance tests.
class LatencyCountSketch {
  #numBuckets;
  #maxValue;
  #buckets;
  #counts;
  #findNextMinimum;
  #changeBaseLogarithm;
  #totalCount = 0;

  /**
   * @param {object} [opts]
   * @param {number} [opts.numBuckets=600] - Number of geometrically-spaced buckets.
   * @param {number} [opts.error=0.01] - Relative error per bucket (1% default).
   * @param {number} [opts.maxValue=60000] - Largest expected latency (ms);
   *   larger samples are clamped into the overflow bucket.
   */
  constructor({
    numBuckets = 600,
    error = 0.01, // 1% error
    maxValue = 60000, // max 60s latency
  } = {}) {
    // Each bucket represents [x * (1 - error), x * (1 + error))
    this.#numBuckets = numBuckets;
    this.#findNextMinimum = 1 / (1 + error) * (1 - error);
    // Change base from natural log to log base findNextMinimum
    this.#changeBaseLogarithm = Math.log(this.#findNextMinimum);

    this.#maxValue = maxValue;
    this.#buckets = new Array(this.#numBuckets + 2).fill(0);
    this.#buckets[this.#numBuckets + 1] = Number.POSITIVE_INFINITY;
    this.#buckets[this.#numBuckets] = this.#maxValue;
    this.#buckets[0] = 0;
    // Fill the remaining bucket boundaries geometrically downward from maxValue.
    let currentValue = maxValue;
    for (let i = this.#numBuckets - 1; i >= 1; i--) {
      currentValue *= this.#findNextMinimum;
      this.#buckets[i] = currentValue;
    }
    this.#counts = new Array(this.#numBuckets + 2).fill(0);
  }

  /**
   * Record one latency sample in O(1).
   * @param {number} latency - Latency in the same unit as maxValue (ms).
   */
  add(latency) {
    let idx = 0;
    if (latency > 0)
      idx = Math.floor(this.#numBuckets - Math.log(latency / this.#maxValue) / this.#changeBaseLogarithm);
    // Clamp out-of-range samples into the underflow/overflow buckets.
    idx = (idx < 0) ? 0 :
      (idx > this.#buckets.length - 2) ? (this.#buckets.length - 2) :
        idx;

    this.#counts[idx]++;
    this.#totalCount++;
  }

  /**
   * Compute approximate values for the requested percentiles.
   * @param {number[]} percentilesArray - Percentiles in ASCENDING order
   *   (e.g. [50, 99, 100]); the cumulative scan below carries state between
   *   entries and would return wrong results for a descending list.
   * @returns {Array<[number, number, number]>} One triple per percentile:
   *   [approximate latency (bucket upper bound), cumulative sample count,
   *   total samples recorded].
   */
  percentiles(percentilesArray) {
    const percentileCounts = percentilesArray.map(p => Math.ceil(this.#totalCount * p / 100));
    const percentileResults = new Array(percentilesArray.length);
    let totalCountSoFar = 0;
    let j = 0;
    for (let i = 0; i < percentileCounts.length; i++) {
      // Walk the histogram until the cumulative count reaches this
      // percentile's rank; j and totalCountSoFar persist across iterations.
      while ((totalCountSoFar < percentileCounts[i]) && (j < this.#counts.length - 1)) {
        totalCountSoFar += this.#counts[j];
        j++;
      }
      const bucketIndex = (j < this.#counts.length - 1) ? j : this.#counts.length - 2;
      percentileResults[i] = [this.#buckets[bucketIndex], totalCountSoFar, this.#totalCount];
    }
    return percentileResults;
  }
}
133+
61134async function runConsumer ( consumer , topic , warmupMessages , totalMessageCnt , eachBatch , partitionsConsumedConcurrently , stats , actionOnMessages ) {
62135 const handlers = installHandlers ( totalMessageCnt === - 1 ) ;
136+ if ( stats ) {
137+ stats . percentilesTOT1 = new LatencyCountSketch ( { } ) ;
138+ stats . percentilesTOT2 = new LatencyCountSketch ( { } ) ;
139+ }
63140 while ( true ) {
64141 try {
65142 await consumer . connect ( ) ;
@@ -106,6 +183,7 @@ async function runConsumer(consumer, topic, warmupMessages, totalMessageCnt, eac
106183 stats . maxLatencyT0T1 = Math . max ( stats . maxLatencyT0T1 , latency ) ;
107184 stats . avgLatencyT0T1 = ( ( stats . avgLatencyT0T1 * ( numMessages - 1 ) ) + latency ) / numMessages ;
108185 }
186+ stats . percentilesTOT1 . add ( latency ) ;
109187 } else {
110188 if ( ! stats . maxLatencyT0T2 ) {
111189 stats . maxLatencyT0T2 = latency ;
@@ -114,6 +192,7 @@ async function runConsumer(consumer, topic, warmupMessages, totalMessageCnt, eac
114192 stats . maxLatencyT0T2 = Math . max ( stats . maxLatencyT0T2 , latency ) ;
115193 stats . avgLatencyT0T2 = ( ( stats . avgLatencyT0T2 * ( numMessages - 1 ) ) + latency ) / numMessages ;
116194 }
195+ stats . percentilesTOT2 . add ( latency ) ;
117196 }
118197 } ;
119198
@@ -257,6 +336,18 @@ async function runConsumer(consumer, topic, warmupMessages, totalMessageCnt, eac
257336 stats . messageRate = durationSeconds > 0 ?
258337 ( messagesMeasured / durationSeconds ) : Infinity ;
259338 stats . durationSeconds = durationSeconds ;
339+ stats . percentilesTOT1 = stats . percentilesTOT1 . percentiles ( PERCENTILES ) . map ( ( value , index ) => ( {
340+ percentile : PERCENTILES [ index ] ,
341+ value : value [ 0 ] ,
342+ count : value [ 1 ] ,
343+ total : value [ 2 ] ,
344+ } ) ) ;
345+ stats . percentilesTOT2 = stats . percentilesTOT2 . percentiles ( PERCENTILES ) . map ( ( value , index ) => ( {
346+ percentile : PERCENTILES [ index ] ,
347+ value : value [ 0 ] ,
348+ count : value [ 1 ] ,
349+ total : value [ 2 ] ,
350+ } ) ) ;
260351 }
261352 removeHandlers ( handlers ) ;
262353 return rate ;
0 commit comments