Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/129872.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 129872
summary: Run `TransportClusterStateAction` on local node
area: Distributed
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ public void testJoin() {

ClusterStateResponse clusterStateResponse = client(masterNode).admin()
.cluster()
.prepareState(TEST_REQUEST_TIMEOUT)
.setMasterNodeTimeout(TimeValue.timeValueSeconds(1))
.prepareState(TimeValue.timeValueSeconds(1))
.clear()
.setNodes(true)
.get();
Expand All @@ -79,11 +78,9 @@ public void testJoin() {
registerGceNode(secondNode);
clusterStateResponse = client(secondNode).admin()
.cluster()
.prepareState(TEST_REQUEST_TIMEOUT)
.setMasterNodeTimeout(TimeValue.timeValueSeconds(1))
.prepareState(TimeValue.timeValueSeconds(1))
.clear()
.setNodes(true)
.setLocal(true)
.get();
assertNotNull(clusterStateResponse.getState().nodes().getMasterNodeId());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
import org.apache.http.nio.entity.NStringEntity;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.cluster.state.RemoteClusterStateRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchShardsRequest;
import org.elasticsearch.action.search.SearchShardsResponse;
Expand Down Expand Up @@ -117,7 +117,7 @@ private static MockTransportService startTransport(
newService.registerRequestHandler(
ClusterStateAction.NAME,
EsExecutors.DIRECT_EXECUTOR_SERVICE,
ClusterStateRequest::new,
RemoteClusterStateRequest::new,
(request, channel, task) -> {
DiscoveryNodes.Builder builder = DiscoveryNodes.builder();
for (DiscoveryNode node : knownNodes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,13 @@
},
"params":{
"local":{
"deprecated":true,
"type":"boolean",
"description":"Return local information, do not retrieve the state from master node (default: false)"
},
"master_timeout":{
"type":"time",
"description":"Specify timeout for connection to master"
"description":"Timeout for waiting for new cluster state in case it is blocked"
},
"flat_settings":{
"type":"boolean",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.elasticsearch.action.admin.cluster.state;

import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.coordination.ClusterBootstrapService;
import org.elasticsearch.cluster.metadata.Metadata;
Expand All @@ -18,7 +17,6 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
Expand Down Expand Up @@ -47,34 +45,15 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singletonList(MockTransportService.TestPlugin.class);
}

public void testNonLocalRequestAlwaysFindsMaster() throws Exception {
runRepeatedlyWhileChangingMaster(() -> {
final ClusterStateRequestBuilder clusterStateRequestBuilder = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT)
.clear()
.setNodes(true)
.setBlocks(true)
.setMasterNodeTimeout(TimeValue.timeValueMillis(100));
final ClusterStateResponse clusterStateResponse;
try {
clusterStateResponse = clusterStateRequestBuilder.get();
} catch (MasterNotDiscoveredException e) {
return; // ok, we hit the disconnected node
}
assertNotNull("should always contain a master node", clusterStateResponse.getState().nodes().getMasterNodeId());
});
}

public void testLocalRequestAlwaysSucceeds() throws Exception {
runRepeatedlyWhileChangingMaster(() -> {
final String node = randomFrom(internalCluster().getNodeNames());
final DiscoveryNodes discoveryNodes = client(node).admin()
.cluster()
.prepareState(TEST_REQUEST_TIMEOUT)
.prepareState(TimeValue.timeValueMillis(100))
.clear()
.setLocal(true)
.setNodes(true)
.setBlocks(true)
.setMasterNodeTimeout(TimeValue.timeValueMillis(100))
.get()
.getState()
.nodes();
Expand All @@ -87,39 +66,6 @@ public void testLocalRequestAlwaysSucceeds() throws Exception {
});
}

public void testNonLocalRequestAlwaysFindsMasterAndWaitsForMetadata() throws Exception {
runRepeatedlyWhileChangingMaster(() -> {
final String node = randomFrom(internalCluster().getNodeNames());
final long metadataVersion = internalCluster().getInstance(ClusterService.class, node)
.getClusterApplierService()
.state()
.metadata()
.version();
final long waitForMetadataVersion = randomLongBetween(Math.max(1, metadataVersion - 3), metadataVersion + 5);
final ClusterStateRequestBuilder clusterStateRequestBuilder = client(node).admin()
.cluster()
.prepareState(TEST_REQUEST_TIMEOUT)
.clear()
.setNodes(true)
.setMetadata(true)
.setBlocks(true)
.setMasterNodeTimeout(TimeValue.timeValueMillis(100))
.setWaitForTimeOut(TimeValue.timeValueMillis(100))
.setWaitForMetadataVersion(waitForMetadataVersion);
final ClusterStateResponse clusterStateResponse;
try {
clusterStateResponse = clusterStateRequestBuilder.get();
} catch (MasterNotDiscoveredException e) {
return; // ok, we hit the disconnected node
}
if (clusterStateResponse.isWaitForTimedOut() == false) {
final ClusterState state = clusterStateResponse.getState();
assertNotNull("should always contain a master node", state.nodes().getMasterNodeId());
assertThat("waited for metadata version", state.metadata().version(), greaterThanOrEqualTo(waitForMetadataVersion));
}
});
}

public void testLocalRequestWaitsForMetadata() throws Exception {
runRepeatedlyWhileChangingMaster(() -> {
final String node = randomFrom(internalCluster().getNodeNames());
Expand All @@ -131,13 +77,11 @@ public void testLocalRequestWaitsForMetadata() throws Exception {
final long waitForMetadataVersion = randomLongBetween(Math.max(1, metadataVersion - 3), metadataVersion + 5);
final ClusterStateResponse clusterStateResponse = client(node).admin()
.cluster()
.prepareState(TEST_REQUEST_TIMEOUT)
.prepareState(TimeValue.timeValueMillis(100))
.clear()
.setLocal(true)
.setMetadata(true)
.setBlocks(true)
.setWaitForMetadataVersion(waitForMetadataVersion)
.setMasterNodeTimeout(TimeValue.timeValueMillis(100))
.setWaitForTimeOut(TimeValue.timeValueMillis(100))
.get();
if (clusterStateResponse.isWaitForTimedOut() == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,7 @@ public void testCreateIndexStopsWaitingWhenIndexDeleted() throws Exception {
.execute();

logger.info("--> wait until the cluster state contains the new index");
assertBusy(
() -> assertTrue(clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState().metadata().getProject().hasIndex(indexName))
);
awaitClusterState(state -> state.metadata().getProject().hasIndex(indexName));

logger.info("--> delete the index");
assertAcked(indicesAdmin().prepareDelete(indexName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import org.elasticsearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction;
import org.elasticsearch.action.admin.cluster.configuration.TransportClearVotingConfigExclusionsAction;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
import org.elasticsearch.cluster.coordination.NoMasterBlockService;
import org.elasticsearch.cluster.metadata.Metadata;
Expand Down Expand Up @@ -68,7 +67,7 @@ public void testTwoNodesNoMasterBlock() throws Exception {
String node1Name = internalCluster().startNode(settings);

logger.info("--> should be blocked, no master...");
ClusterState state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).setLocal(true).get().getState();
ClusterState state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
assertThat(state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID), equalTo(true));
assertThat(state.nodes().getSize(), equalTo(1)); // verify that we still see the local node in the cluster state

Expand All @@ -81,9 +80,7 @@ public void testTwoNodesNoMasterBlock() throws Exception {
.get();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));

state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).setLocal(true).get().getState();
assertThat(state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID), equalTo(false));
state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).setLocal(true).get().getState();
state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
assertThat(state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID), equalTo(false));

state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
Expand Down Expand Up @@ -123,12 +120,9 @@ public void testTwoNodesNoMasterBlock() throws Exception {
Settings masterDataPathSettings = internalCluster().dataPathSettings(masterNode);
internalCluster().stopNode(masterNode);

assertBusy(() -> {
ClusterState clusterState = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).setLocal(true).get().getState();
assertTrue(clusterState.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID));
});
awaitClusterState(otherNode, clusterState -> clusterState.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID));

state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).setLocal(true).get().getState();
state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
assertThat(state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID), equalTo(true));
// verify that both nodes are still in the cluster state but there is no master
assertThat(state.nodes().getSize(), equalTo(2));
Expand All @@ -144,9 +138,7 @@ public void testTwoNodesNoMasterBlock() throws Exception {
.get();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));

state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).setLocal(true).get().getState();
assertThat(state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID), equalTo(false));
state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).setLocal(true).get().getState();
state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
assertThat(state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID), equalTo(false));

