Introduce FallbackSyntheticSourceBlockLoader and apply it to keyword fields #119546
Changes from 13 commits
@@ -0,0 +1,236 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the "Elastic License
 * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
 * Public License v 1"; you may not use this file except in compliance with, at
 * your election, the "Elastic License 2.0", the "GNU Affero General Public
 * License v3.0 only", or the "Server Side Public License, v 1".
 */

package org.elasticsearch.index.mapper;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedSetDocValues;
import org.elasticsearch.search.fetch.StoredFieldsSpec;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentParserConfiguration;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/**
 * Block loader for fields that use the fallback synthetic source implementation.
 * <br>
 * Usually fields have doc_values or stored fields, and block loaders use them directly. In some cases neither is available,
 * and we would fall back to (potentially synthetic) _source. However, in the case of synthetic source there is actually no need
 * to construct the entire _source. We know that there are no doc_values or stored fields, and therefore we will be using fallback
 * synthetic source. That is equivalent to reading the _ignored_source stored field directly and doing an in-place synthetic source
 * just for this field.
 * <br>
 * See {@link IgnoredSourceFieldMapper}.
 */
public abstract class FallbackSyntheticSourceBlockLoader implements BlockLoader {
    private final Reader<?> reader;
    private final String fieldName;

    protected FallbackSyntheticSourceBlockLoader(Reader<?> reader, String fieldName) {
        this.reader = reader;
        this.fieldName = fieldName;
    }

    @Override
    public ColumnAtATimeReader columnAtATimeReader(LeafReaderContext context) throws IOException {
        return null;
    }

    @Override
    public RowStrideReader rowStrideReader(LeafReaderContext context) throws IOException {
        return new IgnoredSourceRowStrideReader<>(fieldName, reader);
    }

    @Override
    public StoredFieldsSpec rowStrideStoredFieldSpec() {
        return new StoredFieldsSpec(false, false, Set.of(IgnoredSourceFieldMapper.NAME));
    }

    @Override
    public boolean supportsOrdinals() {
        return false;
    }

    @Override
    public SortedSetDocValues ordinals(LeafReaderContext context) throws IOException {
        throw new UnsupportedOperationException();
    }

    private record IgnoredSourceRowStrideReader<T>(String fieldName, Reader<T> reader) implements RowStrideReader {
        @Override
        public void read(int docId, StoredFields storedFields, Builder builder) throws IOException {
            var ignoredSource = storedFields.storedFields().get(IgnoredSourceFieldMapper.NAME);
            if (ignoredSource == null) {
                return;
            }

            Map<String, List<IgnoredSourceFieldMapper.NameValue>> valuesForFieldAndParents = new HashMap<>();

            // Contains the name of the field and all its parents
            Set<String> fieldNames = new HashSet<>() {
                {
                    add("_doc");
                }
            };

            var current = new StringBuilder();
            for (String part : fieldName.split("\\.")) {
                // Join parts with '.' so prefixes match the stored dotted names
                // (without this, "a.b" would be collected as "ab").
                if (current.isEmpty() == false) {
                    current.append('.');
                }
                current.append(part);
                fieldNames.add(current.toString());
            }

            for (Object value : ignoredSource) {
                IgnoredSourceFieldMapper.NameValue nameValue = IgnoredSourceFieldMapper.decode(value);
                if (fieldNames.contains(nameValue.name())) {
                    valuesForFieldAndParents.computeIfAbsent(nameValue.name(), k -> new ArrayList<>()).add(nameValue);
                }
            }

            // TODO figure out how to handle XContentDataHelper#voidValue()
> **Review comment:** iirc voidValue means to ignore ignored source, because we do have values in doc values for a field? I'm trying to remember in what cases we use void value.
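The reader above first collects the name of the requested field together with every enclosing object path, because a value may have been captured in `_ignored_source` under the leaf name or under any of its parents. That prefix expansion can be sketched standalone (the class and method names here are hypothetical, not part of the PR):

```java
import java.util.LinkedHashSet;
import java.util.Set;

public class FieldPrefixes {
    // For "a.b.c" returns {"_doc", "a", "a.b", "a.b.c"}:
    // the synthetic root object plus every dotted prefix of the field path.
    static Set<String> fieldAndParents(String fieldName) {
        Set<String> names = new LinkedHashSet<>();
        names.add("_doc");
        StringBuilder current = new StringBuilder();
        for (String part : fieldName.split("\\.")) {
            if (current.length() > 0) {
                current.append('.');
            }
            current.append(part);
            names.add(current.toString());
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(fieldAndParents("a.b.c")); // [_doc, a, a.b, a.b.c]
    }
}
```

Any `_ignored_source` entry whose name falls in this set is relevant: an exact match means the leaf value was stored directly, a prefix match means it is buried inside a stored parent object.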
            var blockValues = new ArrayList<T>();

            var leafFieldValue = valuesForFieldAndParents.get(fieldName);
            if (leafFieldValue != null) {
                readFromFieldValue(leafFieldValue, blockValues);
            } else {
                readFromParentValue(valuesForFieldAndParents, blockValues);
            }

            if (blockValues.isEmpty() == false) {
                if (blockValues.size() > 1) {
                    builder.beginPositionEntry();
                }

                reader.writeToBlock(blockValues, builder);

                if (blockValues.size() > 1) {
                    builder.endPositionEntry();
                }
            } else {
                builder.appendNull();
            }
        }
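Note the envelope logic at the end of `read`: a document with multiple values for the field gets them wrapped in a single position entry, a single value is appended bare, and a document with no values becomes null. A toy recorder (not the real `BlockLoader.Builder` API, just a stand-in to illustrate the branching) makes the three cases concrete:

```java
import java.util.List;

public class PositionEntrySketch {
    // Records the calls a builder would receive for one document's values.
    static String writePosition(List<String> values) {
        StringBuilder out = new StringBuilder();
        if (values.isEmpty() == false) {
            if (values.size() > 1) {
                out.append("[");                      // beginPositionEntry()
            }
            out.append(String.join(",", values));     // writeToBlock(values, builder)
            if (values.size() > 1) {
                out.append("]");                      // endPositionEntry()
            }
        } else {
            out.append("null");                       // appendNull()
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(writePosition(List.of("a", "b"))); // [a,b]
        System.out.println(writePosition(List.of("a")));      // a
        System.out.println(writePosition(List.of()));         // null
    }
}
```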
        private void readFromFieldValue(List<IgnoredSourceFieldMapper.NameValue> nameValues, List<T> blockValues) throws IOException {
            if (nameValues.isEmpty()) {
                return;
            }

            for (var nameValue : nameValues) {
                // Leaf field is stored directly (not as a part of a parent object), let's try to decode it.
                Optional<Object> singleValue = XContentDataHelper.decode(nameValue.value());
                if (singleValue.isPresent()) {
                    reader.convertValue(singleValue.get(), blockValues);
                    continue;
                }

                // We have a value for this field but it's an array or an object
                var type = XContentDataHelper.decodeType(nameValue.value());
                assert type.isPresent();

                try (
                    XContentParser parser = type.get()
                        .xContent()
                        .createParser(
                            XContentParserConfiguration.EMPTY,
                            nameValue.value().bytes,
                            nameValue.value().offset + 1,
                            nameValue.value().length - 1
                        )
                ) {
                    parser.nextToken();
                    reader.parse(parser, blockValues);
                }
            }
        }
        private void readFromParentValue(
            Map<String, List<IgnoredSourceFieldMapper.NameValue>> valuesForFieldAndParents,
            List<T> blockValues
        ) throws IOException {
            if (valuesForFieldAndParents.isEmpty()) {
                return;
            }

            // If a parent object is stored at a particular level, its children won't be stored.
            // So we should only ever have one parent here.
            assert valuesForFieldAndParents.size() == 1 : "_ignored_source field contains multiple levels of the same object";
            var parentValues = valuesForFieldAndParents.values().iterator().next();

            for (var nameValue : parentValues) {
                parseFieldFromParent(nameValue, blockValues);
            }
        }

        private void parseFieldFromParent(IgnoredSourceFieldMapper.NameValue nameValue, List<T> blockValues) throws IOException {
> **Author comment:** So this stuff is pretty similar to existing logic in
            var type = XContentDataHelper.decodeType(nameValue.value());
            assert type.isPresent();

            String nameAtThisLevel = fieldName.substring(nameValue.name().length() + 1);
            var filterParserConfig = XContentParserConfiguration.EMPTY.withFiltering(null, Set.of(nameAtThisLevel), Set.of(), true);
            try (
                XContentParser parser = type.get()
                    .xContent()
                    .createParser(filterParserConfig, nameValue.value().bytes, nameValue.value().offset + 1, nameValue.value().length - 1)
            ) {
                parser.nextToken();
                var fieldNameInParser = new StringBuilder(nameValue.name());
                while (true) {
                    if (parser.currentToken() == XContentParser.Token.FIELD_NAME) {
                        fieldNameInParser.append('.').append(parser.currentName());
                        if (fieldNameInParser.toString().equals(fieldName)) {
                            parser.nextToken();
                            break;
                        }
                    }
                    parser.nextToken();
                }

                reader.parse(parser, blockValues);
            }
        }

        @Override
        public boolean canReuse(int startingDocID) {
> **Author comment:** I don't know what to do here.
>
> **Review comment:** Like
            // TODO
            return false;
        }
    }

    /**
     * Field-specific implementation that converts data stored in the _ignored_source field to block loader values.
     * @param <T> type of values produced for the block
     */
    public interface Reader<T> {
        /**
         * Converts a raw stored value for this field to a value in a format suitable for the block loader and adds it to the
         * provided accumulator.
         * @param value raw decoded value from the _ignored_source field (synthetic _source value)
         * @param accumulator list containing the result of the conversion
         */
        void convertValue(Object value, List<T> accumulator);

        /**
         * Parses one or more complex values using the provided parser and adds them to the provided accumulator.
         * @param parser parser of a value from the _ignored_source field (synthetic _source value)
         * @param accumulator list containing the results of parsing
         */
        void parse(XContentParser parser, List<T> accumulator) throws IOException;

        void writeToBlock(List<T> values, Builder blockBuilder);
    }
}
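In `parseFieldFromParent` above, the field name relative to the stored parent (`nameAtThisLevel`) drives the XContent filtering, and the loop rebuilds dotted names while descending to the leaf. The relative-name computation can be sketched in isolation (class and method names hypothetical):

```java
public class RelativeFieldName {
    // Given the name of the stored parent object and the full leaf field name,
    // return the path of the leaf relative to that parent,
    // e.g. parent "a.b" and field "a.b.c" -> "c"; parent "a" -> "b.c".
    static String nameAtThisLevel(String fieldName, String parentName) {
        assert fieldName.startsWith(parentName + ".");
        return fieldName.substring(parentName.length() + 1);
    }

    public static void main(String[] args) {
        System.out.println(nameAtThisLevel("a.b.c", "a.b")); // c
        System.out.println(nameAtThisLevel("a.b.c", "a"));   // b.c
    }
}
```

The `+ 1` skips the dot separating the parent path from the remainder, which is why the assertion that the parent is a strict dotted prefix matters.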
@@ -64,13 +64,15 @@
import org.elasticsearch.search.runtime.StringScriptFieldTermQuery;
import org.elasticsearch.search.runtime.StringScriptFieldWildcardQuery;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;

@@ -631,6 +633,68 @@ public BlockLoader blockLoader(BlockLoaderContext blContext) {
        if (isStored()) {
            return new BlockStoredFieldsReader.BytesFromBytesRefsBlockLoader(name());
        }

        if (isSyntheticSource) {
            var reader = new FallbackSyntheticSourceBlockLoader.Reader<BytesRef>() {
                @Override
                public void convertValue(Object value, List<BytesRef> accumulator) {
                    assert value instanceof BytesRef;

                    String stringValue = ((BytesRef) value).utf8ToString();
                    String adjusted = applyIgnoreAboveAndNormalizer(stringValue);
                    if (adjusted != null) {
                        // TODO what if the value didn't change?
                        accumulator.add(new BytesRef(adjusted));
                    }
                }

                @Override
                public void parse(XContentParser parser, List<BytesRef> accumulator) throws IOException {
                    if (parser.currentToken() == XContentParser.Token.VALUE_NULL) {
                        return;
                    }

                    if (parser.currentToken() == XContentParser.Token.START_ARRAY) {
                        while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
                            if (parser.currentToken() == XContentParser.Token.VALUE_NULL) {
                                continue;
                            }

                            assert parser.currentToken() == XContentParser.Token.VALUE_STRING;

                            var value = applyIgnoreAboveAndNormalizer(parser.text());
                            if (value != null) {
                                accumulator.add(new BytesRef(value));
                            }
                        }
                        return;
                    }

                    assert parser.currentToken() == XContentParser.Token.VALUE_STRING : "Unexpected token " + parser.currentToken();
                    var value = applyIgnoreAboveAndNormalizer(parser.text());
                    if (value != null) {
                        accumulator.add(new BytesRef(value));
                    }
                }

                @Override
                public void writeToBlock(List<BytesRef> values, BlockLoader.Builder blockBuilder) {
                    var bytesRefBuilder = (BlockLoader.BytesRefBuilder) blockBuilder;

                    for (var value : values) {
                        bytesRefBuilder.appendBytesRef(value);
                    }
                }
            };

            return new FallbackSyntheticSourceBlockLoader(reader, name()) {
                @Override
                public Builder builder(BlockFactory factory, int expectedCount) {
                    return factory.bytesRefs(expectedCount);
                }
            };
(martijnvg marked this conversation as resolved.)
        }

        SourceValueFetcher fetcher = sourceValueFetcher(blContext.sourcePaths(name()));
        return new BlockSourceReader.BytesRefsBlockLoader(fetcher, sourceBlockLoaderLookup(blContext));
    }

@@ -714,15 +778,19 @@ private SourceValueFetcher sourceValueFetcher(Set<String> sourcePaths) {
            @Override
            protected String parseSourceValue(Object value) {
                String keywordValue = value.toString();
                return applyIgnoreAboveAndNormalizer(keywordValue);
            }
        };
    }

    private String applyIgnoreAboveAndNormalizer(String value) {
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This got me thinking, but I think is expected in the context of es|ql? You would never see un-normalized or ignored values. With synthetic source (and normal source), you just get what got stored. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, this is how reading from stored source works as well. |
||
        if (value.length() > ignoreAbove) {
            return null;
        }

        return normalizeValue(normalizer(), name(), value);
    }

    @Override
    public Object valueForDisplay(Object value) {
        if (value == null) {
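The extracted `applyIgnoreAboveAndNormalizer` helper funnels every raw keyword value through the same gate: values longer than `ignore_above` are dropped, everything else is normalized. A standalone sketch of that shape, using lowercasing as a stand-in for the field's configured Lucene normalizer (class and method names hypothetical):

```java
import java.util.Locale;

public class IgnoreAboveAndNormalize {
    // Mirrors the shape of applyIgnoreAboveAndNormalizer: values longer than
    // ignoreAbove are dropped (null), shorter values are normalized.
    static String apply(String value, int ignoreAbove) {
        if (value.length() > ignoreAbove) {
            return null;
        }
        // Stand-in normalizer; the real field applies its configured normalizer.
        return value.toLowerCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        System.out.println(apply("Hello", 10));                 // hello
        System.out.println(apply("this value is too long", 10)); // null
    }
}
```

Sharing one helper between the synthetic-source block loader and the `SourceValueFetcher` path keeps both read paths consistent with what indexing actually stored.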
> **Review comment:** I'm still trying to understand if this is necessary. #113827 introduced filtering in source construction. If we set `SourceFilter` in `SourceLoader.Synthetic` initialization to filter for the requested fields only, do we get a more generic solution that covers all uses of synthetic source? I assume the latter is used in `SourceBlockLoader`.
>
> **Reply:** I had a conversation with @martijnvg about this. There is a more general problem here. The `SourceProvider`/`SourceLoader` infra we are currently using is designed for the `_search` and `get` use cases and just doesn't fit into ESQL. See #117792. So this is not solely an optimization but also allows removing workarounds like #117792.