1919import org .elasticsearch .xpack .esql .qa .rest .generative .command .pipe .KeepGenerator ;
2020import org .elasticsearch .xpack .esql .qa .rest .generative .command .pipe .LimitGenerator ;
2121import org .elasticsearch .xpack .esql .qa .rest .generative .command .pipe .LookupJoinGenerator ;
22+ import org .elasticsearch .xpack .esql .qa .rest .generative .command .pipe .MetricsStatsGenerator ;
2223import org .elasticsearch .xpack .esql .qa .rest .generative .command .pipe .MvExpandGenerator ;
2324import org .elasticsearch .xpack .esql .qa .rest .generative .command .pipe .RenameGenerator ;
2425import org .elasticsearch .xpack .esql .qa .rest .generative .command .pipe .SortGenerator ;
2526import org .elasticsearch .xpack .esql .qa .rest .generative .command .pipe .StatsGenerator ;
2627import org .elasticsearch .xpack .esql .qa .rest .generative .command .pipe .WhereGenerator ;
2728import org .elasticsearch .xpack .esql .qa .rest .generative .command .source .FromGenerator ;
29+ import org .elasticsearch .xpack .esql .qa .rest .generative .command .source .MetricGenerator ;
2830
2931import java .util .List ;
3032import java .util .Set ;
@@ -51,6 +53,11 @@ public record QueryExecuted(String query, int depth, List<Column> outputSchema,
5153 */
5254 static List <CommandGenerator > SOURCE_COMMANDS = List .of (FromGenerator .INSTANCE );
5355
56+ /**
57+ * Commands at the beginning of queries that begin queries on time series indices, eg. TS
58+ */
59+ static List <CommandGenerator > TIME_SERIES_SOURCE_COMMANDS = List .of (MetricGenerator .INSTANCE );
60+
5461 /**
5562 * These are downstream commands, ie. that cannot appear as the first command in a query
5663 */
@@ -72,14 +79,42 @@ public record QueryExecuted(String query, int depth, List<Column> outputSchema,
7279 WhereGenerator .INSTANCE
7380 );
7481
82+ static List <CommandGenerator > TIME_SERIES_PIPE_COMMANDS = List .of (
83+ ChangePointGenerator .INSTANCE ,
84+ DissectGenerator .INSTANCE ,
85+ DropGenerator .INSTANCE ,
86+ EnrichGenerator .INSTANCE ,
87+ EvalGenerator .INSTANCE ,
88+ ForkGenerator .INSTANCE ,
89+ GrokGenerator .INSTANCE ,
90+ KeepGenerator .INSTANCE ,
91+ LimitGenerator .INSTANCE ,
92+ LookupJoinGenerator .INSTANCE ,
93+ MetricsStatsGenerator .INSTANCE ,
94+ MvExpandGenerator .INSTANCE ,
95+ RenameGenerator .INSTANCE ,
96+ SortGenerator .INSTANCE ,
97+ StatsGenerator .INSTANCE ,
98+ WhereGenerator .INSTANCE
99+ );
100+
75101 public static CommandGenerator sourceCommand () {
76102 return randomFrom (SOURCE_COMMANDS );
77103 }
78104
105+ public static CommandGenerator timeSeriesSourceCommand () {
106+ return randomFrom (TIME_SERIES_SOURCE_COMMANDS );
107+ }
108+
79109 public static CommandGenerator randomPipeCommandGenerator () {
80110 return randomFrom (PIPE_COMMANDS );
81111 }
82112
113+ public static CommandGenerator randomMetricsPipeCommandGenerator () {
114+ // todo better way
115+ return randomFrom (TIME_SERIES_PIPE_COMMANDS );
116+ }
117+
83118 public interface Executor {
84119 void run (CommandGenerator generator , CommandGenerator .CommandDescription current );
85120
@@ -95,7 +130,8 @@ public static void generatePipeline(
95130 final int depth ,
96131 CommandGenerator commandGenerator ,
97132 final CommandGenerator .QuerySchema schema ,
98- Executor executor
133+ Executor executor ,
134+ boolean isTimeSeries
99135 ) {
100136 CommandGenerator .CommandDescription desc = commandGenerator .generate (List .of (), List .of (), schema );
101137 executor .run (commandGenerator , desc );
@@ -107,7 +143,7 @@ public static void generatePipeline(
107143 if (executor .currentSchema ().isEmpty ()) {
108144 break ;
109145 }
110- commandGenerator = EsqlQueryGenerator .randomPipeCommandGenerator ();
146+ commandGenerator = isTimeSeries ? randomMetricsPipeCommandGenerator () : EsqlQueryGenerator .randomPipeCommandGenerator ();
111147 desc = commandGenerator .generate (executor .previousCommands (), executor .currentSchema (), schema );
112148 if (desc == CommandGenerator .EMPTY_DESCRIPTION ) {
113149 continue ;
@@ -217,6 +253,61 @@ public static boolean sortable(Column col) {
217253 || col .type .equals ("version" );
218254 }
219255
256+ public static String metricsAgg (List <Column > previousOutput ) {
257+ String outerCommand = randomFrom ("min" , "max" , "sum" , "count" , "avg" );
258+ String innerCommand = switch (randomIntBetween (0 , 3 )) {
259+ case 0 -> {
260+ // input can be numerics + aggregate_metric_double
261+ String numericPlusAggMetricFieldName = randomMetricsNumericField (previousOutput );
262+ if (numericPlusAggMetricFieldName == null ) {
263+ yield null ;
264+ }
265+ yield switch ((randomIntBetween (0 , 3 ))) {
266+ case 0 -> "max_over_time(" + numericPlusAggMetricFieldName + ")" ;
267+ case 1 -> "min_over_time(" + numericPlusAggMetricFieldName + ")" ;
268+ case 2 -> "sum_over_time(" + numericPlusAggMetricFieldName + ")" ;
269+ default -> "avg_over_time(" + numericPlusAggMetricFieldName + ")" ;
270+ };
271+ }
272+ case 1 -> {
273+ // input can be a counter
274+ String counterField = randomCounterField (previousOutput );
275+ if (counterField == null ) {
276+ yield null ;
277+ }
278+ yield "rate(" + counterField + ")" ;
279+ }
280+ case 2 -> {
281+ // numerics except aggregate_metric_double
282+ // TODO: move to case 0 when support for aggregate_metric_double is added to these functions
283+ String numericFieldName = randomNumericField (previousOutput );
284+ if (numericFieldName == null ) {
285+ yield null ;
286+ }
287+ yield (randomBoolean () ? "first_over_time(" : "last_over_time(" ) + numericFieldName + ")" ;
288+ }
289+ default -> {
290+ // TODO: add other types that count_over_time supports
291+ String otherFieldName = randomBoolean () ? randomStringField (previousOutput ) : randomNumericOrDateField (previousOutput );
292+ if (otherFieldName == null ) {
293+ yield null ;
294+ }
295+ if (randomBoolean ()) {
296+ yield "count_over_time(" + otherFieldName + ")" ;
297+ } else {
298+ yield "count_distinct_over_time(" + otherFieldName + ")" ;
299+ // TODO: replace with the below
300+ // yield "count_distinct_over_time(" + otherFieldName + (randomBoolean() ? ", " + randomNonNegativeInt() : "") + ")";
301+ }
302+ }
303+ };
304+ if (innerCommand == null ) {
305+ // TODO: figure out a default that maybe makes more sense than using a timestamp field
306+ innerCommand = "count_over_time(" + randomDateField (previousOutput ) + ")" ;
307+ }
308+ return outerCommand + "(" + innerCommand + ")" ;
309+ }
310+
220311 public static String agg (List <Column > previousOutput ) {
221312 String name = randomNumericOrDateField (previousOutput );
222313 if (name != null && randomBoolean ()) {
@@ -251,6 +342,30 @@ public static String randomNumericField(List<Column> previousOutput) {
251342 return randomName (previousOutput , Set .of ("long" , "integer" , "double" ));
252343 }
253344
345+ public static String randomMetricsNumericField (List <Column > previousOutput ) {
346+ Set <String > allowedTypes = Set .of ("double" , "long" , "unsigned_long" , "integer" , "aggregate_metric_double" );
347+ List <String > items = previousOutput .stream ()
348+ .filter (
349+ x -> allowedTypes .contains (x .type ())
350+ || (x .type ().equals ("unsupported" ) && canBeCastedToAggregateMetricDouble (x .originalTypes ()))
351+ )
352+ .map (Column ::name )
353+ .toList ();
354+ if (items .isEmpty ()) {
355+ return null ;
356+ }
357+ return items .get (randomIntBetween (0 , items .size () - 1 ));
358+ }
359+
360+ public static String randomCounterField (List <Column > previousOutput ) {
361+ return randomName (previousOutput , Set .of ("counter_long" , "counter_double" , "counter_integer" ));
362+ }
363+
364+ private static boolean canBeCastedToAggregateMetricDouble (List <String > types ) {
365+ return types .contains ("aggregate_metric_double" )
366+ && Set .of ("double" , "long" , "unsigned_long" , "integer" , "aggregate_metric_double" ).containsAll (types );
367+ }
368+
254369 public static String randomStringField (List <Column > previousOutput ) {
255370 return randomName (previousOutput , Set .of ("text" , "keyword" ));
256371 }
0 commit comments