Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,14 @@

import org.elasticsearch.Build;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.Rounding;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.compute.lucene.TimeSeriesSourceOperator;
import org.elasticsearch.compute.operator.DriverProfile;
import org.elasticsearch.compute.operator.TimeSeriesAggregationOperator;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xpack.esql.EsqlTestUtils;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.junit.Before;

import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
Expand Down Expand Up @@ -321,319 +318,6 @@ record RateKey(String cluster, String host) {
}
}

@AwaitsFix(bugUrl = "removed?")
public void testRateWithTimeBucket() {
var rounding = new Rounding.Builder(TimeValue.timeValueSeconds(60)).timeZone(ZoneOffset.UTC).build().prepareForUnknown();
record RateKey(String host, String cluster, long interval) {}
Map<RateKey, List<RequestCounter>> groups = new HashMap<>();
for (Doc doc : docs) {
RateKey key = new RateKey(doc.host, doc.cluster, rounding.round(doc.timestamp));
groups.computeIfAbsent(key, k -> new ArrayList<>()).add(new RequestCounter(doc.timestamp, doc.requestCount));
}
Map<Long, List<Double>> bucketToRates = new HashMap<>();
for (Map.Entry<RateKey, List<RequestCounter>> e : groups.entrySet()) {
List<Double> values = bucketToRates.computeIfAbsent(e.getKey().interval, k -> new ArrayList<>());
Double rate = computeRate(e.getValue());
if (rate != null) {
values.add(rate);
}
}
List<Long> sortedKeys = bucketToRates.keySet().stream().sorted().limit(5).toList();
try (var resp = run("TS hosts | STATS sum(rate(request_count)) BY ts=bucket(@timestamp, 1 minute) | SORT ts | LIMIT 5")) {
assertThat(
resp.columns(),
equalTo(List.of(new ColumnInfoImpl("sum(rate(request_count))", "double", null), new ColumnInfoImpl("ts", "date", null)))
);
List<List<Object>> values = EsqlTestUtils.getValuesList(resp);
assertThat(values, hasSize(sortedKeys.size()));
for (int i = 0; i < sortedKeys.size(); i++) {
List<Object> row = values.get(i);
assertThat(row, hasSize(2));
long key = sortedKeys.get(i);
assertThat(row.get(1), equalTo(DEFAULT_DATE_TIME_FORMATTER.formatMillis(key)));
List<Double> bucketValues = bucketToRates.get(key);
if (bucketValues.isEmpty()) {
assertNull(row.get(0));
} else {
assertThat((double) row.get(0), closeTo(bucketValues.stream().mapToDouble(d -> d).sum(), 0.1));
}
}
}
try (var resp = run("TS hosts | STATS avg(rate(request_count)) BY ts=bucket(@timestamp, 1minute) | SORT ts | LIMIT 5")) {
assertThat(
resp.columns(),
equalTo(List.of(new ColumnInfoImpl("avg(rate(request_count))", "double", null), new ColumnInfoImpl("ts", "date", null)))
);
List<List<Object>> values = EsqlTestUtils.getValuesList(resp);
assertThat(values, hasSize(sortedKeys.size()));
for (int i = 0; i < sortedKeys.size(); i++) {
List<Object> row = values.get(i);
assertThat(row, hasSize(2));
long key = sortedKeys.get(i);
assertThat(row.get(1), equalTo(DEFAULT_DATE_TIME_FORMATTER.formatMillis(key)));
List<Double> bucketValues = bucketToRates.get(key);
if (bucketValues.isEmpty()) {
assertNull(row.get(0));
} else {
double avg = bucketValues.stream().mapToDouble(d -> d).sum() / bucketValues.size();
assertThat((double) row.get(0), closeTo(avg, 0.1));
}
}
}
try (var resp = run("""
TS hosts
| STATS avg(rate(request_count)), avg(rate(request_count)) BY ts=bucket(@timestamp, 1minute)
| SORT ts
| LIMIT 5
""")) {
assertThat(
resp.columns(),
equalTo(
List.of(
new ColumnInfoImpl("avg(rate(request_count))", "double", null),
new ColumnInfoImpl("avg(rate(request_count))", "double", null),
new ColumnInfoImpl("ts", "date", null)
)
)
);
List<List<Object>> values = EsqlTestUtils.getValuesList(resp);
assertThat(values, hasSize(sortedKeys.size()));
for (int i = 0; i < sortedKeys.size(); i++) {
List<Object> row = values.get(i);
assertThat(row, hasSize(3));
long key = sortedKeys.get(i);
assertThat(row.get(2), equalTo(DEFAULT_DATE_TIME_FORMATTER.formatMillis(key)));
List<Double> bucketValues = bucketToRates.get(key);
if (bucketValues.isEmpty()) {
assertNull(row.get(0));
assertNull(row.get(1));
} else {
double avg = bucketValues.stream().mapToDouble(d -> d).sum() / bucketValues.size();
assertThat((double) row.get(0), closeTo(avg, 0.1));
assertThat((double) row.get(1), closeTo(avg, 0.1));
}
}
}
}

