1717import java .util .HashMap ;
1818import java .util .List ;
1919import java .util .Map ;
20+ import java .util .Set ;
2021
2122import static org .elasticsearch .index .mapper .DateFieldMapper .DEFAULT_DATE_TIME_FORMATTER ;
2223import static org .hamcrest .Matchers .closeTo ;
@@ -39,7 +40,11 @@ record Doc(String host, String cluster, long timestamp, int requestCount, double
3940 final Map <String , Integer > hostToRate = new HashMap <>();
4041 final Map <String , Integer > hostToCpu = new HashMap <>();
4142
42- static final float DEVIATION_LIMIT = 0.2f ;
43+ // We allow a deviation of 15% from the expected rate (which includes an expected drop of 10%).
44+ static final float DEVIATION_LIMIT = 0.15f ;
45+ // We expect a 10% drop in the rate due to not covering window edges and not triggering
46+ // extrapolation logic in the time series engine.
47+ static final float EXPECTED_DROP_RATE = 0.10f ;
4348 static final int LIMIT = 5 ;
4449
4550 @ Before
@@ -71,24 +76,28 @@ public void populateIndex() {
7176 hostToCpu .put ("p" + i , randomIntBetween (0 , 100 ));
7277 }
7378 long timestamp = DEFAULT_DATE_TIME_FORMATTER .parseMillis ("2024-04-15T00:00:00Z" );
74- int numDocs = between (60 , 100 );
79+ int numDocs = between (100 , 300 );
7580 docs .clear ();
81+ // We want docs to span a 6-minute period, so we need to adapt their spacing accordingly.
82+ var tsPerDoc = 360.0 / numDocs ; // 6 minutes divided by number of docs
7683
7784 for (int i = 0 ; i < numDocs ; i ++) {
78- final var tsChange = between ( 1 , 10 );
79- timestamp += tsChange * 1000L ;
80- // List<String> hosts = randomSubsetOf(between(1, hostToClusters.size()), hostToClusters.keySet());
81- // for (String host : hosts) {
85+ final var tsChange = randomDoubleBetween ( tsPerDoc - 1.0 , tsPerDoc + 1.0 , true );
86+ timestamp += Math . round ( tsChange * 1000 ) ;
87+ // We want a subset of hosts to have docs within a given time point.
88+ var hosts = Set . copyOf ( randomSubsetOf ( between ( 2 , hostToClusters . size ()), hostToClusters . keySet ()));
8289 for (String host : hostToClusters .keySet ()) {
8390 var requestCount = requestCounts .compute (host , (k , curr ) -> {
84- // 10 % chance of reset
85- if (randomInt (100 ) <= 10 ) { // todo change 0 to 10
91+ // 15 % chance of reset
92+ if (randomInt (100 ) <= 15 ) {
8693 return Math .toIntExact (Math .round (hostToRate .get (k ) * tsChange ));
8794 } else {
8895 return Math .toIntExact (Math .round ((curr == null ? 0 : curr ) + hostToRate .get (k ) * tsChange ));
8996 }
9097 });
91- docs .add (new Doc (host , hostToClusters .get (host ), timestamp , requestCount , hostToCpu .get (host )));
98+ if (hosts .contains (host )) {
99+ docs .add (new Doc (host , hostToClusters .get (host ), timestamp , requestCount , hostToCpu .get (host )));
100+ }
92101 }
93102 }
94103 Randomness .shuffle (docs );
@@ -115,20 +124,35 @@ public void populateIndex() {
115124 private String hostTable () {
116125 StringBuilder sb = new StringBuilder ();
117126 for (String host : hostToClusters .keySet ()) {
127+ var docsForHost = docs .stream ().filter (d -> d .host ().equals (host )).toList ();
118128 sb .append (host )
119129 .append (" -> " )
120130 .append (hostToClusters .get (host ))
121131 .append (", rate=" )
122132 .append (hostToRate .get (host ))
123133 .append (", cpu=" )
124134 .append (hostToCpu .get (host ))
135+ .append (", numDocs=" )
136+ .append (docsForHost .size ())
125137 .append ("\n " );
126138 }
127139 // Now we add total rate and total CPU used:
128140 sb .append ("Total rate: " ).append (hostToRate .values ().stream ().mapToInt (a -> a ).sum ()).append ("\n " );
129141 sb .append ("Average rate: " ).append (hostToRate .values ().stream ().mapToInt (a -> a ).average ().orElseThrow ()).append ("\n " );
130142 sb .append ("Total CPU: " ).append (hostToCpu .values ().stream ().mapToInt (a -> a ).sum ()).append ("\n " );
131143 sb .append ("Average CPU: " ).append (hostToCpu .values ().stream ().mapToInt (a -> a ).average ().orElseThrow ()).append ("\n " );
144+ // Add global info
145+ sb .append ("Count of docs: " ).append (docs .size ()).append ("\n " );
146+ // Add docs per minute
147+ sb .append ("Docs in each minute:\n " );
148+ Map <Long , Integer > docsPerMinute = new HashMap <>();
149+ for (Doc doc : docs ) {
150+ long minute = (doc .timestamp / 60000 ) % 1000 ; // convert to minutes
151+ docsPerMinute .merge (minute , 1 , Integer ::sum );
152+ }
153+ for (Map .Entry <Long , Integer > entry : docsPerMinute .entrySet ()) {
154+ sb .append ("Minute " ).append (entry .getKey ()).append (": " ).append (entry .getValue ()).append (" docs\n " );
155+ }
132156 return sb .toString ();
133157 }
134158
@@ -149,11 +173,11 @@ public void testRateWithTimeBucketSumByMin() {
149173 equalTo (List .of (new ColumnInfoImpl ("sum(rate(request_count))" , "double" , null ), new ColumnInfoImpl ("ts" , "date" , null )))
150174 );
151175 assertThat (values , hasSize (LIMIT ));
152- for (int i = 0 ; i < LIMIT ; i ++) {
176+ for (int i = 0 ; i < values . size () ; i ++) {
153177 List <Object > row = values .get (i );
154178 assertThat (row , hasSize (2 ));
155179 var totalRate = hostToRate .values ().stream ().mapToDouble (a -> a + 0.0 ).sum ();
156- assertThat ((double ) row .get (0 ), closeTo (totalRate , DEVIATION_LIMIT * totalRate ));
180+ assertThat ((double ) row .get (0 ), closeTo (totalRate * ( 1 - EXPECTED_DROP_RATE ) , DEVIATION_LIMIT * totalRate ));
157181 }
158182 } catch (AssertionError e ) {
159183 throw new AssertionError ("Values:\n " + valuesTable (values ) + "\n Hosts:\n " + hostTable (), e );
@@ -163,17 +187,21 @@ public void testRateWithTimeBucketSumByMin() {
163187
164188 public void testRateWithTimeBucketAvgByMin () {
165189 try (var resp = run ("TS hosts | STATS avg(rate(request_count)) BY ts=bucket(@timestamp, 1minute) | SORT ts | LIMIT 5" )) {
166- assertThat (
167- resp .columns (),
168- equalTo (List .of (new ColumnInfoImpl ("avg(rate(request_count))" , "double" , null ), new ColumnInfoImpl ("ts" , "date" , null )))
169- );
170- List <List <Object >> values = EsqlTestUtils .getValuesList (resp );
171- assertThat (values , hasSize (5 ));
172- for (int i = 0 ; i < 5 ; i ++) {
173- List <Object > row = values .get (i );
174- assertThat (row , hasSize (2 ));
175- var expectedRate = hostToRate .values ().stream ().mapToDouble (a -> a + 0.0 ).sum () / hostToRate .size ();
176- assertThat ((double ) row .get (0 ), closeTo (expectedRate , DEVIATION_LIMIT * expectedRate ));
190+ try {
191+ assertThat (
192+ resp .columns (),
193+ equalTo (List .of (new ColumnInfoImpl ("avg(rate(request_count))" , "double" , null ), new ColumnInfoImpl ("ts" , "date" , null )))
194+ );
195+ List <List <Object >> values = EsqlTestUtils .getValuesList (resp );
196+ assertThat (values , hasSize (LIMIT ));
197+ for (int i = 0 ; i < values .size (); i ++) {
198+ List <Object > row = values .get (i );
199+ assertThat (row , hasSize (2 ));
200+ var expectedRate = hostToRate .values ().stream ().mapToDouble (a -> a + 0.0 ).sum () / hostToRate .size ();
201+ assertThat ((double ) row .get (0 ), closeTo (expectedRate * (1 - EXPECTED_DROP_RATE ), DEVIATION_LIMIT * expectedRate ));
202+ }
203+ } catch (AssertionError e ) {
204+ throw new AssertionError ("Values:\n " + valuesTable (EsqlTestUtils .getValuesList (resp )) + "\n Hosts:\n " + hostTable (), e );
177205 }
178206 }
179207 }
@@ -195,7 +223,7 @@ public void testRateWithTimeBucketSumByMinAndLimitAsParam() {
195223 List <Object > row = values .get (i );
196224 assertThat (row , hasSize (2 ));
197225 double expectedAvg = hostToRate .values ().stream ().mapToDouble (d -> d ).sum () / hostToRate .size ();
198- assertThat ((double ) row .get (0 ), closeTo (expectedAvg , DEVIATION_LIMIT * expectedAvg ));
226+ assertThat ((double ) row .get (0 ), closeTo (expectedAvg * ( 1 - EXPECTED_DROP_RATE ) , DEVIATION_LIMIT * expectedAvg ));
199227 }
200228 } catch (AssertionError e ) {
201229 throw new AssertionError ("Values:\n " + valuesTable (EsqlTestUtils .getValuesList (resp )) + "\n Hosts:\n " + hostTable (), e );
@@ -230,7 +258,7 @@ public void testRateWithTimeBucketAndClusterSumByMin() {
230258 .filter (e -> e .getValue ().equals (cluster ))
231259 .mapToDouble (e -> hostToRate .get (e .getKey ()) + 0.0 )
232260 .sum ();
233- assertThat ((double ) row .get (0 ), closeTo (expectedRate , DEVIATION_LIMIT * expectedRate ));
261+ assertThat ((double ) row .get (0 ), closeTo (expectedRate * ( 1 - EXPECTED_DROP_RATE ) , DEVIATION_LIMIT * expectedRate ));
234262 }
235263 } catch (AssertionError e ) {
236264 throw new AssertionError ("Values:\n " + valuesTable (EsqlTestUtils .getValuesList (resp )) + "\n Hosts:\n " + hostTable (), e );
@@ -265,7 +293,7 @@ public void testRateWithTimeBucketAndClusterAvgByMin() {
265293 .mapToDouble (e -> hostToRate .get (e .getKey ()) + 0.0 )
266294 .average ()
267295 .orElseThrow ();
268- assertThat ((double ) row .get (0 ), closeTo (expectedAvg , DEVIATION_LIMIT * expectedAvg ));
296+ assertThat ((double ) row .get (0 ), closeTo (expectedAvg * ( 1 - EXPECTED_DROP_RATE ) , DEVIATION_LIMIT * expectedAvg ));
269297 }
270298 } catch (AssertionError e ) {
271299 throw new AssertionError ("Values:\n " + valuesTable (EsqlTestUtils .getValuesList (resp )) + "\n Hosts:\n " + hostTable (), e );
@@ -316,12 +344,9 @@ public void testRateWithTimeBucketAndClusterMultipleStatsByMin() {
316344 .mapToDouble (e -> hostToRate .get (e .getKey ()) + 0.0 )
317345 .max ()
318346 .orElseThrow ();
319- assertThat ((double ) row .get (0 ), closeTo (expectedAvg , DEVIATION_LIMIT * expectedAvg ));
320- assertThat ((double ) row .get (2 ), closeTo (expectedAvg , DEVIATION_LIMIT * expectedAvg ));
321- assertThat (
322- (double ) row .get (1 ),
323- closeTo (hostToRate .values ().stream ().mapToDouble (d -> d ).max ().orElseThrow (), DEVIATION_LIMIT * expectedMax )
324- );
347+ assertThat ((double ) row .get (0 ), closeTo (expectedAvg * (1 - EXPECTED_DROP_RATE ), DEVIATION_LIMIT * expectedAvg ));
348+ assertThat ((double ) row .get (2 ), closeTo (expectedAvg * (1 - EXPECTED_DROP_RATE ), DEVIATION_LIMIT * expectedAvg ));
349+ assertThat ((double ) row .get (1 ), closeTo (expectedMax * (1 - EXPECTED_DROP_RATE ), DEVIATION_LIMIT * expectedMax ));
325350 }
326351 } catch (AssertionError e ) {
327352 throw new AssertionError ("Values:\n " + valuesTable (EsqlTestUtils .getValuesList (resp )) + "\n Hosts:\n " + hostTable (), e );
@@ -356,14 +381,14 @@ public void testRateWithTimeBucketAndClusterMultipleMetricsByMin() {
356381 .filter (e -> e .getValue ().equals (cluster ))
357382 .mapToDouble (e -> hostToRate .get (e .getKey ()) + 0.0 )
358383 .sum ();
359- assertThat ((double ) row .get (0 ), closeTo (expectedRate , DEVIATION_LIMIT * expectedRate ));
384+ assertThat ((double ) row .get (0 ), closeTo (expectedRate * ( 1 - EXPECTED_DROP_RATE ) , DEVIATION_LIMIT * expectedRate ));
360385 var expectedCpu = hostToClusters .entrySet ()
361386 .stream ()
362387 .filter (e -> e .getValue ().equals (cluster ))
363388 .mapToDouble (e -> hostToCpu .get (e .getKey ()) + 0.0 )
364389 .max ()
365390 .orElseThrow ();
366- assertThat ((double ) row .get (1 ), closeTo (expectedCpu , DEVIATION_LIMIT * expectedCpu ));
391+ assertThat ((double ) row .get (1 ), closeTo (expectedCpu * ( 1 - EXPECTED_DROP_RATE ) , DEVIATION_LIMIT * expectedCpu ));
367392 }
368393 } catch (AssertionError e ) {
369394 throw new AssertionError ("Values:\n " + valuesTable (EsqlTestUtils .getValuesList (resp )) + "\n Hosts:\n " + hostTable (), e );
@@ -398,7 +423,7 @@ public void testRateWithTimeBucketAndClusterMultipleMetricsAvgByMin() {
398423 .filter (e -> e .getValue ().equals (cluster ))
399424 .mapToDouble (e -> hostToRate .get (e .getKey ()) + 0.0 )
400425 .sum ();
401- assertThat ((double ) row .get (0 ), closeTo (expectedRate , DEVIATION_LIMIT * expectedRate ));
426+ assertThat ((double ) row .get (0 ), closeTo (expectedRate * ( 1 - EXPECTED_DROP_RATE ) , DEVIATION_LIMIT * expectedRate ));
402427 var expectedCpu = hostToClusters .entrySet ()
403428 .stream ()
404429 .filter (e -> e .getValue ().equals (cluster ))
0 commit comments