Skip to content

Commit 7b8f545

Browse files
authored
Fix realtime get of nested fields with synthetic source (#119575)
Today, for get-from-translog operations, we only need to reindex the root document into an in-memory Lucene index, as the _source is stored in the root document and is sufficient. However, synthesizing the source for nested fields requires both the root document and its child documents. This causes realtime-get operations (as well as update and update-by-query operations) to miss nested fields. Another issue is that the translog operation is reindexed lazily during get-from-translog operations. As a result, two realtime-get operations can return slightly different outputs: one reading from the translog and the other from Lucene. This change resolves both issues. However, addressing the second issue can degrade the performance of realtime-get and update operations. If slight inconsistencies are acceptable, the translog operation can be reindexed lazily instead. Closes #119553
1 parent 97ae2db commit 7b8f545

File tree

6 files changed

+209
-26
lines changed

6 files changed

+209
-26
lines changed

docs/changelog/119575.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 119575
2+
summary: Fix realtime get of nested fields with synthetic source
3+
area: Mapping
4+
type: bug
5+
issues:
6+
- 119553

muted-tests.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -213,9 +213,6 @@ tests:
213213
- class: org.elasticsearch.smoketest.MlWithSecurityIT
214214
method: test {yaml=ml/sparse_vector_search/Test sparse_vector search with query vector and pruning config}
215215
issue: https://github.com/elastic/elasticsearch/issues/119548
216-
- class: org.elasticsearch.index.engine.LuceneSyntheticSourceChangesSnapshotTests
217-
method: testSkipNonRootOfNestedDocuments
218-
issue: https://github.com/elastic/elasticsearch/issues/119553
219216
- class: org.elasticsearch.xpack.ml.integration.ForecastIT
220217
method: testOverflowToDisk
221218
issue: https://github.com/elastic/elasticsearch/issues/117740

server/src/internalClusterTest/java/org/elasticsearch/get/GetActionIT.java

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.action.get.MultiGetResponse;
2525
import org.elasticsearch.action.index.IndexRequest;
2626
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
27+
import org.elasticsearch.action.support.WriteRequest;
2728
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
2829
import org.elasticsearch.common.Randomness;
2930
import org.elasticsearch.common.Strings;
@@ -33,8 +34,10 @@
3334
import org.elasticsearch.core.CheckedFunction;
3435
import org.elasticsearch.core.Nullable;
3536
import org.elasticsearch.index.IndexModule;
37+
import org.elasticsearch.index.IndexSettings;
3638
import org.elasticsearch.index.engine.EngineTestCase;
3739
import org.elasticsearch.index.engine.VersionConflictEngineException;
40+
import org.elasticsearch.index.mapper.SourceFieldMapper;
3841
import org.elasticsearch.plugins.Plugin;
3942
import org.elasticsearch.rest.RestStatus;
4043
import org.elasticsearch.test.ESIntegTestCase;
@@ -932,6 +935,102 @@ public void testGetRemoteIndex() {
932935
);
933936
}
934937

