Skip to content

Commit fcdcba8

Browse files
authored
No translog for search-only shards (#93082)
In due course I expect we will address this more fully, but for now we want to avoid needing to associate a new translog with a search shard during startup. This breaks the finalization phase of peer recovery because we use the translog checkpoint to hold the GCP, so with this commit we just keep hold of the GCP in memory on search shards.
1 parent 54a78c3 commit fcdcba8

File tree

5 files changed

+68
-26
lines changed

5 files changed

+68
-26
lines changed

server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,12 @@
2525
import org.elasticsearch.common.settings.Settings;
2626
import org.elasticsearch.common.util.CollectionUtils;
2727
import org.elasticsearch.core.Nullable;
28+
import org.elasticsearch.index.IndexSettings;
29+
import org.elasticsearch.index.engine.EngineFactory;
30+
import org.elasticsearch.index.engine.NoOpEngine;
31+
import org.elasticsearch.index.translog.TranslogStats;
2832
import org.elasticsearch.plugins.ClusterPlugin;
33+
import org.elasticsearch.plugins.EnginePlugin;
2934
import org.elasticsearch.plugins.Plugin;
3035
import org.elasticsearch.plugins.PluginsService;
3136
import org.elasticsearch.snapshots.SnapshotState;
@@ -37,6 +42,7 @@
3742
import java.util.HashSet;
3843
import java.util.List;
3944
import java.util.Map;
45+
import java.util.Optional;
4046

4147
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
4248
import static org.hamcrest.Matchers.anEmptyMap;
@@ -50,7 +56,7 @@ public class ShardRoutingRoleIT extends ESIntegTestCase {
5056

5157
private static final Logger logger = LogManager.getLogger(ShardRoutingRoleIT.class);
5258

53-
public static class TestPlugin extends Plugin implements ClusterPlugin {
59+
public static class TestPlugin extends Plugin implements ClusterPlugin, EnginePlugin {
5460

5561
volatile int numIndexingCopies = 1;
5662

@@ -83,6 +89,17 @@ public Decision canForceAllocatePrimary(ShardRouting shardRouting, RoutingNode n
8389
}
8490
});
8591
}
92+
93+
@Override
94+
public Optional<EngineFactory> getEngineFactory(IndexSettings indexSettings) {
95+
// TODO revert this once replication/recovery ignores unpromotable shards
96+
return Optional.of(config -> new NoOpEngine(config, new TranslogStats(0, 0, 0, 0, 0)));
97+
}
98+
}
99+
100+
@Override
101+
protected boolean addMockInternalEngine() {
102+
return false;
86103
}
87104

88105
@Override
@@ -356,7 +373,7 @@ public AllocationCommand getCancelPrimaryCommand() {
356373
return null;
357374
}
358375

