Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@
public class TransportDataStreamsStatsAction extends TransportBroadcastByNodeAction<
DataStreamsStatsAction.Request,
DataStreamsStatsAction.Response,
DataStreamsStatsAction.DataStreamShardStats> {
DataStreamsStatsAction.DataStreamShardStats,
Void> {

private final IndicesService indicesService;
private final ProjectResolver projectResolver;
Expand Down Expand Up @@ -114,6 +115,7 @@ protected void shardOperation(
DataStreamsStatsAction.Request request,
ShardRouting shardRouting,
Task task,
Void nodeContext,
ActionListener<DataStreamsStatsAction.DataStreamShardStats> listener
) {
ActionListener.completeWith(listener, () -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@
public class TransportReloadAnalyzersAction extends TransportBroadcastByNodeAction<
ReloadAnalyzersRequest,
ReloadAnalyzersResponse,
TransportReloadAnalyzersAction.ReloadResult> {
TransportReloadAnalyzersAction.ReloadResult,
Void> {

public static final ActionType<ReloadAnalyzersResponse> TYPE = new ActionType<>("indices:admin/reload_analyzers");
private static final Logger logger = LogManager.getLogger(TransportReloadAnalyzersAction.class);
Expand Down Expand Up @@ -122,6 +123,7 @@ protected void shardOperation(
ReloadAnalyzersRequest request,
ShardRouting shardRouting,
Task task,
Void nodeContext,
ActionListener<ReloadResult> listener
) {
ActionListener.completeWith(listener, () -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@
public class TransportClearIndicesCacheAction extends TransportBroadcastByNodeAction<
ClearIndicesCacheRequest,
BroadcastResponse,
TransportBroadcastByNodeAction.EmptyResult> {
TransportBroadcastByNodeAction.EmptyResult,
Void> {

public static final ActionType<BroadcastResponse> TYPE = new ActionType<>("indices:admin/cache/clear");
private final IndicesService indicesService;
Expand Down Expand Up @@ -94,6 +95,7 @@ protected void shardOperation(
ClearIndicesCacheRequest request,
ShardRouting shardRouting,
Task task,
Void nodeContext,
ActionListener<EmptyResult> listener
) {
ActionListener.completeWith(listener, () -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@
public class TransportForceMergeAction extends TransportBroadcastByNodeAction<
ForceMergeRequest,
BroadcastResponse,
TransportBroadcastByNodeAction.EmptyResult> {
TransportBroadcastByNodeAction.EmptyResult,
Void> {

private final IndicesService indicesService;
private final ThreadPool threadPool;
Expand Down Expand Up @@ -94,7 +95,8 @@ protected void shardOperation(
ForceMergeRequest request,
ShardRouting shardRouting,
Task task,
ActionListener<TransportBroadcastByNodeAction.EmptyResult> listener
Void nodeContext,
ActionListener<EmptyResult> listener
) {
assert (task instanceof CancellableTask) == false; // TODO: add cancellation handling here once the task supports it
SubscribableListener.<IndexShard>newForked(l -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
* Transport action for shard recovery operation. This transport action does not actually
* perform shard recovery, it only reports on recoveries (both active and complete).
*/
public class TransportRecoveryAction extends TransportBroadcastByNodeAction<RecoveryRequest, RecoveryResponse, RecoveryState> {
public class TransportRecoveryAction extends TransportBroadcastByNodeAction<RecoveryRequest, RecoveryResponse, RecoveryState, Void> {

private final IndicesService indicesService;
private final ProjectResolver projectResolver;
Expand Down Expand Up @@ -103,7 +103,13 @@ protected RecoveryRequest readRequestFrom(StreamInput in) throws IOException {
}

@Override
protected void shardOperation(RecoveryRequest request, ShardRouting shardRouting, Task task, ActionListener<RecoveryState> listener) {
protected void shardOperation(
RecoveryRequest request,
ShardRouting shardRouting,
Task task,
Void nodeContext,
ActionListener<RecoveryState> listener
) {
ActionListener.completeWith(listener, () -> {
assert task instanceof CancellableTask;
IndexService indexService = indicesService.indexServiceSafe(shardRouting.shardId().getIndex());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@
public class TransportIndicesSegmentsAction extends TransportBroadcastByNodeAction<
IndicesSegmentsRequest,
IndicesSegmentResponse,
ShardSegments> {
ShardSegments,
Void> {

private final IndicesService indicesService;
private final ProjectResolver projectResolver;
Expand Down Expand Up @@ -109,6 +110,7 @@ protected void shardOperation(
IndicesSegmentsRequest request,
ShardRouting shardRouting,
Task task,
Void nodeContext,
ActionListener<ShardSegments> listener
) {
ActionListener.completeWith(listener, () -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@
public class TransportFieldUsageAction extends TransportBroadcastByNodeAction<
FieldUsageStatsRequest,
FieldUsageStatsResponse,
FieldUsageShardResponse> {
FieldUsageShardResponse,
Void> {

private final IndicesService indicesService;
private final ProjectResolver projectResolver;
Expand Down Expand Up @@ -94,6 +95,7 @@ protected void shardOperation(
FieldUsageStatsRequest request,
ShardRouting shardRouting,
Task task,
Void nodeContext,
ActionListener<FieldUsageShardResponse> listener
) {
ActionListener.completeWith(listener, () -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@

import java.io.IOException;

public class TransportIndicesStatsAction extends TransportBroadcastByNodeAction<IndicesStatsRequest, IndicesStatsResponse, ShardStats> {
public class TransportIndicesStatsAction extends TransportBroadcastByNodeAction<
IndicesStatsRequest,
IndicesStatsResponse,
ShardStats,
Void> {

private final IndicesService indicesService;
private final ProjectResolver projectResolver;
Expand Down Expand Up @@ -109,7 +113,13 @@ protected IndicesStatsRequest readRequestFrom(StreamInput in) throws IOException
}

@Override
protected void shardOperation(IndicesStatsRequest request, ShardRouting shardRouting, Task task, ActionListener<ShardStats> listener) {
protected void shardOperation(
IndicesStatsRequest request,
ShardRouting shardRouting,
Task task,
Void nodeContext,
ActionListener<ShardStats> listener
) {
ActionListener.completeWith(listener, () -> {
assert task instanceof CancellableTask;
IndexService indexService = indicesService.indexServiceSafe(shardRouting.shardId().getIndex());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.AbstractTransportRequest;
Expand Down Expand Up @@ -70,11 +71,14 @@
* @param <Request> the underlying client request
* @param <Response> the response to the client request
* @param <ShardOperationResult> per-shard operation results
* @param <NodeContext> an (optional) node context created by {@link #createNodeContext} on each node and passed to each call
* to {@link #shardOperation}
*/
public abstract class TransportBroadcastByNodeAction<
Request extends BroadcastRequest<Request>,
Response extends BaseBroadcastResponse,
ShardOperationResult extends Writeable> extends HandledTransportAction<Request, Response> {
ShardOperationResult extends Writeable,
NodeContext> extends HandledTransportAction<Request, Response> {

private static final Logger logger = LogManager.getLogger(TransportBroadcastByNodeAction.class);

Expand Down Expand Up @@ -178,15 +182,25 @@ Response newResponse(
* @param request the node-level request
* @param shardRouting the shard on which to execute the operation
* @param task the task for this node-level request
* @param nodeContext the context created by {{@link #createNodeContext()}}
* @param listener the listener to notify with the result of the shard-level operation
*/
protected abstract void shardOperation(
Request request,
ShardRouting shardRouting,
Task task,
@Nullable NodeContext nodeContext,
ActionListener<ShardOperationResult> listener
);

/**
* @return an (optional) node-level context for this operation, passed to each call to {@link #shardOperation}.
*/
@Nullable
protected NodeContext createNodeContext() {
return null;
}

/**
* Determines the shards on which this operation will be executed on. The operation is executed once per shard.
*
Expand Down Expand Up @@ -415,7 +429,7 @@ private void executeAsDataNode(
) {
assert Transports.assertNotTransportThread("O(#shards) work must always fork to an appropriate executor");
logger.trace("[{}] executing operation on [{}] shards", actionName, shards.size());

final NodeContext nodeContext = createNodeContext();
new CancellableFanOut<ShardRouting, ShardOperationResult, NodeResponse>() {

final ArrayList<ShardOperationResult> results = new ArrayList<>(shards.size());
Expand All @@ -424,7 +438,7 @@ private void executeAsDataNode(
@Override
protected void sendItemRequest(ShardRouting shardRouting, ActionListener<ShardOperationResult> listener) {
logger.trace(() -> format("[%s] executing operation for shard [%s]", actionName, shardRouting.shortSummary()));
ActionRunnable.wrap(listener, l -> shardOperation(request, shardRouting, task, l)).run();
ActionRunnable.wrap(listener, l -> shardOperation(request, shardRouting, task, nodeContext, l)).run();
}

@Override
Expand Down Expand Up @@ -610,8 +624,7 @@ public void writeTo(StreamOutput out) throws IOException {
}

/**
* Can be used for implementations of {@link #shardOperation(BroadcastRequest, ShardRouting, Task, ActionListener) shardOperation} for
* which there is no shard-level return value.
* Can be used for implementations of {@link #shardOperation} for which there is no shard-level return value.
*/
public static final class EmptyResult implements Writeable {
public static final EmptyResult INSTANCE = new EmptyResult();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,10 @@
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.object.HasToString.hasToString;

public class TransportBroadcastByNodeActionTests extends ESTestCase {
Expand Down Expand Up @@ -165,8 +168,9 @@ public ShardResult() {}
public void writeTo(StreamOutput out) throws IOException {}
}

class TestTransportBroadcastByNodeAction extends TransportBroadcastByNodeAction<Request, Response, ShardResult> {
class TestTransportBroadcastByNodeAction extends TransportBroadcastByNodeAction<Request, Response, ShardResult, Integer> {
private final Map<ShardRouting, Object> shards = new HashMap<>();
private Integer expectedNodeContext = null;

TestTransportBroadcastByNodeAction(String actionName) {
super(
Expand Down Expand Up @@ -201,7 +205,22 @@ protected Request readRequestFrom(StreamInput in) throws IOException {
}

@Override
protected void shardOperation(Request request, ShardRouting shardRouting, Task task, ActionListener<ShardResult> listener) {
public Integer createNodeContext() {
assertThat(expectedNodeContext, nullValue());
expectedNodeContext = randomInt();
return expectedNodeContext;
}

@Override
protected void shardOperation(
Request request,
ShardRouting shardRouting,
Task task,
Integer nodeContext,
ActionListener<ShardResult> listener
) {
assertThat(expectedNodeContext, not(nullValue()));
assertThat(nodeContext, equalTo(expectedNodeContext));
ActionListener.completeWith(listener, () -> {
if (rarely()) {
shards.put(shardRouting, Boolean.TRUE);
Expand Down Expand Up @@ -413,7 +432,7 @@ public void testNoShardOperationsExecutedIfTaskCancelled() throws Exception {
shards.add(shard);
}
}
final TransportBroadcastByNodeAction<Request, Response, ShardResult>.BroadcastByNodeTransportRequestHandler handler =
final TransportBroadcastByNodeAction<Request, Response, ShardResult, Integer>.BroadcastByNodeTransportRequestHandler handler =
action.new BroadcastByNodeTransportRequestHandler();

final PlainActionFuture<TransportResponse> future = new PlainActionFuture<>();
Expand Down Expand Up @@ -474,7 +493,7 @@ public void testOperationExecution() throws Exception {
shards.add(shard);
}
}
final TransportBroadcastByNodeAction<Request, Response, ShardResult>.BroadcastByNodeTransportRequestHandler handler =
final TransportBroadcastByNodeAction<Request, Response, ShardResult, Integer>.BroadcastByNodeTransportRequestHandler handler =
action.new BroadcastByNodeTransportRequestHandler();

final PlainActionFuture<TransportResponse> future = new PlainActionFuture<>();
Expand Down Expand Up @@ -568,7 +587,7 @@ public void testResultAggregation() throws ExecutionException, InterruptedExcept
}
}
totalSuccessfulShards += shardResults.size();
TransportBroadcastByNodeAction<Request, Response, ShardResult>.NodeResponse nodeResponse = action.new NodeResponse(
TransportBroadcastByNodeAction<Request, Response, ShardResult, Integer>.NodeResponse nodeResponse = action.new NodeResponse(
entry.getKey(), shards.size(), shardResults, exceptions
);
transport.handleResponse(requestId, nodeResponse);
Expand Down Expand Up @@ -643,7 +662,13 @@ public void testShardLevelOperationsStopOnCancellation() throws Exception {
int expectedShardId;

@Override
protected void shardOperation(Request request, ShardRouting shardRouting, Task task, ActionListener<ShardResult> listener) {
protected void shardOperation(
Request request,
ShardRouting shardRouting,
Task task,
Integer nodeContext,
ActionListener<ShardResult> listener
) {
// this test runs a node-level operation on three shards, cancelling the task some time during the execution on the second
if (task instanceof CancellableTask cancellableTask) {
assertEquals(expectedShardId++, shardRouting.shardId().id());
Expand Down Expand Up @@ -697,7 +722,13 @@ public void testShardResultsReleasedOnCancellation() throws Exception {

action = new TestTransportBroadcastByNodeAction("indices:admin/shard_level_gc_test") {
@Override
protected void shardOperation(Request request, ShardRouting shardRouting, Task task, ActionListener<ShardResult> listener) {
protected void shardOperation(
Request request,
ShardRouting shardRouting,
Task task,
Integer nodeContext,
ActionListener<ShardResult> listener
) {
listeners.add(listener);
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@
public class TransportForgetFollowerAction extends TransportBroadcastByNodeAction<
ForgetFollowerAction.Request,
BroadcastResponse,
TransportBroadcastByNodeAction.EmptyResult> {
TransportBroadcastByNodeAction.EmptyResult,
Void> {

private final IndicesService indicesService;

Expand Down Expand Up @@ -96,6 +97,7 @@ protected void shardOperation(
final ForgetFollowerAction.Request request,
final ShardRouting shardRouting,
Task task,
Void nodeContext,
ActionListener<EmptyResult> listener
) {
final Index followerIndex = new Index(request.followerIndex(), request.followerIndexUUID());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
public abstract class AbstractTransportSearchableSnapshotsAction<
Request extends BroadcastRequest<Request>,
Response extends BaseBroadcastResponse,
ShardOperationResult extends Writeable> extends TransportBroadcastByNodeAction<Request, Response, ShardOperationResult> {
ShardOperationResult extends Writeable> extends TransportBroadcastByNodeAction<Request, Response, ShardOperationResult, Void> {

private final IndicesService indicesService;
private final XPackLicenseState licenseState;
Expand Down Expand Up @@ -106,7 +106,13 @@ protected ShardsIterator shards(ClusterState state, Request request, String[] co
}

@Override
protected void shardOperation(Request request, ShardRouting shardRouting, Task task, ActionListener<ShardOperationResult> listener) {
protected void shardOperation(
Request request,
ShardRouting shardRouting,
Task task,
Void nodeContext,
ActionListener<ShardOperationResult> listener
) {
ActionListener.completeWith(listener, () -> {
SearchableSnapshots.ensureValidLicense(licenseState);
final IndexShard indexShard = indicesService.indexServiceSafe(shardRouting.index()).getShard(shardRouting.id());
Expand Down