Skip to content

Commit c736d22

Browse files
committed
Gossip doesn't converge due to race condition when updating EndpointStates multiple fields
patch by David Capwell, Matt Byrd; reviewed by Blake Eggleston, Brandon Williams for CASSANDRA-20659
1 parent 422b8a6 commit c736d22

File tree

6 files changed

+109
-55
lines changed

6 files changed

+109
-55
lines changed

CHANGES.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
4.0.18
2+
* Gossip doesn't converge due to race condition when updating EndpointStates multiple fields (CASSANDRA-20659)
23
* Handle sstable metadata stats file getting a new mtime after compaction has finished (CASSANDRA-18119)
34
* Honor MAX_PARALLEL_TRANSFERS correctly (CASSANDRA-20532)
45
* Updating a column with a new TTL but same expiration time is non-deterministic and causes repair mismatches. (CASSANDRA-20561)

src/java/org/apache/cassandra/gms/EndpointState.java

Lines changed: 85 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424

2525
import javax.annotation.Nullable;
2626

27+
import com.google.common.annotations.VisibleForTesting;
28+
29+
import com.google.common.base.Function;
2730
import org.slf4j.Logger;
2831
import org.slf4j.LoggerFactory;
2932

@@ -37,63 +40,105 @@
3740
* This abstraction represents both the HeartBeatState and the ApplicationState in an EndpointState
3841
* instance. Any state for a given endpoint can be retrieved from this instance.
3942
*/
40-
41-
4243
public class EndpointState
4344
{
4445
protected static final Logger logger = LoggerFactory.getLogger(EndpointState.class);
4546

4647
public final static IVersionedSerializer<EndpointState> serializer = new EndpointStateSerializer();
4748

48-
private volatile HeartBeatState hbState;
49-
private final AtomicReference<Map<ApplicationState, VersionedValue>> applicationState;
49+
private static class View
50+
{
51+
final HeartBeatState hbState;
52+
final Map<ApplicationState, VersionedValue> applicationState;
53+
54+
private View(HeartBeatState hbState, Map<ApplicationState, VersionedValue> applicationState)
55+
{
56+
this.hbState = hbState;
57+
this.applicationState = applicationState;
58+
}
59+
}
60+
61+
private final AtomicReference<View> ref;
5062

5163
/* fields below do not get serialized */
5264
private volatile long updateTimestamp;
5365
private volatile boolean isAlive;
5466

5567
public EndpointState(HeartBeatState initialHbState)
5668
{
57-
this(initialHbState, new EnumMap<ApplicationState, VersionedValue>(ApplicationState.class));
69+
this(initialHbState, new EnumMap<>(ApplicationState.class));
5870
}
5971

6072
public EndpointState(EndpointState other)
6173
{
62-
this(new HeartBeatState(other.hbState), new EnumMap<>(other.applicationState.get()));
74+
ref = new AtomicReference<>(other.ref.get());
75+
updateTimestamp = System.nanoTime();
76+
isAlive = true;
6377
}
6478

65-
EndpointState(HeartBeatState initialHbState, Map<ApplicationState, VersionedValue> states)
79+
@VisibleForTesting
80+
public EndpointState(HeartBeatState initialHbState, Map<ApplicationState, VersionedValue> states)
6681
{
67-
hbState = initialHbState;
68-
applicationState = new AtomicReference<Map<ApplicationState, VersionedValue>>(new EnumMap<>(states));
82+
ref = new AtomicReference<>(new View(initialHbState, new EnumMap<>(states)));
6983
updateTimestamp = System.nanoTime();
7084
isAlive = true;
7185
}
7286

73-
HeartBeatState getHeartBeatState()
87+
@VisibleForTesting
88+
public HeartBeatState getHeartBeatState()
89+
{
90+
return ref.get().hbState;
91+
}
92+
93+
public void updateHeartBeat()
94+
{
95+
updateHeartBeat(HeartBeatState::updateHeartBeat);
96+
}
97+
98+
public void forceNewerGenerationUnsafe()
99+
{
100+
updateHeartBeat(HeartBeatState::forceNewerGenerationUnsafe);
101+
}
102+
103+
@VisibleForTesting
104+
public void forceHighestPossibleVersionUnsafe()
74105
{
75-
return hbState;
106+
updateHeartBeat(HeartBeatState::forceHighestPossibleVersionUnsafe);
76107
}
77108

78-
void setHeartBeatState(HeartBeatState newHbState)
109+
void unsafeSetEmptyHeartBeatState()
79110
{
80-
updateTimestamp();
81-
hbState = newHbState;
111+
updateHeartBeat(ignore -> HeartBeatState.empty());
112+
}
113+
114+
private void updateHeartBeat(Function<HeartBeatState, HeartBeatState> fn)
115+
{
116+
HeartBeatState previous = null;
117+
HeartBeatState update = null;
118+
while (true)
119+
{
120+
View view = ref.get();
121+
if (previous == null || view.hbState != previous) // if this races with updating states then can avoid bumping versions
122+
update = fn.apply(view.hbState);
123+
if (ref.compareAndSet(view, new View(update, view.applicationState)))
124+
return;
125+
previous = view.hbState;
126+
}
82127
}
83128

84129
public VersionedValue getApplicationState(ApplicationState key)
85130
{
86-
return applicationState.get().get(key);
131+
return ref.get().applicationState.get(key);
87132
}
88133

89134
public boolean containsApplicationState(ApplicationState key)
90135
{
91-
return applicationState.get().containsKey(key);
136+
return ref.get().applicationState.containsKey(key);
92137
}
93138

94139
public Set<Map.Entry<ApplicationState, VersionedValue>> states()
95140
{
96-
return applicationState.get().entrySet();
141+
return ref.get().applicationState.entrySet();
97142
}
98143

99144
public void addApplicationState(ApplicationState key, VersionedValue value)
@@ -107,36 +152,47 @@ public void addApplicationStates(Map<ApplicationState, VersionedValue> values)
107152
}
108153

