Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/125171.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 125171
summary: Reindex data stream indices on different nodes
area: Data streams
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
import org.elasticsearch.action.admin.indices.close.CloseIndexResponse;
import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction;
Expand All @@ -35,9 +36,12 @@
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.transport.NoNodeAvailableException;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Assertions;
Expand All @@ -52,6 +56,7 @@
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.deprecation.DeprecatedIndexPredicate;
import org.elasticsearch.xpack.migrate.MigrateTemplateRegistry;
Expand All @@ -60,6 +65,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.METADATA;
import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.READ_ONLY;
Expand Down Expand Up @@ -101,6 +107,14 @@ public class ReindexDataStreamIndexTransportAction extends HandledTransportActio

private final ClusterService clusterService;
private final Client client;
private final TransportService transportService;
/*
 * The following is incremented in order to keep track of the current round-robin position for ingest nodes that we send sliced requests
 * to. We bound its random starting value to less than or equal to 2^30 (the default is Integer.MAX_VALUE or 2^31 - 1) only so that
 * the unit test doesn't fail if it rolls over Integer.MAX_VALUE (since the node selected is the same for Integer.MAX_VALUE and
 * Integer.MAX_VALUE + 1).
 * Note: the bound is written as (1 << 30) because "2 ^ 30" is the XOR operator in Java (2 ^ 30 == 28), not exponentiation.
 */
private final AtomicInteger ingestNodeOffsetGenerator = new AtomicInteger(Randomness.get().nextInt(1 << 30));

@Inject
public ReindexDataStreamIndexTransportAction(
Expand All @@ -119,6 +133,7 @@ public ReindexDataStreamIndexTransportAction(
);
this.clusterService = clusterService;
this.client = client;
this.transportService = transportService;
}

@Override
Expand Down Expand Up @@ -304,7 +319,28 @@ void reindex(String sourceIndexName, String destIndexName, ActionListener<BulkBy
listener.onResponse(bulkByScrollResponse);
}
}, listener::onFailure);
client.execute(ReindexAction.INSTANCE, reindexRequest, checkForFailuresListener);
/*
* Reindex will potentially run a pipeline for each document. If we run all reindex requests on the same node (locally), that
* becomes a bottleneck. This code round-robins reindex requests to all ingest nodes to spread out the pipeline workload. When a
* data stream has many indices, this can improve performance a good bit.
*/
final DiscoveryNode[] ingestNodes = clusterService.state().getNodes().getIngestNodes().values().toArray(DiscoveryNode[]::new);
if (ingestNodes.length == 0) {
listener.onFailure(new NoNodeAvailableException("No ingest nodes in cluster"));
} else {
DiscoveryNode ingestNode = ingestNodes[Math.floorMod(ingestNodeOffsetGenerator.incrementAndGet(), ingestNodes.length)];
logger.debug("Sending reindex request to {}", ingestNode.getName());
transportService.sendRequest(
ingestNode,
ReindexAction.NAME,
reindexRequest,
new ActionListenerResponseHandler<>(
checkForFailuresListener,
BulkByScrollResponse::new,
TransportResponseHandler.TRANSPORT_WORKER
)
);
}
}

private void updateSettings(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -29,10 +33,16 @@
import org.mockito.MockitoAnnotations;

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class ReindexDataStreamIndexTransportActionTests extends ESTestCase {
Expand Down Expand Up @@ -112,7 +122,10 @@ public void testReindexIncludesRateLimit() {
)
);

doNothing().when(client).execute(eq(ReindexAction.INSTANCE), request.capture(), any());
ClusterState clusterState = mock(ClusterState.class);
when(clusterState.getNodes()).thenReturn(getTestDiscoveryNodes());
when(clusterService.state()).thenReturn(clusterState);
doNothing().when(transportService).sendRequest(any(), eq(ReindexAction.NAME), request.capture(), any());

action.reindex(sourceIndex, destIndex, listener, taskId);

Expand All @@ -137,7 +150,10 @@ public void testReindexIncludesInfiniteRateLimit() {
Collections.singleton(ReindexDataStreamIndexTransportAction.REINDEX_MAX_REQUESTS_PER_SECOND_SETTING)
)
);
doNothing().when(client).execute(eq(ReindexAction.INSTANCE), request.capture(), any());
ClusterState clusterState = mock(ClusterState.class);
when(clusterState.getNodes()).thenReturn(getTestDiscoveryNodes());
when(clusterService.state()).thenReturn(clusterState);
doNothing().when(transportService).sendRequest(any(), eq(ReindexAction.NAME), request.capture(), any());

action.reindex(sourceIndex, destIndex, listener, taskId);

Expand Down Expand Up @@ -204,4 +220,142 @@ public void testReindexNegativeRateLimitThrowsError() {
e.getMessage()
);
}

