PR #125238 (Closed)
6 changes: 6 additions & 0 deletions docs/changelog/125238.yaml
@@ -0,0 +1,6 @@
pr: 125238
summary: Sending slice requests to different nodes in `BulkByScrollParallelizationHelper`
to improve performance
area: Reindex
type: enhancement
issues: []
BulkByScrollParallelizationHelper.java
@@ -9,14 +9,19 @@

package org.elasticsearch.reindex;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.action.admin.cluster.shards.TransportClusterSearchShardsAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest;
@@ -26,12 +31,15 @@
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.slice.SliceBuilder;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

/**
@@ -40,6 +48,12 @@
class BulkByScrollParallelizationHelper {

static final int AUTO_SLICE_CEILING = 20;
/*
* The following is incremented in order to keep track of the current round-robin position for ingest nodes that we send sliced requests
* to. We randomize where it starts so that all nodes don't begin by sending data to the same node.
*/
private static final AtomicInteger ingestNodeOffsetGenerator = new AtomicInteger(Randomness.get().nextInt(2048));
[Contributor comment] It would be good to get rid of the static here; perhaps this can be kept on the action instead and passed in?
private static final Logger logger = LogManager.getLogger(BulkByScrollParallelizationHelper.class);

private BulkByScrollParallelizationHelper() {}

@@ -62,13 +76,17 @@ static <Request extends AbstractBulkByScrollRequest<Request>> void startSlicedAction(
ActionListener<BulkByScrollResponse> listener,
Client client,
DiscoveryNode node,
Runnable workerAction
Runnable workerAction,
TransportService transportService,
ClusterService clusterService
) {
initTaskState(
task,
request,
client,
listener.delegateFailure((l, v) -> executeSlicedAction(task, request, action, l, client, node, workerAction))
listener.delegateFailure(
(l, v) -> executeSlicedAction(task, request, action, l, client, node, workerAction, transportService, clusterService)
)
);
}

@@ -89,10 +107,12 @@ static <Request extends AbstractBulkByScrollRequest<Request>> void executeSlicedAction(
ActionListener<BulkByScrollResponse> listener,
Client client,
DiscoveryNode node,
Runnable workerAction
Runnable workerAction,
TransportService transportService,
ClusterService clusterService
) {
if (task.isLeader()) {
sendSubRequests(client, action, node.getId(), task, request, listener);
sendSubRequests(client, action, node.getId(), task, request, listener, transportService, clusterService);
} else if (task.isWorker()) {
workerAction.run();
} else {
@@ -158,20 +178,44 @@ private static <Request extends AbstractBulkByScrollRequest<Request>> void sendSubRequests(
String localNodeId,
BulkByScrollTask task,
Request request,
ActionListener<BulkByScrollResponse> listener
ActionListener<BulkByScrollResponse> listener,
TransportService transportService,
ClusterService clusterService
) {

LeaderBulkByScrollTaskState worker = task.getLeaderState();
int totalSlices = worker.getSlices();
TaskId parentTaskId = new TaskId(localNodeId, task.getId());
final DiscoveryNode[] ingestNodes = clusterService.state().getNodes().getIngestNodes().values().toArray(DiscoveryNode[]::new);
boolean localNodeIsIngestNode = clusterService.state().getNodes().getLocalNode().isIngestNode();
for (final SearchRequest slice : sliceIntoSubRequests(request.getSearchRequest(), IdFieldMapper.NAME, totalSlices)) {
// TODO move the request to the correct node. maybe here or somehow do it as part of startup for reindex in general....
Request requestForSlice = request.forSlice(parentTaskId, slice, totalSlices);
ActionListener<BulkByScrollResponse> sliceListener = ActionListener.wrap(
r -> worker.onSliceResponse(listener, slice.source().slice().getId(), r),
e -> worker.onSliceFailure(listener, slice.source().slice().getId(), e)
);
client.execute(action, requestForSlice, sliceListener);
if (ingestNodes.length == 0 || (localNodeIsIngestNode && ingestNodes.length == 1)) {
/*
* Either there were no ingest nodes so running locally is better than failing, or this node is the only ingest node so
* just use the client to run it.
*/
client.execute(action, requestForSlice, sliceListener);
} else {
/*
* Indexing will potentially run a pipeline for each document. If we run all slices on the same node (locally), that
* becomes a bottleneck. This code round-robins slice requests to all ingest nodes to spread out the pipeline workload. When
* an index has many slices, this can improve performance a good bit.
*/

[Contributor comment, on the block comment above] Pipelines that hog the CPU that much during reindex sounds problematic; I wonder if that is worth looking into instead? Perhaps you have more detail to share (privately is good too).

[Author (Member) reply] It doesn't even have to be a really CPU-heavy pipeline. Just the existence of a trivial set processor, for example, means that the data has to be deserialized and reserialized. Just that serialization slows things down a good bit (since without pipelines the data is never deserialized before being sent to the correct node for indexing). There might be something smarter we can do on the pipeline side, but this seemed like an easy workaround to spread out that work (although taking the comment below about rebalancing into account, this is no longer an easy workaround).
DiscoveryNode ingestNode = ingestNodes[Math.floorMod(ingestNodeOffsetGenerator.incrementAndGet(), ingestNodes.length)];
logger.debug("Sending request for slice to {}", ingestNode.getName());
transportService.sendRequest(
ingestNode,
action.name(),
requestForSlice,
new ActionListenerResponseHandler<>(sliceListener, BulkByScrollResponse::new, TransportResponseHandler.TRANSPORT_WORKER)
);
}
}
}

[Contributor comment, on the transportService.sendRequest call] I think there are expectations of this running on the local node, for instance here.

[Author (Member) reply] I didn't realize that we had a rebalance API, and that it worked on the assumption that all subtasks were local. That would be a much bigger change to handle (I assume we'd have to put information about where each child task is running into the LeaderBulkByScrollTaskState?). So I think I'll close this for now, and maybe revisit it if we see evidence of this causing performance problems in the wild.
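To make the rebalance concern concrete: if slices are dispatched to remote ingest nodes, anything that currently assumes every child task runs on the coordinating node (rethrottling a running request, for example) would need to know where each slice went. The following is a hypothetical sketch of that bookkeeping, roughly the kind of information the author suggests would have to live in LeaderBulkByScrollTaskState; none of these names exist in the PR.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: remember which node each sliced sub-request was sent to,
// so per-slice operations (e.g. a rethrottle) could be forwarded to the right node.
class SliceLocations {
    // slice id -> id of the node the slice request was dispatched to
    private final Map<Integer, String> sliceToNodeId = new ConcurrentHashMap<>();

    void onSliceDispatched(int sliceId, String nodeId) {
        sliceToNodeId.put(sliceId, nodeId);
    }

    // Returns null if the slice ran locally or has not been dispatched yet.
    String nodeForSlice(int sliceId) {
        return sliceToNodeId.get(sliceId);
    }
}
```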

Reindexer.java
@@ -59,6 +59,7 @@
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentParserConfiguration;
@@ -83,6 +84,7 @@ public class Reindexer {
private static final Logger logger = LogManager.getLogger(Reindexer.class);

private final ClusterService clusterService;
private final TransportService transportService;
private final ProjectResolver projectResolver;
private final Client client;
private final ThreadPool threadPool;
@@ -92,6 +94,7 @@

Reindexer(
ClusterService clusterService,
TransportService transportService,
ProjectResolver projectResolver,
Client client,
ThreadPool threadPool,
@@ -100,6 +103,7 @@
@Nullable ReindexMetrics reindexMetrics
) {
this.clusterService = clusterService;
this.transportService = transportService;
this.projectResolver = projectResolver;
this.client = client;
this.threadPool = threadPool;
@@ -143,7 +147,9 @@ public void execute(BulkByScrollTask task, ReindexRequest request, Client bulkCl
})
);
searchAction.start();
}
},
transportService,
clusterService
);
}

TransportDeleteByQueryAction.java
@@ -35,6 +35,7 @@ public class TransportDeleteByQueryAction extends HandledTransportAction<DeleteByQueryRequest, BulkByScrollResponse> {
private final Client client;
private final ScriptService scriptService;
private final ClusterService clusterService;
private final TransportService transportService;
private final DeleteByQueryMetrics deleteByQueryMetrics;

@Inject
@@ -52,6 +53,7 @@ public TransportDeleteByQueryAction(
this.client = client;
this.scriptService = scriptService;
this.clusterService = clusterService;
this.transportService = transportService;
this.deleteByQueryMetrics = deleteByQueryMetrics;
}

@@ -86,7 +88,9 @@ public void doExecute(Task task, DeleteByQueryRequest request, ActionListener<BulkByScrollResponse> listener) {
}
})
).start();
}
},
transportService,
clusterService
);
}
}
TransportReindexAction.java
@@ -101,7 +101,16 @@ protected TransportReindexAction(
projectResolver,
autoCreateIndex
);
this.reindexer = new Reindexer(clusterService, projectResolver, client, threadPool, scriptService, sslConfig, reindexMetrics);
this.reindexer = new Reindexer(
clusterService,
transportService,
projectResolver,
client,
threadPool,
scriptService,
sslConfig,
reindexMetrics
);
}

@Override
TransportUpdateByQueryAction.java
@@ -47,6 +47,7 @@ public class TransportUpdateByQueryAction extends HandledTransportAction<UpdateByQueryRequest, BulkByScrollResponse> {
private final Client client;
private final ScriptService scriptService;
private final ClusterService clusterService;
private final TransportService transportService;
private final UpdateByQueryMetrics updateByQueryMetrics;

@Inject
@@ -64,6 +65,7 @@ public TransportUpdateByQueryAction(
this.client = client;
this.scriptService = scriptService;
this.clusterService = clusterService;
this.transportService = transportService;
this.updateByQueryMetrics = updateByQueryMetrics;
}

@@ -100,7 +102,9 @@ protected void doExecute(Task task, UpdateByQueryRequest request, ActionListener<BulkByScrollResponse> listener) {
}
})
).start();
}
},
transportService,
clusterService
);
}
