@@ -9,10 +9,16 @@

package org.elasticsearch.action.admin.cluster.stats;

import org.elasticsearch.common.TriConsumer;

import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

public final class MappingVisitor {
public static final String PROPERTIES = "properties";
public static final String FIELD_TYPE = "type";
public static final String MULTI_FIELDS = "fields";

private MappingVisitor() {}

@@ -25,7 +31,7 @@ private static void visitMapping(
final String path,
final BiConsumer<String, Map<String, ?>> fieldMappingConsumer
) {
Object properties = mapping.get("properties");
Object properties = mapping.get(PROPERTIES);
if (properties instanceof Map) {
@SuppressWarnings("unchecked")
Map<String, ?> propertiesAsMap = (Map<String, ?>) properties;
@@ -40,7 +46,7 @@ private static void visitMapping(
visitMapping(fieldMapping, prefix + ".", fieldMappingConsumer);

// Multi fields
Object fieldsO = fieldMapping.get("fields");
Object fieldsO = fieldMapping.get(MULTI_FIELDS);
if (fieldsO instanceof Map) {
@SuppressWarnings("unchecked")
Map<String, ?> fields = (Map<String, ?>) fieldsO;
@@ -75,4 +81,67 @@ public static void visitRuntimeMapping(Map<String, ?> mapping, BiConsumer<String
runtimeFieldMappingConsumer.accept(entry.getKey(), runtimeFieldMapping);
}
}

/**
* Traverses the source mapping and copies its structure into the destination mapping,
* applying the fieldMappingConsumer to each individual field mapping along the way.
*/
public static void visitAndCopyMapping(
final Map<String, ?> sourceMapping,
final Map<String, Object> destMapping,
final TriConsumer<String, Map<String, ?>, Map<String, Object>> fieldMappingConsumer
) {
Map<String, ?> sourceProperties = getMapOrNull(sourceMapping.get(PROPERTIES));
if (sourceProperties == null) {
return;
}
Map<String, Object> destProperties = new HashMap<>(sourceProperties.size());
destMapping.put(PROPERTIES, destProperties);

for (Map.Entry<String, ?> entry : sourceProperties.entrySet()) {
Map<String, ?> sourceFieldMapping = getMapOrNull(entry.getValue());
if (sourceFieldMapping == null) {
continue;
}
var destFieldMapping = processAndCopy(entry.getKey(), sourceFieldMapping, destProperties, fieldMappingConsumer);
visitAndCopyMapping(sourceFieldMapping, destFieldMapping, fieldMappingConsumer);

// Multi fields
Map<String, ?> sourceMultiFields = getMapOrNull(sourceFieldMapping.get(MULTI_FIELDS));
if (sourceMultiFields == null) {
continue;
}
Map<String, Object> destFields = new HashMap<>(sourceMultiFields.size());
destFieldMapping.put(MULTI_FIELDS, destFields);
for (Map.Entry<String, ?> multiFieldEntry : sourceMultiFields.entrySet()) {
String multiFieldName = multiFieldEntry.getKey();
Map<String, ?> sourceMultiFieldMapping = getMapOrNull(multiFieldEntry.getValue());
if (sourceMultiFieldMapping == null) {
continue;
}
processAndCopy(multiFieldName, sourceMultiFieldMapping, destFields, fieldMappingConsumer);
}
}
}

private static Map<String, ?> getMapOrNull(Object object) {
if (object instanceof Map) {
@SuppressWarnings("unchecked")
Map<String, ?> map = (Map<String, ?>) object;
return map;
}
return null;
}

private static Map<String, Object> processAndCopy(
String fieldName,
Map<String, ?> sourceFieldMapping,
Map<String, Object> destParentMap,
TriConsumer<String, Map<String, ?>, Map<String, Object>> fieldMappingConsumer
) {
Map<String, Object> destFieldMapping = new HashMap<>(sourceFieldMapping.size());
destParentMap.put(fieldName, destFieldMapping);
fieldMappingConsumer.apply(fieldName, sourceFieldMapping, destFieldMapping);
return destFieldMapping;
}
}
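
For context, here is a minimal usage sketch of the new visitor. The field name and the long-to-keyword rewrite are illustrative only (they mirror the test below), not part of the change itself:

// Copy a mapping while rewriting every "long" field to "keyword"; all other
// attributes are carried over unchanged.
Map<String, Object> source = Map.of("properties", Map.of("count", Map.of("type", "long")));
Map<String, Object> dest = new HashMap<>();
MappingVisitor.visitAndCopyMapping(source, dest, (fieldName, src, dst) -> {
src.forEach((key, value) -> dst.put(key, "type".equals(key) && "long".equals(value) ? "keyword" : value));
});
// dest now holds {properties={count={type=keyword}}}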
@@ -136,4 +136,61 @@ public void testCountRuntimeFields() {
private static void collectRuntimeTypes(Map<String, ?> mapping, Set<String> types) {
MappingVisitor.visitRuntimeMapping(mapping, (f, m) -> types.add(m.get("type").toString()));
}

@SuppressWarnings("unchecked")
public void testConvertLongToKeyword() {
Map<String, Object> longType = Map.of("type", "long");
Map<String, Object> textType = Map.of("type", "text");
Map<String, Object> floatType = Map.of("type", "float", "scaling_factor", 1000);
Map<String, Object> multiField = Map.of("type", "keyword", "fields", Map.of("my-long", longType, "my-float", floatType));
Map<String, Object> objectField = Map.of("type", "keyword", "properties", Map.of("my-text", textType, "my-long", longType));
Map<String, Object> sourceMapping = Map.of(
"properties",
Map.of("my-long", longType, "my-float", floatType, "my-multi-field", multiField, "my-object", objectField)
);

HashMap<String, Object> result = new HashMap<>();
MappingVisitor.visitAndCopyMapping(sourceMapping, result, (fieldName, source, dest) -> {
for (String key : source.keySet()) {
if (key.equals("type") && source.get(key).equals("long")) {
dest.put(key, "keyword");
} else {
dest.put(key, source.get(key));
}
}
});

assertTrue(result.containsKey("properties"));
Map<String, Object> properties = (Map<String, Object>) result.get("properties");

assertTrue(properties.containsKey("my-long"));
Map<String, Object> myLong = (Map<String, Object>) properties.get("my-long");
assertEquals("keyword", myLong.get("type"));

assertTrue(properties.containsKey("my-float"));
Map<String, Object> myFloat = (Map<String, Object>) properties.get("my-float");
assertEquals("float", myFloat.get("type"));
assertEquals(1000, myFloat.get("scaling_factor"));

assertTrue(properties.containsKey("my-multi-field"));
Map<String, Object> myMultiField = (Map<String, Object>) properties.get("my-multi-field");
assertEquals("keyword", myMultiField.get("type"));
assertTrue(myMultiField.containsKey("fields"));
Map<String, Object> foundFields = (Map<String, Object>) myMultiField.get("fields");
assertTrue(foundFields.containsKey("my-long"));
assertEquals("keyword", ((Map<String, Object>) foundFields.get("my-long")).get("type"));
assertTrue(foundFields.containsKey("my-float"));
assertEquals("float", ((Map<String, Object>) foundFields.get("my-float")).get("type"));
assertEquals(1000, ((Map<String, Object>) foundFields.get("my-float")).get("scaling_factor"));

assertTrue(properties.containsKey("my-object"));
Map<String, Object> myObject = (Map<String, Object>) properties.get("my-object");
assertEquals("keyword", myObject.get("type"));
assertTrue(myObject.containsKey("properties"));
Map<String, Object> foundSubObjects = (Map<String, Object>) myObject.get("properties");
assertTrue(foundSubObjects.containsKey("my-long"));
assertEquals("keyword", ((Map<String, Object>) foundSubObjects.get("my-long")).get("type"));
assertTrue(foundSubObjects.containsKey("my-text"));
assertEquals("text", ((Map<String, Object>) foundSubObjects.get("my-text")).get("type"));
}
}
@@ -51,7 +51,13 @@ public void testDownsamplingPassthroughDimensions() throws Exception {
"attributes": {
"type": "passthrough",
"priority": 10,
"time_series_dimension": true
"time_series_dimension": true,
"properties": {
"os.name": {
"type": "keyword",
"time_series_dimension": true
}
}
},
"metrics.cpu_usage": {
Contributor:
Let's add a passthrough for metrics, without time_series_dimension set.

Contributor Author:
Isn't this covered by testDownsamplingPassthroughMetrics?

"type": "double",
@@ -70,7 +76,83 @@ public void testDownsamplingPassthroughDimensions() throws Exception {
.startObject()
.field("@timestamp", ts)
.field("attributes.host.name", randomFrom("host1", "host2", "host3"))
.field("attributes.os.name", randomFrom("linux", "windows", "macos"))
.field("metrics.cpu_usage", randomDouble())
.endObject();
} catch (IOException e) {
throw new RuntimeException(e);
}
};
bulkIndex(dataStreamName, sourceSupplier, 100);
// Rollover to ensure the index we will downsample is not the write index
assertAcked(client().admin().indices().rolloverIndex(new RolloverRequest(dataStreamName, null)));
List<String> backingIndices = waitForDataStreamBackingIndices(dataStreamName, 2);
String sourceIndex = backingIndices.get(0);
String interval = "5m";
String targetIndex = "downsample-" + interval + "-" + sourceIndex;
// Set the source index to read-only state
assertAcked(
indicesAdmin().prepareUpdateSettings(sourceIndex)
.setSettings(Settings.builder().put(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey(), true).build())
);

DownsampleConfig downsampleConfig = new DownsampleConfig(new DateHistogramInterval(interval));
assertAcked(
client().execute(
DownsampleAction.INSTANCE,
new DownsampleAction.Request(TEST_REQUEST_TIMEOUT, sourceIndex, targetIndex, TIMEOUT, downsampleConfig)
)
);

// Wait for downsampling to complete
SubscribableListener<Void> listener = ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> {
final var indexMetadata = clusterState.metadata().getProject().index(targetIndex);
if (indexMetadata == null) {
return false;
}
var downsampleStatus = IndexMetadata.INDEX_DOWNSAMPLE_STATUS.get(indexMetadata.getSettings());
return downsampleStatus == IndexMetadata.DownsampleTaskStatus.SUCCESS;
});
safeAwait(listener);

assertDownsampleIndexFieldsAndDimensions(sourceIndex, targetIndex, downsampleConfig);
}

public void testDownsamplingPassthroughMetrics() throws Exception {
String dataStreamName = "metrics-foo";
// Set up template
putTSDBIndexTemplate("my-template", List.of("metrics-foo"), null, """
{
"properties": {
"attributes.os.name": {
"type": "keyword",
"time_series_dimension": true
},
"metrics": {
"type": "passthrough",
"priority": 10,
"properties": {
"cpu_usage": {
"type": "double",
"time_series_metric": "gauge"
}
}
}
}
}
""", null, null);

// Create data stream by indexing documents
final Instant now = Instant.now();
Supplier<XContentBuilder> sourceSupplier = () -> {
String ts = randomDateForRange(now.minusSeconds(60 * 60).toEpochMilli(), now.plusSeconds(60 * 29).toEpochMilli());
try {
return XContentFactory.jsonBuilder()
.startObject()
.field("@timestamp", ts)
.field("attributes.os.name", randomFrom("linux", "windows", "macos"))
.field("metrics.cpu_usage", randomDouble())
.field("metrics.memory_usage", randomDouble())
.endObject();
} catch (IOException e) {
throw new RuntimeException(e);
@@ -52,7 +52,7 @@ public boolean isTimeSeriesDimension(final String unused, final Map<String, ?> f
return Boolean.TRUE.equals(fieldMapping.get(TIME_SERIES_DIMENSION_PARAM)) && isPassthroughField(fieldMapping) == false;
}

public static boolean isPassthroughField(final Map<String, ?> fieldMapping) {
public boolean isPassthroughField(final Map<String, ?> fieldMapping) {
return PassThroughObjectMapper.CONTENT_TYPE.equals(fieldMapping.get(ContextMapping.FIELD_TYPE));
}
