Skip to content

Commit f05b275

Browse files
beobal and krummas committed
Improve CMS initialization
* Better handling of DOWN unupgraded nodes
* Test that aborted CMS initialization cleans up state properly
* Clean up orphaned PreInitialize entries in the log on bounce
* Unconditionally reset initiator during abort
* Ensure that metadata log entries aren't exchanged before CMS initialization is complete
* Precalculate common serialization version, excluding non-upgraded and LEFT nodes
* Decide if metadata-impacting upgrade is in progress using min common version
* Add metadata identifier to nodetool cms output

Patch by Sam Tunnicliffe and Marcus Eriksson; reviewed by Sam Tunnicliffe and Marcus Eriksson for CASSANDRA-21036

Co-authored-by: Marcus Eriksson <[email protected]>
Co-authored-by: Sam Tunnicliffe <[email protected]>
1 parent b11633b commit f05b275

21 files changed

+716
-47
lines changed

CHANGES.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
5.1
2+
* Improve CMS initialization (CASSANDRA-21036)
23
* Introducing comments and security labels for schema elements (CASSANDRA-20943)
34
* Extend nodetool tablestats for dictionary memory usage (CASSANDRA-20940)
45
* Introduce separate GCInspector thresholds for concurrent GC events (CASSANDRA-20980)

src/java/org/apache/cassandra/db/AbstractMutationVerbHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public void doVerb(Message<T> message) throws IOException
5656