public void testRoundRobin() {
    /*
     * Verifies that successive reindex calls are spread across the cluster's ingest nodes in round-robin order: two consecutive
     * calls hit different nodes (when more than one ingest node exists), a full cycle returns to the first node, and the listener
     * is failed when the cluster has no ingest nodes at all.
     */
    String sourceIndex = randomAlphanumericOfLength(10);
    String destIndex = randomAlphanumericOfLength(10);
    AtomicBoolean sawFailure = new AtomicBoolean(false);
    // Only onFailure is interesting here; successful responses are ignored.
    ActionListener<BulkByScrollResponse> listener = ActionListener.wrap(response -> {}, exception -> sawFailure.set(true));
    TaskId taskId = TaskId.EMPTY_TASK_ID;

    when(clusterService.getClusterSettings()).thenReturn(
        new ClusterSettings(
            Settings.EMPTY,
            Collections.singleton(ReindexDataStreamIndexTransportAction.REINDEX_MAX_REQUESTS_PER_SECOND_SETTING)
        )
    );

    ClusterState state = mock(ClusterState.class);
    when(state.getNodes()).thenReturn(getTestDiscoveryNodes());
    when(clusterService.state()).thenReturn(state);
    // Capture which node each transport request is addressed to.
    ArgumentCaptor<DiscoveryNode> targetNodeCaptor = ArgumentCaptor.captor();
    doNothing().when(transportService).sendRequest(targetNodeCaptor.capture(), eq(ReindexAction.NAME), request.capture(), any());

    action.reindex(sourceIndex, destIndex, listener, taskId);
    DiscoveryNode firstNode = targetNodeCaptor.getValue();
    assertNotNull(firstNode);

    action.reindex(sourceIndex, destIndex, listener, taskId);
    DiscoveryNode secondNode = targetNodeCaptor.getValue();
    assertNotNull(secondNode);

    int ingestNodeCount = state.getNodes().getIngestNodes().size();
    if (ingestNodeCount > 1) {
        assertThat(firstNode.getName(), not(equalTo(secondNode.getName())));
    }

    // After ingestNodeCount - 1 more calls the rotation has wrapped all the way back to the first node.
    DiscoveryNode currentNode = secondNode;
    for (int call = 0; call < ingestNodeCount - 1; call++) {
        action.reindex(sourceIndex, destIndex, listener, taskId);
        currentNode = targetNodeCaptor.getValue();
    }
    assertNotNull(currentNode);
    assertThat(firstNode.getName(), equalTo(currentNode.getName()));
    assertThat(sawFailure.get(), equalTo(false));

    // With no ingest nodes in the cluster the listener must be notified of failure.
    when(state.getNodes()).thenReturn(getTestDiscoveryNodesNoIngest());
    action.reindex(sourceIndex, destIndex, listener, taskId);
    assertThat(sawFailure.get(), equalTo(true));
}

private DiscoveryNodes getTestDiscoveryNodes() {
    /*
     * Builds a random collection of 1-10 discovery nodes, each with 1-5 randomly chosen roles. If none of the generated nodes
     * happened to receive the ingest role, one extra ingest-only node is appended, so the result always contains at least one
     * ingest node.
     */
    final DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder();
    final int randomNodeCount = randomIntBetween(1, 10);
    boolean sawIngestRole = false;
    for (int nodeIndex = 0; nodeIndex < randomNodeCount; nodeIndex++) {
        // MASTER_ROLE is listed twice, doubling its selection weight — NOTE(review): possibly an accidental duplicate; confirm.
        final Set<DiscoveryNodeRole> roles = randomSet(
            1,
            5,
            () -> randomFrom(
                DiscoveryNodeRole.DATA_ROLE,
                DiscoveryNodeRole.INGEST_ROLE,
                DiscoveryNodeRole.SEARCH_ROLE,
                DiscoveryNodeRole.MASTER_ROLE,
                DiscoveryNodeRole.MASTER_ROLE
            )
        );
        final DiscoveryNode node = new DiscoveryNode(
            "test-name-" + nodeIndex,
            "test-id-" + nodeIndex,
            "test-ephemeral-id-" + nodeIndex,
            "test-hostname-" + nodeIndex,
            "test-hostaddr",
            buildNewFakeTransportAddress(),
            Map.of(),
            roles,
            null,
            null
        );
        if (node.getRoles().contains(DiscoveryNodeRole.INGEST_ROLE)) {
            sawIngestRole = true;
        }
        nodesBuilder.add(node);
    }
    if (sawIngestRole == false) {
        // Guarantee at least one ingest node, since callers rely on its presence.
        nodesBuilder.add(
            new DiscoveryNode(
                "test-name-" + randomNodeCount,
                "test-id-" + randomNodeCount,
                "test-ephemeral-id-" + randomNodeCount,
                "test-hostname-" + randomNodeCount,
                "test-hostaddr",
                buildNewFakeTransportAddress(),
                Map.of(),
                Set.of(DiscoveryNodeRole.INGEST_ROLE),
                null,
                null
            )
        );
    }
    return nodesBuilder.build();
}

private DiscoveryNodes getTestDiscoveryNodesNoIngest() {
    /*
     * Builds a random collection of 0-10 discovery nodes, none of which can have the ingest role (INGEST_ROLE is excluded from
     * the candidate role set). Used to exercise the "no ingest nodes available" failure path.
     */
    final DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder();
    final int randomNodeCount = randomIntBetween(0, 10);
    for (int nodeIndex = 0; nodeIndex < randomNodeCount; nodeIndex++) {
        // MASTER_ROLE is listed twice, doubling its selection weight — NOTE(review): possibly an accidental duplicate; confirm.
        final Set<DiscoveryNodeRole> roles = randomSet(
            1,
            4,
            () -> randomFrom(
                DiscoveryNodeRole.DATA_ROLE,
                DiscoveryNodeRole.SEARCH_ROLE,
                DiscoveryNodeRole.MASTER_ROLE,
                DiscoveryNodeRole.MASTER_ROLE
            )
        );
        nodesBuilder.add(
            new DiscoveryNode(
                "test-name-" + nodeIndex,
                "test-id-" + nodeIndex,
                "test-ephemeral-id-" + nodeIndex,
                "test-hostname-" + nodeIndex,
                "test-hostaddr",
                buildNewFakeTransportAddress(),
                Map.of(),
                roles,
                null,
                null
            )
        );
    }
    return nodesBuilder.build();
}
}