938+
public void testRealTimeGetNestedFields() {
939+
String index = "test";
940+
SourceFieldMapper.Mode sourceMode = randomFrom(SourceFieldMapper.Mode.values());
941+
assertAcked(
942+
prepareCreate(index).setMapping("title", "type=keyword", "author", "type=nested")
943+
.setSettings(
944+
indexSettings(1, 0).put("index.refresh_interval", -1)
945+
.put(IndexSettings.INDEX_MAPPER_SOURCE_MODE_SETTING.getKey(), sourceMode)
946+
)
947+
);
948+
ensureGreen();
949+
String source0 = """
950+
{
951+
"title": "t0",
952+
"author": [
953+
{
954+
"name": "a0"
955+
}
956+
]
957+
}
958+
""";
959+
prepareIndex(index).setRefreshPolicy(WriteRequest.RefreshPolicy.NONE).setId("0").setSource(source0, XContentType.JSON).get();
960+
// start tracking translog locations
961+
assertTrue(client().prepareGet(index, "0").setRealtime(true).get().isExists());
962+
String source1 = """
963+
{
964+
"title": ["t1"],
965+
"author": [
966+
{
967+
"name": "a1"
968+
}
969+
]
970+
}
971+
""";
972+
prepareIndex(index).setRefreshPolicy(WriteRequest.RefreshPolicy.NONE).setId("1").setSource(source1, XContentType.JSON).get();
973+
String source2 = """
974+
{
975+
"title": ["t1", "t2"],
976+
"author": [
977+
{
978+
"name": "a1"
979+
},
980+
{
981+
"name": "a2"
982+
}
983+
]
984+
}
985+
""";
986+
prepareIndex(index).setRefreshPolicy(WriteRequest.RefreshPolicy.NONE).setId("2").setSource(source2, XContentType.JSON).get();
987+
String source3 = """
988+
{
989+
"title": ["t1", "t3", "t2"]
990+
}
991+
""";
992+
prepareIndex(index).setRefreshPolicy(WriteRequest.RefreshPolicy.NONE).setId("3").setSource(source3, XContentType.JSON).get();
993+
GetResponse translog1 = client().prepareGet(index, "1").setRealtime(true).get();
994+
GetResponse translog2 = client().prepareGet(index, "2").setRealtime(true).get();
995+
GetResponse translog3 = client().prepareGet(index, "3").setRealtime(true).get();
996+
assertTrue(translog1.isExists());
997+
assertTrue(translog2.isExists());
998+
assertTrue(translog3.isExists());
999+
switch (sourceMode) {
1000+
case STORED -> {
1001+
assertThat(translog1.getSourceAsBytesRef().utf8ToString(), equalTo(source1));
1002+
assertThat(translog2.getSourceAsBytesRef().utf8ToString(), equalTo(source2));
1003+
assertThat(translog3.getSourceAsBytesRef().utf8ToString(), equalTo(source3));
1004+
}
1005+
case SYNTHETIC -> {
1006+
assertThat(translog1.getSourceAsBytesRef().utf8ToString(), equalTo("""
1007+
{"author":{"name":"a1"},"title":"t1"}"""));
1008+
assertThat(translog2.getSourceAsBytesRef().utf8ToString(), equalTo("""
1009+
{"author":[{"name":"a1"},{"name":"a2"}],"title":["t1","t2"]}"""));
1010+
assertThat(translog3.getSourceAsBytesRef().utf8ToString(), equalTo("""
1011+
{"title":["t1","t2","t3"]}"""));
1012+
}
1013+
case DISABLED -> {
1014+
assertNull(translog1.getSourceAsBytesRef());
1015+
assertNull(translog2.getSourceAsBytesRef());
1016+
assertNull(translog3.getSourceAsBytesRef());
1017+
}
1018+
}
1019+
assertFalse(client().prepareGet(index, "1").setRealtime(false).get().isExists());
1020+
assertFalse(client().prepareGet(index, "2").setRealtime(false).get().isExists());
1021+
assertFalse(client().prepareGet(index, "3").setRealtime(false).get().isExists());
1022+
refresh(index);
1023+
GetResponse lucene1 = client().prepareGet(index, "1").setRealtime(randomBoolean()).get();
1024+
GetResponse lucene2 = client().prepareGet(index, "2").setRealtime(randomBoolean()).get();
1025+
GetResponse lucene3 = client().prepareGet(index, "3").setRealtime(randomBoolean()).get();
1026+
assertTrue(lucene1.isExists());
1027+
assertTrue(lucene2.isExists());
1028+
assertTrue(lucene3.isExists());
1029+
assertThat(translog1.getSourceAsBytesRef(), equalTo(lucene1.getSourceAsBytesRef()));
1030+
assertThat(translog2.getSourceAsBytesRef(), equalTo(lucene2.getSourceAsBytesRef()));
1031+
assertThat(translog3.getSourceAsBytesRef(), equalTo(lucene3.getSourceAsBytesRef()));
1032+
}
1033+
9351034
private void assertGetFieldsAlwaysWorks(String index, String docId, String[] fields) {
9361035
assertGetFieldsAlwaysWorks(index, docId, fields, null);
9371036
}

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -819,7 +819,7 @@ private GetResult getFromTranslog(
819819
) throws IOException {
820820
assert get.isReadFromTranslog();
821821
translogGetCount.incrementAndGet();
822-
final TranslogDirectoryReader inMemoryReader = new TranslogDirectoryReader(
822+
final DirectoryReader inMemoryReader = TranslogDirectoryReader.create(
823823
shardId,
824824
index,
825825
mappingLookup,

server/src/main/java/org/elasticsearch/index/engine/TranslogDirectoryReader.java

Lines changed: 102 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
package org.elasticsearch.index.engine;
1111

12+
import org.apache.lucene.codecs.StoredFieldsReader;
1213
import org.apache.lucene.index.BaseTermsEnum;
1314
import org.apache.lucene.index.BinaryDocValues;
1415
import org.apache.lucene.index.ByteVectorValues;
@@ -46,6 +47,9 @@
4647
import org.apache.lucene.util.Bits;
4748
import org.apache.lucene.util.BytesRef;
4849
import org.elasticsearch.common.bytes.BytesReference;
50+
import org.elasticsearch.common.lucene.Lucene;
51+
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
52+
import org.elasticsearch.common.lucene.index.SequentialStoredFieldsLeafReader;
4953
import org.elasticsearch.common.xcontent.XContentHelper;
5054
import org.elasticsearch.core.IOUtils;
5155
import org.elasticsearch.index.fieldvisitor.FieldNamesProvidingStoredFieldsVisitor;
@@ -76,21 +80,49 @@
7680
* into an in-memory Lucene segment that is created on-demand.
7781
*/
7882
final class TranslogDirectoryReader extends DirectoryReader {
79-
private final TranslogLeafReader leafReader;
83+
private final LeafReader leafReader;
8084

81-
TranslogDirectoryReader(
85+
static DirectoryReader create(
8286
ShardId shardId,
8387
Translog.Index operation,
8488
MappingLookup mappingLookup,
8589
DocumentParser documentParser,
8690
EngineConfig engineConfig,
8791
Runnable onSegmentCreated
8892
) throws IOException {
89-
this(new TranslogLeafReader(shardId, operation, mappingLookup, documentParser, engineConfig, onSegmentCreated));
93+
final Directory directory = new ByteBuffersDirectory();
94+
boolean success = false;
95+
try {
96+
final LeafReader leafReader;
97+
// When using synthetic source, the translog operation must always be reindexed into an in-memory Lucene to ensure consistent
98+
// output for realtime-get operations. However, this can degrade the performance of realtime-get and update operations.
99+
// If slight inconsistencies in realtime-get operations are acceptable, the translog operation can be reindexed lazily.
100+
if (mappingLookup.isSourceSynthetic()) {
101+
onSegmentCreated.run();
102+
leafReader = createInMemoryReader(shardId, engineConfig, directory, documentParser, mappingLookup, false, operation);
103+
} else {
104+
leafReader = new TranslogLeafReader(
105+
shardId,
106+
operation,
107+
mappingLookup,
108+
documentParser,
109+
engineConfig,
110+
directory,
111+
onSegmentCreated
112+
);
113+
}
114+
var directoryReader = ElasticsearchDirectoryReader.wrap(new TranslogDirectoryReader(directory, leafReader), shardId);
115+
success = true;
116+
return directoryReader;
117+
} finally {
118+
if (success == false) {
119+
IOUtils.closeWhileHandlingException(directory);
120+
}
121+
}
90122
}
91123

92-
private TranslogDirectoryReader(TranslogLeafReader leafReader) throws IOException {
93-
super(leafReader.directory, new LeafReader[] { leafReader }, null);
124+
private TranslogDirectoryReader(Directory directory, LeafReader leafReader) throws IOException {
125+
super(directory, new LeafReader[] { leafReader }, null);
94126
this.leafReader = leafReader;
95127
}
96128

@@ -139,12 +171,13 @@ public CacheHelper getReaderCacheHelper() {
139171
return leafReader.getReaderCacheHelper();
140172
}
141173

142-
static DirectoryReader createInMemoryReader(
174+
private static LeafReader createInMemoryReader(
143175
ShardId shardId,
144176
EngineConfig engineConfig,
145177
Directory directory,
146178
DocumentParser documentParser,
147179
MappingLookup mappingLookup,
180+
boolean rootDocOnly,
148181
Translog.Index operation
149182
) {
150183
final ParsedDocument parsedDocs = documentParser.parseDocument(
@@ -159,13 +192,55 @@ static DirectoryReader createInMemoryReader(
159192
IndexWriterConfig.OpenMode.CREATE
160193
).setCodec(engineConfig.getCodec());
161194
try (IndexWriter writer = new IndexWriter(directory, writeConfig)) {
162-
writer.addDocuments(parsedDocs.docs());
195+
final int numDocs;
196+
if (rootDocOnly) {
197+
numDocs = 1;
198+
writer.addDocument(parsedDocs.rootDoc());
199+
} else {
200+
numDocs = parsedDocs.docs().size();
201+
writer.addDocuments(parsedDocs.docs());
202+
}
163203
final DirectoryReader reader = open(writer);
164-
if (reader.leaves().size() != 1) {
204+
if (reader.leaves().size() != 1 || reader.leaves().get(0).reader().numDocs() != numDocs) {
165205
reader.close();
166-
throw new IllegalStateException("Expected a single segment; " + "but got[" + reader.leaves().size() + "] segments");
206+
throw new IllegalStateException(
207+
"Expected a single segment with "
208+
+ numDocs
209+
+ " documents, "
210+
+ "but ["
211+
+ reader.leaves().size()
212+
+ " segments with "
213+
+ reader.leaves().get(0).reader().numDocs()
214+
+ " documents"
215+
);
167216
}
168-
return reader;
217+
LeafReader leafReader = reader.leaves().get(0).reader();
218+
return new SequentialStoredFieldsLeafReader(leafReader) {
219+
@Override
220+
protected void doClose() throws IOException {
221+
IOUtils.close(super::doClose, directory);
222+
}
223+
224+
@Override
225+
public CacheHelper getCoreCacheHelper() {
226+
return leafReader.getCoreCacheHelper();
227+
}
228+
229+
@Override
230+
public CacheHelper getReaderCacheHelper() {
231+
return leafReader.getReaderCacheHelper();
232+
}
233+
234+
@Override
235+
public StoredFieldsReader getSequentialStoredFieldsReader() {
236+
return Lucene.segmentReader(leafReader).getFieldsReader().getMergeInstance();
237+
}
238+
239+
@Override
240+
protected StoredFieldsReader doGetSequentialStoredFieldsReader(StoredFieldsReader reader) {
241+
return reader;
242+
}
243+
};
169244
} catch (IOException e) {
170245
throw new EngineException(shardId, "failed to create an in-memory segment for get [" + operation.id() + "]", e);
171246
}
@@ -252,6 +327,7 @@ private static class TranslogLeafReader extends LeafReader {
252327
MappingLookup mappingLookup,
253328
DocumentParser documentParser,
254329
EngineConfig engineConfig,
330+
Directory directory,
255331
Runnable onSegmentCreated
256332
) {
257333
this.shardId = shardId;
@@ -260,7 +336,7 @@ private static class TranslogLeafReader extends LeafReader {
260336
this.documentParser = documentParser;
261337
this.engineConfig = engineConfig;
262338
this.onSegmentCreated = onSegmentCreated;
263-
this.directory = new ByteBuffersDirectory();
339+
this.directory = directory;
264340
this.uid = Uid.encodeId(operation.id());
265341
}
266342

@@ -272,7 +348,15 @@ private LeafReader getDelegate() {
272348
ensureOpen();
273349
reader = delegate.get();
274350
if (reader == null) {
275-
var indexReader = createInMemoryReader(shardId, engineConfig, directory, documentParser, mappingLookup, operation);
351+
var indexReader = createInMemoryReader(
352+
shardId,
353+
engineConfig,
354+
directory,
355+
documentParser,
356+
mappingLookup,
357+
true,
358+
operation
359+
);
276360
reader = indexReader.leaves().get(0).reader();
277361
final LeafReader existing = delegate.getAndSet(reader);
278362
assert existing == null;
@@ -457,7 +541,12 @@ private void readStoredFieldsDirectly(StoredFieldVisitor visitor) throws IOExcep
457541

458542
@Override
459543
protected synchronized void doClose() throws IOException {
460-
IOUtils.close(delegate.get(), directory);
544+
final LeafReader leaf = delegate.get();
545+
if (leaf != null) {
546+
leaf.close();
547+
} else {
548+
directory.close();
549+
}
461550
}
462551
}
463552

server/src/main/java/org/elasticsearch/index/engine/TranslogOperationAsserter.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,6 @@
1010
package org.elasticsearch.index.engine;
1111

1212
import org.apache.lucene.search.similarities.BM25Similarity;
13-
import org.apache.lucene.store.ByteBuffersDirectory;
14-
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
1513
import org.elasticsearch.index.cache.query.TrivialQueryCachingPolicy;
1614
import org.elasticsearch.index.mapper.DocumentParser;
1715
import org.elasticsearch.index.mapper.MappingLookup;
@@ -53,13 +51,7 @@ static Translog.Index synthesizeSource(EngineConfig engineConfig, Translog.Index
5351
final ShardId shardId = engineConfig.getShardId();
5452
final MappingLookup mappingLookup = engineConfig.getMapperService().mappingLookup();
5553
final DocumentParser documentParser = engineConfig.getMapperService().documentParser();
56-
try (
57-
var directory = new ByteBuffersDirectory();
58-
var reader = ElasticsearchDirectoryReader.wrap(
59-
TranslogDirectoryReader.createInMemoryReader(shardId, engineConfig, directory, documentParser, mappingLookup, op),
60-
new ShardId("index", "_na_", 0)
61-
)
62-
) {
54+
try (var reader = TranslogDirectoryReader.create(shardId, op, mappingLookup, documentParser, engineConfig, () -> {})) {
6355
final Engine.Searcher searcher = new Engine.Searcher(
6456
"assert_translog",
6557
reader,

0 commit comments

Comments
 (0)