Skip to content

Commit 209288c

Browse files
committed
Reject PrepareJoin if tokens already assigned
Patch by Sam Tunnicliffe; reviewed by Marcus Eriksson for CASSANDRA-21006
1 parent 496959a commit 209288c

File tree

8 files changed

+173
-13
lines changed

8 files changed

+173
-13
lines changed

CHANGES.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
5.1
2+
* Reject PrepareJoin if tokens are already assigned (CASSANDRA-21006)
23
* Don't update registration status if node state for decommissioned peer is found with the same address (CASSANDRA-21005)
34
* Avoid NPE when meta keyspace placements are empty before CMS is initialized (CASSANDRA-21004)
45
* Gossip entries for hibernating non-members don't block truncate (CASSANDRA-21003)

src/java/org/apache/cassandra/tcm/ClusterMetadata.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -480,9 +480,10 @@ public Transformer withVersion(NodeId nodeId, NodeVersion version)
480480
return this;
481481
}
482482

483-
public Transformer register(NodeId nodeId, NodeAddresses addresses, Location location, NodeVersion version)
483+
@VisibleForTesting
484+
public Transformer unsafeRegisterForTesting(NodeId nodeId, NodeAddresses addresses, Location location, NodeVersion version)
484485
{
485-
directory = directory.with(nodeId, addresses, location, version);
486+
directory = directory.unsafeWithNodeForTesting(nodeId, addresses, location, version);
486487
return this;
487488
}
488489

src/java/org/apache/cassandra/tcm/membership/Directory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,8 @@ public Directory with(NodeAddresses addresses, Location location)
178178
return with(addresses, location, CURRENT);
179179
}
180180

181-
public Directory with(NodeId id, NodeAddresses addresses, Location location, NodeVersion nodeVersion)
181+
@VisibleForTesting
182+
public Directory unsafeWithNodeForTesting(NodeId id, NodeAddresses addresses, Location location, NodeVersion nodeVersion)
182183
{
183184
return with(addresses, id, id.toUUID(), location, nodeVersion);
184185
}

src/java/org/apache/cassandra/tcm/transformations/PrepareJoin.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.io.IOException;
2222
import java.util.HashSet;
2323
import java.util.Set;
24+
import java.util.stream.Collectors;
2425

2526
import com.google.common.collect.ImmutableSet;
2627

@@ -30,6 +31,7 @@
3031
import org.apache.cassandra.dht.Token;
3132
import org.apache.cassandra.io.util.DataInputPlus;
3233
import org.apache.cassandra.io.util.DataOutputPlus;
34+
import org.apache.cassandra.locator.InetAddressAndPort;
3335
import org.apache.cassandra.tcm.ClusterMetadata;
3436
import org.apache.cassandra.tcm.ClusterMetadataService;
3537
import org.apache.cassandra.tcm.Transformation;
@@ -132,6 +134,17 @@ public Result execute(ClusterMetadata prev)
132134
if (!ALLOWED_STATES.contains(prev.directory.peerState(nodeId)))
133135
return new Rejected(INVALID, String.format("Rejecting this plan as the node %s is in state %s",
134136
nodeId, prev.directory.peerState(nodeId)));
137+
Set<Token> alreadyAssigned = prev.tokenMap.tokens().stream().filter(tokens::contains).collect(Collectors.toSet());
138+
if (!alreadyAssigned.isEmpty())
139+
{
140+
String assignedString = alreadyAssigned.stream()
141+
.map(t -> {
142+
NodeId n = prev.tokenMap.owner(t);
143+
InetAddressAndPort e = prev.directory.endpoint(n);
144+
return String.format("%s (node %s|%s)", t, n.id(), e);
145+
}).collect(Collectors.joining(","));
146+
return new Rejected(INVALID, String.format("Rejecting this plan as some tokens are already assigned: [%s]", assignedString));
147+
}
135148

136149
PlacementTransitionPlan transitionPlan = placementProvider.planForJoin(prev, nodeId, tokens, prev.schema.getKeyspaces());
137150

