Skip to content

Commit 693f3bf

Browse files
authored
Adjust translog operation assertions for synthetic source (#119330)
When synthetic sources are used in peer recoveries, the translog operations via peer recoveries may differ from those created through replication. This change relaxes the translog operation assertion to account for synthetic source, allowing these operations to be considered equivalent. Closes #119191
1 parent ce2c5d9 commit 693f3bf

File tree

14 files changed

+428
-114
lines changed

14 files changed

+428
-114
lines changed

muted-tests.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -232,9 +232,6 @@ tests:
232232
- class: org.elasticsearch.xpack.esql.action.EsqlNodeFailureIT
233233
method: testFailureLoadingFields
234234
issue: https://github.com/elastic/elasticsearch/issues/118000
235-
- class: org.elasticsearch.smoketest.SmokeTestMultiNodeClientYamlTestSuiteIT
236-
method: test {yaml=indices.create/20_synthetic_source/create index with use_synthetic_source}
237-
issue: https://github.com/elastic/elasticsearch/issues/119191
238235
- class: org.elasticsearch.index.mapper.AbstractShapeGeometryFieldMapperTests
239236
method: testCartesianBoundsBlockLoader
240237
issue: https://github.com/elastic/elasticsearch/issues/119201

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -661,7 +661,8 @@ private Translog openTranslog(
661661
translogDeletionPolicy,
662662
globalCheckpointSupplier,
663663
engineConfig.getPrimaryTermSupplier(),
664-
persistedSequenceNumberConsumer
664+
persistedSequenceNumberConsumer,
665+
TranslogOperationAsserter.withEngineConfig(engineConfig)
665666
);
666667
}
667668