5757
protected void processMessage(Message<T> message, InetAddressAndPort respondTo)
5858
{
59-
if (message.epoch().isAfter(Epoch.EMPTY))
59+
if (message.epoch().isAfter(Epoch.FIRST))
6060
{
6161
ClusterMetadata metadata = ClusterMetadata.current();
6262
metadata = checkTokenOwnership(metadata, message, respondTo);

src/java/org/apache/cassandra/gms/GossipVerbHandler.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,14 @@
2121
import org.apache.cassandra.net.IVerbHandler;
2222
import org.apache.cassandra.net.Message;
2323
import org.apache.cassandra.tcm.ClusterMetadataService;
24+
import org.apache.cassandra.tcm.Epoch;
2425

2526
public class GossipVerbHandler<T> implements IVerbHandler<T>
2627
{
2728
public void doVerb(Message<T> message)
2829
{
2930
Gossiper.instance.setLastProcessedMessageAt(message.creationTimeMillis());
30-
ClusterMetadataService.instance().fetchLogFromPeerAsync(message.from(), message.epoch());
31+
if (message.epoch().isAfter(Epoch.FIRST))
32+
ClusterMetadataService.instance().fetchLogFromPeerAsync(message.from(), message.epoch());
3133
}
3234
}

src/java/org/apache/cassandra/net/ResponseVerbHandler.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.cassandra.exceptions.RequestFailureReason;
2929
import org.apache.cassandra.tcm.ClusterMetadata;
3030
import org.apache.cassandra.tcm.ClusterMetadataService;
31+
import org.apache.cassandra.tcm.Epoch;
3132
import org.apache.cassandra.tracing.Tracing;
3233
import org.apache.cassandra.utils.FBUtilities;
3334

@@ -92,6 +93,9 @@ public void doVerb(Message message)
9293
private void maybeFetchLogs(Message<?> message)
9394
{
9495
ClusterMetadata metadata = ClusterMetadata.current();
96+
if (!message.epoch().isAfter(Epoch.FIRST))
97+
return;
98+
9599
if (!message.epoch().isAfter(metadata.epoch))
96100
return;
97101

src/java/org/apache/cassandra/tcm/CMSOperations.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ public class CMSOperations implements CMSOperationsMBean
6363
public static final String LOCAL_PENDING = "LOCAL_PENDING";
6464
public static final String COMMITS_PAUSED = "COMMITS_PAUSED";
6565
public static final String REPLICATION_FACTOR = "REPLICATION_FACTOR";
66+
public static final String CMS_ID = "CMS_ID";
6667

6768
private static final Logger logger = LoggerFactory.getLogger(ClusterMetadataService.class);
6869
public static CMSOperations instance = new CMSOperations(ClusterMetadataService.instance());
@@ -161,6 +162,7 @@ public Map<String, String> describeCMS()
161162
info.put(LOCAL_PENDING, Integer.toString(cms.log().pendingBufferSize()));
162163
info.put(COMMITS_PAUSED, Boolean.toString(cms.commitsPaused()));
163164
info.put(REPLICATION_FACTOR, ReplicationParams.meta(metadata).toString());
165+
info.put(CMS_ID, Integer.toString(metadata.metadataIdentifier));
164166
return info;
165167
}
166168

src/java/org/apache/cassandra/tcm/ClusterMetadata.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1004,7 +1004,7 @@ public NodeState myNodeState()
10041004

10051005
public boolean metadataSerializationUpgradeInProgress()
10061006
{
1007-
return !directory.clusterMaxVersion.serializationVersion().equals(directory.clusterMinVersion.serializationVersion());
1007+
return !directory.clusterMaxVersion.serializationVersion().equals(directory.commonSerializationVersion);
10081008
}
10091009

10101010
public static class Serializer implements MetadataSerializer<ClusterMetadata>

src/java/org/apache/cassandra/tcm/Startup.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,8 @@
5858
import org.apache.cassandra.tcm.log.SystemKeyspaceStorage;
5959
import org.apache.cassandra.tcm.membership.NodeId;
6060
import org.apache.cassandra.tcm.membership.NodeState;
61-
import org.apache.cassandra.tcm.migration.Election;
6261
import org.apache.cassandra.tcm.migration.CMSInitializationRequest;
62+
import org.apache.cassandra.tcm.migration.Election;
6363
import org.apache.cassandra.tcm.ownership.UniformRangePlacement;
6464
import org.apache.cassandra.tcm.sequences.InProgressSequences;
6565
import org.apache.cassandra.tcm.sequences.ReconfigureCMS;
@@ -77,7 +77,7 @@
7777
import static org.apache.cassandra.tcm.membership.NodeState.LEFT;
7878
import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
7979

80-
/**
80+
/**
8181
* Initialize
8282
*/
8383
public class Startup
@@ -141,7 +141,6 @@ public static void initializeAsFirstCMSNode()
141141
ClusterMetadataService.instance().log().bootstrap(addr, datacenter);
142142
ClusterMetadata metadata = ClusterMetadata.current();
143143
assert ClusterMetadataService.state() == LOCAL : String.format("Can't initialize as node hasn't transitioned to CMS state. State: %s.\n%s", ClusterMetadataService.state(), metadata);
144-
145144
Initialize initialize = new Initialize(metadata.initializeClusterIdentifier(addr.hashCode()));
146145
ClusterMetadataService.instance().commit(initialize);
147146
}

src/java/org/apache/cassandra/tcm/log/SystemKeyspaceStorage.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,12 +87,25 @@ public void append(Entry entry)
8787

8888
public synchronized static boolean hasAnyEpoch()
8989
{
90-
String query = String.format("SELECT epoch FROM %s.%s LIMIT 1", SchemaConstants.SYSTEM_KEYSPACE_NAME, NAME);
90+
String query = String.format("SELECT epoch, kind FROM %s.%s LIMIT 2", SchemaConstants.SYSTEM_KEYSPACE_NAME, NAME);
9191

92+
int count = 0;
93+
long preInitializeEpoch = -1;
9294
for (UntypedResultSet.Row row : executeInternal(query))
93-
return true;
95+
{
96+
count++;
97+
if (Transformation.Kind.fromId(row.getInt("kind")) == Transformation.Kind.PRE_INITIALIZE_CMS)
98+
preInitializeEpoch = row.getLong("epoch");
99+
}
100+
if (count == 1 && preInitializeEpoch != -1)
101+
{
102+
logger.warn("Cleaning up orphaned PreInitialize at epoch {} - restarting in gossip mode", preInitializeEpoch);
103+
String cleanupQuery = String.format("DELETE from %s.%s where epoch = %d", SchemaConstants.SYSTEM_KEYSPACE_NAME, NAME, preInitializeEpoch);
104+
executeInternal(cleanupQuery);
105+
return false;
106+
}
94107

95-
return false;
108+
return count > 0;
96109
}
97110

98111
@Override

src/java/org/apache/cassandra/tcm/membership/Directory.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ public class Directory implements MetadataValue<Directory>
7373
private final BTreeMap<String, Multimap<String, InetAddressAndPort>> racksByDC;
7474
public final NodeVersion clusterMinVersion;
7575
public final NodeVersion clusterMaxVersion;
76+
public final Version commonSerializationVersion;
7677

7778
public Directory()
7879
{
@@ -115,6 +116,7 @@ private Directory(int nextId,
115116
Pair<NodeVersion, NodeVersion> minMaxVer = minMaxVersions(states, versions);
116117
clusterMinVersion = minMaxVer.left;
117118
clusterMaxVersion = minMaxVer.right;
119+
commonSerializationVersion = minCommonSerializationVersion(states, versions);
118120
}
119121

120122
@Override
@@ -131,6 +133,9 @@ public String toString()
131133
", hostIds=" + hostIds +
132134
", endpointsByDC=" + endpointsByDC +
133135
", racksByDC=" + racksByDC +
136+
", clusterMinVersion=" + clusterMinVersion +
137+
", clusterMaxVersion=" + clusterMaxVersion +
138+
", commonSerializationVersion=" + commonSerializationVersion +
134139
'}';
135140
}
136141

@@ -778,6 +783,21 @@ private static Pair<NodeVersion, NodeVersion> minMaxVersions(BTreeMap<NodeId, No
778783
return Pair.create(minVersion, maxVersion);
779784
}
780785

786+
public static Version minCommonSerializationVersion(BTreeMap<NodeId, NodeState> states, BTreeMap<NodeId, NodeVersion> versions)
787+
{
788+
int commonVersion = Integer.MAX_VALUE;
789+
for (Map.Entry<NodeId, NodeState> entry : states.entrySet())
790+
{
791+
if (entry.getValue() != NodeState.LEFT)
792+
{
793+
NodeVersion ver = versions.get(entry.getKey());
794+
if (ver.serializationVersion > Version.OLD.asInt() && ver.serializationVersion < commonVersion)
795+
commonVersion = ver.serializationVersion;
796+
}
797+
}
798+
return commonVersion == Integer.MAX_VALUE ? NodeVersion.CURRENT_METADATA_VERSION : Version.fromInt(commonVersion);
799+
}
800+
781801
@Override
782802
public int hashCode()
783803
{

src/java/org/apache/cassandra/tcm/membership/NodeVersion.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.io.IOException;
2222
import java.util.Objects;
2323

24+
import com.google.common.annotations.VisibleForTesting;
2425
import org.apache.cassandra.io.util.DataInputPlus;
2526
import org.apache.cassandra.io.util.DataOutputPlus;
2627
import org.apache.cassandra.tcm.serialization.MetadataSerializer;
@@ -39,6 +40,8 @@ public class NodeVersion implements Comparable<NodeVersion>
3940
private static final CassandraVersion SINCE_VERSION = CassandraVersion.CASSANDRA_5_1;
4041

4142
public final CassandraVersion cassandraVersion;
43+
44+
// this must be kept as an int, otherwise we can't deserialize future NodeVersions
4245
public final int serializationVersion;
4346

4447
public NodeVersion(CassandraVersion cassandraVersion, Version serializationVersion)
@@ -113,10 +116,9 @@ public static class Serializer implements MetadataSerializer<NodeVersion>
113116
@Override
114117
public void serialize(NodeVersion t, DataOutputPlus out, Version version) throws IOException
115118
{
116-
out.writeUTF(t.cassandraVersion.toString());
117119
if (t.serializationVersion == Version.UNKNOWN.asInt())
118120
throw new IllegalStateException("Should not serialize UNKNOWN version");
119-
out.writeUnsignedVInt32(t.serializationVersion);
121+
serializeHelper(out, t.cassandraVersion.toString(), t.serializationVersion);
120122
}
121123

122124
@Override
@@ -133,5 +135,16 @@ public long serializedSize(NodeVersion t, Version version)
133135
return sizeof(t.cassandraVersion.toString()) +
134136
sizeofUnsignedVInt(t.serializationVersion);
135137
}
138+
/**
139+
* Used to be able to generate a NodeVersion with a future serialization version, for test!
140+
*/
141+
@VisibleForTesting
142+
static void serializeHelper(DataOutputPlus out, String cassandraVersion, int serializationVersion) throws IOException
143+
{
144+
out.writeUTF(cassandraVersion);
145+
out.writeUnsignedVInt32(serializationVersion);
146+
}
136147
}
148+
149+
137150
}

0 commit comments

Comments
 (0)