Skip to content

Commit 59b3f79

Browse files
authored
Move hollow hook from permits to bulk action (#120945)
We do not need the hook for unhollowing shards on all actions taking primary permits, but rather only one the one that can ingest into the engine. So moving the hook to the bulk action. Relates ES-10654
1 parent 952bf22 commit 59b3f79

File tree

6 files changed

+46
-39
lines changed

6 files changed

+46
-39
lines changed

server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.action.index.IndexRequest;
2424
import org.elasticsearch.action.index.IndexResponse;
2525
import org.elasticsearch.action.support.ActionFilters;
26+
import org.elasticsearch.action.support.RefCountingListener;
2627
import org.elasticsearch.action.support.replication.PostWriteRefresh;
2728
import org.elasticsearch.action.support.replication.TransportReplicationAction;
2829
import org.elasticsearch.action.support.replication.TransportWriteAction;
@@ -145,6 +146,20 @@ protected BulkShardResponse newResponseInstance(StreamInput in) throws IOExcepti
145146
return new BulkShardResponse(in);
146147
}
147148

149+
@Override
150+
protected void shardOperationOnPrimary(
151+
BulkShardRequest request,
152+
IndexShard primary,
153+
ActionListener<PrimaryResult<BulkShardRequest, BulkShardResponse>> listener
154+
) {
155+
final ActionListener<Void> wrappedListener = listener.delegateFailure(
156+
(l, ignored) -> super.shardOperationOnPrimary(request, primary, l)
157+
);
158+
try (var preBulkProceedListeners = new RefCountingListener(wrappedListener)) {
159+
primary.getIndexingOperationListener().preBulkOnPrimary(primary, () -> preBulkProceedListeners.acquire());
160+
}
161+
}
162+
148163
@Override
149164
protected void dispatchedShardOperationOnPrimary(
150165
BulkShardRequest request,

server/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import java.util.Iterator;
2929
import java.util.List;
3030
import java.util.function.Consumer;
31-
import java.util.function.Supplier;
3231

3332
import static org.elasticsearch.core.Strings.format;
3433

@@ -351,15 +350,4 @@ public void afterFilesRestoredFromRepository(IndexShard indexShard) {
351350
}
352351
}
353352

354-
@Override
355-
public void onAcquirePrimaryOperationPermit(IndexShard indexShard, Supplier<ActionListener<Void>> onPermitAcquiredListenerSupplier) {
356-
for (IndexEventListener listener : listeners) {
357-
try {
358-
listener.onAcquirePrimaryOperationPermit(indexShard, onPermitAcquiredListenerSupplier);
359-
} catch (Exception e) {
360-
logger.warn(() -> "[" + indexShard.shardId() + "] failed to invoke the listener on acquiring a primary permit", e);
361-
throw e;
362-
}
363-
}
364-
}
365353
}

server/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717
import org.elasticsearch.index.IndexSettings;
1818
import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason;
1919

20-
import java.util.function.Supplier;
21-
2220
/**
2321
* An index event listener is the primary extension point for plugins and build-in services
2422
* to react / listen to per-index and per-shard events. These listeners are registered per-index
@@ -192,14 +190,4 @@ default void afterIndexShardRecovery(IndexShard indexShard, ActionListener<Void>
192190
* @param indexShard the shard that is recovering
193191
*/
194192
default void afterFilesRestoredFromRepository(IndexShard indexShard) {}
195-
196-
/**
197-
* Called when a single primary permit is acquired for the given shard (see
198-
* {@link IndexShard#acquirePrimaryOperationPermit(ActionListener, java.util.concurrent.Executor)}).
199-
*
200-
* @param indexShard the shard of which a primary permit is requested
201-
* @param onPermitAcquiredListenerSupplier call this immediately to get a listener when the permit is acquired. The listener must be
202-
* completed in order for the permit to be given to the acquiring operation.
203-
*/
204-
default void onAcquirePrimaryOperationPermit(IndexShard indexShard, Supplier<ActionListener<Void>> onPermitAcquiredListenerSupplier) {}
205193
}

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
3535
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
3636
import org.elasticsearch.action.support.PlainActionFuture;
37-
import org.elasticsearch.action.support.RefCountingListener;
3837
import org.elasticsearch.action.support.SubscribableListener;
3938
import org.elasticsearch.action.support.replication.PendingReplicationActions;
4039
import org.elasticsearch.action.support.replication.ReplicationResponse;
@@ -452,6 +451,10 @@ public BulkOperationListener getBulkOperationListener() {
452451
return this.bulkOperationListener;
453452
}
454453

