Skip to content

Commit cfa23de

Browse files
committed
Adding NodeContext to TransportBroadcastByNodeAction (elastic#138057)
(cherry picked from commit 9df1314) # Conflicts: # server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java
1 parent f2dae0f commit cfa23de

File tree

12 files changed

+106
-26
lines changed

12 files changed

+106
-26
lines changed

modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportDataStreamsStatsAction.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@
5151
public class TransportDataStreamsStatsAction extends TransportBroadcastByNodeAction<
5252
DataStreamsStatsAction.Request,
5353
DataStreamsStatsAction.Response,
54-
DataStreamsStatsAction.DataStreamShardStats> {
54+
DataStreamsStatsAction.DataStreamShardStats,
55+
Void> {
5556

5657
private final IndicesService indicesService;
5758

@@ -110,6 +111,7 @@ protected void shardOperation(
110111
DataStreamsStatsAction.Request request,
111112
ShardRouting shardRouting,
112113
Task task,
114+
Void nodeContext,
113115
ActionListener<DataStreamsStatsAction.DataStreamShardStats> listener
114116
) {
115117
ActionListener.completeWith(listener, () -> {

server/src/main/java/org/elasticsearch/action/admin/indices/analyze/TransportReloadAnalyzersAction.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,8 @@
5050
public class TransportReloadAnalyzersAction extends TransportBroadcastByNodeAction<
5151
ReloadAnalyzersRequest,
5252
ReloadAnalyzersResponse,
53-
TransportReloadAnalyzersAction.ReloadResult> {
53+
TransportReloadAnalyzersAction.ReloadResult,
54+
Void> {
5455

5556
public static final ActionType<ReloadAnalyzersResponse> TYPE = new ActionType<>("indices:admin/reload_analyzers");
5657
private static final Logger logger = LogManager.getLogger(TransportReloadAnalyzersAction.class);
@@ -122,6 +123,7 @@ protected void shardOperation(
122123
ReloadAnalyzersRequest request,
123124
ShardRouting shardRouting,
124125
Task task,
126+
Void nodeContext,
125127
ActionListener<ReloadResult> listener
126128
) {
127129
ActionListener.completeWith(listener, () -> {

server/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/TransportClearIndicesCacheAction.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@
3636
public class TransportClearIndicesCacheAction extends TransportBroadcastByNodeAction<
3737
ClearIndicesCacheRequest,
3838
BroadcastResponse,
39-
TransportBroadcastByNodeAction.EmptyResult> {
39+
TransportBroadcastByNodeAction.EmptyResult,
40+
Void> {
4041

4142
public static final ActionType<BroadcastResponse> TYPE = new ActionType<>("indices:admin/cache/clear");
4243
private final IndicesService indicesService;
@@ -90,6 +91,7 @@ protected void shardOperation(
9091
ClearIndicesCacheRequest request,
9192
ShardRouting shardRouting,
9293
Task task,
94+
Void nodeContext,
9395
ActionListener<EmptyResult> listener
9496
) {
9597
ActionListener.completeWith(listener, () -> {

server/src/main/java/org/elasticsearch/action/admin/indices/forcemerge/TransportForceMergeAction.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@
3838
public class TransportForceMergeAction extends TransportBroadcastByNodeAction<
3939
ForceMergeRequest,
4040
BroadcastResponse,
41-
TransportBroadcastByNodeAction.EmptyResult> {
41+
TransportBroadcastByNodeAction.EmptyResult,
42+
Void> {
4243

4344
private final IndicesService indicesService;
4445
private final ThreadPool threadPool;
@@ -89,7 +90,8 @@ protected void shardOperation(
8990
ForceMergeRequest request,
9091
ShardRouting shardRouting,
9192
Task task,
92-
ActionListener<TransportBroadcastByNodeAction.EmptyResult> listener
93+
Void nodeContext,
94+
ActionListener<EmptyResult> listener
9395
) {
9496
assert (task instanceof CancellableTask) == false; // TODO: add cancellation handling here once the task supports it
9597
threadPool.executor(ThreadPool.Names.FORCE_MERGE).execute(ActionRunnable.supply(listener, () -> {

server/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
* Transport action for shard recovery operation. This transport action does not actually
4141
* perform shard recovery, it only reports on recoveries (both active and complete).
4242
*/
43-
public class TransportRecoveryAction extends TransportBroadcastByNodeAction<RecoveryRequest, RecoveryResponse, RecoveryState> {
43+
public class TransportRecoveryAction extends TransportBroadcastByNodeAction<RecoveryRequest, RecoveryResponse, RecoveryState, Void> {
4444

4545
private final IndicesService indicesService;
4646

@@ -99,7 +99,13 @@ protected RecoveryRequest readRequestFrom(StreamInput in) throws IOException {
9999
}
100100

101101
@Override
102-
protected void shardOperation(RecoveryRequest request, ShardRouting shardRouting, Task task, ActionListener<RecoveryState> listener) {
102+
protected void shardOperation(
103+
RecoveryRequest request,
104+
ShardRouting shardRouting,
105+
Task task,
106+
Void nodeContext,
107+
ActionListener<RecoveryState> listener
108+
) {
103109
ActionListener.completeWith(listener, () -> {
104110
assert task instanceof CancellableTask;
105111
IndexService indexService = indicesService.indexServiceSafe(shardRouting.shardId().getIndex());

server/src/main/java/org/elasticsearch/action/admin/indices/segments/TransportIndicesSegmentsAction.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@
3434
public class TransportIndicesSegmentsAction extends TransportBroadcastByNodeAction<
3535
IndicesSegmentsRequest,
3636
IndicesSegmentResponse,
37-
ShardSegments> {
37+
ShardSegments,
38+
Void> {
3839

3940
private final IndicesService indicesService;
4041

@@ -105,6 +106,7 @@ protected void shardOperation(
105106
IndicesSegmentsRequest request,
106107
ShardRouting shardRouting,
107108
Task task,
109+
Void nodeContext,
108110
ActionListener<ShardSegments> listener
109111
) {
110112
ActionListener.completeWith(listener, () -> {

server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportFieldUsageAction.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@
3737
public class TransportFieldUsageAction extends TransportBroadcastByNodeAction<
3838
FieldUsageStatsRequest,
3939
FieldUsageStatsResponse,
40-
FieldUsageShardResponse> {
40+
FieldUsageShardResponse,
41+
Void> {
4142

4243
private final IndicesService indicesService;
4344

@@ -90,6 +91,7 @@ protected void shardOperation(
9091
FieldUsageStatsRequest request,
9192
ShardRouting shardRouting,
9293
Task task,
94+
Void nodeContext,
9395
ActionListener<FieldUsageShardResponse> listener
9496
) {
9597
ActionListener.completeWith(listener, () -> {

server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,11 @@
3535

3636
import java.io.IOException;
3737

38-
public class TransportIndicesStatsAction extends TransportBroadcastByNodeAction<IndicesStatsRequest, IndicesStatsResponse, ShardStats> {
38+
public class TransportIndicesStatsAction extends TransportBroadcastByNodeAction<
39+
IndicesStatsRequest,
40+
IndicesStatsResponse,
41+
ShardStats,
42+
Void> {
3943

4044
private final IndicesService indicesService;
4145

@@ -105,7 +109,13 @@ protected IndicesStatsRequest readRequestFrom(StreamInput in) throws IOException
105109
}
106110

107111
@Override
108-
protected void shardOperation(IndicesStatsRequest request, ShardRouting shardRouting, Task task, ActionListener<ShardStats> listener) {
112+
protected void shardOperation(
113+
IndicesStatsRequest request,
114+
ShardRouting shardRouting,
115+
Task task,
116+
Void nodeContext,
117+
ActionListener<ShardStats> listener
118+
) {
109119
ActionListener.completeWith(listener, () -> {
110120
assert task instanceof CancellableTask;
111121
IndexService indexService = indicesService.indexServiceSafe(shardRouting.shardId().getIndex());

server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.elasticsearch.common.io.stream.StreamOutput;
4040
import org.elasticsearch.common.io.stream.Writeable;
4141
import org.elasticsearch.common.util.concurrent.EsExecutors;
42+
import org.elasticsearch.core.Nullable;
4243
import org.elasticsearch.tasks.Task;
4344
import org.elasticsearch.tasks.TaskId;
4445
import org.elasticsearch.transport.TransportChannel;
@@ -69,11 +70,14 @@
6970
* @param <Request> the underlying client request
7071
* @param <Response> the response to the client request
7172
* @param <ShardOperationResult> per-shard operation results
73+
* @param <NodeContext> an (optional) node context created by {@link #createNodeContext} on each node and passed to each call
74+
* to {@link #shardOperation}
7275
*/
7376
public abstract class TransportBroadcastByNodeAction<
7477
Request extends BroadcastRequest<Request>,
7578
Response extends BaseBroadcastResponse,
76-
ShardOperationResult extends Writeable> extends HandledTransportAction<Request, Response> {
79+
ShardOperationResult extends Writeable,
80+
NodeContext> extends HandledTransportAction<Request, Response> {
7781

7882
private static final Logger logger = LogManager.getLogger(TransportBroadcastByNodeAction.class);
7983

@@ -177,15 +181,25 @@ Response newResponse(
177181
* @param request the node-level request
178182
* @param shardRouting the shard on which to execute the operation
179183
* @param task the task for this node-level request
184+
* @param nodeContext the context created by {{@link #createNodeContext()}}
180185
* @param listener the listener to notify with the result of the shard-level operation
181186
*/
182187
protected abstract void shardOperation(
183188
Request request,
184189
ShardRouting shardRouting,
185190
Task task,
191+
@Nullable NodeContext nodeContext,
186192
ActionListener<ShardOperationResult> listener
187193
);
188194

195+
/**
196+
* @return an (optional) node-level context for this operation, passed to each call to {@link #shardOperation}.
197+
*/
198+
@Nullable
199+
protected NodeContext createNodeContext() {
200+
return null;
201+
}
202+
189203
/**
190204
* Determines the shards on which this operation will be executed on. The operation is executed once per shard.
191205
*
@@ -412,7 +426,7 @@ private void executeAsDataNode(
412426
) {
413427
assert Transports.assertNotTransportThread("O(#shards) work must always fork to an appropriate executor");
414428
logger.trace("[{}] executing operation on [{}] shards", actionName, shards.size());
415-
429+
final NodeContext nodeContext = createNodeContext();
416430
new CancellableFanOut<ShardRouting, ShardOperationResult, NodeResponse>() {
417431

418432
final ArrayList<ShardOperationResult> results = new ArrayList<>(shards.size());
@@ -421,7 +435,7 @@ private void executeAsDataNode(
421435
@Override
422436
protected void sendItemRequest(ShardRouting shardRouting, ActionListener<ShardOperationResult> listener) {
423437
logger.trace(() -> format("[%s] executing operation for shard [%s]", actionName, shardRouting.shortSummary()));
424-
ActionRunnable.wrap(listener, l -> shardOperation(request, shardRouting, task, l)).run();
438+
ActionRunnable.wrap(listener, l -> shardOperation(request, shardRouting, task, nodeContext, l)).run();
425439
}
426440

427441
@Override
@@ -608,8 +622,7 @@ public void writeTo(StreamOutput out) throws IOException {
608622
}
609623

610624
/**
611-
* Can be used for implementations of {@link #shardOperation(BroadcastRequest, ShardRouting, Task, ActionListener) shardOperation} for
612-
* which there is no shard-level return value.
625+
* Can be used for implementations of {@link #shardOperation} for which there is no shard-level return value.
613626
*/
614627
public static final class EmptyResult implements Writeable {
615628
public static EmptyResult INSTANCE = new EmptyResult();

server/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,10 @@
9393
import static org.hamcrest.CoreMatchers.is;
9494
import static org.hamcrest.Matchers.allOf;
9595
import static org.hamcrest.Matchers.anEmptyMap;
96+
import static org.hamcrest.Matchers.equalTo;
9697
import static org.hamcrest.Matchers.greaterThan;
98+
import static org.hamcrest.Matchers.not;
99+
import static org.hamcrest.Matchers.nullValue;
97100
import static org.hamcrest.object.HasToString.hasToString;
98101

99102
public class TransportBroadcastByNodeActionTests extends ESTestCase {
@@ -163,8 +166,9 @@ public ShardResult() {}
163166
public void writeTo(StreamOutput out) throws IOException {}
164167
}
165168

166-
class TestTransportBroadcastByNodeAction extends TransportBroadcastByNodeAction<Request, Response, ShardResult> {
169+
class TestTransportBroadcastByNodeAction extends TransportBroadcastByNodeAction<Request, Response, ShardResult, Integer> {
167170
private final Map<ShardRouting, Object> shards = new HashMap<>();
171+
private Integer expectedNodeContext = null;
168172

169173
TestTransportBroadcastByNodeAction(String actionName) {
170174
super(
@@ -199,7 +203,22 @@ protected Request readRequestFrom(StreamInput in) throws IOException {
199203
}
200204

201205
@Override
202-
protected void shardOperation(Request request, ShardRouting shardRouting, Task task, ActionListener<ShardResult> listener) {
206+
public Integer createNodeContext() {
207+
assertThat(expectedNodeContext, nullValue());
208+
expectedNodeContext = randomInt();
209+
return expectedNodeContext;
210+
}
211+
212+
@Override
213+
protected void shardOperation(
214+
Request request,
215+
ShardRouting shardRouting,
216+
Task task,
217+
Integer nodeContext,
218+
ActionListener<ShardResult> listener
219+
) {
220+
assertThat(expectedNodeContext, not(nullValue()));
221+
assertThat(nodeContext, equalTo(expectedNodeContext));
203222
ActionListener.completeWith(listener, () -> {
204223
if (rarely()) {
205224
shards.put(shardRouting, Boolean.TRUE);
@@ -411,7 +430,7 @@ public void testNoShardOperationsExecutedIfTaskCancelled() throws Exception {
411430
shards.add(shard);
412431
}
413432
}
414-
final TransportBroadcastByNodeAction<Request, Response, ShardResult>.BroadcastByNodeTransportRequestHandler handler =
433+
final TransportBroadcastByNodeAction<Request, Response, ShardResult, Integer>.BroadcastByNodeTransportRequestHandler handler =
415434
action.new BroadcastByNodeTransportRequestHandler();
416435

417436
final PlainActionFuture<TransportResponse> future = new PlainActionFuture<>();
@@ -472,7 +491,7 @@ public void testOperationExecution() throws Exception {
472491
shards.add(shard);
473492
}
474493
}
475-
final TransportBroadcastByNodeAction<Request, Response, ShardResult>.BroadcastByNodeTransportRequestHandler handler =
494+
final TransportBroadcastByNodeAction<Request, Response, ShardResult, Integer>.BroadcastByNodeTransportRequestHandler handler =
476495
action.new BroadcastByNodeTransportRequestHandler();
477496

478497
final PlainActionFuture<TransportResponse> future = new PlainActionFuture<>();
@@ -566,7 +585,7 @@ public void testResultAggregation() throws ExecutionException, InterruptedExcept
566585
}
567586
}
568587
totalSuccessfulShards += shardResults.size();
569-
TransportBroadcastByNodeAction<Request, Response, ShardResult>.NodeResponse nodeResponse = action.new NodeResponse(
588+
TransportBroadcastByNodeAction<Request, Response, ShardResult, Integer>.NodeResponse nodeResponse = action.new NodeResponse(
570589
entry.getKey(), shards.size(), shardResults, exceptions
571590
);
572591
transport.handleResponse(requestId, nodeResponse);
@@ -641,7 +660,13 @@ public void testShardLevelOperationsStopOnCancellation() throws Exception {
641660
int expectedShardId;
642661

643662
@Override
644-
protected void shardOperation(Request request, ShardRouting shardRouting, Task task, ActionListener<ShardResult> listener) {
663+
protected void shardOperation(
664+
Request request,
665+
ShardRouting shardRouting,
666+
Task task,
667+
Integer nodeContext,
668+
ActionListener<ShardResult> listener
669+
) {
645670
// this test runs a node-level operation on three shards, cancelling the task some time during the execution on the second
646671
if (task instanceof CancellableTask cancellableTask) {
647672
assertEquals(expectedShardId++, shardRouting.shardId().id());
@@ -695,7 +720,13 @@ public void testShardResultsReleasedOnCancellation() throws Exception {
695720

696721
action = new TestTransportBroadcastByNodeAction("indices:admin/shard_level_gc_test") {
697722
@Override
698-
protected void shardOperation(Request request, ShardRouting shardRouting, Task task, ActionListener<ShardResult> listener) {
723+
protected void shardOperation(
724+
Request request,
725+
ShardRouting shardRouting,
726+
Task task,
727+
Integer nodeContext,
728+
ActionListener<ShardResult> listener
729+
) {
699730
listeners.add(listener);
700731
}
701732
};

0 commit comments

Comments
 (0)