Skip to content

Commit 2c59d76

Browse files
committed
Introduce permitAcquired in ensureMutable
Relates ES-11159
1 parent e397ed3 commit 2c59d76

File tree

6 files changed

+18
-12
lines changed

6 files changed

+18
-12
lines changed

server/src/main/java/org/elasticsearch/action/admin/indices/forcemerge/TransportForceMergeAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ protected void shardOperation(
100100
SubscribableListener.<IndexShard>newForked(l -> {
101101
IndexShard indexShard = indicesService.indexServiceSafe(shardRouting.shardId().getIndex())
102102
.getShard(shardRouting.shardId().id());
103-
indexShard.ensureMutable(l.map(unused -> indexShard));
103+
indexShard.ensureMutable(l.map(unused -> indexShard), false);
104104
}).<EmptyResult>andThen((l, indexShard) -> {
105105
threadPool.executor(ThreadPool.Names.FORCE_MERGE).execute(ActionRunnable.supply(l, () -> {
106106
indexShard.forceMerge(request);

server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ protected void shardOperationOnPrimary(
161161
IndexShard primary,
162162
ActionListener<PrimaryResult<BulkShardRequest, BulkShardResponse>> listener
163163
) {
164-
primary.ensureMutable(listener.delegateFailure((l, ignored) -> super.shardOperationOnPrimary(request, primary, l)));
164+
primary.ensureMutable(listener.delegateFailure((l, ignored) -> super.shardOperationOnPrimary(request, primary, l)), true);
165165
}
166166

167167
@Override

server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ protected void shardOperation(final UpdateRequest request, final ActionListener<
209209
var executor = executor(indexService);
210210
assert ThreadPool.assertCurrentThreadPool(Names.SYSTEM_WRITE, Names.WRITE);
211211

212-
SubscribableListener.newForked(indexShard::ensureMutable)
212+
SubscribableListener.<Void>newForked((l) -> indexShard.ensureMutable(l, false))
213213
// Make sure to fork back to a `write` thread pool if necessary
214214
.<UpdateHelper.Result>andThen(executor, threadPool.getThreadContext(), (l, unused) -> ActionListener.completeWith(l, () -> {
215215
assert ThreadPool.assertCurrentThreadPool(Names.SYSTEM_WRITE, Names.WRITE);

server/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -138,18 +138,22 @@ public void indexShardStateChanged(
138138
}
139139

140140
@Override
141-
public void beforeIndexShardMutableOperation(IndexShard indexShard, ActionListener<Void> listener) {
142-
iterateBeforeIndexShardMutableOperation(indexShard, listener.delegateResponse((l, e) -> {
141+
public void beforeIndexShardMutableOperation(IndexShard indexShard, boolean permitAcquired, ActionListener<Void> listener) {
142+
iterateBeforeIndexShardMutableOperation(indexShard, permitAcquired, listener.delegateResponse((l, e) -> {
143143
logger.warn(() -> format("%s failed to invoke the listener before ensuring shard mutability", indexShard.shardId()), e);
144144
l.onFailure(e);
145145
}));
146146
}
147147

148-
private void iterateBeforeIndexShardMutableOperation(IndexShard indexShard, ActionListener<Void> outerListener) {
148+
private void iterateBeforeIndexShardMutableOperation(
149+
IndexShard indexShard,
150+
boolean permitAcquired,
151+
ActionListener<Void> outerListener
152+
) {
149153
callListeners(
150154
indexShard,
151155
listeners.stream()
152-
.map(iel -> (Consumer<ActionListener<Void>>) (l) -> iel.beforeIndexShardMutableOperation(indexShard, l))
156+
.map(iel -> (Consumer<ActionListener<Void>>) (l) -> iel.beforeIndexShardMutableOperation(indexShard, permitAcquired, l))
153157
.iterator(),
154158
outerListener
155159
);

server/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,17 +91,19 @@ default void indexShardStateChanged(
9191
* <ul>
9292
* <li>Indexing operations</li>
9393
* <li>Force merges</li>
94+
* <li>Pre-updates</li>
9495
* </ul>
9596
*
9697
* This method ensures that the shard is ready to accept mutating operations. This is particularly useful in cases
9798
* where the shard initializes its internal {@link org.elasticsearch.index.engine.Engine} lazily, which may take some time.
9899
* The provided listener should be notified once the shard is prepared to proceed with the operation.
99100
* This can be called from a transport thread and therefore the function should be lightweight and not block the thread.
100101
*
101-
* @param indexShard the shard where the mutable operation will be performed
102-
* @param listener the listener to be notified when the shard is ready to proceed
102+
* @param indexShard the shard where the mutable operation will be performed
103+
* @param permitAcquired whether the operation has acquired an operation permit on the shard
104+
* @param listener the listener to be notified when the shard is ready to proceed
103105
*/
104-
default void beforeIndexShardMutableOperation(IndexShard indexShard, ActionListener<Void> listener) {
106+
default void beforeIndexShardMutableOperation(IndexShard indexShard, boolean permitAcquired, ActionListener<Void> listener) {
105107
listener.onResponse(null);
106108
}
107109

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4717,8 +4717,8 @@ public void waitForPrimaryTermAndGeneration(long primaryTerm, long segmentGenera
47174717
*
47184718
* @param listener the listener to be notified when the shard is mutable
47194719
*/
4720-
public void ensureMutable(ActionListener<Void> listener) {
4721-
indexEventListener.beforeIndexShardMutableOperation(this, listener.delegateFailure((l, unused) -> {
4720+
public void ensureMutable(ActionListener<Void> listener, boolean permitAcquired) {
4721+
indexEventListener.beforeIndexShardMutableOperation(this, permitAcquired, listener.delegateFailure((l, unused) -> {
47224722
// TODO ES-10826: Acquire ref to engine and retry if it's immutable again?
47234723
l.onResponse(null);
47244724
}));

0 commit comments

Comments
 (0)