Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/125171.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 125171
summary: Reindex data stream indices on different nodes
area: Data streams
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
import org.elasticsearch.action.admin.indices.close.CloseIndexResponse;
import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction;
Expand All @@ -35,9 +36,12 @@
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.transport.NoNodeAvailableException;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Assertions;
Expand All @@ -52,6 +56,7 @@
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.deprecation.DeprecatedIndexPredicate;
import org.elasticsearch.xpack.migrate.MigrateTemplateRegistry;
Expand All @@ -60,6 +65,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.METADATA;
import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.READ_ONLY;
Expand Down Expand Up @@ -101,6 +107,8 @@ public class ReindexDataStreamIndexTransportAction extends HandledTransportActio

private final ClusterService clusterService;
private final Client client;
private final TransportService transportService;
private final AtomicInteger ingestNodeOffsetGenerator = new AtomicInteger(Randomness.get().nextInt());

@Inject
public ReindexDataStreamIndexTransportAction(
Expand All @@ -119,6 +127,7 @@ public ReindexDataStreamIndexTransportAction(
);
this.clusterService = clusterService;
this.client = client;
this.transportService = transportService;
}

@Override
Expand Down Expand Up @@ -305,7 +314,28 @@ void reindex(String sourceIndexName, String destIndexName, ActionListener<BulkBy
listener.onResponse(bulkByScrollResponse);
}
}, listener::onFailure);
client.execute(ReindexAction.INSTANCE, reindexRequest, checkForFailuresListener);
/*
* Reindex will potentially run a pipeline for each document. If we run all reindex requests on the same node (locally), that
* becomes a bottleneck. This code round-robins reindex requests to all ingest nodes to spread out the pipeline workload. When a
* data stream has many indices, this can improve performance a good bit.
*/
final DiscoveryNode[] ingestNodes = clusterService.state().getNodes().getIngestNodes().values().toArray(DiscoveryNode[]::new);
if (ingestNodes.length == 0) {
listener.onFailure(new NoNodeAvailableException("No ingest nodes in cluster"));
} else {
DiscoveryNode ingestNode = ingestNodes[Math.floorMod(ingestNodeOffsetGenerator.incrementAndGet(), ingestNodes.length)];
Copy link
Contributor

@lukewhiting lukewhiting Mar 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checking the APIs for Math.floorMod and AtomicInteger, I think this line should correctly handle the case where the AtomicInteger overflows and the dividend becomes negative, but is it worth adding a test for that case to future-proof it, or are the current tests OK to handle it passively?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just tried it out, and it is a minor bug -- Math.floorMod(Integer.MAX_VALUE, 17) is equal to Math.floorMod(Integer.MAX_VALUE + 1, 17). So if the test were to start with a value near Integer.MAX_VALUE it would fail (although I'm not too worried about the round-robin repeating a node twice in that very rare situation).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed the maximum initial value of that random number to be much smaller so that it never exceeds MAX_VALUE and the test will never fail. I think the actual behavior (choosing the same node twice once every 4.3 billion times) is harmless, and not worth the complexity of fixing.

logger.debug("Sending reindex request to {}", ingestNode.getName());
transportService.sendRequest(
ingestNode,
ReindexAction.NAME,
reindexRequest,
new ActionListenerResponseHandler<>(
checkForFailuresListener,
BulkByScrollResponse::new,
TransportResponseHandler.TRANSPORT_WORKER
)
);
}
}

private void updateSettings(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -29,10 +33,16 @@
import org.mockito.MockitoAnnotations;

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class ReindexDataStreamIndexTransportActionTests extends ESTestCase {
Expand Down Expand Up @@ -112,7 +122,10 @@ public void testReindexIncludesRateLimit() {
)
);

doNothing().when(client).execute(eq(ReindexAction.INSTANCE), request.capture(), any());
ClusterState clusterState = mock(ClusterState.class);
when(clusterState.getNodes()).thenReturn(getTestDiscoveryNodes());
when(clusterService.state()).thenReturn(clusterState);
doNothing().when(transportService).sendRequest(any(), eq(ReindexAction.NAME), request.capture(), any());

action.reindex(sourceIndex, destIndex, listener, taskId);

Expand All @@ -137,7 +150,10 @@ public void testReindexIncludesInfiniteRateLimit() {
Collections.singleton(ReindexDataStreamIndexTransportAction.REINDEX_MAX_REQUESTS_PER_SECOND_SETTING)
)
);
doNothing().when(client).execute(eq(ReindexAction.INSTANCE), request.capture(), any());
ClusterState clusterState = mock(ClusterState.class);
when(clusterState.getNodes()).thenReturn(getTestDiscoveryNodes());
when(clusterService.state()).thenReturn(clusterState);
doNothing().when(transportService).sendRequest(any(), eq(ReindexAction.NAME), request.capture(), any());

action.reindex(sourceIndex, destIndex, listener, taskId);

Expand Down Expand Up @@ -204,4 +220,142 @@ public void testReindexNegativeRateLimitThrowsError() {
e.getMessage()
);
}

