-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Optimized field visitor for ES|QL loading from _ignored_source #132428
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
609ee45
0d59c1d
0834dcd
c05fdb3
ac52d06
6349ee8
bd7c5c7
9fb1125
4e5727e
94bc647
5bef798
d9e01b6
28e9163
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,136 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the "Elastic License | ||
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side | ||
* Public License v 1"; you may not use this file except in compliance with, at | ||
* your election, the "Elastic License 2.0", the "GNU Affero General Public | ||
* License v3.0 only", or the "Server Side Public License, v 1". | ||
*/ | ||
|
||
package org.elasticsearch.index.fieldvisitor; | ||
|
||
import org.apache.lucene.index.FieldInfo; | ||
import org.apache.lucene.index.LeafReaderContext; | ||
import org.apache.lucene.index.StoredFieldVisitor; | ||
import org.apache.lucene.util.BytesRef; | ||
import org.elasticsearch.common.bytes.BytesReference; | ||
import org.elasticsearch.index.mapper.BlockLoader; | ||
import org.elasticsearch.index.mapper.IgnoredSourceFieldMapper; | ||
|
||
import java.io.IOException; | ||
import java.util.ArrayList; | ||
import java.util.HashMap; | ||
import java.util.HashSet; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Set; | ||
|
||
class IgnoredSourceFieldLoader extends StoredFieldLoader { | ||
private final boolean forceSequentialReader; | ||
private final Map<String, Set<String>> potentialFieldsInIgnoreSource; | ||
private final Set<String> fieldNames; | ||
|
||
IgnoredSourceFieldLoader(BlockLoader.FieldsSpec spec, boolean forceSequentialReader) { | ||
fieldNames = new HashSet<>(spec.ignoredFieldsSpec().requiredIgnoredFields()); | ||
this.forceSequentialReader = forceSequentialReader; | ||
|
||
HashMap<String, Set<String>> potentialFieldsInIgnoreSource = new HashMap<>(); | ||
for (String requiredIgnoredField : spec.ignoredFieldsSpec().requiredIgnoredFields()) { | ||
for (String potentialStoredField : spec.ignoredFieldsSpec().format().requiredStoredFields(requiredIgnoredField)) { | ||
potentialFieldsInIgnoreSource.computeIfAbsent(potentialStoredField, k -> new HashSet<>()).add(requiredIgnoredField); | ||
} | ||
} | ||
this.potentialFieldsInIgnoreSource = potentialFieldsInIgnoreSource; | ||
} | ||
|
||
@Override | ||
public LeafStoredFieldLoader getLoader(LeafReaderContext ctx, int[] docs) throws IOException { | ||
var reader = forceSequentialReader ? sequentialReader(ctx) : reader(ctx, docs); | ||
var visitor = new SFV(fieldNames, potentialFieldsInIgnoreSource); | ||
return new LeafStoredFieldLoader() { | ||
|
||
private int doc = -1; | ||
|
||
@Override | ||
public void advanceTo(int doc) throws IOException { | ||
if (doc != this.doc) { | ||
visitor.reset(); | ||
reader.accept(doc, visitor); | ||
this.doc = doc; | ||
} | ||
} | ||
|
||
@Override | ||
public BytesReference source() { | ||
assert false : "source() is not supported by IgnoredSourceFieldLoader"; | ||
return null; | ||
} | ||
|
||
@Override | ||
public String id() { | ||
assert false : "id() is not supported by IgnoredSourceFieldLoader"; | ||
return null; | ||
} | ||
|
||
@Override | ||
public String routing() { | ||
assert false : "routing() is not supported by IgnoredSourceFieldLoader"; | ||
return null; | ||
} | ||
|
||
@Override | ||
public Map<String, List<Object>> storedFields() { | ||
return visitor.values; | ||
} | ||
}; | ||
} | ||
|
||
@Override | ||
public List<String> fieldsToLoad() { | ||
return potentialFieldsInIgnoreSource.keySet().stream().toList(); | ||
} | ||
|
||
static class SFV extends StoredFieldVisitor { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the future we can perhaps explore an implementation that is tuned for just fetching one stored field. |
||
final Map<String, List<Object>> values = new HashMap<>(); | ||
final Set<String> fieldNames; | ||
private final Set<String> unvisitedFields; | ||
final Map<String, Set<String>> potentialFieldsInIgnoreSource; | ||
|
||
SFV(Set<String> fieldNames, Map<String, Set<String>> potentialFieldsInIgnoreSource) { | ||
this.fieldNames = fieldNames; | ||
this.unvisitedFields = new HashSet<>(fieldNames); | ||
this.potentialFieldsInIgnoreSource = potentialFieldsInIgnoreSource; | ||
} | ||
|
||
@Override | ||
public Status needsField(FieldInfo fieldInfo) throws IOException { | ||
if (unvisitedFields.isEmpty()) { | ||
return Status.STOP; | ||
} | ||
|
||
Set<String> foundFields = potentialFieldsInIgnoreSource.get(fieldInfo.name); | ||
if (foundFields == null) { | ||
return Status.NO; | ||
} | ||
|
||
unvisitedFields.removeAll(foundFields); | ||
return Status.YES; | ||
} | ||
|
||
@Override | ||
public void binaryField(FieldInfo fieldInfo, byte[] value) { | ||
values.computeIfAbsent(fieldInfo.name, k -> new ArrayList<>()).add(new BytesRef(value)); | ||
} | ||
|
||
void reset() { | ||
values.clear(); | ||
unvisitedFields.addAll(fieldNames); | ||
} | ||
|
||
} | ||
|
||
static boolean supports(BlockLoader.FieldsSpec spec) { | ||
return spec.storedFieldsSpec().noRequirements() | ||
&& spec.ignoredFieldsSpec().format() == IgnoredSourceFieldMapper.IgnoredSourceFormat.PER_FIELD_IGNORED_SOURCE; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,10 +12,12 @@ | |
import org.apache.lucene.index.FieldInfo; | ||
import org.apache.lucene.index.LeafReader; | ||
import org.apache.lucene.index.LeafReaderContext; | ||
import org.apache.lucene.index.StoredFieldVisitor; | ||
import org.apache.lucene.index.StoredFields; | ||
import org.elasticsearch.common.CheckedBiConsumer; | ||
import org.elasticsearch.common.bytes.BytesReference; | ||
import org.elasticsearch.common.lucene.index.SequentialStoredFieldsLeafReader; | ||
import org.elasticsearch.index.mapper.BlockLoader; | ||
import org.elasticsearch.index.mapper.IgnoredSourceFieldMapper; | ||
import org.elasticsearch.search.fetch.StoredFieldsSpec; | ||
|
||
|
@@ -55,6 +57,26 @@ public static StoredFieldLoader fromSpec(StoredFieldsSpec spec) { | |
return create(spec.requiresSource(), spec.requiredStoredFields()); | ||
} | ||
|
||
/** | ||
* Crates a new StoredFieldLaoader using a BlockLoader.FieldsSpec | ||
*/ | ||
public static StoredFieldLoader fromSpec(BlockLoader.FieldsSpec spec, boolean forceSequentialReader) { | ||
|
||
if (spec.noRequirements()) { | ||
return StoredFieldLoader.empty(); | ||
} | ||
|
||
if (IgnoredSourceFieldLoader.supports(spec)) { | ||
return new IgnoredSourceFieldLoader(spec, forceSequentialReader); | ||
} | ||
|
||
StoredFieldsSpec mergedSpec = spec.storedFieldsSpec().merge(spec.ignoredFieldsSpec().requiredStoredFields()); | ||
if (forceSequentialReader) { | ||
return fromSpecSequential(mergedSpec); | ||
} else { | ||
return fromSpec(mergedSpec); | ||
} | ||
} | ||
|
||
public static StoredFieldLoader create(boolean loadSource, Set<String> fields) { | ||
return create(loadSource, fields, false); | ||
} | ||
|
@@ -141,7 +163,8 @@ public List<String> fieldsToLoad() { | |
}; | ||
} | ||
|
||
private static CheckedBiConsumer<Integer, FieldsVisitor, IOException> reader(LeafReaderContext ctx, int[] docs) throws IOException { | ||
protected static CheckedBiConsumer<Integer, StoredFieldVisitor, IOException> reader(LeafReaderContext ctx, int[] docs) | ||
throws IOException { | ||
LeafReader leafReader = ctx.reader(); | ||
if (docs != null && docs.length > 10 && hasSequentialDocs(docs)) { | ||
return sequentialReader(ctx); | ||
|
@@ -150,7 +173,8 @@ private static CheckedBiConsumer<Integer, FieldsVisitor, IOException> reader(Lea | |
return storedFields::document; | ||
} | ||
|
||
private static CheckedBiConsumer<Integer, FieldsVisitor, IOException> sequentialReader(LeafReaderContext ctx) throws IOException { | ||
protected static CheckedBiConsumer<Integer, StoredFieldVisitor, IOException> sequentialReader(LeafReaderContext ctx) | ||
throws IOException { | ||
LeafReader leafReader = ctx.reader(); | ||
if (leafReader instanceof SequentialStoredFieldsLeafReader lf) { | ||
return lf.getSequentialStoredFieldsReader()::document; | ||
|
@@ -201,7 +225,7 @@ public Map<String, List<Object>> storedFields() { | |
|
||
private static class ReaderStoredFieldLoader implements LeafStoredFieldLoader { | ||
|
||
private final CheckedBiConsumer<Integer, FieldsVisitor, IOException> reader; | ||
private final CheckedBiConsumer<Integer, StoredFieldVisitor, IOException> reader; | ||
private final CustomFieldsVisitor visitor; | ||
private int doc = -1; | ||
|
||
|
@@ -221,7 +245,11 @@ public Status needsField(FieldInfo fieldInfo) { | |
return new CustomFieldsVisitor(fields, loadSource); | ||
} | ||
|
||
ReaderStoredFieldLoader(CheckedBiConsumer<Integer, FieldsVisitor, IOException> reader, boolean loadSource, Set<String> fields) { | ||
ReaderStoredFieldLoader( | ||
CheckedBiConsumer<Integer, StoredFieldVisitor, IOException> reader, | ||
boolean loadSource, | ||
Set<String> fields | ||
) { | ||
this.reader = reader; | ||
this.visitor = getFieldsVisitor(fields, loadSource); | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -98,7 +98,26 @@ interface StoredFields { | |
|
||
RowStrideReader rowStrideReader(LeafReaderContext context) throws IOException; | ||
|
||
StoredFieldsSpec rowStrideStoredFieldSpec(); | ||
record FieldsSpec(StoredFieldsSpec storedFieldsSpec, IgnoredFieldsSpec ignoredFieldsSpec) { | ||
|
||
public static FieldsSpec NO_REQUIREMENTS = new FieldsSpec(StoredFieldsSpec.NO_REQUIREMENTS, IgnoredFieldsSpec.NONE); | ||
|
||
public FieldsSpec merge(FieldsSpec other) { | ||
return new FieldsSpec( | ||
this.storedFieldsSpec.merge(other.storedFieldsSpec), | ||
this.ignoredFieldsSpec.merge(other.ignoredFieldsSpec) | ||
); | ||
} | ||
|
||
public FieldsSpec merge(StoredFieldsSpec other) { | ||
return new FieldsSpec(this.storedFieldsSpec.merge(other), this.ignoredFieldsSpec); | ||
} | ||
|
||
public boolean noRequirements() { | ||
return storedFieldsSpec.noRequirements() && ignoredFieldsSpec.requiredIgnoredFields().isEmpty(); | ||
} | ||
} | ||
|
||
FieldsSpec rowStrideFieldSpec(); | ||
|
||
/** | ||
* Does this loader support loading bytes via calling {@link #ordinals}. | ||
|
@@ -140,8 +159,8 @@ public RowStrideReader rowStrideReader(LeafReaderContext context) { | |
} | ||
|
||
@Override | ||
public StoredFieldsSpec rowStrideStoredFieldSpec() { | ||
return StoredFieldsSpec.NO_REQUIREMENTS; | ||
public FieldsSpec rowStrideFieldSpec() { | ||
return FieldsSpec.NO_REQUIREMENTS; | ||
} | ||
|
||
@Override | ||
|
@@ -237,8 +256,8 @@ public String toString() { | |
} | ||
|
||
@Override | ||
public StoredFieldsSpec rowStrideStoredFieldSpec() { | ||
return StoredFieldsSpec.NO_REQUIREMENTS; | ||
public FieldsSpec rowStrideFieldSpec() { | ||
return FieldsSpec.NO_REQUIREMENTS; | ||
} | ||
|
||
@Override | ||
|
@@ -319,8 +338,8 @@ public String toString() { | |
} | ||
|
||
@Override | ||
public StoredFieldsSpec rowStrideStoredFieldSpec() { | ||
return delegate.rowStrideStoredFieldSpec(); | ||
public FieldsSpec rowStrideFieldSpec() { | ||
return delegate.rowStrideFieldSpec(); | ||
} | ||
|
||
@Override | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm hoping that we can optimize/simplify this in a follow-up change. This logic is currently required for fields with dots, because we don't know which part of the field path actually has a stored field.
Ideally we would know from the mapping which stored field we need. For example, if `attributes.host.ip` is requested and no `attributes` field is mapped, then the stored field should be `_ignored_source.attributes`. If `attributes` is mapped but `host` isn't, then the required stored field to load would be `_ignored_source.attributes.host`.
I also think `requiredStoredFields(...)` always includes the `_doc` field (via `FallbackSyntheticSourceBlockLoader.splitIntoFieldPaths(...)`)?
It would be great if we needed fewer maps and no sets in `SFV`.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, this can probably be optimized by using the mapping to figure out which stored field contains the value instead of just looking at all possible parent stored fields. But let's leave that as a follow-up.