359-
public void testSearchRouting() throws InterruptedException {
376+
public void testSearchRouting() {
360377

361378
var routingTableWatcher = new RoutingTableWatcher();
362379
routingTableWatcher.numReplicas = Math.max(1, routingTableWatcher.numReplicas);
@@ -371,7 +388,7 @@ public void testSearchRouting() throws InterruptedException {
371388
masterClusterService.addListener(routingTableWatcher);
372389

373390
createIndex(INDEX_NAME, routingTableWatcher.getIndexSettings());
374-
indexRandom(true, INDEX_NAME, between(1, 100));
391+
// TODO index some documents here once recovery/replication ignore unpromotable shards
375392
ensureGreen(INDEX_NAME);
376393

377394
final var searchShardProfileKeys = new HashSet<String>();

server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,14 @@
1717
import org.apache.lucene.store.Directory;
1818
import org.elasticsearch.common.lucene.Lucene;
1919
import org.elasticsearch.common.util.concurrent.ReleasableLock;
20+
import org.elasticsearch.core.Nullable;
2021
import org.elasticsearch.index.seqno.SequenceNumbers;
2122
import org.elasticsearch.index.shard.DocsStats;
2223
import org.elasticsearch.index.store.Store;
2324
import org.elasticsearch.index.translog.Translog;
2425
import org.elasticsearch.index.translog.TranslogConfig;
2526
import org.elasticsearch.index.translog.TranslogDeletionPolicy;
27+
import org.elasticsearch.index.translog.TranslogStats;
2628

2729
import java.io.IOException;
2830
import java.io.UncheckedIOException;
@@ -43,7 +45,11 @@ public final class NoOpEngine extends ReadOnlyEngine {
4345
private final DocsStats docsStats;
4446

4547
public NoOpEngine(EngineConfig config) {
46-
super(config, null, null, true, Function.identity(), true, true);
48+
this(config, null);
49+
}
50+
51+
public NoOpEngine(EngineConfig config, @Nullable TranslogStats translogStats) {
52+
super(config, null, translogStats, true, Function.identity(), true, true);
4753
this.segmentsStats = new SegmentsStats();
4854
Directory directory = store.directory();
4955
try (DirectoryReader reader = openDirectory(directory)) {

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,9 @@ Runnable getGlobalCheckpointSyncer() {
291291
private volatile long startedRelativeTimeInNanos;
292292
private volatile long indexingTimeBeforeShardStartedInNanos;
293293

294+
// the translog keeps track of the GCP, but unpromotable shards have no translog so we need to track the GCP here instead
295+
private volatile long globalCheckPointIfUnpromotable;
296+
294297
public IndexShard(
295298
final ShardRouting shardRouting,
296299
final IndexSettings indexSettings,
@@ -1587,6 +1590,11 @@ static Engine.Searcher wrapSearcher(
15871590
); // completes stats recording
15881591
}
15891592

1593+
public void setGlobalCheckpointIfUnpromotable(long globalCheckpoint) {
1594+
assert shardRouting.isPromotableToPrimary() == false : "must only call this on unpromotable shards";
1595+
globalCheckPointIfUnpromotable = globalCheckpoint;
1596+
}
1597+
15901598
private static final class NonClosingReaderWrapper extends FilterDirectoryReader {
15911599

15921600
private NonClosingReaderWrapper(DirectoryReader in) throws IOException {
@@ -1885,12 +1893,16 @@ int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot, Engine.Operat
18851893
}
18861894

18871895
private void loadGlobalCheckpointToReplicationTracker() throws IOException {
1888-
// we have to set it before we open an engine and recover from the translog because
1889-
// acquiring a snapshot from the translog causes a sync which causes the global checkpoint to be pulled in,
1890-
// and an engine can be forced to close in ctor which also causes the global checkpoint to be pulled in.
1891-
final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
1892-
final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
1893-
replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "read from translog checkpoint");
1896+
if (shardRouting.isPromotableToPrimary()) {
1897+
// we have to set it before we open an engine and recover from the translog because
1898+
// acquiring a snapshot from the translog causes a sync which causes the global checkpoint to be pulled in,
1899+
// and an engine can be forced to close in ctor which also causes the global checkpoint to be pulled in.
1900+
final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
1901+
final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
1902+
replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "read from translog checkpoint");
1903+
} else {
1904+
replicationTracker.updateGlobalCheckpointOnReplica(globalCheckPointIfUnpromotable, "from CleanFilesRequest");
1905+
}
18941906
}
18951907

18961908
/**

server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -313,14 +313,20 @@ public static StartRecoveryRequest getStartRecoveryRequest(
313313

314314
Store.MetadataSnapshot metadataSnapshot;
315315
try {
316-
metadataSnapshot = recoveryTarget.indexShard().snapshotStoreMetadata();
317-
// Make sure that the current translog is consistent with the Lucene index; otherwise, we have to throw away the Lucene index.
318-
try {
319-
final String expectedTranslogUUID = metadataSnapshot.commitUserData().get(Translog.TRANSLOG_UUID_KEY);
320-
final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation(), expectedTranslogUUID);
321-
assert globalCheckpoint + 1 >= startingSeqNo : "invalid startingSeqNo " + startingSeqNo + " >= " + globalCheckpoint;
322-
} catch (IOException | TranslogCorruptedException e) {
323-
logGlobalCheckpointWarning(logger, startingSeqNo, e);
316+
if (recoveryTarget.indexShard().routingEntry().isPromotableToPrimary()) {
317+
metadataSnapshot = recoveryTarget.indexShard().snapshotStoreMetadata();
318+
// Make sure that the current translog is consistent with the Lucene index; otherwise, we have to throw away the Lucene
319+
// index.
320+
try {
321+
final String expectedTranslogUUID = metadataSnapshot.commitUserData().get(Translog.TRANSLOG_UUID_KEY);
322+
final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation(), expectedTranslogUUID);
323+
assert globalCheckpoint + 1 >= startingSeqNo : "invalid startingSeqNo " + startingSeqNo + " >= " + globalCheckpoint;
324+
} catch (IOException | TranslogCorruptedException e) {
325+
logGlobalCheckpointWarning(logger, startingSeqNo, e);
326+
metadataSnapshot = Store.MetadataSnapshot.EMPTY;
327+
startingSeqNo = UNASSIGNED_SEQ_NO;
328+
}
329+
} else {
324330
metadataSnapshot = Store.MetadataSnapshot.EMPTY;
325331
startingSeqNo = UNASSIGNED_SEQ_NO;
326332
}

server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -497,15 +497,16 @@ public void cleanFiles(
497497
try {
498498
if (indexShard.routingEntry().isPromotableToPrimary()) {
499499
store.cleanupAndVerify("recovery CleanFilesRequestHandler", sourceMetadata);
500+
final String translogUUID = Translog.createEmptyTranslog(
501+
indexShard.shardPath().resolveTranslog(),
502+
globalCheckpoint,
503+
shardId,
504+
indexShard.getPendingPrimaryTerm()
505+
);
506+
store.associateIndexWithNewTranslog(translogUUID);
507+
} else {
508+
indexShard.setGlobalCheckpointIfUnpromotable(globalCheckpoint);
500509
}
501-
final String translogUUID = Translog.createEmptyTranslog(
502-
indexShard.shardPath().resolveTranslog(),
503-
globalCheckpoint,
504-
shardId,
505-
indexShard.getPendingPrimaryTerm()
506-
);
507-
store.associateIndexWithNewTranslog(translogUUID);
508-
509510
if (indexShard.getRetentionLeases().leases().isEmpty()) {
510511
// if empty, may be a fresh IndexShard, so write an empty leases file to disk
511512
indexShard.persistRetentionLeases();

0 commit comments

Comments
 (0)