Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/112645.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 112645
summary: Add support for multi-value dimensions
area: Mapping
type: enhancement
issues:
- 110387
1 change: 0 additions & 1 deletion docs/reference/mapping/types/keyword.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,6 @@ index setting limits the number of dimensions in an index.
Dimension fields have the following constraints:

* The `doc_values` and `index` mapping parameters must be `true`.
* Field values cannot be an <<array,array or multi-value>>.
// end::dimension[]
* Dimension values are used to identify a document’s time series. If dimension values are altered in any way during indexing, the document will be stored as belonging to a time series different from the intended one. As a result there are additional constraints:
** The field cannot use a <<normalizer,`normalizer`>>.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1230,3 +1230,78 @@ non string dimension fields:
- match: { .$idx0name.mappings.properties.attributes.properties.double.time_series_dimension: true }
- match: { .$idx0name.mappings.properties.attributes.properties.host\.ip.type: 'ip' }
- match: { .$idx0name.mappings.properties.attributes.properties.host\.ip.time_series_dimension: true }

---
multi value dimensions:
- requires:
cluster_features: ["routing.multi_value_routing_path"]
reason: support for multi-value dimensions

- do:
allowed_warnings:
- "index template [my-dynamic-template] has index patterns [k9s*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-dynamic-template] will take precedence during new index creation"
indices.put_index_template:
name: my-dynamic-template
body:
index_patterns: [k9s*]
data_stream: {}
template:
settings:
index:
number_of_shards: 1
mode: time_series
time_series:
start_time: 2023-08-31T13:03:08.138Z

mappings:
properties:
attributes:
type: passthrough
dynamic: true
time_series_dimension: true
priority: 1
dynamic_templates:
- counter_metric:
mapping:
type: integer
time_series_metric: counter

- do:
bulk:
index: k9s
refresh: true
body:
- '{ "create": { "dynamic_templates": { "data": "counter_metric" } } }'
- '{ "@timestamp": "2023-09-01T13:03:08.138Z","data": "10", "attributes": { "dim1": ["a" , "b"], "dim2": [1, 2] } }'
- '{ "create": { "dynamic_templates": { "data": "counter_metric" } } }'
- '{ "@timestamp": "2023-09-01T13:03:08.138Z","data": "20", "attributes": { "dim1": ["b" , "a"], "dim2": [1, 2] } }'
- '{ "create": { "dynamic_templates": { "data": "counter_metric" } } }'
- is_false: errors

- do:
search:
index: k9s
body:
size: 0
aggs:
tsids:
terms:
field: _tsid

- length: { aggregations.tsids.buckets: 2 } # only the order of the dim1 attribute is different, yet we expect to have two distinct time series

- do:
search:
index: k9s
body:
size: 0
aggs:
dims:
terms:
field: dim1

- length: { aggregations.dims.buckets: 2 }
- match: { aggregations.dims.buckets.0.key: a }
- match: { aggregations.dims.buckets.0.doc_count: 2 }
- match: { aggregations.dims.buckets.1.key: b }
- match: { aggregations.dims.buckets.1.doc_count: 2 }
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ missing dimension on routing path field:
multi-value routing path field:
- requires:
test_runner_features: close_to
cluster_features: ["gte_v8.13.0"]
cluster_features: ["gte_v8.13.0", "routing.multi_value_routing_path"]
reason: _tsid hashing introduced in 8.13

- do:
Expand Down Expand Up @@ -172,12 +172,7 @@ multi-value routing path field:
- '{"index": {}}'
- '{"@timestamp": "2021-04-28T18:35:54.467Z", "uid": "df3145b3-0563-4d3b-a0f7-897eb2876ea9", "voltage": 6.8, "unmapped_field": 40, "tag": [ "one", "three" ] }'

- is_true: errors

