8
8
9
9
package org .elasticsearch .action .support .nodes ;
10
10
11
+ import org .elasticsearch .ElasticsearchException ;
11
12
import org .elasticsearch .Version ;
12
13
import org .elasticsearch .action .FailedNodeException ;
13
14
import org .elasticsearch .action .support .ActionFilters ;
14
- import org .elasticsearch .action .support .NodeResponseTracker ;
15
15
import org .elasticsearch .action .support .PlainActionFuture ;
16
16
import org .elasticsearch .action .support .broadcast .node .TransportBroadcastByNodeActionTests ;
17
17
import org .elasticsearch .cluster .ClusterName ;
20
20
import org .elasticsearch .cluster .node .DiscoveryNodeRole ;
21
21
import org .elasticsearch .cluster .node .DiscoveryNodes ;
22
22
import org .elasticsearch .cluster .service .ClusterService ;
23
+ import org .elasticsearch .common .Randomness ;
23
24
import org .elasticsearch .common .io .stream .StreamInput ;
24
25
import org .elasticsearch .common .io .stream .StreamOutput ;
25
26
import org .elasticsearch .common .io .stream .Writeable ;
28
29
import org .elasticsearch .tasks .TaskCancelHelper ;
29
30
import org .elasticsearch .tasks .TaskCancelledException ;
30
31
import org .elasticsearch .test .ESTestCase ;
32
+ import org .elasticsearch .test .ReachabilityChecker ;
31
33
import org .elasticsearch .test .transport .CapturingTransport ;
32
34
import org .elasticsearch .threadpool .TestThreadPool ;
33
35
import org .elasticsearch .threadpool .ThreadPool ;
34
36
import org .elasticsearch .transport .TransportRequest ;
35
37
import org .elasticsearch .transport .TransportService ;
38
+ import org .hamcrest .Matchers ;
36
39
import org .junit .After ;
37
40
import org .junit .AfterClass ;
38
41
import org .junit .Before ;
39
42
import org .junit .BeforeClass ;
40
43
41
44
import java .io .IOException ;
42
45
import java .util .ArrayList ;
46
+ import java .util .Arrays ;
43
47
import java .util .Collections ;
44
48
import java .util .HashMap ;
45
49
import java .util .HashSet ;
46
50
import java .util .List ;
47
51
import java .util .Map ;
48
52
import java .util .Set ;
49
- import java .util .concurrent .ExecutionException ;
50
53
import java .util .concurrent .TimeUnit ;
51
- import java .util .function .Supplier ;
52
54
53
55
import static java .util .Collections .emptyMap ;
54
56
import static org .elasticsearch .test .ClusterServiceUtils .createClusterService ;
55
57
import static org .elasticsearch .test .ClusterServiceUtils .setState ;
58
+ import static org .hamcrest .Matchers .greaterThan ;
56
59
import static org .mockito .Mockito .mock ;
57
60
58
61
public class TransportNodesActionTests extends ESTestCase {
@@ -63,11 +66,10 @@ public class TransportNodesActionTests extends ESTestCase {
63
66
private CapturingTransport transport ;
64
67
private TransportService transportService ;
65
68
66
- public void testRequestIsSentToEachNode () throws Exception {
69
+ public void testRequestIsSentToEachNode () {
67
70
TransportNodesAction <TestNodesRequest , TestNodesResponse , TestNodeRequest , TestNodeResponse > action = getTestTransportNodesAction ();
68
71
TestNodesRequest request = new TestNodesRequest ();
69
- PlainActionFuture <TestNodesResponse > listener = new PlainActionFuture <>();
70
- action .new AsyncAction (null , request , listener ).start ();
72
+ action .execute (null , request , new PlainActionFuture <>());
71
73
Map <String , List <CapturingTransport .CapturedRequest >> capturedRequests = transport .getCapturedRequestsByTargetNodeAndClear ();
72
74
int numNodes = clusterService .state ().getNodes ().getSize ();
73
75
// check a request was sent to the right number of nodes
@@ -87,57 +89,18 @@ public void testNodesSelectors() {
87
89
String nodeId = randomFrom (nodeIds );
88
90
nodeSelectors .add (nodeId );
89
91
}
90
- String [] finalNodesIds = nodeSelectors .toArray (new String [nodeSelectors . size ()] );
92
+ String [] finalNodesIds = nodeSelectors .toArray (String []:: new );
91
93
TestNodesRequest request = new TestNodesRequest (finalNodesIds );
92
- action .new AsyncAction (null , request , new PlainActionFuture <>()). start ( );
94
+ action .execute (null , request , new PlainActionFuture <>());
93
95
Map <String , List <CapturingTransport .CapturedRequest >> capturedRequests = transport .getCapturedRequestsByTargetNodeAndClear ();
94
96
assertEquals (clusterService .state ().nodes ().resolveNodes (finalNodesIds ).length , capturedRequests .size ());
95
97
}
96
98
97
- public void testNewResponseNullArray () throws Exception {
98
- TransportNodesAction <TestNodesRequest , TestNodesResponse , TestNodeRequest , TestNodeResponse > action = getTestTransportNodesAction ();
99
- final PlainActionFuture <TestNodesResponse > future = new PlainActionFuture <>();
100
- action .newResponse (new Task (1 , "test" , "test" , "" , null , emptyMap ()), new TestNodesRequest (), null , future );
101
- expectThrows (NullPointerException .class , future ::actionGet );
102
- }
103
-
104
- public void testNewResponse () throws Exception {
105
- TestTransportNodesAction action = getTestTransportNodesAction ();
106
- TestNodesRequest request = new TestNodesRequest ();
107
- List <TestNodeResponse > expectedNodeResponses = mockList (TestNodeResponse ::new , randomIntBetween (0 , 2 ));
108
- expectedNodeResponses .add (new TestNodeResponse ());
109
- List <FailedNodeException > failures = mockList (
110
- () -> new FailedNodeException (
111
- randomAlphaOfLength (8 ),
112
- randomAlphaOfLength (8 ),
113
- new IllegalStateException (randomAlphaOfLength (8 ))
114
- ),
115
- randomIntBetween (0 , 2 )
116
- );
117
-
118
- List <Object > allResponses = new ArrayList <>(expectedNodeResponses );
119
- allResponses .addAll (failures );
120
-
121
- Collections .shuffle (allResponses , random ());
122
-
123
- NodeResponseTracker nodeResponseCollector = new NodeResponseTracker (allResponses );
124
-
125
- final PlainActionFuture <TestNodesResponse > future = new PlainActionFuture <>();
126
- action .newResponse (new Task (1 , "test" , "test" , "" , null , emptyMap ()), request , nodeResponseCollector , future );
127
- TestNodesResponse response = future .actionGet ();
128
-
129
- assertSame (request , response .request );
130
- // note: I shuffled the overall list, so it's not possible to guarantee that it's in the right order
131
- assertTrue (expectedNodeResponses .containsAll (response .getNodes ()));
132
- assertTrue (failures .containsAll (response .failures ()));
133
- }
134
-
135
- public void testCustomResolving () throws Exception {
99
+ public void testCustomResolving () {
136
100
TransportNodesAction <TestNodesRequest , TestNodesResponse , TestNodeRequest , TestNodeResponse > action =
137
101
getDataNodesOnlyTransportNodesAction (transportService );
138
102
TestNodesRequest request = new TestNodesRequest (randomBoolean () ? null : generateRandomStringArray (10 , 5 , false , true ));
139
- PlainActionFuture <TestNodesResponse > listener = new PlainActionFuture <>();
140
- action .new AsyncAction (null , request , listener ).start ();
103
+ action .execute (null , request , new PlainActionFuture <>());
141
104
Map <String , List <CapturingTransport .CapturedRequest >> capturedRequests = transport .getCapturedRequestsByTargetNodeAndClear ();
142
105
// check requests were only sent to data nodes
143
106
for (String nodeTarget : capturedRequests .keySet ()) {
@@ -146,47 +109,98 @@ public void testCustomResolving() throws Exception {
146
109
assertEquals (clusterService .state ().nodes ().getDataNodes ().size (), capturedRequests .size ());
147
110
}
148
111
149
- public void testTaskCancellation () {
150
- TransportNodesAction <TestNodesRequest , TestNodesResponse , TestNodeRequest , TestNodeResponse > action = getTestTransportNodesAction ();
151
- List <String > nodeIds = new ArrayList <>();
152
- for (DiscoveryNode node : clusterService .state ().nodes ()) {
153
- nodeIds .add (node .getId ());
154
- }
112
+ public void testResponseAggregation () {
113
+ final TestTransportNodesAction action = getTestTransportNodesAction ();
155
114
156
- TestNodesRequest request = new TestNodesRequest (nodeIds .toArray (new String [0 ]));
157
- PlainActionFuture <TestNodesResponse > listener = new PlainActionFuture <>();
158
- CancellableTask cancellableTask = new CancellableTask (randomLong (), "transport" , "action" , "" , null , emptyMap ());
159
- TransportNodesAction <TestNodesRequest , TestNodesResponse , TestNodeRequest , TestNodeResponse >.AsyncAction asyncAction =
160
- action .new AsyncAction (cancellableTask , request , listener );
161
- asyncAction .start ();
162
- Map <String , List <CapturingTransport .CapturedRequest >> capturedRequests = transport .getCapturedRequestsByTargetNodeAndClear ();
163
- int cancelAt = randomIntBetween (0 , Math .max (0 , capturedRequests .values ().size () - 2 ));
164
- int requestCount = 0 ;
165
- for (List <CapturingTransport .CapturedRequest > requests : capturedRequests .values ()) {
166
- if (requestCount == cancelAt ) {
167
- TaskCancelHelper .cancel (cancellableTask , "simulated" );
168
- }
169
- for (CapturingTransport .CapturedRequest capturedRequest : requests ) {
115
+ final PlainActionFuture <TestNodesResponse > listener = new PlainActionFuture <>();
116
+ action .execute (null , new TestNodesRequest (), listener );
117
+ assertFalse (listener .isDone ());
118
+
119
+ final Set <String > failedNodeIds = new HashSet <>();
120
+ final Set <DiscoveryNode > successfulNodes = new HashSet <>();
121
+
122
+ for (CapturingTransport .CapturedRequest capturedRequest : transport .getCapturedRequestsAndClear ()) {
123
+ if (randomBoolean ()) {
124
+ successfulNodes .add (capturedRequest .node ());
125
+ transport .handleResponse (capturedRequest .requestId (), new TestNodeResponse (capturedRequest .node ()));
126
+ } else {
127
+ failedNodeIds .add (capturedRequest .node ().getId ());
170
128
if (randomBoolean ()) {
171
- transport .handleResponse (capturedRequest .requestId (), new TestNodeResponse ( capturedRequest . node () ));
129
+ transport .handleRemoteError (capturedRequest .requestId (), new ElasticsearchException ( "simulated" ));
172
130
} else {
173
- transport .handleRemoteError (capturedRequest .requestId (), new TaskCancelledException ("simulated" ));
131
+ transport .handleLocalError (capturedRequest .requestId (), new ElasticsearchException ("simulated" ));
174
132
}
175
133
}
176
- requestCount ++;
177
134
}
178
135
179
- assertTrue (listener .isDone ());
180
- assertTrue (asyncAction .getNodeResponseTracker ().responsesDiscarded ());
181
- expectThrows (ExecutionException .class , TaskCancelledException .class , listener ::get );
136
+ TestNodesResponse response = listener .actionGet (10 , TimeUnit .SECONDS );
137
+
138
+ for (TestNodeResponse nodeResponse : response .getNodes ()) {
139
+ assertThat (successfulNodes , Matchers .hasItem (nodeResponse .getNode ()));
140
+ }
141
+ assertEquals (successfulNodes .size (), response .getNodes ().size ());
142
+
143
+ assertNotEquals (failedNodeIds .isEmpty (), response .hasFailures ());
144
+ for (FailedNodeException failure : response .failures ()) {
145
+ assertThat (failedNodeIds , Matchers .hasItem (failure .nodeId ()));
146
+ if (failure .getCause ()instanceof ElasticsearchException elasticsearchException ) {
147
+ final var cause = elasticsearchException .unwrapCause ();
148
+ assertEquals ("simulated" , cause .getMessage ());
149
+ } else {
150
+ throw new AssertionError ("unexpected exception" , failure );
151
+ }
152
+ }
153
+ assertEquals (failedNodeIds .size (), response .failures ().size ());
182
154
}
183
155
184
- private <T > List <T > mockList (Supplier <T > supplier , int size ) {
185
- List <T > failures = new ArrayList <>(size );
186
- for (int i = 0 ; i < size ; ++i ) {
187
- failures .add (supplier .get ());
156
+ public void testResponsesReleasedOnCancellation () {
157
+ final TestTransportNodesAction action = getTestTransportNodesAction ();
158
+
159
+ final CancellableTask cancellableTask = new CancellableTask (randomLong (), "transport" , "action" , "" , null , emptyMap ());
160
+ final PlainActionFuture <TestNodesResponse > listener = new PlainActionFuture <>();
161
+ action .execute (cancellableTask , new TestNodesRequest (), listener );
162
+
163
+ final List <CapturingTransport .CapturedRequest > capturedRequests = new ArrayList <>(
164
+ Arrays .asList (transport .getCapturedRequestsAndClear ())
165
+ );
166
+ Randomness .shuffle (capturedRequests );
167
+
168
+ final ReachabilityChecker reachabilityChecker = new ReachabilityChecker ();
169
+ final Runnable nextRequestProcessor = () -> {
170
+ var capturedRequest = capturedRequests .remove (0 );
171
+ if (randomBoolean ()) {
172
+ // transport.handleResponse may de/serialize the response, releasing it early, so send the response straight to the handler
173
+ transport .getTransportResponseHandler (capturedRequest .requestId ())
174
+ .handleResponse (reachabilityChecker .register (new TestNodeResponse (capturedRequest .node ())));
175
+ } else {
176
+ // handleRemoteError may de/serialize the exception, releasing it early, so just use handleLocalError
177
+ transport .handleLocalError (
178
+ capturedRequest .requestId (),
179
+ reachabilityChecker .register (new ElasticsearchException ("simulated" ))
180
+ );
181
+ }
182
+ };
183
+
184
+ assertThat (capturedRequests .size (), greaterThan (2 ));
185
+ final var responsesBeforeCancellation = between (1 , capturedRequests .size () - 2 );
186
+ for (int i = 0 ; i < responsesBeforeCancellation ; i ++) {
187
+ nextRequestProcessor .run ();
188
+ }
189
+
190
+ reachabilityChecker .checkReachable ();
191
+ TaskCancelHelper .cancel (cancellableTask , "simulated" );
192
+
193
+ // responses captured before cancellation are now unreachable
194
+ reachabilityChecker .ensureUnreachable ();
195
+
196
+ while (capturedRequests .size () > 0 ) {
197
+ // a response sent after cancellation is dropped immediately
198
+ assertFalse (listener .isDone ());
199
+ nextRequestProcessor .run ();
200
+ reachabilityChecker .ensureUnreachable ();
188
201
}
189
- return failures ;
202
+
203
+ expectThrows (TaskCancelledException .class , () -> listener .actionGet (10 , TimeUnit .SECONDS ));
190
204
}
191
205
192
206
@ BeforeClass
0 commit comments