public void testRoundRobin() {
/*
* This tests that the action will round-robin through the list of ingest nodes in the cluster.
*/
String sourceIndex = randomAlphanumericOfLength(10);
String destIndex = randomAlphanumericOfLength(10);
// Flipped to true by the listener below if any reindex call reports failure.
AtomicBoolean failed = new AtomicBoolean(false);
ActionListener<BulkByScrollResponse> listener = new ActionListener<>() {
@Override
public void onResponse(BulkByScrollResponse bulkByScrollResponse) {}

@Override
public void onFailure(Exception e) {
failed.set(true);
}
};
TaskId taskId = TaskId.EMPTY_TASK_ID;

// The action reads the rate-limit setting from cluster settings, so stub it with defaults.
when(clusterService.getClusterSettings()).thenReturn(
new ClusterSettings(
Settings.EMPTY,
Collections.singleton(ReindexDataStreamIndexTransportAction.REINDEX_MAX_REQUESTS_PER_SECOND_SETTING)
)
);

// Stub a cluster state containing a random node set that is guaranteed to include at least one ingest node,
// and capture the node each reindex request is sent to.
ClusterState clusterState = mock(ClusterState.class);
when(clusterState.getNodes()).thenReturn(getTestDiscoveryNodes());
when(clusterService.state()).thenReturn(clusterState);
ArgumentCaptor<DiscoveryNode> nodeCaptor = ArgumentCaptor.captor();
doNothing().when(transportService).sendRequest(nodeCaptor.capture(), eq(ReindexAction.NAME), request.capture(), any());

action.reindex(sourceIndex, destIndex, listener, taskId);
DiscoveryNode node1 = nodeCaptor.getValue();
assertNotNull(node1);

action.reindex(sourceIndex, destIndex, listener, taskId);
DiscoveryNode node2 = nodeCaptor.getValue();
assertNotNull(node2);

// With more than one ingest node, consecutive requests must target different nodes.
int ingestNodeCount = clusterState.getNodes().getIngestNodes().size();
if (ingestNodeCount > 1) {
assertThat(node1.getName(), not(equalTo(node2.getName())));
}

// check that if we keep going we eventually get back to the original node:
// (ingestNodeCount - 1 more requests complete one full cycle of the round-robin)
DiscoveryNode node = node2;
for (int i = 0; i < ingestNodeCount - 1; i++) {
action.reindex(sourceIndex, destIndex, listener, taskId);
node = nodeCaptor.getValue();
}
assertNotNull(node);
assertThat(node1.getName(), equalTo(node.getName()));
assertThat(failed.get(), equalTo(false));

// make sure the listener gets notified of failure if there are no ingest nodes:
when(clusterState.getNodes()).thenReturn(getTestDiscoveryNodesNoIngest());
action.reindex(sourceIndex, destIndex, listener, taskId);
assertThat(failed.get(), equalTo(true));
}

/**
 * Builds a random set of 1-10 discovery nodes with random roles. If none of the randomly
 * generated nodes happens to carry the ingest role, one extra ingest-only node is appended,
 * so the returned node set always contains at least one ingest node.
 */
private DiscoveryNodes getTestDiscoveryNodes() {
    final DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder();
    final int totalNodes = randomIntBetween(1, 10);
    boolean sawIngestNode = false;
    for (int nodeIndex = 0; nodeIndex < totalNodes; nodeIndex++) {
        final DiscoveryNode node = new DiscoveryNode(
            "test-name-" + nodeIndex,
            "test-id-" + nodeIndex,
            "test-ephemeral-id-" + nodeIndex,
            "test-hostname-" + nodeIndex,
            "test-hostaddr",
            buildNewFakeTransportAddress(),
            Map.of(),
            randomSet(
                1,
                5,
                () -> randomFrom(
                    DiscoveryNodeRole.DATA_ROLE,
                    DiscoveryNodeRole.INGEST_ROLE,
                    DiscoveryNodeRole.SEARCH_ROLE,
                    DiscoveryNodeRole.MASTER_ROLE,
                    DiscoveryNodeRole.MASTER_ROLE
                )
            ),
            null,
            null
        );
        if (node.getRoles().contains(DiscoveryNodeRole.INGEST_ROLE)) {
            sawIngestNode = true;
        }
        nodesBuilder.add(node);
    }
    if (sawIngestNode == false) {
        // Guarantee the invariant: append a dedicated ingest-only node.
        final DiscoveryNode fallbackIngestNode = new DiscoveryNode(
            "test-name-" + totalNodes,
            "test-id-" + totalNodes,
            "test-ephemeral-id-" + totalNodes,
            "test-hostname-" + totalNodes,
            "test-hostaddr",
            buildNewFakeTransportAddress(),
            Map.of(),
            Set.of(DiscoveryNodeRole.INGEST_ROLE),
            null,
            null
        );
        nodesBuilder.add(fallbackIngestNode);
    }
    return nodesBuilder.build();
}

/**
 * Builds a random set of 0-10 discovery nodes, none of which carries the ingest role.
 * Used to exercise the no-ingest-node failure path.
 */
private DiscoveryNodes getTestDiscoveryNodesNoIngest() {
    final DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder();
    final int totalNodes = randomIntBetween(0, 10);
    for (int nodeIndex = 0; nodeIndex < totalNodes; nodeIndex++) {
        // Roles are drawn only from non-ingest roles, so the invariant holds by construction.
        final DiscoveryNode node = new DiscoveryNode(
            "test-name-" + nodeIndex,
            "test-id-" + nodeIndex,
            "test-ephemeral-id-" + nodeIndex,
            "test-hostname-" + nodeIndex,
            "test-hostaddr",
            buildNewFakeTransportAddress(),
            Map.of(),
            randomSet(
                1,
                4,
                () -> randomFrom(
                    DiscoveryNodeRole.DATA_ROLE,
                    DiscoveryNodeRole.SEARCH_ROLE,
                    DiscoveryNodeRole.MASTER_ROLE,
                    DiscoveryNodeRole.MASTER_ROLE
                )
            ),
            null,
            null
        );
        nodesBuilder.add(node);
    }
    return nodesBuilder.build();
}
}
Loading