A random-random test for time-series data #132556
@@ -0,0 +1,258 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

package org.elasticsearch.xpack.esql.action;

import org.elasticsearch.Build;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.datastreams.DataStreamsPlugin;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xpack.aggregatemetric.AggregateMetricMapperPlugin;
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
import org.junit.Before;

import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.equalTo;
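
/**
 * A "random-random" test for time-series data: random documents are generated and indexed into a TSDB
 * data stream, and TS queries (optionally grouping by a random subset of dimensions) are cross-checked
 * against the same aggregations computed directly from the generated documents.
 */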
public class GenerativeTSIT extends AbstractEsqlIntegTestCase {
    private static final Long NUM_DOCS = 1000L;
    private static final String DATASTREAM_NAME = "tsit_ds";
    private List<XContentBuilder> documents = null;
    private TSDataGenerationHelper dataGenerationHelper;
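
    /**
     * Groups the raw generated documents by the values of the given grouping attributes and by the start of
     * the {@code secondsInWindow}-second time window that their {@code @timestamp} falls into. This mirrors
     * the {@code BY tbucket=bucket(@timestamp, ...), <dimensions>} clause of the queries under test, so each
     * group can be compared against one response row.
     */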
    Map<List<String>, List<Map<String, Object>>> groupedRows(
        List<XContentBuilder> docs,
        List<String> groupingAttributes,
        int secondsInWindow
    ) {
        Map<List<String>, List<Map<String, Object>>> groupedMap = new HashMap<>();
        for (XContentBuilder doc : docs) {
            Map<String, Object> docMap = XContentHelper.convertToMap(BytesReference.bytes(doc), false, XContentType.JSON).v2();
            @SuppressWarnings("unchecked")
            List<String> groupingPairs = groupingAttributes.stream()
                .map(
                    attr -> Tuple.tuple(
                        attr,
                        ((Map<String, Object>) docMap.getOrDefault("attributes", Map.of())).getOrDefault(attr, "").toString()
                    )
                )
                .filter(val -> val.v2().isEmpty() == false) // Filter out empty values
                .map(tup -> tup.v1() + ":" + tup.v2())
                .toList();

            long timeBucketStart = windowStart(docMap.get("@timestamp"), secondsInWindow);
            var keyList = new ArrayList<>(groupingPairs);
            keyList.add(Long.toString(timeBucketStart));
            groupedMap.computeIfAbsent(keyList, k -> new ArrayList<>()).add(docMap);
        }
        return groupedMap;
    }

    static Long windowStart(Object timestampCell, int secondsInWindow) {
        // Truncate the timestamp down to the start of its time window, in epoch seconds.
        return Instant.parse((String) timestampCell).toEpochMilli() / 1000 / secondsInWindow * secondsInWindow;
    }
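
    /**
     * Rebuilds the {@link #groupedRows} key for a single response row: the first three columns hold the
     * aggregated values, the fourth holds the time bucket, and any remaining columns hold the grouping
     * dimension values.
     */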
    static List<String> getRowKey(List<Object> row, List<String> groupingAttributes) {
        List<String> rowKey = new ArrayList<>();
        for (int i = 0; i < groupingAttributes.size(); i++) {
            Object value = row.get(i + 4); // Skip the first four columns
            if (value != null) {
                rowKey.add(groupingAttributes.get(i) + ":" + value);
            }
        }
        rowKey.add(Long.toString(Instant.parse((String) row.get(3)).toEpochMilli() / 1000));
        return rowKey;
    }
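
    // The TS command is only available in snapshot builds, so skip the query (and the test) otherwise.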
    @Override
    public EsqlQueryResponse run(EsqlQueryRequest request) {
        assumeTrue("time series available in snapshot builds only", Build.current().isSnapshot());
        return super.run(request);
    }
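
    // Plugins needed for TSDB data streams, aggregate_metric mappings, and ES|QL.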
    @Override
    protected Collection<Class<? extends Plugin>> nodePlugins() {
        return List.of(
            DataStreamsPlugin.class,
            LocalStateCompositeXPackPlugin.class,
            // Downsample.class, // TODO(pabloem): What are these
            AggregateMetricMapperPlugin.class,
            EsqlPlugin.class
        );
    }
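
    /**
     * Installs a composable index template that forces the backing indices into time-series (TSDB) mode,
     * routing on the {@code attributes.*} fields, with the given mappings.
     */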
    void putTSDBIndexTemplate(List<String> patterns, @Nullable String mappingString) throws IOException {
        Settings.Builder settingsBuilder = Settings.builder();
        // Ensure it will be a TSDB data stream
        settingsBuilder.put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES);
        settingsBuilder.putList("index.routing_path", List.of("attributes.*"));

        CompressedXContent mappings = mappingString == null ? null : CompressedXContent.fromJSON(mappingString);
        TransportPutComposableIndexTemplateAction.Request request = new TransportPutComposableIndexTemplateAction.Request(
            GenerativeTSIT.DATASTREAM_NAME
        );
        request.indexTemplate(
            ComposableIndexTemplate.builder()
                .indexPatterns(patterns)
                .template(org.elasticsearch.cluster.metadata.Template.builder().settings(settingsBuilder).mappings(mappings))
                .metadata(null)
                .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
                .build()
        );
        assertAcked(client().execute(TransportPutComposableIndexTemplateAction.TYPE, request));
    }
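
    /**
     * Generates {@code NUM_DOCS} random documents with {@link TSDataGenerationHelper}, installs the TSDB
     * index template, and indexes each document into the data stream with an immediate refresh so the
     * queries in the tests see all of them.
     */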
    @Before
    public void populateIndex() throws IOException {
        dataGenerationHelper = new TSDataGenerationHelper(NUM_DOCS);
        final XContentBuilder builder = XContentFactory.jsonBuilder();
        builder.map(dataGenerationHelper.mapping.raw());
        final String jsonMappings = Strings.toString(builder);

        putTSDBIndexTemplate(List.of(DATASTREAM_NAME + "*"), jsonMappings);
        // Now we can push data into the data stream.
        for (int i = 0; i < NUM_DOCS; i++) {
            var document = dataGenerationHelper.generateDocument(Map.of());
            if (documents == null) {
                documents = new ArrayList<>();
            }
            documents.add(document);
            var indexRequest = client().prepareIndex(DATASTREAM_NAME).setOpType(DocWriteRequest.OpType.CREATE).setSource(document);
            indexRequest.setRefreshPolicy(org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE);
            indexRequest.get();
        }
    }

    /**
     * This test validates Gauge metrics aggregation with grouping by time bucket and a subset of dimensions.
     * The subset of dimensions is a random subset of the dimensions present in the data.
     * The test extracts the max, min, and avg values of the gauge metric from the query response and
     * compares them with the same values calculated from the documents in each group.
     */
    public void testGroupBySubset() {
        var dimensions = ESTestCase.randomNonEmptySubsetOf(dataGenerationHelper.attributesForMetrics);
        var dimensionsStr = dimensions.stream().map(d -> "attributes." + d).collect(Collectors.joining(", "));
        try (EsqlQueryResponse resp = run(String.format(Locale.ROOT, """
            TS %s
            | STATS max(max_over_time(metrics.gauge_hdd.bytes.used)),
                min(min_over_time(metrics.gauge_hdd.bytes.used)),
                avg(avg_over_time(metrics.gauge_hdd.bytes.used))
                BY tbucket=bucket(@timestamp, 1 minute), %s
            | SORT tbucket
            | LIMIT 1000""", DATASTREAM_NAME, dimensionsStr))) {
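            // Response columns are, in order: max, min, avg, tbucket, then one column per selected dimension.
            // Recompute the same groups from the raw documents for comparison.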
            var groups = groupedRows(documents, dimensions, 60);
            List<List<Object>> rows = new ArrayList<>();
            resp.rows().forEach(rowIter -> {
                List<Object> row = new ArrayList<>();
                rowIter.forEach(row::add);
                rows.add(row);
            });
            for (List<Object> row : rows) {
                var rowKey = getRowKey(row, dimensions);
                List<Map<String, Object>> pointsInGroup = groups.get(rowKey);
                @SuppressWarnings("unchecked")
                var docValues = pointsInGroup.stream()
                    .map(doc -> ((Map<String, Integer>) doc.get("metrics")).get("gauge_hdd.bytes.used"))
                    .toList();
                // Verify that the first column is the max value (the query gets max, min, avg in that order)
                docValues.stream().max(Integer::compareTo).ifPresentOrElse(maxValue -> {
                    var res = ((Long) row.getFirst()).intValue();
                    assertThat(res, equalTo(maxValue));
                }, () -> { throw new AssertionError("No values found for group: " + rowKey); });
                // Verify that the second column is the min value (thus why row.get(1))
                docValues.stream().min(Integer::compareTo).ifPresentOrElse(minValue -> {
                    var res = ((Long) row.get(1)).intValue();
                    assertThat(res, equalTo(minValue));
                }, () -> { throw new AssertionError("No values found for group: " + rowKey); });
                // Verify that the third column is the avg value (thus why row.get(2))
                docValues.stream().mapToDouble(Integer::doubleValue).average().ifPresentOrElse(avgValue -> {
                    var res = (Double) row.get(2);
                    assertThat(res, closeTo(avgValue, res * 0.5));
                }, () -> { throw new AssertionError("No values found for group: " + rowKey); });
            }
        }
    }

    /**
     * This test validates Gauge metrics aggregation with grouping by time bucket only.
     * The test extracts the max, min, and avg values of the gauge metric from the query response and
     * compares them with the same values calculated from the documents in the group. Because there is no
     * grouping by dimensions, there is only one metric group per time bucket.
     */
    public void testGroupByNothing() {
        try (EsqlQueryResponse resp = run(String.format(Locale.ROOT, """
            TS %s
            | STATS
                max(max_over_time(metrics.gauge_hdd.bytes.used)),
                avg(avg_over_time(metrics.gauge_hdd.bytes.used)),
                min(min_over_time(metrics.gauge_hdd.bytes.used)) BY tbucket=bucket(@timestamp, 1 minute)
            | SORT tbucket
            | LIMIT 1000""", DATASTREAM_NAME))) {
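            // Response columns are, in order: max, avg, min, tbucket.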
            List<List<Object>> rows = new ArrayList<>();
            resp.rows().forEach(rowIter -> {
                List<Object> row = new ArrayList<>();
                rowIter.forEach(row::add);
                rows.add(row);
            });
            var groups = groupedRows(documents, List.of(), 60);
            for (List<Object> row : rows) {
                var windowStart = windowStart(row.get(3), 60);
                List<Map<String, Object>> windowDataPoints = groups.get(List.of(Long.toString(windowStart)));
                @SuppressWarnings("unchecked")
                var docValues = windowDataPoints.stream()
                    .map(doc -> ((Map<String, Integer>) doc.get("metrics")).get("gauge_hdd.bytes.used"))
                    .toList();
                // Verify that the first column is the max value (the query gets max, avg, min in that order)
                docValues.stream().max(Integer::compareTo).ifPresentOrElse(maxValue -> {
                    var res = ((Long) row.getFirst()).intValue();
                    assertThat(res, equalTo(maxValue));
                }, () -> { throw new AssertionError("No values found for window starting at " + windowStart); });
                // Verify that the second column is the avg value (thus why row.get(1))
                docValues.stream().mapToDouble(Integer::doubleValue).average().ifPresentOrElse(avgValue -> {
                    var res = (Double) row.get(1);
                    assertThat(res, closeTo(avgValue, res * 0.5));
                }, () -> { throw new AssertionError("No values found for window starting at " + windowStart); });
                // Verify that the third column is the min value (thus why row.get(2))
                docValues.stream().min(Integer::compareTo).ifPresentOrElse(minValue -> {
                    var res = ((Long) row.get(2)).intValue();
                    assertThat(res, equalTo(minValue));
                }, () -> { throw new AssertionError("No values found for window starting at " + windowStart); });
            }
        }
    }
}