Skip to content

Commit 41488ee

Browse files
authored
Refactor IndexRouting.ExtractFromSource to be an abstract class (#135206)
With implementations IndexRouting.ExtractFromSource.ForRoutingPath and IndexRouting.ExtractFromSource.ForIndexDimensions. This addresses review comments from #132566. Also fixes cases where the tsid is not provided by the coordinating node, such as for translog operations.
1 parent 5c5366d commit 41488ee

File tree

16 files changed

+531
-325
lines changed

16 files changed

+531
-325
lines changed

server/src/main/java/org/elasticsearch/action/index/IndexRequest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -923,7 +923,7 @@ public Index getConcreteWriteIndex(IndexAbstraction ia, ProjectMetadata project)
923923

924924
@Override
925925
public int route(IndexRouting indexRouting) {
926-
return indexRouting.indexShard(id, routing, tsid, indexSource.contentType(), indexSource.bytes());
926+
return indexRouting.indexShard(this);
927927
}
928928

929929
public IndexRequest setRequireAlias(boolean requireAlias) {

server/src/main/java/org/elasticsearch/cluster/routing/IndexRouting.java

Lines changed: 116 additions & 215 deletions
Large diffs are not rendered by default.
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.cluster.routing;
11+
12+
import org.apache.lucene.util.BytesRef;
13+
import org.elasticsearch.common.ParsingException;
14+
import org.elasticsearch.common.Strings;
15+
import org.elasticsearch.common.util.ByteUtils;
16+
import org.elasticsearch.core.Nullable;
17+
import org.elasticsearch.index.IndexVersions;
18+
import org.elasticsearch.xcontent.XContentParser;
19+
import org.elasticsearch.xcontent.XContentString;
20+
21+
import java.io.IOException;
22+
import java.util.ArrayList;
23+
import java.util.Collections;
24+
import java.util.List;
25+
import java.util.function.IntSupplier;
26+
import java.util.function.Predicate;
27+
28+
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
29+
import static org.elasticsearch.common.xcontent.XContentParserUtils.expectValueToken;
30+
31+
/**
32+
* A builder for computing a hash from fields in the document source that are part of the
33+
* {@link org.elasticsearch.cluster.metadata.IndexMetadata#INDEX_ROUTING_PATH}.
34+
* It is used in the context of {@link IndexRouting.ExtractFromSource.ForRoutingPath} to determine the shard a document should be routed to.
35+
*/
36+
public class RoutingHashBuilder {
37+
private final List<NameAndHash> hashes = new ArrayList<>();
38+
private final Predicate<String> isRoutingPath;
39+
40+
public RoutingHashBuilder(Predicate<String> isRoutingPath) {
41+
this.isRoutingPath = isRoutingPath;
42+
}
43+
44+
public void addMatching(String fieldName, BytesRef string) {
45+
if (isRoutingPath.test(fieldName)) {
46+
addHash(fieldName, string);
47+
}
48+
}
49+
50+
/**
51+
* Only expected to be called for old indices created before
52+
* {@link IndexVersions#TIME_SERIES_ROUTING_HASH_IN_ID} while creating (during ingestion)
53+
* or synthesizing (at query time) the _id field.
54+
*/
55+
public String createId(byte[] suffix, IntSupplier onEmpty) {
56+
byte[] idBytes = new byte[4 + suffix.length];
57+
ByteUtils.writeIntLE(buildHash(onEmpty), idBytes, 0);
58+
System.arraycopy(suffix, 0, idBytes, 4, suffix.length);
59+
return Strings.BASE_64_NO_PADDING_URL_ENCODER.encodeToString(idBytes);
60+
}
61+
62+
void extractObject(@Nullable String path, XContentParser source) throws IOException {
63+
while (source.currentToken() != XContentParser.Token.END_OBJECT) {
64+
ensureExpectedToken(XContentParser.Token.FIELD_NAME, source.currentToken(), source);
65+
String fieldName = source.currentName();
66+
String subPath = path == null ? fieldName : path + "." + fieldName;
67+
source.nextToken();
68+
extractItem(subPath, source);
69+
}
70+
}
71+
72+
private void extractArray(@Nullable String path, XContentParser source) throws IOException {
73+
while (source.currentToken() != XContentParser.Token.END_ARRAY) {
74+
expectValueToken(source.currentToken(), source);
75+
extractItem(path, source);
76+
}
77+
}
78+
79+
private void extractItem(String path, XContentParser source) throws IOException {
80+
switch (source.currentToken()) {
81+
case START_OBJECT:
82+
source.nextToken();
83+
extractObject(path, source);
84+
source.nextToken();
85+
break;
86+
case VALUE_STRING:
87+
case VALUE_NUMBER:
88+
case VALUE_BOOLEAN:
89+
XContentString.UTF8Bytes utf8Bytes = source.optimizedText().bytes();
90+
addHash(path, new BytesRef(utf8Bytes.bytes(), utf8Bytes.offset(), utf8Bytes.length()));
91+
source.nextToken();
92+
break;
93+
case START_ARRAY:
94+
source.nextToken();
95+
extractArray(path, source);
96+
source.nextToken();
97+
break;
98+
case VALUE_NULL:
99+
source.nextToken();
100+
break;
101+
default:
102+
throw new ParsingException(
103+
source.getTokenLocation(),
104+
"Cannot extract routing path due to unexpected token [{}]",
105+
source.currentToken()
106+
);
107+
}
108+
}
109+
110+
private void addHash(String path, BytesRef value) {
111+
hashes.add(new NameAndHash(new BytesRef(path), IndexRouting.ExtractFromSource.hash(value), hashes.size()));
112+
}
113+
114+
int buildHash(IntSupplier onEmpty) {
115+
if (hashes.isEmpty()) {
116+
return onEmpty.getAsInt();
117+
}
118+
Collections.sort(hashes);
119+
int hash = 0;
120+
for (NameAndHash nah : hashes) {
121+
hash = 31 * hash + (IndexRouting.ExtractFromSource.hash(nah.name) ^ nah.hash);
122+
}
123+
return hash;
124+
}
125+
126+
private record NameAndHash(BytesRef name, int hash, int order) implements Comparable<NameAndHash> {
127+
@Override
128+
public int compareTo(NameAndHash o) {
129+
int i = name.compareTo(o.name);
130+
if (i != 0) return i;
131+
// ensures array values are in the order as they appear in the source
132+
return Integer.compare(order, o.order);
133+
}
134+
}
135+
}

server/src/main/java/org/elasticsearch/index/IndexMode.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import org.elasticsearch.index.mapper.RoutingFields;
3636
import org.elasticsearch.index.mapper.RoutingPathFields;
3737
import org.elasticsearch.index.mapper.SourceFieldMapper;
38-
import org.elasticsearch.index.mapper.SourceToParse;
3938
import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
4039
import org.elasticsearch.index.mapper.TimeSeriesRoutingHashFieldMapper;
4140
import org.elasticsearch.index.mapper.TsidExtractingIdFieldMapper;
@@ -113,7 +112,7 @@ public IdFieldMapper buildIdFieldMapper(BooleanSupplier fieldDataEnabled) {
113112
}
114113

115114
@Override
116-
public RoutingFields buildRoutingFields(IndexSettings settings, SourceToParse source) {
115+
public RoutingFields buildRoutingFields(IndexSettings settings) {
117116
return RoutingFields.Noop.INSTANCE;
118117
}
119118

@@ -217,13 +216,15 @@ public IdFieldMapper buildIdFieldMapper(BooleanSupplier fieldDataEnabled) {
217216
}
218217

219218
@Override
220-
public RoutingFields buildRoutingFields(IndexSettings settings, SourceToParse source) {
221-
if (source.tsid() != null) {
222-
// If the source already has a _tsid field, we don't need to extract routing from the source.
219+
public RoutingFields buildRoutingFields(IndexSettings settings) {
220+
IndexRouting indexRouting = settings.getIndexRouting();
221+
if (indexRouting instanceof IndexRouting.ExtractFromSource.ForRoutingPath forRoutingPath) {
222+
return new RoutingPathFields(forRoutingPath.builder());
223+
} else if (indexRouting instanceof IndexRouting.ExtractFromSource.ForIndexDimensions) {
223224
return RoutingFields.Noop.INSTANCE;
225+
} else {
226+
throw new IllegalStateException("Index routing strategy not supported for index_mode=time_series: " + indexRouting);
224227
}
225-
IndexRouting.ExtractFromSource routing = (IndexRouting.ExtractFromSource) settings.getIndexRouting();
226-
return new RoutingPathFields(routing.builder());
227228
}
228229

229230
@Override
@@ -302,7 +303,7 @@ public MetadataFieldMapper timeSeriesRoutingHashFieldMapper() {
302303
}
303304

304305
@Override
305-
public RoutingFields buildRoutingFields(IndexSettings settings, SourceToParse source) {
306+
public RoutingFields buildRoutingFields(IndexSettings settings) {
306307
return RoutingFields.Noop.INSTANCE;
307308
}
308309

@@ -383,7 +384,7 @@ public IdFieldMapper buildIdFieldMapper(BooleanSupplier fieldDataEnabled) {
383384
}
384385

385386
@Override
386-
public RoutingFields buildRoutingFields(IndexSettings settings, SourceToParse source) {
387+
public RoutingFields buildRoutingFields(IndexSettings settings) {
387388
return RoutingFields.Noop.INSTANCE;
388389
}
389390

@@ -539,7 +540,7 @@ public String getName() {
539540
/**
540541
* How {@code time_series_dimension} fields are handled by indices in this mode.
541542
*/
542-
public abstract RoutingFields buildRoutingFields(IndexSettings settings, SourceToParse source);
543+
public abstract RoutingFields buildRoutingFields(IndexSettings settings);
543544

544545
/**
545546
* @return Whether timestamps should be validated for being withing the time range of an index.

server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.apache.lucene.index.LeafReaderContext;
1414
import org.apache.lucene.search.Query;
1515
import org.apache.lucene.util.BytesRef;
16+
import org.elasticsearch.cluster.routing.IndexRouting;
1617
import org.elasticsearch.common.Explicit;
1718
import org.elasticsearch.common.regex.Regex;
1819
import org.elasticsearch.common.xcontent.XContentHelper;
@@ -96,7 +97,7 @@ public ParsedDocument parseDocument(SourceToParse source, MappingLookup mappingL
9697
)
9798
)
9899
) {
99-
context = new RootDocumentParserContext(mappingLookup, mappingParserContext, source, parser, source.tsid());
100+
context = new RootDocumentParserContext(mappingLookup, mappingParserContext, source, parser);
100101
validateStart(context.parser());
101102
MetadataFieldMapper[] metadataFieldsMappers = mappingLookup.getMapping().getSortedMetadataMappers();
102103
internalParseDocument(metadataFieldsMappers, context);
@@ -1077,8 +1078,7 @@ private static class RootDocumentParserContext extends DocumentParserContext {
10771078
MappingLookup mappingLookup,
10781079
MappingParserContext mappingParserContext,
10791080
SourceToParse source,
1080-
XContentParser parser,
1081-
BytesRef tsid
1081+
XContentParser parser
10821082
) throws IOException {
10831083
super(
10841084
mappingLookup,
@@ -1087,8 +1087,17 @@ private static class RootDocumentParserContext extends DocumentParserContext {
10871087
mappingLookup.getMapping().getRoot(),
10881088
ObjectMapper.Dynamic.getRootDynamic(mappingLookup)
10891089
);
1090+
IndexSettings indexSettings = mappingParserContext.getIndexSettings();
1091+
BytesRef tsid = source.tsid();
1092+
if (tsid == null
1093+
&& indexSettings.getMode() == IndexMode.TIME_SERIES
1094+
&& indexSettings.getIndexRouting() instanceof IndexRouting.ExtractFromSource.ForIndexDimensions forIndexDimensions) {
1095+
// the tsid is normally set on the coordinating node during shard routing and passed to the data node via the index request
1096+
// but when applying a translog operation, shard routing is not happening, and we have to create the tsid from source
1097+
tsid = forIndexDimensions.buildTsid(source.getXContentType(), source.source());
1098+
}
10901099
this.tsid = tsid;
1091-
assert tsid == null || mappingParserContext.getIndexSettings().getMode() == IndexMode.TIME_SERIES
1100+
assert this.tsid == null || indexSettings.getMode() == IndexMode.TIME_SERIES
10921101
: "tsid should only be set for time series indices";
10931102
if (mappingLookup.getMapping().getRoot().subobjects() == ObjectMapper.Subobjects.ENABLED) {
10941103
this.parser = DotExpandingXContentParser.expandDots(parser, this.path);

server/src/main/java/org/elasticsearch/index/mapper/DocumentParserContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,7 @@ protected DocumentParserContext(
271271
null,
272272
null,
273273
SeqNoFieldMapper.SequenceIDFields.emptySeqID(mappingParserContext.getIndexSettings().seqNoIndexOptions()),
274-
RoutingFields.fromIndexSettings(mappingParserContext.getIndexSettings(), source),
274+
RoutingFields.fromIndexSettings(mappingParserContext.getIndexSettings()),
275275
parent,
276276
dynamic,
277277
new HashSet<>(),

server/src/main/java/org/elasticsearch/index/mapper/IdLoader.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.apache.lucene.util.BytesRef;
1818
import org.elasticsearch.cluster.metadata.DataStream;
1919
import org.elasticsearch.cluster.routing.IndexRouting;
20+
import org.elasticsearch.cluster.routing.RoutingHashBuilder;
2021
import org.elasticsearch.index.fieldvisitor.LeafStoredFieldLoader;
2122

2223
import java.io.IOException;
@@ -37,7 +38,7 @@ static IdLoader fromLeafStoredFieldLoader() {
3738
/**
3839
* @return returns an {@link IdLoader} instance that syn synthesizes _id from routing, _tsid and @timestamp fields.
3940
*/
40-
static IdLoader createTsIdLoader(IndexRouting.ExtractFromSource indexRouting, List<String> routingPaths) {
41+
static IdLoader createTsIdLoader(IndexRouting.ExtractFromSource.ForRoutingPath indexRouting, List<String> routingPaths) {
4142
return new TsIdLoader(indexRouting, routingPaths);
4243
}
4344

@@ -58,19 +59,19 @@ sealed interface Leaf permits StoredLeaf, TsIdLeaf {
5859

5960
final class TsIdLoader implements IdLoader {
6061

61-
private final IndexRouting.ExtractFromSource indexRouting;
62+
private final IndexRouting.ExtractFromSource.ForRoutingPath indexRouting;
6263
private final List<String> routingPaths;
6364

64-
TsIdLoader(IndexRouting.ExtractFromSource indexRouting, List<String> routingPaths) {
65+
TsIdLoader(IndexRouting.ExtractFromSource.ForRoutingPath indexRouting, List<String> routingPaths) {
6566
this.routingPaths = routingPaths;
6667
this.indexRouting = indexRouting;
6768
}
6869

6970
public IdLoader.Leaf leaf(LeafStoredFieldLoader loader, LeafReader reader, int[] docIdsInLeaf) throws IOException {
70-
IndexRouting.ExtractFromSource.RoutingHashBuilder[] builders = null;
71+
RoutingHashBuilder[] builders = null;
7172
if (indexRouting != null) {
7273
// this branch is for legacy indices before IndexVersions.TIME_SERIES_ROUTING_HASH_IN_ID
73-
builders = new IndexRouting.ExtractFromSource.RoutingHashBuilder[docIdsInLeaf.length];
74+
builders = new RoutingHashBuilder[docIdsInLeaf.length];
7475
for (int i = 0; i < builders.length; i++) {
7576
builders[i] = indexRouting.builder();
7677
}

server/src/main/java/org/elasticsearch/index/mapper/RoutingFields.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ public interface RoutingFields {
2222
/**
2323
* Collect routing fields from index settings
2424
*/
25-
static RoutingFields fromIndexSettings(IndexSettings indexSettings, SourceToParse source) {
26-
return indexSettings.getMode().buildRoutingFields(indexSettings, source);
25+
static RoutingFields fromIndexSettings(IndexSettings indexSettings) {
26+
return indexSettings.getMode().buildRoutingFields(indexSettings);
2727
}
2828

2929
/**

server/src/main/java/org/elasticsearch/index/mapper/RoutingPathFields.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
import org.apache.lucene.util.BytesRef;
1313
import org.apache.lucene.util.StringHelper;
14-
import org.elasticsearch.cluster.routing.IndexRouting;
14+
import org.elasticsearch.cluster.routing.RoutingHashBuilder;
1515
import org.elasticsearch.common.bytes.BytesArray;
1616
import org.elasticsearch.common.bytes.BytesReference;
1717
import org.elasticsearch.common.hash.Murmur3Hasher;
@@ -59,17 +59,17 @@ public final class RoutingPathFields implements RoutingFields {
5959
* Builds the routing. Used for building {@code _id}. If null then skipped.
6060
*/
6161
@Nullable
62-
private final IndexRouting.ExtractFromSource.RoutingHashBuilder routingBuilder;
62+
private final RoutingHashBuilder routingBuilder;
6363

64-
public RoutingPathFields(@Nullable IndexRouting.ExtractFromSource.RoutingHashBuilder routingBuilder) {
64+
public RoutingPathFields(@Nullable RoutingHashBuilder routingBuilder) {
6565
this.routingBuilder = routingBuilder;
6666
}
6767

6868
SortedMap<BytesRef, List<BytesReference>> routingValues() {
6969
return Collections.unmodifiableSortedMap(routingValues);
7070
}
7171

72-
IndexRouting.ExtractFromSource.RoutingHashBuilder routingBuilder() {
72+
RoutingHashBuilder routingBuilder() {
7373
return routingBuilder;
7474
}
7575

server/src/main/java/org/elasticsearch/index/mapper/TimeSeriesIdFieldMapper.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
import org.apache.lucene.document.StringField;
1515
import org.apache.lucene.search.Query;
1616
import org.apache.lucene.util.BytesRef;
17-
import org.elasticsearch.cluster.routing.IndexRouting;
17+
import org.elasticsearch.cluster.routing.RoutingHashBuilder;
1818
import org.elasticsearch.common.Strings;
1919
import org.elasticsearch.common.bytes.BytesReference;
2020
import org.elasticsearch.common.io.stream.BytesStreamOutput;
@@ -161,12 +161,13 @@ public void postParse(DocumentParserContext context) throws IOException {
161161
throw new MapperException("Too many dimension fields [" + size + "], max [" + limit + "] dimension fields allowed");
162162
}
163163
timeSeriesId = buildLegacyTsid(routingPathFields).toBytesRef();
164-
} else if (context.getTsid() != null) {
164+
} else if (context.getRoutingFields() instanceof RoutingPathFields routingPathFieldsFromContext) {
165+
routingPathFields = routingPathFieldsFromContext;
166+
timeSeriesId = routingPathFields.buildHash().toBytesRef();
167+
} else {
165168
routingPathFields = null;
169+
assert context.getTsid() != null;
166170
timeSeriesId = context.getTsid();
167-
} else {
168-
routingPathFields = (RoutingPathFields) context.getRoutingFields();
169-
timeSeriesId = routingPathFields.buildHash().toBytesRef();
170171
}
171172

172173
if (this.useDocValuesSkipper) {
@@ -175,7 +176,7 @@ public void postParse(DocumentParserContext context) throws IOException {
175176
context.doc().add(new SortedDocValuesField(fieldType().name(), timeSeriesId));
176177
}
177178

178-
IndexRouting.ExtractFromSource.RoutingHashBuilder routingBuilder;
179+
RoutingHashBuilder routingBuilder;
179180
if (getIndexVersionCreated(context).before(IndexVersions.TIME_SERIES_ROUTING_HASH_IN_ID) && routingPathFields != null) {
180181
// For legacy indices, we need to create the routing hash from the routing path fields.
181182
routingBuilder = routingPathFields.routingBuilder();

0 commit comments

Comments
 (0)