state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
Expand Down Expand Up @@ -176,10 +168,7 @@ public void testTwoNodesNoMasterBlock() throws Exception {
Settings otherNodeDataPathSettings = internalCluster().dataPathSettings(otherNode);
internalCluster().stopNode(otherNode);

assertBusy(() -> {
ClusterState state1 = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).setLocal(true).get().getState();
assertThat(state1.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID), equalTo(true));
});
awaitClusterState(masterNode, clusterState -> clusterState.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID));

logger.info("--> starting the previous master node again...");
internalCluster().startNode(Settings.builder().put(settings).put(otherNodeDataPathSettings).build());
Expand All @@ -192,9 +181,7 @@ public void testTwoNodesNoMasterBlock() throws Exception {
.get();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));

state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).setLocal(true).get().getState();
assertThat(state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID), equalTo(false));
state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).setLocal(true).get().getState();
state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
assertThat(state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID), equalTo(false));

state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
Expand All @@ -220,12 +207,9 @@ public void testThreeNodesNoMasterBlock() throws Exception {

ClusterState state;

assertBusy(() -> {
for (Client client : clients()) {
ClusterState state1 = client.admin().cluster().prepareState(TEST_REQUEST_TIMEOUT).setLocal(true).get().getState();
assertThat(state1.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID), equalTo(true));
}
});
for (var node : internalCluster().getNodeNames()) {
awaitClusterState(node, clusterState -> clusterState.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID));
}