@AwaitsFix(bugUrl = "removed?")
public void testRateWithTimeBucketAndCluster() {
var rounding = new Rounding.Builder(TimeValue.timeValueSeconds(60)).timeZone(ZoneOffset.UTC).build().prepareForUnknown();
record RateKey(String host, String cluster, long interval) {}
Map<RateKey, List<RequestCounter>> groups = new HashMap<>();
for (Doc doc : docs) {
RateKey key = new RateKey(doc.host, doc.cluster, rounding.round(doc.timestamp));
groups.computeIfAbsent(key, k -> new ArrayList<>()).add(new RequestCounter(doc.timestamp, doc.requestCount));
}
record GroupKey(String cluster, long interval) {}
Map<GroupKey, List<Double>> rateBuckets = new HashMap<>();
for (Map.Entry<RateKey, List<RequestCounter>> e : groups.entrySet()) {
RateKey key = e.getKey();
List<Double> values = rateBuckets.computeIfAbsent(new GroupKey(key.cluster, key.interval), k -> new ArrayList<>());
Double rate = computeRate(e.getValue());
if (rate != null) {
values.add(rate);
}
}
Map<GroupKey, List<Double>> cpuBuckets = new HashMap<>();
for (Doc doc : docs) {
GroupKey key = new GroupKey(doc.cluster, rounding.round(doc.timestamp));
cpuBuckets.computeIfAbsent(key, k -> new ArrayList<>()).add(doc.cpu);
}
List<GroupKey> sortedKeys = rateBuckets.keySet()
.stream()
.sorted(Comparator.comparing(GroupKey::interval).thenComparing(GroupKey::cluster))
.limit(5)
.toList();
try (var resp = run("""
TS hosts
| STATS sum(rate(request_count)) BY ts=bucket(@timestamp, 1 minute), cluster
| SORT ts, cluster
| LIMIT 5""")) {
assertThat(
resp.columns(),
equalTo(
List.of(
new ColumnInfoImpl("sum(rate(request_count))", "double", null),
new ColumnInfoImpl("ts", "date", null),
new ColumnInfoImpl("cluster", "keyword", null)
)
)
);
List<List<Object>> values = EsqlTestUtils.getValuesList(resp);
assertThat(values, hasSize(sortedKeys.size()));
for (int i = 0; i < sortedKeys.size(); i++) {
List<Object> row = values.get(i);
assertThat(row, hasSize(3));
var key = sortedKeys.get(i);
assertThat(row.get(1), equalTo(DEFAULT_DATE_TIME_FORMATTER.formatMillis(key.interval)));
assertThat(row.get(2), equalTo(key.cluster));
List<Double> bucketValues = rateBuckets.get(key);
if (bucketValues.isEmpty()) {
assertNull(row.get(0));
} else {
assertThat((double) row.get(0), closeTo(bucketValues.stream().mapToDouble(d -> d).sum(), 0.1));
}
}
}
try (var resp = run("""
TS hosts
| STATS avg(rate(request_count)) BY ts=bucket(@timestamp, 1minute), cluster
| SORT ts, cluster
| LIMIT 5""")) {
assertThat(
resp.columns(),
equalTo(
List.of(
new ColumnInfoImpl("avg(rate(request_count))", "double", null),
new ColumnInfoImpl("ts", "date", null),
new ColumnInfoImpl("cluster", "keyword", null)
)
)
);
List<List<Object>> values = EsqlTestUtils.getValuesList(resp);
assertThat(values, hasSize(sortedKeys.size()));
for (int i = 0; i < sortedKeys.size(); i++) {
List<Object> row = values.get(i);
assertThat(row, hasSize(3));
var key = sortedKeys.get(i);
assertThat(row.get(1), equalTo(DEFAULT_DATE_TIME_FORMATTER.formatMillis(key.interval)));
assertThat(row.get(2), equalTo(key.cluster));
List<Double> bucketValues = rateBuckets.get(key);
if (bucketValues.isEmpty()) {
assertNull(row.get(0));
} else {
double avg = bucketValues.stream().mapToDouble(d -> d).sum() / bucketValues.size();
assertThat((double) row.get(0), closeTo(avg, 0.1));
}
}
}

try (var resp = run("""
TS hosts
| STATS
s = sum(rate(request_count)),
c = count(rate(request_count)),
max(rate(request_count)),
avg(rate(request_count))
BY ts=bucket(@timestamp, 1minute), cluster
| SORT ts, cluster
| LIMIT 5
| EVAL avg_rate= s/c
| KEEP avg_rate, `max(rate(request_count))`, `avg(rate(request_count))`, ts, cluster
""")) {
assertThat(
resp.columns(),
equalTo(
List.of(
new ColumnInfoImpl("avg_rate", "double", null),
new ColumnInfoImpl("max(rate(request_count))", "double", null),
new ColumnInfoImpl("avg(rate(request_count))", "double", null),
new ColumnInfoImpl("ts", "date", null),
new ColumnInfoImpl("cluster", "keyword", null)
)
)
);
List<List<Object>> values = EsqlTestUtils.getValuesList(resp);
assertThat(values, hasSize(sortedKeys.size()));
for (int i = 0; i < sortedKeys.size(); i++) {
List<Object> row = values.get(i);
assertThat(row, hasSize(5));
var key = sortedKeys.get(i);
assertThat(row.get(3), equalTo(DEFAULT_DATE_TIME_FORMATTER.formatMillis(key.interval)));
assertThat(row.get(4), equalTo(key.cluster));
List<Double> bucketValues = rateBuckets.get(key);
if (bucketValues.isEmpty()) {
assertNull(row.get(0));
assertNull(row.get(1));
} else {
double avg = bucketValues.stream().mapToDouble(d -> d).sum() / bucketValues.size();
assertThat((double) row.get(0), closeTo(avg, 0.1));
double max = bucketValues.stream().mapToDouble(d -> d).max().orElse(0.0);
assertThat((double) row.get(1), closeTo(max, 0.1));
}
assertEquals(row.get(0), row.get(2));
}
}
try (var resp = run("""
TS hosts
| STATS sum(rate(request_count)), max(cpu) BY ts=bucket(@timestamp, 1 minute), cluster
| SORT ts, cluster
| LIMIT 5""")) {
assertThat(
resp.columns(),
equalTo(
List.of(
new ColumnInfoImpl("sum(rate(request_count))", "double", null),
new ColumnInfoImpl("max(cpu)", "double", null),
new ColumnInfoImpl("ts", "date", null),
new ColumnInfoImpl("cluster", "keyword", null)
)
)
);
List<List<Object>> values = EsqlTestUtils.getValuesList(resp);
assertThat(values, hasSize(sortedKeys.size()));
for (int i = 0; i < sortedKeys.size(); i++) {
List<Object> row = values.get(i);
assertThat(row, hasSize(4));
var key = sortedKeys.get(i);
assertThat(row.get(2), equalTo(DEFAULT_DATE_TIME_FORMATTER.formatMillis(key.interval)));
assertThat(row.get(3), equalTo(key.cluster));
List<Double> rateBucket = rateBuckets.get(key);
if (rateBucket.isEmpty()) {
assertNull(row.get(0));
} else {
assertThat((double) row.get(0), closeTo(rateBucket.stream().mapToDouble(d -> d).sum(), 0.1));
}
List<Double> cpuBucket = cpuBuckets.get(key);
if (cpuBuckets.isEmpty()) {
assertNull(row.get(1));
} else {
assertThat((double) row.get(1), closeTo(cpuBucket.stream().mapToDouble(d -> d).max().orElse(0.0), 0.1));
}
}
}
try (var resp = run("""
TS hosts
| STATS sum(rate(request_count)), avg(cpu) BY ts=bucket(@timestamp, 1 minute), cluster
| SORT ts, cluster
| LIMIT 5""")) {
assertThat(
resp.columns(),
equalTo(
List.of(
new ColumnInfoImpl("sum(rate(request_count))", "double", null),
new ColumnInfoImpl("avg(cpu)", "double", null),
new ColumnInfoImpl("ts", "date", null),
new ColumnInfoImpl("cluster", "keyword", null)
)
)
);
List<List<Object>> values = EsqlTestUtils.getValuesList(resp);
assertThat(values, hasSize(sortedKeys.size()));
for (int i = 0; i < sortedKeys.size(); i++) {
List<Object> row = values.get(i);
assertThat(row, hasSize(4));
var key = sortedKeys.get(i);
assertThat(row.get(2), equalTo(DEFAULT_DATE_TIME_FORMATTER.formatMillis(key.interval)));
assertThat(row.get(3), equalTo(key.cluster));
List<Double> rateBucket = rateBuckets.get(key);
if (rateBucket.isEmpty()) {
assertNull(row.get(0));
} else {
assertThat((double) row.get(0), closeTo(rateBucket.stream().mapToDouble(d -> d).sum(), 0.1));
}
List<Double> cpuBucket = cpuBuckets.get(key);
if (cpuBuckets.isEmpty()) {
assertNull(row.get(1));
} else {
double avg = cpuBucket.stream().mapToDouble(d -> d).sum() / cpuBucket.size();
assertThat((double) row.get(1), closeTo(avg, 0.1));
}
}
}
}

public void testApplyRateBeforeFinalGrouping() {
record RateKey(String cluster, String host) {

Expand Down
Loading