Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -923,7 +923,7 @@ public Index getConcreteWriteIndex(IndexAbstraction ia, ProjectMetadata project)

@Override
public int route(IndexRouting indexRouting) {
return indexRouting.indexShard(id, routing, tsid, indexSource.contentType(), indexSource.bytes());
return indexRouting.indexShard(this);
}

public IndexRequest setRequireAlias(boolean requireAlias) {
Expand Down
322 changes: 114 additions & 208 deletions server/src/main/java/org/elasticsearch/cluster/routing/IndexRouting.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.cluster.routing;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.util.ByteUtils;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.IndexVersions;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentString;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.IntSupplier;
import java.util.function.Predicate;

import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.elasticsearch.common.xcontent.XContentParserUtils.expectValueToken;

/**
* A builder for computing a hash from fields in the document source that are part of the
* {@link org.elasticsearch.cluster.metadata.IndexMetadata#INDEX_ROUTING_PATH}.
* It is used in the context of {@link IndexRouting.ExtractFromSource.ForRoutingPath} to determine the shard a document should be routed to.
*/
public class RoutingHashBuilder {
private final List<NameAndHash> hashes = new ArrayList<>();
private final Predicate<String> isRoutingPath;

public RoutingHashBuilder(Predicate<String> isRoutingPath) {
this.isRoutingPath = isRoutingPath;
}

public void addMatching(String fieldName, BytesRef string) {
if (isRoutingPath.test(fieldName)) {
addHash(fieldName, string);
}
}

/**
* Only expected to be called for old indices created before
* {@link IndexVersions#TIME_SERIES_ROUTING_HASH_IN_ID} while creating (during ingestion)
* or synthesizing (at query time) the _id field.
*/
public String createId(byte[] suffix, IntSupplier onEmpty) {
byte[] idBytes = new byte[4 + suffix.length];
ByteUtils.writeIntLE(buildHash(onEmpty), idBytes, 0);
System.arraycopy(suffix, 0, idBytes, 4, suffix.length);
return Strings.BASE_64_NO_PADDING_URL_ENCODER.encodeToString(idBytes);
}

void extractObject(@Nullable String path, XContentParser source) throws IOException {
while (source.currentToken() != XContentParser.Token.END_OBJECT) {
ensureExpectedToken(XContentParser.Token.FIELD_NAME, source.currentToken(), source);
String fieldName = source.currentName();
String subPath = path == null ? fieldName : path + "." + fieldName;
source.nextToken();
extractItem(subPath, source);
}
}

private void extractArray(@Nullable String path, XContentParser source) throws IOException {
while (source.currentToken() != XContentParser.Token.END_ARRAY) {
expectValueToken(source.currentToken(), source);
extractItem(path, source);
}
}

private void extractItem(String path, XContentParser source) throws IOException {
switch (source.currentToken()) {
case START_OBJECT:
source.nextToken();
extractObject(path, source);
source.nextToken();
break;
case VALUE_STRING:
case VALUE_NUMBER:
case VALUE_BOOLEAN:
XContentString.UTF8Bytes utf8Bytes = source.optimizedText().bytes();
addHash(path, new BytesRef(utf8Bytes.bytes(), utf8Bytes.offset(), utf8Bytes.length()));
source.nextToken();
break;
case START_ARRAY:
source.nextToken();
extractArray(path, source);
source.nextToken();
break;
case VALUE_NULL:
source.nextToken();
break;
default:
throw new ParsingException(
source.getTokenLocation(),
"Cannot extract routing path due to unexpected token [{}]",
source.currentToken()
);
}
}

private void addHash(String path, BytesRef value) {
hashes.add(new NameAndHash(new BytesRef(path), IndexRouting.ExtractFromSource.hash(value), hashes.size()));
}

int buildHash(IntSupplier onEmpty) {
if (hashes.isEmpty()) {
return onEmpty.getAsInt();
}
Collections.sort(hashes);
int hash = 0;
for (NameAndHash nah : hashes) {
hash = 31 * hash + (IndexRouting.ExtractFromSource.hash(nah.name) ^ nah.hash);
}
return hash;
}

private record NameAndHash(BytesRef name, int hash, int order) implements Comparable<NameAndHash> {
@Override
public int compareTo(NameAndHash o) {
int i = name.compareTo(o.name);
if (i != 0) return i;
// ensures array values are in the order as they appear in the source
return Integer.compare(order, o.order);
}
}
}
22 changes: 12 additions & 10 deletions server/src/main/java/org/elasticsearch/index/IndexMode.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.elasticsearch.index.mapper.RoutingFields;
import org.elasticsearch.index.mapper.RoutingPathFields;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
import org.elasticsearch.index.mapper.TimeSeriesRoutingHashFieldMapper;
import org.elasticsearch.index.mapper.TsidExtractingIdFieldMapper;
Expand Down Expand Up @@ -114,7 +113,7 @@ public IdFieldMapper buildIdFieldMapper(BooleanSupplier fieldDataEnabled) {
}

@Override
public RoutingFields buildRoutingFields(IndexSettings settings, SourceToParse source) {
public RoutingFields buildRoutingFields(IndexSettings settings) {
return RoutingFields.Noop.INSTANCE;
}

Expand Down Expand Up @@ -218,13 +217,16 @@ public IdFieldMapper buildIdFieldMapper(BooleanSupplier fieldDataEnabled) {
}

@Override
public RoutingFields buildRoutingFields(IndexSettings settings, SourceToParse source) {
if (source.tsid() != null) {
// If the source already has a _tsid field, we don't need to extract routing from the source.
public RoutingFields buildRoutingFields(IndexSettings settings) {
if (settings.getIndexRouting() instanceof IndexRouting.ExtractFromSource.ForRoutingPath forRoutingPath) {
return new RoutingPathFields(forRoutingPath.builder());
} else if (settings.getIndexRouting() instanceof IndexRouting.ExtractFromSource.ForIndexDimensions) {
return RoutingFields.Noop.INSTANCE;
} else {
throw new IllegalStateException(
"Index routing strategy not supported for index_mode=time_series: " + settings.getIndexRouting()
);
}
IndexRouting.ExtractFromSource routing = (IndexRouting.ExtractFromSource) settings.getIndexRouting();
return new RoutingPathFields(routing.builder());
}

@Override
Expand Down Expand Up @@ -303,7 +305,7 @@ public MetadataFieldMapper timeSeriesRoutingHashFieldMapper() {
}

@Override
public RoutingFields buildRoutingFields(IndexSettings settings, SourceToParse source) {
public RoutingFields buildRoutingFields(IndexSettings settings) {
return RoutingFields.Noop.INSTANCE;
}

Expand Down Expand Up @@ -384,7 +386,7 @@ public IdFieldMapper buildIdFieldMapper(BooleanSupplier fieldDataEnabled) {
}

@Override
public RoutingFields buildRoutingFields(IndexSettings settings, SourceToParse source) {
public RoutingFields buildRoutingFields(IndexSettings settings) {
return RoutingFields.Noop.INSTANCE;
}

Expand Down Expand Up @@ -540,7 +542,7 @@ public String getName() {
/**
* How {@code time_series_dimension} fields are handled by indices in this mode.
*/
public abstract RoutingFields buildRoutingFields(IndexSettings settings, SourceToParse source);
public abstract RoutingFields buildRoutingFields(IndexSettings settings);

/**
* @return Whether timestamps should be validated for being withing the time range of an index.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.Query;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.cluster.routing.IndexRouting;
import org.elasticsearch.common.Explicit;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.xcontent.XContentHelper;
Expand Down Expand Up @@ -96,7 +97,7 @@ public ParsedDocument parseDocument(SourceToParse source, MappingLookup mappingL
)
)
) {
context = new RootDocumentParserContext(mappingLookup, mappingParserContext, source, parser, source.tsid());
context = new RootDocumentParserContext(mappingLookup, mappingParserContext, source, parser);
validateStart(context.parser());
MetadataFieldMapper[] metadataFieldsMappers = mappingLookup.getMapping().getSortedMetadataMappers();
internalParseDocument(metadataFieldsMappers, context);
Expand Down Expand Up @@ -1077,8 +1078,7 @@ private static class RootDocumentParserContext extends DocumentParserContext {
MappingLookup mappingLookup,
MappingParserContext mappingParserContext,
SourceToParse source,
XContentParser parser,
BytesRef tsid
XContentParser parser
) throws IOException {
super(
mappingLookup,
Expand All @@ -1087,8 +1087,17 @@ private static class RootDocumentParserContext extends DocumentParserContext {
mappingLookup.getMapping().getRoot(),
ObjectMapper.Dynamic.getRootDynamic(mappingLookup)
);
IndexSettings indexSettings = mappingParserContext.getIndexSettings();
BytesRef tsid = source.tsid();
if (tsid == null
&& indexSettings.getMode() == IndexMode.TIME_SERIES
&& indexSettings.getIndexRouting() instanceof IndexRouting.ExtractFromSource.ForIndexDimensions forIndexDimensions) {
// the tsid is normally set on the coordinating node during shard routing and passed to the data node via the index request
// but when applying a translog operation, shard routing is not happening, and we have to create the tsid from source
tsid = forIndexDimensions.buildTsid(source.getXContentType(), source.source());
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally we would have

else {
    assert tsid == forIndexDimensions.buildTsid(source.getXContentType(), source.source());
}

(for when dimensions are in use), but I guess this is not always upheld.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When it comes to replaying translog operations (which include the id but not the tsid), the effect of this check is similar because the id is based on the tsid, and it also checks in the ForRoutingPath case:

if (context.sourceToParse().id() != null && false == context.sourceToParse().id().equals(id)) {
throw new IllegalArgumentException(
String.format(
Locale.ROOT,
"_id must be unset or set to [%s] but was [%s] because [%s] is in time_series mode",
id,
context.sourceToParse().id(),
context.indexSettings().getIndexMetadata().getIndex().getName()
)
);
}
context.id(id);

this.tsid = tsid;
assert tsid == null || mappingParserContext.getIndexSettings().getMode() == IndexMode.TIME_SERIES
assert this.tsid == null || indexSettings.getMode() == IndexMode.TIME_SERIES
: "tsid should only be set for time series indices";
if (mappingLookup.getMapping().getRoot().subobjects() == ObjectMapper.Subobjects.ENABLED) {
this.parser = DotExpandingXContentParser.expandDots(parser, this.path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ protected DocumentParserContext(
null,
null,
SeqNoFieldMapper.SequenceIDFields.emptySeqID(mappingParserContext.getIndexSettings().seqNoIndexOptions()),
RoutingFields.fromIndexSettings(mappingParserContext.getIndexSettings(), source),
RoutingFields.fromIndexSettings(mappingParserContext.getIndexSettings()),
parent,
dynamic,
new HashSet<>(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.routing.IndexRouting;
import org.elasticsearch.cluster.routing.RoutingHashBuilder;
import org.elasticsearch.index.fieldvisitor.LeafStoredFieldLoader;

import java.io.IOException;
Expand All @@ -37,7 +38,7 @@ static IdLoader fromLeafStoredFieldLoader() {
/**
* @return returns an {@link IdLoader} instance that syn synthesizes _id from routing, _tsid and @timestamp fields.
*/
static IdLoader createTsIdLoader(IndexRouting.ExtractFromSource indexRouting, List<String> routingPaths) {
static IdLoader createTsIdLoader(IndexRouting.ExtractFromSource.ForRoutingPath indexRouting, List<String> routingPaths) {
return new TsIdLoader(indexRouting, routingPaths);
}

Expand All @@ -58,19 +59,19 @@ sealed interface Leaf permits StoredLeaf, TsIdLeaf {

final class TsIdLoader implements IdLoader {

private final IndexRouting.ExtractFromSource indexRouting;
private final IndexRouting.ExtractFromSource.ForRoutingPath indexRouting;
private final List<String> routingPaths;

TsIdLoader(IndexRouting.ExtractFromSource indexRouting, List<String> routingPaths) {
TsIdLoader(IndexRouting.ExtractFromSource.ForRoutingPath indexRouting, List<String> routingPaths) {
this.routingPaths = routingPaths;
this.indexRouting = indexRouting;
}

public IdLoader.Leaf leaf(LeafStoredFieldLoader loader, LeafReader reader, int[] docIdsInLeaf) throws IOException {
IndexRouting.ExtractFromSource.RoutingHashBuilder[] builders = null;
RoutingHashBuilder[] builders = null;
if (indexRouting != null) {
// this branch is for legacy indices before IndexVersions.TIME_SERIES_ROUTING_HASH_IN_ID
builders = new IndexRouting.ExtractFromSource.RoutingHashBuilder[docIdsInLeaf.length];
builders = new RoutingHashBuilder[docIdsInLeaf.length];
for (int i = 0; i < builders.length; i++) {
builders[i] = indexRouting.builder();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ public interface RoutingFields {
/**
* Collect routing fields from index settings
*/
static RoutingFields fromIndexSettings(IndexSettings indexSettings, SourceToParse source) {
return indexSettings.getMode().buildRoutingFields(indexSettings, source);
static RoutingFields fromIndexSettings(IndexSettings indexSettings) {
return indexSettings.getMode().buildRoutingFields(indexSettings);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.StringHelper;
import org.elasticsearch.cluster.routing.IndexRouting;
import org.elasticsearch.cluster.routing.RoutingHashBuilder;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.hash.Murmur3Hasher;
Expand Down Expand Up @@ -59,17 +59,17 @@ public final class RoutingPathFields implements RoutingFields {
* Builds the routing. Used for building {@code _id}. If null then skipped.
*/
@Nullable
private final IndexRouting.ExtractFromSource.RoutingHashBuilder routingBuilder;
private final RoutingHashBuilder routingBuilder;

public RoutingPathFields(@Nullable IndexRouting.ExtractFromSource.RoutingHashBuilder routingBuilder) {
public RoutingPathFields(@Nullable RoutingHashBuilder routingBuilder) {
this.routingBuilder = routingBuilder;
}

SortedMap<BytesRef, List<BytesReference>> routingValues() {
return Collections.unmodifiableSortedMap(routingValues);
}

IndexRouting.ExtractFromSource.RoutingHashBuilder routingBuilder() {
RoutingHashBuilder routingBuilder() {
return routingBuilder;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import org.apache.lucene.document.StringField;
import org.apache.lucene.search.Query;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.cluster.routing.IndexRouting;
import org.elasticsearch.cluster.routing.RoutingHashBuilder;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
Expand Down Expand Up @@ -161,12 +161,13 @@ public void postParse(DocumentParserContext context) throws IOException {
throw new MapperException("Too many dimension fields [" + size + "], max [" + limit + "] dimension fields allowed");
}
timeSeriesId = buildLegacyTsid(routingPathFields).toBytesRef();
} else if (context.getTsid() != null) {
} else if (context.getRoutingFields() instanceof RoutingPathFields rpf) {
routingPathFields = rpf;
timeSeriesId = routingPathFields.buildHash().toBytesRef();
} else {
routingPathFields = null;
assert context.getTsid() != null;
timeSeriesId = context.getTsid();
} else {
routingPathFields = (RoutingPathFields) context.getRoutingFields();
timeSeriesId = routingPathFields.buildHash().toBytesRef();
}

if (this.useDocValuesSkipper) {
Expand All @@ -175,7 +176,7 @@ public void postParse(DocumentParserContext context) throws IOException {
context.doc().add(new SortedDocValuesField(fieldType().name(), timeSeriesId));
}

IndexRouting.ExtractFromSource.RoutingHashBuilder routingBuilder;
RoutingHashBuilder routingBuilder;
if (getIndexVersionCreated(context).before(IndexVersions.TIME_SERIES_ROUTING_HASH_IN_ID) && routingPathFields != null) {
// For legacy indices, we need to create the routing hash from the routing path fields.
routingBuilder = routingPathFields.routingBuilder();
Expand Down
Loading