Skip to content

Commit 67e7126

Browse files
authored
Generalize mechanism to signal need for a mutable shard (#122241)
Introduce beforeIndexShardMutableOperation in IndexEventListener to allow plugins to react when a mutable shard is needed. Add IndexShard#ensureMutable to let consumers signal this need and be notified when the shard is ready for mutable operations. Relates ES-10787
1 parent e082ad5 commit 67e7126

File tree

5 files changed

+53
-36
lines changed

5 files changed

+53
-36
lines changed

server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.elasticsearch.action.index.IndexRequest;
2424
import org.elasticsearch.action.index.IndexResponse;
2525
import org.elasticsearch.action.support.ActionFilters;
26-
import org.elasticsearch.action.support.RefCountingListener;
2726
import org.elasticsearch.action.support.replication.PostWriteRefresh;
2827
import org.elasticsearch.action.support.replication.TransportReplicationAction;
2928
import org.elasticsearch.action.support.replication.TransportWriteAction;
@@ -152,12 +151,7 @@ protected void shardOperationOnPrimary(
152151
IndexShard primary,
153152
ActionListener<PrimaryResult<BulkShardRequest, BulkShardResponse>> listener
154153
) {
155-
final ActionListener<Void> wrappedListener = listener.delegateFailure(
156-
(l, ignored) -> super.shardOperationOnPrimary(request, primary, l)
157-
);
158-
try (var preBulkProceedListeners = new RefCountingListener(wrappedListener)) {
159-
primary.getIndexingOperationListener().preBulkOnPrimary(primary, () -> preBulkProceedListeners.acquire());
160-
}
154+
primary.ensureMutable(listener.delegateFailure((l, ignored) -> super.shardOperationOnPrimary(request, primary, l)));
161155
}
162156

163157
@Override

server/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,24 @@ public void indexShardStateChanged(
137137
}
138138
}
139139

140+
@Override
141+
public void beforeIndexShardMutableOperation(IndexShard indexShard, ActionListener<Void> listener) {
142+
iterateBeforeIndexShardMutableOperation(indexShard, listener.delegateResponse((l, e) -> {
143+
logger.warn(() -> format("%s failed to invoke the listener before ensuring shard mutability", indexShard.shardId()), e);
144+
l.onFailure(e);
145+
}));
146+
}
147+
148+
private void iterateBeforeIndexShardMutableOperation(IndexShard indexShard, ActionListener<Void> outerListener) {
149+
callListeners(
150+
indexShard,
151+
listeners.stream()
152+
.map(iel -> (Consumer<ActionListener<Void>>) (l) -> iel.beforeIndexShardMutableOperation(indexShard, l))
153+
.iterator(),
154+
outerListener
155+
);
156+
}
157+
140158
@Override
141159
public void beforeIndexCreated(Index index, Settings indexSettings) {
142160
for (IndexEventListener listener : listeners) {

server/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,25 @@ default void indexShardStateChanged(
8686
@Nullable String reason
8787
) {}
8888

89+
/**
90+
* Invoked before a shard performs a mutable operation. Mutable operations include, but are not limited to:
91+
* <ul>
92+
* <li>Indexing operations</li>
93+
* <li>Force merges</li>
94+
* </ul>
95+
*
96+
* This method ensures that the shard is ready to accept mutating operations. This is particularly useful in cases
97+
* where the shard initializes its internal {@link org.elasticsearch.index.engine.Engine} lazily, which may take some time.
98+
* The provided listener should be notified once the shard is prepared to proceed with the operation.
99+
* This can be called from a transport thread and therefore the function should be lightweight and not block the thread.
100+
*
101+
* @param indexShard the shard where the mutable operation will be performed
102+
* @param listener the listener to be notified when the shard is ready to proceed
103+
*/
104+
default void beforeIndexShardMutableOperation(IndexShard indexShard, ActionListener<Void> listener) {
105+
listener.onResponse(null);
106+
}
107+
89108
/**
90109
* Called before the index gets created. Note that this is also called
91110
* when the index is created on data nodes

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -451,10 +451,6 @@ public BulkOperationListener getBulkOperationListener() {
451451
return this.bulkOperationListener;
452452
}
453453

454-
public IndexingOperationListener getIndexingOperationListener() {
455-
return this.indexingOperationListeners;
456-
}
457-
458454
public ShardIndexWarmerService warmerService() {
459455
return this.shardWarmerService;
460456
}
@@ -4507,4 +4503,19 @@ public void waitForPrimaryTermAndGeneration(long primaryTerm, long segmentGenera
45074503
);
45084504
}
45094505

4506+
/**
4507+
* Ensures that the shard is ready to perform mutable operations.
4508+
* This method is particularly useful when the shard initializes its internal
4509+
* {@link org.elasticsearch.index.engine.Engine} lazily, as it may take some time before becoming mutable.
4510+
*
4511+
* The provided listener will be notified once the shard is ready for mutating operations.
4512+
*
4513+
* @param listener the listener to be notified when the shard is mutable
4514+
*/
4515+
public void ensureMutable(ActionListener<Void> listener) {
4516+
indexEventListener.beforeIndexShardMutableOperation(this, listener.delegateFailure((l, unused) -> {
4517+
// TODO ES-10826: Acquire ref to engine and retry if it's immutable again?
4518+
l.onResponse(null);
4519+
}));
4520+
}
45104521
}

server/src/main/java/org/elasticsearch/index/shard/IndexingOperationListener.java

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,9 @@
99
package org.elasticsearch.index.shard;
1010

1111
import org.apache.logging.log4j.Logger;
12-
import org.elasticsearch.action.ActionListener;
1312
import org.elasticsearch.index.engine.Engine;
1413

1514
import java.util.List;
16-
import java.util.function.Supplier;
1715

1816
/**
1917
* An indexing listener for indexing, delete, events.
@@ -64,18 +62,6 @@ default void postDelete(ShardId shardId, Engine.Delete delete, Engine.DeleteResu
6462
*/
6563
default void postDelete(ShardId shardId, Engine.Delete delete, Exception ex) {}
6664

67-
/**
68-
* Called when a {@link org.elasticsearch.action.bulk.TransportShardBulkAction} is about to perform index and/or delete operation(s)
69-
* on a primary shard.
70-
*
71-
* This is called from a transport thread and therefore the function should be lightweight and not block the thread. The acquired
72-
* listener(s) can be asynchronously completed on another thread at a later time.
73-
*
74-
* @param indexShard the shard the bulk is about to be performed on
75-
* @param proceedListenerSupplier call this immediately to get a listener which must be completed so that the bulk can proceed.
76-
*/
77-
default void preBulkOnPrimary(IndexShard indexShard, Supplier<ActionListener<Void>> proceedListenerSupplier) {}
78-
7965
/**
8066
* A Composite listener that multiplexes calls to each of the listeners methods.
8167
*/
@@ -163,16 +149,5 @@ public void postDelete(ShardId shardId, Engine.Delete delete, Exception ex) {
163149
}
164150
}
165151
}
166-
167-
@Override
168-
public void preBulkOnPrimary(IndexShard indexShard, Supplier<ActionListener<Void>> proceedListenerSupplier) {
169-
for (IndexingOperationListener listener : listeners) {
170-
try {
171-
listener.preBulkOnPrimary(indexShard, proceedListenerSupplier);
172-
} catch (Exception e) {
173-
logger.warn(() -> "preBulkOnPrimary listener [" + listener + "] failed", e);
174-
}
175-
}
176-
}
177152
}
178153
}

0 commit comments

Comments
 (0)