109154
public void addApplicationStates(Set<Map.Entry<ApplicationState, VersionedValue>> values)
155+
{
156+
addApplicationStates(values, null);
157+
}
158+
159+
public void addApplicationStates(Set<Map.Entry<ApplicationState, VersionedValue>> values, @Nullable HeartBeatState hbState)
110160
{
111161
while (true)
112162
{
113-
Map<ApplicationState, VersionedValue> orig = applicationState.get();
163+
View view = this.ref.get();
164+
Map<ApplicationState, VersionedValue> orig = view.applicationState;
114165
Map<ApplicationState, VersionedValue> copy = new EnumMap<>(orig);
115166

116167
for (Map.Entry<ApplicationState, VersionedValue> value : values)
117168
copy.put(value.getKey(), value.getValue());
118169

119-
if (applicationState.compareAndSet(orig, copy))
170+
if (this.ref.compareAndSet(view, new View(hbState == null ? view.hbState : hbState, copy)))
171+
{
172+
if (hbState != null)
173+
updateTimestamp();
120174
return;
175+
}
121176
}
122177
}
123178

124179
void removeMajorVersion3LegacyApplicationStates()
125180
{
126181
while (hasLegacyFields())
127182
{
128-
Map<ApplicationState, VersionedValue> orig = applicationState.get();
183+
View view = ref.get();
184+
Map<ApplicationState, VersionedValue> orig = view.applicationState;
129185
Map<ApplicationState, VersionedValue> updatedStates = filterMajorVersion3LegacyApplicationStates(orig);
130186
// avoid updating if no state is removed
131187
if (orig.size() == updatedStates.size()
132-
|| applicationState.compareAndSet(orig, updatedStates))
188+
|| ref.compareAndSet(view, new View(view.hbState, updatedStates)))
133189
return;
134190
}
135191
}
136192

137193
private boolean hasLegacyFields()
138194
{
139-
Set<ApplicationState> statesPresent = applicationState.get().keySet();
195+
Set<ApplicationState> statesPresent = ref.get().applicationState.keySet();
140196
if (statesPresent.isEmpty())
141197
return false;
142198
return (statesPresent.contains(ApplicationState.STATUS) && statesPresent.contains(ApplicationState.STATUS_WITH_PORT))
@@ -193,16 +249,18 @@ void markDead()
193249

194250
public boolean isStateEmpty()
195251
{
196-
return applicationState.get().isEmpty();
252+
return ref.get().applicationState.isEmpty();
197253
}
198254

199255
/**
200256
* @return true if {@link HeartBeatState#isEmpty()} is true and no STATUS application state exists
201257
*/
202258
public boolean isEmptyWithoutStatus()
203259
{
204-
Map<ApplicationState, VersionedValue> state = applicationState.get();
205-
return hbState.isEmpty() && !(state.containsKey(ApplicationState.STATUS_WITH_PORT) || state.containsKey(ApplicationState.STATUS));
260+
View view = ref.get();
261+
Map<ApplicationState, VersionedValue> state = view.applicationState;
262+
boolean hasStatus = state.containsKey(ApplicationState.STATUS_WITH_PORT) || state.containsKey(ApplicationState.STATUS);
263+
return view.hbState.isEmpty() && !hasStatus;
206264
}
207265

208266
public boolean isRpcReady()
@@ -253,7 +311,8 @@ public CassandraVersion getReleaseVersion()
253311

254312
public String toString()
255313
{
256-
return "EndpointState: HeartBeatState = " + hbState + ", AppStateMap = " + applicationState.get();
314+
View view = ref.get();
315+
return "EndpointState: HeartBeatState = " + view.hbState + ", AppStateMap = " + view.applicationState;
257316
}
258317
}
259318

