Skip to content

Commit 37b6ade

Browse files
committed
Accord: Fix minDecidedId calculation
Also fix: - slowCoordinatorDelay calculation for locally truncated command throws ISE - Bad cast of Truncated during replay - Don't throw IllegalStateException in Invariants.expect if accord.testing == false - Replay of empty segment throws NPE Also improve: - Support tracing of recovery and home progress patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20771
1 parent e06b8ea commit 37b6ade

File tree

6 files changed

+67
-55
lines changed

6 files changed

+67
-55
lines changed

src/java/org/apache/cassandra/journal/Journal.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1013,7 +1013,10 @@ public class StaticSegmentKeyIterator implements CloseableIterator<KeyRefs<K>>
10131013

10141014
public StaticSegmentKeyIterator(K min, K max)
10151015
{
1016-
this.segments = selectAndReference(s -> s.isStatic() && (min == null || keySupport.compare(s.index().lastId(), min) >= 0) && (max == null || keySupport.compare(s.index().firstId(), max) <= 0));
1016+
this.segments = selectAndReference(s -> s.isStatic()
1017+
&& s.asStatic().index().entryCount() > 0
1018+
&& (min == null || keySupport.compare(s.index().lastId(), min) >= 0)
1019+
&& (max == null || keySupport.compare(s.index().firstId(), max) <= 0));
10171020
List<Iterator<Head>> iterators = new ArrayList<>(segments.count());
10181021

10191022
for (Segment<K, V> segment : segments.allSorted(true))

src/java/org/apache/cassandra/service/accord/CommandsForRanges.java

Lines changed: 37 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -69,50 +69,46 @@ public class CommandsForRanges extends TreeMap<Timestamp, Summary> implements Co
6969
{
7070
static final IntervalComparators COMPARATORS = new IntervalComparators();
7171
static final IntervalKeyComparators KEY_COMPARATORS = new IntervalKeyComparators();
72-
static class TxnIdInterval implements Comparable<TxnIdInterval>
72+
static class TxnIdInterval extends TokenRange
7373
{
74-
final RoutingKey start, end;
7574
final TxnId txnId;
7675

7776
TxnIdInterval(RoutingKey start, RoutingKey end, TxnId txnId)
7877
{
79-
this.start = start;
80-
this.end = end;
78+
super((TokenKey) start, (TokenKey) end);
8179
this.txnId = txnId;
8280
}
8381

8482
TxnIdInterval(Range range, TxnId txnId)
8583
{
86-
this.start = range.start();
87-
this.end = range.end();
88-
this.txnId = txnId;
89-
}
90-
91-
@Override
92-
public int compareTo(TxnIdInterval that)
93-
{
94-
int c = this.start.compareTo(that.start);
95-
if (c == 0) c = this.end.compareTo(that.end);
96-
if (c == 0) c = this.txnId.compareTo(that.txnId);
97-
return c;
84+
this(range.start(), range.end(), txnId);
9885
}
9986
}
10087

10188
static class IntervalComparators implements IntervalBTree.IntervalComparators<TxnIdInterval>
10289
{
103-
@Override public Comparator<TxnIdInterval> totalOrder() { return TxnIdInterval::compareTo; }
104-
@Override public Comparator<TxnIdInterval> endWithEndSorter() { return (a, b) -> a.end.compareTo(b.end); }
90+
@Override
91+
public Comparator<TxnIdInterval> totalOrder()
92+
{
93+
return (a, b) -> {
94+
int c = a.start().compareTo(b.start());
95+
if (c == 0) c = a.end().compareTo(b.end());
96+
if (c == 0) c = a.txnId.compareTo(b.txnId);
97+
return c;
98+
};
99+
}
100+
@Override public Comparator<TxnIdInterval> endWithEndSorter() { return (a, b) -> a.end().compareTo(b.end()); }
105101

106-
@Override public SymmetricComparator<TxnIdInterval> startWithStartSeeker() { return (a, b) -> startWithStart(a.start.compareTo(b.start)); }
107-
@Override public SymmetricComparator<TxnIdInterval> startWithEndSeeker() { return (a, b) -> startWithEnd(a.start.compareTo(b.end)); }
108-
@Override public SymmetricComparator<TxnIdInterval> endWithStartSeeker() { return (a, b) -> endWithStart(a.end.compareTo(b.start)); }
102+
@Override public SymmetricComparator<TxnIdInterval> startWithStartSeeker() { return (a, b) -> startWithStart(a.start().compareTo(b.start())); }
103+
@Override public SymmetricComparator<TxnIdInterval> startWithEndSeeker() { return (a, b) -> startWithEnd(a.start().compareTo(b.end())); }
104+
@Override public SymmetricComparator<TxnIdInterval> endWithStartSeeker() { return (a, b) -> endWithStart(a.end().compareTo(b.start())); }
109105
}
110106

111107
static class IntervalKeyComparators implements IntervalBTree.WithIntervalComparators<RoutingKey, TxnIdInterval>
112108
{
113-
@Override public AsymmetricComparator<RoutingKey, TxnIdInterval> startWithStartSeeker() { return (a, b) -> keyStartWithStart(a.compareTo(b.start));}
114-
@Override public AsymmetricComparator<RoutingKey, TxnIdInterval> startWithEndSeeker() { return (a, b) -> keyStartWithEnd(a.compareTo(b.end)); }
115-
@Override public AsymmetricComparator<RoutingKey, TxnIdInterval> endWithStartSeeker() { return (a, b) -> keyEndWithStart(a.compareTo(b.start)); }
109+
@Override public AsymmetricComparator<RoutingKey, TxnIdInterval> startWithStartSeeker() { return (a, b) -> keyStartWithStart(a.compareTo(b.start()));}
110+
@Override public AsymmetricComparator<RoutingKey, TxnIdInterval> startWithEndSeeker() { return (a, b) -> keyStartWithEnd(a.compareTo(b.end())); }
111+
@Override public AsymmetricComparator<RoutingKey, TxnIdInterval> endWithStartSeeker() { return (a, b) -> keyEndWithStart(a.compareTo(b.start())); }
116112
}
117113

118114
public CommandsForRanges(Map<? extends Timestamp, ? extends Summary> m)
@@ -217,7 +213,7 @@ private Loader newLoader(@Nullable TxnId primaryTxnId, Unseekables<?> searchKeys
217213
MaxDecidedRX maxDecidedRX = null;
218214
if (primaryTxnId != null && primaryTxnId.is(Txn.Kind.ExclusiveSyncPoint) && findAsDep == null)
219215
maxDecidedRX = commandStore.unsafeGetMaxDecidedRX();
220-
return new Loader(this, searchKeysOrRanges, redundantBefore, testKind, minTxnId, maxTxnId, findAsDep, maxDecidedRX);
216+
return new Loader(this, primaryTxnId, searchKeysOrRanges, redundantBefore, testKind, minTxnId, maxTxnId, findAsDep, maxDecidedRX);
221217
}
222218

223219
private void updateTransitive(UnaryOperator<NavigableMap<TxnId, Ranges>> update)
@@ -267,14 +263,16 @@ public static class Loader extends Summary.Loader
267263
{
268264
private final Manager manager;
269265
private final MaxDecidedRX maxDecidedRX;
266+
private final TxnId primaryTxnId;
270267
private final TxnId minRelevantId;
271268

272-
public Loader(Manager manager, Unseekables<?> searchKeysOrRanges, RedundantBefore redundantBefore, Kinds testKinds, TxnId minTxnId, Timestamp maxTxnId, @Nullable TxnId findAsDep, MaxDecidedRX maxDecidedRX)
269+
public Loader(Manager manager, TxnId primaryTxnId, Unseekables<?> searchKeysOrRanges, RedundantBefore redundantBefore, Kinds testKinds, TxnId minTxnId, Timestamp maxTxnId, @Nullable TxnId findAsDep, MaxDecidedRX maxDecidedRX)
273270
{
274-
super(null, searchKeysOrRanges, redundantBefore, testKinds, minTxnId, maxTxnId, findAsDep);
271+
super(primaryTxnId, searchKeysOrRanges, redundantBefore, testKinds, minTxnId, maxTxnId, findAsDep);
275272
this.manager = manager;
276273
this.maxDecidedRX = maxDecidedRX;
277-
this.minRelevantId = maxDecidedRX == null ? null : TxnId.nonNullOrMax(TxnId.NONE, maxDecidedRX.foldl(searchKeysOrRanges, TxnId::nonNullOrMin, null));
274+
this.primaryTxnId = primaryTxnId;
275+
this.minRelevantId = MaxDecidedRX.minDecidedDependencyId(maxDecidedRX, searchKeysOrRanges, primaryTxnId);
278276
}
279277

280278
public void intersects(Consumer<TxnId> forEach)
@@ -306,14 +304,22 @@ boolean isRelevant(TxnIdInterval txnIdInterval)
306304
{
307305
if (maxDecidedRX == null)
308306
return true;
309-
if (txnIdInterval.txnId.compareTo(minRelevantId) < 0)
307+
308+
if (!isMaybeRelevant(txnIdInterval.txnId))
310309
return false;
311-
return maxDecidedRX.foldl(txnIdInterval.start, txnIdInterval.end, (decided, anyUndecided, test, ignore) -> test.compareTo(decided) >= 0, false, txnIdInterval.txnId, null);
310+
311+
TxnId minRelevantId = MaxDecidedRX.minDecidedDependencyId(maxDecidedRX, Ranges.of(txnIdInterval), primaryTxnId);
312+
return isRelevant(minRelevantId, primaryTxnId);
313+
}
314+
315+
private boolean isRelevant(@Nullable TxnId minRelevantId, TxnId txnId)
316+
{
317+
return minRelevantId == null || minRelevantId.compareTo(txnId) <= 0;
312318
}
313319

314320
boolean isMaybeRelevant(TxnId txnId)
315321
{
316-
return maxDecidedRX == null || txnId.compareTo(minRelevantId) >= 0;
322+
return isRelevant(minRelevantId, txnId);
317323
}
318324

319325
public void forEachInCache(Unseekables<?> keysOrRanges, Consumer<Summary> forEach, AccordCommandStore.Caches caches)

src/java/org/apache/cassandra/service/accord/TokenRange.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public class TokenRange extends Range.EndInclusive
4141
public static final long EMPTY_SIZE = ObjectSizes.measure(new TokenRange(TokenKey.min(TableId.fromLong(0), Murmur3Partitioner.instance), TokenKey.max(TableId.fromLong(0), Murmur3Partitioner.instance)));
4242

4343
// Don't make this public use create or createUnsafe
44-
private TokenRange(TokenKey start, TokenKey end)
44+
protected TokenRange(TokenKey start, TokenKey end)
4545
{
4646
super(start, end);
4747
}
@@ -59,19 +59,19 @@ public static TokenRange createUnsafe(TokenKey start, TokenKey end)
5959
return new TokenRange(start, end);
6060
}
6161

62-
public TableId table()
62+
public final TableId table()
6363
{
6464
return start().table();
6565
}
6666

6767
@Override
68-
public TokenKey start()
68+
public final TokenKey start()
6969
{
7070
return (TokenKey) super.start();
7171
}
7272

7373
@Override
74-
public TokenKey end()
74+
public final TokenKey end()
7575
{
7676
return (TokenKey) super.end();
7777
}

src/java/org/apache/cassandra/service/accord/api/AccordAgent.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,10 @@ public long slowCoordinatorDelay(Node node, SafeCommandStore safeStore, TxnId tx
245245

246246
// TODO (expected): make this a configurable calculation on normal request latencies (like ContentionStrategy)
247247
long oneSecond = SECONDS.toMicros(1L);
248-
long mostRecentStart = Math.max(command.txnId().hlc(), command.promised().hlc());
248+
long promisedHlc = command.promised().hlc();
249+
if (promisedHlc == Long.MAX_VALUE)
250+
promisedHlc = 0;
251+
long mostRecentStart = Math.max(command.txnId().hlc(), promisedHlc);
249252
long waitMicros = recover(txnId).computeWait(retryCount, MICROSECONDS);
250253
long nowMicros = MILLISECONDS.toMicros(Clock.Global.currentTimeMillis());
251254
Invariants.expect(mostRecentStart <= nowMicros + SECONDS.toMicros(1L), "max(%s,%s)>%d", command.txnId(), command.promised(), nowMicros);

test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -150,31 +150,31 @@ public void tracing()
150150
TxnId id = accord.node().nextTxnId(Txn.Kind.Write, Routable.Domain.Key);
151151
Txn txn = createTxn(wrapInTxn(String.format("INSERT INTO %s.%s(k, c, v) VALUES (?, ?, ?)", KEYSPACE, tableName)), 0, 0, 0);
152152

153-
execute(SET_TRACE, 1, id.toString(), "PROGRESS");
154-
assertRows(execute(QUERY_TRACE, id.toString(), "PROGRESS"), row(id.toString(), "PROGRESS", 1));
155-
execute(SET_TRACE, 0, id.toString(), "PROGRESS");
156-
assertRows(execute(QUERY_TRACE, id.toString(), "PROGRESS"));
157-
execute(SET_TRACE, 1, id.toString(), "PROGRESS");
158-
assertRows(execute(QUERY_TRACE, id.toString(), "PROGRESS"), row(id.toString(), "PROGRESS", 1));
153+
execute(SET_TRACE, 1, id.toString(), "WAIT_PROGRESS");
154+
assertRows(execute(QUERY_TRACE, id.toString(), "WAIT_PROGRESS"), row(id.toString(), "WAIT_PROGRESS", 1));
155+
execute(SET_TRACE, 0, id.toString(), "WAIT_PROGRESS");
156+
assertRows(execute(QUERY_TRACE, id.toString(), "WAIT_PROGRESS"));
157+
execute(SET_TRACE, 1, id.toString(), "WAIT_PROGRESS");
158+
assertRows(execute(QUERY_TRACE, id.toString(), "WAIT_PROGRESS"), row(id.toString(), "WAIT_PROGRESS", 1));
159159
execute(UNSET_TRACE1, id.toString());
160-
assertRows(execute(QUERY_TRACE, id.toString(), "PROGRESS"));
161-
execute(SET_TRACE, 1, id.toString(), "PROGRESS");
162-
assertRows(execute(QUERY_TRACE, id.toString(), "PROGRESS"), row(id.toString(), "PROGRESS", 1));
163-
execute(UNSET_TRACE2, id.toString(), "PROGRESS");
164-
assertRows(execute(QUERY_TRACE, id.toString(), "PROGRESS"));
165-
execute(SET_TRACE, 1, id.toString(), "PROGRESS");
166-
assertRows(execute(QUERY_TRACE, id.toString(), "PROGRESS"), row(id.toString(), "PROGRESS", 1));
160+
assertRows(execute(QUERY_TRACE, id.toString(), "WAIT_PROGRESS"));
161+
execute(SET_TRACE, 1, id.toString(), "WAIT_PROGRESS");
162+
assertRows(execute(QUERY_TRACE, id.toString(), "WAIT_PROGRESS"), row(id.toString(), "WAIT_PROGRESS", 1));
163+
execute(UNSET_TRACE2, id.toString(), "WAIT_PROGRESS");
164+
assertRows(execute(QUERY_TRACE, id.toString(), "WAIT_PROGRESS"));
165+
execute(SET_TRACE, 1, id.toString(), "WAIT_PROGRESS");
166+
assertRows(execute(QUERY_TRACE, id.toString(), "WAIT_PROGRESS"), row(id.toString(), "WAIT_PROGRESS", 1));
167167
accord.node().coordinate(id, txn);
168168
filter.preAccept.awaitThrowUncheckedOnInterrupt();
169169

170170
filter.apply.awaitThrowUncheckedOnInterrupt();
171-
spinUntilSuccess(() -> Assertions.assertThat(execute(QUERY_TRACES, id.toString(), "PROGRESS").size()).isGreaterThan(0));
171+
spinUntilSuccess(() -> Assertions.assertThat(execute(QUERY_TRACES, id.toString(), "WAIT_PROGRESS").size()).isGreaterThan(0));
172172
execute(ERASE_TRACES1, id.toString(), "FETCH", Long.MAX_VALUE);
173173
execute(ERASE_TRACES2, id.toString(), "FETCH");
174-
execute(ERASE_TRACES1, id.toString(), "PROGRESS", Long.MAX_VALUE);
175-
Assertions.assertThat(execute(QUERY_TRACES, id.toString(), "PROGRESS").size()).isEqualTo(0);
174+
execute(ERASE_TRACES1, id.toString(), "WAIT_PROGRESS", Long.MAX_VALUE);
175+
Assertions.assertThat(execute(QUERY_TRACES, id.toString(), "WAIT_PROGRESS").size()).isEqualTo(0);
176176
// just check other variants don't fail
177-
execute(ERASE_TRACES2, id.toString(), "PROGRESS");
177+
execute(ERASE_TRACES2, id.toString(), "WAIT_PROGRESS");
178178
execute(ERASE_TRACES3, id.toString());
179179
}
180180
finally

0 commit comments

Comments
 (0)