77
88package  org .elasticsearch .xpack .downsample ;
99
10+ import  org .elasticsearch .Build ;
1011import  org .elasticsearch .action .admin .cluster .node .capabilities .NodesCapabilitiesRequest ;
1112import  org .elasticsearch .action .admin .indices .delete .DeleteIndexRequest ;
1213import  org .elasticsearch .action .admin .indices .delete .TransportDeleteIndexAction ;
2728import  org .elasticsearch .xpack .esql .action .EsqlQueryAction ;
2829import  org .elasticsearch .xpack .esql .action .EsqlQueryRequest ;
2930import  org .elasticsearch .xpack .esql .action .EsqlQueryResponse ;
31+ import  org .elasticsearch .xpack .esql .plugin .QueryPragmas ;
3032
3133import  java .io .IOException ;
3234import  java .time .Instant ;
@@ -128,6 +130,10 @@ public void testAggMetricInEsqlTSAfterDownsampling() throws Exception {
128130                "cpu": { 
129131                  "type": "double", 
130132                  "time_series_metric": "gauge" 
133+                 }, 
134+                 "request": { 
135+                   "type": "double", 
136+                   "time_series_metric": "counter" 
131137                } 
132138              } 
133139            } 
@@ -144,6 +150,7 @@ public void testAggMetricInEsqlTSAfterDownsampling() throws Exception {
144150                    .field ("host" , randomFrom ("host1" , "host2" , "host3" ))
145151                    .field ("cluster" , randomFrom ("cluster1" , "cluster2" , "cluster3" ))
146152                    .field ("cpu" , randomDouble ())
153+                     .field ("request" , randomDoubleBetween (0 , 100 , true ))
147154                    .endObject ();
148155            } catch  (IOException  e ) {
149156                throw  new  RuntimeException (e );
@@ -155,6 +162,7 @@ public void testAggMetricInEsqlTSAfterDownsampling() throws Exception {
155162        assertAcked (client ().admin ().indices ().rolloverIndex (new  RolloverRequest (dataStreamName , null )));
156163        List <String > backingIndices  = waitForDataStreamBackingIndices (dataStreamName , 2 );
157164        String  sourceIndex  = backingIndices .get (0 );
165+         String  secondIndex  = backingIndices .get (1 );
158166        String  interval  = "5m" ;
159167        String  targetIndex  = "downsample-"  + interval  + "-"  + sourceIndex ;
160168        // Set the source index to read-only state 
@@ -211,6 +219,7 @@ public void testAggMetricInEsqlTSAfterDownsampling() throws Exception {
211219                    .field ("host" , randomFrom ("host1" , "host2" , "host3" ))
212220                    .field ("cluster" , randomFrom ("cluster1" , "cluster2" , "cluster3" ))
213221                    .field ("cpu" , randomDouble ())
222+                     .field ("request" , randomDoubleBetween (0 , 100 , true ))
214223                    .endObject ();
215224            } catch  (IOException  e ) {
216225                throw  new  RuntimeException (e );
@@ -226,35 +235,36 @@ public void testAggMetricInEsqlTSAfterDownsampling() throws Exception {
226235
227236        // Since the downsampled field (cpu) is downsampled in one index and not in the other, we want to confirm 
228237        // first that the field is unsupported and has 2 original types - double and aggregate_metric_double 
229-         try  (var  resp  = esqlCommand ("TS "  + dataStreamName  + " | KEEP @timestamp, host, cluster, cpu" )) {
238+         try  (var  resp  = esqlCommand ("TS "  + dataStreamName  + " | KEEP @timestamp, host, cluster, cpu, request " )) {
230239            var  columns  = resp .columns ();
231-             assertThat (columns , hasSize (4 ));
240+             assertThat (columns , hasSize (5 ));
232241            assertThat (
233242                resp .columns (),
234243                equalTo (
235244                    List .of (
236245                        new  ColumnInfoImpl ("@timestamp" , "date" , null ),
237246                        new  ColumnInfoImpl ("host" , "keyword" , null ),
238247                        new  ColumnInfoImpl ("cluster" , "keyword" , null ),
239-                         new  ColumnInfoImpl ("cpu" , "unsupported" , List .of ("aggregate_metric_double" , "double" ))
248+                         new  ColumnInfoImpl ("cpu" , "unsupported" , List .of ("aggregate_metric_double" , "double" )),
249+                         new  ColumnInfoImpl ("request" , "counter_double" , null )
240250                    )
241251                )
242252            );
243253        }
244254
245255        // test _over_time commands with implicit casting of aggregate_metric_double 
246-         for  (String  innerCommand  : List .of ("min_over_time" , "max_over_time" , "avg_over_time" , "count_over_time" )) {
247-             for  (String  outerCommand  : List .of ("min" , "max" , "sum" , "count" )) {
256+         for  (String  outerCommand  : List .of ("min" , "max" , "sum" , "count" )) {
257+             String  expectedType  = outerCommand .equals ("count" ) ? "long"  : "double" ;
258+             for  (String  innerCommand  : List .of ("min_over_time" , "max_over_time" , "avg_over_time" , "count_over_time" )) {
248259                String  command  = outerCommand  + " ("  + innerCommand  + "(cpu))" ;
249-                 String  expectedType  = innerCommand .equals ("count_over_time" ) || outerCommand .equals ("count" ) ? "long"  : "double" ;
250260                try  (var  resp  = esqlCommand ("TS "  + dataStreamName  + " | STATS "  + command  + " by cluster, bucket(@timestamp, 1 hour)" )) {
251261                    var  columns  = resp .columns ();
252262                    assertThat (columns , hasSize (3 ));
253263                    assertThat (
254264                        resp .columns (),
255265                        equalTo (
256266                            List .of (
257-                                 new  ColumnInfoImpl (command , expectedType , null ),
267+                                 new  ColumnInfoImpl (command , innerCommand . equals ( "count_over_time" ) ?  "long"  :  expectedType , null ),
258268                                new  ColumnInfoImpl ("cluster" , "keyword" , null ),
259269                                new  ColumnInfoImpl ("bucket(@timestamp, 1 hour)" , "date" , null )
260270                            )
@@ -263,6 +273,55 @@ public void testAggMetricInEsqlTSAfterDownsampling() throws Exception {
263273                    // TODO: verify the numbers are accurate 
264274                }
265275            }
276+             // tests on non-downsampled index 
277+             // TODO: combine with above when support for aggregate_metric_double + implicit casting is added 
278+             // TODO: add to counter tests below when support for counters is added 
279+             for  (String  innerCommand  : List .of ("first_over_time" , "last_over_time" )) {
280+                 String  command  = outerCommand  + " ("  + innerCommand  + "(cpu))" ;
281+                 try  (var  resp  = esqlCommand ("TS "  + secondIndex  + " | STATS "  + command  + " by cluster, bucket(@timestamp, 1 hour)" )) {
282+                     var  columns  = resp .columns ();
283+                     assertThat (columns , hasSize (3 ));
284+                     assertThat (
285+                         "resp is "  + resp ,
286+                         columns ,
287+                         equalTo (
288+                             List .of (
289+                                 new  ColumnInfoImpl (command , expectedType , null ),
290+                                 new  ColumnInfoImpl ("cluster" , "keyword" , null ),
291+                                 new  ColumnInfoImpl ("bucket(@timestamp, 1 hour)" , "date" , null )
292+                             )
293+                         )
294+                     );
295+                 }
296+             }
297+ 
298+             // tests on counter types 
299+             // TODO: remove hard-coded pragmas 
300+             assumeTrue ("query pragmas require snapshot build" , Build .current ().isSnapshot ());
301+             var  ratePragmas  = new  QueryPragmas (Settings .builder ().put (QueryPragmas .MAX_CONCURRENT_SHARDS_PER_NODE .getKey (), 1 ).build ());
302+ 
303+             for  (String  innerCommand  : List .of ("rate" )) {
304+                 String  command  = outerCommand  + " ("  + innerCommand  + "(request))" ;
305+                 String  esqlQuery  = "TS "  + dataStreamName  + " | STATS "  + command  + " by cluster, bucket(@timestamp, 1 hour)" ;
306+                 try  (
307+                     var  resp  = client ().execute (EsqlQueryAction .INSTANCE , new  EsqlQueryRequest ().query (esqlQuery ).pragmas (ratePragmas ))
308+                         .actionGet (30 , TimeUnit .SECONDS )
309+                 ) {
310+                     var  columns  = resp .columns ();
311+                     assertThat (columns , hasSize (3 ));
312+                     assertThat (
313+                         "resp is "  + resp ,
314+                         columns ,
315+                         equalTo (
316+                             List .of (
317+                                 new  ColumnInfoImpl (command , expectedType , null ),
318+                                 new  ColumnInfoImpl ("cluster" , "keyword" , null ),
319+                                 new  ColumnInfoImpl ("bucket(@timestamp, 1 hour)" , "date" , null )
320+                             )
321+                         )
322+                     );
323+                 }
324+             }
266325        }
267326    }
268327
0 commit comments