Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/129872.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 129872
summary: Run `TransportClusterStateAction` on local node
area: "Indices APIs, Distributed"
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ public void testJoin() {

ClusterStateResponse clusterStateResponse = client(masterNode).admin()
.cluster()
.prepareState(TEST_REQUEST_TIMEOUT)
.setMasterNodeTimeout(TimeValue.timeValueSeconds(1))
.prepareState(TimeValue.timeValueSeconds(1))
.clear()
.setNodes(true)
.get();
Expand All @@ -79,11 +78,9 @@ public void testJoin() {
registerGceNode(secondNode);
clusterStateResponse = client(secondNode).admin()
.cluster()
.prepareState(TEST_REQUEST_TIMEOUT)
.setMasterNodeTimeout(TimeValue.timeValueSeconds(1))
.prepareState(TimeValue.timeValueSeconds(1))
.clear()
.setNodes(true)
.setLocal(true)
.get();
assertNotNull(clusterStateResponse.getState().nodes().getMasterNodeId());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
import org.apache.http.nio.entity.NStringEntity;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.cluster.state.RemoteClusterStateRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchShardsRequest;
import org.elasticsearch.action.search.SearchShardsResponse;
Expand Down Expand Up @@ -117,7 +117,7 @@ private static MockTransportService startTransport(
newService.registerRequestHandler(
ClusterStateAction.NAME,
EsExecutors.DIRECT_EXECUTOR_SERVICE,
ClusterStateRequest::new,
RemoteClusterStateRequest::new,
(request, channel, task) -> {
DiscoveryNodes.Builder builder = DiscoveryNodes.builder();
for (DiscoveryNode node : knownNodes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,13 @@
},
"params":{
"local":{
"deprecated":true,
"type":"boolean",
"description":"Return local information, do not retrieve the state from master node (default: false)"
},
"master_timeout":{
"type":"time",
"description":"Specify timeout for connection to master"
"description":"Timeout for waiting for new cluster state in case it is blocked"
},
"flat_settings":{
"type":"boolean",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.elasticsearch.action.admin.cluster.state;

import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.coordination.ClusterBootstrapService;
import org.elasticsearch.cluster.metadata.Metadata;
Expand All @@ -18,7 +17,6 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
Expand Down Expand Up @@ -47,34 +45,15 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singletonList(MockTransportService.TestPlugin.class);
}

public void testNonLocalRequestAlwaysFindsMaster() throws Exception {
runRepeatedlyWhileChangingMaster(() -> {
final ClusterStateRequestBuilder clusterStateRequestBuilder = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT)
.clear()
.setNodes(true)
.setBlocks(true)
.setMasterNodeTimeout(TimeValue.timeValueMillis(100));
final ClusterStateResponse clusterStateResponse;
try {
clusterStateResponse = clusterStateRequestBuilder.get();
} catch (MasterNotDiscoveredException e) {
return; // ok, we hit the disconnected node
}
assertNotNull("should always contain a master node", clusterStateResponse.getState().nodes().getMasterNodeId());
});
}

public void testLocalRequestAlwaysSucceeds() throws Exception {
runRepeatedlyWhileChangingMaster(() -> {
final String node = randomFrom(internalCluster().getNodeNames());
final DiscoveryNodes discoveryNodes = client(node).admin()
.cluster()
.prepareState(TEST_REQUEST_TIMEOUT)
.prepareState(TimeValue.timeValueMillis(100))
.clear()
.setLocal(true)
.setNodes(true)
.setBlocks(true)
.setMasterNodeTimeout(TimeValue.timeValueMillis(100))
.get()
.getState()
.nodes();
Expand All @@ -87,39 +66,6 @@ public void testLocalRequestAlwaysSucceeds() throws Exception {
});
}

public void testNonLocalRequestAlwaysFindsMasterAndWaitsForMetadata() throws Exception {
runRepeatedlyWhileChangingMaster(() -> {
final String node = randomFrom(internalCluster().getNodeNames());
final long metadataVersion = internalCluster().getInstance(ClusterService.class, node)
.getClusterApplierService()
.state()
.metadata()
.version();
final long waitForMetadataVersion = randomLongBetween(Math.max(1, metadataVersion - 3), metadataVersion + 5);
final ClusterStateRequestBuilder clusterStateRequestBuilder = client(node).admin()
.cluster()
.prepareState(TEST_REQUEST_TIMEOUT)
.clear()
.setNodes(true)
.setMetadata(true)
.setBlocks(true)
.setMasterNodeTimeout(TimeValue.timeValueMillis(100))
.setWaitForTimeOut(TimeValue.timeValueMillis(100))
.setWaitForMetadataVersion(waitForMetadataVersion);
final ClusterStateResponse clusterStateResponse;
try {
clusterStateResponse = clusterStateRequestBuilder.get();
} catch (MasterNotDiscoveredException e) {
return; // ok, we hit the disconnected node
}
if (clusterStateResponse.isWaitForTimedOut() == false) {
final ClusterState state = clusterStateResponse.getState();
assertNotNull("should always contain a master node", state.nodes().getMasterNodeId());
assertThat("waited for metadata version", state.metadata().version(), greaterThanOrEqualTo(waitForMetadataVersion));
}
});
}

public void testLocalRequestWaitsForMetadata() throws Exception {
runRepeatedlyWhileChangingMaster(() -> {
final String node = randomFrom(internalCluster().getNodeNames());
Expand All @@ -131,13 +77,11 @@ public void testLocalRequestWaitsForMetadata() throws Exception {
final long waitForMetadataVersion = randomLongBetween(Math.max(1, metadataVersion - 3), metadataVersion + 5);
final ClusterStateResponse clusterStateResponse = client(node).admin()
.cluster()
.prepareState(TEST_REQUEST_TIMEOUT)
.prepareState(TimeValue.timeValueMillis(100))
.clear()
.setLocal(true)
.setMetadata(true)
.setBlocks(true)
.setWaitForMetadataVersion(waitForMetadataVersion)
.setMasterNodeTimeout(TimeValue.timeValueMillis(100))
.setWaitForTimeOut(TimeValue.timeValueMillis(100))
.get();
if (clusterStateResponse.isWaitForTimedOut() == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,7 @@ public void testCreateIndexStopsWaitingWhenIndexDeleted() throws Exception {
.execute();

logger.info("--> wait until the cluster state contains the new index");
assertBusy(
() -> assertTrue(clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState().metadata().getProject().hasIndex(indexName))
);
awaitClusterState(state -> state.metadata().getProject().hasIndex(indexName));

logger.info("--> delete the index");
assertAcked(indicesAdmin().prepareDelete(indexName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import org.elasticsearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction;
import org.elasticsearch.action.admin.cluster.configuration.TransportClearVotingConfigExclusionsAction;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
import org.elasticsearch.cluster.coordination.NoMasterBlockService;
import org.elasticsearch.cluster.metadata.Metadata;
Expand Down Expand Up @@ -68,7 +67,7 @@ public void testTwoNodesNoMasterBlock() throws Exception {
String node1Name = internalCluster().startNode(settings);

logger.info("--> should be blocked, no master...");
ClusterState state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).setLocal(true).get().getState();
ClusterState state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
assertThat(state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID), equalTo(true));
assertThat(state.nodes().getSize(), equalTo(1)); // verify that we still see the local node in the cluster state

Expand All @@ -81,9 +80,7 @@ public void testTwoNodesNoMasterBlock() throws Exception {
.get();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));

state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).setLocal(true).get().getState();
assertThat(state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID), equalTo(false));
state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).setLocal(true).get().getState();
state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
assertThat(state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID), equalTo(false));

state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
Expand Down Expand Up @@ -123,12 +120,9 @@ public void testTwoNodesNoMasterBlock() throws Exception {
Settings masterDataPathSettings = internalCluster().dataPathSettings(masterNode);
internalCluster().stopNode(masterNode);

assertBusy(() -> {
ClusterState clusterState = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).setLocal(true).get().getState();
assertTrue(clusterState.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID));
});
awaitClusterState(otherNode, clusterState -> clusterState.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID));

state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).setLocal(true).get().getState();
state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
assertThat(state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID), equalTo(true));
// verify that both nodes are still in the cluster state but there is no master
assertThat(state.nodes().getSize(), equalTo(2));
Expand All @@ -144,9 +138,7 @@ public void testTwoNodesNoMasterBlock() throws Exception {
.get();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));

state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).setLocal(true).get().getState();
assertThat(state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID), equalTo(false));
state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).setLocal(true).get().getState();
state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
assertThat(state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID), equalTo(false));

