Skip to content

Commit 6ab73a1

Browse files
Optimized field visitor for ES|QL loading from _ignored_source (#132428)
This PR builds on the work in #132142 to optimize loading values from _ignored_source by stopping the FieldVisitor early, once all required fields have been visited. Relates to #130886.
1 parent 36a00d1 commit 6ab73a1

File tree

8 files changed

+616
-39
lines changed

8 files changed

+616
-39
lines changed
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.fieldvisitor;
11+
12+
import org.apache.lucene.index.FieldInfo;
13+
import org.apache.lucene.index.LeafReaderContext;
14+
import org.apache.lucene.index.StoredFieldVisitor;
15+
import org.apache.lucene.util.BytesRef;
16+
import org.elasticsearch.common.bytes.BytesReference;
17+
import org.elasticsearch.index.mapper.IgnoredSourceFieldMapper;
18+
import org.elasticsearch.search.fetch.StoredFieldsSpec;
19+
20+
import java.io.IOException;
21+
import java.util.ArrayList;
22+
import java.util.HashMap;
23+
import java.util.HashSet;
24+
import java.util.List;
25+
import java.util.Map;
26+
import java.util.Set;
27+
28+
class IgnoredSourceFieldLoader extends StoredFieldLoader {
29+
private final boolean forceSequentialReader;
30+
private final Map<String, Set<String>> potentialFieldsInIgnoreSource;
31+
private final Set<String> fieldNames;
32+
33+
IgnoredSourceFieldLoader(StoredFieldsSpec spec, boolean forceSequentialReader) {
34+
assert IgnoredSourceFieldLoader.supports(spec);
35+
36+
fieldNames = new HashSet<>(spec.ignoredFieldsSpec().requiredIgnoredFields());
37+
this.forceSequentialReader = forceSequentialReader;
38+
39+
HashMap<String, Set<String>> potentialFieldsInIgnoreSource = new HashMap<>();
40+
for (String requiredIgnoredField : spec.ignoredFieldsSpec().requiredIgnoredFields()) {
41+
for (String potentialStoredField : spec.ignoredFieldsSpec().format().requiredStoredFields(requiredIgnoredField)) {
42+
potentialFieldsInIgnoreSource.computeIfAbsent(potentialStoredField, k -> new HashSet<>()).add(requiredIgnoredField);
43+
}
44+
}
45+
this.potentialFieldsInIgnoreSource = potentialFieldsInIgnoreSource;
46+
}
47+
48+
@Override
49+
public LeafStoredFieldLoader getLoader(LeafReaderContext ctx, int[] docs) throws IOException {
50+
var reader = forceSequentialReader ? sequentialReader(ctx) : reader(ctx, docs);
51+
var visitor = new SFV(fieldNames, potentialFieldsInIgnoreSource);
52+
return new LeafStoredFieldLoader() {
53+
54+
private int doc = -1;
55+
56+
@Override
57+
public void advanceTo(int doc) throws IOException {
58+
if (doc != this.doc) {
59+
visitor.reset();
60+
reader.accept(doc, visitor);
61+
this.doc = doc;
62+
}
63+
}
64+
65+
@Override
66+
public BytesReference source() {
67+
assert false : "source() is not supported by IgnoredSourceFieldLoader";
68+
return null;
69+
}
70+
71+
@Override
72+
public String id() {
73+
assert false : "id() is not supported by IgnoredSourceFieldLoader";
74+
return null;
75+
}
76+
77+
@Override
78+
public String routing() {
79+
assert false : "routing() is not supported by IgnoredSourceFieldLoader";
80+
return null;
81+
}
82+
83+
@Override
84+
public Map<String, List<Object>> storedFields() {
85+
return visitor.values;
86+
}
87+
};
88+
}
89+
90+
@Override
91+
public List<String> fieldsToLoad() {
92+
return potentialFieldsInIgnoreSource.keySet().stream().toList();
93+
}
94+
95+
static class SFV extends StoredFieldVisitor {
96+
final Map<String, List<Object>> values = new HashMap<>();
97+
final Set<String> fieldNames;
98+
private final Set<String> unvisitedFields;
99+
final Map<String, Set<String>> potentialFieldsInIgnoreSource;
100+
101+
SFV(Set<String> fieldNames, Map<String, Set<String>> potentialFieldsInIgnoreSource) {
102+
this.fieldNames = fieldNames;
103+
this.unvisitedFields = new HashSet<>(fieldNames);
104+
this.potentialFieldsInIgnoreSource = potentialFieldsInIgnoreSource;
105+
}
106+
107+
@Override
108+
public Status needsField(FieldInfo fieldInfo) throws IOException {
109+
if (unvisitedFields.isEmpty()) {
110+
return Status.STOP;
111+
}
112+
113+
Set<String> foundFields = potentialFieldsInIgnoreSource.get(fieldInfo.name);
114+
if (foundFields == null) {
115+
return Status.NO;
116+
}
117+
118+
unvisitedFields.removeAll(foundFields);
119+
return Status.YES;
120+
}
121+
122+
@Override
123+
public void binaryField(FieldInfo fieldInfo, byte[] value) {
124+
values.computeIfAbsent(fieldInfo.name, k -> new ArrayList<>()).add(new BytesRef(value));
125+
}
126+
127+
void reset() {
128+
values.clear();
129+
unvisitedFields.addAll(fieldNames);
130+
}
131+
132+
}
133+
134+
static boolean supports(StoredFieldsSpec spec) {
135+
return spec.onlyRequiresIgnoredFields()
136+
&& spec.ignoredFieldsSpec().format() == IgnoredSourceFieldMapper.IgnoredSourceFormat.PER_FIELD_IGNORED_SOURCE;
137+
}
138+
}

server/src/main/java/org/elasticsearch/index/fieldvisitor/StoredFieldLoader.java

Lines changed: 28 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.apache.lucene.index.FieldInfo;
1313
import org.apache.lucene.index.LeafReader;
1414
import org.apache.lucene.index.LeafReaderContext;
15+
import org.apache.lucene.index.StoredFieldVisitor;
1516
import org.apache.lucene.index.StoredFields;
1617
import org.elasticsearch.common.CheckedBiConsumer;
1718
import org.elasticsearch.common.bytes.BytesReference;
@@ -49,10 +50,26 @@ public abstract class StoredFieldLoader {
4950
* Creates a new StoredFieldLoader using a StoredFieldsSpec
5051
*/
5152
public static StoredFieldLoader fromSpec(StoredFieldsSpec spec) {
53+
return fromSpec(spec, false);
54+
}
55+
56+
/**
57+
* Creates a new StoredFieldLoader using a StoredFieldsSpec that is optimized
58+
* for loading documents in order.
59+
*/
60+
public static StoredFieldLoader fromSpecSequential(StoredFieldsSpec spec) {
61+
return fromSpec(spec, true);
62+
}
63+
64+
private static StoredFieldLoader fromSpec(StoredFieldsSpec spec, boolean forceSequentialReader) {
5265
if (spec.noRequirements()) {
5366
return StoredFieldLoader.empty();
5467
}
55-
return create(spec.requiresSource(), spec.requiredStoredFields());
68+
69+
if (IgnoredSourceFieldLoader.supports(spec)) {
70+
return new IgnoredSourceFieldLoader(spec, forceSequentialReader);
71+
}
72+
return create(spec.requiresSource(), spec.requiredStoredFields(), forceSequentialReader);
5673
}
5774

5875
public static StoredFieldLoader create(boolean loadSource, Set<String> fields) {
@@ -85,28 +102,6 @@ public List<String> fieldsToLoad() {
85102
};
86103
}
87104

88-
/**
89-
* Creates a new StoredFieldLoader using a StoredFieldsSpec that is optimized
90-
* for loading documents in order.
91-
*/
92-
public static StoredFieldLoader fromSpecSequential(StoredFieldsSpec spec) {
93-
if (spec.noRequirements()) {
94-
return StoredFieldLoader.empty();
95-
}
96-
List<String> fieldsToLoad = fieldsToLoad(spec.requiresSource(), spec.requiredStoredFields());
97-
return new StoredFieldLoader() {
98-
@Override
99-
public LeafStoredFieldLoader getLoader(LeafReaderContext ctx, int[] docs) throws IOException {
100-
return new ReaderStoredFieldLoader(sequentialReader(ctx), spec.requiresSource(), spec.requiredStoredFields());
101-
}
102-
103-
@Override
104-
public List<String> fieldsToLoad() {
105-
return fieldsToLoad;
106-
}
107-
};
108-
}
109-
110105
/**
111106
* Creates a StoredFieldLoader tuned for sequential reads of _source
112107
*/
@@ -141,7 +136,8 @@ public List<String> fieldsToLoad() {
141136
};
142137
}
143138

144-
private static CheckedBiConsumer<Integer, FieldsVisitor, IOException> reader(LeafReaderContext ctx, int[] docs) throws IOException {
139+
protected static CheckedBiConsumer<Integer, StoredFieldVisitor, IOException> reader(LeafReaderContext ctx, int[] docs)
140+
throws IOException {
145141
LeafReader leafReader = ctx.reader();
146142
if (docs != null && docs.length > 10 && hasSequentialDocs(docs)) {
147143
return sequentialReader(ctx);
@@ -150,7 +146,8 @@ private static CheckedBiConsumer<Integer, FieldsVisitor, IOException> reader(Lea
150146
return storedFields::document;
151147
}
152148

153-
private static CheckedBiConsumer<Integer, FieldsVisitor, IOException> sequentialReader(LeafReaderContext ctx) throws IOException {
149+
protected static CheckedBiConsumer<Integer, StoredFieldVisitor, IOException> sequentialReader(LeafReaderContext ctx)
150+
throws IOException {
154151
LeafReader leafReader = ctx.reader();
155152
if (leafReader instanceof SequentialStoredFieldsLeafReader lf) {
156153
return lf.getSequentialStoredFieldsReader()::document;
@@ -201,7 +198,7 @@ public Map<String, List<Object>> storedFields() {
201198

202199
private static class ReaderStoredFieldLoader implements LeafStoredFieldLoader {
203200

204-
private final CheckedBiConsumer<Integer, FieldsVisitor, IOException> reader;
201+
private final CheckedBiConsumer<Integer, StoredFieldVisitor, IOException> reader;
205202
private final CustomFieldsVisitor visitor;
206203
private int doc = -1;
207204

@@ -221,7 +218,11 @@ public Status needsField(FieldInfo fieldInfo) {
221218
return new CustomFieldsVisitor(fields, loadSource);
222219
}
223220

224-
ReaderStoredFieldLoader(CheckedBiConsumer<Integer, FieldsVisitor, IOException> reader, boolean loadSource, Set<String> fields) {
221+
ReaderStoredFieldLoader(
222+
CheckedBiConsumer<Integer, StoredFieldVisitor, IOException> reader,
223+
boolean loadSource,
224+
Set<String> fields
225+
) {
225226
this.reader = reader;
226227
this.visitor = getFieldsVisitor(fields, loadSource);
227228
}

server/src/main/java/org/elasticsearch/index/mapper/FallbackSyntheticSourceBlockLoader.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import java.util.Optional;
2424
import java.util.Set;
2525
import java.util.Stack;
26-
import java.util.stream.Collectors;
2726

2827
/**
2928
* Block loader for fields that use fallback synthetic source implementation.
@@ -66,14 +65,7 @@ public RowStrideReader rowStrideReader(LeafReaderContext context) throws IOExcep
6665

6766
@Override
6867
public StoredFieldsSpec rowStrideStoredFieldSpec() {
69-
Set<String> ignoredFieldNames;
70-
if (ignoredSourceFormat == IgnoredSourceFieldMapper.IgnoredSourceFormat.PER_FIELD_IGNORED_SOURCE) {
71-
ignoredFieldNames = fieldPaths.stream().map(IgnoredSourceFieldMapper::ignoredFieldName).collect(Collectors.toSet());
72-
} else {
73-
ignoredFieldNames = Set.of(IgnoredSourceFieldMapper.NAME);
74-
}
75-
76-
return new StoredFieldsSpec(false, false, ignoredFieldNames);
68+
return new StoredFieldsSpec(false, false, Set.of(), new IgnoredFieldsSpec(Set.of(fieldName), ignoredSourceFormat));
7769
}
7870

7971
@Override
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.mapper;
11+
12+
import org.elasticsearch.ElasticsearchException;
13+
14+
import java.util.HashSet;
15+
import java.util.Set;
16+
import java.util.stream.Collectors;
17+
18+
/**
19+
* Defines which fields need to be loaded from _ignored_source during a fetch.
20+
*/
21+
public record IgnoredFieldsSpec(Set<String> requiredIgnoredFields, IgnoredSourceFieldMapper.IgnoredSourceFormat format) {
22+
public static IgnoredFieldsSpec NONE = new IgnoredFieldsSpec(Set.of(), IgnoredSourceFieldMapper.IgnoredSourceFormat.NO_IGNORED_SOURCE);
23+
24+
public boolean noRequirements() {
25+
return requiredIgnoredFields.isEmpty();
26+
}
27+
28+
public IgnoredFieldsSpec merge(IgnoredFieldsSpec other) {
29+
if (this.format == IgnoredSourceFieldMapper.IgnoredSourceFormat.NO_IGNORED_SOURCE) {
30+
return other;
31+
}
32+
if (other.format == IgnoredSourceFieldMapper.IgnoredSourceFormat.NO_IGNORED_SOURCE) {
33+
return this;
34+
}
35+
if (other.requiredIgnoredFields.isEmpty()) {
36+
return this;
37+
}
38+
if (this.requiredIgnoredFields.isEmpty()) {
39+
return other;
40+
}
41+
42+
if (this.format != other.format) {
43+
throw new ElasticsearchException(
44+
"failed to merge IgnoredFieldsSpec with differing formats " + this.format.name() + "," + other.format.name()
45+
);
46+
}
47+
48+
Set<String> mergedFields = new HashSet<>(requiredIgnoredFields);
49+
mergedFields.addAll(other.requiredIgnoredFields);
50+
return new IgnoredFieldsSpec(mergedFields, format);
51+
}
52+
53+
/**
54+
* Get the set of stored fields required to load the specified fields from _ignored_source.
55+
*/
56+
public Set<String> requiredStoredFields() {
57+
return requiredIgnoredFields.stream().flatMap(field -> format.requiredStoredFields(field).stream()).collect(Collectors.toSet());
58+
59+
}
60+
}

server/src/main/java/org/elasticsearch/index/mapper/IgnoredSourceFieldMapper.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import java.util.List;
4242
import java.util.Map;
4343
import java.util.Set;
44+
import java.util.stream.Collectors;
4445
import java.util.stream.Stream;
4546

4647
/**
@@ -277,6 +278,11 @@ public Map<String, List<IgnoredSourceFieldMapper.NameValue>> loadSingleIgnoredFi
277278
public void writeIgnoredFields(Collection<NameValue> ignoredFieldValues) {
278279
assert false : "cannot write " + ignoredFieldValues.size() + " values with format NO_IGNORED_SOURCE";
279280
}
281+
282+
@Override
283+
public Set<String> requiredStoredFields(String fieldName) {
284+
return Set.of();
285+
}
280286
},
281287
SINGLE_IGNORED_SOURCE {
282288
@Override
@@ -327,6 +333,11 @@ public void writeIgnoredFields(Collection<NameValue> ignoredFieldValues) {
327333
nameValue.doc().add(new StoredField(NAME, encode(nameValue)));
328334
}
329335
}
336+
337+
@Override
338+
public Set<String> requiredStoredFields(String fieldName) {
339+
return Set.of(IgnoredSourceFieldMapper.NAME);
340+
}
330341
},
331342
PER_FIELD_IGNORED_SOURCE {
332343
@Override
@@ -403,6 +414,14 @@ public void writeIgnoredFields(Collection<NameValue> ignoredFieldValues) {
403414
}
404415
}
405416
}
417+
418+
@Override
419+
public Set<String> requiredStoredFields(String fieldName) {
420+
return FallbackSyntheticSourceBlockLoader.splitIntoFieldPaths(fieldName)
421+
.stream()
422+
.map(IgnoredSourceFieldMapper::ignoredFieldName)
423+
.collect(Collectors.toSet());
424+
}
406425
};
407426

408427
public abstract Map<String, List<IgnoredSourceFieldMapper.NameValue>> loadAllIgnoredFields(
@@ -416,6 +435,11 @@ public abstract Map<String, List<IgnoredSourceFieldMapper.NameValue>> loadSingle
416435
);
417436

418437
public abstract void writeIgnoredFields(Collection<NameValue> ignoredFieldValues);
438+
439+
/**
440+
* Get the set of stored fields needed to retrieve the value for fieldName
441+
*/
442+
public abstract Set<String> requiredStoredFields(String fieldName);
419443
}
420444

421445
public IgnoredSourceFormat ignoredSourceFormat() {

0 commit comments

Comments
 (0)