server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,8 @@ public void trimUnreferencedTranslogFiles() {
166166
translogDeletionPolicy,
167167
engineConfig.getGlobalCheckpointSupplier(),
168168
engineConfig.getPrimaryTermSupplier(),
169-
seqNo -> {}
169+
seqNo -> {},
170+
TranslogOperationAsserter.DEFAULT
170171
)
171172
) {
172173
translog.trimUnreferencedReaders();

server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,8 @@ private static TranslogStats translogStats(final EngineConfig config, final Segm
267267
translogDeletionPolicy,
268268
config.getGlobalCheckpointSupplier(),
269269
config.getPrimaryTermSupplier(),
270-
seqNo -> {}
270+
seqNo -> {},
271+
TranslogOperationAsserter.DEFAULT
271272
)
272273
) {
273274
return translog.stats();

server/src/main/java/org/elasticsearch/index/engine/TranslogDirectoryReader.java

Lines changed: 41 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -99,10 +99,6 @@ private static UnsupportedOperationException unsupported() {
9999
return new UnsupportedOperationException();
100100
}
101101

102-
public TranslogLeafReader getLeafReader() {
103-
return leafReader;
104-
}
105-
106102
@Override
107103
protected DirectoryReader doOpenIfChanged() {
108104
throw unsupported();
@@ -143,6 +139,45 @@ public CacheHelper getReaderCacheHelper() {
143139
return leafReader.getReaderCacheHelper();
144140
}
145141

142+
static DirectoryReader createInMemoryReader(
143+
ShardId shardId,
144+
EngineConfig engineConfig,
145+
Directory directory,
146+
DocumentParser documentParser,
147+
MappingLookup mappingLookup,
148+
Translog.Index operation
149+
) {
150+
final ParsedDocument parsedDocs = documentParser.parseDocument(
151+
new SourceToParse(operation.id(), operation.source(), XContentHelper.xContentType(operation.source()), operation.routing()),
152+
mappingLookup
153+
);
154+
155+
parsedDocs.updateSeqID(operation.seqNo(), operation.primaryTerm());
156+
parsedDocs.version().setLongValue(operation.version());
157+
// To guarantee indexability, we configure the analyzer and codec using the main engine configuration
158+
final IndexWriterConfig writeConfig = new IndexWriterConfig(engineConfig.getAnalyzer()).setOpenMode(
159+
IndexWriterConfig.OpenMode.CREATE
160+
).setCodec(engineConfig.getCodec());
161+
try (IndexWriter writer = new IndexWriter(directory, writeConfig)) {
162+
writer.addDocument(parsedDocs.rootDoc());
163+
final DirectoryReader reader = open(writer);
164+
if (reader.leaves().size() != 1 || reader.leaves().get(0).reader().numDocs() != 1) {
165+
reader.close();
166+
throw new IllegalStateException(
167+
"Expected a single document segment; "
168+
+ "but ["
169+
+ reader.leaves().size()
170+
+ " segments with "
171+
+ reader.leaves().get(0).reader().numDocs()
172+
+ " documents"
173+
);
174+
}
175+
return reader;
176+
} catch (IOException e) {
177+
throw new EngineException(shardId, "failed to create an in-memory segment for get [" + operation.id() + "]", e);
178+
}
179+
}
180+
146181
private static class TranslogLeafReader extends LeafReader {
147182

148183
private static final FieldInfo FAKE_SOURCE_FIELD = new FieldInfo(
@@ -244,7 +279,8 @@ private LeafReader getDelegate() {
244279
ensureOpen();
245280
reader = delegate.get();
246281
if (reader == null) {
247-
reader = createInMemoryLeafReader();
282+
var indexReader = createInMemoryReader(shardId, engineConfig, directory, documentParser, mappingLookup, operation);
283+
reader = indexReader.leaves().get(0).reader();
248284
final LeafReader existing = delegate.getAndSet(reader);
249285
assert existing == null;
250286
onSegmentCreated.run();
@@ -254,39 +290,6 @@ private LeafReader getDelegate() {
254290
return reader;
255291
}
256292

257-
private LeafReader createInMemoryLeafReader() {
258-
assert Thread.holdsLock(this);
259-
final ParsedDocument parsedDocs = documentParser.parseDocument(
260-
new SourceToParse(operation.id(), operation.source(), XContentHelper.xContentType(operation.source()), operation.routing()),
261-
mappingLookup
262-
);
263-
264-
parsedDocs.updateSeqID(operation.seqNo(), operation.primaryTerm());
265-
parsedDocs.version().setLongValue(operation.version());
266-
// To guarantee indexability, we configure the analyzer and codec using the main engine configuration
267-
final IndexWriterConfig writeConfig = new IndexWriterConfig(engineConfig.getAnalyzer()).setOpenMode(
268-
IndexWriterConfig.OpenMode.CREATE
269-
).setCodec(engineConfig.getCodec());
270-
try (IndexWriter writer = new IndexWriter(directory, writeConfig)) {
271-
writer.addDocument(parsedDocs.rootDoc());
272-
final DirectoryReader reader = open(writer);
273-
if (reader.leaves().size() != 1 || reader.leaves().get(0).reader().numDocs() != 1) {
274-
reader.close();
275-
throw new IllegalStateException(
276-
"Expected a single document segment; "
277-
+ "but ["
278-
+ reader.leaves().size()
279-
+ " segments with "
280-
+ reader.leaves().get(0).reader().numDocs()
281-
+ " documents"
282-
);
283-
}
284-
return reader.leaves().get(0).reader();
285-
} catch (IOException e) {
286-
throw new EngineException(shardId, "failed to create an in-memory segment for get [" + operation.id() + "]", e);
287-
}
288-
}
289-
290293
@Override
291294
public CacheHelper getCoreCacheHelper() {
292295
return getDelegate().getCoreCacheHelper();
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.engine;
11+
12+
import org.apache.lucene.search.similarities.BM25Similarity;
13+
import org.apache.lucene.store.ByteBuffersDirectory;
14+
import org.elasticsearch.index.cache.query.TrivialQueryCachingPolicy;
15+
import org.elasticsearch.index.mapper.DocumentParser;
16+
import org.elasticsearch.index.mapper.MappingLookup;
17+
import org.elasticsearch.index.shard.ShardId;
18+
import org.elasticsearch.index.translog.Translog;
19+
20+
import java.io.IOException;
21+
22+
/**
23+
*
24+
* A utility class to assert that translog operations with the same sequence number
25+
* in the same generation are either identical or equivalent when synthetic sources are used.
26+
*/
27+
public abstract class TranslogOperationAsserter {
28+
public static final TranslogOperationAsserter DEFAULT = new TranslogOperationAsserter() {
29+
};
30+
31+
private TranslogOperationAsserter() {
32+
33+
}
34+
35+
public static TranslogOperationAsserter withEngineConfig(EngineConfig engineConfig) {
36+
return new TranslogOperationAsserter() {
37+
@Override
38+
public boolean assertSameIndexOperation(Translog.Index o1, Translog.Index o2) throws IOException {
39+
if (super.assertSameIndexOperation(o1, o2)) {
40+
return true;
41+
}
42+
if (engineConfig.getIndexSettings().isRecoverySourceSyntheticEnabled()) {
43+
return super.assertSameIndexOperation(synthesizeSource(engineConfig, o1), o2)
44+
|| super.assertSameIndexOperation(o1, synthesizeSource(engineConfig, o2));
45+
}
46+
return false;
47+
}
48+
};
49+
}
50+
51+
static Translog.Index synthesizeSource(EngineConfig engineConfig, Translog.Index op) throws IOException {
52+
final ShardId shardId = engineConfig.getShardId();
53+
final MappingLookup mappingLookup = engineConfig.getMapperService().mappingLookup();
54+
final DocumentParser documentParser = engineConfig.getMapperService().documentParser();
55+
try (
56+
var directory = new ByteBuffersDirectory();
57+
var reader = TranslogDirectoryReader.createInMemoryReader(shardId, engineConfig, directory, documentParser, mappingLookup, op)
58+
) {
59+
final Engine.Searcher searcher = new Engine.Searcher(
60+
"assert_translog",
61+
reader,
62+
new BM25Similarity(),
63+
null,
64+
TrivialQueryCachingPolicy.NEVER,
65+
() -> {}
66+
);
67+
try (
68+
LuceneSyntheticSourceChangesSnapshot snapshot = new LuceneSyntheticSourceChangesSnapshot(
69+
mappingLookup,
70+
searcher,
71+
LuceneSyntheticSourceChangesSnapshot.DEFAULT_BATCH_SIZE,
72+
Integer.MAX_VALUE,
73+
op.seqNo(),
74+
op.seqNo(),
75+
true,
76+
false,
77+
engineConfig.getIndexSettings().getIndexVersionCreated()
78+
)
79+
) {
80+
final Translog.Operation normalized = snapshot.next();
81+
assert normalized != null : "expected one operation; got zero";
82+
return (Translog.Index) normalized;
83+
}
84+
}
85+
}
86+
87+
public boolean assertSameIndexOperation(Translog.Index o1, Translog.Index o2) throws IOException {
88+
return Translog.Index.equalsWithoutAutoGeneratedTimestamp(o1, o2);
89+
}
90+
}

server/src/main/java/org/elasticsearch/index/translog/Translog.java

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.core.Releasable;
2828
import org.elasticsearch.index.IndexSettings;
2929
import org.elasticsearch.index.engine.Engine;
30+
import org.elasticsearch.index.engine.TranslogOperationAsserter;
3031
import org.elasticsearch.index.mapper.IdFieldMapper;
3132
import org.elasticsearch.index.mapper.MapperService;
3233
import org.elasticsearch.index.mapper.Uid;
@@ -123,6 +124,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
123124
private final TranslogDeletionPolicy deletionPolicy;
124125
private final LongConsumer persistedSequenceNumberConsumer;
125126
private final OperationListener operationListener;
127+
private final TranslogOperationAsserter operationAsserter;
126128

127129
/**
128130
* Creates a new Translog instance. This method will create a new transaction log unless the given {@link TranslogGeneration} is
@@ -150,14 +152,16 @@ public Translog(
150152
TranslogDeletionPolicy deletionPolicy,
151153
final LongSupplier globalCheckpointSupplier,
152154
final LongSupplier primaryTermSupplier,
153-
final LongConsumer persistedSequenceNumberConsumer
155+
final LongConsumer persistedSequenceNumberConsumer,
156+
final TranslogOperationAsserter operationAsserter
154157
) throws IOException {
155158
super(config.getShardId(), config.getIndexSettings());
156159
this.config = config;
157160
this.globalCheckpointSupplier = globalCheckpointSupplier;
158161
this.primaryTermSupplier = primaryTermSupplier;
159162
this.persistedSequenceNumberConsumer = persistedSequenceNumberConsumer;
160163
this.operationListener = config.getOperationListener();
164+
this.operationAsserter = operationAsserter;
161165
this.deletionPolicy = deletionPolicy;
162166
this.translogUUID = translogUUID;
163167
this.bigArrays = config.getBigArrays();
@@ -586,6 +590,7 @@ TranslogWriter createWriter(
586590
bigArrays,
587591
diskIoBufferPool,
588592
operationListener,
593+
operationAsserter,
589594
config.fsync()
590595
);
591596
} catch (final IOException e) {
@@ -1269,17 +1274,8 @@ public boolean equals(Object o) {
12691274
return false;
12701275
}
12711276

1272-
Index index = (Index) o;
1273-
1274-
if (version != index.version
1275-
|| seqNo != index.seqNo
1276-
|| primaryTerm != index.primaryTerm
1277-
|| id.equals(index.id) == false
1278-
|| autoGeneratedIdTimestamp != index.autoGeneratedIdTimestamp
1279-
|| source.equals(index.source) == false) {
1280-
return false;
1281-
}
1282-
return Objects.equals(routing, index.routing);
1277+
Index other = (Index) o;
1278+
return autoGeneratedIdTimestamp == other.autoGeneratedIdTimestamp && equalsWithoutAutoGeneratedTimestamp(this, other);
12831279
}
12841280

12851281
@Override
@@ -1315,6 +1311,15 @@ public long getAutoGeneratedIdTimestamp() {
13151311
return autoGeneratedIdTimestamp;
13161312
}
13171313

1314+
public static boolean equalsWithoutAutoGeneratedTimestamp(Translog.Index o1, Translog.Index o2) {
1315+
return o1.version == o2.version
1316+
&& o1.seqNo == o2.seqNo
1317+
&& o1.primaryTerm == o2.primaryTerm
1318+
&& o1.id.equals(o2.id)
1319+
&& o1.source.equals(o2.source)
1320+
&& Objects.equals(o1.routing, o2.routing);
1321+
}
1322+
13181323
}
13191324

13201325
public static final class Delete extends Operation {
@@ -1962,6 +1967,7 @@ public static String createEmptyTranslog(
19621967
BigArrays.NON_RECYCLING_INSTANCE,
19631968
DiskIoBufferPool.INSTANCE,
19641969
TranslogConfig.NOOP_OPERATION_LISTENER,
1970+
TranslogOperationAsserter.DEFAULT,
19651971
true
19661972
);
19671973
writer.close();

0 commit comments

Comments
 (0)