- match: {items.1.index.error.reason: "Error extracting routing: Routing values must be strings but found [START_ARRAY]" }
- match: {items.3.index.error.reason: "Error extracting routing: Routing values must be strings but found [START_ARRAY]" }
- match: {items.4.index.error.reason: "Error extracting routing: Routing values must be strings but found [START_ARRAY]" }
- match: {items.7.index.error.reason: "Error extracting routing: Routing values must be strings but found [START_ARRAY]" }
- is_false: errors

- do:
search:
Expand All @@ -195,13 +190,21 @@ multi-value routing path field:
avg:
field: voltage

- match: {hits.total.value: 4}
- length: {aggregations.tsids.buckets: 2}
- match: {hits.total.value: 8}
- length: {aggregations.tsids.buckets: 4}

- match: {aggregations.tsids.buckets.0.key: "KDODRmbj7vu4rLWvjrJbpUuaET_vOYoRw6ImzKEcF4sEaGKnXSaKfM0" }
- match: {aggregations.tsids.buckets.0.key: "KDODRmbj7vu4rLWvjrJbpUtt0uPSOYoRw_LI4DD7DFEGEJ3NR3eQkMY" }
- match: {aggregations.tsids.buckets.0.doc_count: 2 }
- close_to: {aggregations.tsids.buckets.0.voltage.value: { value: 6.70, error: 0.01 }}

- match: { aggregations.tsids.buckets.1.key: "KDODRmbj7vu4rLWvjrJbpUvcUWJEddqA4Seo8jbBBBFxwC0lrefCb6A" }
- match: { aggregations.tsids.buckets.1.key: "KDODRmbj7vu4rLWvjrJbpUtt0uPSddqA4WYKglGPR_C0cJe8QGaiC2c" }
- match: {aggregations.tsids.buckets.1.doc_count: 2 }
- close_to: {aggregations.tsids.buckets.1.voltage.value: { value: 7.30, error: 0.01 }}
- close_to: {aggregations.tsids.buckets.1.voltage.value: { value: 7.15, error: 0.01 }}

- match: { aggregations.tsids.buckets.2.key: "KDODRmbj7vu4rLWvjrJbpUuaET_vOYoRw6ImzKEcF4sEaGKnXSaKfM0" }
- match: {aggregations.tsids.buckets.2.doc_count: 2 }
- close_to: {aggregations.tsids.buckets.2.voltage.value: { value: 6.70, error: 0.01 }}

- match: { aggregations.tsids.buckets.3.key: "KDODRmbj7vu4rLWvjrJbpUvcUWJEddqA4Seo8jbBBBFxwC0lrefCb6A" }
- match: {aggregations.tsids.buckets.3.doc_count: 2 }
- close_to: {aggregations.tsids.buckets.3.voltage.value: { value: 7.30, error: 0.01 }}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -44,13 +43,15 @@
import java.util.function.Predicate;

import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.elasticsearch.common.xcontent.XContentParserUtils.expectValueToken;

/**
* Generates the shard id for {@code (id, routing)} pairs.
*/
public abstract class IndexRouting {

static final NodeFeature BOOLEAN_ROUTING_PATH = new NodeFeature("routing.boolean_routing_path");
static final NodeFeature MULTI_VALUE_ROUTING_PATH = new NodeFeature("routing.multi_value_routing_path");

/**
* Build the routing from {@link IndexMetadata}.
Expand Down Expand Up @@ -300,12 +301,20 @@ public String createId(Map<String, Object> flat, byte[] suffix) {
Builder b = builder();
for (Map.Entry<String, Object> e : flat.entrySet()) {
if (isRoutingPath.test(e.getKey())) {
b.hashes.add(new NameAndHash(new BytesRef(e.getKey()), hash(new BytesRef(e.getValue().toString()))));
if (e.getValue() instanceof List<?> listValue) {
listValue.forEach(v -> hashValue(b, e.getKey(), v));
} else {
hashValue(b, e.getKey(), e.getValue());
}
}
}
return b.createId(suffix, IndexRouting.ExtractFromSource::defaultOnEmpty);
}

/** Hashes a single routing value under its field name and records it on the builder. */
private static void hashValue(Builder builder, String fieldName, Object fieldValue) {
    final BytesRef nameBytes = new BytesRef(fieldName);
    final BytesRef valueBytes = new BytesRef(fieldValue.toString());
    builder.hashes.add(new NameAndHash(nameBytes, hash(valueBytes)));
}

// Fallback supplier used by createId when no routing-path field matched the source:
// with no values hashed there is nothing to route on, so reject the document.
private static int defaultOnEmpty() {
    throw new IllegalArgumentException("Error extracting routing: source didn't contain any routing fields");
}
Expand Down Expand Up @@ -356,6 +365,13 @@ private void extractObject(@Nullable String path, XContentParser source) throws
}
}

