Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/92729.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 92729
summary: Improve scalability of `BroadcastReplicationActions`
area: Network
type: bug
issues: []
31 changes: 31 additions & 0 deletions server/src/main/java/org/elasticsearch/action/ActionListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.CheckedFunction;
import org.elasticsearch.core.CheckedRunnable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -252,6 +254,13 @@ public String toString() {
}
}

/**
* Creates a listener which releases the given resource on completion (whether success or failure)
*/
static <Response> ActionListener<Response> releasing(Releasable releasable) {
return wrap(runnableFromReleasable(releasable));
}

/**
* Creates a listener that listens for a response (or failure) and executes the
* corresponding runnable when the response (or failure) is received.
Expand Down Expand Up @@ -362,6 +371,14 @@ static <Response> ActionListener<Response> runAfter(ActionListener<Response> del
return new RunAfterActionListener<>(delegate, runAfter);
}

/**
* Wraps a given listener and returns a new listener which releases the provided {@code releaseAfter}
* resource when the listener is notified via either {@code #onResponse} or {@code #onFailure}.
*/
static <Response> ActionListener<Response> releaseAfter(ActionListener<Response> delegate, Releasable releaseAfter) {
return new RunAfterActionListener<>(delegate, runnableFromReleasable(releaseAfter));
}

final class RunAfterActionListener<T> extends Delegating<T, T> {

private final Runnable runAfter;
Expand Down Expand Up @@ -498,4 +515,18 @@ static <Response> void completeWith(ActionListener<Response> listener, CheckedSu
throw ex;
}
}

private static Runnable runnableFromReleasable(Releasable releasable) {
return new Runnable() {
@Override
public void run() {
Releasables.closeExpectNoException(releasable);
}

@Override
public String toString() {
return "release[" + releasable + "]";
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.List;
Expand Down Expand Up @@ -46,15 +47,11 @@ public TransportFlushAction(
client,
actionFilters,
indexNameExpressionResolver,
TransportShardFlushAction.TYPE
TransportShardFlushAction.TYPE,
ThreadPool.Names.FLUSH
);
}

@Override
protected ReplicationResponse newShardResponse() {
return new ReplicationResponse();
}

@Override
protected ShardFlushRequest newShardRequest(FlushRequest request, ShardId shardId) {
return new ShardFlushRequest(request, shardId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -68,6 +69,11 @@ public TransportShardFlushAction(
);
}

@Override
protected ClusterBlockLevel globalBlockLevel() {
return ClusterBlockLevel.METADATA_READ;
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Surprised this wasn't checked already, today we just return a trivial success before state recovery:

{
  "_shards" : {
    "total" : 0,
    "successful" : 0,
    "failed" : 0
  }
}

We could leave this as-is ofc.


@Override
protected ReplicationResponse newResponseInstance(StreamInput in) throws IOException {
return new ReplicationResponse(in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.List;
Expand Down Expand Up @@ -48,15 +49,11 @@ public TransportRefreshAction(
client,
actionFilters,
indexNameExpressionResolver,
TransportShardRefreshAction.TYPE
TransportShardRefreshAction.TYPE,
ThreadPool.Names.REFRESH
);
}

@Override
protected ReplicationResponse newShardResponse() {
return new ReplicationResponse();
}

@Override
protected BasicReplicationRequest newShardRequest(RefreshRequest request, ShardId shardId) {
BasicReplicationRequest replicationRequest = new BasicReplicationRequest(shardId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -64,6 +65,11 @@ protected ReplicationResponse newResponseInstance(StreamInput in) throws IOExcep
return new ReplicationResponse(in);
}

@Override
protected ClusterBlockLevel globalBlockLevel() {
return ClusterBlockLevel.METADATA_READ;
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Surprised this wasn't checked already, today we just return a trivial success before state recovery:

{
  "_shards" : {
    "total" : 0,
    "successful" : 0,
    "failed" : 0
  }
}

We could leave this as-is ofc.


@Override
protected void shardOperationOnPrimary(
BasicReplicationRequest shardRequest,
Expand Down
Loading