Skip to content

Commit 0ae33ec

Browse files
masseykesmalyshev
authored andcommitted
Reindex data stream indices on different nodes (elastic#125171)
1 parent 597d526 commit 0ae33ec

File tree

3 files changed

+198
-3
lines changed

3 files changed

+198
-3
lines changed

docs/changelog/125171.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 125171
2+
summary: Reindex data stream indices on different nodes
3+
area: Data streams
4+
type: enhancement
5+
issues: []

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.ElasticsearchException;
1313
import org.elasticsearch.ResourceNotFoundException;
1414
import org.elasticsearch.action.ActionListener;
15+
import org.elasticsearch.action.ActionListenerResponseHandler;
1516
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
1617
import org.elasticsearch.action.admin.indices.close.CloseIndexResponse;
1718
import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction;
@@ -35,9 +36,12 @@
3536
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
3637
import org.elasticsearch.action.support.master.AcknowledgedResponse;
3738
import org.elasticsearch.client.internal.Client;
39+
import org.elasticsearch.client.internal.transport.NoNodeAvailableException;
3840
import org.elasticsearch.cluster.block.ClusterBlockException;
3941
import org.elasticsearch.cluster.metadata.IndexMetadata;
42+
import org.elasticsearch.cluster.node.DiscoveryNode;
4043
import org.elasticsearch.cluster.service.ClusterService;
44+
import org.elasticsearch.common.Randomness;
4145
import org.elasticsearch.common.settings.Setting;
4246
import org.elasticsearch.common.settings.Settings;
4347
import org.elasticsearch.core.Assertions;
@@ -52,6 +56,7 @@
5256
import org.elasticsearch.tasks.Task;
5357
import org.elasticsearch.tasks.TaskId;
5458
import org.elasticsearch.threadpool.ThreadPool;
59+
import org.elasticsearch.transport.TransportResponseHandler;
5560
import org.elasticsearch.transport.TransportService;
5661
import org.elasticsearch.xpack.core.deprecation.DeprecatedIndexPredicate;
5762
import org.elasticsearch.xpack.migrate.MigrateTemplateRegistry;
@@ -60,6 +65,7 @@
6065
import java.util.Locale;
6166
import java.util.Map;
6267
import java.util.Objects;
68+
import java.util.concurrent.atomic.AtomicInteger;
6369

6470
import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.METADATA;
6571
import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.READ_ONLY;
@@ -101,6 +107,14 @@ public class ReindexDataStreamIndexTransportAction extends HandledTransportActio
101107

102108
private final ClusterService clusterService;
103109
private final Client client;
110+
private final TransportService transportService;
111+
/*
112+
* The following is incremented in order to keep track of the current round-robin position for ingest nodes that we send sliced requests
113+
* to. We bound its random starting value to less than or equal to 2 ^ 30 (the default is Integer.MAX_VALUE or 2 ^ 31 - 1) only so that
114+
* the unit test doesn't fail if it rolls over Integer.MAX_VALUE (since the node selected is the same for Integer.MAX_VALUE and
115+
* Integer.MAX_VALUE + 1).
116+
*/
117+
private final AtomicInteger ingestNodeOffsetGenerator = new AtomicInteger(Randomness.get().nextInt(2 ^ 30));
104118

105119
@Inject
106120
public ReindexDataStreamIndexTransportAction(
@@ -119,6 +133,7 @@ public ReindexDataStreamIndexTransportAction(
119133
);
120134
this.clusterService = clusterService;
121135
this.client = client;
136+
this.transportService = transportService;
122137
}
123138

124139
@Override
@@ -305,7 +320,28 @@ void reindex(String sourceIndexName, String destIndexName, ActionListener<BulkBy
305320
listener.onResponse(bulkByScrollResponse);
306321
}
307322
}, listener::onFailure);
308-
client.execute(ReindexAction.INSTANCE, reindexRequest, checkForFailuresListener);
323+
/*
324+
* Reindex will potentially run a pipeline for each document. If we run all reindex requests on the same node (locally), that
325+
* becomes a bottleneck. This code round-robins reindex requests to all ingest nodes to spread out the pipeline workload. When a
326+
* data stream has many indices, this can improve performance a good bit.
327+
*/
328+
final DiscoveryNode[] ingestNodes = clusterService.state().getNodes().getIngestNodes().values().toArray(DiscoveryNode[]::new);
329+
if (ingestNodes.length == 0) {
330+
listener.onFailure(new NoNodeAvailableException("No ingest nodes in cluster"));
331+
} else {
332+
DiscoveryNode ingestNode = ingestNodes[Math.floorMod(ingestNodeOffsetGenerator.incrementAndGet(), ingestNodes.length)];
333+
logger.debug("Sending reindex request to {}", ingestNode.getName());
334+
transportService.sendRequest(
335+
ingestNode,
336+
ReindexAction.NAME,
337+
reindexRequest,
338+
new ActionListenerResponseHandler<>(
339+
checkForFailuresListener,
340+
BulkByScrollResponse::new,
341+
TransportResponseHandler.TRANSPORT_WORKER
342+
)
343+
);
344+
}
309345
}
310346