/**
 * Consumes the elements of an array until the matching END_ARRAY token, hashing each
 * element under {@code path}. Every element must be a scalar value token; anything
 * else (e.g. a nested array or object) fails via {@code expectValueToken}.
 */
private void extractArray(@Nullable String path, XContentParser source) throws IOException {
    for (Token token = source.currentToken(); token != Token.END_ARRAY; token = source.currentToken()) {
        expectValueToken(token, source);
        extractItem(path, source);
    }
}

private void extractItem(String path, XContentParser source) throws IOException {
switch (source.currentToken()) {
case START_OBJECT:
Expand All @@ -369,34 +385,31 @@ private void extractItem(String path, XContentParser source) throws IOException
hashes.add(new NameAndHash(new BytesRef(path), hash(new BytesRef(source.text()))));
source.nextToken();
break;
case START_ARRAY:
source.nextToken();
extractArray(path, source);
source.nextToken();
break;
case VALUE_NULL:
source.nextToken();
break;
default:
throw new ParsingException(
source.getTokenLocation(),
"Routing values must be strings but found [{}]",
"Cannot extract routing path due to unexpected token [{}]",
source.currentToken()
);
}
}

private int buildHash(IntSupplier onEmpty) {
Collections.sort(hashes);
Iterator<NameAndHash> itr = hashes.iterator();
if (itr.hasNext() == false) {
if (hashes.isEmpty()) {
return onEmpty.getAsInt();
}
NameAndHash prev = itr.next();
int hash = hash(prev.name) ^ prev.hash;
while (itr.hasNext()) {
NameAndHash next = itr.next();
if (prev.name.equals(next.name)) {
throw new IllegalArgumentException("Duplicate routing dimension for [" + next.name + "]");
}
int thisHash = hash(next.name) ^ next.hash;
hash = 31 * hash + thisHash;
prev = next;
Collections.sort(hashes);
int hash = 0;
for (NameAndHash nah : hashes) {
hash = 31 * hash + (hash(nah.name) ^ nah.hash);
}
return hash;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@ public class RoutingFeatures implements FeatureSpecification {

@Override
public Set<NodeFeature> getFeatures() {
    // Advertises routing capabilities: boolean values and, with this change,
    // multi-value fields on the routing path.
    return Set.of(IndexRouting.BOOLEAN_ROUTING_PATH, IndexRouting.MULTI_VALUE_ROUTING_PATH);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,20 @@ public static void ensureExpectedToken(Token expected, Token actual, XContentPar
}
}

/**
 * Makes sure the provided token {@linkplain Token#isValue() is a value type}
 *
 * @throws ParsingException if the token is not a value type
 */
public static void expectValueToken(Token actual, XContentParser parser) {
    if (actual.isValue()) {
        return;
    }
    String message = String.format(Locale.ROOT, "Failed to parse object: expecting value token but found [%s]", actual);
    throw new ParsingException(parser.getTokenLocation(), message);
}

private static ParsingException parsingException(XContentParser parser, Token expected, Token actual) {
return new ParsingException(
parser.getTokenLocation(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,14 @@
import java.io.IOException;
import java.net.InetAddress;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.SortedMap;
import java.util.TreeMap;

/**
* Mapper for {@code _tsid} field included generated when the index is
Expand Down Expand Up @@ -180,16 +181,14 @@ public static class TimeSeriesIdBuilder implements DocumentDimensions {

public static final int MAX_DIMENSIONS = 512;

private record Dimension(BytesRef name, BytesReference value) {}

private final Murmur3Hasher tsidHasher = new Murmur3Hasher(0);

/**
* A sorted set of the serialized values of dimension fields that will be used
* for generating the _tsid field. The map will be used by {@link TimeSeriesIdFieldMapper}
* to build the _tsid field for the document.
*/
private final SortedSet<Dimension> dimensions = new TreeSet<>(Comparator.comparing(o -> o.name));
private final SortedMap<BytesRef, List<BytesReference>> dimensions = new TreeMap<>();
/**
* Builds the routing. Used for building {@code _id}. If null then skipped.
*/
Expand All @@ -207,9 +206,16 @@ public BytesReference buildLegacyTsid() throws IOException {

try (BytesStreamOutput out = new BytesStreamOutput()) {
out.writeVInt(dimensions.size());
for (Dimension entry : dimensions) {
out.writeBytesRef(entry.name);
entry.value.writeTo(out);
for (Map.Entry<BytesRef, List<BytesReference>> entry : dimensions.entrySet()) {
out.writeBytesRef(entry.getKey());
List<BytesReference> value = entry.getValue();
if (value.size() > 1) {
throw new IllegalArgumentException(
"Dimension field [" + entry.getKey().utf8ToString() + "] cannot be a multi-valued field."
);
}
assert value.isEmpty() == false : "dimension value is empty";
value.get(0).writeTo(out);
}
return out.bytes();
}
Expand Down Expand Up @@ -241,18 +247,19 @@ public BytesReference buildTsidHash() {
int tsidHashIndex = StreamOutput.putVInt(tsidHash, len, 0);

tsidHasher.reset();
for (final Dimension dimension : dimensions) {
tsidHasher.update(dimension.name.bytes);
for (final BytesRef name : dimensions.keySet()) {
tsidHasher.update(name.bytes);
}
tsidHashIndex = writeHash128(tsidHasher.digestHash(), tsidHash, tsidHashIndex);

// NOTE: concatenate all dimension value hashes up to a certain number of dimensions
int tsidHashStartIndex = tsidHashIndex;
for (final Dimension dimension : dimensions) {
for (final List<BytesReference> values : dimensions.values()) {
if ((tsidHashIndex - tsidHashStartIndex) >= 4 * numberOfDimensions) {
break;
}
final BytesRef dimensionValueBytesRef = dimension.value.toBytesRef();
assert values.isEmpty() == false : "dimension values are empty";
final BytesRef dimensionValueBytesRef = values.get(0).toBytesRef();
ByteUtils.writeIntLE(
StringHelper.murmurhash3_x86_32(
dimensionValueBytesRef.bytes,
Expand All @@ -268,8 +275,8 @@ public BytesReference buildTsidHash() {

// NOTE: hash all dimension field allValues
tsidHasher.reset();
for (final Dimension dimension : dimensions) {
tsidHasher.update(dimension.value.toBytesRef().bytes);
for (final List<BytesReference> values : dimensions.values()) {
values.forEach(v -> tsidHasher.update(v.toBytesRef().bytes));
}
tsidHashIndex = writeHash128(tsidHasher.digestHash(), tsidHash, tsidHashIndex);

Expand Down Expand Up @@ -372,8 +379,15 @@ public DocumentDimensions validate(final IndexSettings settings) {
}

private void add(String fieldName, BytesReference encoded) throws IOException {
if (dimensions.add(new Dimension(new BytesRef(fieldName), encoded)) == false) {
throw new IllegalArgumentException("Dimension field [" + fieldName + "] cannot be a multi-valued field.");
BytesRef name = new BytesRef(fieldName);
List<BytesReference> values = dimensions.get(name);
if (values == null) {
// optimize for the common case where dimensions are not multi-valued
values = new ArrayList<>(1);
values.add(encoded);
dimensions.put(name, values);
} else {
values.add(encoded);
}
}
}
Expand Down
Loading