logger.info("--> start one more node");
internalCluster().startNode(settings);
Expand Down Expand Up @@ -261,8 +245,9 @@ public void testThreeNodesNoMasterBlock() throws Exception {
assertHitCount(prepareSearch().setSize(0).setQuery(QueryBuilders.matchAllQuery()), 100);
}

final var masterName = internalCluster().getMasterName();
List<String> nonMasterNodes = new ArrayList<>(
Sets.difference(Sets.newHashSet(internalCluster().getNodeNames()), Collections.singleton(internalCluster().getMasterName()))
Sets.difference(Sets.newHashSet(internalCluster().getNodeNames()), Collections.singleton(masterName))
);
Settings nonMasterDataPathSettings1 = internalCluster().dataPathSettings(nonMasterNodes.get(0));
Settings nonMasterDataPathSettings2 = internalCluster().dataPathSettings(nonMasterNodes.get(1));
Expand All @@ -271,10 +256,7 @@ public void testThreeNodesNoMasterBlock() throws Exception {

logger.info("--> verify that there is no master anymore on remaining node");
// spin here to wait till the state is set
assertBusy(() -> {
ClusterState st = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).setLocal(true).get().getState();
assertThat(st.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID), equalTo(true));
});
awaitClusterState(masterName, clusterState -> clusterState.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID));

logger.info("--> start back the 2 nodes ");
internalCluster().startNodes(nonMasterDataPathSettings1, nonMasterDataPathSettings2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,17 +82,10 @@ public void testNoMasterActions() throws Exception {
internalCluster().setDisruptionScheme(disruptionScheme);
disruptionScheme.startDisrupting();

final Client clientToMasterlessNode = client();

assertBusy(() -> {
ClusterState state = clientToMasterlessNode.admin()
.cluster()
.prepareState(TEST_REQUEST_TIMEOUT)
.setLocal(true)
.get()
.getState();
assertTrue(state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID));
});
final String masterlessNode = internalCluster().getRandomNodeName();
final Client clientToMasterlessNode = client(masterlessNode);

awaitClusterState(masterlessNode, state -> state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID));

assertRequestBuilderThrows(
clientToMasterlessNode.prepareGet("test", "1"),
Expand Down Expand Up @@ -246,17 +239,10 @@ public void testNoMasterActionsWriteMasterBlock() throws Exception {
internalCluster().setDisruptionScheme(disruptionScheme);
disruptionScheme.startDisrupting();

final Client clientToMasterlessNode = client();
final String masterlessNode = internalCluster().getRandomNodeName();
final Client clientToMasterlessNode = client(masterlessNode);

assertBusy(() -> {
ClusterState state = clientToMasterlessNode.admin()
.cluster()
.prepareState(TEST_REQUEST_TIMEOUT)
.setLocal(true)
.get()
.getState();
assertTrue(state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID));
});
awaitClusterState(masterlessNode, state -> state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID));

GetResponse getResponse = clientToMasterlessNode.prepareGet("test1", "1").get();
assertExists(getResponse);
Expand Down Expand Up @@ -346,12 +332,9 @@ public void testNoMasterActionsMetadataWriteMasterBlock() throws Exception {
internalCluster().setDisruptionScheme(disruptionScheme);
disruptionScheme.startDisrupting();

assertBusy(() -> {
for (String node : nodesWithShards) {
ClusterState state = client(node).admin().cluster().prepareState(TEST_REQUEST_TIMEOUT).setLocal(true).get().getState();
assertTrue(state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID));
}
});
for (String node : nodesWithShards) {
awaitClusterState(node, state -> state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID));
}

GetResponse getResponse = client(randomFrom(nodesWithShards)).prepareGet("test1", "1").get();
assertExists(getResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,7 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {

private static void assertClusterUuid(boolean expectCommitted, String expectedValue) {
for (String nodeName : internalCluster().getNodeNames()) {
final Metadata metadata = client(nodeName).admin()
.cluster()
.prepareState(TEST_REQUEST_TIMEOUT)
.setLocal(true)
.get()
.getState()
.metadata();
final Metadata metadata = client(nodeName).admin().cluster().prepareState(TEST_REQUEST_TIMEOUT).get().getState().metadata();
assertEquals(expectCommitted, metadata.clusterUUIDCommitted());
assertEquals(expectedValue, metadata.clusterUUID());

Expand Down
Loading
Loading