src/java/org/apache/cassandra/gms/Gossiper.java

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,7 @@ public void run()
310310
taskLock.lock();
311311

312312
/* Update the local heartbeat counter. */
313-
endpointStateMap.get(FBUtilities.getBroadcastAddressAndPort()).getHeartBeatState().updateHeartBeat();
313+
endpointStateMap.get(FBUtilities.getBroadcastAddressAndPort()).updateHeartBeat();
314314
if (logger.isTraceEnabled())
315315
logger.trace("My heartbeat is now {}", endpointStateMap.get(FBUtilities.getBroadcastAddressAndPort()).getHeartBeatState().getHeartBeatVersion());
316316
final List<GossipDigest> gDigests = new ArrayList<>();
@@ -598,7 +598,7 @@ protected void markAsShutdown(InetAddressAndPort endpoint)
598598
epState.addApplicationState(ApplicationState.STATUS_WITH_PORT, shutdown);
599599
epState.addApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.shutdown(true));
600600
epState.addApplicationState(ApplicationState.RPC_READY, StorageService.instance.valueFactory.rpcReady(false));
601-
epState.getHeartBeatState().forceHighestPossibleVersionUnsafe();
601+
epState.forceHighestPossibleVersionUnsafe();
602602
markDead(endpoint, epState);
603603
FailureDetector.instance.forceConviction(endpoint);
604604
GossiperDiagnostics.markedAsShutdown(this, endpoint);
@@ -778,7 +778,7 @@ public void advertiseRemoving(InetAddressAndPort endpoint, UUID hostId, UUID loc
778778
// update the other node's generation to mimic it as if it had changed it itself
779779
logger.info("Advertising removal for {}", endpoint);
780780
epState.updateTimestamp(); // make sure we don't evict it too soon
781-
epState.getHeartBeatState().forceNewerGenerationUnsafe();
781+
epState.forceNewerGenerationUnsafe();
782782
Map<ApplicationState, VersionedValue> states = new EnumMap<>(ApplicationState.class);
783783
states.put(ApplicationState.STATUS_WITH_PORT, StorageService.instance.valueFactory.removingNonlocal(hostId));
784784
states.put(ApplicationState.STATUS, StorageService.instance.valueFactory.removingNonlocal(hostId));
@@ -798,7 +798,7 @@ public void advertiseTokenRemoved(InetAddressAndPort endpoint, UUID hostId)
798798
{
799799
EndpointState epState = endpointStateMap.get(endpoint);
800800
epState.updateTimestamp(); // make sure we don't evict it too soon
801-
epState.getHeartBeatState().forceNewerGenerationUnsafe();
801+
epState.forceNewerGenerationUnsafe();
802802
long expireTime = computeExpireTime();
803803
epState.addApplicationState(ApplicationState.STATUS_WITH_PORT, StorageService.instance.valueFactory.removedNonlocal(hostId, expireTime));
804804
epState.addApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.removedNonlocal(hostId, expireTime));
@@ -849,7 +849,7 @@ else if (newState.getHeartBeatState().getGeneration() != generation)
849849
else if (newState.getHeartBeatState().getHeartBeatVersion() != heartbeat)
850850
throw new RuntimeException("Endpoint still alive: " + endpoint + " heartbeat changed while trying to assassinate it");
851851
epState.updateTimestamp(); // make sure we don't evict it too soon
852-
epState.getHeartBeatState().forceNewerGenerationUnsafe();
852+
epState.forceNewerGenerationUnsafe();
853853
}
854854