311347
private void updateSettings(

x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportActionTests.java

Lines changed: 156 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@
1010
import org.elasticsearch.action.ActionListener;
1111
import org.elasticsearch.action.support.ActionFilters;
1212
import org.elasticsearch.client.internal.Client;
13+
import org.elasticsearch.cluster.ClusterState;
14+
import org.elasticsearch.cluster.node.DiscoveryNode;
15+
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
16+
import org.elasticsearch.cluster.node.DiscoveryNodes;
1317
import org.elasticsearch.cluster.service.ClusterService;
1418
import org.elasticsearch.common.settings.ClusterSettings;
1519
import org.elasticsearch.common.settings.Settings;
@@ -29,10 +33,16 @@
2933
import org.mockito.MockitoAnnotations;
3034

3135
import java.util.Collections;
36+
import java.util.Map;
37+
import java.util.Set;
38+
import java.util.concurrent.atomic.AtomicBoolean;
3239

40+
import static org.hamcrest.Matchers.equalTo;
41+
import static org.hamcrest.Matchers.not;
3342
import static org.mockito.ArgumentMatchers.any;
3443
import static org.mockito.ArgumentMatchers.eq;
3544
import static org.mockito.Mockito.doNothing;
45+
import static org.mockito.Mockito.mock;
3646
import static org.mockito.Mockito.when;
3747

3848
public class ReindexDataStreamIndexTransportActionTests extends ESTestCase {
@@ -112,7 +122,10 @@ public void testReindexIncludesRateLimit() {
112122
)
113123
);
114124

115-
doNothing().when(client).execute(eq(ReindexAction.INSTANCE), request.capture(), any());
125+
ClusterState clusterState = mock(ClusterState.class);
126+
when(clusterState.getNodes()).thenReturn(getTestDiscoveryNodes());
127+
when(clusterService.state()).thenReturn(clusterState);
128+
doNothing().when(transportService).sendRequest(any(), eq(ReindexAction.NAME), request.capture(), any());
116129

117130
action.reindex(sourceIndex, destIndex, listener, taskId);
118131

@@ -137,7 +150,10 @@ public void testReindexIncludesInfiniteRateLimit() {
137150
Collections.singleton(ReindexDataStreamIndexTransportAction.REINDEX_MAX_REQUESTS_PER_SECOND_SETTING)
138151
)
139152
);
140-
doNothing().when(client).execute(eq(ReindexAction.INSTANCE), request.capture(), any());
153+
ClusterState clusterState = mock(ClusterState.class);
154+
when(clusterState.getNodes()).thenReturn(getTestDiscoveryNodes());
155+
when(clusterService.state()).thenReturn(clusterState);
156+
doNothing().when(transportService).sendRequest(any(), eq(ReindexAction.NAME), request.capture(), any());
141157

142158
action.reindex(sourceIndex, destIndex, listener, taskId);
143159

