|
19 | 19 | package org.apache.cassandra.distributed.test.hostreplacement;
|
20 | 20 |
|
21 | 21 | import java.io.IOException;
|
| 22 | +import java.net.UnknownHostException; |
22 | 23 | import java.util.Arrays;
|
23 | 24 | import java.util.List;
|
| 25 | +import java.util.UUID; |
24 | 26 |
|
25 | 27 | import org.junit.Test;
|
26 | 28 | import org.slf4j.Logger;
|
|
32 | 34 | import org.apache.cassandra.distributed.api.Feature;
|
33 | 35 | import org.apache.cassandra.distributed.api.ICoordinator;
|
34 | 36 | import org.apache.cassandra.distributed.api.IInvokableInstance;
|
| 37 | +import org.apache.cassandra.distributed.api.IIsolatedExecutor; |
35 | 38 | import org.apache.cassandra.distributed.api.SimpleQueryResult;
|
36 | 39 | import org.apache.cassandra.distributed.api.TokenSupplier;
|
| 40 | +import org.apache.cassandra.distributed.impl.InstanceConfig; |
37 | 41 | import org.apache.cassandra.distributed.shared.AssertUtils;
|
| 42 | +import org.apache.cassandra.distributed.shared.ClusterUtils; |
| 43 | +import org.apache.cassandra.distributed.shared.WithProperties; |
38 | 44 | import org.apache.cassandra.distributed.test.TestBaseImpl;
|
| 45 | +import org.apache.cassandra.gms.ApplicationState; |
| 46 | +import org.apache.cassandra.gms.EndpointState; |
| 47 | +import org.apache.cassandra.gms.Gossiper; |
| 48 | +import org.apache.cassandra.gms.VersionedValue; |
| 49 | +import org.apache.cassandra.io.util.FileUtils; |
| 50 | +import org.apache.cassandra.locator.InetAddressAndPort; |
| 51 | +import org.apache.cassandra.service.StorageService; |
39 | 52 | import org.assertj.core.api.Assertions;
|
40 | 53 |
|
41 | 54 | import static org.apache.cassandra.config.CassandraRelevantProperties.BOOTSTRAP_SKIP_SCHEMA_CHECK;
|
|
44 | 57 | import static org.apache.cassandra.distributed.shared.ClusterUtils.assertRingIs;
|
45 | 58 | import static org.apache.cassandra.distributed.shared.ClusterUtils.awaitRingHealthy;
|
46 | 59 | import static org.apache.cassandra.distributed.shared.ClusterUtils.awaitRingJoin;
|
| 60 | +import static org.apache.cassandra.distributed.shared.ClusterUtils.getDirectories; |
47 | 61 | import static org.apache.cassandra.distributed.shared.ClusterUtils.getTokenMetadataTokens;
|
48 | 62 | import static org.apache.cassandra.distributed.shared.ClusterUtils.replaceHostAndStart;
|
49 | 63 | import static org.apache.cassandra.distributed.shared.ClusterUtils.stopUnchecked;
|
| 64 | +import static org.apache.cassandra.gms.Gossiper.Props.DISABLE_THREAD_VALIDATION; |
| 65 | +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; |
| 66 | +import static org.junit.Assert.assertFalse; |
50 | 67 |
|
51 | 68 | public class HostReplacementTest extends TestBaseImpl
|
52 | 69 | {
|
@@ -205,6 +222,91 @@ public void seedGoesDownBeforeDownHost() throws IOException
|
205 | 222 | }
|
206 | 223 | }
|
207 | 224 |
|
| 225 | + /** |
| 226 | + * Make sure that a node stuck in hibernate state due to failed replacement can retry the replacement procedure and succeed. |
| 227 | + */ |
| 228 | + @Test |
| 229 | + public void retryingFailedReplaceWithNodeInHibernateState() throws IOException |
| 230 | + { |
| 231 | + try (WithProperties properties = new WithProperties()) |
| 232 | + { |
| 233 | + properties.setProperty(DISABLE_THREAD_VALIDATION, "true"); |
| 234 | + |
| 235 | + // given a two node cluster with one seed |
| 236 | + TokenSupplier even = TokenSupplier.evenlyDistributedTokens(2); |
| 237 | + try (Cluster cluster = Cluster.build(2) |
| 238 | + .withConfig(c -> c.with(Feature.GOSSIP, Feature.NATIVE_PROTOCOL) |
| 239 | + .set(Constants.KEY_DTEST_API_STARTUP_FAILURE_AS_SHUTDOWN, true)) |
| 240 | + .withTokenSupplier(node -> even.token(node == 3 ? 2 : node)) |
| 241 | + .start() ) |
| 242 | + { |
| 243 | + IInvokableInstance seed = cluster.get(1); |
| 244 | + IInvokableInstance nodeToReplace = cluster.get(2); |
| 245 | + |
| 246 | + setupCluster(cluster); |
| 247 | + SimpleQueryResult expectedState = nodeToReplace.coordinator().executeWithResult("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.ALL); |
| 248 | + |
| 249 | + // when |
| 250 | + // stop the node to replace |
| 251 | + stopUnchecked(nodeToReplace); |
| 252 | + // wipe the node to replace |
| 253 | + getDirectories(nodeToReplace).forEach(FileUtils::deleteRecursive); |
| 254 | + |
| 255 | + String toReplaceAddress = nodeToReplace.config().broadcastAddress().getAddress().getHostAddress(); |
| 256 | + // set hibernate status for the node to replace on seed |
| 257 | + seed.runOnInstance(putInHibernation(toReplaceAddress)); |
| 258 | + |
| 259 | + // we need to fake a new host id |
| 260 | + ((InstanceConfig) nodeToReplace.config()).setHostId(UUID.randomUUID()); |
| 261 | + // enable autoboostrap |
| 262 | + nodeToReplace.config().set("auto_bootstrap", true); |
| 263 | + |
| 264 | + // first replacement will fail as the node was announced as hibernated and no-one can contact it as startup |
| 265 | + assertThatExceptionOfType(IllegalStateException.class).isThrownBy(() -> { |
| 266 | + ClusterUtils.start(nodeToReplace, props -> { |
| 267 | + // set the replacement address |
| 268 | + props.setProperty("cassandra.replace_address", toReplaceAddress); |
| 269 | + }); |
| 270 | + }).withMessageContaining("Unable to contact any seeds"); |
| 271 | + |
| 272 | + // then |
| 273 | + // retrying replacement will succeed as the node announced itself as shutdown before killing itself |
| 274 | + ClusterUtils.start(nodeToReplace, props -> { |
| 275 | + // set the replacement address |
| 276 | + props.setProperty("cassandra.replace_address", toReplaceAddress); |
| 277 | + }); |
| 278 | + assertFalse("replaces node should be up", nodeToReplace.isShutdown()); |
| 279 | + |
| 280 | + // the data after replacement should be consistent |
| 281 | + awaitRingJoin(seed, nodeToReplace); |
| 282 | + awaitRingJoin(nodeToReplace, seed); |
| 283 | + |
| 284 | + validateRows(seed.coordinator(), expectedState); |
| 285 | + validateRows(nodeToReplace.coordinator(), expectedState); |
| 286 | + } |
| 287 | + } |
| 288 | + } |
| 289 | + |
| 290 | + private static IIsolatedExecutor.SerializableRunnable putInHibernation(String address) |
| 291 | + { |
| 292 | + return () -> { |
| 293 | + InetAddressAndPort endpoint; |
| 294 | + try |
| 295 | + { |
| 296 | + endpoint = InetAddressAndPort.getByName(address); |
| 297 | + } |
| 298 | + catch (UnknownHostException e) |
| 299 | + { |
| 300 | + throw new RuntimeException(e); |
| 301 | + } |
| 302 | + EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint); |
| 303 | + VersionedValue newStatus = StorageService.instance.valueFactory.hibernate(true); |
| 304 | + epState.addApplicationState(ApplicationState.STATUS, newStatus); |
| 305 | + epState.addApplicationState(ApplicationState.STATUS_WITH_PORT, newStatus); |
| 306 | + Gossiper.instance.handleMajorStateChange(endpoint, epState); |
| 307 | + }; |
| 308 | + } |
| 309 | + |
208 | 310 | static void setupCluster(Cluster cluster)
|
209 | 311 | {
|
210 | 312 | fixDistributedSchemas(cluster);
|
|
0 commit comments