Skip to content

Commit 613bda8

Browse files
committed
First test case in prototype messy test file
1 parent f952838 commit 613bda8

File tree

7 files changed

+323
-5
lines changed

7 files changed

+323
-5
lines changed

test/framework/src/main/java/org/elasticsearch/datageneration/FieldType.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ public enum FieldType {
5050
TEXT("text"),
5151
IP("ip"),
5252
CONSTANT_KEYWORD("constant_keyword"),
53+
PASSTHROUGH("passthrough"), // For now this field type does not have default generators.
5354
WILDCARD("wildcard");
5455

5556
private final String name;
@@ -78,6 +79,7 @@ public FieldDataGenerator generator(String fieldName, DataSource dataSource) {
7879
case IP -> new IpFieldDataGenerator(dataSource);
7980
case CONSTANT_KEYWORD -> new ConstantKeywordFieldDataGenerator();
8081
case WILDCARD -> new WildcardFieldDataGenerator(dataSource);
82+
default -> null;
8183
};
8284
}
8385

test/framework/src/main/java/org/elasticsearch/datageneration/MappingGenerator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public Mapping generate(Template template) {
6464

6565
rawMapping.put("_doc", topLevelMappingParameters);
6666

67-
if (specification.fullyDynamicMapping()) {
67+
if (specification.fullyDynamicMapping() == false) {
6868
// Has to be "true" for fully dynamic mapping
6969
topLevelMappingParameters.remove("dynamic");
7070

test/framework/src/main/java/org/elasticsearch/datageneration/datasource/DataSource.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public <T extends DataSourceResponse> T get(DataSourceRequest<T> request) {
4646
return response;
4747
}
4848
}
49-
50-
throw new IllegalStateException("Request is not supported by data source");
49+
throw new IllegalStateException("Request is not supported by data source. Request: " + request.toString() + "\n" +
50+
"Available handlers: " + handlers.stream().map(Object::getClass).map(Class::getName).toList().toString());
5151
}
5252
}

test/framework/src/main/java/org/elasticsearch/datageneration/datasource/DefaultMappingParametersHandler.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public DataSourceResponse.LeafMappingParametersGenerator handle(DataSourceReques
4848
case IP -> ipMapping();
4949
case CONSTANT_KEYWORD -> constantKeywordMapping();
5050
case WILDCARD -> wildcardMapping();
51+
default -> throw new IllegalArgumentException("Unsupported field type: " + fieldType);
5152
});
5253
}
5354

test/framework/src/main/java/org/elasticsearch/datageneration/datasource/DefaultObjectGenerationHandler.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,11 @@
1212
import org.elasticsearch.datageneration.FieldType;
1313
import org.elasticsearch.test.ESTestCase;
1414

15+
import java.util.Arrays;
16+
import java.util.List;
1517
import java.util.Optional;
1618
import java.util.Set;
19+
import java.util.stream.Collectors;
1720

