From 699e7554ed4f5978c78f2ffefe365b4a977da8cf Mon Sep 17 00:00:00 2001 From: Pablo Date: Tue, 24 Jun 2025 11:25:31 -0700 Subject: [PATCH 01/11] Working on rate integration test --- .../xpack/esql/action/TimeSeriesRateIT.java | 191 ++++++++++++++++++ 1 file changed, 191 insertions(+) create mode 100644 x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesRateIT.java diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesRateIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesRateIT.java new file mode 100644 index 0000000000000..a1ec5c687699b --- /dev/null +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesRateIT.java @@ -0,0 +1,191 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.action; + +import org.elasticsearch.Build; +import org.elasticsearch.common.Randomness; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.xpack.esql.EsqlTestUtils; +import org.junit.Before; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.index.mapper.DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER; +import static org.hamcrest.Matchers.closeTo; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; + +public class TimeSeriesRateIT extends AbstractEsqlIntegTestCase { + + @Override + protected EsqlQueryResponse run(EsqlQueryRequest request) { + assumeTrue("time series available in snapshot builds only", Build.current().isSnapshot()); + return super.run(request); + } + + record Doc(String host, String cluster, long timestamp, int requestCount, double cpu) {} + + final List docs = new ArrayList<>(); + + final Map hostToClusters = new HashMap<>(); + final Map hostToRate = new HashMap<>(); + final Map hostToCpu = new HashMap<>(); + + @Before + public void populateIndex() { + // this can be expensive, do one + Settings settings = Settings.builder().put("mode", "time_series").putList("routing_path", List.of("host", "cluster")).build(); + client().admin() + .indices() + .prepareCreate("hosts") + .setSettings(settings) + .setMapping( + "@timestamp", + "type=date", + "host", + "type=keyword,time_series_dimension=true", + "cluster", + "type=keyword,time_series_dimension=true", + "cpu", + "type=double,time_series_metric=gauge", + "request_count", + "type=integer,time_series_metric=counter" + ) + .get(); + final Map requestCounts = new HashMap<>(); + for (int i = 0; i < 5; i++) { + hostToClusters.put("p" + i, randomFrom("qa", "prod")); + hostToRate.put("p" + i, randomIntBetween(0, 50)); + requestCounts.put("p" + i, randomIntBetween(0, 100)); + hostToCpu.put("p" + i, randomIntBetween(0, 100)); + } + long timestamp = DEFAULT_DATE_TIME_FORMATTER.parseMillis("2024-04-15T00:00:00Z"); + int numDocs = between(60, 100); + docs.clear(); + + for (int i = 0; i < numDocs; i++) { + List hosts = randomSubsetOf(between(hostToClusters.size() - 1, hostToClusters.size()), hostToClusters.keySet()); + final var tsChange = between(1, 10); + timestamp += tsChange * 1000L; + for (String host : hosts) { + var requestCount = requestCounts.compute(host, (k, curr) -> { + // 20% chance of reset + if (randomInt(100) <= 20) { + return hostToRate.get(k) * tsChange; + } else { + return curr == null ? 0 : curr + hostToRate.get(k) * tsChange; + } + }); + docs.add(new Doc(host, hostToClusters.get(host), timestamp, requestCount, hostToCpu.get(host))); + } + } + Randomness.shuffle(docs); + for (Doc doc : docs) { + client().prepareIndex("hosts") + .setSource( + "@timestamp", + doc.timestamp, + "host", + doc.host, + "cluster", + doc.cluster, + "cpu", + doc.cpu, + "request_count", + doc.requestCount + ) + .get(); + } + client().admin().indices().prepareRefresh("hosts").get(); + } + + private String hostTable() { + StringBuilder sb = new StringBuilder(); + for (String host : hostToClusters.keySet()) { + sb.append(host).append(" -> ").append(hostToClusters.get(host)).append(", rate=").append(hostToRate.get(host)).append(", cpu=").append(hostToCpu.get(host)).append("\n"); + } + // Now we add total rate and total CPU used: + sb.append("Total rate: ").append(hostToRate.values().stream().mapToInt(a -> a).sum()).append("\n"); + sb.append("Total CPU: ").append(hostToCpu.values().stream().mapToInt(a -> a).sum()).append("\n"); + return sb.toString(); + } + + private String valuesTable(List> values) { + StringBuilder sb = new StringBuilder(); + for (List row : values) { + sb.append(row).append("\n"); + } + return sb.toString(); + } + + public void testRateWithTimeBucket() { + var limit = 5; + try (var resp = run("TS hosts | STATS sum(rate(request_count)) BY ts=bucket(@timestamp, 1 minute) | SORT ts | LIMIT " + limit)) { + try { + assertThat( + resp.columns(), + equalTo(List.of(new ColumnInfoImpl("sum(rate(request_count))", "double", null), new ColumnInfoImpl("ts", "date", null))) + ); + List> values = EsqlTestUtils.getValuesList(resp); + assertThat(values, hasSize(limit)); + for (int i = 0; i < limit; i++) { + List row = values.get(i); + assertThat(row, hasSize(2)); + Double bucketValues = hostToRate.values().stream().mapToDouble(a -> a + 0.0).sum(); + assertThat((double) row.get(0), equalTo(bucketValues)); + } + } catch (AssertionError e) { + throw new AssertionError("Values:\n" + valuesTable(EsqlTestUtils.getValuesList(resp)) + "\n Hosts:\n" + hostTable(), e); + } + } + try (var resp = run("TS hosts | STATS avg(rate(request_count)) BY ts=bucket(@timestamp, 1minute) | SORT ts | LIMIT 5")) { + assertThat( + resp.columns(), + equalTo(List.of(new ColumnInfoImpl("avg(rate(request_count))", "double", null), new ColumnInfoImpl("ts", "date", null))) + ); + List> values = EsqlTestUtils.getValuesList(resp); + assertThat(values, hasSize(5)); + for (int i = 0; i < 5; i++) { + List row = values.get(i); + assertThat(row, hasSize(2)); + Double bucketValues = hostToRate.values().stream().mapToDouble(a -> a + 0.0).sum() / hostToRate.size(); + assertThat((double) row.get(0), equalTo(bucketValues)); + } + } + try (var resp = run(""" + TS hosts + | STATS avg(rate(request_count)) BY ts=bucket(@timestamp, 1minute) + | SORT ts + | LIMIT """ + limit)) { + try { + assertThat( + resp.columns(), + equalTo( + List.of( + new ColumnInfoImpl("avg(rate(request_count))", "double", null), + new ColumnInfoImpl("ts", "date", null) + ) + ) + ); + List> values = EsqlTestUtils.getValuesList(resp); + assertThat(values, hasSize(limit)); + for (int i = 0; i < limit; i++) { + List row = values.get(i); + assertThat(row, hasSize(3)); + double avg = hostToRate.values().stream().mapToDouble(d -> d).sum() / hostToRate.size(); + assertThat((double) row.get(0), closeTo(avg, 0.1)); + } + } catch (AssertionError e) { + throw new AssertionError("Values:\n" + valuesTable(EsqlTestUtils.getValuesList(resp)) + "\n Hosts:\n" + hostTable(), e); + } + } + } +} From 8fae5ab53e62b7e06458d7b9553eaada44c36c02 Mon Sep 17 00:00:00 2001 From: Pablo Date: Tue, 24 Jun 2025 16:55:37 -0700 Subject: [PATCH 02/11] Sketch of new rate tests. Unsure of the source of variation >10pct --- .../xpack/esql/action/TimeSeriesIT.java | 316 ------------------ .../xpack/esql/action/TimeSeriesRateIT.java | 276 +++++++++++++-- 2 files changed, 250 insertions(+), 342 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesIT.java index 09e04bfaa742a..e84a75475e76a 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesIT.java @@ -9,17 +9,14 @@ import org.elasticsearch.Build; import org.elasticsearch.common.Randomness; -import org.elasticsearch.common.Rounding; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.compute.lucene.TimeSeriesSourceOperator; import org.elasticsearch.compute.operator.DriverProfile; import org.elasticsearch.compute.operator.TimeSeriesAggregationOperator; -import org.elasticsearch.core.TimeValue; import org.elasticsearch.xpack.esql.EsqlTestUtils; import org.elasticsearch.xpack.esql.core.type.DataType; import org.junit.Before; -import java.time.ZoneOffset; import java.util.ArrayList; import java.util.Comparator; import java.util.HashMap; @@ -321,319 +318,6 @@ record RateKey(String cluster, String host) { } } - @AwaitsFix(bugUrl = "removed?") - public void testRateWithTimeBucket() { - var rounding = new Rounding.Builder(TimeValue.timeValueSeconds(60)).timeZone(ZoneOffset.UTC).build().prepareForUnknown(); - record RateKey(String host, String cluster, long interval) {} - Map> groups = new HashMap<>(); - for (Doc doc : docs) { - RateKey key = new RateKey(doc.host, doc.cluster, rounding.round(doc.timestamp)); - groups.computeIfAbsent(key, k -> new ArrayList<>()).add(new RequestCounter(doc.timestamp, doc.requestCount)); - } - Map> bucketToRates = new HashMap<>(); - for (Map.Entry> e : groups.entrySet()) { - List values = bucketToRates.computeIfAbsent(e.getKey().interval, k -> new ArrayList<>()); - Double rate = computeRate(e.getValue()); - if (rate != null) { - values.add(rate); - } - } - List sortedKeys = bucketToRates.keySet().stream().sorted().limit(5).toList(); - try (var resp = run("TS hosts | STATS sum(rate(request_count)) BY ts=bucket(@timestamp, 1 minute) | SORT ts | LIMIT 5")) { - assertThat( - resp.columns(), - equalTo(List.of(new ColumnInfoImpl("sum(rate(request_count))", "double", null), new ColumnInfoImpl("ts", "date", null))) - ); - List> values = EsqlTestUtils.getValuesList(resp); - assertThat(values, hasSize(sortedKeys.size())); - for (int i = 0; i < sortedKeys.size(); i++) { - List row = values.get(i); - assertThat(row, hasSize(2)); - long key = sortedKeys.get(i); - assertThat(row.get(1), equalTo(DEFAULT_DATE_TIME_FORMATTER.formatMillis(key))); - List bucketValues = bucketToRates.get(key); - if (bucketValues.isEmpty()) { - assertNull(row.get(0)); - } else { - assertThat((double) row.get(0), closeTo(bucketValues.stream().mapToDouble(d -> d).sum(), 0.1)); - } - } - } - try (var resp = run("TS hosts | STATS avg(rate(request_count)) BY ts=bucket(@timestamp, 1minute) | SORT ts | LIMIT 5")) { - assertThat( - resp.columns(), - equalTo(List.of(new ColumnInfoImpl("avg(rate(request_count))", "double", null), new ColumnInfoImpl("ts", "date", null))) - ); - List> values = EsqlTestUtils.getValuesList(resp); - assertThat(values, hasSize(sortedKeys.size())); - for (int i = 0; i < sortedKeys.size(); i++) { - List row = values.get(i); - assertThat(row, hasSize(2)); - long key = sortedKeys.get(i); - assertThat(row.get(1), equalTo(DEFAULT_DATE_TIME_FORMATTER.formatMillis(key))); - List bucketValues = bucketToRates.get(key); - if (bucketValues.isEmpty()) { - assertNull(row.get(0)); - } else { - double avg = bucketValues.stream().mapToDouble(d -> d).sum() / bucketValues.size(); - assertThat((double) row.get(0), closeTo(avg, 0.1)); - } - } - } - try (var resp = run(""" - TS hosts - | STATS avg(rate(request_count)), avg(rate(request_count)) BY ts=bucket(@timestamp, 1minute) - | SORT ts - | LIMIT 5 - """)) { - assertThat( - resp.columns(), - equalTo( - List.of( - new ColumnInfoImpl("avg(rate(request_count))", "double", null), - new ColumnInfoImpl("avg(rate(request_count))", "double", null), - new ColumnInfoImpl("ts", "date", null) - ) - ) - ); - List> values = EsqlTestUtils.getValuesList(resp); - assertThat(values, hasSize(sortedKeys.size())); - for (int i = 0; i < sortedKeys.size(); i++) { - List row = values.get(i); - assertThat(row, hasSize(3)); - long key = sortedKeys.get(i); - assertThat(row.get(2), equalTo(DEFAULT_DATE_TIME_FORMATTER.formatMillis(key))); - List bucketValues = bucketToRates.get(key); - if (bucketValues.isEmpty()) { - assertNull(row.get(0)); - assertNull(row.get(1)); - } else { - double avg = bucketValues.stream().mapToDouble(d -> d).sum() / bucketValues.size(); - assertThat((double) row.get(0), closeTo(avg, 0.1)); - assertThat((double) row.get(1), closeTo(avg, 0.1)); - } - } - } - } - - @AwaitsFix(bugUrl = "removed?") - public void testRateWithTimeBucketAndCluster() { - var rounding = new Rounding.Builder(TimeValue.timeValueSeconds(60)).timeZone(ZoneOffset.UTC).build().prepareForUnknown(); - record RateKey(String host, String cluster, long interval) {} - Map> groups = new HashMap<>(); - for (Doc doc : docs) { - RateKey key = new RateKey(doc.host, doc.cluster, rounding.round(doc.timestamp)); - groups.computeIfAbsent(key, k -> new ArrayList<>()).add(new RequestCounter(doc.timestamp, doc.requestCount)); - } - record GroupKey(String cluster, long interval) {} - Map> rateBuckets = new HashMap<>(); - for (Map.Entry> e : groups.entrySet()) { - RateKey key = e.getKey(); - List values = rateBuckets.computeIfAbsent(new GroupKey(key.cluster, key.interval), k -> new ArrayList<>()); - Double rate = computeRate(e.getValue()); - if (rate != null) { - values.add(rate); - } - } - Map> cpuBuckets = new HashMap<>(); - for (Doc doc : docs) { - GroupKey key = new GroupKey(doc.cluster, rounding.round(doc.timestamp)); - cpuBuckets.computeIfAbsent(key, k -> new ArrayList<>()).add(doc.cpu); - } - List sortedKeys = rateBuckets.keySet() - .stream() - .sorted(Comparator.comparing(GroupKey::interval).thenComparing(GroupKey::cluster)) - .limit(5) - .toList(); - try (var resp = run(""" - TS hosts - | STATS sum(rate(request_count)) BY ts=bucket(@timestamp, 1 minute), cluster - | SORT ts, cluster - | LIMIT 5""")) { - assertThat( - resp.columns(), - equalTo( - List.of( - new ColumnInfoImpl("sum(rate(request_count))", "double", null), - new ColumnInfoImpl("ts", "date", null), - new ColumnInfoImpl("cluster", "keyword", null) - ) - ) - ); - List> values = EsqlTestUtils.getValuesList(resp); - assertThat(values, hasSize(sortedKeys.size())); - for (int i = 0; i < sortedKeys.size(); i++) { - List row = values.get(i); - assertThat(row, hasSize(3)); - var key = sortedKeys.get(i); - assertThat(row.get(1), equalTo(DEFAULT_DATE_TIME_FORMATTER.formatMillis(key.interval))); - assertThat(row.get(2), equalTo(key.cluster)); - List bucketValues = rateBuckets.get(key); - if (bucketValues.isEmpty()) { - assertNull(row.get(0)); - } else { - assertThat((double) row.get(0), closeTo(bucketValues.stream().mapToDouble(d -> d).sum(), 0.1)); - } - } - } - try (var resp = run(""" - TS hosts - | STATS avg(rate(request_count)) BY ts=bucket(@timestamp, 1minute), cluster - | SORT ts, cluster - | LIMIT 5""")) { - assertThat( - resp.columns(), - equalTo( - List.of( - new ColumnInfoImpl("avg(rate(request_count))", "double", null), - new ColumnInfoImpl("ts", "date", null), - new ColumnInfoImpl("cluster", "keyword", null) - ) - ) - ); - List> values = EsqlTestUtils.getValuesList(resp); - assertThat(values, hasSize(sortedKeys.size())); - for (int i = 0; i < sortedKeys.size(); i++) { - List row = values.get(i); - assertThat(row, hasSize(3)); - var key = sortedKeys.get(i); - assertThat(row.get(1), equalTo(DEFAULT_DATE_TIME_FORMATTER.formatMillis(key.interval))); - assertThat(row.get(2), equalTo(key.cluster)); - List bucketValues = rateBuckets.get(key); - if (bucketValues.isEmpty()) { - assertNull(row.get(0)); - } else { - double avg = bucketValues.stream().mapToDouble(d -> d).sum() / bucketValues.size(); - assertThat((double) row.get(0), closeTo(avg, 0.1)); - } - } - } - - try (var resp = run(""" - TS hosts - | STATS - s = sum(rate(request_count)), - c = count(rate(request_count)), - max(rate(request_count)), - avg(rate(request_count)) - BY ts=bucket(@timestamp, 1minute), cluster - | SORT ts, cluster - | LIMIT 5 - | EVAL avg_rate= s/c - | KEEP avg_rate, `max(rate(request_count))`, `avg(rate(request_count))`, ts, cluster - """)) { - assertThat( - resp.columns(), - equalTo( - List.of( - new ColumnInfoImpl("avg_rate", "double", null), - new ColumnInfoImpl("max(rate(request_count))", "double", null), - new ColumnInfoImpl("avg(rate(request_count))", "double", null), - new ColumnInfoImpl("ts", "date", null), - new ColumnInfoImpl("cluster", "keyword", null) - ) - ) - ); - List> values = EsqlTestUtils.getValuesList(resp); - assertThat(values, hasSize(sortedKeys.size())); - for (int i = 0; i < sortedKeys.size(); i++) { - List row = values.get(i); - assertThat(row, hasSize(5)); - var key = sortedKeys.get(i); - assertThat(row.get(3), equalTo(DEFAULT_DATE_TIME_FORMATTER.formatMillis(key.interval))); - assertThat(row.get(4), equalTo(key.cluster)); - List bucketValues = rateBuckets.get(key); - if (bucketValues.isEmpty()) { - assertNull(row.get(0)); - assertNull(row.get(1)); - } else { - double avg = bucketValues.stream().mapToDouble(d -> d).sum() / bucketValues.size(); - assertThat((double) row.get(0), closeTo(avg, 0.1)); - double max = bucketValues.stream().mapToDouble(d -> d).max().orElse(0.0); - assertThat((double) row.get(1), closeTo(max, 0.1)); - } - assertEquals(row.get(0), row.get(2)); - } - } - try (var resp = run(""" - TS hosts - | STATS sum(rate(request_count)), max(cpu) BY ts=bucket(@timestamp, 1 minute), cluster - | SORT ts, cluster - | LIMIT 5""")) { - assertThat( - resp.columns(), - equalTo( - List.of( - new ColumnInfoImpl("sum(rate(request_count))", "double", null), - new ColumnInfoImpl("max(cpu)", "double", null), - new ColumnInfoImpl("ts", "date", null), - new ColumnInfoImpl("cluster", "keyword", null) - ) - ) - ); - List> values = EsqlTestUtils.getValuesList(resp); - assertThat(values, hasSize(sortedKeys.size())); - for (int i = 0; i < sortedKeys.size(); i++) { - List row = values.get(i); - assertThat(row, hasSize(4)); - var key = sortedKeys.get(i); - assertThat(row.get(2), equalTo(DEFAULT_DATE_TIME_FORMATTER.formatMillis(key.interval))); - assertThat(row.get(3), equalTo(key.cluster)); - List rateBucket = rateBuckets.get(key); - if (rateBucket.isEmpty()) { - assertNull(row.get(0)); - } else { - assertThat((double) row.get(0), closeTo(rateBucket.stream().mapToDouble(d -> d).sum(), 0.1)); - } - List cpuBucket = cpuBuckets.get(key); - if (cpuBuckets.isEmpty()) { - assertNull(row.get(1)); - } else { - assertThat((double) row.get(1), closeTo(cpuBucket.stream().mapToDouble(d -> d).max().orElse(0.0), 0.1)); - } - } - } - try (var resp = run(""" - TS hosts - | STATS sum(rate(request_count)), avg(cpu) BY ts=bucket(@timestamp, 1 minute), cluster - | SORT ts, cluster - | LIMIT 5""")) { - assertThat( - resp.columns(), - equalTo( - List.of( - new ColumnInfoImpl("sum(rate(request_count))", "double", null), - new ColumnInfoImpl("avg(cpu)", "double", null), - new ColumnInfoImpl("ts", "date", null), - new ColumnInfoImpl("cluster", "keyword", null) - ) - ) - ); - List> values = EsqlTestUtils.getValuesList(resp); - assertThat(values, hasSize(sortedKeys.size())); - for (int i = 0; i < sortedKeys.size(); i++) { - List row = values.get(i); - assertThat(row, hasSize(4)); - var key = sortedKeys.get(i); - assertThat(row.get(2), equalTo(DEFAULT_DATE_TIME_FORMATTER.formatMillis(key.interval))); - assertThat(row.get(3), equalTo(key.cluster)); - List rateBucket = rateBuckets.get(key); - if (rateBucket.isEmpty()) { - assertNull(row.get(0)); - } else { - assertThat((double) row.get(0), closeTo(rateBucket.stream().mapToDouble(d -> d).sum(), 0.1)); - } - List cpuBucket = cpuBuckets.get(key); - if (cpuBuckets.isEmpty()) { - assertNull(row.get(1)); - } else { - double avg = cpuBucket.stream().mapToDouble(d -> d).sum() / cpuBucket.size(); - assertThat((double) row.get(1), closeTo(avg, 0.1)); - } - } - } - } - public void testApplyRateBeforeFinalGrouping() { record RateKey(String cluster, String host) { diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesRateIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesRateIT.java index a1ec5c687699b..5fa26d15d28f4 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesRateIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesRateIT.java @@ -39,6 +39,9 @@ record Doc(String host, String cluster, long timestamp, int requestCount, double final Map hostToRate = new HashMap<>(); final Map hostToCpu = new HashMap<>(); + static final float DEVIATION_LIMIT = 0.2f; + static final int LIMIT = 5; + @Before public void populateIndex() { // this can be expensive, do one @@ -72,16 +75,17 @@ public void populateIndex() { docs.clear(); for (int i = 0; i < numDocs; i++) { - List hosts = randomSubsetOf(between(hostToClusters.size() - 1, hostToClusters.size()), hostToClusters.keySet()); final var tsChange = between(1, 10); - timestamp += tsChange * 1000L; - for (String host : hosts) { + timestamp += tsChange * 1000L; + // List hosts = randomSubsetOf(between(1, hostToClusters.size()), hostToClusters.keySet()); + // for (String host : hosts) { + for (String host : hostToClusters.keySet()) { var requestCount = requestCounts.compute(host, (k, curr) -> { - // 20% chance of reset - if (randomInt(100) <= 20) { - return hostToRate.get(k) * tsChange; + // 10% chance of reset + if (randomInt(100) <= 10) { // todo change 0 to 10 + return Math.toIntExact(Math.round(hostToRate.get(k) * tsChange)); } else { - return curr == null ? 0 : curr + hostToRate.get(k) * tsChange; + return Math.toIntExact(Math.round((curr == null ? 0 : curr) + hostToRate.get(k) * tsChange)); } }); docs.add(new Doc(host, hostToClusters.get(host), timestamp, requestCount, hostToCpu.get(host))); @@ -105,16 +109,26 @@ public void populateIndex() { .get(); } client().admin().indices().prepareRefresh("hosts").get(); + } private String hostTable() { StringBuilder sb = new StringBuilder(); for (String host : hostToClusters.keySet()) { - sb.append(host).append(" -> ").append(hostToClusters.get(host)).append(", rate=").append(hostToRate.get(host)).append(", cpu=").append(hostToCpu.get(host)).append("\n"); + sb.append(host) + .append(" -> ") + .append(hostToClusters.get(host)) + .append(", rate=") + .append(hostToRate.get(host)) + .append(", cpu=") + .append(hostToCpu.get(host)) + .append("\n"); } // Now we add total rate and total CPU used: sb.append("Total rate: ").append(hostToRate.values().stream().mapToInt(a -> a).sum()).append("\n"); + sb.append("Average rate: ").append(hostToRate.values().stream().mapToInt(a -> a).average().orElseThrow()).append("\n"); sb.append("Total CPU: ").append(hostToCpu.values().stream().mapToInt(a -> a).sum()).append("\n"); + sb.append("Average CPU: ").append(hostToCpu.values().stream().mapToInt(a -> a).average().orElseThrow()).append("\n"); return sb.toString(); } @@ -126,26 +140,28 @@ private String valuesTable(List> values) { return sb.toString(); } - public void testRateWithTimeBucket() { - var limit = 5; - try (var resp = run("TS hosts | STATS sum(rate(request_count)) BY ts=bucket(@timestamp, 1 minute) | SORT ts | LIMIT " + limit)) { + public void testRateWithTimeBucketSumByMin() { + try (var resp = run("TS hosts | STATS sum(rate(request_count)) BY ts=bucket(@timestamp, 1 minute) | SORT ts | LIMIT " + LIMIT)) { + List> values = EsqlTestUtils.getValuesList(resp); try { assertThat( resp.columns(), equalTo(List.of(new ColumnInfoImpl("sum(rate(request_count))", "double", null), new ColumnInfoImpl("ts", "date", null))) ); - List> values = EsqlTestUtils.getValuesList(resp); - assertThat(values, hasSize(limit)); - for (int i = 0; i < limit; i++) { + assertThat(values, hasSize(LIMIT)); + for (int i = 0; i < LIMIT; i++) { List row = values.get(i); assertThat(row, hasSize(2)); - Double bucketValues = hostToRate.values().stream().mapToDouble(a -> a + 0.0).sum(); - assertThat((double) row.get(0), equalTo(bucketValues)); + var totalRate = hostToRate.values().stream().mapToDouble(a -> a + 0.0).sum(); + assertThat((double) row.get(0), closeTo(totalRate, DEVIATION_LIMIT * totalRate)); } } catch (AssertionError e) { - throw new AssertionError("Values:\n" + valuesTable(EsqlTestUtils.getValuesList(resp)) + "\n Hosts:\n" + hostTable(), e); + throw new AssertionError("Values:\n" + valuesTable(values) + "\n Hosts:\n" + hostTable(), e); } } + } + + public void testRateWithTimeBucketAvgByMin() { try (var resp = run("TS hosts | STATS avg(rate(request_count)) BY ts=bucket(@timestamp, 1minute) | SORT ts | LIMIT 5")) { assertThat( resp.columns(), @@ -156,32 +172,240 @@ public void testRateWithTimeBucket() { for (int i = 0; i < 5; i++) { List row = values.get(i); assertThat(row, hasSize(2)); - Double bucketValues = hostToRate.values().stream().mapToDouble(a -> a + 0.0).sum() / hostToRate.size(); - assertThat((double) row.get(0), equalTo(bucketValues)); + var expectedRate = hostToRate.values().stream().mapToDouble(a -> a + 0.0).sum() / hostToRate.size(); + assertThat((double) row.get(0), closeTo(expectedRate, DEVIATION_LIMIT * expectedRate)); } } + } + + public void testRateWithTimeBucketSumByMinAndLimitAsParam() { try (var resp = run(""" TS hosts | STATS avg(rate(request_count)) BY ts=bucket(@timestamp, 1minute) | SORT ts - | LIMIT """ + limit)) { + | LIMIT""" + " " + LIMIT)) { + try { + assertThat( + resp.columns(), + equalTo(List.of(new ColumnInfoImpl("avg(rate(request_count))", "double", null), new ColumnInfoImpl("ts", "date", null))) + ); + List> values = EsqlTestUtils.getValuesList(resp); + assertThat(values, hasSize(LIMIT)); + for (int i = 0; i < LIMIT; i++) { + List row = values.get(i); + assertThat(row, hasSize(2)); + double expectedAvg = hostToRate.values().stream().mapToDouble(d -> d).sum() / hostToRate.size(); + assertThat((double) row.get(0), closeTo(expectedAvg, DEVIATION_LIMIT * expectedAvg)); + } + } catch (AssertionError e) { + throw new AssertionError("Values:\n" + valuesTable(EsqlTestUtils.getValuesList(resp)) + "\n Hosts:\n" + hostTable(), e); + } + } + } + + public void testRateWithTimeBucketAndClusterSumByMin() { + try (var resp = run(""" + TS hosts + | STATS sum(rate(request_count)) BY ts=bucket(@timestamp, 1 minute), cluster + | SORT ts, cluster + | LIMIT 5""")) { + try { + assertThat( + resp.columns(), + equalTo( + List.of( + new ColumnInfoImpl("sum(rate(request_count))", "double", null), + new ColumnInfoImpl("ts", "date", null), + new ColumnInfoImpl("cluster", "keyword", null) + ) + ) + ); + List> values = EsqlTestUtils.getValuesList(resp); + // we have 2 clusters, so we expect 2 * limit rows + for (List row : values) { + assertThat(row, hasSize(3)); + String cluster = (String) row.get(2); + var expectedRate = hostToClusters.entrySet() + .stream() + .filter(e -> e.getValue().equals(cluster)) + .mapToDouble(e -> hostToRate.get(e.getKey()) + 0.0) + .sum(); + assertThat((double) row.get(0), closeTo(expectedRate, DEVIATION_LIMIT * expectedRate)); + } + } catch (AssertionError e) { + throw new AssertionError("Values:\n" + valuesTable(EsqlTestUtils.getValuesList(resp)) + "\n Hosts:\n" + hostTable(), e); + } + } + } + + public void testRateWithTimeBucketAndClusterAvgByMin() { + try (var resp = run(""" + TS hosts + | STATS avg(rate(request_count)) BY ts=bucket(@timestamp, 1minute), cluster + | SORT ts, cluster + | LIMIT 5""")) { try { assertThat( resp.columns(), equalTo( List.of( new ColumnInfoImpl("avg(rate(request_count))", "double", null), - new ColumnInfoImpl("ts", "date", null) + new ColumnInfoImpl("ts", "date", null), + new ColumnInfoImpl("cluster", "keyword", null) ) ) ); List> values = EsqlTestUtils.getValuesList(resp); - assertThat(values, hasSize(limit)); - for (int i = 0; i < limit; i++) { - List row = values.get(i); + for (List row : values) { assertThat(row, hasSize(3)); - double avg = hostToRate.values().stream().mapToDouble(d -> d).sum() / hostToRate.size(); - assertThat((double) row.get(0), closeTo(avg, 0.1)); + String cluster = (String) row.get(2); + var expectedAvg = hostToClusters.entrySet() + .stream() + .filter(e -> e.getValue().equals(cluster)) + .mapToDouble(e -> hostToRate.get(e.getKey()) + 0.0) + .average() + .orElseThrow(); + assertThat((double) row.get(0), closeTo(expectedAvg, DEVIATION_LIMIT * expectedAvg)); + } + } catch (AssertionError e) { + throw new AssertionError("Values:\n" + valuesTable(EsqlTestUtils.getValuesList(resp)) + "\n Hosts:\n" + hostTable(), e); + } + } + } + + public void testRateWithTimeBucketAndClusterMultipleStatsByMin() { + try (var resp = run(""" + TS hosts + | STATS + s = sum(rate(request_count)), + c = count(rate(request_count)), + max(rate(request_count)), + avg(rate(request_count)) + BY ts=bucket(@timestamp, 1minute), cluster + | SORT ts, cluster + | LIMIT 5 + | EVAL avg_rate= s/c + | KEEP avg_rate, `max(rate(request_count))`, `avg(rate(request_count))`, ts, cluster + """)) { + try { + assertThat( + resp.columns(), + equalTo( + List.of( + new ColumnInfoImpl("avg_rate", "double", null), + new ColumnInfoImpl("max(rate(request_count))", "double", null), + new ColumnInfoImpl("avg(rate(request_count))", "double", null), + new ColumnInfoImpl("ts", "date", null), + new ColumnInfoImpl("cluster", "keyword", null) + ) + ) + ); + List> values = EsqlTestUtils.getValuesList(resp); + for (List row : values) { + assertThat(row, hasSize(5)); + String cluster = (String) row.get(4); + var expectedAvg = hostToClusters.entrySet() + .stream() + .filter(e -> e.getValue().equals(cluster)) + .mapToDouble(e -> hostToRate.get(e.getKey()) + 0.0) + .average() + .orElseThrow(); + var expectedMax = hostToClusters.entrySet() + .stream() + .filter(e -> e.getValue().equals(cluster)) + .mapToDouble(e -> hostToRate.get(e.getKey()) + 0.0) + .max() + .orElseThrow(); + assertThat((double) row.get(0), closeTo(expectedAvg, DEVIATION_LIMIT * expectedAvg)); + assertThat((double) row.get(2), closeTo(expectedAvg, DEVIATION_LIMIT * expectedAvg)); + assertThat( + (double) row.get(1), + closeTo(hostToRate.values().stream().mapToDouble(d -> d).max().orElseThrow(), DEVIATION_LIMIT * expectedMax) + ); + } + } catch (AssertionError e) { + throw new AssertionError("Values:\n" + valuesTable(EsqlTestUtils.getValuesList(resp)) + "\n Hosts:\n" + hostTable(), e); + } + } + } + + public void testRateWithTimeBucketAndClusterMultipleMetricsByMin() { + try (var resp = run(""" + TS hosts + | STATS sum(rate(request_count)), max(cpu) BY ts=bucket(@timestamp, 1 minute), cluster + | SORT ts, cluster + | LIMIT 5""")) { + try { + assertThat( + resp.columns(), + equalTo( + List.of( + new ColumnInfoImpl("sum(rate(request_count))", "double", null), + new ColumnInfoImpl("max(cpu)", "double", null), + new ColumnInfoImpl("ts", "date", null), + new ColumnInfoImpl("cluster", "keyword", null) + ) + ) + ); + List> values = EsqlTestUtils.getValuesList(resp); + for (List row : values) { + assertThat(row, hasSize(4)); + String cluster = (String) row.get(3); + var expectedRate = hostToClusters.entrySet() + .stream() + .filter(e -> e.getValue().equals(cluster)) + .mapToDouble(e -> hostToRate.get(e.getKey()) + 0.0) + .sum(); + assertThat((double) row.get(0), closeTo(expectedRate, DEVIATION_LIMIT * expectedRate)); + var expectedCpu = hostToClusters.entrySet() + .stream() + .filter(e -> e.getValue().equals(cluster)) + .mapToDouble(e -> hostToCpu.get(e.getKey()) + 0.0) + .max() + .orElseThrow(); + assertThat((double) row.get(1), closeTo(expectedCpu, DEVIATION_LIMIT * expectedCpu)); + } + } catch (AssertionError e) { + throw new AssertionError("Values:\n" + valuesTable(EsqlTestUtils.getValuesList(resp)) + "\n Hosts:\n" + hostTable(), e); + } + } + } + + public void testRateWithTimeBucketAndClusterMultipleMetricsAvgByMin() { + try (var resp = run(""" + TS hosts + | STATS sum(rate(request_count)), avg(cpu) BY ts=bucket(@timestamp, 1 minute), cluster + | SORT ts, cluster + | LIMIT 5""")) { + try { + assertThat( + resp.columns(), + equalTo( + List.of( + new ColumnInfoImpl("sum(rate(request_count))", "double", null), + new ColumnInfoImpl("avg(cpu)", "double", null), + new ColumnInfoImpl("ts", "date", null), + new ColumnInfoImpl("cluster", "keyword", null) + ) + ) + ); + List> values = EsqlTestUtils.getValuesList(resp); + for (List row : values) { + assertThat(row, hasSize(4)); + String cluster = (String) row.get(3); + var expectedRate = hostToClusters.entrySet() + .stream() + .filter(e -> e.getValue().equals(cluster)) + .mapToDouble(e -> hostToRate.get(e.getKey()) + 0.0) + .sum(); + assertThat((double) row.get(0), closeTo(expectedRate, DEVIATION_LIMIT * expectedRate)); + var expectedCpu = hostToClusters.entrySet() + .stream() + .filter(e -> e.getValue().equals(cluster)) + .mapToDouble(e -> hostToCpu.get(e.getKey()) + 0.0) + .average() + .orElseThrow(); + assertThat((double) row.get(1), closeTo(expectedCpu, DEVIATION_LIMIT * expectedCpu)); } } catch (AssertionError e) { throw new AssertionError("Values:\n" + valuesTable(EsqlTestUtils.getValuesList(resp)) + "\n Hosts:\n" + hostTable(), e); From 091490ba193899aae9abd1704dbae49ee7eba916 Mon Sep 17 00:00:00 2001 From: Pablo Date: Fri, 27 Jun 2025 18:29:37 -0700 Subject: [PATCH 03/11] Tuning and improving auto-test for rate-based aggregations --- .../xpack/esql/action/TimeSeriesRateIT.java | 93 ++++++++++++------- 1 file changed, 59 insertions(+), 34 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesRateIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesRateIT.java index 5fa26d15d28f4..6556c449a0b9d 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesRateIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesRateIT.java @@ -17,6 +17,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import static org.elasticsearch.index.mapper.DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER; import static org.hamcrest.Matchers.closeTo; @@ -39,7 +40,11 @@ record Doc(String host, String cluster, long timestamp, int requestCount, double final Map hostToRate = new HashMap<>(); final Map hostToCpu = new HashMap<>(); - static final float DEVIATION_LIMIT = 0.2f; + // We allow a deviation of 15% from the expected rate (which includex an expected drop of 10%). + static final float DEVIATION_LIMIT = 0.15f; + // We expect a 10% drop in the rate due to not covering window edges and not triggering + // extrapolation logic in the time series engine. + static final float EXPECTED_DROP_RATE = 0.10f; static final int LIMIT = 5; @Before @@ -71,24 +76,28 @@ public void populateIndex() { hostToCpu.put("p" + i, randomIntBetween(0, 100)); } long timestamp = DEFAULT_DATE_TIME_FORMATTER.parseMillis("2024-04-15T00:00:00Z"); - int numDocs = between(60, 100); + int numDocs = between(100, 300); docs.clear(); + // We want docs to span a 6-minute period, so we need to adapt their spacing accordingly. + var tsPerDoc = 360.0 / numDocs; // 6 minutes divided by number of docs for (int i = 0; i < numDocs; i++) { - final var tsChange = between(1, 10); - timestamp += tsChange * 1000L; - // List hosts = randomSubsetOf(between(1, hostToClusters.size()), hostToClusters.keySet()); - // for (String host : hosts) { + final var tsChange = randomDoubleBetween(tsPerDoc - 1.0, tsPerDoc + 1.0, true); + timestamp += Math.round(tsChange * 1000); + // We want a subset of hosts to have docs within a give time point. + var hosts = Set.copyOf(randomSubsetOf(between(2, hostToClusters.size()), hostToClusters.keySet())); for (String host : hostToClusters.keySet()) { var requestCount = requestCounts.compute(host, (k, curr) -> { - // 10% chance of reset - if (randomInt(100) <= 10) { // todo change 0 to 10 + // 15% chance of reset + if (randomInt(100) <= 15) { return Math.toIntExact(Math.round(hostToRate.get(k) * tsChange)); } else { return Math.toIntExact(Math.round((curr == null ? 0 : curr) + hostToRate.get(k) * tsChange)); } }); - docs.add(new Doc(host, hostToClusters.get(host), timestamp, requestCount, hostToCpu.get(host))); + if (hosts.contains(host)) { + docs.add(new Doc(host, hostToClusters.get(host), timestamp, requestCount, hostToCpu.get(host))); + } } } Randomness.shuffle(docs); @@ -115,6 +124,7 @@ public void populateIndex() { private String hostTable() { StringBuilder sb = new StringBuilder(); for (String host : hostToClusters.keySet()) { + var docsForHost = docs.stream().filter(d -> d.host().equals(host)).toList(); sb.append(host) .append(" -> ") .append(hostToClusters.get(host)) @@ -122,6 +132,8 @@ private String hostTable() { .append(hostToRate.get(host)) .append(", cpu=") .append(hostToCpu.get(host)) + .append(", numDocs=") + .append(docsForHost.size()) .append("\n"); } // Now we add total rate and total CPU used: @@ -129,6 +141,18 @@ private String hostTable() { sb.append("Average rate: ").append(hostToRate.values().stream().mapToInt(a -> a).average().orElseThrow()).append("\n"); sb.append("Total CPU: ").append(hostToCpu.values().stream().mapToInt(a -> a).sum()).append("\n"); sb.append("Average CPU: ").append(hostToCpu.values().stream().mapToInt(a -> a).average().orElseThrow()).append("\n"); + // Add global info + sb.append("Count of docs: ").append(docs.size()).append("\n"); + // Add docs per minute + sb.append("Docs in each minute:\n"); + Map docsPerMinute = new HashMap<>(); + for (Doc doc : docs) { + long minute = (doc.timestamp / 60000) % 1000; // convert to minutes + docsPerMinute.merge(minute, 1, Integer::sum); + } + for (Map.Entry entry : docsPerMinute.entrySet()) { + sb.append("Minute ").append(entry.getKey()).append(": ").append(entry.getValue()).append(" docs\n"); + } return sb.toString(); } @@ -149,11 +173,11 @@ public void testRateWithTimeBucketSumByMin() { equalTo(List.of(new ColumnInfoImpl("sum(rate(request_count))", "double", null), new ColumnInfoImpl("ts", "date", null))) ); assertThat(values, hasSize(LIMIT)); - for (int i = 0; i < LIMIT; i++) { + for (int i = 0; i < values.size(); i++) { List row = values.get(i); assertThat(row, hasSize(2)); var totalRate = hostToRate.values().stream().mapToDouble(a -> a + 0.0).sum(); - assertThat((double) row.get(0), closeTo(totalRate, DEVIATION_LIMIT * totalRate)); + assertThat((double) row.get(0), closeTo(totalRate * (1 - EXPECTED_DROP_RATE), DEVIATION_LIMIT * totalRate)); } } catch (AssertionError e) { throw new AssertionError("Values:\n" + valuesTable(values) + "\n Hosts:\n" + hostTable(), e); @@ -163,17 +187,21 @@ public void testRateWithTimeBucketSumByMin() { public void testRateWithTimeBucketAvgByMin() { try (var resp = run("TS hosts | STATS avg(rate(request_count)) BY ts=bucket(@timestamp, 1minute) | SORT ts | LIMIT 5")) { - assertThat( - resp.columns(), - equalTo(List.of(new ColumnInfoImpl("avg(rate(request_count))", "double", null), new ColumnInfoImpl("ts", "date", null))) - ); - List> values = EsqlTestUtils.getValuesList(resp); - assertThat(values, hasSize(5)); - for (int i = 0; i < 5; i++) { - List row = values.get(i); - assertThat(row, hasSize(2)); - var expectedRate = hostToRate.values().stream().mapToDouble(a -> a + 0.0).sum() / hostToRate.size(); - assertThat((double) row.get(0), closeTo(expectedRate, DEVIATION_LIMIT * expectedRate)); + try { + assertThat( + resp.columns(), + equalTo(List.of(new ColumnInfoImpl("avg(rate(request_count))", "double", null), new ColumnInfoImpl("ts", "date", null))) + ); + List> values = EsqlTestUtils.getValuesList(resp); + assertThat(values, hasSize(LIMIT)); + for (int i = 0; i < values.size(); i++) { + List row = values.get(i); + assertThat(row, hasSize(2)); + var expectedRate = hostToRate.values().stream().mapToDouble(a -> a + 0.0).sum() / hostToRate.size(); + assertThat((double) row.get(0), closeTo(expectedRate * (1 - EXPECTED_DROP_RATE), DEVIATION_LIMIT * expectedRate)); + } + } catch (AssertionError e) { + throw new AssertionError("Values:\n" + valuesTable(EsqlTestUtils.getValuesList(resp)) + "\n Hosts:\n" + hostTable(), e); } } } @@ -195,7 +223,7 @@ public void testRateWithTimeBucketSumByMinAndLimitAsParam() { List row = values.get(i); assertThat(row, hasSize(2)); double expectedAvg = hostToRate.values().stream().mapToDouble(d -> d).sum() / hostToRate.size(); - assertThat((double) row.get(0), closeTo(expectedAvg, DEVIATION_LIMIT * expectedAvg)); + assertThat((double) row.get(0), closeTo(expectedAvg * (1 - EXPECTED_DROP_RATE), DEVIATION_LIMIT * expectedAvg)); } } catch (AssertionError e) { throw new AssertionError("Values:\n" + valuesTable(EsqlTestUtils.getValuesList(resp)) + "\n Hosts:\n" + hostTable(), e); @@ -230,7 +258,7 @@ public void testRateWithTimeBucketAndClusterSumByMin() { .filter(e -> e.getValue().equals(cluster)) .mapToDouble(e -> hostToRate.get(e.getKey()) + 0.0) .sum(); - assertThat((double) row.get(0), closeTo(expectedRate, DEVIATION_LIMIT * expectedRate)); + assertThat((double) row.get(0), closeTo(expectedRate * (1 - EXPECTED_DROP_RATE), DEVIATION_LIMIT * expectedRate)); } } catch (AssertionError e) { throw new AssertionError("Values:\n" + valuesTable(EsqlTestUtils.getValuesList(resp)) + "\n Hosts:\n" + hostTable(), e); @@ -265,7 +293,7 @@ public void testRateWithTimeBucketAndClusterAvgByMin() { .mapToDouble(e -> hostToRate.get(e.getKey()) + 0.0) .average() .orElseThrow(); - assertThat((double) row.get(0), closeTo(expectedAvg, DEVIATION_LIMIT * expectedAvg)); + assertThat((double) row.get(0), closeTo(expectedAvg * (1 - EXPECTED_DROP_RATE), DEVIATION_LIMIT * expectedAvg)); } } catch (AssertionError e) { throw new AssertionError("Values:\n" + valuesTable(EsqlTestUtils.getValuesList(resp)) + "\n Hosts:\n" + hostTable(), e); @@ -316,12 +344,9 @@ public void testRateWithTimeBucketAndClusterMultipleStatsByMin() { .mapToDouble(e -> hostToRate.get(e.getKey()) + 0.0) .max() .orElseThrow(); - assertThat((double) row.get(0), closeTo(expectedAvg, DEVIATION_LIMIT * expectedAvg)); - assertThat((double) row.get(2), closeTo(expectedAvg, DEVIATION_LIMIT * expectedAvg)); - assertThat( - (double) row.get(1), - closeTo(hostToRate.values().stream().mapToDouble(d -> d).max().orElseThrow(), DEVIATION_LIMIT * expectedMax) - ); + assertThat((double) row.get(0), closeTo(expectedAvg * (1 - EXPECTED_DROP_RATE), DEVIATION_LIMIT * expectedAvg)); + assertThat((double) row.get(2), closeTo(expectedAvg * (1 - EXPECTED_DROP_RATE), DEVIATION_LIMIT * expectedAvg)); + assertThat((double) row.get(1), closeTo(expectedMax * (1 - EXPECTED_DROP_RATE), DEVIATION_LIMIT * expectedMax)); } } catch (AssertionError e) { throw new AssertionError("Values:\n" + valuesTable(EsqlTestUtils.getValuesList(resp)) + "\n Hosts:\n" + hostTable(), e); @@ -356,14 +381,14 @@ public void testRateWithTimeBucketAndClusterMultipleMetricsByMin() { .filter(e -> e.getValue().equals(cluster)) .mapToDouble(e -> hostToRate.get(e.getKey()) + 0.0) .sum(); - assertThat((double) row.get(0), closeTo(expectedRate, DEVIATION_LIMIT * expectedRate)); + assertThat((double) row.get(0), closeTo(expectedRate * (1 - EXPECTED_DROP_RATE), DEVIATION_LIMIT * expectedRate)); var expectedCpu = hostToClusters.entrySet() .stream() .filter(e -> e.getValue().equals(cluster)) .mapToDouble(e -> hostToCpu.get(e.getKey()) + 0.0) .max() .orElseThrow(); - assertThat((double) row.get(1), closeTo(expectedCpu, DEVIATION_LIMIT * expectedCpu)); + assertThat((double) row.get(1), closeTo(expectedCpu * (1 - EXPECTED_DROP_RATE), DEVIATION_LIMIT * expectedCpu)); } } catch (AssertionError e) { throw new AssertionError("Values:\n" + valuesTable(EsqlTestUtils.getValuesList(resp)) + "\n Hosts:\n" + hostTable(), e); @@ -398,7 +423,7 @@ public void testRateWithTimeBucketAndClusterMultipleMetricsAvgByMin() { .filter(e -> e.getValue().equals(cluster)) .mapToDouble(e -> hostToRate.get(e.getKey()) + 0.0) .sum(); - assertThat((double) row.get(0), closeTo(expectedRate, DEVIATION_LIMIT * expectedRate)); + assertThat((double) row.get(0), closeTo(expectedRate * (1 - EXPECTED_DROP_RATE), DEVIATION_LIMIT * expectedRate)); var expectedCpu = hostToClusters.entrySet() .stream() .filter(e -> e.getValue().equals(cluster)) From 697ef5997c73d7397641c6f4e6f405f7f2bce30c Mon Sep 17 00:00:00 2001 From: Pablo Date: Mon, 30 Jun 2025 11:38:27 -0700 Subject: [PATCH 04/11] fixup --- .../org/elasticsearch/xpack/esql/action/TimeSeriesRateIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesRateIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesRateIT.java index 6556c449a0b9d..3026a15248d7e 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesRateIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesRateIT.java @@ -27,7 +27,7 @@ public class TimeSeriesRateIT extends AbstractEsqlIntegTestCase { @Override - protected EsqlQueryResponse run(EsqlQueryRequest request) { + public EsqlQueryResponse run(EsqlQueryRequest request) { assumeTrue("time series available in snapshot builds only", Build.current().isSnapshot()); return super.run(request); } From c457df1380c36ad0474d5b4866238e3ad7dd49f6 Mon Sep 17 00:00:00 2001 From: Pablo Date: Mon, 7 Jul 2025 12:56:44 -0700 Subject: [PATCH 05/11] addressing comments --- .../xpack/esql/action/TimeSeriesRateIT.java | 67 +++++++++++-------- 1 file changed, 39 insertions(+), 28 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesRateIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesRateIT.java index 3026a15248d7e..9781773da2bd6 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesRateIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesRateIT.java @@ -44,8 +44,10 @@ record Doc(String host, String cluster, long timestamp, int requestCount, double static final float DEVIATION_LIMIT = 0.15f; // We expect a 10% drop in the rate due to not covering window edges and not triggering // extrapolation logic in the time series engine. - static final float EXPECTED_DROP_RATE = 0.10f; + static final float EXPECTED_DROP_RATE = 0.12f; static final int LIMIT = 5; + static final int MAX_HOSTS = 5; + static final int PCT_CHANCE_OF_RESET = 15; // 15% chance of resetting the request count @Before public void populateIndex() { @@ -69,7 +71,7 @@ public void populateIndex() { ) .get(); final Map requestCounts = new HashMap<>(); - for (int i = 0; i < 5; i++) { + for (int i = 0; i < MAX_HOSTS; i++) { hostToClusters.put("p" + i, randomFrom("qa", "prod")); hostToRate.put("p" + i, randomIntBetween(0, 50)); requestCounts.put("p" + i, randomIntBetween(0, 100)); @@ -79,17 +81,16 @@ public void populateIndex() { int numDocs = between(100, 300); docs.clear(); // We want docs to span a 6-minute period, so we need to adapt their spacing accordingly. - var tsPerDoc = 360.0 / numDocs; // 6 minutes divided by number of docs + var avgSamplingPeriod = 360.0 / numDocs; // 6 minutes divided by number of docs - then randomized below for (int i = 0; i < numDocs; i++) { - final var tsChange = randomDoubleBetween(tsPerDoc - 1.0, tsPerDoc + 1.0, true); + final var tsChange = randomDoubleBetween(avgSamplingPeriod - 1.0, avgSamplingPeriod + 1.0, true); timestamp += Math.round(tsChange * 1000); // We want a subset of hosts to have docs within a give time point. var hosts = Set.copyOf(randomSubsetOf(between(2, hostToClusters.size()), hostToClusters.keySet())); for (String host : hostToClusters.keySet()) { var requestCount = requestCounts.compute(host, (k, curr) -> { - // 15% chance of reset - if (randomInt(100) <= 15) { + if (randomInt(100) <= PCT_CHANCE_OF_RESET) { return Math.toIntExact(Math.round(hostToRate.get(k) * tsChange)); } else { return Math.toIntExact(Math.round((curr == null ? 0 : curr) + hostToRate.get(k) * tsChange)); @@ -165,12 +166,18 @@ private String valuesTable(List> values) { } public void testRateWithTimeBucketSumByMin() { - try (var resp = run("TS hosts | STATS sum(rate(request_count)) BY ts=bucket(@timestamp, 1 minute) | SORT ts | LIMIT " + LIMIT)) { + try ( + var resp = run( + "TS hosts | STATS sum(rate(request_count)) BY tbucket=bucket(@timestamp, 1 minute) | SORT tbucket | LIMIT " + LIMIT + ) + ) { List> values = EsqlTestUtils.getValuesList(resp); try { assertThat( resp.columns(), - equalTo(List.of(new ColumnInfoImpl("sum(rate(request_count))", "double", null), new ColumnInfoImpl("ts", "date", null))) + equalTo( + List.of(new ColumnInfoImpl("sum(rate(request_count))", "double", null), new ColumnInfoImpl("tbucket", "date", null)) + ) ); assertThat(values, hasSize(LIMIT)); for (int i = 0; i < values.size(); i++) { @@ -186,11 +193,13 @@ public void testRateWithTimeBucketSumByMin() { } public void testRateWithTimeBucketAvgByMin() { - try (var resp = run("TS hosts | STATS avg(rate(request_count)) BY ts=bucket(@timestamp, 1minute) | SORT ts | LIMIT 5")) { + try (var resp = run("TS hosts | STATS avg(rate(request_count)) BY tbucket=bucket(@timestamp, 1minute) | SORT tbucket | LIMIT 5")) { try { assertThat( resp.columns(), - equalTo(List.of(new ColumnInfoImpl("avg(rate(request_count))", "double", null), new ColumnInfoImpl("ts", "date", null))) + equalTo( + List.of(new ColumnInfoImpl("avg(rate(request_count))", "double", null), new ColumnInfoImpl("tbucket", "date", null)) + ) ); List> values = EsqlTestUtils.getValuesList(resp); assertThat(values, hasSize(LIMIT)); @@ -209,13 +218,15 @@ public void testRateWithTimeBucketAvgByMin() { public void testRateWithTimeBucketSumByMinAndLimitAsParam() { try (var resp = run(""" TS hosts - | STATS avg(rate(request_count)) BY ts=bucket(@timestamp, 1minute) - | SORT ts + | STATS avg(rate(request_count)) BY tbucket=bucket(@timestamp, 1minute) + | SORT tbucket | LIMIT""" + " " + LIMIT)) { try { assertThat( resp.columns(), - equalTo(List.of(new ColumnInfoImpl("avg(rate(request_count))", "double", null), new ColumnInfoImpl("ts", "date", null))) + equalTo( + List.of(new ColumnInfoImpl("avg(rate(request_count))", "double", null), new ColumnInfoImpl("tbucket", "date", null)) + ) ); List> values = EsqlTestUtils.getValuesList(resp); assertThat(values, hasSize(LIMIT)); @@ -234,8 +245,8 @@ public void testRateWithTimeBucketSumByMinAndLimitAsParam() { public void testRateWithTimeBucketAndClusterSumByMin() { try (var resp = run(""" TS hosts - | STATS sum(rate(request_count)) BY ts=bucket(@timestamp, 1 minute), cluster - | SORT ts, cluster + | STATS sum(rate(request_count)) BY tbucket=bucket(@timestamp, 1 minute), cluster + | SORT tbucket, cluster | LIMIT 5""")) { try { assertThat( @@ -243,7 +254,7 @@ public void testRateWithTimeBucketAndClusterSumByMin() { equalTo( List.of( new ColumnInfoImpl("sum(rate(request_count))", "double", null), - new ColumnInfoImpl("ts", "date", null), + new ColumnInfoImpl("tbucket", "date", null), new ColumnInfoImpl("cluster", "keyword", null) ) ) @@ -269,8 +280,8 @@ public void testRateWithTimeBucketAndClusterSumByMin() { public void testRateWithTimeBucketAndClusterAvgByMin() { try (var resp = run(""" TS hosts - | STATS avg(rate(request_count)) BY ts=bucket(@timestamp, 1minute), cluster - | SORT ts, cluster + | STATS avg(rate(request_count)) BY tbucket=bucket(@timestamp, 1minute), cluster + | SORT tbucket, cluster | LIMIT 5""")) { try { assertThat( @@ -278,7 +289,7 @@ public void testRateWithTimeBucketAndClusterAvgByMin() { equalTo( List.of( new ColumnInfoImpl("avg(rate(request_count))", "double", null), - new ColumnInfoImpl("ts", "date", null), + new ColumnInfoImpl("tbucket", "date", null), new ColumnInfoImpl("cluster", "keyword", null) ) ) @@ -309,8 +320,8 @@ public void testRateWithTimeBucketAndClusterMultipleStatsByMin() { c = count(rate(request_count)), max(rate(request_count)), avg(rate(request_count)) - BY ts=bucket(@timestamp, 1minute), cluster - | SORT ts, cluster + BY tbucket=bucket(@timestamp, 1minute), cluster + | SORT tbucket, cluster | LIMIT 5 | EVAL avg_rate= s/c | KEEP avg_rate, `max(rate(request_count))`, `avg(rate(request_count))`, ts, cluster @@ -323,7 +334,7 @@ public void testRateWithTimeBucketAndClusterMultipleStatsByMin() { new ColumnInfoImpl("avg_rate", "double", null), new ColumnInfoImpl("max(rate(request_count))", "double", null), new ColumnInfoImpl("avg(rate(request_count))", "double", null), - new ColumnInfoImpl("ts", "date", null), + new ColumnInfoImpl("tbucket", "date", null), new ColumnInfoImpl("cluster", "keyword", null) ) ) @@ -357,8 +368,8 @@ public void testRateWithTimeBucketAndClusterMultipleStatsByMin() { public void testRateWithTimeBucketAndClusterMultipleMetricsByMin() { try (var resp = run(""" TS hosts - | STATS sum(rate(request_count)), max(cpu) BY ts=bucket(@timestamp, 1 minute), cluster - | SORT ts, cluster + | STATS sum(rate(request_count)), max(cpu) BY tbucket=bucket(@timestamp, 1 minute), cluster + | SORT tbucket, cluster | LIMIT 5""")) { try { assertThat( @@ -367,7 +378,7 @@ public void testRateWithTimeBucketAndClusterMultipleMetricsByMin() { List.of( new ColumnInfoImpl("sum(rate(request_count))", "double", null), new ColumnInfoImpl("max(cpu)", "double", null), - new ColumnInfoImpl("ts", "date", null), + new ColumnInfoImpl("tbucket", "date", null), new ColumnInfoImpl("cluster", "keyword", null) ) ) @@ -399,8 +410,8 @@ public void testRateWithTimeBucketAndClusterMultipleMetricsByMin() { public void testRateWithTimeBucketAndClusterMultipleMetricsAvgByMin() { try (var resp = run(""" TS hosts - | STATS sum(rate(request_count)), avg(cpu) BY ts=bucket(@timestamp, 1 minute), cluster - | SORT ts, cluster + | STATS sum(rate(request_count)), avg(cpu) BY tbucket=bucket(@timestamp, 1 minute), cluster + | SORT tbucket, cluster | LIMIT 5""")) { try { assertThat( @@ -409,7 +420,7 @@ public void testRateWithTimeBucketAndClusterMultipleMetricsAvgByMin() { List.of( new ColumnInfoImpl("sum(rate(request_count))", "double", null), new ColumnInfoImpl("avg(cpu)", "double", null), - new ColumnInfoImpl("ts", "date", null), + new ColumnInfoImpl("tbucket", "date", null), new ColumnInfoImpl("cluster", "keyword", null) ) ) From b4bd117cc2d089723172ca1c26078b586ab6c70e Mon Sep 17 00:00:00 2001 From: Pablo Date: Mon, 7 Jul 2025 14:29:24 -0700 Subject: [PATCH 06/11] fixup --- .../org/elasticsearch/xpack/esql/action/TimeSeriesRateIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesRateIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesRateIT.java index 9781773da2bd6..3429e5301753b 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesRateIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesRateIT.java @@ -324,7 +324,7 @@ public void testRateWithTimeBucketAndClusterMultipleStatsByMin() { | SORT tbucket, cluster | LIMIT 5 | EVAL avg_rate= s/c - | KEEP avg_rate, `max(rate(request_count))`, `avg(rate(request_count))`, ts, cluster + | KEEP avg_rate, `max(rate(request_count))`, `avg(rate(request_count))`, tbucket, cluster """)) { try { assertThat( From 88e6d223ae37c94098ad9d3525fd2d161bb0a669 Mon Sep 17 00:00:00 2001 From: Pablo Date: Tue, 8 Jul 2025 11:53:26 -0700 Subject: [PATCH 07/11] improve failure rate --- .../xpack/esql/action/TimeSeriesRateIT.java | 50 ++++++++++++++----- 1 file changed, 37 insertions(+), 13 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesRateIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesRateIT.java index 3429e5301753b..a7c6ffc81e1f0 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesRateIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesRateIT.java @@ -40,11 +40,11 @@ record Doc(String host, String cluster, long timestamp, int requestCount, double final Map hostToRate = new HashMap<>(); final Map hostToCpu = new HashMap<>(); - // We allow a deviation of 15% from the expected rate (which includex an expected drop of 10%). static final float DEVIATION_LIMIT = 0.15f; + static final float SUBGROUP_DEVIATION_LIMIT = 0.22f; // extra deviation tolerance for subgroups due to fewer samples // We expect a 10% drop in the rate due to not covering window edges and not triggering // extrapolation logic in the time series engine. - static final float EXPECTED_DROP_RATE = 0.12f; + static final float EXPECTED_DROP_RATE = 0.15f; static final int LIMIT = 5; static final int MAX_HOSTS = 5; static final int PCT_CHANCE_OF_RESET = 15; // 15% chance of resetting the request count @@ -73,8 +73,8 @@ public void populateIndex() { final Map requestCounts = new HashMap<>(); for (int i = 0; i < MAX_HOSTS; i++) { hostToClusters.put("p" + i, randomFrom("qa", "prod")); - hostToRate.put("p" + i, randomIntBetween(0, 50)); - requestCounts.put("p" + i, randomIntBetween(0, 100)); + hostToRate.put("p" + i, randomIntBetween(10, 50)); + requestCounts.put("p" + i, randomIntBetween(0, 1000)); hostToCpu.put("p" + i, randomIntBetween(0, 100)); } long timestamp = DEFAULT_DATE_TIME_FORMATTER.parseMillis("2024-04-15T00:00:00Z"); @@ -269,7 +269,10 @@ public void testRateWithTimeBucketAndClusterSumByMin() { .filter(e -> e.getValue().equals(cluster)) .mapToDouble(e -> hostToRate.get(e.getKey()) + 0.0) .sum(); - assertThat((double) row.get(0), closeTo(expectedRate * (1 - EXPECTED_DROP_RATE), DEVIATION_LIMIT * expectedRate)); + assertThat( + (double) row.get(0), + closeTo(expectedRate * (1 - EXPECTED_DROP_RATE), SUBGROUP_DEVIATION_LIMIT * expectedRate) + ); } } catch (AssertionError e) { throw new AssertionError("Values:\n" + valuesTable(EsqlTestUtils.getValuesList(resp)) + "\n Hosts:\n" + hostTable(), e); @@ -304,7 +307,10 @@ public void testRateWithTimeBucketAndClusterAvgByMin() { .mapToDouble(e -> hostToRate.get(e.getKey()) + 0.0) .average() .orElseThrow(); - assertThat((double) row.get(0), closeTo(expectedAvg * (1 - EXPECTED_DROP_RATE), DEVIATION_LIMIT * expectedAvg)); + assertThat( + (double) row.get(0), + closeTo(expectedAvg * (1 - EXPECTED_DROP_RATE), SUBGROUP_DEVIATION_LIMIT * expectedAvg) + ); } } catch (AssertionError e) { throw new AssertionError("Values:\n" + valuesTable(EsqlTestUtils.getValuesList(resp)) + "\n Hosts:\n" + hostTable(), e); @@ -355,9 +361,18 @@ public void testRateWithTimeBucketAndClusterMultipleStatsByMin() { .mapToDouble(e -> hostToRate.get(e.getKey()) + 0.0) .max() .orElseThrow(); - assertThat((double) row.get(0), closeTo(expectedAvg * (1 - EXPECTED_DROP_RATE), DEVIATION_LIMIT * expectedAvg)); - assertThat((double) row.get(2), closeTo(expectedAvg * (1 - EXPECTED_DROP_RATE), DEVIATION_LIMIT * expectedAvg)); - assertThat((double) row.get(1), closeTo(expectedMax * (1 - EXPECTED_DROP_RATE), DEVIATION_LIMIT * expectedMax)); + assertThat( + (double) row.get(0), + closeTo(expectedAvg * (1 - EXPECTED_DROP_RATE), SUBGROUP_DEVIATION_LIMIT * expectedAvg) + ); + assertThat( + (double) row.get(2), + closeTo(expectedAvg * (1 - EXPECTED_DROP_RATE), SUBGROUP_DEVIATION_LIMIT * expectedAvg) + ); + assertThat( + (double) row.get(1), + closeTo(expectedMax * (1 - EXPECTED_DROP_RATE), SUBGROUP_DEVIATION_LIMIT * expectedMax) + ); } } catch (AssertionError e) { throw new AssertionError("Values:\n" + valuesTable(EsqlTestUtils.getValuesList(resp)) + "\n Hosts:\n" + hostTable(), e); @@ -392,14 +407,20 @@ public void testRateWithTimeBucketAndClusterMultipleMetricsByMin() { .filter(e -> e.getValue().equals(cluster)) .mapToDouble(e -> hostToRate.get(e.getKey()) + 0.0) .sum(); - assertThat((double) row.get(0), closeTo(expectedRate * (1 - EXPECTED_DROP_RATE), DEVIATION_LIMIT * expectedRate)); + assertThat( + (double) row.get(0), + closeTo(expectedRate * (1 - EXPECTED_DROP_RATE), SUBGROUP_DEVIATION_LIMIT * expectedRate) + ); var expectedCpu = hostToClusters.entrySet() .stream() .filter(e -> e.getValue().equals(cluster)) .mapToDouble(e -> hostToCpu.get(e.getKey()) + 0.0) .max() .orElseThrow(); - assertThat((double) row.get(1), closeTo(expectedCpu * (1 - EXPECTED_DROP_RATE), DEVIATION_LIMIT * expectedCpu)); + assertThat( + (double) row.get(1), + closeTo(expectedCpu * (1 - EXPECTED_DROP_RATE), SUBGROUP_DEVIATION_LIMIT * expectedCpu) + ); } } catch (AssertionError e) { throw new AssertionError("Values:\n" + valuesTable(EsqlTestUtils.getValuesList(resp)) + "\n Hosts:\n" + hostTable(), e); @@ -434,14 +455,17 @@ public void testRateWithTimeBucketAndClusterMultipleMetricsAvgByMin() { .filter(e -> e.getValue().equals(cluster)) .mapToDouble(e -> hostToRate.get(e.getKey()) + 0.0) .sum(); - assertThat((double) row.get(0), closeTo(expectedRate * (1 - EXPECTED_DROP_RATE), DEVIATION_LIMIT * expectedRate)); + assertThat( + (double) row.get(0), + closeTo(expectedRate * (1 - EXPECTED_DROP_RATE), SUBGROUP_DEVIATION_LIMIT * expectedRate) + ); var expectedCpu = hostToClusters.entrySet() .stream() .filter(e -> e.getValue().equals(cluster)) .mapToDouble(e -> hostToCpu.get(e.getKey()) + 0.0) .average() .orElseThrow(); - assertThat((double) row.get(1), closeTo(expectedCpu, DEVIATION_LIMIT * expectedCpu)); + assertThat((double) row.get(1), closeTo(expectedCpu, SUBGROUP_DEVIATION_LIMIT * expectedCpu)); } } catch (AssertionError e) { throw new AssertionError("Values:\n" + valuesTable(EsqlTestUtils.getValuesList(resp)) + "\n Hosts:\n" + hostTable(), e); From 9c628181c63a868d806c989e6b26037e0c4d19ab Mon Sep 17 00:00:00 2001 From: Pablo Date: Wed, 9 Jul 2025 14:28:27 -0700 Subject: [PATCH 08/11] tuned deviation limits to reduce flakiness --- .../org/elasticsearch/xpack/esql/action/TimeSeriesRateIT.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesRateIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesRateIT.java index a7c6ffc81e1f0..44ed36248cd5c 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesRateIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesRateIT.java @@ -40,8 +40,8 @@ record Doc(String host, String cluster, long timestamp, int requestCount, double final Map hostToRate = new HashMap<>(); final Map hostToCpu = new HashMap<>(); - static final float DEVIATION_LIMIT = 0.15f; - static final float SUBGROUP_DEVIATION_LIMIT = 0.22f; // extra deviation tolerance for subgroups due to fewer samples + static final float DEVIATION_LIMIT = 0.2f; + static final float SUBGROUP_DEVIATION_LIMIT = 0.30f; // extra deviation tolerance for subgroups due to fewer samples // We expect a 10% drop in the rate due to not covering window edges and not triggering // extrapolation logic in the time series engine. static final float EXPECTED_DROP_RATE = 0.15f; From 472875db0481b542f438b47a33c52df5e4e437a7 Mon Sep 17 00:00:00 2001 From: Pablo Date: Thu, 10 Jul 2025 14:21:17 -0700 Subject: [PATCH 09/11] fixup - wider error margin --- .../elasticsearch/xpack/esql/action/TimeSeriesRateIT.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesRateIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesRateIT.java index 44ed36248cd5c..568fa718edfe2 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesRateIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesRateIT.java @@ -40,9 +40,11 @@ record Doc(String host, String cluster, long timestamp, int requestCount, double final Map hostToRate = new HashMap<>(); final Map hostToCpu = new HashMap<>(); - static final float DEVIATION_LIMIT = 0.2f; - static final float SUBGROUP_DEVIATION_LIMIT = 0.30f; // extra deviation tolerance for subgroups due to fewer samples - // We expect a 10% drop in the rate due to not covering window edges and not triggering + static final float DEVIATION_LIMIT = 0.25f; + // extra deviation tolerance for subgroups due to fewer samples + // at 0.35 deviation limit, we see 2/8000 failures. I am expanding to 0.37 + static final float SUBGROUP_DEVIATION_LIMIT = 0.37f; + // We expect a drop in the rate due to not covering window edges and not triggering // extrapolation logic in the time series engine. static final float EXPECTED_DROP_RATE = 0.15f; static final int LIMIT = 5; From 4e88241b0a8b7406d9637f7168eb2df284bb2ddd Mon Sep 17 00:00:00 2001 From: Pablo Date: Mon, 14 Jul 2025 16:26:18 -0700 Subject: [PATCH 10/11] wider margin --- .../org/elasticsearch/xpack/esql/action/TimeSeriesRateIT.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesRateIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesRateIT.java index 568fa718edfe2..9a0a21fc2198a 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesRateIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesRateIT.java @@ -42,8 +42,8 @@ record Doc(String host, String cluster, long timestamp, int requestCount, double static final float DEVIATION_LIMIT = 0.25f; // extra deviation tolerance for subgroups due to fewer samples - // at 0.35 deviation limit, we see 2/8000 failures. I am expanding to 0.37 - static final float SUBGROUP_DEVIATION_LIMIT = 0.37f; + // at 0.35 deviation limit, we see 2/8000 failures. I am expanding to 0.4 + static final float SUBGROUP_DEVIATION_LIMIT = 0.45f; // We expect a drop in the rate due to not covering window edges and not triggering // extrapolation logic in the time series engine. static final float EXPECTED_DROP_RATE = 0.15f; From c26e5db9c3c575664d58e93e7239c1f76431a465 Mon Sep 17 00:00:00 2001 From: Pablo Date: Mon, 14 Jul 2025 16:26:48 -0700 Subject: [PATCH 11/11] wider range for aggs --- .../org/elasticsearch/xpack/esql/action/TimeSeriesRateIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesRateIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesRateIT.java index 9a0a21fc2198a..92d8c55d66a72 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesRateIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesRateIT.java @@ -40,7 +40,7 @@ record Doc(String host, String cluster, long timestamp, int requestCount, double final Map hostToRate = new HashMap<>(); final Map hostToCpu = new HashMap<>(); - static final float DEVIATION_LIMIT = 0.25f; + static final float DEVIATION_LIMIT = 0.30f; // extra deviation tolerance for subgroups due to fewer samples // at 0.35 deviation limit, we see 2/8000 failures. I am expanding to 0.4 static final float SUBGROUP_DEVIATION_LIMIT = 0.45f;