454+
public IndexingOperationListener getIndexingOperationListener() {
455+
return this.indexingOperationListeners;
456+
}
457+
455458
public ShardIndexWarmerService warmerService() {
456459
return this.shardWarmerService;
457460
}
@@ -3585,19 +3588,7 @@ public void acquirePrimaryOperationPermit(
35853588
) {
35863589
verifyNotClosed();
35873590
assert shardRouting.primary() : "acquirePrimaryOperationPermit should only be called on primary shard: " + shardRouting;
3588-
3589-
ActionListener<Releasable> onPermitAcquiredWrapped = onPermitAcquired.delegateFailureAndWrap((delegate, releasable) -> {
3590-
final ActionListener<Releasable> wrappedListener = indexShardOperationPermits.wrapContextPreservingActionListener(
3591-
delegate,
3592-
executorOnDelay,
3593-
forceExecution
3594-
);
3595-
try (var listeners = new RefCountingListener(wrappedListener.map(unused -> releasable))) {
3596-
indexEventListener.onAcquirePrimaryOperationPermit(this, () -> listeners.acquire());
3597-
}
3598-
});
3599-
3600-
indexShardOperationPermits.acquire(wrapPrimaryOperationPermitListener(onPermitAcquiredWrapped), executorOnDelay, forceExecution);
3591+
indexShardOperationPermits.acquire(wrapPrimaryOperationPermitListener(onPermitAcquired), executorOnDelay, forceExecution);
36013592
}
36023593

36033594
public boolean isPrimaryMode() {

server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ private void innerAcquire(
230230
onAcquired.onResponse(releasable);
231231
}
232232

233-
public <T extends Closeable> ActionListener<T> wrapContextPreservingActionListener(
233+
private <T extends Closeable> ActionListener<T> wrapContextPreservingActionListener(
234234
ActionListener<T> listener,
235235
@Nullable final Executor executorOnDelay,
236236
final boolean forceExecution

server/src/main/java/org/elasticsearch/index/shard/IndexingOperationListener.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,11 @@
99
package org.elasticsearch.index.shard;
1010

1111
import org.apache.logging.log4j.Logger;
12+
import org.elasticsearch.action.ActionListener;
1213
import org.elasticsearch.index.engine.Engine;
1314

1415
import java.util.List;
16+
import java.util.function.Supplier;
1517

1618
/**
1719
* An indexing listener for indexing, delete, events.
@@ -62,6 +64,18 @@ default void postDelete(ShardId shardId, Engine.Delete delete, Engine.DeleteResu
6264
*/
6365
default void postDelete(ShardId shardId, Engine.Delete delete, Exception ex) {}
6466

67+
/**
68+
* Called when a {@link org.elasticsearch.action.bulk.TransportShardBulkAction} is about to perform index and/or delete operation(s)
69+
* on a primary shard.
70+
*
71+
* This is called from a transport thread and therefore the function should be lightweight and not block the thread. The acquired
72+
* listener(s) can be asynchronously completed on another thread at a later time.
73+
*
74+
* @param indexShard the shard the bulk is about to be performed on
75+
* @param proceedListenerSupplier call this immediately to get a listener which must be completed so that the bulk can proceed.
76+
*/
77+
default void preBulkOnPrimary(IndexShard indexShard, Supplier<ActionListener<Void>> proceedListenerSupplier) {}
78+
6579
/**
6680
* A Composite listener that multiplexes calls to each of the listeners methods.
6781
*/
@@ -149,5 +163,16 @@ public void postDelete(ShardId shardId, Engine.Delete delete, Exception ex) {
149163
}
150164
}
151165
}
166+
167+
@Override
168+
public void preBulkOnPrimary(IndexShard indexShard, Supplier<ActionListener<Void>> proceedListenerSupplier) {
169+
for (IndexingOperationListener listener : listeners) {
170+
try {
171+
listener.preBulkOnPrimary(indexShard, proceedListenerSupplier);
172+
} catch (Exception e) {
173+
logger.warn(() -> "preBulkOnPrimary listener [" + listener + "] failed", e);
174+
}
175+
}
176+
}
152177
}
153178
}

0 commit comments

Comments
 (0)