Skip to content

Commit 223d852

Browse files
committed
Add a way to replace the value for a specific field in the original source with a patch id
1 parent be076e6 commit 223d852

File tree

10 files changed

+990
-22
lines changed

10 files changed

+990
-22
lines changed

server/src/main/java/org/elasticsearch/index/mapper/DocumentParserContext.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@
1717
import org.elasticsearch.index.IndexSettings;
1818
import org.elasticsearch.index.analysis.IndexAnalyzers;
1919
import org.elasticsearch.index.mapper.MapperService.MergeReason;
20+
import org.elasticsearch.search.builder.SearchSourceBuilder;
2021
import org.elasticsearch.xcontent.FilterXContentParserWrapper;
2122
import org.elasticsearch.xcontent.FlatteningXContentParser;
23+
import org.elasticsearch.xcontent.XContentLocation;
2224
import org.elasticsearch.xcontent.XContentParser;
2325

2426
import java.io.IOException;
@@ -36,6 +38,8 @@
3638
* the lucene data structures and mappings to be dynamically created as the outcome of parsing a document.
3739
*/
3840
public abstract class DocumentParserContext {
41+
public record XContentPatch(String fullPath, XContentLocation location, int id) {}
42+
3943
/**
4044
* Wraps a given context while allowing to override some of its behaviour by re-implementing some of the non final methods
4145
*/
@@ -124,6 +128,7 @@ public int get() {
124128
* in this document and therefore is not present in mapping yet.
125129
*/
126130
private final Set<String> copyToFields;
131+
private final Map<XContentLocation, XContentPatch> sourcePatches;
127132

128133
// Indicates if the source for this context has been cloned and gets parsed multiple times.
129134
private boolean clonedSource;
@@ -146,6 +151,7 @@ private DocumentParserContext(
146151
Set<String> fieldsAppliedFromTemplates,
147152
Set<String> copyToFields,
148153
DynamicMapperSize dynamicMapperSize,
154+
Map<XContentLocation, XContentPatch> sourcePatches,
149155
boolean clonedSource
150156
) {
151157
this.mappingLookup = mappingLookup;
@@ -165,6 +171,7 @@ private DocumentParserContext(
165171
this.fieldsAppliedFromTemplates = fieldsAppliedFromTemplates;
166172
this.copyToFields = copyToFields;
167173
this.dynamicMappersSize = dynamicMapperSize;
174+
this.sourcePatches = sourcePatches;
168175
this.clonedSource = clonedSource;
169176
}
170177

@@ -187,6 +194,7 @@ private DocumentParserContext(ObjectMapper parent, ObjectMapper.Dynamic dynamic,
187194
in.fieldsAppliedFromTemplates,
188195
in.copyToFields,
189196
in.dynamicMappersSize,
197+
in.sourcePatches,
190198
in.clonedSource
191199
);
192200
}
@@ -216,6 +224,7 @@ protected DocumentParserContext(
216224
new HashSet<>(),
217225
new HashSet<>(mappingLookup.fieldTypesLookup().getCopyToDestinationFields()),
218226
new DynamicMapperSize(),
227+
new HashMap<>(),
219228
false
220229
);
221230
}
@@ -300,6 +309,36 @@ public final void addToFieldNames(String field) {
300309
}
301310
}
302311

