diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportDataStreamsStatsAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportDataStreamsStatsAction.java index e4e942ca79e7a..c5026a660210f 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportDataStreamsStatsAction.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportDataStreamsStatsAction.java @@ -52,7 +52,8 @@ public class TransportDataStreamsStatsAction extends TransportBroadcastByNodeAction< DataStreamsStatsAction.Request, DataStreamsStatsAction.Response, - DataStreamsStatsAction.DataStreamShardStats> { + DataStreamsStatsAction.DataStreamShardStats, + Void> { private final IndicesService indicesService; private final ProjectResolver projectResolver; @@ -114,6 +115,7 @@ protected void shardOperation( DataStreamsStatsAction.Request request, ShardRouting shardRouting, Task task, + Void nodeContext, ActionListener listener ) { ActionListener.completeWith(listener, () -> { diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/analyze/TransportReloadAnalyzersAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/analyze/TransportReloadAnalyzersAction.java index 2d511f493aba7..ea1299f594e6e 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/analyze/TransportReloadAnalyzersAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/analyze/TransportReloadAnalyzersAction.java @@ -50,7 +50,8 @@ public class TransportReloadAnalyzersAction extends TransportBroadcastByNodeAction< ReloadAnalyzersRequest, ReloadAnalyzersResponse, - TransportReloadAnalyzersAction.ReloadResult> { + TransportReloadAnalyzersAction.ReloadResult, + Void> { public static final ActionType TYPE = new ActionType<>("indices:admin/reload_analyzers"); private static final Logger logger = LogManager.getLogger(TransportReloadAnalyzersAction.class); @@ -122,6 +123,7 @@ protected void shardOperation( ReloadAnalyzersRequest request, ShardRouting shardRouting, Task task, + Void nodeContext, ActionListener listener ) { ActionListener.completeWith(listener, () -> { diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/TransportClearIndicesCacheAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/TransportClearIndicesCacheAction.java index 65d8d43776c72..66aa4f734640c 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/TransportClearIndicesCacheAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/TransportClearIndicesCacheAction.java @@ -37,7 +37,8 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastByNodeAction< ClearIndicesCacheRequest, BroadcastResponse, - TransportBroadcastByNodeAction.EmptyResult> { + TransportBroadcastByNodeAction.EmptyResult, + Void> { public static final ActionType TYPE = new ActionType<>("indices:admin/cache/clear"); private final IndicesService indicesService; @@ -94,6 +95,7 @@ protected void shardOperation( ClearIndicesCacheRequest request, ShardRouting shardRouting, Task task, + Void nodeContext, ActionListener listener ) { ActionListener.completeWith(listener, () -> { diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/forcemerge/TransportForceMergeAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/forcemerge/TransportForceMergeAction.java index 8f6df9a1c3a61..eb6d8f973fc61 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/forcemerge/TransportForceMergeAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/forcemerge/TransportForceMergeAction.java @@ -40,7 +40,8 @@ public class TransportForceMergeAction extends TransportBroadcastByNodeAction< ForceMergeRequest, BroadcastResponse, - TransportBroadcastByNodeAction.EmptyResult> { + TransportBroadcastByNodeAction.EmptyResult, + Void> { private final IndicesService indicesService; private final ThreadPool threadPool; @@ -94,7 +95,8 @@ protected void shardOperation( ForceMergeRequest request, ShardRouting shardRouting, Task task, - ActionListener listener + Void nodeContext, + ActionListener listener ) { assert (task instanceof CancellableTask) == false; // TODO: add cancellation handling here once the task supports it SubscribableListener.newForked(l -> { diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java index 142493a738cfe..bbd29bb4d39ee 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java @@ -41,7 +41,7 @@ * Transport action for shard recovery operation. This transport action does not actually * perform shard recovery, it only reports on recoveries (both active and complete). */ -public class TransportRecoveryAction extends TransportBroadcastByNodeAction { +public class TransportRecoveryAction extends TransportBroadcastByNodeAction { private final IndicesService indicesService; private final ProjectResolver projectResolver; @@ -103,7 +103,13 @@ protected RecoveryRequest readRequestFrom(StreamInput in) throws IOException { } @Override - protected void shardOperation(RecoveryRequest request, ShardRouting shardRouting, Task task, ActionListener listener) { + protected void shardOperation( + RecoveryRequest request, + ShardRouting shardRouting, + Task task, + Void nodeContext, + ActionListener listener + ) { ActionListener.completeWith(listener, () -> { assert task instanceof CancellableTask; IndexService indexService = indicesService.indexServiceSafe(shardRouting.shardId().getIndex()); diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/segments/TransportIndicesSegmentsAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/segments/TransportIndicesSegmentsAction.java index 76d0c0175f94d..319b6ad73487c 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/segments/TransportIndicesSegmentsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/segments/TransportIndicesSegmentsAction.java @@ -35,7 +35,8 @@ public class TransportIndicesSegmentsAction extends TransportBroadcastByNodeAction< IndicesSegmentsRequest, IndicesSegmentResponse, - ShardSegments> { + ShardSegments, + Void> { private final IndicesService indicesService; private final ProjectResolver projectResolver; @@ -109,6 +110,7 @@ protected void shardOperation( IndicesSegmentsRequest request, ShardRouting shardRouting, Task task, + Void nodeContext, ActionListener listener ) { ActionListener.completeWith(listener, () -> { diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportFieldUsageAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportFieldUsageAction.java index be9709a596e77..1c0f970a6ff20 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportFieldUsageAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportFieldUsageAction.java @@ -38,7 +38,8 @@ public class TransportFieldUsageAction extends TransportBroadcastByNodeAction< FieldUsageStatsRequest, FieldUsageStatsResponse, - FieldUsageShardResponse> { + FieldUsageShardResponse, + Void> { private final IndicesService indicesService; private final ProjectResolver projectResolver; @@ -94,6 +95,7 @@ protected void shardOperation( FieldUsageStatsRequest request, ShardRouting shardRouting, Task task, + Void nodeContext, ActionListener listener ) { ActionListener.completeWith(listener, () -> { diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java index 210fb42ca584b..77b6264671063 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java @@ -36,7 +36,11 @@ import java.io.IOException; -public class TransportIndicesStatsAction extends TransportBroadcastByNodeAction { +public class TransportIndicesStatsAction extends TransportBroadcastByNodeAction< + IndicesStatsRequest, + IndicesStatsResponse, + ShardStats, + Void> { private final IndicesService indicesService; private final ProjectResolver projectResolver; @@ -109,7 +113,13 @@ protected IndicesStatsRequest readRequestFrom(StreamInput in) throws IOException } @Override - protected void shardOperation(IndicesStatsRequest request, ShardRouting shardRouting, Task task, ActionListener listener) { + protected void shardOperation( + IndicesStatsRequest request, + ShardRouting shardRouting, + Task task, + Void nodeContext, + ActionListener listener + ) { ActionListener.completeWith(listener, () -> { assert task instanceof CancellableTask; IndexService indexService = indicesService.indexServiceSafe(shardRouting.shardId().getIndex()); diff --git a/server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java b/server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java index 899f83f5f6fb1..e8c0fc92cf544 100644 --- a/server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java @@ -40,6 +40,7 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.core.FixForMultiProject; +import org.elasticsearch.core.Nullable; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.transport.AbstractTransportRequest; @@ -70,11 +71,14 @@ * @param the underlying client request * @param the response to the client request * @param per-shard operation results + * @param an (optional) node context created by {@link #createNodeContext} on each node and passed to each call + * to {@link #shardOperation} */ public abstract class TransportBroadcastByNodeAction< Request extends BroadcastRequest, Response extends BaseBroadcastResponse, - ShardOperationResult extends Writeable> extends HandledTransportAction { + ShardOperationResult extends Writeable, + NodeContext> extends HandledTransportAction { private static final Logger logger = LogManager.getLogger(TransportBroadcastByNodeAction.class); @@ -178,15 +182,25 @@ Response newResponse( * @param request the node-level request * @param shardRouting the shard on which to execute the operation * @param task the task for this node-level request + * @param nodeContext the context created by {{@link #createNodeContext()}} * @param listener the listener to notify with the result of the shard-level operation */ protected abstract void shardOperation( Request request, ShardRouting shardRouting, Task task, + @Nullable NodeContext nodeContext, ActionListener listener ); + /** + * @return an (optional) node-level context for this operation, passed to each call to {@link #shardOperation}. + */ + @Nullable + protected NodeContext createNodeContext() { + return null; + } + /** * Determines the shards on which this operation will be executed on. The operation is executed once per shard. * @@ -415,7 +429,7 @@ private void executeAsDataNode( ) { assert Transports.assertNotTransportThread("O(#shards) work must always fork to an appropriate executor"); logger.trace("[{}] executing operation on [{}] shards", actionName, shards.size()); - + final NodeContext nodeContext = createNodeContext(); new CancellableFanOut() { final ArrayList results = new ArrayList<>(shards.size()); @@ -424,7 +438,7 @@ private void executeAsDataNode( @Override protected void sendItemRequest(ShardRouting shardRouting, ActionListener listener) { logger.trace(() -> format("[%s] executing operation for shard [%s]", actionName, shardRouting.shortSummary())); - ActionRunnable.wrap(listener, l -> shardOperation(request, shardRouting, task, l)).run(); + ActionRunnable.wrap(listener, l -> shardOperation(request, shardRouting, task, nodeContext, l)).run(); } @Override @@ -610,8 +624,7 @@ public void writeTo(StreamOutput out) throws IOException { } /** - * Can be used for implementations of {@link #shardOperation(BroadcastRequest, ShardRouting, Task, ActionListener) shardOperation} for - * which there is no shard-level return value. + * Can be used for implementations of {@link #shardOperation} for which there is no shard-level return value. */ public static final class EmptyResult implements Writeable { public static final EmptyResult INSTANCE = new EmptyResult(); diff --git a/server/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java b/server/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java index 8e86044a33673..f3bfe979976a4 100644 --- a/server/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java @@ -95,7 +95,10 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.anEmptyMap; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.object.HasToString.hasToString; public class TransportBroadcastByNodeActionTests extends ESTestCase { @@ -165,8 +168,9 @@ public ShardResult() {} public void writeTo(StreamOutput out) throws IOException {} } - class TestTransportBroadcastByNodeAction extends TransportBroadcastByNodeAction { + class TestTransportBroadcastByNodeAction extends TransportBroadcastByNodeAction { private final Map shards = new HashMap<>(); + private Integer expectedNodeContext = null; TestTransportBroadcastByNodeAction(String actionName) { super( @@ -201,7 +205,22 @@ protected Request readRequestFrom(StreamInput in) throws IOException { } @Override - protected void shardOperation(Request request, ShardRouting shardRouting, Task task, ActionListener listener) { + public Integer createNodeContext() { + assertThat(expectedNodeContext, nullValue()); + expectedNodeContext = randomInt(); + return expectedNodeContext; + } + + @Override + protected void shardOperation( + Request request, + ShardRouting shardRouting, + Task task, + Integer nodeContext, + ActionListener listener + ) { + assertThat(expectedNodeContext, not(nullValue())); + assertThat(nodeContext, equalTo(expectedNodeContext)); ActionListener.completeWith(listener, () -> { if (rarely()) { shards.put(shardRouting, Boolean.TRUE); @@ -413,7 +432,7 @@ public void testNoShardOperationsExecutedIfTaskCancelled() throws Exception { shards.add(shard); } } - final TransportBroadcastByNodeAction.BroadcastByNodeTransportRequestHandler handler = + final TransportBroadcastByNodeAction.BroadcastByNodeTransportRequestHandler handler = action.new BroadcastByNodeTransportRequestHandler(); final PlainActionFuture future = new PlainActionFuture<>(); @@ -474,7 +493,7 @@ public void testOperationExecution() throws Exception { shards.add(shard); } } - final TransportBroadcastByNodeAction.BroadcastByNodeTransportRequestHandler handler = + final TransportBroadcastByNodeAction.BroadcastByNodeTransportRequestHandler handler = action.new BroadcastByNodeTransportRequestHandler(); final PlainActionFuture future = new PlainActionFuture<>(); @@ -568,7 +587,7 @@ public void testResultAggregation() throws ExecutionException, InterruptedExcept } } totalSuccessfulShards += shardResults.size(); - TransportBroadcastByNodeAction.NodeResponse nodeResponse = action.new NodeResponse( + TransportBroadcastByNodeAction.NodeResponse nodeResponse = action.new NodeResponse( entry.getKey(), shards.size(), shardResults, exceptions ); transport.handleResponse(requestId, nodeResponse); @@ -643,7 +662,13 @@ public void testShardLevelOperationsStopOnCancellation() throws Exception { int expectedShardId; @Override - protected void shardOperation(Request request, ShardRouting shardRouting, Task task, ActionListener listener) { + protected void shardOperation( + Request request, + ShardRouting shardRouting, + Task task, + Integer nodeContext, + ActionListener listener + ) { // this test runs a node-level operation on three shards, cancelling the task some time during the execution on the second if (task instanceof CancellableTask cancellableTask) { assertEquals(expectedShardId++, shardRouting.shardId().id()); @@ -697,7 +722,13 @@ public void testShardResultsReleasedOnCancellation() throws Exception { action = new TestTransportBroadcastByNodeAction("indices:admin/shard_level_gc_test") { @Override - protected void shardOperation(Request request, ShardRouting shardRouting, Task task, ActionListener listener) { + protected void shardOperation( + Request request, + ShardRouting shardRouting, + Task task, + Integer nodeContext, + ActionListener listener + ) { listeners.add(listener); } }; diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportForgetFollowerAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportForgetFollowerAction.java index cb5c6f9985753..bfbec3c72c010 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportForgetFollowerAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportForgetFollowerAction.java @@ -44,7 +44,8 @@ public class TransportForgetFollowerAction extends TransportBroadcastByNodeAction< ForgetFollowerAction.Request, BroadcastResponse, - TransportBroadcastByNodeAction.EmptyResult> { + TransportBroadcastByNodeAction.EmptyResult, + Void> { private final IndicesService indicesService; @@ -96,6 +97,7 @@ protected void shardOperation( final ForgetFollowerAction.Request request, final ShardRouting shardRouting, Task task, + Void nodeContext, ActionListener listener ) { final Index followerIndex = new Index(request.followerIndex(), request.followerIndexUUID()); diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/AbstractTransportSearchableSnapshotsAction.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/AbstractTransportSearchableSnapshotsAction.java index 25b38e12a0680..8af6db59691c4 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/AbstractTransportSearchableSnapshotsAction.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/AbstractTransportSearchableSnapshotsAction.java @@ -40,7 +40,7 @@ public abstract class AbstractTransportSearchableSnapshotsAction< Request extends BroadcastRequest, Response extends BaseBroadcastResponse, - ShardOperationResult extends Writeable> extends TransportBroadcastByNodeAction { + ShardOperationResult extends Writeable> extends TransportBroadcastByNodeAction { private final IndicesService indicesService; private final XPackLicenseState licenseState; @@ -106,7 +106,13 @@ protected ShardsIterator shards(ClusterState state, Request request, String[] co } @Override - protected void shardOperation(Request request, ShardRouting shardRouting, Task task, ActionListener listener) { + protected void shardOperation( + Request request, + ShardRouting shardRouting, + Task task, + Void nodeContext, + ActionListener listener + ) { ActionListener.completeWith(listener, () -> { SearchableSnapshots.ensureValidLicense(licenseState); final IndexShard indexShard = indicesService.indexServiceSafe(shardRouting.index()).getShard(shardRouting.id());