1821
import static org.elasticsearch.test.ESTestCase.randomAlphaOfLengthBetween;
1922
import static org.elasticsearch.test.ESTestCase.randomDouble;
@@ -56,12 +59,19 @@ public String generateFieldName() {
5659

5760
// UNSIGNED_LONG is excluded because it is mapped as long
5861
// and values larger than long fail to parse.
59-
private static final Set<FieldType> EXCLUDED_FROM_DYNAMIC_MAPPING = Set.of(FieldType.UNSIGNED_LONG);
62+
private static final Set<FieldType> EXCLUDED_FROM_DYNAMIC_MAPPING = Set.of(FieldType.UNSIGNED_LONG, FieldType.PASSTHROUGH);
6063

6164
@Override
6265
public DataSourceResponse.FieldTypeGenerator handle(DataSourceRequest.FieldTypeGenerator request) {
6366
return new DataSourceResponse.FieldTypeGenerator(
64-
() -> new DataSourceResponse.FieldTypeGenerator.FieldTypeInfo(ESTestCase.randomFrom(FieldType.values()).toString())
67+
() -> {
68+
// All field types minus the excluded ones.
69+
var fieldTypes = Arrays.stream(FieldType.values())
70+
.filter(fieldType -> EXCLUDED_FROM_DYNAMIC_MAPPING.contains(fieldType) == false)
71+
.collect(Collectors.toSet());
72+
var fieldType = ESTestCase.randomFrom(fieldTypes);
73+
return new DataSourceResponse.FieldTypeGenerator.FieldTypeInfo(fieldType.toString());
74+
}
6575
);
6676
}
6777

x-pack/plugin/esql/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ dependencies {
3535
compileOnly project(':modules:lang-painless:spi')
3636
compileOnly project(xpackModule('esql-core'))
3737
compileOnly project(xpackModule('ml'))
38+
compileOnly project(path: xpackModule('mapper-aggregate-metric'))
39+
compileOnly project(path: xpackModule('downsample'))
3840
implementation project(xpackModule('kql'))
3941
implementation project('compute')
4042
implementation project('compute:ann')
Lines changed: 303 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,303 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.action;
9+
10+
import org.elasticsearch.Build;
11+
import org.elasticsearch.action.DocWriteRequest;
12+
import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
13+
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
14+
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
15+
import org.elasticsearch.common.Randomness;
16+
import org.elasticsearch.common.Strings;
17+
import org.elasticsearch.common.bytes.BytesReference;
18+
import org.elasticsearch.common.compress.CompressedXContent;
19+
import org.elasticsearch.common.settings.Settings;
20+
import org.elasticsearch.common.xcontent.XContentHelper;
21+
import org.elasticsearch.core.Nullable;
22+
import org.elasticsearch.core.Tuple;
23+
import org.elasticsearch.datageneration.DataGeneratorSpecification;
24+
import org.elasticsearch.datageneration.DocumentGenerator;
25+
import org.elasticsearch.datageneration.FieldType;
26+
import org.elasticsearch.datageneration.Mapping;
27+
import org.elasticsearch.datageneration.MappingGenerator;
28+
import org.elasticsearch.datageneration.Template;
29+
import org.elasticsearch.datageneration.TemplateGenerator;
30+
import org.elasticsearch.datageneration.fields.PredefinedField;
31+
import org.elasticsearch.datastreams.DataStreamsPlugin;
32+
import org.elasticsearch.index.IndexMode;
33+
import org.elasticsearch.index.IndexSettings;
34+
import org.elasticsearch.plugins.Plugin;
35+
import org.elasticsearch.test.ESTestCase;
36+
import org.elasticsearch.xcontent.XContentBuilder;
37+
import org.elasticsearch.xcontent.XContentFactory;
38+
import org.elasticsearch.xcontent.XContentType;
39+
import org.elasticsearch.xpack.aggregatemetric.AggregateMetricMapperPlugin;
40+
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
41+
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
42+
import org.junit.Before;
43+
44+
import java.io.IOException;
45+
import java.time.Instant;
46+
import java.util.ArrayList;
47+
import java.util.Collection;
48+
import java.util.HashMap;
49+
import java.util.List;
50+
import java.util.Map;
51+
import java.util.stream.Collectors;
52+
import java.util.stream.IntStream;
53+
54+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
55+
import static org.hamcrest.Matchers.closeTo;
56+
import static org.hamcrest.Matchers.equalTo;
57+
58+
public class GenerativeTSIT extends AbstractEsqlIntegTestCase {
59+
60+
private static final String DATASTREAM_NAME = "tsit_ds";
61+
private List<XContentBuilder> documents = null;
62+
63+
static class DataGenerationHelper {
64+
65+
private static Object randomDimensionValue(String dimensionName) {
66+
// We use dimensionName to determine the type of the value.
67+
var isNumeric = dimensionName.hashCode() % 5 == 0;
68+
if (isNumeric) {
69+
// Numeric values are sometimes passed as integers and sometimes as strings.
70+
return ESTestCase.randomBoolean()
71+
? ESTestCase.randomIntBetween(1, 1000)
72+
: Integer.toString(ESTestCase.randomIntBetween(1, 1000));
73+
} else {
74+
return ESTestCase.randomAlphaOfLengthBetween(1, 20);
75+
}
76+
}
77+
78+
DataGenerationHelper() {
79+
// Metrics coming into our system have a pre-set group of attributes.
80+
List<String> attributesForMetrics = ESTestCase.randomList(1, 300, () -> ESTestCase.randomAlphaOfLengthBetween(1, 30));
81+
int numTimeSeries = ESTestCase.randomIntBetween(100, 1000); // TODO: Larger size of timeseries
82+
// allTimeSeries contains the list of dimension-values for each time series.
83+
List<List<Tuple<String, Object>>> allTimeSeries = IntStream.range(0, numTimeSeries).mapToObj(tsIdx -> {
84+
List<String> dimensionsInMetric = ESTestCase.randomNonEmptySubsetOf(attributesForMetrics);
85+
// TODO: How do we handle the case when there are no dimensions?
86+
return dimensionsInMetric.stream().map(attr -> new Tuple<>(attr, randomDimensionValue(attr))).collect(Collectors.toList());
87+
}).toList();
88+
89+
spec = DataGeneratorSpecification.builder()
90+
.withPredefinedFields(
91+
List.of(
92+
new PredefinedField.WithGenerator(
93+
"@timestamp",
94+
FieldType.DATE,
95+
Map.of("type", "date"),
96+
fieldMapping -> ESTestCase.randomInstantBetween(Instant.now().minusSeconds(2 * 60 * 60), Instant.now())
97+
),
98+
new PredefinedField.WithGenerator(
99+
"attributes",
100+
FieldType.PASSTHROUGH,
101+
Map.of("type", "passthrough", "time_series_dimension", true, "dynamic", true, "priority", 1),
102+
(ignored) -> {
103+
var tsDimensions = ESTestCase.randomFrom(allTimeSeries);
104+
return tsDimensions.stream().collect(Collectors.toMap(Tuple::v1, Tuple::v2));
105+
}
106+
),
107+
// TODO: How do we add a `metrics` top field that contains all metrics and indexes
108+
// them dynamically?
109+
new PredefinedField.WithGenerator(
110+
"metrics",
111+
FieldType.PASSTHROUGH,
112+
Map.of("type", "passthrough", "dynamic", true, "priority", 10),
113+
(ignored) -> Map.of("gauge_hdd.bytes.used", Randomness.get().nextLong(0, 1000000000L))
114+
)
115+
)
116+
)
117+
.build();
118+
119+
documentGenerator = new DocumentGenerator(spec);
120+
template = new TemplateGenerator(spec).generate();
121+
mapping = new MappingGenerator(spec).generate(template);
122+
var doc = mapping.raw().get("_doc");
123+
@SuppressWarnings("unchecked")
124+
Map<String, Object> docMap = ((Map<String, Object>) doc);
125+
// Add dynamic templates to the mapping
126+
docMap.put(
127+
"dynamic_templates",
128+
List.of(
129+
Map.of(
130+
"counter_long",
131+
Map.of("path_match", "metrics.counter_*", "mapping", Map.of("type", "long", "time_series_metric", "counter"))
132+
),
133+
Map.of(
134+
"gauge_long",
135+
Map.of("path_match", "metrics.gauge_*", "mapping", Map.of("type", "long", "time_series_metric", "gauge"))
136+
),
137+
Map.of(
138+
"counter_double",
139+
Map.of("path_match", "metrics.counter_*", "mapping", Map.of("type", "double", "time_series_metric", "counter"))
140+
),
141+
Map.of(
142+
"gauge_double",
143+
Map.of("path_match", "metrics.gauge_*", "mapping", Map.of("type", "double", "time_series_metric", "gauge"))
144+
)
145+
)
146+
);
147+
}
148+
149+
final DataGeneratorSpecification spec;
150+
final DocumentGenerator documentGenerator;
151+
final Template template;
152+
final Mapping mapping;
153+
154+
XContentBuilder generateDocument(Map<String, Object> additionalFields) throws IOException {
155+
var doc = XContentFactory.jsonBuilder();
156+
var generated = documentGenerator.generate(template, mapping);
157+
generated.putAll(additionalFields);
158+
159+
doc.map(generated);
160+
return doc;
161+
}
162+
}
163+
164+
Map<List<String>, List<Map<String, Object>>> groupedRows(
165+
List<XContentBuilder> docs,
166+
List<String> groupingAttributes,
167+
int secondsInWindow
168+
) {
169+
Map<List<String>, List<Map<String, Object>>> groupedMap = new HashMap<>();
170+
for (XContentBuilder doc : docs) {
171+
Map<String, Object> docMap = XContentHelper.convertToMap(BytesReference.bytes(doc), false, XContentType.JSON).v2();
172+
@SuppressWarnings("unchecked")
173+
List<String> groupingValues = groupingAttributes.stream()
174+
.map(attr -> ((Map<String, Object>) docMap.get("attributes")).get(attr).toString())
175+
.collect(Collectors.toList());
176+
// TODO: Verify that this window start calculation is correct.
177+
long timeBucketStart = Instant.parse(((String) docMap.get("@timestamp"))).toEpochMilli() / 1000 / secondsInWindow
178+
* secondsInWindow;
179+
String key = String.join("|", groupingValues) + "|" + timeBucketStart;
180+
// TODO: Why use this pipe syntax lol
181+
groupedMap.computeIfAbsent(List.of(key), k -> new ArrayList<>()).add(docMap);
182+
}
183+
return groupedMap;
184+
}
185+
186+
@Override
187+
public EsqlQueryResponse run(EsqlQueryRequest request) {
188+
assumeTrue("time series available in snapshot builds only", Build.current().isSnapshot());
189+
return super.run(request);
190+
}
191+
192+
@Override
193+
protected Collection<Class<? extends Plugin>> nodePlugins() {
194+
return List.of(
195+
DataStreamsPlugin.class,
196+
LocalStateCompositeXPackPlugin.class,
197+
// Downsample.class, // TODO(pabloem): What are these
198+
AggregateMetricMapperPlugin.class,
199+
EsqlPlugin.class
200+
);
201+
}
202+
203+
void putTSDBIndexTemplate(
204+
String id,
205+
List<String> patterns,
206+
@Nullable Settings settings,
207+
@Nullable String mappingString,
208+
@Nullable DataStreamLifecycle.Template lifecycle,
209+
@Nullable Map<String, Object> metadata
210+
) throws IOException {
211+
Settings.Builder settingsBuilder = Settings.builder();
212+
if (settings != null) {
213+
settingsBuilder.put(settings);
214+
}
215+
// Ensure it will be a TSDB data stream
216+
settingsBuilder.put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES);
217+
settingsBuilder.putList("index.routing_path", List.of("attributes.*"));
218+
CompressedXContent mappings = mappingString == null ? null : CompressedXContent.fromJSON(mappingString);
219+
// print the mapping
220+
TransportPutComposableIndexTemplateAction.Request request = new TransportPutComposableIndexTemplateAction.Request(id);
221+
request.indexTemplate(
222+
ComposableIndexTemplate.builder()
223+
.indexPatterns(patterns)
224+
.template(
225+
org.elasticsearch.cluster.metadata.Template.builder().settings(settingsBuilder).mappings(mappings).lifecycle(lifecycle)
226+
)
227+
.metadata(metadata)
228+
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
229+
.build()
230+
);
231+
assertAcked(client().execute(TransportPutComposableIndexTemplateAction.TYPE, request));
232+
}
233+
234+
@Before
235+
public void populateIndex() throws IOException {
236+
var dataGenHelper = new DataGenerationHelper();
237+
final XContentBuilder builder = XContentFactory.jsonBuilder();
238+
builder.map(dataGenHelper.mapping.raw());
239+
// print the mapping
240+
System.out.println("PABLO Data stream mapping: " + Strings.toString(builder));
241+
final String jsonMappings = Strings.toString(builder);
242+
243+
putTSDBIndexTemplate(DATASTREAM_NAME, List.of(DATASTREAM_NAME + "*"), null, jsonMappings, null, null);
244+
// Now we can push data into the data stream.
245+
for (int i = 0; i < 1000; i++) {
246+
var document = dataGenHelper.generateDocument(Map.of());
247+
if (documents == null) {
248+
documents = new ArrayList<>();
249+
}
250+
documents.add(document);
251+
var indexRequest = client().prepareIndex(DATASTREAM_NAME).setOpType(DocWriteRequest.OpType.CREATE).setSource(document);
252+
indexRequest.setRefreshPolicy(org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE);
253+
indexRequest.get();
254+
}
255+
}
256+
257+
public void testGroupByNothing() {
258+
try (
259+
var resp = run(
260+
String.format(
261+
"""
262+
TS %s
263+
| STATS max(max_over_time(metrics.gauge_hdd.bytes.used)), avg(avg_over_time(metrics.gauge_hdd.bytes.used)), min(min_over_time(metrics.gauge_hdd.bytes.used)) BY tbucket=bucket(@timestamp, 1 minute)
264+
| SORT tbucket
265+
| LIMIT 100""",
266+
DATASTREAM_NAME
267+
)
268+
)
269+
) {
270+
List<List<Object>> rows = new ArrayList<>();
271+
resp.rows().forEach(rowIter -> {
272+
List<Object> row = new ArrayList<>();
273+
rowIter.forEach(row::add);
274+
rows.add(row);
275+
});
276+
var groups = groupedRows(documents, List.of(), 60);
277+
for (int i = 0; i < rows.size(); i++) {
278+
var row = rows.get(i);
279+
var windowStart = Instant.parse((String) row.getLast()).toEpochMilli() / 1000 / 60 * 60;
280+
var windowDataPoints = groups.get(List.of(String.format("|%d", windowStart)));
281+
@SuppressWarnings("unchecked")
282+
var docValues = windowDataPoints.stream()
283+
.map(doc -> ((Map<String, Integer>) doc.get("metrics")).get("gauge_hdd.bytes.used"))
284+
.toList();
285+
docValues.stream().max(Integer::compareTo).ifPresentOrElse(maxValue -> {
286+
var res = ((Long) row.getFirst()).intValue();
287+
assertThat(res, equalTo(maxValue));
288+
}, () -> { throw new AssertionError("No values found for window starting at " + windowStart); });
289+
docValues.stream().mapToDouble(Integer::doubleValue).average().ifPresentOrElse(avgValue -> {
290+
var res = (Double) row.get(1);
291+
assertThat(res, closeTo(avgValue, res * 0.5));
292+
}, () -> {
293+
;
294+
throw new AssertionError("No values found for window starting at " + windowStart);
295+
});
296+
docValues.stream().min(Integer::compareTo).ifPresentOrElse(minValue -> {
297+
var res = ((Long) row.get(2)).intValue();
298+
assertThat(res, equalTo(minValue));
299+
}, () -> { throw new AssertionError("No values found for window starting at " + windowStart); });
300+
}
301+
}
302+
}
303+
}

0 commit comments

Comments
 (0)