diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportDataStreamsStatsAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportDataStreamsStatsAction.java index 7428a8e758f5d..35803e37850c8 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportDataStreamsStatsAction.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportDataStreamsStatsAction.java @@ -51,7 +51,8 @@ public class TransportDataStreamsStatsAction extends TransportBroadcastByNodeAction< DataStreamsStatsAction.Request, DataStreamsStatsAction.Response, - DataStreamsStatsAction.DataStreamShardStats> { + DataStreamsStatsAction.DataStreamShardStats, + Void> { private final IndicesService indicesService; @@ -110,6 +111,7 @@ protected void shardOperation( DataStreamsStatsAction.Request request, ShardRouting shardRouting, Task task, + Void nodeContext, ActionListener listener ) { ActionListener.completeWith(listener, () -> { diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/analyze/TransportReloadAnalyzersAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/analyze/TransportReloadAnalyzersAction.java index 2d511f493aba7..ea1299f594e6e 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/analyze/TransportReloadAnalyzersAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/analyze/TransportReloadAnalyzersAction.java @@ -50,7 +50,8 @@ public class TransportReloadAnalyzersAction extends TransportBroadcastByNodeAction< ReloadAnalyzersRequest, ReloadAnalyzersResponse, - TransportReloadAnalyzersAction.ReloadResult> { + TransportReloadAnalyzersAction.ReloadResult, + Void> { public static final ActionType TYPE = new ActionType<>("indices:admin/reload_analyzers"); private static final Logger logger = LogManager.getLogger(TransportReloadAnalyzersAction.class); @@ -122,6 +123,7 @@ protected void shardOperation( ReloadAnalyzersRequest request, ShardRouting shardRouting, Task task, + Void nodeContext, ActionListener listener ) { ActionListener.completeWith(listener, () -> { diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/TransportClearIndicesCacheAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/TransportClearIndicesCacheAction.java index fe81dce5eaa83..f79d757dee80c 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/TransportClearIndicesCacheAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/TransportClearIndicesCacheAction.java @@ -36,7 +36,8 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastByNodeAction< ClearIndicesCacheRequest, BroadcastResponse, - TransportBroadcastByNodeAction.EmptyResult> { + TransportBroadcastByNodeAction.EmptyResult, + Void> { public static final ActionType TYPE = new ActionType<>("indices:admin/cache/clear"); private final IndicesService indicesService; @@ -90,6 +91,7 @@ protected void shardOperation( ClearIndicesCacheRequest request, ShardRouting shardRouting, Task task, + Void nodeContext, ActionListener listener ) { ActionListener.completeWith(listener, () -> { diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/forcemerge/TransportForceMergeAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/forcemerge/TransportForceMergeAction.java index da08b78d711cf..686e2ce283e7d 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/forcemerge/TransportForceMergeAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/forcemerge/TransportForceMergeAction.java @@ -38,7 +38,8 @@ public class TransportForceMergeAction extends TransportBroadcastByNodeAction< ForceMergeRequest, BroadcastResponse, - TransportBroadcastByNodeAction.EmptyResult> { + TransportBroadcastByNodeAction.EmptyResult, + Void> { private final IndicesService indicesService; private final ThreadPool threadPool; @@ -89,7 +90,8 @@ protected void shardOperation( ForceMergeRequest request, ShardRouting shardRouting, Task task, - ActionListener listener + Void nodeContext, + ActionListener listener ) { assert (task instanceof CancellableTask) == false; // TODO: add cancellation handling here once the task supports it threadPool.executor(ThreadPool.Names.FORCE_MERGE).execute(ActionRunnable.supply(listener, () -> { diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java index 4dbbb317cd1f8..ff775a91cdac2 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java @@ -40,7 +40,7 @@ * Transport action for shard recovery operation. This transport action does not actually * perform shard recovery, it only reports on recoveries (both active and complete). */ -public class TransportRecoveryAction extends TransportBroadcastByNodeAction { +public class TransportRecoveryAction extends TransportBroadcastByNodeAction { private final IndicesService indicesService; @@ -99,7 +99,13 @@ protected RecoveryRequest readRequestFrom(StreamInput in) throws IOException { } @Override - protected void shardOperation(RecoveryRequest request, ShardRouting shardRouting, Task task, ActionListener listener) { + protected void shardOperation( + RecoveryRequest request, + ShardRouting shardRouting, + Task task, + Void nodeContext, + ActionListener listener + ) { ActionListener.completeWith(listener, () -> { assert task instanceof CancellableTask; IndexService indexService = indicesService.indexServiceSafe(shardRouting.shardId().getIndex()); diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/segments/TransportIndicesSegmentsAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/segments/TransportIndicesSegmentsAction.java index be0bf76673f2f..f515851f109cf 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/segments/TransportIndicesSegmentsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/segments/TransportIndicesSegmentsAction.java @@ -34,7 +34,8 @@ public class TransportIndicesSegmentsAction extends TransportBroadcastByNodeAction< IndicesSegmentsRequest, IndicesSegmentResponse, - ShardSegments> { + ShardSegments, + Void> { private final IndicesService indicesService; @@ -105,6 +106,7 @@ protected void shardOperation( IndicesSegmentsRequest request, ShardRouting shardRouting, Task task, + Void nodeContext, ActionListener listener ) { ActionListener.completeWith(listener, () -> { diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportFieldUsageAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportFieldUsageAction.java index d660f94d2c829..aa2c697b6cbcc 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportFieldUsageAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportFieldUsageAction.java @@ -37,7 +37,8 @@ public class TransportFieldUsageAction extends TransportBroadcastByNodeAction< FieldUsageStatsRequest, FieldUsageStatsResponse, - FieldUsageShardResponse> { + FieldUsageShardResponse, + Void> { private final IndicesService indicesService; @@ -90,6 +91,7 @@ protected void shardOperation( FieldUsageStatsRequest request, ShardRouting shardRouting, Task task, + Void nodeContext, ActionListener listener ) { ActionListener.completeWith(listener, () -> { diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java index d3274dab8a386..e7a7a37dbc82b 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java @@ -35,7 +35,11 @@ import java.io.IOException; -public class TransportIndicesStatsAction extends TransportBroadcastByNodeAction { +public class TransportIndicesStatsAction extends TransportBroadcastByNodeAction< + IndicesStatsRequest, + IndicesStatsResponse, + ShardStats, + Void> { private final IndicesService indicesService; @@ -105,7 +109,13 @@ protected IndicesStatsRequest readRequestFrom(StreamInput in) throws IOException } @Override - protected void shardOperation(IndicesStatsRequest request, ShardRouting shardRouting, Task task, ActionListener listener) { + protected void shardOperation( + IndicesStatsRequest request, + ShardRouting shardRouting, + Task task, + Void nodeContext, + ActionListener listener + ) { ActionListener.completeWith(listener, () -> { assert task instanceof CancellableTask; IndexService indexService = indicesService.indexServiceSafe(shardRouting.shardId().getIndex()); diff --git a/server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java b/server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java index 5c6171dc22536..e6670c6238b1b 100644 --- a/server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java @@ -39,6 +39,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.Nullable; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.transport.TransportChannel; @@ -69,11 +70,14 @@ * @param the underlying client request * @param the response to the client request * @param per-shard operation results + * @param an (optional) node context created by {@link #createNodeContext} on each node and passed to each call + * to {@link #shardOperation} */ public abstract class TransportBroadcastByNodeAction< Request extends BroadcastRequest, Response extends BaseBroadcastResponse, - ShardOperationResult extends Writeable> extends HandledTransportAction { + ShardOperationResult extends Writeable, + NodeContext> extends HandledTransportAction { private static final Logger logger = LogManager.getLogger(TransportBroadcastByNodeAction.class); @@ -177,15 +181,25 @@ Response newResponse( * @param request the node-level request * @param shardRouting the shard on which to execute the operation * @param task the task for this node-level request + * @param nodeContext the context created by {{@link #createNodeContext()}} * @param listener the listener to notify with the result of the shard-level operation */ protected abstract void shardOperation( Request request, ShardRouting shardRouting, Task task, + @Nullable NodeContext nodeContext, ActionListener listener ); + /** + * @return an (optional) node-level context for this operation, passed to each call to {@link #shardOperation}. + */ + @Nullable + protected NodeContext createNodeContext() { + return null; + } + /** * Determines the shards on which this operation will be executed on. The operation is executed once per shard. * @@ -412,7 +426,7 @@ private void executeAsDataNode( ) { assert Transports.assertNotTransportThread("O(#shards) work must always fork to an appropriate executor"); logger.trace("[{}] executing operation on [{}] shards", actionName, shards.size()); - + final NodeContext nodeContext = createNodeContext(); new CancellableFanOut() { final ArrayList results = new ArrayList<>(shards.size()); @@ -421,7 +435,7 @@ private void executeAsDataNode( @Override protected void sendItemRequest(ShardRouting shardRouting, ActionListener listener) { logger.trace(() -> format("[%s] executing operation for shard [%s]", actionName, shardRouting.shortSummary())); - ActionRunnable.wrap(listener, l -> shardOperation(request, shardRouting, task, l)).run(); + ActionRunnable.wrap(listener, l -> shardOperation(request, shardRouting, task, nodeContext, l)).run(); } @Override @@ -608,8 +622,7 @@ public void writeTo(StreamOutput out) throws IOException { } /** - * Can be used for implementations of {@link #shardOperation(BroadcastRequest, ShardRouting, Task, ActionListener) shardOperation} for - * which there is no shard-level return value. + * Can be used for implementations of {@link #shardOperation} for which there is no shard-level return value. */ public static final class EmptyResult implements Writeable { public static EmptyResult INSTANCE = new EmptyResult(); diff --git a/server/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java b/server/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java index e0cd8d8390c74..ba7508230968c 100644 --- a/server/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java @@ -93,7 +93,10 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.anEmptyMap; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.object.HasToString.hasToString; public class TransportBroadcastByNodeActionTests extends ESTestCase { @@ -163,8 +166,9 @@ public ShardResult() {} public void writeTo(StreamOutput out) throws IOException {} } - class TestTransportBroadcastByNodeAction extends TransportBroadcastByNodeAction { + class TestTransportBroadcastByNodeAction extends TransportBroadcastByNodeAction { private final Map shards = new HashMap<>(); + private Integer expectedNodeContext = null; TestTransportBroadcastByNodeAction(String actionName) { super( @@ -199,7 +203,22 @@ protected Request readRequestFrom(StreamInput in) throws IOException { } @Override - protected void shardOperation(Request request, ShardRouting shardRouting, Task task, ActionListener listener) { + public Integer createNodeContext() { + assertThat(expectedNodeContext, nullValue()); + expectedNodeContext = randomInt(); + return expectedNodeContext; + } + + @Override + protected void shardOperation( + Request request, + ShardRouting shardRouting, + Task task, + Integer nodeContext, + ActionListener listener + ) { + assertThat(expectedNodeContext, not(nullValue())); + assertThat(nodeContext, equalTo(expectedNodeContext)); ActionListener.completeWith(listener, () -> { if (rarely()) { shards.put(shardRouting, Boolean.TRUE); @@ -411,7 +430,7 @@ public void testNoShardOperationsExecutedIfTaskCancelled() throws Exception { shards.add(shard); } } - final TransportBroadcastByNodeAction.BroadcastByNodeTransportRequestHandler handler = + final TransportBroadcastByNodeAction.BroadcastByNodeTransportRequestHandler handler = action.new BroadcastByNodeTransportRequestHandler(); final PlainActionFuture future = new PlainActionFuture<>(); @@ -472,7 +491,7 @@ public void testOperationExecution() throws Exception { shards.add(shard); } } - final TransportBroadcastByNodeAction.BroadcastByNodeTransportRequestHandler handler = + final TransportBroadcastByNodeAction.BroadcastByNodeTransportRequestHandler handler = action.new BroadcastByNodeTransportRequestHandler(); final PlainActionFuture future = new PlainActionFuture<>(); @@ -566,7 +585,7 @@ public void testResultAggregation() throws ExecutionException, InterruptedExcept } } totalSuccessfulShards += shardResults.size(); - TransportBroadcastByNodeAction.NodeResponse nodeResponse = action.new NodeResponse( + TransportBroadcastByNodeAction.NodeResponse nodeResponse = action.new NodeResponse( entry.getKey(), shards.size(), shardResults, exceptions ); transport.handleResponse(requestId, nodeResponse); @@ -641,7 +660,13 @@ public void testShardLevelOperationsStopOnCancellation() throws Exception { int expectedShardId; @Override - protected void shardOperation(Request request, ShardRouting shardRouting, Task task, ActionListener listener) { + protected void shardOperation( + Request request, + ShardRouting shardRouting, + Task task, + Integer nodeContext, + ActionListener listener + ) { // this test runs a node-level operation on three shards, cancelling the task some time during the execution on the second if (task instanceof CancellableTask cancellableTask) { assertEquals(expectedShardId++, shardRouting.shardId().id()); @@ -695,7 +720,13 @@ public void testShardResultsReleasedOnCancellation() throws Exception { action = new TestTransportBroadcastByNodeAction("indices:admin/shard_level_gc_test") { @Override - protected void shardOperation(Request request, ShardRouting shardRouting, Task task, ActionListener listener) { + protected void shardOperation( + Request request, + ShardRouting shardRouting, + Task task, + Integer nodeContext, + ActionListener listener + ) { listeners.add(listener); } }; diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportForgetFollowerAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportForgetFollowerAction.java index f126a546ae850..4395c854eca87 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportForgetFollowerAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportForgetFollowerAction.java @@ -44,7 +44,8 @@ public class TransportForgetFollowerAction extends TransportBroadcastByNodeAction< ForgetFollowerAction.Request, BroadcastResponse, - TransportBroadcastByNodeAction.EmptyResult> { + TransportBroadcastByNodeAction.EmptyResult, + Void> { private final IndicesService indicesService; @@ -96,6 +97,7 @@ protected void shardOperation( final ForgetFollowerAction.Request request, final ShardRouting shardRouting, Task task, + Void nodeContext, ActionListener listener ) { final Index followerIndex = new Index(request.followerIndex(), request.followerIndexUUID()); diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/AbstractTransportSearchableSnapshotsAction.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/AbstractTransportSearchableSnapshotsAction.java index fd77a9f36a3df..4aeab5ea2566d 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/AbstractTransportSearchableSnapshotsAction.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/AbstractTransportSearchableSnapshotsAction.java @@ -40,7 +40,7 @@ public abstract class AbstractTransportSearchableSnapshotsAction< Request extends BroadcastRequest, Response extends BaseBroadcastResponse, - ShardOperationResult extends Writeable> extends TransportBroadcastByNodeAction { + ShardOperationResult extends Writeable> extends TransportBroadcastByNodeAction { private final IndicesService indicesService; private final XPackLicenseState licenseState; @@ -106,7 +106,13 @@ protected ShardsIterator shards(ClusterState state, Request request, String[] co } @Override - protected void shardOperation(Request request, ShardRouting shardRouting, Task task, ActionListener listener) { + protected void shardOperation( + Request request, + ShardRouting shardRouting, + Task task, + Void nodeContext, + ActionListener listener + ) { ActionListener.completeWith(listener, () -> { SearchableSnapshots.ensureValidLicense(licenseState); final IndexShard indexShard = indicesService.indexServiceSafe(shardRouting.index()).getShard(shardRouting.id());