|
8 | 8 |
|
9 | 9 | package org.opensearch.upgrades; |
10 | 10 |
|
| 11 | +import org.opensearch.Version; |
11 | 12 | import org.opensearch.client.Request; |
12 | 13 | import org.opensearch.client.Response; |
13 | 14 | import org.opensearch.common.settings.Settings; |
14 | 15 |
|
| 16 | +import java.io.IOException; |
15 | 17 | import java.util.Arrays; |
16 | 18 | import java.util.List; |
17 | 19 | import java.util.Locale; |
18 | 20 | import java.util.Map; |
| 21 | +import java.util.concurrent.TimeUnit; |
19 | 22 |
|
20 | 23 | /** |
21 | 24 | * Integration tests for remote publication enabled clusters during rolling upgrades. |
@@ -65,18 +68,122 @@ public void testUpgradeWithRemotePublicationEnabled() throws Exception { |
65 | 68 | verifyComponentTemplateInClusterState(response); |
66 | 69 | verifyComposableTemplateInClusterState(response); |
67 | 70 | verifySettingsInClusterState(); |
| 71 | + } else if (CLUSTER_TYPE == ClusterType.MIXED && firstMixedRound) { |
| 72 | + verifyRemotePublicationEnabled(); |
| 73 | + verifyClusterState(); |
| 74 | + |
| 75 | + // Test both cluster-manager version scenarios to ensure remote state serialization |
| 76 | + // is backwards compatible in both directions: |
| 77 | + // 1. Old CM writes state that new nodes must read from remote store |
| 78 | + // 2. New CM writes state that old nodes must read from remote store |
| 79 | + ensureClusterManagerVersion(false); |
| 80 | + makeClusterStateChange("old_cm"); |
| 81 | + ensureAllNodesHealthy(); |
| 82 | + verifyClusterState(); |
| 83 | + |
| 84 | + ensureClusterManagerVersion(true); |
| 85 | + makeClusterStateChange("new_cm"); |
| 86 | + ensureAllNodesHealthy(); |
| 87 | + verifyClusterState(); |
68 | 88 | } else { |
69 | 89 | verifyRemotePublicationEnabled(); |
| 90 | + verifyClusterState(); |
| 91 | + } |
| 92 | + } |
70 | 93 |
|
71 | | - Request request = new Request("GET", "_cluster/state"); |
72 | | - Response response = client().performRequest(request); |
73 | | - assertOK(response); |
| 94 | + private void verifyClusterState() throws Exception { |
| 95 | + Request request = new Request("GET", "_cluster/state"); |
| 96 | + Response response = client().performRequest(request); |
| 97 | + assertOK(response); |
| 98 | + verifyIndexInClusterState(response); |
| 99 | + verifyTemplateMetadataInClusterState(response); |
| 100 | + verifyComponentTemplateInClusterState(response); |
| 101 | + verifyComposableTemplateInClusterState(response); |
| 102 | + } |
74 | 103 |
|
75 | | - verifyIndexInClusterState(response); |
76 | | - verifyTemplateMetadataInClusterState(response); |
77 | | - verifyComponentTemplateInClusterState(response); |
78 | | - verifyComposableTemplateInClusterState(response); |
| 104 | + /** |
| 105 | + * Returns true if the current cluster-manager node is running the new (upgraded) version. |
| 106 | + */ |
| 107 | + private boolean isClusterManagerOnNewVersion() throws IOException { |
| 108 | + Map<String, Object> clusterState = entityAsMap(client().performRequest(new Request("GET", "_cluster/state"))); |
| 109 | + String clusterManagerNodeId = (String) clusterState.get("master_node"); |
| 110 | + |
| 111 | + Map<String, Object> nodesInfo = entityAsMap(client().performRequest(new Request("GET", "_nodes"))); |
| 112 | + Map<String, Object> nodes = (Map<String, Object>) nodesInfo.get("nodes"); |
| 113 | + Map<String, Object> cmNode = (Map<String, Object>) nodes.get(clusterManagerNodeId); |
| 114 | + Version cmVersion = Version.fromString((String) cmNode.get("version")); |
| 115 | + return cmVersion.after(UPGRADE_FROM_VERSION); |
| 116 | + } |
| 117 | + |
| 118 | + /** |
| 119 | + * Ensures the cluster-manager is on the desired version by repeatedly excluding the current CM |
| 120 | + * to trigger re-elections until a node of the desired version wins. |
| 121 | + */ |
| 122 | + private void ensureClusterManagerVersion(boolean newVersion) throws Exception { |
| 123 | + String versionLabel = newVersion ? "new" : "old"; |
| 124 | + long deadline = System.nanoTime() + TimeUnit.MINUTES.toNanos(1); |
| 125 | + int attempt = 0; |
| 126 | + while (isClusterManagerOnNewVersion() != newVersion) { |
| 127 | + if (System.nanoTime() > deadline) { |
| 128 | + fail("Failed to get cluster-manager on " + versionLabel + " version after 1 minute"); |
| 129 | + } |
| 130 | + String cmName = getClusterManagerNodeName(); |
| 131 | + logger.info("Attempt {} to get {} version CM, excluding current CM [{}]", ++attempt, versionLabel, cmName); |
| 132 | + |
| 133 | + Request exclude = new Request("POST", "/_cluster/voting_config_exclusions"); |
| 134 | + exclude.addParameter("node_names", cmName); |
| 135 | + exclude.addParameter("timeout", "30s"); |
| 136 | + assertOK(client().performRequest(exclude)); |
| 137 | + |
| 138 | + // Wait for a different node to become CM |
| 139 | + assertBusy(() -> assertNotEquals(cmName, getClusterManagerNodeName())); |
| 140 | + |
| 141 | + // Clear exclusion immediately so the node stays in the cluster |
| 142 | + clearVotingConfigExclusions(); |
79 | 143 | } |
| 144 | + logger.info("Cluster manager is on {} version: [{}]", versionLabel, getClusterManagerNodeName()); |
| 145 | + } |
| 146 | + |
| 147 | + private String getClusterManagerNodeName() throws IOException { |
| 148 | + Map<String, Object> clusterState = entityAsMap(client().performRequest(new Request("GET", "_cluster/state"))); |
| 149 | + String cmNodeId = (String) clusterState.get("master_node"); |
| 150 | + Map<String, Object> nodesInfo = entityAsMap(client().performRequest(new Request("GET", "_nodes"))); |
| 151 | + Map<String, Object> nodes = (Map<String, Object>) nodesInfo.get("nodes"); |
| 152 | + Map<String, Object> cmNode = (Map<String, Object>) nodes.get(cmNodeId); |
| 153 | + return (String) cmNode.get("name"); |
| 154 | + } |
| 155 | + |
| 156 | + /** |
| 157 | + * Makes a small cluster state change to force the cluster-manager to publish new state, |
| 158 | + * which exercises the remote state serialization/deserialization path. |
| 159 | + */ |
| 160 | + private void makeClusterStateChange(String suffix) throws IOException { |
| 161 | + Request putSettings = new Request("PUT", "_cluster/settings"); |
| 162 | + putSettings.setJsonEntity(String.format(Locale.ROOT, """ |
| 163 | + { |
| 164 | + "transient": { |
| 165 | + "cluster.routing.allocation.exclude._name": "nonexistent_node_%s" |
| 166 | + } |
| 167 | + }""", suffix)); |
| 168 | + assertOK(client().performRequest(putSettings)); |
| 169 | + } |
| 170 | + |
| 171 | + private void clearVotingConfigExclusions() throws IOException { |
| 172 | + Request clearRequest = new Request("DELETE", "/_cluster/voting_config_exclusions"); |
| 173 | + clearRequest.addParameter("wait_for_removal", "false"); |
| 174 | + assertOK(client().performRequest(clearRequest)); |
| 175 | + } |
| 176 | + |
| 177 | + /** |
| 178 | + * Verifies all 3 nodes are present and the cluster is healthy. If any node failed to apply |
| 179 | + * cluster state (e.g. due to remote state deserialization errors), it will not be part of |
| 180 | + * the cluster and this check will fail. |
| 181 | + */ |
| 182 | + private void ensureAllNodesHealthy() throws Exception { |
| 183 | + ensureHealth(request -> { |
| 184 | + request.addParameter("wait_for_nodes", "3"); |
| 185 | + request.addParameter("timeout", "60s"); |
| 186 | + }); |
80 | 187 | } |
81 | 188 |
|
82 | 189 | private static void createIndexTemplate() throws Exception { |
|
0 commit comments