Skip to content

Commit dc7a8e6

Browse files
Enable a custom ReplicationTracker override per index (#92353)
Necessary to disable replication via a plugin for stateless.
1 parent 9153bd5 commit dc7a8e6

File tree

7 files changed

+155
-9
lines changed

7 files changed

+155
-9
lines changed

server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.elasticsearch.index.engine.NoOpEngine;
5252
import org.elasticsearch.index.flush.FlushStats;
5353
import org.elasticsearch.index.mapper.SourceToParse;
54+
import org.elasticsearch.index.seqno.ReplicationTracker;
5455
import org.elasticsearch.index.seqno.RetentionLeaseSyncer;
5556
import org.elasticsearch.index.seqno.SequenceNumbers;
5657
import org.elasticsearch.index.translog.TestTranslog;
@@ -661,7 +662,8 @@ public static final IndexShard newIndexShard(
661662
cbs,
662663
IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER,
663664
System::nanoTime,
664-
null
665+
null,
666+
ReplicationTracker.DEFAULT_FACTORY
665667
);
666668
}
667669

server/src/main/java/org/elasticsearch/index/IndexModule.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.elasticsearch.index.mapper.IdFieldMapper;
4545
import org.elasticsearch.index.mapper.MapperRegistry;
4646
import org.elasticsearch.index.mapper.MapperService;
47+
import org.elasticsearch.index.seqno.ReplicationTracker;
4748
import org.elasticsearch.index.shard.IndexEventListener;
4849
import org.elasticsearch.index.shard.IndexingOperationListener;
4950
import org.elasticsearch.index.shard.SearchOperationListener;
@@ -167,6 +168,8 @@ public interface DirectoryWrapper extends CheckedFunction<Directory, Directory,
167168
private final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories;
168169
private final SetOnce<Engine.IndexCommitListener> indexCommitListener = new SetOnce<>();
169170

