From bec900b2aaa0dd5a6c7d9158970116e1aa40fcaf Mon Sep 17 00:00:00 2001
From: Keith Massey
Date: Wed, 19 Mar 2025 11:40:57 -0500
Subject: [PATCH 1/4] Sending slice requests in BulkByScrollParallelizationHelper
 to different nodes to improve performance

---
 .../BulkByScrollParallelizationHelper.java    | 52 ++++++++++++++++---
 .../org/elasticsearch/reindex/Reindexer.java  |  8 ++-
 .../reindex/TransportDeleteByQueryAction.java |  6 ++-
 .../reindex/TransportReindexAction.java       | 11 +++-
 .../reindex/TransportUpdateByQueryAction.java |  6 ++-
 5 files changed, 73 insertions(+), 10 deletions(-)

diff --git a/modules/reindex/src/main/java/org/elasticsearch/reindex/BulkByScrollParallelizationHelper.java b/modules/reindex/src/main/java/org/elasticsearch/reindex/BulkByScrollParallelizationHelper.java
index 0b5be5e0f0b5b..ab88fa9a91f22 100644
--- a/modules/reindex/src/main/java/org/elasticsearch/reindex/BulkByScrollParallelizationHelper.java
+++ b/modules/reindex/src/main/java/org/elasticsearch/reindex/BulkByScrollParallelizationHelper.java
@@ -9,7 +9,10 @@
 
 package org.elasticsearch.reindex;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionListenerResponseHandler;
 import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
 import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
@@ -17,6 +20,8 @@
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.Randomness;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.mapper.IdFieldMapper;
 import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest;
@@ -26,12 +31,15 @@
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.slice.SliceBuilder;
 import org.elasticsearch.tasks.TaskId;
+import org.elasticsearch.transport.TransportResponseHandler;
+import org.elasticsearch.transport.TransportService;
 
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 /**
@@ -40,6 +48,8 @@ class BulkByScrollParallelizationHelper {
 
     static final int AUTO_SLICE_CEILING = 20;
 
+    private static final AtomicInteger ingestNodeOffsetGenerator = new AtomicInteger(Randomness.get().nextInt());
+    private static final Logger logger = LogManager.getLogger(BulkByScrollParallelizationHelper.class);
 
     private BulkByScrollParallelizationHelper() {}
 
@@ -62,13 +72,17 @@ static <Request extends AbstractBulkByScrollRequest<Request>> void startSlicedAc
         ActionListener<BulkByScrollResponse> listener,
         Client client,
         DiscoveryNode node,
-        Runnable workerAction
+        Runnable workerAction,
+        TransportService transportService,
+        ClusterService clusterService
     ) {
         initTaskState(
             task,
             request,
             client,
-            listener.delegateFailure((l, v) -> executeSlicedAction(task, request, action, l, client, node, workerAction))
+            listener.delegateFailure(
+                (l, v) -> executeSlicedAction(task, request, action, l, client, node, workerAction, transportService, clusterService)
+            )
         );
     }
 
@@ -89,10 +103,12 @@ static <Request extends AbstractBulkByScrollRequest<Request>> void executeSliced
         ActionListener<BulkByScrollResponse> listener,
         Client client,
         DiscoveryNode node,
-        Runnable workerAction
+        Runnable workerAction,
+        TransportService transportService,
+        ClusterService clusterService
     ) {
         if (task.isLeader()) {
-            sendSubRequests(client, action, node.getId(), task, request, listener);
+            sendSubRequests(client, action, node.getId(), task, request, listener, transportService, clusterService);
         } else if (task.isWorker()) {
             workerAction.run();
         } else {
@@ -158,12 +174,16 @@ private static <Request extends AbstractBulkByScrollRequest<Request>> void sendS
         String localNodeId,
         BulkByScrollTask task,
         Request request,
-        ActionListener<BulkByScrollResponse> listener
+        ActionListener<BulkByScrollResponse> listener,
+        TransportService transportService,
+        ClusterService clusterService
    ) {
         LeaderBulkByScrollTaskState worker = task.getLeaderState();
         int totalSlices = worker.getSlices();
         TaskId parentTaskId = new TaskId(localNodeId, task.getId());
+        final DiscoveryNode[] ingestNodes = clusterService.state().getNodes().getIngestNodes().values().toArray(DiscoveryNode[]::new);
+        boolean localNodeIsIngestNode = clusterService.state().getNodes().getLocalNode().isIngestNode();
         for (final SearchRequest slice : sliceIntoSubRequests(request.getSearchRequest(), IdFieldMapper.NAME, totalSlices)) {
             // TODO move the request to the correct node. maybe here or somehow do it as part of startup for reindex in general....
             Request requestForSlice = request.forSlice(parentTaskId, slice, totalSlices);
@@ -171,7 +191,27 @@ private static <Request extends AbstractBulkByScrollRequest<Request>> void sendS
                 r -> worker.onSliceResponse(listener, slice.source().slice().getId(), r),
                 e -> worker.onSliceFailure(listener, slice.source().slice().getId(), e)
             );
-            client.execute(action, requestForSlice, sliceListener);
+            if (ingestNodes.length == 0 || (localNodeIsIngestNode && ingestNodes.length == 1)) {
+                /*
+                 * Either there were no ingest nodes so running locally is better than failing, or this node is the only ingest node so
+                 * just use the client to run it.
+                 */
+                client.execute(action, requestForSlice, sliceListener);
+            } else {
+                /*
+                 * Indexing will potentially run a pipeline for each document. If we run all slices on the same node (locally), that
+                 * becomes a bottleneck. This code round-robins slice requests to all ingest nodes to spread out the pipeline workload. When
+                 * an index has many slices, this can improve performance a good bit.
+ */ + DiscoveryNode ingestNode = ingestNodes[Math.floorMod(ingestNodeOffsetGenerator.incrementAndGet(), ingestNodes.length)]; + logger.debug("Sending request for slice to {}", ingestNode.getName()); + transportService.sendRequest( + ingestNode, + action.name(), + requestForSlice, + new ActionListenerResponseHandler<>(sliceListener, BulkByScrollResponse::new, TransportResponseHandler.TRANSPORT_WORKER) + ); + } } } diff --git a/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java b/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java index c908a26335ac6..8727a901cc04b 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java +++ b/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java @@ -59,6 +59,7 @@ import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptService; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentParser; import org.elasticsearch.xcontent.XContentParserConfiguration; @@ -83,6 +84,7 @@ public class Reindexer { private static final Logger logger = LogManager.getLogger(Reindexer.class); private final ClusterService clusterService; + private final TransportService transportService; private final ProjectResolver projectResolver; private final Client client; private final ThreadPool threadPool; @@ -92,6 +94,7 @@ public class Reindexer { Reindexer( ClusterService clusterService, + TransportService transportService, ProjectResolver projectResolver, Client client, ThreadPool threadPool, @@ -100,6 +103,7 @@ public class Reindexer { @Nullable ReindexMetrics reindexMetrics ) { this.clusterService = clusterService; + this.transportService = transportService; this.projectResolver = projectResolver; this.client = client; this.threadPool = threadPool; @@ -143,7 +147,9 @@ public void execute(BulkByScrollTask task, ReindexRequest request, Client bulkCl }) ); searchAction.start(); - } + }, + transportService, + clusterService ); } diff --git a/modules/reindex/src/main/java/org/elasticsearch/reindex/TransportDeleteByQueryAction.java b/modules/reindex/src/main/java/org/elasticsearch/reindex/TransportDeleteByQueryAction.java index ead9c06288ee0..13b36d588f422 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/reindex/TransportDeleteByQueryAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/reindex/TransportDeleteByQueryAction.java @@ -35,6 +35,7 @@ public class TransportDeleteByQueryAction extends HandledTransportAction Date: Fri, 21 Mar 2025 15:00:11 -0500 Subject: [PATCH 2/4] adding a unit test --- ...ulkByScrollParallelizationHelperTests.java | 168 ++++++++++++++++++ 1 file changed, 168 insertions(+) diff --git a/modules/reindex/src/test/java/org/elasticsearch/reindex/BulkByScrollParallelizationHelperTests.java b/modules/reindex/src/test/java/org/elasticsearch/reindex/BulkByScrollParallelizationHelperTests.java index ebb4471566fbd..8314b1e8243ff 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/reindex/BulkByScrollParallelizationHelperTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/reindex/BulkByScrollParallelizationHelperTests.java @@ -9,17 +9,48 @@ package org.elasticsearch.reindex; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionType; import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.client.internal.Client; +import 
org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.index.mapper.IdFieldMapper; +import org.elasticsearch.index.reindex.BulkByScrollResponse; +import org.elasticsearch.index.reindex.BulkByScrollTask; +import org.elasticsearch.index.reindex.ReindexAction; +import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.transport.TransportService; +import org.mockito.ArgumentCaptor; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import static org.elasticsearch.reindex.BulkByScrollParallelizationHelper.sliceIntoSubRequests; import static org.elasticsearch.search.RandomSearchRequestGenerator.randomSearchRequest; import static org.elasticsearch.search.RandomSearchRequestGenerator.randomSearchSourceBuilder; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class BulkByScrollParallelizationHelperTests extends ESTestCase { public void testSliceIntoSubRequests() throws IOException { @@ -48,4 +79,141 @@ public void testSliceIntoSubRequests() throws IOException { currentSliceId++; } } + + public void testExecuteSlicedAction() { + ActionType action = ReindexAction.INSTANCE; + DiscoveryNode localNode = getTestDiscoveryNode(0, randomBoolean()); + String localNodeId = localNode.getId(); + BulkByScrollTask task = new BulkByScrollTask( + randomLong(), + randomAlphaOfLength(10), + action.name(), + randomAlphaOfLength(10), + new TaskId(localNodeId, randomLong()), + Map.of() + ); + int numberOfSlices = randomIntBetween(2, 100); + task.setWorkerCount(numberOfSlices); + ReindexRequest request = new ReindexRequest(); + AtomicBoolean failed = new AtomicBoolean(false); + ActionListener listener = new ActionListener<>() { + @Override + public void onResponse(BulkByScrollResponse bulkByScrollResponse) {} + + @Override + public void onFailure(Exception e) { + failed.set(true); + } + }; + Client client = mock(Client.class); + Runnable workerAction = () -> {}; + TransportService transportService = mock(TransportService.class); + ClusterService clusterService = mock(ClusterService.class); + ClusterState clusterState = mock(ClusterState.class); + ArgumentCaptor nodeCaptor = ArgumentCaptor.captor(); + ArgumentCaptor requestCaptor = ArgumentCaptor.captor(); + doNothing().when(transportService).sendRequest(nodeCaptor.capture(), eq(ReindexAction.NAME), requestCaptor.capture(), any()); + { + // We have multiple ingest nodes, so we make sure that requests are not always sent to the same one + when(clusterService.state()).thenReturn(clusterState); + when(clusterState.getNodes()).thenReturn(getTestDiscoveryNodes(randomIntBetween(2, 10))); + 
+            BulkByScrollParallelizationHelper.executeSlicedAction(
+                task,
+                request,
+                action,
+                listener,
+                client,
+                localNode,
+                workerAction,
+                transportService,
+                clusterService
+            );
+            verify(transportService, times(numberOfSlices)).sendRequest(any(), any(), any(), any());
+            verify(client, times(0)).execute(any(), any(), any());
+            List<DiscoveryNode> nodesUsed = nodeCaptor.getAllValues();
+            assertThat(nodesUsed.size(), equalTo(numberOfSlices));
+            DiscoveryNode firstNode = nodesUsed.get(0);
+            assertNotNull(firstNode);
+            DiscoveryNode previousNode = firstNode;
+            for (int i = 1; i < nodesUsed.size(); i++) {
+                DiscoveryNode node = nodesUsed.get(i);
+                assertNotNull(node);
+                assertThat(node.getId(), not(equalTo(previousNode.getId())));
+                previousNode = node;
+            }
+            assertThat(failed.get(), equalTo(false));
+        }
+        /*
+         * If there are no ingest nodes, we expect it to just use the client. If we have a single ingest node, we have no need of
+         * round-robin, so we expect it to use the client.
+         */
+        for (int ingestNodeCount = 0; ingestNodeCount < 2; ingestNodeCount++) {
+            when(clusterService.state()).thenReturn(clusterState);
+            when(clusterState.getNodes()).thenReturn(getTestDiscoveryNodes(ingestNodeCount));
+            reset(client);
+            reset(transportService);
+            BulkByScrollParallelizationHelper.executeSlicedAction(
+                task,
+                request,
+                action,
+                listener,
+                client,
+                localNode,
+                workerAction,
+                transportService,
+                clusterService
+            );
+            verify(transportService, times(0)).sendRequest(any(), any(), any(), any());
+            verify(client, times(numberOfSlices)).execute(any(), any(), any());
+            assertThat(failed.get(), equalTo(false));
+        }
+    }
+
+    private DiscoveryNodes getTestDiscoveryNodes(int ingestNodeCount) {
+        DiscoveryNodes.Builder builder = DiscoveryNodes.builder();
+        int additionalNodeCount = randomIntBetween(ingestNodeCount == 0 ? 1 : 0, 10);
+        List<DiscoveryNode> discoveryNodeList = new ArrayList<>();
+        for (int i = 0; i < ingestNodeCount; i++) {
+            discoveryNodeList.add(getTestDiscoveryNode(i, true));
+        }
+        for (int i = 0; i < additionalNodeCount; i++) {
+            discoveryNodeList.add(getTestDiscoveryNode(i + ingestNodeCount, false));
+        }
+        for (DiscoveryNode discoveryNode : discoveryNodeList) {
+            builder.add(discoveryNode);
+        }
+        builder.localNodeId(discoveryNodeList.stream().map(DiscoveryNode::getId).findAny().orElseThrow());
+        return builder.build();
+    }
+
+    private DiscoveryNode getTestDiscoveryNode(int i, boolean ingestNode) {
+        Set<DiscoveryNodeRole> roles = new HashSet<>();
+        if (ingestNode) {
+            roles.add(DiscoveryNodeRole.INGEST_ROLE);
+        }
+        Set<DiscoveryNodeRole> otherRoles = randomSet(
+            ingestNode ? 0 : 1,
+            4,
+            () -> randomFrom(
+                DiscoveryNodeRole.DATA_ROLE,
+                DiscoveryNodeRole.SEARCH_ROLE,
+                DiscoveryNodeRole.MASTER_ROLE
+            )
+        );
+        roles.addAll(otherRoles);
+        return new DiscoveryNode(
+            "test-name-" + i,
+            "test-id-" + i,
+            "test-ephemeral-id-" + i,
+            "test-hostname-" + i,
+            "test-hostaddr",
+            buildNewFakeTransportAddress(),
+            Map.of(),
+            roles,
+            null,
+            null
+        );
+    }
 }

From a85c864ee977867fd9504427e00e1c049d1abb08 Mon Sep 17 00:00:00 2001
From: Keith Massey
Date: Fri, 21 Mar 2025 15:01:16 -0500
Subject: [PATCH 3/4] Setting max ingestNodeOffsetGenerator initial value to a
 reasonable number

---
 .../reindex/BulkByScrollParallelizationHelper.java    | 6 +++++-
 .../action/ReindexDataStreamIndexTransportAction.java | 8 +++-----
 2 files changed, 8 insertions(+), 6 deletions(-)

diff --git a/modules/reindex/src/main/java/org/elasticsearch/reindex/BulkByScrollParallelizationHelper.java b/modules/reindex/src/main/java/org/elasticsearch/reindex/BulkByScrollParallelizationHelper.java
index ab88fa9a91f22..1915a4f9bab5d 100644
--- a/modules/reindex/src/main/java/org/elasticsearch/reindex/BulkByScrollParallelizationHelper.java
+++ b/modules/reindex/src/main/java/org/elasticsearch/reindex/BulkByScrollParallelizationHelper.java
@@ -48,7 +48,11 @@ class BulkByScrollParallelizationHelper {
 
     static final int AUTO_SLICE_CEILING = 20;
 
-    private static final AtomicInteger ingestNodeOffsetGenerator = new AtomicInteger(Randomness.get().nextInt());
+    /*
+     * The following is incremented in order to keep track of the current round-robin position for ingest nodes that we send sliced requests
+     * to. We randomize where it starts so that all nodes don't begin by sending data to the same node.
+     */
+    private static final AtomicInteger ingestNodeOffsetGenerator = new AtomicInteger(Randomness.get().nextInt(2048));
     private static final Logger logger = LogManager.getLogger(BulkByScrollParallelizationHelper.class);
 
     private BulkByScrollParallelizationHelper() {}
 
diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java
index 189bdc1012790..15deb2b3a8caf 100644
--- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java
+++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java
@@ -109,12 +109,10 @@ public class ReindexDataStreamIndexTransportAction extends HandledTransportActio
     private final Client client;
     private final TransportService transportService;
     /*
-     * The following is incremented in order to keep track of the current round-robin position for ingest nodes that we send sliced requests
-     * to. We bound its random starting value to less than or equal to 2 ^ 30 (the default is Integer.MAX_VALUE or 2 ^ 31 - 1) only so that
-     * the unit test doesn't fail if it rolls over Integer.MAX_VALUE (since the node selected is the same for Integer.MAX_VALUE and
-     * Integer.MAX_VALUE + 1).
+     * The following is incremented in order to keep track of the current round-robin position for ingest nodes that we send reindex
+     * requests to. We randomize where it starts so that all nodes don't begin by sending data to the same node.
      */
-    private final AtomicInteger ingestNodeOffsetGenerator = new AtomicInteger(Randomness.get().nextInt(2 ^ 30));
+    private final AtomicInteger ingestNodeOffsetGenerator = new AtomicInteger(Randomness.get().nextInt(2048));
 
     @Inject
     public ReindexDataStreamIndexTransportAction(

From 30c501eb09425d19f554970487fa2aaefc4ba21d Mon Sep 17 00:00:00 2001
From: Keith Massey
Date: Fri, 21 Mar 2025 15:06:50 -0500
Subject: [PATCH 4/4] Update docs/changelog/125238.yaml

---
 docs/changelog/125238.yaml | 6 ++++++
 1 file changed, 6 insertions(+)
 create mode 100644 docs/changelog/125238.yaml

diff --git a/docs/changelog/125238.yaml b/docs/changelog/125238.yaml
new file mode 100644
index 0000000000000..6e0af98eb635e
--- /dev/null
+++ b/docs/changelog/125238.yaml
@@ -0,0 +1,6 @@
+pr: 125238
+summary: Sending slice requests to different nodes in `BulkByScrollParallelizationHelper`
+  to improve performance
+area: Reindex
+type: enhancement
+issues: []
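
A minimal, standalone sketch of the round-robin node selection used in sendSubRequests above. This is illustrative only and not part of the patch: the class and method names (RoundRobinSketch, pickIngestNode) are made up, and java.util.Random stands in for Elasticsearch's Randomness. Math.floorMod is used instead of % because the counter eventually overflows Integer.MAX_VALUE and incrementAndGet() then returns negative values; floorMod still yields a valid non-negative index in that case. Relatedly, the removed x-pack line never bounded the start at 2 to the 30th: in Java, `2 ^ 30` is bitwise XOR, which evaluates to 28, which is why patch 3 switches both call sites to an explicit nextInt(2048).

    // Illustrative sketch only; not Elasticsearch code.
    import java.util.Random;
    import java.util.concurrent.atomic.AtomicInteger;

    class RoundRobinSketch {
        // Random starting offset, bounded like the patch's nextInt(2048), so that
        // every node doesn't begin the rotation at the same position.
        private static final AtomicInteger offset = new AtomicInteger(new Random().nextInt(2048));

        // floorMod keeps the index in [0, length) even after the counter overflows
        // and incrementAndGet() starts returning negative values.
        static String pickIngestNode(String[] ingestNodes) {
            return ingestNodes[Math.floorMod(offset.incrementAndGet(), ingestNodes.length)];
        }

        public static void main(String[] args) {
            String[] nodes = { "node-a", "node-b", "node-c" };
            for (int i = 0; i < 6; i++) {
                // Cycles node-a, node-b, node-c, ... from a random starting point,
                // matching the behavior the unit test asserts (consecutive picks differ).
                System.out.println(pickIngestNode(nodes));
            }
        }
    }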