1010import org .elasticsearch .action .ActionListener ;
1111import org .elasticsearch .action .support .ActionFilters ;
1212import org .elasticsearch .client .internal .Client ;
13+ import org .elasticsearch .cluster .ClusterState ;
14+ import org .elasticsearch .cluster .node .DiscoveryNode ;
15+ import org .elasticsearch .cluster .node .DiscoveryNodeRole ;
16+ import org .elasticsearch .cluster .node .DiscoveryNodes ;
1317import org .elasticsearch .cluster .service .ClusterService ;
1418import org .elasticsearch .common .settings .ClusterSettings ;
1519import org .elasticsearch .common .settings .Settings ;
2933import org .mockito .MockitoAnnotations ;
3034
3135import java .util .Collections ;
36+ import java .util .Map ;
37+ import java .util .Set ;
38+ import java .util .concurrent .atomic .AtomicBoolean ;
3239
40+ import static org .hamcrest .Matchers .equalTo ;
41+ import static org .hamcrest .Matchers .not ;
3342import static org .mockito .ArgumentMatchers .any ;
3443import static org .mockito .ArgumentMatchers .eq ;
3544import static org .mockito .Mockito .doNothing ;
45+ import static org .mockito .Mockito .mock ;
3646import static org .mockito .Mockito .when ;
3747
3848public class ReindexDataStreamIndexTransportActionTests extends ESTestCase {
@@ -112,7 +122,10 @@ public void testReindexIncludesRateLimit() {
112122 )
113123 );
114124
115- doNothing ().when (client ).execute (eq (ReindexAction .INSTANCE ), request .capture (), any ());
125+ ClusterState clusterState = mock (ClusterState .class );
126+ when (clusterState .getNodes ()).thenReturn (getTestDiscoveryNodes ());
127+ when (clusterService .state ()).thenReturn (clusterState );
128+ doNothing ().when (transportService ).sendRequest (any (), eq (ReindexAction .NAME ), request .capture (), any ());
116129
117130 action .reindex (sourceIndex , destIndex , listener , taskId );
118131
@@ -137,7 +150,10 @@ public void testReindexIncludesInfiniteRateLimit() {
137150 Collections .singleton (ReindexDataStreamIndexTransportAction .REINDEX_MAX_REQUESTS_PER_SECOND_SETTING )
138151 )
139152 );
140- doNothing ().when (client ).execute (eq (ReindexAction .INSTANCE ), request .capture (), any ());
153+ ClusterState clusterState = mock (ClusterState .class );
154+ when (clusterState .getNodes ()).thenReturn (getTestDiscoveryNodes ());
155+ when (clusterService .state ()).thenReturn (clusterState );
156+ doNothing ().when (transportService ).sendRequest (any (), eq (ReindexAction .NAME ), request .capture (), any ());
141157
142158 action .reindex (sourceIndex , destIndex , listener , taskId );
143159
@@ -204,4 +220,142 @@ public void testReindexNegativeRateLimitThrowsError() {
204220 e .getMessage ()
205221 );
206222 }
223+
224+ public void testRoundRobin () {
225+ /*
226+ * This tests that the action will round-robin through the list of ingest nodes in the cluster.
227+ */
228+ String sourceIndex = randomAlphanumericOfLength (10 );
229+ String destIndex = randomAlphanumericOfLength (10 );
230+ AtomicBoolean failed = new AtomicBoolean (false );
231+ ActionListener <BulkByScrollResponse > listener = new ActionListener <>() {
232+ @ Override
233+ public void onResponse (BulkByScrollResponse bulkByScrollResponse ) {}
234+
235+ @ Override
236+ public void onFailure (Exception e ) {
237+ failed .set (true );
238+ }
239+ };
240+ TaskId taskId = TaskId .EMPTY_TASK_ID ;
241+
242+ when (clusterService .getClusterSettings ()).thenReturn (
243+ new ClusterSettings (
244+ Settings .EMPTY ,
245+ Collections .singleton (ReindexDataStreamIndexTransportAction .REINDEX_MAX_REQUESTS_PER_SECOND_SETTING )
246+ )
247+ );
248+
249+ ClusterState clusterState = mock (ClusterState .class );
250+ when (clusterState .getNodes ()).thenReturn (getTestDiscoveryNodes ());
251+ when (clusterService .state ()).thenReturn (clusterState );
252+ ArgumentCaptor <DiscoveryNode > nodeCaptor = ArgumentCaptor .captor ();
253+ doNothing ().when (transportService ).sendRequest (nodeCaptor .capture (), eq (ReindexAction .NAME ), request .capture (), any ());
254+
255+ action .reindex (sourceIndex , destIndex , listener , taskId );
256+ DiscoveryNode node1 = nodeCaptor .getValue ();
257+ assertNotNull (node1 );
258+
259+ action .reindex (sourceIndex , destIndex , listener , taskId );
260+ DiscoveryNode node2 = nodeCaptor .getValue ();
261+ assertNotNull (node2 );
262+
263+ int ingestNodeCount = clusterState .getNodes ().getIngestNodes ().size ();
264+ if (ingestNodeCount > 1 ) {
265+ assertThat (node1 .getName (), not (equalTo (node2 .getName ())));
266+ }
267+
268+ // check that if we keep going we eventually get back to the original node:
269+ DiscoveryNode node = node2 ;
270+ for (int i = 0 ; i < ingestNodeCount - 1 ; i ++) {
271+ action .reindex (sourceIndex , destIndex , listener , taskId );
272+ node = nodeCaptor .getValue ();
273+ }
274+ assertNotNull (node );
275+ assertThat (node1 .getName (), equalTo (node .getName ()));
276+ assertThat (failed .get (), equalTo (false ));
277+
278+ // make sure the listener gets notified of failure if there are no ingest nodes:
279+ when (clusterState .getNodes ()).thenReturn (getTestDiscoveryNodesNoIngest ());
280+ action .reindex (sourceIndex , destIndex , listener , taskId );
281+ assertThat (failed .get (), equalTo (true ));
282+ }
283+
284+ private DiscoveryNodes getTestDiscoveryNodes () {
285+ DiscoveryNodes .Builder builder = DiscoveryNodes .builder ();
286+ boolean nodeHasIngestRole = false ;
287+ int nodeCount = randomIntBetween (1 , 10 );
288+ for (int i = 0 ; i < nodeCount ; i ++) {
289+ final DiscoveryNode discoveryNode = new DiscoveryNode (
290+ "test-name-" + i ,
291+ "test-id-" + i ,
292+ "test-ephemeral-id-" + i ,
293+ "test-hostname-" + i ,
294+ "test-hostaddr" ,
295+ buildNewFakeTransportAddress (),
296+ Map .of (),
297+ randomSet (
298+ 1 ,
299+ 5 ,
300+ () -> randomFrom (
301+ DiscoveryNodeRole .DATA_ROLE ,
302+ DiscoveryNodeRole .INGEST_ROLE ,
303+ DiscoveryNodeRole .SEARCH_ROLE ,
304+ DiscoveryNodeRole .MASTER_ROLE ,
305+ DiscoveryNodeRole .MASTER_ROLE
306+ )
307+ ),
308+ null ,
309+ null
310+ );
311+ nodeHasIngestRole = nodeHasIngestRole || discoveryNode .getRoles ().contains (DiscoveryNodeRole .INGEST_ROLE );
312+ builder .add (discoveryNode );
313+ }
314+ if (nodeHasIngestRole == false ) {
315+ final DiscoveryNode discoveryNode = new DiscoveryNode (
316+ "test-name-" + nodeCount ,
317+ "test-id-" + nodeCount ,
318+ "test-ephemeral-id-" + nodeCount ,
319+ "test-hostname-" + nodeCount ,
320+ "test-hostaddr" ,
321+ buildNewFakeTransportAddress (),
322+ Map .of (),
323+ Set .of (DiscoveryNodeRole .INGEST_ROLE ),
324+ null ,
325+ null
326+ );
327+ builder .add (discoveryNode );
328+ }
329+ return builder .build ();
330+ }
331+
332+ private DiscoveryNodes getTestDiscoveryNodesNoIngest () {
333+ DiscoveryNodes .Builder builder = DiscoveryNodes .builder ();
334+ int nodeCount = randomIntBetween (0 , 10 );
335+ for (int i = 0 ; i < nodeCount ; i ++) {
336+ final DiscoveryNode discoveryNode = new DiscoveryNode (
337+ "test-name-" + i ,
338+ "test-id-" + i ,
339+ "test-ephemeral-id-" + i ,
340+ "test-hostname-" + i ,
341+ "test-hostaddr" ,
342+ buildNewFakeTransportAddress (),
343+ Map .of (),
344+ randomSet (
345+ 1 ,
346+ 4 ,
347+ () -> randomFrom (
348+ DiscoveryNodeRole .DATA_ROLE ,
349+ DiscoveryNodeRole .SEARCH_ROLE ,
350+ DiscoveryNodeRole .MASTER_ROLE ,
351+ DiscoveryNodeRole .MASTER_ROLE
352+ )
353+ ),
354+ null ,
355+ null
356+ );
357+ builder .add (discoveryNode );
358+ }
359+ return builder .build ();
360+ }
207361}
0 commit comments