Skip to content

Commit 4b1656d

Browse files
authored
Rebuild Inference Metadata Fields During Snapshot Recovery (#120045)
This PR introduces support for reconstructing inference metadata fields that are removed from `_source` by `SourceFieldMapper#applyFilters` during operations recovery. The inference metadata fields are retrieved using value fetchers and are re-added to `_source` under the `_inference_fields` metadata field.
1 parent 19965d5 commit 4b1656d

18 files changed: +498 −60 lines changed

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3161,7 +3161,7 @@ public Translog.Snapshot newChangesSnapshot(
31613161
final Translog.Snapshot snapshot;
31623162
if (engineConfig.getIndexSettings().isRecoverySourceSyntheticEnabled()) {
31633163
snapshot = new LuceneSyntheticSourceChangesSnapshot(
3164-
engineConfig.getMapperService().mappingLookup(),
3164+
engineConfig.getMapperService(),
31653165
searcher,
31663166
SearchBasedChangesSnapshot.DEFAULT_BATCH_SIZE,
31673167
maxChunkSize,
@@ -3173,6 +3173,7 @@ public Translog.Snapshot newChangesSnapshot(
31733173
);
31743174
} else {
31753175
snapshot = new LuceneChangesSnapshot(
3176+
engineConfig.getMapperService(),
31763177
searcher,
31773178
SearchBasedChangesSnapshot.DEFAULT_BATCH_SIZE,
31783179
fromSeqNo,

server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.elasticsearch.core.Assertions;
2020
import org.elasticsearch.index.IndexVersion;
2121
import org.elasticsearch.index.fieldvisitor.FieldsVisitor;
22+
import org.elasticsearch.index.mapper.MapperService;
2223
import org.elasticsearch.index.mapper.SourceFieldMapper;
2324
import org.elasticsearch.index.translog.Translog;
2425
import org.elasticsearch.transport.Transports;
@@ -46,6 +47,7 @@ public final class LuceneChangesSnapshot extends SearchBasedChangesSnapshot {
4647
/**
4748
* Creates a new "translog" snapshot from Lucene for reading operations whose seq# in the specified range.
4849
*
50+
* @param mapperService the mapper service for this index
4951
* @param engineSearcher the internal engine searcher which will be taken over if the snapshot is opened successfully
5052
* @param searchBatchSize the number of documents should be returned by each search
5153
* @param fromSeqNo the min requesting seq# - inclusive
@@ -56,6 +58,7 @@ public final class LuceneChangesSnapshot extends SearchBasedChangesSnapshot {
5658
* @param indexVersionCreated the version on which this index was created
5759
*/
5860
public LuceneChangesSnapshot(
61+
MapperService mapperService,
5962
Engine.Searcher engineSearcher,
6063
int searchBatchSize,
6164
long fromSeqNo,
@@ -65,7 +68,7 @@ public LuceneChangesSnapshot(
6568
boolean accessStats,
6669
IndexVersion indexVersionCreated
6770
) throws IOException {
68-
super(engineSearcher, searchBatchSize, fromSeqNo, toSeqNo, requiredFullRange, accessStats, indexVersionCreated);
71+
super(mapperService, engineSearcher, searchBatchSize, fromSeqNo, toSeqNo, requiredFullRange, accessStats, indexVersionCreated);
6972
this.creationThread = Assertions.ENABLED ? Thread.currentThread() : null;
7073
this.singleConsumer = singleConsumer;
7174
this.parallelArray = new ParallelArray(this.searchBatchSize);
@@ -214,20 +217,24 @@ private Translog.Operation readDocAsOp(int docIndex) throws IOException {
214217
if (leaf.reader() instanceof SequentialStoredFieldsLeafReader) {
215218
storedFieldsReader = ((SequentialStoredFieldsLeafReader) leaf.reader()).getSequentialStoredFieldsReader();
216219
storedFieldsReaderOrd = leaf.ord;
220+
setNextSourceMetadataReader(leaf);
217221
} else {
218222
storedFieldsReader = null;
219223
storedFieldsReaderOrd = -1;
220224
}
221225
}
222226
}
227+
223228
if (storedFieldsReader != null) {
224229
assert singleConsumer : "Sequential access optimization must not be enabled for multiple consumers";
225230
assert parallelArray.useSequentialStoredFieldsReader;
226231
assert storedFieldsReaderOrd == leaf.ord : storedFieldsReaderOrd + " != " + leaf.ord;
227232
storedFieldsReader.document(segmentDocID, fields);
228233
} else {
234+
setNextSourceMetadataReader(leaf);
229235
leaf.reader().storedFields().document(segmentDocID, fields);
230236
}
237+
final BytesReference source = fields.source() != null ? addSourceMetadata(fields.source(), segmentDocID) : null;
231238

232239
final Translog.Operation op;
233240
final boolean isTombstone = parallelArray.isTombStone[docIndex];
@@ -241,7 +248,6 @@ private Translog.Operation readDocAsOp(int docIndex) throws IOException {
241248
op = new Translog.Delete(id, seqNo, primaryTerm, version);
242249
assert assertDocSoftDeleted(leaf.reader(), segmentDocID) : "Delete op but soft_deletes field is not set [" + op + "]";
243250
} else {
244-
final BytesReference source = fields.source();
245251
if (source == null) {
246252
// TODO: Callers should ask for the range that source should be retained. Thus we should always
247253
// check for the existence source once we make peer-recovery to send ops after the local checkpoint.

server/src/main/java/org/elasticsearch/index/engine/LuceneSyntheticSourceChangesSnapshot.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,11 @@
1313
import org.apache.lucene.search.FieldDoc;
1414
import org.apache.lucene.search.ScoreDoc;
1515
import org.apache.lucene.util.ArrayUtil;
16-
import org.elasticsearch.common.bytes.BytesReference;
1716
import org.elasticsearch.index.IndexSettings;
1817
import org.elasticsearch.index.IndexVersion;
1918
import org.elasticsearch.index.fieldvisitor.LeafStoredFieldLoader;
2019
import org.elasticsearch.index.fieldvisitor.StoredFieldLoader;
21-
import org.elasticsearch.index.mapper.MappingLookup;
20+
import org.elasticsearch.index.mapper.MapperService;
2221
import org.elasticsearch.index.mapper.SourceFieldMetrics;
2322
import org.elasticsearch.index.mapper.SourceLoader;
2423
import org.elasticsearch.index.translog.Translog;
@@ -66,7 +65,7 @@ boolean hasRecoverySourceSize() {
6665
private final Deque<Translog.Operation> operationQueue = new LinkedList<>();
6766

6867
public LuceneSyntheticSourceChangesSnapshot(
69-
MappingLookup mappingLookup,
68+
MapperService mapperService,
7069
Engine.Searcher engineSearcher,
7170
int searchBatchSize,
7271
long maxMemorySizeInBytes,
@@ -76,13 +75,13 @@ public LuceneSyntheticSourceChangesSnapshot(
7675
boolean accessStats,
7776
IndexVersion indexVersionCreated
7877
) throws IOException {
79-
super(engineSearcher, searchBatchSize, fromSeqNo, toSeqNo, requiredFullRange, accessStats, indexVersionCreated);
78+
super(mapperService, engineSearcher, searchBatchSize, fromSeqNo, toSeqNo, requiredFullRange, accessStats, indexVersionCreated);
8079
// a MapperService#updateMapping(...) of empty index may not have been invoked and then mappingLookup is empty
81-
assert engineSearcher.getDirectoryReader().maxDoc() == 0 || mappingLookup.isSourceSynthetic()
80+
assert engineSearcher.getDirectoryReader().maxDoc() == 0 || mapperService.mappingLookup().isSourceSynthetic()
8281
: "either an empty index or synthetic source must be enabled for proper functionality.";
8382
// ensure we can buffer at least one document
8483
this.maxMemorySizeInBytes = maxMemorySizeInBytes > 0 ? maxMemorySizeInBytes : 1;
85-
this.sourceLoader = mappingLookup.newSourceLoader(null, SourceFieldMetrics.NOOP);
84+
this.sourceLoader = mapperService.mappingLookup().newSourceLoader(null, SourceFieldMetrics.NOOP);
8685
Set<String> storedFields = sourceLoader.requiredStoredFields();
8786
this.storedFieldLoader = StoredFieldLoader.create(false, storedFields);
8887
this.lastSeenSeqNo = fromSeqNo - 1;
@@ -194,6 +193,7 @@ private Translog.Operation[] loadDocuments(List<SearchRecord> documentRecords) t
194193

195194
leafFieldLoader = storedFieldLoader.getLoader(leafReaderContext, null);
196195
leafSourceLoader = sourceLoader.leaf(leafReaderContext.reader(), null);
196+
setNextSourceMetadataReader(leafReaderContext);
197197
}
198198
int segmentDocID = docRecord.docID() - docBase;
199199
leafFieldLoader.advanceTo(segmentDocID);
@@ -229,17 +229,16 @@ private Translog.Operation createOperation(
229229
return null;
230230
}
231231
}
232-
BytesReference source = sourceLoader.source(fieldLoader, segmentDocID).internalSourceRef();
232+
var sourceBytes = addSourceMetadata(sourceLoader.source(fieldLoader, segmentDocID).internalSourceRef(), segmentDocID);
233233
return new Translog.Index(
234234
fieldLoader.id(),
235235
docRecord.seqNo(),
236236
docRecord.primaryTerm(),
237237
docRecord.version(),
238-
source,
238+
sourceBytes,
239239
fieldLoader.routing(),
240240
-1 // autogenerated timestamp
241241
);
242242
}
243243
}
244-
245244
}

server/src/main/java/org/elasticsearch/index/engine/SearchBasedChangesSnapshot.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,17 @@
2222
import org.apache.lucene.search.SortField;
2323
import org.apache.lucene.search.TopDocs;
2424
import org.apache.lucene.search.TopFieldCollectorManager;
25+
import org.elasticsearch.common.bytes.BytesReference;
2526
import org.elasticsearch.common.lucene.Lucene;
2627
import org.elasticsearch.common.lucene.search.Queries;
2728
import org.elasticsearch.core.IOUtils;
2829
import org.elasticsearch.index.IndexVersion;
30+
import org.elasticsearch.index.mapper.InferenceMetadataFieldsMapper;
31+
import org.elasticsearch.index.mapper.MapperService;
2932
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
33+
import org.elasticsearch.index.mapper.ValueFetcher;
3034
import org.elasticsearch.index.translog.Translog;
35+
import org.elasticsearch.search.lookup.Source;
3136

3237
import java.io.Closeable;
3338
import java.io.IOException;
@@ -44,6 +49,7 @@ public abstract class SearchBasedChangesSnapshot implements Translog.Snapshot, C
4449

4550
private final IndexVersion indexVersionCreated;
4651
private final IndexSearcher indexSearcher;
52+
private final ValueFetcher sourceMetadataFetcher;
4753
private final Closeable onClose;
4854

4955
protected final long fromSeqNo, toSeqNo;
@@ -67,6 +73,7 @@ public abstract class SearchBasedChangesSnapshot implements Translog.Snapshot, C
6773
* @param indexVersionCreated Version of the index when it was created.
6874
*/
6975
protected SearchBasedChangesSnapshot(
76+
MapperService mapperService,
7077
Engine.Searcher engineSearcher,
7178
int searchBatchSize,
7279
long fromSeqNo,
@@ -103,6 +110,19 @@ protected SearchBasedChangesSnapshot(
103110

104111
this.accessStats = accessStats;
105112
this.totalHits = accessStats ? indexSearcher.count(rangeQuery(fromSeqNo, toSeqNo, indexVersionCreated)) : -1;
113+
this.sourceMetadataFetcher = createSourceMetadataValueFetcher(mapperService, indexSearcher);
114+
}
115+
116+
private ValueFetcher createSourceMetadataValueFetcher(MapperService mapperService, IndexSearcher searcher) {
117+
if (mapperService.mappingLookup().inferenceFields().isEmpty()) {
118+
return null;
119+
}
120+
var mapper = (InferenceMetadataFieldsMapper) mapperService.mappingLookup()
121+
.getMapping()
122+
.getMetadataMapperByName(InferenceMetadataFieldsMapper.NAME);
123+
return mapper != null
124+
? mapper.fieldType().valueFetcher(mapperService.mappingLookup(), mapperService.getBitSetProducer(), searcher)
125+
: null;
106126
}
107127

108128
/**
@@ -184,6 +204,45 @@ protected TopDocs nextTopDocs() throws IOException {
184204
return results;
185205
}
186206

207+
/**
208+
* Sets the reader context to enable reading metadata that was removed from the {@code _source}.
209+
* This method sets up the {@code sourceMetadataFetcher} with the provided {@link LeafReaderContext},
210+
* ensuring it is ready to fetch metadata for subsequent operations.
211+
*
212+
* <p>Note: This method should be called before {@link #addSourceMetadata(BytesReference, int)} at the start of every leaf
213+
* to ensure the metadata fetcher is properly initialized.</p>
214+
*/
215+
protected void setNextSourceMetadataReader(LeafReaderContext context) {
216+
if (sourceMetadataFetcher != null) {
217+
sourceMetadataFetcher.setNextReader(context);
218+
}
219+
}
220+
221+
/**
222+
* Creates a new {@link Source} object by combining the provided {@code originalSource}
223+
* with additional metadata fields. If the {@code sourceMetadataFetcher} is null or no metadata
224+
* fields are fetched, the original source is returned unchanged.
225+
*
226+
* @param originalSourceBytes the original source bytes
227+
* @param segmentDocID the document ID used to fetch metadata fields
228+
* @return a new {@link Source} instance containing the original data and additional metadata,
229+
* or the original source if no metadata is added
230+
* @throws IOException if an error occurs while fetching metadata values
231+
*/
232+
protected BytesReference addSourceMetadata(BytesReference originalSourceBytes, int segmentDocID) throws IOException {
233+
if (sourceMetadataFetcher == null) {
234+
return originalSourceBytes;
235+
}
236+
var originalSource = Source.fromBytes(originalSourceBytes);
237+
List<Object> values = sourceMetadataFetcher.fetchValues(originalSource, segmentDocID, List.of());
238+
if (values.isEmpty()) {
239+
return originalSourceBytes;
240+
}
241+
var map = originalSource.source();
242+
map.put(InferenceMetadataFieldsMapper.NAME, values.get(0));
243+
return Source.fromMap(map, originalSource.sourceContentType()).internalSourceRef();
244+
}
245+
187246
static IndexSearcher newIndexSearcher(Engine.Searcher engineSearcher) throws IOException {
188247
return new IndexSearcher(Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader()));
189248
}

server/src/main/java/org/elasticsearch/index/engine/TranslogDirectoryReader.java

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -159,18 +159,11 @@ static DirectoryReader createInMemoryReader(
159159
IndexWriterConfig.OpenMode.CREATE
160160
).setCodec(engineConfig.getCodec());
161161
try (IndexWriter writer = new IndexWriter(directory, writeConfig)) {
162-
writer.addDocument(parsedDocs.rootDoc());
162+
writer.addDocuments(parsedDocs.docs());
163163
final DirectoryReader reader = open(writer);
164-
if (reader.leaves().size() != 1 || reader.leaves().get(0).reader().numDocs() != 1) {
164+
if (reader.leaves().size() != 1) {
165165
reader.close();
166-
throw new IllegalStateException(
167-
"Expected a single document segment; "
168-
+ "but ["
169-
+ reader.leaves().size()
170-
+ " segments with "
171-
+ reader.leaves().get(0).reader().numDocs()
172-
+ " documents"
173-
);
166+
throw new IllegalStateException("Expected a single segment; " + "but got[" + reader.leaves().size() + "] segments");
174167
}
175168
return reader;
176169
} catch (IOException e) {
@@ -443,7 +436,7 @@ private void readStoredFieldsDirectly(StoredFieldVisitor visitor) throws IOExcep
443436
SourceFieldMapper mapper = mappingLookup.getMapping().getMetadataMapperByClass(SourceFieldMapper.class);
444437
if (mapper != null) {
445438
try {
446-
sourceBytes = mapper.applyFilters(mappingLookup, sourceBytes, null);
439+
sourceBytes = mapper.applyFilters(mappingLookup, sourceBytes, null, true);
447440
} catch (IOException e) {
448441
throw new IOException("Failed to reapply filters after reading from translog", e);
449442
}

server/src/main/java/org/elasticsearch/index/engine/TranslogOperationAsserter.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
import org.apache.lucene.search.similarities.BM25Similarity;
1313
import org.apache.lucene.store.ByteBuffersDirectory;
14+
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
1415
import org.elasticsearch.index.cache.query.TrivialQueryCachingPolicy;
1516
import org.elasticsearch.index.mapper.DocumentParser;
1617
import org.elasticsearch.index.mapper.MappingLookup;
@@ -54,7 +55,10 @@ static Translog.Index synthesizeSource(EngineConfig engineConfig, Translog.Index
5455
final DocumentParser documentParser = engineConfig.getMapperService().documentParser();
5556
try (
5657
var directory = new ByteBuffersDirectory();
57-
var reader = TranslogDirectoryReader.createInMemoryReader(shardId, engineConfig, directory, documentParser, mappingLookup, op)
58+
var reader = ElasticsearchDirectoryReader.wrap(
59+
TranslogDirectoryReader.createInMemoryReader(shardId, engineConfig, directory, documentParser, mappingLookup, op),
60+
new ShardId("index", "_na_", 0)
61+
)
5862
) {
5963
final Engine.Searcher searcher = new Engine.Searcher(
6064
"assert_translog",
@@ -66,7 +70,7 @@ static Translog.Index synthesizeSource(EngineConfig engineConfig, Translog.Index
6670
);
6771
try (
6872
LuceneSyntheticSourceChangesSnapshot snapshot = new LuceneSyntheticSourceChangesSnapshot(
69-
mappingLookup,
73+
engineConfig.getMapperService(),
7074
searcher,
7175
LuceneSyntheticSourceChangesSnapshot.DEFAULT_BATCH_SIZE,
7276
Integer.MAX_VALUE,

0 commit comments

Comments (0)