@@ -16,6 +16,7 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

import static org.junit.Assert.assertNotNull;
@@ -150,7 +151,7 @@ public SingletonOrdsBuilder appendOrd(int value) {

@Override
public BlockLoader.AggregateMetricDoubleBuilder aggregateMetricDoubleBuilder(int count) {
- return null;
+ return new AggregateMetricDoubleBlockBuilder();
}
};
}
@@ -243,4 +244,98 @@ public void close() {
// TODO assert that we close the test block builders
}
}

/**
* Test implementation of {@link org.elasticsearch.index.mapper.BlockLoader.AggregateMetricDoubleBuilder}.
* The implementation here is fairly close to the production one.
*/
private static class AggregateMetricDoubleBlockBuilder implements BlockLoader.AggregateMetricDoubleBuilder {
private final DoubleBuilder min = new DoubleBuilder();
private final DoubleBuilder max = new DoubleBuilder();
private final DoubleBuilder sum = new DoubleBuilder();
private final IntBuilder count = new IntBuilder();

private static class DoubleBuilder extends TestBlock.Builder implements BlockLoader.DoubleBuilder {
@Override
public BlockLoader.DoubleBuilder appendDouble(double value) {
add(value);
return this;
}
}

private static class IntBuilder extends TestBlock.Builder implements BlockLoader.IntBuilder {
@Override
public BlockLoader.IntBuilder appendInt(int value) {
add(value);
return this;
}
}

@Override
public BlockLoader.DoubleBuilder min() {
return min;
}

@Override
public BlockLoader.DoubleBuilder max() {
return max;
}

@Override
public BlockLoader.DoubleBuilder sum() {
return sum;
}

@Override
public BlockLoader.IntBuilder count() {
return count;
}

@Override
public BlockLoader.Block build() {
var minBlock = min.build();
var maxBlock = max.build();
var sumBlock = sum.build();
var countBlock = count.build();

assert minBlock.size() == maxBlock.size();
assert maxBlock.size() == sumBlock.size();
assert sumBlock.size() == countBlock.size();

var values = new ArrayList<>(minBlock.size());

for (int i = 0; i < minBlock.size(); i++) {
// represent this composite block as one map of sub-metric values per position
var value = new HashMap<String, Object>();
value.put("min", minBlock.values.get(i));
value.put("max", maxBlock.values.get(i));
value.put("sum", sumBlock.values.get(i));
value.put("value_count", countBlock.values.get(i));

values.add(value);
}

return new TestBlock(values);
}

@Override
public BlockLoader.Builder appendNull() {
throw new UnsupportedOperationException();
}

@Override
public BlockLoader.Builder beginPositionEntry() {
throw new UnsupportedOperationException();
}

@Override
public BlockLoader.Builder endPositionEntry() {
throw new UnsupportedOperationException();
}

@Override
public void close() {

}
}
}
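
For illustration, the builder above is exercised through the four sub-builders returned by min(), max(), sum() and count(); build() then zips them into one TestBlock whose positions are maps. A minimal plain-Java sketch of that per-position value shape (independent of the Elasticsearch test classes; class and method names here are illustrative only):

import java.util.HashMap;
import java.util.Map;

class AggregateMetricDoublePositionSketch {
    // Mirrors the map assembled per position in AggregateMetricDoubleBlockBuilder#build() above.
    static Map<String, Object> position(double min, double max, double sum, int valueCount) {
        Map<String, Object> value = new HashMap<>();
        value.put("min", min);
        value.put("max", max);
        value.put("sum", sum);
        value.put("value_count", valueCount);
        return value;
    }

    public static void main(String[] args) {
        // prints a map with the keys min, max, sum and value_count
        System.out.println(position(1.0, 10.0, 15.0, 3));
    }
}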
@@ -82,6 +82,10 @@ default DataSourceResponse.VersionStringGenerator handle(DataSourceRequest.Versi
return null;
}

default DataSourceResponse.AggregateMetricDoubleGenerator handle(DataSourceRequest.AggregateMetricDoubleGenerator request) {
return null;
}

default DataSourceResponse.NullWrapper handle(DataSourceRequest.NullWrapper request) {
return null;
}
@@ -132,6 +132,12 @@ public DataSourceResponse.VersionStringGenerator accept(DataSourceHandler handle
}
}

record AggregateMetricDoubleGenerator() implements DataSourceRequest<DataSourceResponse.AggregateMetricDoubleGenerator> {
public DataSourceResponse.AggregateMetricDoubleGenerator accept(DataSourceHandler handler) {
return handler.handle(this);
}
}
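
The new request/response pair follows the existing double-dispatch pattern: accept(handler) calls handler.handle(this), and the default handle implementation added above returns null so the request falls through to the next registered handler. A minimal sketch, assuming only the types added in this PR, of a handler that serves fixed metric values (the class name is hypothetical):

import org.elasticsearch.logsdb.datageneration.datasource.DataSourceHandler;
import org.elasticsearch.logsdb.datageneration.datasource.DataSourceRequest;
import org.elasticsearch.logsdb.datageneration.datasource.DataSourceResponse;

import java.util.Map;

class FixedAggregateMetricDoubleHandler implements DataSourceHandler {
    @Override
    public DataSourceResponse.AggregateMetricDoubleGenerator handle(DataSourceRequest.AggregateMetricDoubleGenerator request) {
        // Always produce the same document value; min <= max and value_count >= 1.
        return new DataSourceResponse.AggregateMetricDoubleGenerator(
            () -> Map.of("min", 1.0, "max", 10.0, "sum", 15.0, "value_count", 3)
        );
    }
}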

record NullWrapper() implements DataSourceRequest<DataSourceResponse.NullWrapper> {
public DataSourceResponse.NullWrapper accept(DataSourceHandler handler) {
return handler.handle(this);
@@ -55,6 +55,8 @@ record IpGenerator(Supplier<InetAddress> generator) implements DataSourceRespons

record VersionStringGenerator(Supplier<String> generator) implements DataSourceResponse {}

record AggregateMetricDoubleGenerator(Supplier<Map<String, Number>> generator) implements DataSourceResponse {}

record NullWrapper(Function<Supplier<Object>, Supplier<Object>> wrapper) implements DataSourceResponse {}

record ArrayWrapper(Function<Supplier<Object>, Supplier<Object>> wrapper) implements DataSourceResponse {}
@@ -851,6 +851,7 @@ protected void parseCreateField(DocumentParserContext context) throws IOExceptio
// by its FieldMapper#parse()
throw e;
}

for (Map.Entry<Metric, Number> parsed : metricsParsed.entrySet()) {
NumberFieldMapper delegateFieldMapper = metricFieldMappers.get(parsed.getKey());
delegateFieldMapper.indexValue(context, parsed.getValue());
@@ -0,0 +1,67 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.aggregatemetric.mapper;

import org.elasticsearch.index.mapper.BlockLoaderTestCase;
import org.elasticsearch.logsdb.datageneration.datasource.DataSourceHandler;
import org.elasticsearch.logsdb.datageneration.datasource.DataSourceRequest;
import org.elasticsearch.logsdb.datageneration.datasource.DataSourceResponse;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.xpack.aggregatemetric.AggregateMetricMapperPlugin;
import org.elasticsearch.xpack.aggregatemetric.mapper.datageneration.AggregateMetricDoubleDataSourceHandler;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class AggregateMetricDoubleFieldBlockLoaderTests extends BlockLoaderTestCase {
public AggregateMetricDoubleFieldBlockLoaderTests(Params params) {
super("aggregate_metric_double", List.of(new AggregateMetricDoubleDataSourceHandler(), new DataSourceHandler() {
@Override
public DataSourceResponse.ObjectArrayGenerator handle(DataSourceRequest.ObjectArrayGenerator request) {
// aggregate_metric_double does not support multiple values in a document, so we can't have object arrays
return new DataSourceResponse.ObjectArrayGenerator(Optional::empty);
}
}), params);
}

@Override
protected Object expected(Map<String, Object> fieldMapping, Object value, TestContext testContext) {
if (value instanceof Map<?, ?> map) {
var expected = new HashMap<String, Object>(map.size());

// put an explicit `null` for metrics that are not present; this is how the block looks
Arrays.stream(AggregateMetricDoubleFieldMapper.Metric.values())
.map(AggregateMetricDoubleFieldMapper.Metric::toString)
.sorted()
.forEach(m -> {
expected.put(m, map.get(m));
});

return expected;
}

// malformed or null, return "empty row"
return new HashMap<>() {
{
put("min", null);
put("max", null);
put("sum", null);
put("value_count", null);
}
};
}

@Override
protected Collection<? extends Plugin> getPlugins() {
return List.of(new AggregateMetricMapperPlugin());
}
}
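
For illustration, a small self-contained sketch (plain Java, metric names taken from Metric.values() above) of what expected(...) returns when the generated value carries only some of the sub-metrics: absent metrics are mapped to an explicit null:

import java.util.HashMap;
import java.util.Map;

class ExpectedAggregateMetricDoubleSketch {
    public static void main(String[] args) {
        Map<String, Object> value = Map.of("min", 2.5, "value_count", 4);

        Map<String, Object> expected = new HashMap<>();
        for (String metric : new String[] { "max", "min", "sum", "value_count" }) {
            expected.put(metric, value.get(metric)); // explicit null when the metric is absent
        }

        // expected now holds min=2.5, value_count=4, and explicit nulls for max and sum
        System.out.println(expected);
    }
}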
@@ -0,0 +1,71 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.aggregatemetric.mapper.datageneration;

import org.elasticsearch.logsdb.datageneration.datasource.DataSourceHandler;
import org.elasticsearch.logsdb.datageneration.datasource.DataSourceRequest;
import org.elasticsearch.logsdb.datageneration.datasource.DataSourceResponse;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateMetricDoubleFieldMapper;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;

public class AggregateMetricDoubleDataSourceHandler implements DataSourceHandler {
@Override
public DataSourceResponse.AggregateMetricDoubleGenerator handle(DataSourceRequest.AggregateMetricDoubleGenerator request) {
return new DataSourceResponse.AggregateMetricDoubleGenerator(() -> {
var metricContainer = new HashMap<String, Number>();

// min and max must be consistent: max has to be greater than or equal to min
double min = ESTestCase.randomDoubleBetween(-Double.MAX_VALUE, 1_000_000_000, false);
double max = ESTestCase.randomDoubleBetween(min, Double.MAX_VALUE, true);

metricContainer.put("min", min);
metricContainer.put("max", max);
metricContainer.put("sum", ESTestCase.randomDouble());
metricContainer.put("value_count", ESTestCase.randomIntBetween(1, Integer.MAX_VALUE));

return metricContainer;
});
}

@Override
public DataSourceResponse.LeafMappingParametersGenerator handle(DataSourceRequest.LeafMappingParametersGenerator request) {
if (request.fieldType().equals("aggregate_metric_double") == false) {
return null;
}

return new DataSourceResponse.LeafMappingParametersGenerator(() -> {
var map = new HashMap<String, Object>();

List<AggregateMetricDoubleFieldMapper.Metric> metrics = ESTestCase.randomNonEmptySubsetOf(
Arrays.asList(AggregateMetricDoubleFieldMapper.Metric.values())
);

map.put("metrics", metrics.stream().map(Enum::toString).toList());
map.put("default_metric", metrics.get(ESTestCase.randomIntBetween(0, metrics.size() - 1)));

if (ESTestCase.randomBoolean()) {
map.put("ignore_malformed", ESTestCase.randomBoolean());
}

return map;
});
}

@Override
public DataSourceResponse.FieldDataGenerator handle(DataSourceRequest.FieldDataGenerator request) {
if (request.fieldType().equals("aggregate_metric_double") == false) {
return null;
}

return new DataSourceResponse.FieldDataGenerator(new AggregateMetricDoubleFieldDataGenerator(request.dataSource()));
}
}
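
Because the generators rely on ESTestCase randomness, they only run inside the test framework. A hedged sketch of a test (class and method names are hypothetical) exercising the invariants the handler establishes above, i.e. min <= max and value_count >= 1:

import org.elasticsearch.logsdb.datageneration.datasource.DataSourceRequest;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.aggregatemetric.mapper.datageneration.AggregateMetricDoubleDataSourceHandler;

import java.util.Map;

public class AggregateMetricDoubleDataSourceHandlerSketchTests extends ESTestCase {
    public void testGeneratedMetricsAreConsistent() {
        var response = new AggregateMetricDoubleDataSourceHandler().handle(new DataSourceRequest.AggregateMetricDoubleGenerator());
        Map<String, Number> doc = response.generator().get();

        assertTrue(doc.get("min").doubleValue() <= doc.get("max").doubleValue());
        assertTrue(doc.get("value_count").intValue() >= 1);
    }
}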
@@ -0,0 +1,61 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.aggregatemetric.mapper.datageneration;

import org.elasticsearch.logsdb.datageneration.FieldDataGenerator;
import org.elasticsearch.logsdb.datageneration.datasource.DataSource;
import org.elasticsearch.logsdb.datageneration.datasource.DataSourceRequest;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;

public class AggregateMetricDoubleFieldDataGenerator implements FieldDataGenerator {
private final Supplier<Map<String, Number>> metrics;
private final Function<Supplier<Object>, Supplier<Object>> nullWrapper;
private final Function<Supplier<Object>, Supplier<Object>> nullAndMalformedWrapper;

public AggregateMetricDoubleFieldDataGenerator(DataSource dataSource) {
this.metrics = dataSource.get(new DataSourceRequest.AggregateMetricDoubleGenerator()).generator();

this.nullWrapper = dataSource.get(new DataSourceRequest.NullWrapper()).wrapper();

var strings = dataSource.get(new DataSourceRequest.StringGenerator()).generator();
var malformed = dataSource.get(new DataSourceRequest.MalformedWrapper(strings::get)).wrapper();
this.nullAndMalformedWrapper = malformed.andThen(nullWrapper);
}

@Override
@SuppressWarnings("unchecked")
public Object generateValue(Map<String, Object> fieldMapping) {
if (fieldMapping == null) {
// this field can't be properly mapped with dynamic mapping
return null;
}

// metrics returned have all metric fields, but we only need those that appear in the mapping
Supplier<Object> metricsAdjustedForMapping = () -> {
var metric = metrics.get();

var adjusted = new HashMap<String, Number>();
for (var metricName : (List<String>) fieldMapping.get("metrics")) {
adjusted.put(metricName, metric.get(metricName));
}

return adjusted;
};

if ((Boolean) fieldMapping.getOrDefault("ignore_malformed", false)) {
return nullAndMalformedWrapper.apply(metricsAdjustedForMapping).get();
}

return nullWrapper.apply(metricsAdjustedForMapping).get();
}
}
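
The composition malformed.andThen(nullWrapper) means the malformed wrapper is applied first and the null wrapper wraps its result, so a generated value can be the metrics map, a malformed stand-in, or null. A minimal plain-Java sketch of that composition (all names here are illustrative, not the production wrappers):

import java.util.Map;
import java.util.Random;
import java.util.function.Function;
import java.util.function.Supplier;

class WrapperCompositionSketch {
    private static final Random RANDOM = new Random();

    // A wrapper takes a value supplier and returns a supplier that sometimes substitutes its own value.
    static Function<Supplier<Object>, Supplier<Object>> sometimes(Object replacement) {
        return values -> () -> RANDOM.nextBoolean() ? replacement : values.get();
    }

    public static void main(String[] args) {
        Function<Supplier<Object>, Supplier<Object>> malformed = sometimes("not-a-metric-object");
        Function<Supplier<Object>, Supplier<Object>> nulls = sometimes(null);

        Supplier<Object> metrics = () -> Map.of("min", 1.0, "max", 2.0);
        // malformed is applied first, then the null wrapper wraps its output
        Supplier<Object> wrapped = malformed.andThen(nulls).apply(metrics);

        for (int i = 0; i < 5; i++) {
            System.out.println(wrapped.get()); // the map, "not-a-metric-object", or null
        }
    }
}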