Skip to content

Commit 9153bd5

Browse files
authored
Make sure EngineConfig can access whether shard is primary (#92326)
Make sure engine factories can access the `RecoveryState#primary` flag, so plugins can create different engines for primary and replica indices.
1 parent 625879c commit 9153bd5

File tree

9 files changed

+47
-16
lines changed

9 files changed

+47
-16
lines changed

server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@ EngineConfig engineConfigWithLargerIndexingMemory(EngineConfig config) {
7979
config.getSnapshotCommitSupplier(),
8080
config.getLeafSorter(),
8181
config.getRelativeTimeInNanosSupplier(),
82-
config.getIndexCommitListener()
82+
config.getIndexCommitListener(),
83+
config.isRecoveringAsPrimary()
8384
);
8485
}
8586

server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,8 @@ public Supplier<RetentionLeases> retentionLeasesSupplier() {
128128
@Nullable
129129
private final Engine.IndexCommitListener indexCommitListener;
130130

131+
private final boolean recoveringAsPrimary;
132+
131133
/**
132134
* Creates a new {@link org.elasticsearch.index.engine.EngineConfig}
133135
*/
@@ -156,7 +158,8 @@ public EngineConfig(
156158
IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier,
157159
Comparator<LeafReader> leafSorter,
158160
LongSupplier relativeTimeInNanosSupplier,
159-
Engine.IndexCommitListener indexCommitListener
161+
Engine.IndexCommitListener indexCommitListener,
162+
boolean recoveringAsPrimary
160163
) {
161164
this.shardId = shardId;
162165
this.indexSettings = indexSettings;
@@ -198,6 +201,7 @@ public EngineConfig(
198201
this.leafSorter = leafSorter;
199202
this.relativeTimeInNanosSupplier = relativeTimeInNanosSupplier;
200203
this.indexCommitListener = indexCommitListener;
204+
this.recoveringAsPrimary = recoveringAsPrimary;
201205
}
202206

203207
/**
@@ -405,4 +409,12 @@ public LongSupplier getRelativeTimeInNanosSupplier() {
405409
public Engine.IndexCommitListener getIndexCommitListener() {
406410
return indexCommitListener;
407411
}
412+
413+
/**
414+
* Represents the primary state only in case when a recovery starts.
415+
* IMPORTANT: The flag is a temporary solution and should NOT be used outside of Stateless.
416+
*/
417+
public boolean isRecoveringAsPrimary() {
418+
return recoveringAsPrimary;
419+
}
408420
}

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3274,7 +3274,8 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
32743274
snapshotCommitSupplier,
32753275
isTimeseriesIndex ? TIMESERIES_LEAF_READERS_SORTER : null,
32763276
relativeTimeInNanosSupplier,
3277-
indexCommitListener
3277+
indexCommitListener,
3278+
recoveryState != null && recoveryState.getPrimary()
32783279
);
32793280
}
32803281

server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3590,7 +3590,8 @@ public void testRecoverFromForeignTranslog() throws IOException {
35903590
IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER,
35913591
null,
35923592
config.getRelativeTimeInNanosSupplier(),
3593-
null
3593+
null,
3594+
false
35943595
);
35953596
expectThrows(EngineCreationFailureException.class, () -> new InternalEngine(brokenConfig));
35963597