855855
Collection<Token> tokens = null;
@@ -1580,15 +1580,7 @@ private void applyNewStates(InetAddressAndPort addr, EndpointState localState, E
15801580
// don't assert here, since if the node restarts the version will go back to zero
15811581
int oldVersion = localState.getHeartBeatState().getHeartBeatVersion();
15821582

1583-
localState.setHeartBeatState(remoteState.getHeartBeatState());
1584-
if (logger.isTraceEnabled())
1585-
logger.trace("Updating heartbeat state version to {} from {} for {} ...", localState.getHeartBeatState().getHeartBeatVersion(), oldVersion, addr);
1586-
1587-
Set<Entry<ApplicationState, VersionedValue>> remoteStates = remoteState.states();
1588-
assert remoteState.getHeartBeatState().getGeneration() == localState.getHeartBeatState().getGeneration();
1589-
1590-
1591-
Set<Entry<ApplicationState, VersionedValue>> updatedStates = remoteStates.stream().filter(entry -> {
1583+
Set<Entry<ApplicationState, VersionedValue>> updatedStates = remoteState.states().stream().filter(entry -> {
15921584
// filter out the states that are already up to date (has the same or higher version)
15931585
VersionedValue local = localState.getApplicationState(entry.getKey());
15941586
return (local == null || local.version < entry.getValue().version);
@@ -1601,7 +1593,9 @@ private void applyNewStates(InetAddressAndPort addr, EndpointState localState, E
16011593
logger.trace("Updating {} state version to {} for {}", entry.getKey().toString(), entry.getValue().version, addr);
16021594
}
16031595
}
1604-
localState.addApplicationStates(updatedStates);
1596+
localState.addApplicationStates(updatedStates, remoteState.getHeartBeatState());
1597+
if (logger.isTraceEnabled())
1598+
logger.trace("Updating heartbeat state version to {} from {} for {} ...", localState.getHeartBeatState().getHeartBeatVersion(), oldVersion, addr);
16051599

16061600
// get rid of legacy fields once the cluster is not in mixed mode
16071601
if (!hasMajorVersion3OrUnknownNodes())
@@ -1983,7 +1977,7 @@ public void maybeInitializeLocalState(int generationNbr)
19831977
public void forceNewerGeneration()
19841978
{
19851979
EndpointState epstate = endpointStateMap.get(FBUtilities.getBroadcastAddressAndPort());
1986-
epstate.getHeartBeatState().forceNewerGenerationUnsafe();
1980+
epstate.forceNewerGenerationUnsafe();
19871981
}
19881982

19891983

@@ -2004,7 +1998,7 @@ public void addSavedEndpoint(InetAddressAndPort ep)
20041998
if (epState != null)
20051999
{
20062000
logger.debug("not replacing a previous epState for {}, but reusing it: {}", ep, epState);
2007-
epState.setHeartBeatState(HeartBeatState.empty());
2001+
epState.unsafeSetEmptyHeartBeatState();
20082002
}
20092003
else
20102004
{

src/java/org/apache/cassandra/gms/HeartBeatState.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ public class HeartBeatState
3333

3434
public static final IVersionedSerializer<HeartBeatState> serializer = new HeartBeatStateSerializer();
3535

36-
private volatile int generation;
37-
private volatile int version;
36+
private final int generation;
37+
private final int version;
3838

3939
HeartBeatState(int gen)
4040
{
@@ -67,29 +67,29 @@ public boolean isEmpty()
6767
return version == EMPTY_VERSION;
6868
}
6969

70-
int getGeneration()
70+
public int getGeneration()
7171
{
7272
return generation;
7373
}
7474

75-
void updateHeartBeat()
75+
HeartBeatState updateHeartBeat()
7676
{
77-
version = VersionGenerator.getNextVersion();
77+
return new HeartBeatState(generation, VersionGenerator.getNextVersion());
7878
}
7979

80-
int getHeartBeatVersion()
80+
public int getHeartBeatVersion()
8181
{
8282
return version;
8383
}
8484

85-
void forceNewerGenerationUnsafe()
85+
HeartBeatState forceNewerGenerationUnsafe()
8686
{
87-
generation += 1;
87+
return new HeartBeatState(generation + 1, version);
8888
}
8989

90-
void forceHighestPossibleVersionUnsafe()
90+
HeartBeatState forceHighestPossibleVersionUnsafe()
9191
{
92-
version = Integer.MAX_VALUE;
92+
return new HeartBeatState(generation, Integer.MAX_VALUE);
9393
}
9494

9595
public String toString()

test/unit/org/apache/cassandra/gms/GossiperTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,7 @@ public void testDuplicatedStateUpdate() throws Exception
304304
proposedRemoteState = new EndpointState(proposedRemoteHeartBeat);
305305

306306
// Bump the heartbeat version and use the same TOKENS state
307-
proposedRemoteHeartBeat.updateHeartBeat();
307+
proposedRemoteState.updateHeartBeat();
308308
proposedRemoteState.addApplicationState(ApplicationState.TOKENS, tokensValue);
309309

310310
// The following state change should only update heartbeat without updating the TOKENS state

test/unit/org/apache/cassandra/gms/SerializationsTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ private static class Statics
130130
private static List<GossipDigest> Digests = new ArrayList<GossipDigest>();
131131

132132
{
133-
HeartbeatSt.updateHeartBeat();
133+
EndpointSt.updateHeartBeat();
134134
EndpointSt.addApplicationState(ApplicationState.LOAD, vv0);
135135
EndpointSt.addApplicationState(ApplicationState.STATUS_WITH_PORT, vv1);
136136
for (int i = 0; i < 100; i++)

0 commit comments

Comments
 (0)