Skip to content

Commit 9df1314

Browse files
authored
Adding NodeContext to TransportBroadcastByNodeAction (#138057)
1 parent 8c01219 commit 9df1314

File tree

12 files changed

+106
-26
lines changed

12 files changed

+106
-26
lines changed

modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportDataStreamsStatsAction.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@
5252
public class TransportDataStreamsStatsAction extends TransportBroadcastByNodeAction<
5353
DataStreamsStatsAction.Request,
5454
DataStreamsStatsAction.Response,
55-
DataStreamsStatsAction.DataStreamShardStats> {
55+
DataStreamsStatsAction.DataStreamShardStats,
56+
Void> {
5657

5758
private final IndicesService indicesService;
5859
private final ProjectResolver projectResolver;
@@ -114,6 +115,7 @@ protected void shardOperation(
114115
DataStreamsStatsAction.Request request,
115116
ShardRouting shardRouting,
116117
Task task,
118+
Void nodeContext,
117119
ActionListener<DataStreamsStatsAction.DataStreamShardStats> listener
118120
) {
119121
ActionListener.completeWith(listener, () -> {

server/src/main/java/org/elasticsearch/action/admin/indices/analyze/TransportReloadAnalyzersAction.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,8 @@
5050
public class TransportReloadAnalyzersAction extends TransportBroadcastByNodeAction<
5151
ReloadAnalyzersRequest,
5252
ReloadAnalyzersResponse,
53-
TransportReloadAnalyzersAction.ReloadResult> {
53+
TransportReloadAnalyzersAction.ReloadResult,
54+
Void> {
5455

5556
public static final ActionType<ReloadAnalyzersResponse> TYPE = new ActionType<>("indices:admin/reload_analyzers");
5657
private static final Logger logger = LogManager.getLogger(TransportReloadAnalyzersAction.class);
@@ -122,6 +123,7 @@ protected void shardOperation(
122123
ReloadAnalyzersRequest request,
123124
ShardRouting shardRouting,
124125
Task task,
126+
Void nodeContext,
125127
ActionListener<ReloadResult> listener
126128
) {
127129
ActionListener.completeWith(listener, () -> {

server/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/TransportClearIndicesCacheAction.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@
3737
public class TransportClearIndicesCacheAction extends TransportBroadcastByNodeAction<
3838
ClearIndicesCacheRequest,
3939
BroadcastResponse,
40-
TransportBroadcastByNodeAction.EmptyResult> {
40+
TransportBroadcastByNodeAction.EmptyResult,
41+
Void> {
4142

4243
public static final ActionType<BroadcastResponse> TYPE = new ActionType<>("indices:admin/cache/clear");
4344
private final IndicesService indicesService;
@@ -94,6 +95,7 @@ protected void shardOperation(
9495
ClearIndicesCacheRequest request,
9596
ShardRouting shardRouting,
9697
Task task,
98+
Void nodeContext,
9799
ActionListener<EmptyResult> listener
98100
) {
99101
ActionListener.completeWith(listener, () -> {

server/src/main/java/org/elasticsearch/action/admin/indices/forcemerge/TransportForceMergeAction.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@
4040
public class TransportForceMergeAction extends TransportBroadcastByNodeAction<
4141
ForceMergeRequest,
4242
BroadcastResponse,
43-
TransportBroadcastByNodeAction.EmptyResult> {
43+
TransportBroadcastByNodeAction.EmptyResult,
44+
Void> {
4445

4546
private final IndicesService indicesService;
4647
private final ThreadPool threadPool;
@@ -94,7 +95,8 @@ protected void shardOperation(
9495
ForceMergeRequest request,
9596
ShardRouting shardRouting,
9697
Task task,
97-
ActionListener<TransportBroadcastByNodeAction.EmptyResult> listener
98+
Void nodeContext,
99+
ActionListener<EmptyResult> listener
98100
) {
99101
assert (task instanceof CancellableTask) == false; // TODO: add cancellation handling here once the task supports it
100102
SubscribableListener.<IndexShard>newForked(l -> {

server/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
* Transport action for shard recovery operation. This transport action does not actually
4242
* perform shard recovery, it only reports on recoveries (both active and complete).
4343
*/
44-
public class TransportRecoveryAction extends TransportBroadcastByNodeAction<RecoveryRequest, RecoveryResponse, RecoveryState> {
44+
public class TransportRecoveryAction extends TransportBroadcastByNodeAction<RecoveryRequest, RecoveryResponse, RecoveryState, Void> {
4545

4646
private final IndicesService indicesService;
4747
private final ProjectResolver projectResolver;
@@ -103,7 +103,13 @@ protected RecoveryRequest readRequestFrom(StreamInput in) throws IOException {
103103
}
104104

105105
@Override
106-
protected void shardOperation(RecoveryRequest request, ShardRouting shardRouting, Task task, ActionListener<RecoveryState> listener) {
106+
protected void shardOperation(
107+
RecoveryRequest request,
108+
ShardRouting shardRouting,
109+
Task task,
110+
Void nodeContext,
111+
ActionListener<RecoveryState> listener
112+
) {
107113
ActionListener.completeWith(listener, () -> {
108114
assert task instanceof CancellableTask;
109115
IndexService indexService = indicesService.indexServiceSafe(shardRouting.shardId().getIndex());

server/src/main/java/org/elasticsearch/action/admin/indices/segments/TransportIndicesSegmentsAction.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@
3535
public class TransportIndicesSegmentsAction extends TransportBroadcastByNodeAction<
3636
IndicesSegmentsRequest,
3737
IndicesSegmentResponse,
38-
ShardSegments> {
38+
ShardSegments,
39+
Void> {
3940

4041
private final IndicesService indicesService;
4142
private final ProjectResolver projectResolver;
@@ -109,6 +110,7 @@ protected void shardOperation(
109110
IndicesSegmentsRequest request,
110111
ShardRouting shardRouting,
111112
Task task,
113+
Void nodeContext,
112114
ActionListener<ShardSegments> listener
113115
) {
114116
ActionListener.completeWith(listener, () -> {

server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportFieldUsageAction.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@
3838
public class TransportFieldUsageAction extends TransportBroadcastByNodeAction<
3939
FieldUsageStatsRequest,
4040
FieldUsageStatsResponse,
41-
FieldUsageShardResponse> {
41+
FieldUsageShardResponse,
42+
Void> {
4243

4344
private final IndicesService indicesService;
4445
private final ProjectResolver projectResolver;
@@ -94,6 +95,7 @@ protected void shardOperation(
9495
FieldUsageStatsRequest request,
9596
ShardRouting shardRouting,
9697
Task task,
98+
Void nodeContext,
9799
ActionListener<FieldUsageShardResponse> listener
98100
) {
99101
ActionListener.completeWith(listener, () -> {

server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,11 @@
3636

3737
import java.io.IOException;
3838

39-
public class TransportIndicesStatsAction extends TransportBroadcastByNodeAction<IndicesStatsRequest, IndicesStatsResponse, ShardStats> {
39+
public class TransportIndicesStatsAction extends TransportBroadcastByNodeAction<
40+
IndicesStatsRequest,
41+
IndicesStatsResponse,
42+
ShardStats,
43+
Void> {
4044

4145
private final IndicesService indicesService;
4246
private final ProjectResolver projectResolver;
@@ -109,7 +113,13 @@ protected IndicesStatsRequest readRequestFrom(StreamInput in) throws IOException
109113
}
110114

111115
@Override
112-
protected void shardOperation(IndicesStatsRequest request, ShardRouting shardRouting, Task task, ActionListener<ShardStats> listener) {
116+
protected void shardOperation(
117+
IndicesStatsRequest request,
118+
ShardRouting shardRouting,
119+
Task task,
120+
Void nodeContext,
121+
ActionListener<ShardStats> listener
122+
) {
113123
ActionListener.completeWith(listener, () -> {
114124
assert task instanceof CancellableTask;
115125
IndexService indexService = indicesService.indexServiceSafe(shardRouting.shardId().getIndex());

server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.elasticsearch.common.io.stream.Writeable;
4141
import org.elasticsearch.common.util.concurrent.EsExecutors;
4242
import org.elasticsearch.core.FixForMultiProject;
43+
import org.elasticsearch.core.Nullable;
4344
import org.elasticsearch.tasks.Task;
4445
import org.elasticsearch.tasks.TaskId;
4546
import org.elasticsearch.transport.AbstractTransportRequest;
@@ -70,11 +71,14 @@
7071
* @param <Request> the underlying client request
7172
* @param <Response> the response to the client request
7273
* @param <ShardOperationResult> per-shard operation results
74+
* @param <NodeContext> an (optional) node context created by {@link #createNodeContext} on each node and passed to each call
75+
* to {@link #shardOperation}
7376
*/
7477
public abstract class TransportBroadcastByNodeAction<
7578
Request extends BroadcastRequest<Request>,
7679
Response extends BaseBroadcastResponse,
77-
ShardOperationResult extends Writeable> extends HandledTransportAction<Request, Response> {
80+
ShardOperationResult extends Writeable,
81+
NodeContext> extends HandledTransportAction<Request, Response> {
7882

7983
private static final Logger logger = LogManager.getLogger(TransportBroadcastByNodeAction.class);
8084

@@ -178,15 +182,25 @@ Response newResponse(
178182
* @param request the node-level request
179183
* @param shardRouting the shard on which to execute the operation
180184
* @param task the task for this node-level request
185+
* @param nodeContext the context created by {{@link #createNodeContext()}}
181186
* @param listener the listener to notify with the result of the shard-level operation
182187
*/
183188
protected abstract void shardOperation(
184189
Request request,
185190
ShardRouting shardRouting,
186191
Task task,
192+
@Nullable NodeContext nodeContext,
187193
ActionListener<ShardOperationResult> listener
188194
);
189195

196+
/**
197+
* @return an (optional) node-level context for this operation, passed to each call to {@link #shardOperation}.
198+
*/
199+
@Nullable
200+
protected NodeContext createNodeContext() {
201+
return null;
202+
}
203+
190204
/**
191205
* Determines the shards on which this operation will be executed on. The operation is executed once per shard.
192206
*
@@ -415,7 +429,7 @@ private void executeAsDataNode(
415429
) {
416430
assert Transports.assertNotTransportThread("O(#shards) work must always fork to an appropriate executor");
417431
logger.trace("[{}] executing operation on [{}] shards", actionName, shards.size());
418-
432+
final NodeContext nodeContext = createNodeContext();
419433
new CancellableFanOut<ShardRouting, ShardOperationResult, NodeResponse>() {
420434

421435
final ArrayList<ShardOperationResult> results = new ArrayList<>(shards.size());
@@ -424,7 +438,7 @@ private void executeAsDataNode(
424438
@Override
425439
protected void sendItemRequest(ShardRouting shardRouting, ActionListener<ShardOperationResult> listener) {
426440
logger.trace(() -> format("[%s] executing operation for shard [%s]", actionName, shardRouting.shortSummary()));
427-
ActionRunnable.wrap(listener, l -> shardOperation(request, shardRouting, task, l)).run();
441+
ActionRunnable.wrap(listener, l -> shardOperation(request, shardRouting, task, nodeContext, l)).run();
428442
}
429443

430444
@Override
@@ -610,8 +624,7 @@ public void writeTo(StreamOutput out) throws IOException {
610624
}
611625

612626
/**
613-
* Can be used for implementations of {@link #shardOperation(BroadcastRequest, ShardRouting, Task, ActionListener) shardOperation} for
614-
* which there is no shard-level return value.
627+
* Can be used for implementations of {@link #shardOperation} for which there is no shard-level return value.
615628
*/
616629
public static final class EmptyResult implements Writeable {
617630
public static final EmptyResult INSTANCE = new EmptyResult();

server/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,10 @@
9595
import static org.hamcrest.CoreMatchers.is;
9696
import static org.hamcrest.Matchers.allOf;
9797
import static org.hamcrest.Matchers.anEmptyMap;
98+
import static org.hamcrest.Matchers.equalTo;
9899
import static org.hamcrest.Matchers.greaterThan;
100+
import static org.hamcrest.Matchers.not;
101+
import static org.hamcrest.Matchers.nullValue;
99102
import static org.hamcrest.object.HasToString.hasToString;
100103

101104
public class TransportBroadcastByNodeActionTests extends ESTestCase {
@@ -165,8 +168,9 @@ public ShardResult() {}
165168
public void writeTo(StreamOutput out) throws IOException {}
166169
}
167170

168-
class TestTransportBroadcastByNodeAction extends TransportBroadcastByNodeAction<Request, Response, ShardResult> {
171+
class TestTransportBroadcastByNodeAction extends TransportBroadcastByNodeAction<Request, Response, ShardResult, Integer> {
169172
private final Map<ShardRouting, Object> shards = new HashMap<>();
173+
private Integer expectedNodeContext = null;
170174

171175
TestTransportBroadcastByNodeAction(String actionName) {
172176
super(
@@ -201,7 +205,22 @@ protected Request readRequestFrom(StreamInput in) throws IOException {
201205
}
202206

203207
@Override
204-
protected void shardOperation(Request request, ShardRouting shardRouting, Task task, ActionListener<ShardResult> listener) {
208+
public Integer createNodeContext() {
209+
assertThat(expectedNodeContext, nullValue());
210+
expectedNodeContext = randomInt();
211+
return expectedNodeContext;
212+
}
213+
214+
@Override
215+
protected void shardOperation(
216+
Request request,
217+
ShardRouting shardRouting,
218+
Task task,
219+
Integer nodeContext,
220+
ActionListener<ShardResult> listener
221+
) {
222+
assertThat(expectedNodeContext, not(nullValue()));
223+
assertThat(nodeContext, equalTo(expectedNodeContext));
205224
ActionListener.completeWith(listener, () -> {
206225
if (rarely()) {
207226
shards.put(shardRouting, Boolean.TRUE);
@@ -413,7 +432,7 @@ public void testNoShardOperationsExecutedIfTaskCancelled() throws Exception {
413432
shards.add(shard);
414433
}
415434
}
416-
final TransportBroadcastByNodeAction<Request, Response, ShardResult>.BroadcastByNodeTransportRequestHandler handler =
435+
final TransportBroadcastByNodeAction<Request, Response, ShardResult, Integer>.BroadcastByNodeTransportRequestHandler handler =
417436
action.new BroadcastByNodeTransportRequestHandler();
418437

419438
final PlainActionFuture<TransportResponse> future = new PlainActionFuture<>();
@@ -474,7 +493,7 @@ public void testOperationExecution() throws Exception {
474493
shards.add(shard);
475494
}
476495
}
477-
final TransportBroadcastByNodeAction<Request, Response, ShardResult>.BroadcastByNodeTransportRequestHandler handler =
496+
final TransportBroadcastByNodeAction<Request, Response, ShardResult, Integer>.BroadcastByNodeTransportRequestHandler handler =
478497
action.new BroadcastByNodeTransportRequestHandler();
479498

480499
final PlainActionFuture<TransportResponse> future = new PlainActionFuture<>();
@@ -568,7 +587,7 @@ public void testResultAggregation() throws ExecutionException, InterruptedExcept
568587
}
569588
}
570589
totalSuccessfulShards += shardResults.size();
571-
TransportBroadcastByNodeAction<Request, Response, ShardResult>.NodeResponse nodeResponse = action.new NodeResponse(
590+
TransportBroadcastByNodeAction<Request, Response, ShardResult, Integer>.NodeResponse nodeResponse = action.new NodeResponse(
572591
entry.getKey(), shards.size(), shardResults, exceptions
573592
);
574593
transport.handleResponse(requestId, nodeResponse);
@@ -643,7 +662,13 @@ public void testShardLevelOperationsStopOnCancellation() throws Exception {
643662
int expectedShardId;
644663

645664
@Override
646-
protected void shardOperation(Request request, ShardRouting shardRouting, Task task, ActionListener<ShardResult> listener) {
665+
protected void shardOperation(
666+
Request request,
667+
ShardRouting shardRouting,
668+
Task task,
669+
Integer nodeContext,
670+
ActionListener<ShardResult> listener
671+
) {
647672
// this test runs a node-level operation on three shards, cancelling the task some time during the execution on the second
648673
if (task instanceof CancellableTask cancellableTask) {
649674
assertEquals(expectedShardId++, shardRouting.shardId().id());
@@ -697,7 +722,13 @@ public void testShardResultsReleasedOnCancellation() throws Exception {
697722

698723
action = new TestTransportBroadcastByNodeAction("indices:admin/shard_level_gc_test") {
699724
@Override
700-
protected void shardOperation(Request request, ShardRouting shardRouting, Task task, ActionListener<ShardResult> listener) {
725+
protected void shardOperation(
726+
Request request,
727+
ShardRouting shardRouting,
728+
Task task,
729+
Integer nodeContext,
730+
ActionListener<ShardResult> listener
731+
) {
701732
listeners.add(listener);
702733
}
703734
};

0 commit comments

Comments
 (0)