@@ -7260,7 +7261,8 @@ public void testNotWarmUpSearcherInEngineCtor() throws Exception {
72607261
config.getSnapshotCommitSupplier(),
72617262
config.getLeafSorter(),
72627263
config.getRelativeTimeInNanosSupplier(),
7263-
config.getIndexCommitListener()
7264+
config.getIndexCommitListener(),
7265+
config.isRecoveringAsPrimary()
72647266
);
72657267
try (InternalEngine engine = createEngine(configWithWarmer)) {
72667268
assertThat(warmedUpReaders, empty());

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2900,11 +2900,15 @@ public void testRecoverFromTranslog() throws IOException {
29002900
}
29012901

29022902
public void testShardActiveDuringInternalRecovery() throws IOException {
2903-
IndexShard shard = newStartedShard(true);
2903+
boolean isPrimary = randomBoolean();
2904+
IndexShard shard = newStartedShard(isPrimary);
29042905
indexDoc(shard, "_doc", "0");
29052906
shard = reinitShard(shard);
29062907
DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
2907-
shard.markAsRecovering("for testing", new RecoveryState(shard.routingEntry(), localNode, null));
2908+
DiscoveryNode sourceNode = isPrimary
2909+
? null
2910+
: new DiscoveryNode("bar", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
2911+
shard.markAsRecovering("for testing", new RecoveryState(shard.routingEntry(), localNode, sourceNode));
29082912
// Shard is still inactive since we haven't started recovering yet
29092913
assertFalse(shard.isActive());
29102914
shard.prepareForIndexRecovery();
@@ -2914,6 +2918,8 @@ public void testShardActiveDuringInternalRecovery() throws IOException {
29142918
shard.openEngineAndRecoverFromTranslog();
29152919
// Shard should now be active since we did recover:
29162920
assertTrue(shard.isActive());
2921+
// Recovery state should be propagated to the engine
2922+
assertEquals(isPrimary, shard.getEngine().config().isRecoveringAsPrimary());
29172923
closeShards(shard);
29182924
}
29192925

@@ -4520,7 +4526,8 @@ public void testCloseShardWhileEngineIsWarming() throws Exception {
45204526
IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER,
45214527
config.getLeafSorter(),
45224528
config.getRelativeTimeInNanosSupplier(),
4523-
config.getIndexCommitListener()
4529+
config.getIndexCommitListener(),
4530+
config.isRecoveringAsPrimary()
45244531
);
45254532
return new InternalEngine(configWithWarmer);
45264533
});

server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,8 @@ public void onFailedEngine(String reason, @Nullable Exception e) {
153153
IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER,
154154
null,
155155
System::nanoTime,
156-
null
156+
null,
157+
false
157158
);
158159
engine = new InternalEngine(config);
159160
engine.recoverFromTranslog((e, s) -> 0, Long.MAX_VALUE);

server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -402,7 +402,8 @@ EngineConfig configWithRefreshListener(EngineConfig config, ReferenceManager.Ref
402402
config.getSnapshotCommitSupplier(),
403403
config.getLeafSorter(),
404404
config.getRelativeTimeInNanosSupplier(),
405-
config.getIndexCommitListener()
405+
config.getIndexCommitListener(),
406+
config.isRecoveringAsPrimary()
406407
);
407408
}
408409

test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,8 @@ public EngineConfig copy(EngineConfig config, LongSupplier globalCheckpointSuppl
275275
config.getSnapshotCommitSupplier(),
276276
config.getLeafSorter(),
277277
config.getRelativeTimeInNanosSupplier(),
278-
config.getIndexCommitListener()
278+
config.getIndexCommitListener(),
279+
config.isRecoveringAsPrimary()
279280
);
280281
}
281282

@@ -305,7 +306,8 @@ public EngineConfig copy(EngineConfig config, Analyzer analyzer) {
305306
config.getSnapshotCommitSupplier(),
306307
config.getLeafSorter(),
307308
config.getRelativeTimeInNanosSupplier(),
308-
config.getIndexCommitListener()
309+
config.getIndexCommitListener(),
310+
config.isRecoveringAsPrimary()
309311
);
310312
}
311313

@@ -335,7 +337,8 @@ public EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) {
335337
config.getSnapshotCommitSupplier(),
336338
config.getLeafSorter(),
337339
config.getRelativeTimeInNanosSupplier(),
338-
config.getIndexCommitListener()
340+
config.getIndexCommitListener(),
341+
config.isRecoveringAsPrimary()
339342
);
340343
}
341344

@@ -858,7 +861,8 @@ public EngineConfig config(
858861
IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER,
859862
null,
860863
System::nanoTime,
861-
indexCommitListener
864+
indexCommitListener,
865+
false
862866
);
863867
}
864868

@@ -896,7 +900,8 @@ protected EngineConfig config(EngineConfig config, Store store, Path translogPat
896900
config.getSnapshotCommitSupplier(),
897901
config.getLeafSorter(),
898902
config.getRelativeTimeInNanosSupplier(),
899-
config.getIndexCommitListener()
903+
config.getIndexCommitListener(),
904+
config.isRecoveringAsPrimary()
900905
);
901906
}
902907

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,8 @@ public void onFailedEngine(String reason, Exception e) {
288288
IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER,
289289
null,
290290
System::nanoTime,
291-
null
291+
null,
292+
false
292293
);
293294
}
294295

0 commit comments

Comments
 (0)