2727import org .elasticsearch .xpack .esql .action .EsqlQueryAction ;
2828import org .elasticsearch .xpack .esql .action .EsqlQueryRequest ;
2929import org .elasticsearch .xpack .esql .action .EsqlQueryResponse ;
30+ import org .elasticsearch .xpack .esql .plugin .QueryPragmas ;
3031
3132import java .io .IOException ;
3233import java .time .Instant ;
@@ -128,6 +129,10 @@ public void testAggMetricInEsqlTSAfterDownsampling() throws Exception {
128129 "cpu": {
129130 "type": "double",
130131 "time_series_metric": "gauge"
132+ },
133+ "request": {
134+ "type": "double",
135+ "time_series_metric": "counter"
131136 }
132137 }
133138 }
@@ -144,6 +149,7 @@ public void testAggMetricInEsqlTSAfterDownsampling() throws Exception {
144149 .field ("host" , randomFrom ("host1" , "host2" , "host3" ))
145150 .field ("cluster" , randomFrom ("cluster1" , "cluster2" , "cluster3" ))
146151 .field ("cpu" , randomDouble ())
152+ .field ("request" , randomDoubleBetween (0 , 100 , true ))
147153 .endObject ();
148154 } catch (IOException e ) {
149155 throw new RuntimeException (e );
@@ -155,6 +161,7 @@ public void testAggMetricInEsqlTSAfterDownsampling() throws Exception {
155161 assertAcked (client ().admin ().indices ().rolloverIndex (new RolloverRequest (dataStreamName , null )));
156162 List <String > backingIndices = waitForDataStreamBackingIndices (dataStreamName , 2 );
157163 String sourceIndex = backingIndices .get (0 );
164+ String secondIndex = backingIndices .get (1 );
158165 String interval = "5m" ;
159166 String targetIndex = "downsample-" + interval + "-" + sourceIndex ;
160167 // Set the source index to read-only state
@@ -211,6 +218,7 @@ public void testAggMetricInEsqlTSAfterDownsampling() throws Exception {
211218 .field ("host" , randomFrom ("host1" , "host2" , "host3" ))
212219 .field ("cluster" , randomFrom ("cluster1" , "cluster2" , "cluster3" ))
213220 .field ("cpu" , randomDouble ())
221+ .field ("request" , randomDoubleBetween (0 , 100 , true ))
214222 .endObject ();
215223 } catch (IOException e ) {
216224 throw new RuntimeException (e );
@@ -226,35 +234,36 @@ public void testAggMetricInEsqlTSAfterDownsampling() throws Exception {
226234
227235 // Since the downsampled field (cpu) is downsampled in one index and not in the other, we want to confirm
228236 // first that the field is unsupported and has 2 original types - double and aggregate_metric_double
229- try (var resp = esqlCommand ("TS " + dataStreamName + " | KEEP @timestamp, host, cluster, cpu" )) {
237+ try (var resp = esqlCommand ("TS " + dataStreamName + " | KEEP @timestamp, host, cluster, cpu, request " )) {
230238 var columns = resp .columns ();
231- assertThat (columns , hasSize (4 ));
239+ assertThat (columns , hasSize (5 ));
232240 assertThat (
233241 resp .columns (),
234242 equalTo (
235243 List .of (
236244 new ColumnInfoImpl ("@timestamp" , "date" , null ),
237245 new ColumnInfoImpl ("host" , "keyword" , null ),
238246 new ColumnInfoImpl ("cluster" , "keyword" , null ),
239- new ColumnInfoImpl ("cpu" , "unsupported" , List .of ("aggregate_metric_double" , "double" ))
247+ new ColumnInfoImpl ("cpu" , "unsupported" , List .of ("aggregate_metric_double" , "double" )),
248+ new ColumnInfoImpl ("request" , "counter_double" , null )
240249 )
241250 )
242251 );
243252 }
244253
245254 // test _over_time commands with implicit casting of aggregate_metric_double
246- for (String innerCommand : List .of ("min_over_time" , "max_over_time" , "avg_over_time" , "count_over_time" )) {
247- for (String outerCommand : List .of ("min" , "max" , "sum" , "count" )) {
255+ for (String outerCommand : List .of ("min" , "max" , "sum" , "count" )) {
256+ String expectedType = outerCommand .equals ("count" ) ? "long" : "double" ;
257+ for (String innerCommand : List .of ("min_over_time" , "max_over_time" , "avg_over_time" , "count_over_time" )) {
248258 String command = outerCommand + " (" + innerCommand + "(cpu))" ;
249- String expectedType = innerCommand .equals ("count_over_time" ) || outerCommand .equals ("count" ) ? "long" : "double" ;
250259 try (var resp = esqlCommand ("TS " + dataStreamName + " | STATS " + command + " by cluster, bucket(@timestamp, 1 hour)" )) {
251260 var columns = resp .columns ();
252261 assertThat (columns , hasSize (3 ));
253262 assertThat (
254263 resp .columns (),
255264 equalTo (
256265 List .of (
257- new ColumnInfoImpl (command , expectedType , null ),
266+ new ColumnInfoImpl (command , innerCommand . equals ( "count_over_time" ) ? "long" : expectedType , null ),
258267 new ColumnInfoImpl ("cluster" , "keyword" , null ),
259268 new ColumnInfoImpl ("bucket(@timestamp, 1 hour)" , "date" , null )
260269 )
@@ -263,6 +272,53 @@ public void testAggMetricInEsqlTSAfterDownsampling() throws Exception {
263272 // TODO: verify the numbers are accurate
264273 }
265274 }
275+ // tests on non-downsampled index
276+ // TODO: combine with above when support for aggregate_metric_double + implicit casting is added
277+ // TODO: add to counter tests below when support for counters is added
278+ for (String innerCommand : List .of ("first_over_time" , "last_over_time" )) {
279+ String command = outerCommand + " (" + innerCommand + "(cpu))" ;
280+ try (var resp = esqlCommand ("TS " + secondIndex + " | STATS " + command + " by cluster, bucket(@timestamp, 1 hour)" )) {
281+ var columns = resp .columns ();
282+ assertThat (columns , hasSize (3 ));
283+ assertThat (
284+ "resp is " + resp ,
285+ columns ,
286+ equalTo (
287+ List .of (
288+ new ColumnInfoImpl (command , expectedType , null ),
289+ new ColumnInfoImpl ("cluster" , "keyword" , null ),
290+ new ColumnInfoImpl ("bucket(@timestamp, 1 hour)" , "date" , null )
291+ )
292+ )
293+ );
294+ }
295+ }
296+
297+ // tests on counter types
298+ // TODO: remove hard-coded pragmas
299+ var ratePragmas = new QueryPragmas (Settings .builder ().put (QueryPragmas .MAX_CONCURRENT_SHARDS_PER_NODE .getKey (), 1 ).build ());
300+ for (String innerCommand : List .of ("rate" )) {
301+ String command = outerCommand + " (" + innerCommand + "(request))" ;
302+ String esqlQuery = "TS " + dataStreamName + " | STATS " + command + " by cluster, bucket(@timestamp, 1 hour)" ;
303+ try (
304+ var resp = client ().execute (EsqlQueryAction .INSTANCE , new EsqlQueryRequest ().query (esqlQuery ).pragmas (ratePragmas ))
305+ .actionGet (30 , TimeUnit .SECONDS )
306+ ) {
307+ var columns = resp .columns ();
308+ assertThat (columns , hasSize (3 ));
309+ assertThat (
310+ "resp is " + resp ,
311+ columns ,
312+ equalTo (
313+ List .of (
314+ new ColumnInfoImpl (command , expectedType , null ),
315+ new ColumnInfoImpl ("cluster" , "keyword" , null ),
316+ new ColumnInfoImpl ("bucket(@timestamp, 1 hour)" , "date" , null )
317+ )
318+ )
319+ );
320+ }
321+ }
266322 }
267323 }
268324
0 commit comments