Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -923,7 +923,7 @@ public Index getConcreteWriteIndex(IndexAbstraction ia, ProjectMetadata project)

@Override
public int route(IndexRouting indexRouting) {
return indexRouting.indexShard(id, routing, tsid, indexSource.contentType(), indexSource.bytes());
return indexRouting.indexShard(this);
}

public IndexRequest setRequireAlias(boolean requireAlias) {
Expand Down
322 changes: 114 additions & 208 deletions server/src/main/java/org/elasticsearch/cluster/routing/IndexRouting.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.cluster.routing;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.util.ByteUtils;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.IndexVersions;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentString;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.IntSupplier;
import java.util.function.Predicate;

import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.elasticsearch.common.xcontent.XContentParserUtils.expectValueToken;

/**
* A builder for computing a hash from fields in the document source that are part of the
* {@link org.elasticsearch.cluster.metadata.IndexMetadata#INDEX_ROUTING_PATH}.
* It is used in the context of {@link IndexRouting.ExtractFromSource.ForRoutingPath} to determine the shard a document should be routed to.
*/
public class RoutingHashBuilder {
private final List<NameAndHash> hashes = new ArrayList<>();
private final Predicate<String> isRoutingPath;

public RoutingHashBuilder(Predicate<String> isRoutingPath) {
this.isRoutingPath = isRoutingPath;
}

public void addMatching(String fieldName, BytesRef string) {
if (isRoutingPath.test(fieldName)) {
addHash(fieldName, string);
}
}

/**
* Only expected to be called for old indices created before
* {@link IndexVersions#TIME_SERIES_ROUTING_HASH_IN_ID} while creating (during ingestion)
* or synthesizing (at query time) the _id field.
*/
public String createId(byte[] suffix, IntSupplier onEmpty) {
byte[] idBytes = new byte[4 + suffix.length];
ByteUtils.writeIntLE(buildHash(onEmpty), idBytes, 0);
System.arraycopy(suffix, 0, idBytes, 4, suffix.length);
return Strings.BASE_64_NO_PADDING_URL_ENCODER.encodeToString(idBytes);
}

void extractObject(@Nullable String path, XContentParser source) throws IOException {
while (source.currentToken() != XContentParser.Token.END_OBJECT) {
ensureExpectedToken(XContentParser.Token.FIELD_NAME, source.currentToken(), source);
String fieldName = source.currentName();
String subPath = path == null ? fieldName : path + "." + fieldName;
source.nextToken();
extractItem(subPath, source);
}
}

private void extractArray(@Nullable String path, XContentParser source) throws IOException {
while (source.currentToken() != XContentParser.Token.END_ARRAY) {
expectValueToken(source.currentToken(), source);
extractItem(path, source);
}
}

private void extractItem(String path, XContentParser source) throws IOException {
switch (source.currentToken()) {
case START_OBJECT:
source.nextToken();
extractObject(path, source);
source.nextToken();
break;
case VALUE_STRING:
case VALUE_NUMBER:
case VALUE_BOOLEAN:
XContentString.UTF8Bytes utf8Bytes = source.optimizedText().bytes();
addHash(path, new BytesRef(utf8Bytes.bytes(), utf8Bytes.offset(), utf8Bytes.length()));
source.nextToken();
break;
case START_ARRAY:
source.nextToken();
extractArray(path, source);
source.nextToken();
break;
case VALUE_NULL:
source.nextToken();
break;
default:
throw new ParsingException(
source.getTokenLocation(),
"Cannot extract routing path due to unexpected token [{}]",
source.currentToken()
);
}
}

private void addHash(String path, BytesRef value) {
hashes.add(new NameAndHash(new BytesRef(path), IndexRouting.ExtractFromSource.hash(value), hashes.size()));
}

int buildHash(IntSupplier onEmpty) {
if (hashes.isEmpty()) {
return onEmpty.getAsInt();
}
Collections.sort(hashes);
int hash = 0;
for (NameAndHash nah : hashes) {
hash = 31 * hash + (IndexRouting.ExtractFromSource.hash(nah.name) ^ nah.hash);
}
return hash;
}

private record NameAndHash(BytesRef name, int hash, int order) implements Comparable<NameAndHash> {
@Override
public int compareTo(NameAndHash o) {
int i = name.compareTo(o.name);
if (i != 0) return i;
// ensures array values are in the order as they appear in the source
return Integer.compare(order, o.order);
}
}
}
3 changes: 2 additions & 1 deletion server/src/main/java/org/elasticsearch/index/IndexMode.java
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,8 @@ public RoutingFields buildRoutingFields(IndexSettings settings, SourceToParse so
// If the source already has a _tsid field, we don't need to extract routing from the source.
return RoutingFields.Noop.INSTANCE;
}
IndexRouting.ExtractFromSource routing = (IndexRouting.ExtractFromSource) settings.getIndexRouting();
IndexRouting.ExtractFromSource.ForRoutingPath routing = (IndexRouting.ExtractFromSource.ForRoutingPath) settings
.getIndexRouting();
return new RoutingPathFields(routing.builder());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.routing.IndexRouting;
import org.elasticsearch.cluster.routing.RoutingHashBuilder;
import org.elasticsearch.index.fieldvisitor.LeafStoredFieldLoader;

import java.io.IOException;
Expand All @@ -37,7 +38,7 @@ static IdLoader fromLeafStoredFieldLoader() {
/**
* @return returns an {@link IdLoader} instance that syn synthesizes _id from routing, _tsid and @timestamp fields.
*/
static IdLoader createTsIdLoader(IndexRouting.ExtractFromSource indexRouting, List<String> routingPaths) {
static IdLoader createTsIdLoader(IndexRouting.ExtractFromSource.ForRoutingPath indexRouting, List<String> routingPaths) {
return new TsIdLoader(indexRouting, routingPaths);
}

Expand All @@ -58,19 +59,19 @@ sealed interface Leaf permits StoredLeaf, TsIdLeaf {

final class TsIdLoader implements IdLoader {

private final IndexRouting.ExtractFromSource indexRouting;
private final IndexRouting.ExtractFromSource.ForRoutingPath indexRouting;
private final List<String> routingPaths;

TsIdLoader(IndexRouting.ExtractFromSource indexRouting, List<String> routingPaths) {
TsIdLoader(IndexRouting.ExtractFromSource.ForRoutingPath indexRouting, List<String> routingPaths) {
this.routingPaths = routingPaths;
this.indexRouting = indexRouting;
}

public IdLoader.Leaf leaf(LeafStoredFieldLoader loader, LeafReader reader, int[] docIdsInLeaf) throws IOException {
IndexRouting.ExtractFromSource.RoutingHashBuilder[] builders = null;
RoutingHashBuilder[] builders = null;
if (indexRouting != null) {
// this branch is for legacy indices before IndexVersions.TIME_SERIES_ROUTING_HASH_IN_ID
builders = new IndexRouting.ExtractFromSource.RoutingHashBuilder[docIdsInLeaf.length];
builders = new RoutingHashBuilder[docIdsInLeaf.length];
for (int i = 0; i < builders.length; i++) {
builders[i] = indexRouting.builder();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.StringHelper;
import org.elasticsearch.cluster.routing.IndexRouting;
import org.elasticsearch.cluster.routing.RoutingHashBuilder;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.hash.Murmur3Hasher;
Expand Down Expand Up @@ -59,17 +59,17 @@ public final class RoutingPathFields implements RoutingFields {
* Builds the routing. Used for building {@code _id}. If null then skipped.
*/
@Nullable
private final IndexRouting.ExtractFromSource.RoutingHashBuilder routingBuilder;
private final RoutingHashBuilder routingBuilder;

public RoutingPathFields(@Nullable IndexRouting.ExtractFromSource.RoutingHashBuilder routingBuilder) {
public RoutingPathFields(@Nullable RoutingHashBuilder routingBuilder) {
this.routingBuilder = routingBuilder;
}

SortedMap<BytesRef, List<BytesReference>> routingValues() {
return Collections.unmodifiableSortedMap(routingValues);
}

IndexRouting.ExtractFromSource.RoutingHashBuilder routingBuilder() {
RoutingHashBuilder routingBuilder() {
return routingBuilder;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import org.apache.lucene.document.StringField;
import org.apache.lucene.search.Query;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.cluster.routing.IndexRouting;
import org.elasticsearch.cluster.routing.RoutingHashBuilder;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
Expand Down Expand Up @@ -175,7 +175,7 @@ public void postParse(DocumentParserContext context) throws IOException {
context.doc().add(new SortedDocValuesField(fieldType().name(), timeSeriesId));
}

IndexRouting.ExtractFromSource.RoutingHashBuilder routingBuilder;
RoutingHashBuilder routingBuilder;
if (getIndexVersionCreated(context).before(IndexVersions.TIME_SERIES_ROUTING_HASH_IN_ID) && routingPathFields != null) {
// For legacy indices, we need to create the routing hash from the routing path fields.
routingBuilder = routingPathFields.routingBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.cluster.routing.IndexRouting;
import org.elasticsearch.cluster.routing.RoutingHashBuilder;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.hash.MurmurHash3;
import org.elasticsearch.common.hash.MurmurHash3.Hash128;
Expand Down Expand Up @@ -46,11 +47,7 @@ public IndexFieldData.Builder fielddataBuilder(FieldDataContext fieldDataContext

private static final long SEED = 0;

public static BytesRef createField(
DocumentParserContext context,
IndexRouting.ExtractFromSource.RoutingHashBuilder routingBuilder,
BytesRef tsid
) {
public static BytesRef createField(DocumentParserContext context, RoutingHashBuilder routingBuilder, BytesRef tsid) {
final long timestamp = DataStreamTimestampFieldMapper.extractTimestampValue(context.doc());
String id;
if (routingBuilder != null) {
Expand All @@ -65,7 +62,9 @@ public static BytesRef createField(
* at all we just skip the assertion because we can't be sure
* it always must pass.
*/
IndexRouting.ExtractFromSource indexRouting = (IndexRouting.ExtractFromSource) context.indexSettings().getIndexRouting();
IndexRouting.ExtractFromSource.ForRoutingPath indexRouting = (IndexRouting.ExtractFromSource.ForRoutingPath) context
.indexSettings()
.getIndexRouting();
assert context.getDynamicMappers().isEmpty() == false
|| context.getDynamicRuntimeFields().isEmpty() == false
|| id.equals(indexRouting.createId(context.sourceToParse().getXContentType(), context.sourceToParse().source(), suffix));
Expand Down Expand Up @@ -115,7 +114,7 @@ public static String createId(int routingHash, BytesRef tsid, long timestamp) {

public static String createId(
boolean dynamicMappersExists,
IndexRouting.ExtractFromSource.RoutingHashBuilder routingBuilder,
RoutingHashBuilder routingBuilder,
BytesRef tsid,
long timestamp,
byte[] suffix
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -951,10 +951,10 @@ public SourceLoader newSourceLoader(@Nullable SourceFilter filter) {
@Override
public IdLoader newIdLoader() {
if (indexService.getIndexSettings().getMode() == IndexMode.TIME_SERIES) {
IndexRouting.ExtractFromSource indexRouting = null;
IndexRouting.ExtractFromSource.ForRoutingPath indexRouting = null;
List<String> routingPaths = null;
if (indexService.getIndexSettings().getIndexVersionCreated().before(IndexVersions.TIME_SERIES_ROUTING_HASH_IN_ID)) {
indexRouting = (IndexRouting.ExtractFromSource) indexService.getIndexSettings().getIndexRouting();
indexRouting = (IndexRouting.ExtractFromSource.ForRoutingPath) indexService.getIndexSettings().getIndexRouting();
routingPaths = indexService.getMetadata().getRoutingPaths();
for (String routingField : routingPaths) {
if (routingField.contains("*")) {
Expand Down
Loading