test/unit/org/apache/cassandra/service/accord/AccordTopologyUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ static void addNode(ClusterMetadata.Transformer transformer, int node, Token tok
8181
NodeId nodeId = nodeId(node);
8282
InetAddressAndPort ep = ep(node);
8383
NodeAddresses addresses = new NodeAddresses(nodeId.toUUID(), ep, ep, ep);
84-
transformer.register(nodeId, addresses, LOCATION, NodeVersion.CURRENT);
84+
transformer.unsafeRegisterForTesting(nodeId, addresses, LOCATION, NodeVersion.CURRENT);
8585
transformer.withNodeState(nodeId, NodeState.JOINED);
8686
transformer.proposeToken(nodeId, Collections.singleton(token));
8787
transformer.addToRackAndDC(nodeId);

test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -254,9 +254,9 @@ static void configureClass(ReadRepairStrategy repairStrategy) throws Throwable
254254
ClusterMetadataTestHelper.addEndpoint(replica1.endpoint(), new BytesToken(new byte[] { 0 }), dataCenter1, rack);
255255
ClusterMetadataTestHelper.addEndpoint(replica2.endpoint(), new BytesToken(new byte[] { 1 }), dataCenter1, rack);
256256
ClusterMetadataTestHelper.addEndpoint(replica3.endpoint(), new BytesToken(new byte[] { 2 }), dataCenter1, rack);
257-
ClusterMetadataTestHelper.addEndpoint(remoteReplica1.endpoint(), new BytesToken(new byte[] { 0 }), dataCenter2, rack);
258-
ClusterMetadataTestHelper.addEndpoint(remoteReplica2.endpoint(), new BytesToken(new byte[] { 1 }), dataCenter2, rack);
259-
ClusterMetadataTestHelper.addEndpoint(remoteReplica3.endpoint(), new BytesToken(new byte[] { 2 }), dataCenter2, rack);
257+
ClusterMetadataTestHelper.addEndpoint(remoteReplica1.endpoint(), new BytesToken(new byte[] { 3 }), dataCenter2, rack);
258+
ClusterMetadataTestHelper.addEndpoint(remoteReplica2.endpoint(), new BytesToken(new byte[] { 4 }), dataCenter2, rack);
259+
ClusterMetadataTestHelper.addEndpoint(remoteReplica3.endpoint(), new BytesToken(new byte[] { 5 }), dataCenter2, rack);
260260

261261
for (Replica replica : replicas)
262262
{
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.cassandra.tcm.transformations;
20+
21+
import java.util.Collection;
22+
import java.util.Collections;
23+
import java.util.HashSet;
24+
import java.util.Random;
25+
import java.util.Set;
26+
27+
import com.google.common.collect.Sets;
28+
import org.junit.Before;
29+
import org.junit.Test;
30+
31+
import org.slf4j.Logger;
32+
import org.slf4j.LoggerFactory;
33+
34+
import org.apache.cassandra.dht.IPartitioner;
35+
import org.apache.cassandra.dht.Murmur3Partitioner;
36+
import org.apache.cassandra.dht.Token;
37+
import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper;
38+
import org.apache.cassandra.exceptions.ExceptionCode;
39+
import org.apache.cassandra.tcm.ClusterMetadata;
40+
import org.apache.cassandra.tcm.Transformation;
41+
import org.apache.cassandra.tcm.membership.Directory;
42+
import org.apache.cassandra.tcm.membership.Location;
43+
import org.apache.cassandra.tcm.membership.NodeId;
44+
import org.apache.cassandra.tcm.membership.NodeState;
45+
import org.apache.cassandra.tcm.membership.NodeVersion;
46+
import org.apache.cassandra.tcm.ownership.OwnershipUtils;
47+
48+
import static org.apache.cassandra.tcm.membership.MembershipUtils.nodeAddresses;
49+
import static org.junit.Assert.assertEquals;
50+
import static org.junit.Assert.assertTrue;
51+
52+
public class PrepareJoinTest
53+
{
54+
private static final Logger logger = LoggerFactory.getLogger(PrepareJoinTest.class);
55+
private Random random;
56+
private IPartitioner partitioner;
57+
private NodeId other;
58+
private NodeId joining;
59+
60+
@Before
61+
public void setup()
62+
{
63+
long seed = System.nanoTime();
64+
logger.info("Running test with seed {}", seed);
65+
random = new Random(seed);
66+
partitioner = Murmur3Partitioner.instance;
67+
other = new NodeId(1);
68+
joining = new NodeId(2);
69+
}
70+
71+
@Test
72+
public void singletonTokenAlreadyAssigned()
73+
{
74+
ClusterMetadata metadata = metadata();
75+
Collection<Token> assigned = metadata.tokenMap.tokens(other);
76+
Set<Token> toJoin = Collections.singleton(assigned.iterator().next());
77+
78+
PrepareJoin prepare = new PrepareJoin(joining, toJoin, PrepareLeaveTest.dummyPlacementProvider, true, true);
79+
Transformation.Result result = prepare.execute(metadata);
80+
assertTrue(result.isRejected());
81+
assertEquals(ExceptionCode.INVALID, result.rejected().code);
82+
assertTrue(result.rejected().reason.startsWith("Rejecting this plan as some tokens are already assigned"));
83+
}
84+
85+
@Test
86+
public void multipleTokensAlreadyAssigned()
87+
{
88+
ClusterMetadata metadata = metadata();
89+
Collection<Token> assigned = metadata.tokenMap.tokens(other);
90+
Set<Token> toJoin = new HashSet<>(assigned);
91+
92+
PrepareJoin prepare = new PrepareJoin(joining, toJoin, PrepareLeaveTest.dummyPlacementProvider, true, true);
93+
Transformation.Result result = prepare.execute(metadata);
94+
assertTrue(result.isRejected());
95+
assertEquals(ExceptionCode.INVALID, result.rejected().code);
96+
assertTrue(result.rejected().reason.startsWith("Rejecting this plan as some tokens are already assigned"));
97+
}
98+
99+
@Test
100+
public void noTokensAlreadyAssigned()
101+
{
102+
ClusterMetadata metadata = metadata();
103+
Collection<Token> assigned = metadata.tokenMap.tokens(other);
104+
Set<Token> toJoin = OwnershipUtils.randomTokens(16, partitioner, random);
105+
while (!Sets.intersection(new HashSet<>(assigned), toJoin).isEmpty())
106+
toJoin = OwnershipUtils.randomTokens(16, partitioner, random);
107+
108+
PrepareJoin prepare = new PrepareJoin(joining, toJoin, PrepareLeaveTest.dummyPlacementProvider, true, true);
109+
Transformation.Result result = prepare.execute(metadata);
110+
assertTrue(result.isSuccess());
111+
}
112+
113+
private ClusterMetadata metadata()
114+
{
115+
partitioner = Murmur3Partitioner.instance;
116+
other = new NodeId(1);
117+
joining = new NodeId(2);
118+
Location location = new Location("dc", "rack");
119+
Directory directory = new Directory().unsafeWithNodeForTesting(other, nodeAddresses(random), location, NodeVersion.CURRENT)
120+
.withNodeState(other, NodeState.JOINED)
121+
.unsafeWithNodeForTesting(joining, nodeAddresses(random), location, NodeVersion.CURRENT)
122+
.withNodeState(joining, NodeState.REGISTERED);
123+
Set<Token> ownedTokens = OwnershipUtils.randomTokens(16, partitioner, random);
124+
return ClusterMetadataTestHelper.minimalForTesting(partitioner)
125+
.transformer()
126+
.with(directory)
127+
.proposeToken(other, ownedTokens)
128+
.build().metadata;
129+
}
130+
}

test/unit/org/apache/cassandra/tcm/transformations/PrepareLeaveTest.java

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -151,27 +151,41 @@ private ClusterMetadata prepMetadata(Keyspaces kss, int countDc1, int countDc2)
151151
public static PlacementProvider dummyPlacementProvider = new PlacementProvider()
152152
{
153153
@Override
154-
public DataPlacements calculatePlacements(Epoch epoch, List<Range<Token>> ranges, ClusterMetadata metadata, Keyspaces keyspaces) { return null; }
154+
public DataPlacements calculatePlacements(Epoch epoch, List<Range<Token>> ranges, ClusterMetadata metadata, Keyspaces keyspaces)
155+
{
156+
return null;
157+
}
155158

156159
@Override
157-
public PlacementTransitionPlan planForJoin(ClusterMetadata metadata, NodeId nodeId, Set<Token> tokens, Keyspaces keyspaces) { return null;}
160+
public PlacementTransitionPlan planForJoin(ClusterMetadata metadata, NodeId nodeId, Set<Token> tokens, Keyspaces keyspaces)
161+
{
162+
return noop();
163+
}
158164

159165
@Override
160166
public PlacementTransitionPlan planForMove(ClusterMetadata metadata, NodeId nodeId, Set<Token> tokens, Keyspaces keyspaces)
161167
{
162-
return null;
168+
return noop();
163169
}
164170

165171
@Override
166172
public PlacementTransitionPlan planForDecommission(ClusterMetadata metadata, NodeId nodeId, Keyspaces keyspaces)
173+
{
174+
return noop();
175+
}
176+
177+
@Override
178+
public PlacementTransitionPlan planForReplacement(ClusterMetadata metadata, NodeId replaced, NodeId replacement, Keyspaces keyspaces)
179+
{
180+
return noop();
181+
}
182+
183+
private PlacementTransitionPlan noop()
167184
{
168185
return new PlacementTransitionPlan(PlacementDeltas.empty(),
169186
PlacementDeltas.empty(),
170187
PlacementDeltas.empty(),
171188
PlacementDeltas.empty());
172189
}
173-
174-
@Override
175-
public PlacementTransitionPlan planForReplacement(ClusterMetadata metadata, NodeId replaced, NodeId replacement, Keyspaces keyspaces) { return null; }
176190
};
177191
}

0 commit comments

Comments
 (0)