state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
Expand Down Expand Up @@ -176,10 +168,7 @@ public void testTwoNodesNoMasterBlock() throws Exception {
Settings otherNodeDataPathSettings = internalCluster().dataPathSettings(otherNode);
internalCluster().stopNode(otherNode);

assertBusy(() -> {
ClusterState state1 = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).setLocal(true).get().getState();
assertThat(state1.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID), equalTo(true));
});
awaitClusterState(masterNode, clusterState -> clusterState.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID));

logger.info("--> starting the previous master node again...");
internalCluster().startNode(Settings.builder().put(settings).put(otherNodeDataPathSettings).build());
Expand All @@ -192,9 +181,7 @@ public void testTwoNodesNoMasterBlock() throws Exception {
.get();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));

state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).setLocal(true).get().getState();
assertThat(state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID), equalTo(false));
state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).setLocal(true).get().getState();
state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
assertThat(state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID), equalTo(false));

state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
Expand All @@ -220,12 +207,9 @@ public void testThreeNodesNoMasterBlock() throws Exception {

ClusterState state;

assertBusy(() -> {
for (Client client : clients()) {
ClusterState state1 = client.admin().cluster().prepareState(TEST_REQUEST_TIMEOUT).setLocal(true).get().getState();
assertThat(state1.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID), equalTo(true));
}
});
for (var node : internalCluster().getNodeNames()) {
awaitClusterState(node, clusterState -> clusterState.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID));
}