@@ -204,4 +220,142 @@ public void testReindexNegativeRateLimitThrowsError() {
204220
e.getMessage()
205221
);
206222
}
223+
224+
public void testRoundRobin() {
225+
/*
226+
* This tests that the action will round-robin through the list of ingest nodes in the cluster.
227+
*/
228+
String sourceIndex = randomAlphanumericOfLength(10);
229+
String destIndex = randomAlphanumericOfLength(10);
230+
AtomicBoolean failed = new AtomicBoolean(false);
231+
ActionListener<BulkByScrollResponse> listener = new ActionListener<>() {
232+
@Override
233+
public void onResponse(BulkByScrollResponse bulkByScrollResponse) {}
234+
235+
@Override
236+
public void onFailure(Exception e) {
237+
failed.set(true);
238+
}
239+
};
240+
TaskId taskId = TaskId.EMPTY_TASK_ID;
241+
242+
when(clusterService.getClusterSettings()).thenReturn(
243+
new ClusterSettings(
244+
Settings.EMPTY,
245+
Collections.singleton(ReindexDataStreamIndexTransportAction.REINDEX_MAX_REQUESTS_PER_SECOND_SETTING)
246+
)
247+
);
248+
249+
ClusterState clusterState = mock(ClusterState.class);
250+
when(clusterState.getNodes()).thenReturn(getTestDiscoveryNodes());
251+
when(clusterService.state()).thenReturn(clusterState);
252+
ArgumentCaptor<DiscoveryNode> nodeCaptor = ArgumentCaptor.captor();
253+
doNothing().when(transportService).sendRequest(nodeCaptor.capture(), eq(ReindexAction.NAME), request.capture(), any());
254+
255+
action.reindex(sourceIndex, destIndex, listener, taskId);
256+
DiscoveryNode node1 = nodeCaptor.getValue();
257+
assertNotNull(node1);
258+
259+
action.reindex(sourceIndex, destIndex, listener, taskId);
260+
DiscoveryNode node2 = nodeCaptor.getValue();
261+
assertNotNull(node2);
262+
263+
int ingestNodeCount = clusterState.getNodes().getIngestNodes().size();
264+
if (ingestNodeCount > 1) {
265+
assertThat(node1.getName(), not(equalTo(node2.getName())));
266+
}
267+
268+
// check that if we keep going we eventually get back to the original node:
269+
DiscoveryNode node = node2;
270+
for (int i = 0; i < ingestNodeCount - 1; i++) {
271+
action.reindex(sourceIndex, destIndex, listener, taskId);
272+
node = nodeCaptor.getValue();
273+
}
274+
assertNotNull(node);
275+
assertThat(node1.getName(), equalTo(node.getName()));
276+
assertThat(failed.get(), equalTo(false));
277+
278+
// make sure the listener gets notified of failure if there are no ingest nodes:
279+
when(clusterState.getNodes()).thenReturn(getTestDiscoveryNodesNoIngest());
280+
action.reindex(sourceIndex, destIndex, listener, taskId);
281+
assertThat(failed.get(), equalTo(true));
282+
}
283+
284+
private DiscoveryNodes getTestDiscoveryNodes() {
285+
DiscoveryNodes.Builder builder = DiscoveryNodes.builder();
286+
boolean nodeHasIngestRole = false;
287+
int nodeCount = randomIntBetween(1, 10);
288+
for (int i = 0; i < nodeCount; i++) {
289+
final DiscoveryNode discoveryNode = new DiscoveryNode(
290+
"test-name-" + i,
291+
"test-id-" + i,
292+
"test-ephemeral-id-" + i,
293+
"test-hostname-" + i,
294+
"test-hostaddr",
295+
buildNewFakeTransportAddress(),
296+
Map.of(),
297+
randomSet(
298+
1,
299+
5,
300+
() -> randomFrom(
301+
DiscoveryNodeRole.DATA_ROLE,
302+
DiscoveryNodeRole.INGEST_ROLE,
303+
DiscoveryNodeRole.SEARCH_ROLE,
304+
DiscoveryNodeRole.MASTER_ROLE,
305+
DiscoveryNodeRole.MASTER_ROLE
306+
)
307+
),
308+
null,
309+
null
310+
);
311+
nodeHasIngestRole = nodeHasIngestRole || discoveryNode.getRoles().contains(DiscoveryNodeRole.INGEST_ROLE);
312+
builder.add(discoveryNode);
313+
}
314+
if (nodeHasIngestRole == false) {
315+
final DiscoveryNode discoveryNode = new DiscoveryNode(
316+
"test-name-" + nodeCount,
317+
"test-id-" + nodeCount,
318+
"test-ephemeral-id-" + nodeCount,
319+
"test-hostname-" + nodeCount,
320+
"test-hostaddr",
321+
buildNewFakeTransportAddress(),
322+
Map.of(),
323+
Set.of(DiscoveryNodeRole.INGEST_ROLE),
324+
null,
325+
null
326+
);
327+
builder.add(discoveryNode);
328+
}
329+
return builder.build();
330+
}
331+
332+
private DiscoveryNodes getTestDiscoveryNodesNoIngest() {
333+
DiscoveryNodes.Builder builder = DiscoveryNodes.builder();
334+
int nodeCount = randomIntBetween(0, 10);
335+
for (int i = 0; i < nodeCount; i++) {
336+
final DiscoveryNode discoveryNode = new DiscoveryNode(
337+
"test-name-" + i,
338+
"test-id-" + i,
339+
"test-ephemeral-id-" + i,
340+
"test-hostname-" + i,
341+
"test-hostaddr",
342+
buildNewFakeTransportAddress(),
343+
Map.of(),
344+
randomSet(
345+
1,
346+
4,
347+
() -> randomFrom(
348+
DiscoveryNodeRole.DATA_ROLE,
349+
DiscoveryNodeRole.SEARCH_ROLE,
350+
DiscoveryNodeRole.MASTER_ROLE,
351+
DiscoveryNodeRole.MASTER_ROLE
352+
)
353+
),
354+
null,
355+
null
356+
);
357+
builder.add(discoveryNode);
358+
}
359+
return builder.build();
360+
}
207361
}

0 commit comments

Comments
 (0)