77
88package org .elasticsearch .xpack .downsample ;
99
10+ import org .elasticsearch .Build ;
1011import org .elasticsearch .action .admin .cluster .node .capabilities .NodesCapabilitiesRequest ;
1112import org .elasticsearch .action .admin .indices .delete .DeleteIndexRequest ;
1213import org .elasticsearch .action .admin .indices .delete .TransportDeleteIndexAction ;
2728import org .elasticsearch .xpack .esql .action .EsqlQueryAction ;
2829import org .elasticsearch .xpack .esql .action .EsqlQueryRequest ;
2930import org .elasticsearch .xpack .esql .action .EsqlQueryResponse ;
31+ import org .elasticsearch .xpack .esql .plugin .QueryPragmas ;
3032
3133import java .io .IOException ;
3234import java .time .Instant ;
@@ -128,6 +130,10 @@ public void testAggMetricInEsqlTSAfterDownsampling() throws Exception {
128130 "cpu": {
129131 "type": "double",
130132 "time_series_metric": "gauge"
133+ },
134+ "request": {
135+ "type": "double",
136+ "time_series_metric": "counter"
131137 }
132138 }
133139 }
@@ -144,6 +150,7 @@ public void testAggMetricInEsqlTSAfterDownsampling() throws Exception {
144150 .field ("host" , randomFrom ("host1" , "host2" , "host3" ))
145151 .field ("cluster" , randomFrom ("cluster1" , "cluster2" , "cluster3" ))
146152 .field ("cpu" , randomDouble ())
153+ .field ("request" , randomDoubleBetween (0 , 100 , true ))
147154 .endObject ();
148155 } catch (IOException e ) {
149156 throw new RuntimeException (e );
@@ -155,6 +162,7 @@ public void testAggMetricInEsqlTSAfterDownsampling() throws Exception {
155162 assertAcked (client ().admin ().indices ().rolloverIndex (new RolloverRequest (dataStreamName , null )));
156163 List <String > backingIndices = waitForDataStreamBackingIndices (dataStreamName , 2 );
157164 String sourceIndex = backingIndices .get (0 );
165+ String secondIndex = backingIndices .get (1 );
158166 String interval = "5m" ;
159167 String targetIndex = "downsample-" + interval + "-" + sourceIndex ;
160168 // Set the source index to read-only state
@@ -211,6 +219,7 @@ public void testAggMetricInEsqlTSAfterDownsampling() throws Exception {
211219 .field ("host" , randomFrom ("host1" , "host2" , "host3" ))
212220 .field ("cluster" , randomFrom ("cluster1" , "cluster2" , "cluster3" ))
213221 .field ("cpu" , randomDouble ())
222+ .field ("request" , randomDoubleBetween (0 , 100 , true ))
214223 .endObject ();
215224 } catch (IOException e ) {
216225 throw new RuntimeException (e );
@@ -226,35 +235,36 @@ public void testAggMetricInEsqlTSAfterDownsampling() throws Exception {
226235
227236 // Since the downsampled field (cpu) is downsampled in one index and not in the other, we want to confirm
228237 // first that the field is unsupported and has 2 original types - double and aggregate_metric_double
229- try (var resp = esqlCommand ("TS " + dataStreamName + " | KEEP @timestamp, host, cluster, cpu" )) {
238+ try (var resp = esqlCommand ("TS " + dataStreamName + " | KEEP @timestamp, host, cluster, cpu, request " )) {
230239 var columns = resp .columns ();
231- assertThat (columns , hasSize (4 ));
240+ assertThat (columns , hasSize (5 ));
232241 assertThat (
233242 resp .columns (),
234243 equalTo (
235244 List .of (
236245 new ColumnInfoImpl ("@timestamp" , "date" , null ),
237246 new ColumnInfoImpl ("host" , "keyword" , null ),
238247 new ColumnInfoImpl ("cluster" , "keyword" , null ),
239- new ColumnInfoImpl ("cpu" , "unsupported" , List .of ("aggregate_metric_double" , "double" ))
248+ new ColumnInfoImpl ("cpu" , "unsupported" , List .of ("aggregate_metric_double" , "double" )),
249+ new ColumnInfoImpl ("request" , "counter_double" , null )
240250 )
241251 )
242252 );
243253 }
244254
245255 // test _over_time commands with implicit casting of aggregate_metric_double
246- for (String innerCommand : List .of ("min_over_time" , "max_over_time" , "avg_over_time" , "count_over_time" )) {
247- for (String outerCommand : List .of ("min" , "max" , "sum" , "count" )) {
256+ for (String outerCommand : List .of ("min" , "max" , "sum" , "count" )) {
257+ String expectedType = outerCommand .equals ("count" ) ? "long" : "double" ;
258+ for (String innerCommand : List .of ("min_over_time" , "max_over_time" , "avg_over_time" , "count_over_time" )) {
248259 String command = outerCommand + " (" + innerCommand + "(cpu))" ;
249- String expectedType = innerCommand .equals ("count_over_time" ) || outerCommand .equals ("count" ) ? "long" : "double" ;
250260 try (var resp = esqlCommand ("TS " + dataStreamName + " | STATS " + command + " by cluster, bucket(@timestamp, 1 hour)" )) {
251261 var columns = resp .columns ();
252262 assertThat (columns , hasSize (3 ));
253263 assertThat (
254264 resp .columns (),
255265 equalTo (
256266 List .of (
257- new ColumnInfoImpl (command , expectedType , null ),
267+ new ColumnInfoImpl (command , innerCommand . equals ( "count_over_time" ) ? "long" : expectedType , null ),
258268 new ColumnInfoImpl ("cluster" , "keyword" , null ),
259269 new ColumnInfoImpl ("bucket(@timestamp, 1 hour)" , "date" , null )
260270 )
@@ -263,6 +273,55 @@ public void testAggMetricInEsqlTSAfterDownsampling() throws Exception {
263273 // TODO: verify the numbers are accurate
264274 }
265275 }
276+ // tests on non-downsampled index
277+ // TODO: combine with above when support for aggregate_metric_double + implicit casting is added
278+ // TODO: add to counter tests below when support for counters is added
279+ for (String innerCommand : List .of ("first_over_time" , "last_over_time" )) {
280+ String command = outerCommand + " (" + innerCommand + "(cpu))" ;
281+ try (var resp = esqlCommand ("TS " + secondIndex + " | STATS " + command + " by cluster, bucket(@timestamp, 1 hour)" )) {
282+ var columns = resp .columns ();
283+ assertThat (columns , hasSize (3 ));
284+ assertThat (
285+ "resp is " + resp ,
286+ columns ,
287+ equalTo (
288+ List .of (
289+ new ColumnInfoImpl (command , expectedType , null ),
290+ new ColumnInfoImpl ("cluster" , "keyword" , null ),
291+ new ColumnInfoImpl ("bucket(@timestamp, 1 hour)" , "date" , null )
292+ )
293+ )
294+ );
295+ }
296+ }
297+
298+ // tests on counter types
299+ // TODO: remove hard-coded pragmas
300+ assumeTrue ("query pragmas require snapshot build" , Build .current ().isSnapshot ());
301+ var ratePragmas = new QueryPragmas (Settings .builder ().put (QueryPragmas .MAX_CONCURRENT_SHARDS_PER_NODE .getKey (), 1 ).build ());
302+
303+ for (String innerCommand : List .of ("rate" )) {
304+ String command = outerCommand + " (" + innerCommand + "(request))" ;
305+ String esqlQuery = "TS " + dataStreamName + " | STATS " + command + " by cluster, bucket(@timestamp, 1 hour)" ;
306+ try (
307+ var resp = client ().execute (EsqlQueryAction .INSTANCE , new EsqlQueryRequest ().query (esqlQuery ).pragmas (ratePragmas ))
308+ .actionGet (30 , TimeUnit .SECONDS )
309+ ) {
310+ var columns = resp .columns ();
311+ assertThat (columns , hasSize (3 ));
312+ assertThat (
313+ "resp is " + resp ,
314+ columns ,
315+ equalTo (
316+ List .of (
317+ new ColumnInfoImpl (command , expectedType , null ),
318+ new ColumnInfoImpl ("cluster" , "keyword" , null ),
319+ new ColumnInfoImpl ("bucket(@timestamp, 1 hour)" , "date" , null )
320+ )
321+ )
322+ );
323+ }
324+ }
266325 }
267326 }
268327
0 commit comments