312+
/**
313+
* Registers a patch for a specific field at a given location in the original source.
314+
* The patch modifies the field value in the original `_source` by replacing it with a numerical ID,
315+
* which is then stored in a document field accessible via {@link SearchSourceBuilder#equals(Object)}.
316+
*
317+
* The original field value must still be retrievable when the original `_source` is requested.
318+
* This responsibility lies with the {@link FieldMapper}, ensuring that the patched field can provide
319+
* the original value during retrieval.
320+
*
321+
* Note: Only one patch per field is permitted for each document.
322+
*
323+
* @param fieldMapper The {@link FieldMapper} responsible for applying the patch to the field.
324+
* @param xContentLocation The {@link XContentLocation} representing the location of the field in the original source.
325+
*/
326+
public void addSourceFieldPatch(FieldMapper fieldMapper, XContentLocation xContentLocation) {
327+
var sourceFieldMapper = (SourceFieldMapper) getMetadataMapper(SourceFieldMapper.NAME);
328+
if (sourceFieldMapper == null) {
329+
return;
330+
}
331+
sourceFieldMapper.indexFieldPatch(doc(), fieldMapper, xContentLocation, getSourcePatches());
332+
}
333+
334+
/**
335+
* Returns all {@link XContentPatch} currently registered in this context,
336+
* mapped by their {@link XContentLocation} in the original `_source`.
337+
*/
338+
public Map<XContentLocation, XContentPatch> getSourcePatches() {
339+
return sourcePatches;
340+
}
341+
303342
public final Field version() {
304343
return this.version;
305344
}

server/src/main/java/org/elasticsearch/index/mapper/Mapper.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.index.IndexVersions;
1919
import org.elasticsearch.xcontent.ToXContentFragment;
2020
import org.elasticsearch.xcontent.XContentBuilder;
21+
import org.elasticsearch.xcontent.XContentLocation;
2122

2223
import java.io.IOException;
2324
import java.util.Arrays;
@@ -175,6 +176,16 @@ public SourceLoader.SyntheticFieldLoader syntheticFieldLoader() {
175176
throw new IllegalArgumentException("field [" + fullPath() + "] of type [" + typeName() + "] doesn't support synthetic source");
176177
}
177178

