2727import  org .elasticsearch .xpack .esql .action .EsqlQueryAction ;
2828import  org .elasticsearch .xpack .esql .action .EsqlQueryRequest ;
2929import  org .elasticsearch .xpack .esql .action .EsqlQueryResponse ;
30+ import  org .elasticsearch .xpack .esql .plugin .QueryPragmas ;
3031
3132import  java .io .IOException ;
3233import  java .time .Instant ;
@@ -128,6 +129,10 @@ public void testAggMetricInEsqlTSAfterDownsampling() throws Exception {
128129                "cpu": { 
129130                  "type": "double", 
130131                  "time_series_metric": "gauge" 
132+                 }, 
133+                 "request": { 
134+                   "type": "double", 
135+                   "time_series_metric": "counter" 
131136                } 
132137              } 
133138            } 
@@ -144,6 +149,7 @@ public void testAggMetricInEsqlTSAfterDownsampling() throws Exception {
144149                    .field ("host" , randomFrom ("host1" , "host2" , "host3" ))
145150                    .field ("cluster" , randomFrom ("cluster1" , "cluster2" , "cluster3" ))
146151                    .field ("cpu" , randomDouble ())
152+                     .field ("request" , randomDoubleBetween (0 , 100 , true ))
147153                    .endObject ();
148154            } catch  (IOException  e ) {
149155                throw  new  RuntimeException (e );
@@ -155,6 +161,7 @@ public void testAggMetricInEsqlTSAfterDownsampling() throws Exception {
155161        assertAcked (client ().admin ().indices ().rolloverIndex (new  RolloverRequest (dataStreamName , null )));
156162        List <String > backingIndices  = waitForDataStreamBackingIndices (dataStreamName , 2 );
157163        String  sourceIndex  = backingIndices .get (0 );
164+         String  secondIndex  = backingIndices .get (1 );
158165        String  interval  = "5m" ;
159166        String  targetIndex  = "downsample-"  + interval  + "-"  + sourceIndex ;
160167        // Set the source index to read-only state 
@@ -211,6 +218,7 @@ public void testAggMetricInEsqlTSAfterDownsampling() throws Exception {
211218                    .field ("host" , randomFrom ("host1" , "host2" , "host3" ))
212219                    .field ("cluster" , randomFrom ("cluster1" , "cluster2" , "cluster3" ))
213220                    .field ("cpu" , randomDouble ())
221+                     .field ("request" , randomDoubleBetween (0 , 100 , true ))
214222                    .endObject ();
215223            } catch  (IOException  e ) {
216224                throw  new  RuntimeException (e );
@@ -226,35 +234,36 @@ public void testAggMetricInEsqlTSAfterDownsampling() throws Exception {
226234
227235        // Since the downsampled field (cpu) is downsampled in one index and not in the other, we want to confirm 
228236        // first that the field is unsupported and has 2 original types - double and aggregate_metric_double 
229-         try  (var  resp  = esqlCommand ("TS "  + dataStreamName  + " | KEEP @timestamp, host, cluster, cpu" )) {
237+         try  (var  resp  = esqlCommand ("TS "  + dataStreamName  + " | KEEP @timestamp, host, cluster, cpu, request " )) {
230238            var  columns  = resp .columns ();
231-             assertThat (columns , hasSize (4 ));
239+             assertThat (columns , hasSize (5 ));
232240            assertThat (
233241                resp .columns (),
234242                equalTo (
235243                    List .of (
236244                        new  ColumnInfoImpl ("@timestamp" , "date" , null ),
237245                        new  ColumnInfoImpl ("host" , "keyword" , null ),
238246                        new  ColumnInfoImpl ("cluster" , "keyword" , null ),
239-                         new  ColumnInfoImpl ("cpu" , "unsupported" , List .of ("aggregate_metric_double" , "double" ))
247+                         new  ColumnInfoImpl ("cpu" , "unsupported" , List .of ("aggregate_metric_double" , "double" )),
248+                         new  ColumnInfoImpl ("request" , "counter_double" , null )
240249                    )
241250                )
242251            );
243252        }
244253
245254        // test _over_time commands with implicit casting of aggregate_metric_double 
246-         for  (String  innerCommand  : List .of ("min_over_time" , "max_over_time" , "avg_over_time" , "count_over_time" )) {
247-             for  (String  outerCommand  : List .of ("min" , "max" , "sum" , "count" )) {
255+         for  (String  outerCommand  : List .of ("min" , "max" , "sum" , "count" )) {
256+             String  expectedType  = outerCommand .equals ("count" ) ? "long"  : "double" ;
257+             for  (String  innerCommand  : List .of ("min_over_time" , "max_over_time" , "avg_over_time" , "count_over_time" )) {
248258                String  command  = outerCommand  + " ("  + innerCommand  + "(cpu))" ;
249-                 String  expectedType  = innerCommand .equals ("count_over_time" ) || outerCommand .equals ("count" ) ? "long"  : "double" ;
250259                try  (var  resp  = esqlCommand ("TS "  + dataStreamName  + " | STATS "  + command  + " by cluster, bucket(@timestamp, 1 hour)" )) {
251260                    var  columns  = resp .columns ();
252261                    assertThat (columns , hasSize (3 ));
253262                    assertThat (
254263                        resp .columns (),
255264                        equalTo (
256265                            List .of (
257-                                 new  ColumnInfoImpl (command , expectedType , null ),
266+                                 new  ColumnInfoImpl (command , innerCommand . equals ( "count_over_time" ) ?  "long"  :  expectedType , null ),
258267                                new  ColumnInfoImpl ("cluster" , "keyword" , null ),
259268                                new  ColumnInfoImpl ("bucket(@timestamp, 1 hour)" , "date" , null )
260269                            )
@@ -263,6 +272,53 @@ public void testAggMetricInEsqlTSAfterDownsampling() throws Exception {
263272                    // TODO: verify the numbers are accurate 
264273                }
265274            }
275+             // tests on non-downsampled index 
276+             // TODO: combine with above when support for aggregate_metric_double + implicit casting is added 
277+             // TODO: add to counter tests below when support for counters is added 
278+             for  (String  innerCommand  : List .of ("first_over_time" , "last_over_time" )) {
279+                 String  command  = outerCommand  + " ("  + innerCommand  + "(cpu))" ;
280+                 try  (var  resp  = esqlCommand ("TS "  + secondIndex  + " | STATS "  + command  + " by cluster, bucket(@timestamp, 1 hour)" )) {
281+                     var  columns  = resp .columns ();
282+                     assertThat (columns , hasSize (3 ));
283+                     assertThat (
284+                         "resp is "  + resp ,
285+                         columns ,
286+                         equalTo (
287+                             List .of (
288+                                 new  ColumnInfoImpl (command , expectedType , null ),
289+                                 new  ColumnInfoImpl ("cluster" , "keyword" , null ),
290+                                 new  ColumnInfoImpl ("bucket(@timestamp, 1 hour)" , "date" , null )
291+                             )
292+                         )
293+                     );
294+                 }
295+             }
296+ 
297+             // tests on counter types 
298+             // TODO: remove hard-coded pragmas 
299+             var  ratePragmas  = new  QueryPragmas (Settings .builder ().put (QueryPragmas .MAX_CONCURRENT_SHARDS_PER_NODE .getKey (), 1 ).build ());
300+             for  (String  innerCommand  : List .of ("rate" )) {
301+                 String  command  = outerCommand  + " ("  + innerCommand  + "(request))" ;
302+                 String  esqlQuery  = "TS "  + dataStreamName  + " | STATS "  + command  + " by cluster, bucket(@timestamp, 1 hour)" ;
303+                 try  (
304+                     var  resp  = client ().execute (EsqlQueryAction .INSTANCE , new  EsqlQueryRequest ().query (esqlQuery ).pragmas (ratePragmas ))
305+                         .actionGet (30 , TimeUnit .SECONDS )
306+                 ) {
307+                     var  columns  = resp .columns ();
308+                     assertThat (columns , hasSize (3 ));
309+                     assertThat (
310+                         "resp is "  + resp ,
311+                         columns ,
312+                         equalTo (
313+                             List .of (
314+                                 new  ColumnInfoImpl (command , expectedType , null ),
315+                                 new  ColumnInfoImpl ("cluster" , "keyword" , null ),
316+                                 new  ColumnInfoImpl ("bucket(@timestamp, 1 hour)" , "date" , null )
317+                             )
318+                         )
319+                     );
320+                 }
321+             }
266322        }
267323    }
268324
0 commit comments