171+
private final SetOnce<ReplicationTracker.Factory> replicationTrackerFactory = new SetOnce<>();
172+
170173
/**
171174
* Construct the index module for the index with the specified index settings. The index module contains extension points for plugins
172175
* via {@link org.elasticsearch.plugins.Plugin#onIndexModule(IndexModule)}.
@@ -376,6 +379,11 @@ public void setIndexCommitListener(Engine.IndexCommitListener listener) {
376379
this.indexCommitListener.set(Objects.requireNonNull(listener));
377380
}
378381

382+
public void setReplicationTrackerFactory(ReplicationTracker.Factory factory) {
383+
ensureNotFrozen();
384+
this.replicationTrackerFactory.set(factory);
385+
}
386+
379387
IndexEventListener freeze() { // pkg private for testing
380388
if (this.frozen.compareAndSet(false, true)) {
381389
return new CompositeIndexEventListener(indexSettings, indexEventListeners);
@@ -524,7 +532,8 @@ public IndexService newIndexService(
524532
recoveryStateFactory,
525533
indexFoldersDeletionListener,
526534
snapshotCommitSupplier,
527-
indexCommitListener.get()
535+
indexCommitListener.get(),
536+
Objects.requireNonNullElse(replicationTrackerFactory.get(), ReplicationTracker.DEFAULT_FACTORY)
528537
);
529538
success = true;
530539
return indexService;

server/src/main/java/org/elasticsearch/index/IndexService.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.elasticsearch.index.mapper.NodeMappingStats;
5555
import org.elasticsearch.index.query.SearchExecutionContext;
5656
import org.elasticsearch.index.query.SearchIndexNameMatcher;
57+
import org.elasticsearch.index.seqno.ReplicationTracker;
5758
import org.elasticsearch.index.seqno.RetentionLeaseSyncer;
5859
import org.elasticsearch.index.shard.IndexEventListener;
5960
import org.elasticsearch.index.shard.IndexShard;
@@ -146,6 +147,8 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
146147
private final Supplier<Sort> indexSortSupplier;
147148
private final ValuesSourceRegistry valuesSourceRegistry;
148149

150+
private final ReplicationTracker.Factory replicationTrackerFactory;
151+
149152
public IndexService(
150153
IndexSettings indexSettings,
151154
IndexCreationContext indexCreationContext,
@@ -177,7 +180,8 @@ public IndexService(
177180
IndexStorePlugin.RecoveryStateFactory recoveryStateFactory,
178181
IndexStorePlugin.IndexFoldersDeletionListener indexFoldersDeletionListener,
179182
IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier,
180-
Engine.IndexCommitListener indexCommitListener
183+
Engine.IndexCommitListener indexCommitListener,
184+
ReplicationTracker.Factory replicationTrackerFactory
181185
) {
182186
super(indexSettings);
183187
this.allowExpensiveQueries = allowExpensiveQueries;
@@ -252,6 +256,7 @@ public IndexService(
252256
this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this);
253257
this.retentionLeaseSyncTask = new AsyncRetentionLeaseSyncTask(this);
254258
}
259+
this.replicationTrackerFactory = replicationTrackerFactory;
255260
updateFsyncTaskIfNecessary();
256261
}
257262

@@ -520,7 +525,8 @@ public synchronized IndexShard createShard(
520525
circuitBreakerService,
521526
snapshotCommitSupplier,
522527
System::nanoTime,
523-
indexCommitListener
528+
indexCommitListener,
529+
replicationTrackerFactory
524530
);
525531
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
526532
eventListener.afterIndexShardCreated(indexShard);

server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@
5252
import java.util.stream.LongStream;
5353
import java.util.stream.Stream;
5454

55+
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
56+
5557
/**
5658
* This class is responsible for tracking the replication group with its progress and safety markers (local and global checkpoints).
5759
*
@@ -65,6 +67,28 @@
6567
*/
6668
public class ReplicationTracker extends AbstractIndexShardComponent implements LongSupplier {
6769

70+
public static final ReplicationTracker.Factory DEFAULT_FACTORY = (
71+
shardId,
72+
allocationId,
73+
indexSettings,
74+
operationPrimaryTerm,
75+
onGlobalCheckpointUpdated,
76+
currentTimeMillisSupplier,
77+
onSyncRetentionLeases,
78+
safeCommitInfoSupplier,
79+
onReplicationGroupUpdated) -> new ReplicationTracker(
80+
shardId,
81+
allocationId,
82+
indexSettings,
83+
operationPrimaryTerm,
84+
UNASSIGNED_SEQ_NO,
85+
onGlobalCheckpointUpdated,
86+
currentTimeMillisSupplier,
87+
onSyncRetentionLeases,
88+
safeCommitInfoSupplier,
89+
onReplicationGroupUpdated
90+
);
91+
6892
/**
6993
* The allocation ID for the shard to which this tracker is a component of.
7094
*/
@@ -1627,4 +1651,22 @@ public int hashCode() {
16271651
return result;
16281652
}
16291653
}
1654+
1655+
/**
1656+
* Factory interface used by {@link org.elasticsearch.index.IndexModule#setReplicationTrackerFactory(Factory)} to enable custom
1657+
* overrides of this class.
1658+
*/
1659+
public interface Factory {
1660+
ReplicationTracker create(
1661+
ShardId shardId,
1662+
String allocationId,
1663+
IndexSettings indexSettings,
1664+
long operationPrimaryTerm,
1665+
LongConsumer onGlobalCheckpointUpdated,
1666+
LongSupplier currentTimeMillisSupplier,
1667+
BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onSyncRetentionLeases,
1668+
Supplier<SafeCommitInfo> safeCommitInfoSupplier,
1669+
Consumer<ReplicationGroup> onReplicationGroupUpdated
1670+
);
1671+
}
16301672
}

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,8 @@ public IndexShard(
313313
final CircuitBreakerService circuitBreakerService,
314314
final IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier,
315315
final LongSupplier relativeTimeInNanosSupplier,
316-
final Engine.IndexCommitListener indexCommitListener
316+
final Engine.IndexCommitListener indexCommitListener,
317+
final ReplicationTracker.Factory replicationTrackerFactory
317318
) throws IOException {
318319
super(shardRouting.shardId(), indexSettings);
319320
assert shardRouting.initializing();
@@ -361,12 +362,11 @@ public IndexShard(
361362
this.pendingPrimaryTerm = primaryTerm;
362363
this.globalCheckpointListeners = new GlobalCheckpointListeners(shardId, threadPool.scheduler(), logger);
363364
this.pendingReplicationActions = new PendingReplicationActions(shardId, threadPool);
364-
this.replicationTracker = new ReplicationTracker(
365+
this.replicationTracker = replicationTrackerFactory.create(
365366
shardId,
366367
aId,
367368
indexSettings,
368369
primaryTerm,
369-
UNASSIGNED_SEQ_NO,
370370
globalCheckpointListeners::globalCheckpointUpdated,
371371
threadPool::absoluteTimeInMillis,
372372
(retentionLeases, listener) -> retentionLeaseSyncer.sync(shardId, aId, getPendingPrimaryTerm(), retentionLeases, listener),
@@ -3774,7 +3774,7 @@ EngineFactory getEngineFactory() {
37743774
}
37753775

37763776
// for tests
3777-
ReplicationTracker getReplicationTracker() {
3777+
public ReplicationTracker getReplicationTracker() {
37783778
return replicationTracker;
37793779
}
37803780

server/src/test/java/org/elasticsearch/index/IndexModuleTests.java

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@
2424
import org.apache.lucene.tests.index.AssertingDirectoryReader;
2525
import org.apache.lucene.util.SetOnce.AlreadySetException;
2626
import org.elasticsearch.Version;
27+
import org.elasticsearch.action.ActionListener;
2728
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
2829
import org.elasticsearch.action.support.PlainActionFuture;
30+
import org.elasticsearch.action.support.replication.ReplicationResponse;
2931
import org.elasticsearch.cluster.metadata.IndexMetadata;
3032
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
3133
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -58,14 +60,18 @@
5860
import org.elasticsearch.index.engine.EngineTestCase;
5961
import org.elasticsearch.index.engine.InternalEngine;
6062
import org.elasticsearch.index.engine.InternalEngineFactory;
63+
import org.elasticsearch.index.engine.SafeCommitInfo;
6164
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
6265
import org.elasticsearch.index.mapper.MapperRegistry;
6366
import org.elasticsearch.index.mapper.ParsedDocument;
6467
import org.elasticsearch.index.mapper.Uid;
68+
import org.elasticsearch.index.seqno.ReplicationTracker;
6569
import org.elasticsearch.index.seqno.RetentionLeaseSyncer;
70+
import org.elasticsearch.index.seqno.RetentionLeases;
6671
import org.elasticsearch.index.shard.IndexEventListener;
6772
import org.elasticsearch.index.shard.IndexShard;
6873
import org.elasticsearch.index.shard.IndexingOperationListener;
74+
import org.elasticsearch.index.shard.ReplicationGroup;
6975
import org.elasticsearch.index.shard.SearchOperationListener;
7076
import org.elasticsearch.index.shard.ShardId;
7177
import org.elasticsearch.index.shard.ShardPath;
@@ -106,10 +112,16 @@
106112
import java.util.concurrent.atomic.AtomicBoolean;
107113
import java.util.concurrent.atomic.AtomicLong;
108114
import java.util.concurrent.atomic.AtomicReference;
115+
import java.util.function.BiConsumer;
116+
import java.util.function.Consumer;
117+
import java.util.function.LongConsumer;
118+
import java.util.function.LongSupplier;
119+
import java.util.function.Supplier;
109120

110121
import static java.util.Collections.emptyMap;
111122
import static java.util.Collections.singletonMap;
112123
import static org.elasticsearch.index.IndexService.IndexCreationContext.CREATE_INDEX;
124+
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
113125
import static org.hamcrest.Matchers.containsString;
114126
import static org.hamcrest.Matchers.empty;
115127
import static org.hamcrest.Matchers.equalTo;
@@ -432,6 +444,7 @@ public void testFrozen() {
432444
assertEquals(msg, expectThrows(IllegalStateException.class, () -> module.forceQueryCacheProvider(null)).getMessage());
433445
assertEquals(msg, expectThrows(IllegalStateException.class, () -> module.setDirectoryWrapper(null)).getMessage());
434446
assertEquals(msg, expectThrows(IllegalStateException.class, () -> module.setIndexCommitListener(null)).getMessage());
447+
assertEquals(msg, expectThrows(IllegalStateException.class, () -> module.setReplicationTrackerFactory(null)).getMessage());
435448
}
436449

437450
public void testSetupUnknownSimilarity() {
@@ -718,6 +731,79 @@ public void onIndexCommitDelete(ShardId shardId, IndexCommit deletedCommit) {
718731
}
719732
}
720733

734+
public void testCustomReplicationTrackerFactory() throws IOException {
735+
IndexModule module = new IndexModule(
736+
indexSettings,
737+
emptyAnalysisRegistry,
738+
InternalEngine::new,
739+
Collections.emptyMap(),
740+
() -> true,
741+
indexNameExpressionResolver,
742+
Collections.emptyMap()
743+
);
744+
745+
class CustomReplicationTracker extends ReplicationTracker {
746+
CustomReplicationTracker(
747+
ShardId shardId,
748+
String allocationId,
749+
IndexSettings indexSettings,
750+
long operationPrimaryTerm,
751+
long globalCheckpoint,
752+
LongConsumer onGlobalCheckpointUpdated,
753+
LongSupplier currentTimeMillisSupplier,
754+
BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onSyncRetentionLeases,
755+
Supplier<SafeCommitInfo> safeCommitInfoSupplier,
756+
Consumer<ReplicationGroup> onReplicationGroupUpdated
757+
) {
758+
super(
759+
shardId,
760+
allocationId,
761+
indexSettings,
762+
operationPrimaryTerm,
763+
globalCheckpoint,
764+
onGlobalCheckpointUpdated,
765+
currentTimeMillisSupplier,
766+
onSyncRetentionLeases,
767+
safeCommitInfoSupplier,
768+
onReplicationGroupUpdated
769+
);
770+
}
771+
}
772+
module.setReplicationTrackerFactory(
773+
(
774+
shardId,
775+
allocationId,
776+
indexSettings,
777+
operationPrimaryTerm,
778+
onGlobalCheckpointUpdated,
779+
currentTimeMillisSupplier,
780+
onSyncRetentionLeases,
781+
safeCommitInfoSupplier,
782+
onReplicationGroupUpdated) -> new CustomReplicationTracker(
783+
shardId,
784+
allocationId,
785+
indexSettings,
786+
operationPrimaryTerm,
787+
UNASSIGNED_SEQ_NO,
788+
onGlobalCheckpointUpdated,
789+
currentTimeMillisSupplier,
790+
onSyncRetentionLeases,
791+
safeCommitInfoSupplier,
792+
onReplicationGroupUpdated
793+
)
794+
);
795+
final IndexService indexService = newIndexService(module);
796+
ShardId shardId = new ShardId("index", UUIDs.randomBase64UUID(random()), 0);
797+
ShardRouting shardRouting = ShardRouting.newUnassigned(
798+
shardId,
799+
true,
800+
RecoverySource.EmptyStoreRecoverySource.INSTANCE,
801+
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null)
802+
).initialize("_node_id", null, -1);
803+
IndexShard indexShard = indexService.createShard(shardRouting, s -> {}, RetentionLeaseSyncer.EMPTY);
804+
assertThat(indexShard.getReplicationTracker(), instanceOf(CustomReplicationTracker.class));
805+
}
806+
721807
private ShardRouting createInitializedShardRouting() {
722808
ShardRouting shard = ShardRouting.newUnassigned(
723809
new ShardId("test", "_na_", 0),

test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -504,7 +504,8 @@ protected IndexShard newShard(
504504
breakerService,
505505
IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER,
506506
relativeTimeSupplier,
507-
null
507+
null,
508+
ReplicationTracker.DEFAULT_FACTORY
508509
);
509510
indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER);
510511
success = true;

0 commit comments

Comments
 (0)