logger.info("--> start one more node");
internalCluster().startNode(settings);
Expand Down Expand Up @@ -261,8 +245,9 @@ public void testThreeNodesNoMasterBlock() throws Exception {
assertHitCount(prepareSearch().setSize(0).setQuery(QueryBuilders.matchAllQuery()), 100);
}

final var masterName = internalCluster().getMasterName();
List<String> nonMasterNodes = new ArrayList<>(
Sets.difference(Sets.newHashSet(internalCluster().getNodeNames()), Collections.singleton(internalCluster().getMasterName()))
Sets.difference(Sets.newHashSet(internalCluster().getNodeNames()), Collections.singleton(masterName))
);
Settings nonMasterDataPathSettings1 = internalCluster().dataPathSettings(nonMasterNodes.get(0));
Settings nonMasterDataPathSettings2 = internalCluster().dataPathSettings(nonMasterNodes.get(1));
Expand All @@ -271,10 +256,7 @@ public void testThreeNodesNoMasterBlock() throws Exception {

logger.info("--> verify that there is no master anymore on remaining node");
// spin here to wait till the state is set
assertBusy(() -> {
ClusterState st = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).setLocal(true).get().getState();
assertThat(st.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID), equalTo(true));
});
awaitClusterState(masterName, clusterState -> clusterState.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID));

logger.info("--> start back the 2 nodes ");
internalCluster().startNodes(nonMasterDataPathSettings1, nonMasterDataPathSettings2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,17 +82,10 @@ public void testNoMasterActions() throws Exception {
internalCluster().setDisruptionScheme(disruptionScheme);
disruptionScheme.startDisrupting();

final Client clientToMasterlessNode = client();

assertBusy(() -> {
ClusterState state = clientToMasterlessNode.admin()
.cluster()
.prepareState(TEST_REQUEST_TIMEOUT)
.setLocal(true)
.get()
.getState();
assertTrue(state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID));
});
final String masterLessNode = internalCluster().getRandomNodeName();
final Client clientToMasterlessNode = client(masterLessNode);

awaitClusterState(masterLessNode, state -> state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID));

assertRequestBuilderThrows(
clientToMasterlessNode.prepareGet("test", "1"),
Expand Down Expand Up @@ -246,17 +239,10 @@ public void testNoMasterActionsWriteMasterBlock() throws Exception {
internalCluster().setDisruptionScheme(disruptionScheme);
disruptionScheme.startDisrupting();

final Client clientToMasterlessNode = client();
final String masterLessNode = internalCluster().getRandomNodeName();
final Client clientToMasterlessNode = client(masterLessNode);

assertBusy(() -> {
ClusterState state = clientToMasterlessNode.admin()
.cluster()
.prepareState(TEST_REQUEST_TIMEOUT)
.setLocal(true)
.get()
.getState();
assertTrue(state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID));
});
awaitClusterState(masterLessNode, state -> state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID));

GetResponse getResponse = clientToMasterlessNode.prepareGet("test1", "1").get();
assertExists(getResponse);
Expand Down Expand Up @@ -346,12 +332,9 @@ public void testNoMasterActionsMetadataWriteMasterBlock() throws Exception {
internalCluster().setDisruptionScheme(disruptionScheme);
disruptionScheme.startDisrupting();

assertBusy(() -> {
for (String node : nodesWithShards) {
ClusterState state = client(node).admin().cluster().prepareState(TEST_REQUEST_TIMEOUT).setLocal(true).get().getState();
assertTrue(state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID));
}
});
for (String node : nodesWithShards) {
awaitClusterState(node, state -> state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID));
}

GetResponse getResponse = client(randomFrom(nodesWithShards)).prepareGet("test1", "1").get();
assertExists(getResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,7 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {

private static void assertClusterUuid(boolean expectCommitted, String expectedValue) {
for (String nodeName : internalCluster().getNodeNames()) {
final Metadata metadata = client(nodeName).admin()
.cluster()
.prepareState(TEST_REQUEST_TIMEOUT)
.setLocal(true)
.get()
.getState()
.metadata();
final Metadata metadata = client(nodeName).admin().cluster().prepareState(TEST_REQUEST_TIMEOUT).get().getState().metadata();
assertEquals(expectCommitted, metadata.clusterUUIDCommitted());
assertEquals(expectedValue, metadata.clusterUUID());

Expand Down
Loading
Loading