179+
/**
180+
* Creates a {@link SourceLoader.PatchFieldLoader} to load patches that were previously indexed
181+
* with {@link DocumentParserContext#addSourceFieldPatch(FieldMapper, XContentLocation)}.
182+
*
183+
* Returns {@code null} if the field doesn't allow patching.
184+
*/
185+
protected SourceLoader.PatchFieldLoader patchFieldLoader() {
186+
return null;
187+
}
188+
178189
@Override
179190
public String toString() {
180191
return Strings.toString(this);

server/src/main/java/org/elasticsearch/index/mapper/Mapping.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,10 @@ public SourceLoader.SyntheticFieldLoader syntheticFieldLoader() {
131131
return root.syntheticFieldLoader(stream);
132132
}
133133

134+
protected SourceLoader.PatchFieldLoader patchFieldLoader() {
135+
return root.patchFieldLoader();
136+
}
137+
134138
/**
135139
* Merges a new mapping into the existing one.
136140
*

server/src/main/java/org/elasticsearch/index/mapper/NestedObjectMapper.java

Lines changed: 44 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
package org.elasticsearch.index.mapper;
1111

1212
import org.apache.lucene.index.LeafReader;
13+
import org.apache.lucene.search.CheckedIntConsumer;
1314
import org.apache.lucene.search.DocIdSetIterator;
1415
import org.apache.lucene.search.IndexSearcher;
1516
import org.apache.lucene.search.Query;
@@ -376,6 +377,27 @@ protected MapperMergeContext createChildContext(MapperMergeContext mapperMergeCo
376377
);
377378
}
378379

380+
@Override
381+
protected SourceLoader.PatchFieldLoader patchFieldLoader() {
382+
var patchLoader = super.patchFieldLoader();
383+
if (patchLoader == null) {
384+
return null;
385+
}
386+
return context -> {
387+
var patchFieldLoader = patchLoader.leaf(context);
388+
if (patchFieldLoader == null) {
389+
return null;
390+
}
391+
IndexSearcher searcher = new IndexSearcher(context.reader());
392+
searcher.setQueryCache(null);
393+
var childScorer = searcher.createWeight(nestedTypeFilter, ScoreMode.COMPLETE_NO_SCORES, 1f).scorer(context);
394+
var parentsDocs = bitsetProducer.apply(parentTypeFilter).getBitSet(context);
395+
return (doc, acc) -> {
396+
collectChildren(nestedTypePath, doc, parentsDocs, childScorer.iterator(), child -> patchFieldLoader.load(child, acc));
397+
};
398+
};
399+
}
400+
379401
@Override
380402
public SourceLoader.SyntheticFieldLoader syntheticFieldLoader() {
381403
if (storeArraySource()) {
@@ -427,29 +449,15 @@ public DocValuesLoader docValuesLoader(LeafReader leafReader, int[] docIdsInLeaf
427449
if (childScorer != null) {
428450
var parentDocs = parentBitSetProducer.get().getBitSet(leafReader.getContext());
429451
return parentDoc -> {
430-
collectChildren(parentDoc, parentDocs, childScorer.iterator());
452+
children.clear();
453+
collectChildren(nestedTypePath, parentDoc, parentDocs, childScorer.iterator(), child -> children.add(child));
431454
return children.size() > 0;
432455
};
433456
} else {
434457
return parentDoc -> false;
435458
}
436459
}
437460

438-
private List<Integer> collectChildren(int parentDoc, BitSet parentDocs, DocIdSetIterator childIt) throws IOException {
439-
assert parentDoc < 0 || parentDocs.get(parentDoc) : "wrong context, doc " + parentDoc + " is not a parent of " + nestedTypePath;
440-
final int prevParentDoc = parentDoc > 0 ? parentDocs.prevSetBit(parentDoc - 1) : -1;
441-
int childDocId = childIt.docID();
442-
if (childDocId <= prevParentDoc) {
443-
childDocId = childIt.advance(prevParentDoc + 1);
444-
}
445-
446-
children.clear();
447-
for (; childDocId < parentDoc; childDocId = childIt.nextDoc()) {
448-
children.add(childDocId);
449-
}
450-
return children;
451-
}
452-
453461
@Override
454462
public boolean hasValue() {
455463
return children.size() > 0;
@@ -477,4 +485,24 @@ public String fieldName() {
477485
return NestedObjectMapper.this.fullPath();
478486
}
479487
}
488+
489+
private static void collectChildren(
490+
String nestedTypePath,
491+
int parentDoc,
492+
BitSet parentDocs,
493+
DocIdSetIterator childIt,
494+
CheckedIntConsumer<IOException> childConsumer
495+
) throws IOException {
496+
assert parentDoc < 0 || parentDocs.get(parentDoc) : "wrong context, doc " + parentDoc + " is not a parent of " + nestedTypePath;
497+
final int prevParentDoc = parentDoc > 0 ? parentDocs.prevSetBit(parentDoc - 1) : -1;
498+
int childDocId = childIt.docID();
499+
if (childDocId <= prevParentDoc) {
500+
childDocId = childIt.advance(prevParentDoc + 1);
501+
}
502+
503+
for (; childDocId < parentDoc; childDocId = childIt.nextDoc()) {
504+
childConsumer.accept(childDocId);
505+
}
506+
}
507+
480508
}

server/src/main/java/org/elasticsearch/index/mapper/ObjectMapper.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.util.Objects;
3838
import java.util.Optional;
3939
import java.util.TreeMap;
40+
import java.util.stream.Collectors;
4041
import java.util.stream.Stream;
4142

4243
public class ObjectMapper extends Mapper {
@@ -808,6 +809,31 @@ protected void doXContent(XContentBuilder builder, Params params) throws IOExcep
808809

809810
}
810811

812+
@Override
813+
protected SourceLoader.PatchFieldLoader patchFieldLoader() {
814+
var loaders = mappers.values().stream().map(m -> m.patchFieldLoader()).filter(l -> l != null).collect(Collectors.toList());
815+
if (loaders.isEmpty()) {
816+
return null;
817+
}
818+
return context -> {
819+
final List<SourceLoader.PatchFieldLoader.Leaf> leaves = new ArrayList<>();
820+
for (var loader : loaders) {
821+
var leaf = loader.leaf(context);
822+
if (leaf != null) {
823+
leaves.add(leaf);
824+
}
825+
}
826+
if (leaves.isEmpty()) {
827+
return null;
828+
}
829+
return (doc, acc) -> {
830+
for (var leaf : leaves) {
831+
leaf.load(doc, acc);
832+
}
833+
};
834+
};
835+
}
836+
811837
protected SourceLoader.SyntheticFieldLoader syntheticFieldLoader(Stream<Mapper> mappers, boolean isFragment) {
812838
var fields = mappers.sorted(Comparator.comparing(Mapper::fullPath))
813839
.map(Mapper::syntheticFieldLoader)
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.index.mapper;
10+
11+
import org.elasticsearch.common.bytes.BytesReference;
12+
import org.elasticsearch.common.io.stream.BytesStreamOutput;
13+
import org.elasticsearch.common.xcontent.XContentHelper;
14+
import org.elasticsearch.xcontent.XContent;
15+
import org.elasticsearch.xcontent.XContentBuilder;
16+
import org.elasticsearch.xcontent.XContentGenerator;
17+
import org.elasticsearch.xcontent.XContentParser;
18+
import org.elasticsearch.xcontent.XContentParserConfiguration;
19+
20+
import java.io.IOException;
21+
import java.util.Set;
22+
23+
public class PatchSourceUtils {
24+
public interface CheckedTriConsumer<S, T, U> {
25+
void apply(S s, T t, U u) throws IOException;
26+
}
27+
28+
/**
29+
* Parses the given {@code source} and returns a new version with all values referenced by the
30+
* provided {@code patchFullPaths} replaced using the {@code patchApply} consumer.
31+
*/
32+
public static BytesReference patchSource(
33+
BytesReference source,
34+
XContent xContent,
35+
Set<String> patchFullPaths,
36+
CheckedTriConsumer<String, XContentParser, XContentGenerator> patchApply
37+
) throws IOException {
38+
BytesStreamOutput streamOutput = new BytesStreamOutput();
39+
XContentBuilder builder = new XContentBuilder(xContent, streamOutput);
40+
try (XContentParser parser = XContentHelper.createParserNotCompressed(XContentParserConfiguration.EMPTY, source, xContent.type())) {
41+
if ((parser.currentToken() == null) && (parser.nextToken() == null)) {
42+
return source;
43+
}
44+
parseAndPatchSource(builder.generator(), parser, "", patchFullPaths, patchApply);
45+
return BytesReference.bytes(builder);
46+
}
47+
}
48+
49+
private static void parseAndPatchSource(
50+
XContentGenerator destination,
51+
XContentParser parser,
52+
String fullPath,
53+
Set<String> patchFullPaths,
54+
CheckedTriConsumer<String, XContentParser, XContentGenerator> patchApply
55+
) throws IOException {
56+
XContentParser.Token token = parser.currentToken();
57+
if (token == XContentParser.Token.FIELD_NAME) {
58+
String fieldName = parser.currentName();
59+
destination.writeFieldName(fieldName);
60+
token = parser.nextToken();
61+
fullPath = fullPath + (fullPath.isEmpty() ? "" : ".") + fieldName;
62+
if (patchFullPaths.contains(fullPath)) {
63+
patchApply.apply(fullPath, parser, destination);
64+
return;
65+
}
66+
}
67+
68+
switch (token) {
69+
case START_ARRAY -> {
70+
destination.writeStartArray();
71+
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
72+
parseAndPatchSource(destination, parser, fullPath, patchFields, patchApply);
73+
}
74+
destination.writeEndArray();
75+
}
76+
case START_OBJECT -> {
77+
destination.writeStartObject();
78+
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
79+
parseAndPatchSource(destination, parser, fullPath, patchFields, patchApply);
80+
}
81+
destination.writeEndObject();
82+
}
83+
default -> // others are simple:
84+
destination.copyCurrentEvent(parser);
85+
}
86+
}
87+
}

0 commit comments

Comments
 (0)