Skip to content

Commit f8f18da

Browse files
committed
Adding nodeContext to TransportBroadcastByNodeAction
1 parent 71579e0 commit f8f18da

File tree

12 files changed

+105
-32
lines changed

12 files changed

+105
-32
lines changed

modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportDataStreamsStatsAction.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@
5252
public class TransportDataStreamsStatsAction extends TransportBroadcastByNodeAction<
5353
DataStreamsStatsAction.Request,
5454
DataStreamsStatsAction.Response,
55-
DataStreamsStatsAction.DataStreamShardStats> {
55+
DataStreamsStatsAction.DataStreamShardStats,
56+
Void> {
5657

5758
private final IndicesService indicesService;
5859
private final ProjectResolver projectResolver;
@@ -114,7 +115,8 @@ protected void shardOperation(
114115
DataStreamsStatsAction.Request request,
115116
ShardRouting shardRouting,
116117
Task task,
117-
ActionListener<DataStreamsStatsAction.DataStreamShardStats> listener
118+
ActionListener<DataStreamsStatsAction.DataStreamShardStats> listener,
119+
Void nodeContext
118120
) {
119121
ActionListener.completeWith(listener, () -> {
120122
assert shardRouting.isSearchable() : "shard routing is not searchable: " + shardRouting;

server/src/main/java/org/elasticsearch/action/admin/indices/analyze/TransportReloadAnalyzersAction.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,8 @@
5050
public class TransportReloadAnalyzersAction extends TransportBroadcastByNodeAction<
5151
ReloadAnalyzersRequest,
5252
ReloadAnalyzersResponse,
53-
TransportReloadAnalyzersAction.ReloadResult> {
53+
TransportReloadAnalyzersAction.ReloadResult,
54+
Void> {
5455

5556
public static final ActionType<ReloadAnalyzersResponse> TYPE = new ActionType<>("indices:admin/reload_analyzers");
5657
private static final Logger logger = LogManager.getLogger(TransportReloadAnalyzersAction.class);
@@ -122,7 +123,8 @@ protected void shardOperation(
122123
ReloadAnalyzersRequest request,
123124
ShardRouting shardRouting,
124125
Task task,
125-
ActionListener<ReloadResult> listener
126+
ActionListener<ReloadResult> listener,
127+
Void nodeContext
126128
) {
127129
ActionListener.completeWith(listener, () -> {
128130
logger.info("reloading analyzers for index shard " + shardRouting);

server/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/TransportClearIndicesCacheAction.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@
3737
public class TransportClearIndicesCacheAction extends TransportBroadcastByNodeAction<
3838
ClearIndicesCacheRequest,
3939
BroadcastResponse,
40-
TransportBroadcastByNodeAction.EmptyResult> {
40+
TransportBroadcastByNodeAction.EmptyResult,
41+
Void> {
4142

4243
public static final ActionType<BroadcastResponse> TYPE = new ActionType<>("indices:admin/cache/clear");
4344
private final IndicesService indicesService;
@@ -94,7 +95,8 @@ protected void shardOperation(
9495
ClearIndicesCacheRequest request,
9596
ShardRouting shardRouting,
9697
Task task,
97-
ActionListener<EmptyResult> listener
98+
ActionListener<EmptyResult> listener,
99+
Void nodeContext
98100
) {
99101
ActionListener.completeWith(listener, () -> {
100102
indicesService.clearIndexShardCache(

server/src/main/java/org/elasticsearch/action/admin/indices/forcemerge/TransportForceMergeAction.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@
4040
public class TransportForceMergeAction extends TransportBroadcastByNodeAction<
4141
ForceMergeRequest,
4242
BroadcastResponse,
43-
TransportBroadcastByNodeAction.EmptyResult> {
43+
TransportBroadcastByNodeAction.EmptyResult,
44+
Void> {
4445

4546
private final IndicesService indicesService;
4647
private final ThreadPool threadPool;
@@ -94,7 +95,8 @@ protected void shardOperation(
9495
ForceMergeRequest request,
9596
ShardRouting shardRouting,
9697
Task task,
97-
ActionListener<TransportBroadcastByNodeAction.EmptyResult> listener
98+
ActionListener<TransportBroadcastByNodeAction.EmptyResult> listener,
99+
Void nodeContext
98100
) {
99101
assert (task instanceof CancellableTask) == false; // TODO: add cancellation handling here once the task supports it
100102
SubscribableListener.<IndexShard>newForked(l -> {

server/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
* Transport action for shard recovery operation. This transport action does not actually
4242
* perform shard recovery, it only reports on recoveries (both active and complete).
4343
*/
44-
public class TransportRecoveryAction extends TransportBroadcastByNodeAction<RecoveryRequest, RecoveryResponse, RecoveryState> {
44+
public class TransportRecoveryAction extends TransportBroadcastByNodeAction<RecoveryRequest, RecoveryResponse, RecoveryState, Void> {
4545

4646
private final IndicesService indicesService;
4747
private final ProjectResolver projectResolver;
@@ -103,7 +103,13 @@ protected RecoveryRequest readRequestFrom(StreamInput in) throws IOException {
103103
}
104104

105105
@Override
106-
protected void shardOperation(RecoveryRequest request, ShardRouting shardRouting, Task task, ActionListener<RecoveryState> listener) {
106+
protected void shardOperation(
107+
RecoveryRequest request,
108+
ShardRouting shardRouting,
109+
Task task,
110+
ActionListener<RecoveryState> listener,
111+
Void nodeContext
112+
) {
107113
ActionListener.completeWith(listener, () -> {
108114
assert task instanceof CancellableTask;
109115
IndexService indexService = indicesService.indexServiceSafe(shardRouting.shardId().getIndex());

server/src/main/java/org/elasticsearch/action/admin/indices/segments/TransportIndicesSegmentsAction.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@
3535
public class TransportIndicesSegmentsAction extends TransportBroadcastByNodeAction<
3636
IndicesSegmentsRequest,
3737
IndicesSegmentResponse,
38-
ShardSegments> {
38+
ShardSegments,
39+
Void> {
3940

4041
private final IndicesService indicesService;
4142
private final ProjectResolver projectResolver;
@@ -109,7 +110,8 @@ protected void shardOperation(
109110
IndicesSegmentsRequest request,
110111
ShardRouting shardRouting,
111112
Task task,
112-
ActionListener<ShardSegments> listener
113+
ActionListener<ShardSegments> listener,
114+
Void nodeContext
113115
) {
114116
ActionListener.completeWith(listener, () -> {
115117
assert task instanceof CancellableTask;

server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportFieldUsageAction.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@
3838
public class TransportFieldUsageAction extends TransportBroadcastByNodeAction<
3939
FieldUsageStatsRequest,
4040
FieldUsageStatsResponse,
41-
FieldUsageShardResponse> {
41+
FieldUsageShardResponse,
42+
Void> {
4243

4344
private final IndicesService indicesService;
4445
private final ProjectResolver projectResolver;
@@ -94,7 +95,8 @@ protected void shardOperation(
9495
FieldUsageStatsRequest request,
9596
ShardRouting shardRouting,
9697
Task task,
97-
ActionListener<FieldUsageShardResponse> listener
98+
ActionListener<FieldUsageShardResponse> listener,
99+
Void nodeContext
98100
) {
99101
ActionListener.completeWith(listener, () -> {
100102
final ShardId shardId = shardRouting.shardId();

server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,11 @@
3636

3737
import java.io.IOException;
3838

39-
public class TransportIndicesStatsAction extends TransportBroadcastByNodeAction<IndicesStatsRequest, IndicesStatsResponse, ShardStats> {
39+
public class TransportIndicesStatsAction extends TransportBroadcastByNodeAction<
40+
IndicesStatsRequest,
41+
IndicesStatsResponse,
42+
ShardStats,
43+
Void> {
4044

4145
private final IndicesService indicesService;
4246
private final ProjectResolver projectResolver;
@@ -109,7 +113,13 @@ protected IndicesStatsRequest readRequestFrom(StreamInput in) throws IOException
109113
}
110114

111115
@Override
112-
protected void shardOperation(IndicesStatsRequest request, ShardRouting shardRouting, Task task, ActionListener<ShardStats> listener) {
116+
protected void shardOperation(
117+
IndicesStatsRequest request,
118+
ShardRouting shardRouting,
119+
Task task,
120+
ActionListener<ShardStats> listener,
121+
Void nodeContext
122+
) {
113123
ActionListener.completeWith(listener, () -> {
114124
assert task instanceof CancellableTask;
115125
IndexService indexService = indicesService.indexServiceSafe(shardRouting.shardId().getIndex());

server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,8 @@
7474
public abstract class TransportBroadcastByNodeAction<
7575
Request extends BroadcastRequest<Request>,
7676
Response extends BaseBroadcastResponse,
77-
ShardOperationResult extends Writeable> extends HandledTransportAction<Request, Response> {
77+
ShardOperationResult extends Writeable,
78+
NodeContext> extends HandledTransportAction<Request, Response> {
7879

7980
private static final Logger logger = LogManager.getLogger(TransportBroadcastByNodeAction.class);
8081

@@ -184,9 +185,14 @@ protected abstract void shardOperation(
184185
Request request,
185186
ShardRouting shardRouting,
186187
Task task,
187-
ActionListener<ShardOperationResult> listener
188+
ActionListener<ShardOperationResult> listener,
189+
NodeContext nodeContext
188190
);
189191

192+
protected NodeContext createNodeContext() {
193+
return null;
194+
}
195+
190196
/**
191197
* Determines the shards on which this operation will be executed on. The operation is executed once per shard.
192198
*
@@ -415,7 +421,7 @@ private void executeAsDataNode(
415421
) {
416422
assert Transports.assertNotTransportThread("O(#shards) work must always fork to an appropriate executor");
417423
logger.trace("[{}] executing operation on [{}] shards", actionName, shards.size());
418-
424+
NodeContext nodeContext = createNodeContext();
419425
new CancellableFanOut<ShardRouting, ShardOperationResult, NodeResponse>() {
420426

421427
final ArrayList<ShardOperationResult> results = new ArrayList<>(shards.size());
@@ -424,7 +430,7 @@ private void executeAsDataNode(
424430
@Override
425431
protected void sendItemRequest(ShardRouting shardRouting, ActionListener<ShardOperationResult> listener) {
426432
logger.trace(() -> format("[%s] executing operation for shard [%s]", actionName, shardRouting.shortSummary()));
427-
ActionRunnable.wrap(listener, l -> shardOperation(request, shardRouting, task, l)).run();
433+
ActionRunnable.wrap(listener, l -> shardOperation(request, shardRouting, task, l, nodeContext)).run();
428434
}
429435

430436
@Override
@@ -610,7 +616,7 @@ public void writeTo(StreamOutput out) throws IOException {
610616
}
611617

612618
/**
613-
* Can be used for implementations of {@link #shardOperation(BroadcastRequest, ShardRouting, Task, ActionListener) shardOperation} for
619+
* Can be used for implementations of {@link #shardOperation(BroadcastRequest, ShardRouting, Task, ActionListener, NodeContext) shardOperation} for
614620
* which there is no shard-level return value.
615621
*/
616622
public static final class EmptyResult implements Writeable {

server/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,10 @@
9595
import static org.hamcrest.CoreMatchers.is;
9696
import static org.hamcrest.Matchers.allOf;
9797
import static org.hamcrest.Matchers.anEmptyMap;
98+
import static org.hamcrest.Matchers.equalTo;
9899
import static org.hamcrest.Matchers.greaterThan;
100+
import static org.hamcrest.Matchers.not;
101+
import static org.hamcrest.Matchers.nullValue;
99102
import static org.hamcrest.object.HasToString.hasToString;
100103

101104
public class TransportBroadcastByNodeActionTests extends ESTestCase {
@@ -165,8 +168,9 @@ public ShardResult() {}
165168
public void writeTo(StreamOutput out) throws IOException {}
166169
}
167170

168-
class TestTransportBroadcastByNodeAction extends TransportBroadcastByNodeAction<Request, Response, ShardResult> {
171+
class TestTransportBroadcastByNodeAction extends TransportBroadcastByNodeAction<Request, Response, ShardResult, Integer> {
169172
private final Map<ShardRouting, Object> shards = new HashMap<>();
173+
private Integer expectedNodeContext = null;
170174

171175
TestTransportBroadcastByNodeAction(String actionName) {
172176
super(
@@ -201,7 +205,22 @@ protected Request readRequestFrom(StreamInput in) throws IOException {
201205
}
202206

203207
@Override
204-
protected void shardOperation(Request request, ShardRouting shardRouting, Task task, ActionListener<ShardResult> listener) {
208+
public Integer createNodeContext() {
209+
assertThat(expectedNodeContext, nullValue());
210+
expectedNodeContext = randomInt();
211+
return expectedNodeContext;
212+
}
213+
214+
@Override
215+
protected void shardOperation(
216+
Request request,
217+
ShardRouting shardRouting,
218+
Task task,
219+
ActionListener<ShardResult> listener,
220+
Integer nodeContext
221+
) {
222+
assertThat(expectedNodeContext, not(nullValue()));
223+
assertThat(nodeContext, equalTo(expectedNodeContext));
205224
ActionListener.completeWith(listener, () -> {
206225
if (rarely()) {
207226
shards.put(shardRouting, Boolean.TRUE);
@@ -413,7 +432,7 @@ public void testNoShardOperationsExecutedIfTaskCancelled() throws Exception {
413432
shards.add(shard);
414433
}
415434
}
416-
final TransportBroadcastByNodeAction<Request, Response, ShardResult>.BroadcastByNodeTransportRequestHandler handler =
435+
final TransportBroadcastByNodeAction<Request, Response, ShardResult, Integer>.BroadcastByNodeTransportRequestHandler handler =
417436
action.new BroadcastByNodeTransportRequestHandler();
418437

419438
final PlainActionFuture<TransportResponse> future = new PlainActionFuture<>();
@@ -474,7 +493,7 @@ public void testOperationExecution() throws Exception {
474493
shards.add(shard);
475494
}
476495
}
477-
final TransportBroadcastByNodeAction<Request, Response, ShardResult>.BroadcastByNodeTransportRequestHandler handler =
496+
final TransportBroadcastByNodeAction<Request, Response, ShardResult, Integer>.BroadcastByNodeTransportRequestHandler handler =
478497
action.new BroadcastByNodeTransportRequestHandler();
479498

480499
final PlainActionFuture<TransportResponse> future = new PlainActionFuture<>();
@@ -568,7 +587,7 @@ public void testResultAggregation() throws ExecutionException, InterruptedExcept
568587
}
569588
}
570589
totalSuccessfulShards += shardResults.size();
571-
TransportBroadcastByNodeAction<Request, Response, ShardResult>.NodeResponse nodeResponse = action.new NodeResponse(
590+
TransportBroadcastByNodeAction<Request, Response, ShardResult, Integer>.NodeResponse nodeResponse = action.new NodeResponse(
572591
entry.getKey(), shards.size(), shardResults, exceptions
573592
);
574593
transport.handleResponse(requestId, nodeResponse);
@@ -643,7 +662,13 @@ public void testShardLevelOperationsStopOnCancellation() throws Exception {
643662
int expectedShardId;
644663

645664
@Override
646-
protected void shardOperation(Request request, ShardRouting shardRouting, Task task, ActionListener<ShardResult> listener) {
665+
protected void shardOperation(
666+
Request request,
667+
ShardRouting shardRouting,
668+
Task task,
669+
ActionListener<ShardResult> listener,
670+
Integer nodeContext
671+
) {
647672
// this test runs a node-level operation on three shards, cancelling the task some time during the execution on the second
648673
if (task instanceof CancellableTask cancellableTask) {
649674
assertEquals(expectedShardId++, shardRouting.shardId().id());
@@ -697,7 +722,13 @@ public void testShardResultsReleasedOnCancellation() throws Exception {
697722

698723
action = new TestTransportBroadcastByNodeAction("indices:admin/shard_level_gc_test") {
699724
@Override
700-
protected void shardOperation(Request request, ShardRouting shardRouting, Task task, ActionListener<ShardResult> listener) {
725+
protected void shardOperation(
726+
Request request,
727+
ShardRouting shardRouting,
728+
Task task,
729+
ActionListener<ShardResult> listener,
730+
Integer nodeContext
731+
) {
701732
listeners.add(listener);
702733
}
703734
};

0 commit comments

Comments
 (0)