Skip to content

Commit b4f6c7c

Browse files
committed
Topology compaction omits image records if minEpoch is ahead of last image
Also fix:
- RegisteredCallback should remove itself from the callback map when cancelled.
- Do not throw CancellationException when processing requests that have been aborted, as may be caused by a successful meaningful reply that can be overridden.
- system_accord_debug.{executors,coordinations} fail with ClassCastException.
- CommandStore.updateMinHlc consumes excessive CPU because it is called far too often.
- AccordCache does not notify flush listeners on shutdown.

Also improve:
- Support skipping Deps.
- Report violation information.
- Sort CommandStore shards by id.

Patch by Benedict; reviewed by Aleksey Yeschenko for CASSANDRA-20896.
1 parent 965a391 commit b4f6c7c

File tree

16 files changed: +241 −105 lines changed

modules/accord

src/java/org/apache/cassandra/db/compaction/CompactionIterator.java

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@
9595
import org.apache.cassandra.service.accord.JournalKey;
9696
import org.apache.cassandra.service.accord.api.TokenKey;
9797
import org.apache.cassandra.service.accord.journal.AccordTopologyUpdate;
98+
import org.apache.cassandra.service.accord.journal.AccordTopologyUpdate.TopologyImage;
9899
import org.apache.cassandra.service.accord.serializers.Version;
99100
import org.apache.cassandra.service.paxos.PaxosRepairHistory;
100101
import org.apache.cassandra.service.paxos.uncommitted.PaxosRows;
@@ -118,6 +119,8 @@
118119
import static org.apache.cassandra.config.DatabaseDescriptor.paxosStatePurging;
119120
import static org.apache.cassandra.service.accord.AccordKeyspace.CFKAccessor;
120121
import static org.apache.cassandra.service.accord.AccordKeyspace.JournalColumns.getJournalKey;
122+
import static org.apache.cassandra.service.accord.journal.AccordTopologyUpdate.Kind.Image;
123+
import static org.apache.cassandra.service.accord.journal.AccordTopologyUpdate.Kind.Repeat;
121124

122125
/**
123126
* Merge multiple iterators over the content of sstable into a "compacted" iterator.
@@ -868,7 +871,6 @@ class AccordJournalPurger extends AbstractPurger
868871
public AccordJournalPurger(AccordCompactionInfos compactionInfos, Version version, ColumnFamilyStore cfs)
869872
{
870873
this.userVersion = version;
871-
872874
this.infos = compactionInfos;
873875
this.recordColumn = cfs.metadata().getColumn(ColumnIdentifier.getInterned("record", false));
874876
this.versionColumn = cfs.metadata().getColumn(ColumnIdentifier.getInterned("user_version", false));
@@ -943,8 +945,10 @@ static abstract class AccordRowCompactor<T extends FlyweightImage>
943945

944946
static class TopologyCompactor extends AccordMergingCompactor<AccordTopologyUpdate.Accumulator>
945947
{
946-
AccordTopologyUpdate.TopologyImage lastChangedTopology;
948+
TopologyImage lastImage;
949+
boolean hasWritten;
947950
final long minEpoch;
951+
948952
TopologyCompactor(FlyweightSerializer<Object, AccordTopologyUpdate.Accumulator> serializer, Version userVersion, long minEpoch)
949953
{
950954
super(serializer, userVersion);
@@ -960,21 +964,33 @@ void reset(JournalKey key, UnfilteredRowIterator partition)
960964
@Override
961965
UnfilteredRowIterator result(JournalKey journalKey, DecoratedKey partitionKey) throws IOException
962966
{
963-
AccordTopologyUpdate.TopologyImage current = builder.get();
967+
Invariants.require(lastImage != null || !hasWritten);
968+
TopologyImage read = builder.read();
964969

965-
if (lastChangedTopology != null && current.getUpdate() != null && lastChangedTopology.getUpdate().isEquivalent(current.getUpdate()))
966-
builder.update(current.asNoOp());
970+
if (read.epoch() < minEpoch)
971+
{
972+
if (read.kind() == Image)
973+
lastImage = read;
974+
return null;
975+
}
967976

968-
if (builder.get().kind() != AccordTopologyUpdate.Kind.NoOp)
977+
TopologyImage write = read;
978+
if (read.kind() == Repeat && !hasWritten)
969979
{
970-
lastChangedTopology = builder.get();
971-
Invariants.nonNull(lastChangedTopology.getUpdate());
980+
Invariants.require(lastImage != null);
981+
write = new TopologyImage(read.epoch(), Image, lastImage.getUpdate());
982+
}
983+
else if (hasWritten && read.kind() == Repeat && lastImage.getUpdate().isEquivalent(read.getUpdate()))
984+
{
985+
write = read.asRepeat();
972986
}
973987

974-
if (builder.get().epoch() >= minEpoch)
975-
return super.result(journalKey, partitionKey);
976-
else
977-
return null;
988+
if (write.kind() == Image)
989+
lastImage = write;
990+
991+
hasWritten = true;
992+
builder.write(write);
993+
return super.result(journalKey, partitionKey);
978994
}
979995
}
980996

src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -219,8 +219,8 @@ private ExecutorsTable()
219219
" txn_id 'TxnIdUtf8Type',\n" +
220220
" txn_id_additional 'TxnIdUtf8Type',\n" +
221221
" keys text,\n" +
222-
" keysLoad text,\n" +
223-
" keysLoadFor text,\n" +
222+
" keys_loading text,\n" +
223+
" keys_loading_for text,\n" +
224224
" PRIMARY KEY (executor_id, status, position, unique_position)" +
225225
')', UTF8Type.instance));
226226
}
@@ -242,14 +242,14 @@ public DataSet data()
242242
else uniquePos = 0;
243243
prev = info;
244244
PreLoadContext preLoadContext = info.preLoadContext();
245-
ds.row(executorId, info.status(), info.position(), uniquePos)
245+
ds.row(executorId, Objects.toString(info.status()), info.position(), uniquePos)
246246
.column("description", info.describe())
247247
.column("command_store_id", info.commandStoreId())
248-
.column("txn_id", preLoadContext == null ? null : preLoadContext.primaryTxnId())
249-
.column("txn_id_additional", preLoadContext == null ? null : preLoadContext.additionalTxnId())
250-
.column("keys", preLoadContext == null ? null : preLoadContext.keys())
251-
.column("keysLoad", preLoadContext == null ? null : preLoadContext.loadKeys())
252-
.column("keysLoadFor", preLoadContext == null ? null : preLoadContext.loadKeysFor())
248+
.column("txn_id", preLoadContext == null ? null : toStringOrNull(preLoadContext.primaryTxnId()))
249+
.column("txn_id_additional", preLoadContext == null ? null : toStringOrNull(preLoadContext.additionalTxnId()))
250+
.column("keys", preLoadContext == null ? null : toStringOrNull(preLoadContext.keys()))
251+
.column("keys_loading", preLoadContext == null ? null : toStringOrNull(preLoadContext.loadKeys()))
252+
.column("keys_loading_for", preLoadContext == null ? null : toStringOrNull(preLoadContext.loadKeysFor()))
253253
;
254254
}
255255
}
@@ -265,9 +265,9 @@ private CoordinationsTable()
265265
super(parse(VIRTUAL_ACCORD_DEBUG, COORDINATIONS,
266266
"Accord Coordination State",
267267
"CREATE TABLE %s (\n" +
268-
" txn_id int,\n" +
268+
" txn_id 'TxnIdUtf8Type',\n" +
269269
" kind text,\n" +
270-
" coordination_id int,\n" +
270+
" coordination_id bigint,\n" +
271271
" description text,\n" +
272272
" nodes text,\n" +
273273
" nodes_inflight text,\n" +
@@ -286,7 +286,7 @@ public DataSet data()
286286
SimpleDataSet ds = new SimpleDataSet(metadata());
287287
for (Coordination c : coordinations)
288288
{
289-
ds.row(c.txnId(), c.kind().toString(), c.coordinationId())
289+
ds.row(toStringOrNull(c.txnId()), c.kind().toString(), c.coordinationId())
290290
.column("nodes", toStringOrNull(c.nodes()))
291291
.column("nodes_inflight", toStringOrNull(c.inflight()))
292292
.column("nodes_contacted", toStringOrNull(c.contacted()))

src/java/org/apache/cassandra/service/accord/AccordCacheEntry.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,21 @@ void onSuccess(Runnable onSuccess)
335335
this.onSuccess = new ArrayList<>();
336336
this.onSuccess.add(onSuccess);
337337
}
338+
339+
static void notify(List<Runnable> onSuccess)
340+
{
341+
if (onSuccess != null)
342+
{
343+
onSuccess.forEach(run -> {
344+
try { run.run(); }
345+
catch (Throwable t)
346+
{
347+
Thread thread = Thread.currentThread();
348+
thread.getUncaughtExceptionHandler().uncaughtException(thread, t);
349+
}
350+
});
351+
}
352+
}
338353
}
339354

340355
public interface SaveExecutor
@@ -506,8 +521,7 @@ boolean save()
506521
setStatus(LOADED);
507522
if (waitingToSave != null)
508523
this.state = state;
509-
if (identity.onSuccess != null)
510-
identity.onSuccess.forEach(Runnable::run);
524+
UniqueSave.notify(identity.onSuccess);
511525
return false;
512526
}
513527
else
@@ -521,6 +535,9 @@ boolean save()
521535

522536
boolean saved(Object identity, Throwable fail)
523537
{
538+
if (identity instanceof UniqueSave)
539+
UniqueSave.notify(((UniqueSave) identity).onSuccess);
540+
524541
if (!is(SAVING))
525542
return false;
526543

src/java/org/apache/cassandra/service/accord/AccordCommandStore.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,6 @@
7474
import org.apache.cassandra.service.accord.IAccordService.AccordCompactionInfo;
7575
import org.apache.cassandra.service.accord.api.TokenKey;
7676
import org.apache.cassandra.service.accord.txn.TxnRead;
77-
import org.apache.cassandra.service.paxos.PaxosState;
7877
import org.apache.cassandra.utils.Clock;
7978

8079
import static accord.api.Journal.CommandUpdate;
@@ -584,13 +583,6 @@ void maybeLoadRangesForEpoch(CommandStores.RangesForEpoch rangesForEpoch)
584583
loadRangesForEpoch(rangesForEpoch);
585584
}
586585

587-
@Override
588-
public void updateRangesForEpoch(SafeCommandStore safeStore)
589-
{
590-
super.updateRangesForEpoch(safeStore);
591-
updateMinHlc(PaxosState.ballotTracker().getLowBound().unixMicros() + 1);
592-
}
593-
594586
// TODO (expected): handle journal failures, and consider how we handle partial failures.
595587
// Very likely we will not be able to safely or cleanly handle partial failures of this logic, but decide and document.
596588
// TODO (desired): consider merging with PersistentField? This version is cheaper to manage which may be preferable at the CommandStore level.

src/java/org/apache/cassandra/service/accord/AccordJournal.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -389,12 +389,17 @@ public boolean hasNext()
389389
public TopologyUpdate next()
390390
{
391391
Journal.KeyRefs<JournalKey> ref = iter.next();
392-
System.out.println(ref.key());
393-
Accumulator read = readAll(ref.key());
394-
if (read.accumulated.kind() == Kind.NoOp)
395-
prev = read.accumulated.asImage(Invariants.nonNull(prev.getUpdate()));
396-
else
397-
prev = read.accumulated;
392+
Accumulator reader = readAll(ref.key());
393+
if (reader.read().kind() == Kind.Repeat)
394+
{
395+
if (prev == null)
396+
{
397+
logger.error("Encountered TopologyImage Repeat record for epoch {}, but no prior image record was found", ref.key().id.epoch());
398+
return null;
399+
}
400+
prev = reader.read().asImage(Invariants.nonNull(prev.getUpdate()));
401+
}
402+
else prev = reader.read();
398403

399404
return new TopologyUpdate(prev.getUpdate().commandStores,
400405
prev.getUpdate().global);
@@ -411,6 +416,9 @@ public void close()
411416
while (iter.hasNext())
412417
{
413418
TopologyUpdate next = iter.next();
419+
if (next == null)
420+
continue;
421+
414422
Invariants.require(prev == null || next.global.epoch() > prev.global.epoch());
415423
// Due to partial compaction, we can clean up only some of the old epochs, creating gaps. We skip these epochs here.
416424
if (prev != null && next.global.epoch() > prev.global.epoch() + 1)

src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import accord.primitives.Unseekables;
4343
import org.apache.cassandra.service.accord.AccordCommandStore.ExclusiveCaches;
4444
import org.apache.cassandra.service.accord.AccordCommandStore.SafeRedundantBefore;
45+
import org.apache.cassandra.service.paxos.PaxosState;
4546

4647
import static accord.utils.Invariants.illegalState;
4748

@@ -169,6 +170,13 @@ protected AccordSafeCommandsForKey getInternal(RoutingKey key)
169170
return commandsForKey.get(key);
170171
}
171172

173+
@Override
174+
public void setRangesForEpoch(CommandStores.RangesForEpoch rangesForEpoch)
175+
{
176+
super.setRangesForEpoch(rangesForEpoch);
177+
commandStore.updateMinHlc(PaxosState.ballotTracker().getLowBound().unixMicros() + 1);
178+
}
179+
172180
@Override
173181
public AccordCommandStore commandStore()
174182
{

src/java/org/apache/cassandra/service/accord/AccordService.java

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import com.google.common.primitives.Ints;
4040

4141
import org.apache.cassandra.metrics.AccordReplicaMetrics;
42+
import org.apache.cassandra.service.accord.api.AccordViolationHandler;
4243
import org.apache.cassandra.utils.Clock;
4344
import org.apache.cassandra.utils.concurrent.ImmediateFuture;
4445
import org.slf4j.Logger;
@@ -132,6 +133,7 @@
132133
import org.apache.cassandra.utils.FBUtilities;
133134
import org.apache.cassandra.utils.concurrent.AsyncPromise;
134135
import org.apache.cassandra.utils.concurrent.Future;
136+
import org.apache.cassandra.utils.concurrent.Promise;
135137
import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
136138

137139
import static accord.api.Journal.TopologyUpdate;
@@ -315,7 +317,10 @@ public synchronized static AccordService startup(NodeId tcmId)
315317
as.node.durability().start();
316318

317319
instance = as;
320+
318321
AccordReplicaMetrics.touch();
322+
AccordViolationHandler.setup();
323+
319324
WatermarkCollector.fetchAndReportWatermarksAsync(as.configService);
320325
return as;
321326
}
@@ -596,11 +601,27 @@ public static <V> V getBlocking(AsyncChain<V> async, @Nullable TxnId txnId, Seek
596601
async.begin(result);
597602
return result.awaitAndGet();
598603
}
604+
599605
public static <V> V getBlocking(AsyncChain<V> async, Seekables<?, ?> keysOrRanges, RequestBookkeeping bookkeeping, long startedAt, long deadline)
600606
{
601607
return getBlocking(async, keysOrRanges, bookkeeping, startedAt, deadline, false);
602608
}
603609

610+
public static <V> V getBlocking(AsyncChain<V> async)
611+
{
612+
return asPromise(async).syncUninterruptibly().getNow();
613+
}
614+
615+
public static <V> Promise<V> asPromise(AsyncChain<V> async)
616+
{
617+
AsyncPromise<V> promise = new AsyncPromise<>();
618+
async.begin((result, failure) -> {
619+
if (failure == null) promise.trySuccess(result);
620+
else promise.tryFailure(failure);
621+
});
622+
return promise;
623+
}
624+
604625
public static Keys intersecting(Keys keys)
605626
{
606627
if (keys.isEmpty())
@@ -974,7 +995,7 @@ public Node node()
974995
@Override
975996
public void ensureMinHlc(long minHlc)
976997
{
977-
node.updateMinHlc(minHlc >= 0 ? minHlc : 0);
998+
asPromise(node.updateMinHlc(minHlc >= 0 ? minHlc : 0)).syncUninterruptibly();
978999
}
9791000

9801001
public AccordJournal journal()
@@ -985,13 +1006,7 @@ public AccordJournal journal()
9851006
@Override
9861007
public Future<Void> epochReady(Epoch epoch)
9871008
{
988-
AsyncPromise<Void> promise = new AsyncPromise<>();
989-
AsyncChain<Void> ready = configService.epochReady(epoch.getEpoch());
990-
ready.begin((result, failure) -> {
991-
if (failure == null) promise.trySuccess(result);
992-
else promise.tryFailure(failure);
993-
});
994-
return promise;
1009+
return asPromise(configService.epochReady(epoch.getEpoch()));
9951010
}
9961011

9971012
@Override

src/java/org/apache/cassandra/service/accord/api/AccordAgent.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -131,15 +131,6 @@ public void setNodeId(Node.Id id)
131131
self = id;
132132
}
133133

134-
@Override
135-
public void onInconsistentTimestamp(Command command, Timestamp prev, Timestamp next)
136-
{
137-
// TODO (expected): better reporting
138-
AssertionError error = new AssertionError("Inconsistent execution timestamp detected for txnId " + command.txnId() + ": " + prev + " != " + next);
139-
onUncaughtException(error);
140-
throw error;
141-
}
142-
143134
@Override
144135
public void onFailedBootstrap(int attempts, String phase, Ranges ranges, Runnable retry, Throwable failure)
145136
{

src/java/org/apache/cassandra/service/accord/api/AccordViolationHandler.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,28 +18,37 @@
1818

1919
package org.apache.cassandra.service.accord.api;
2020

21-
import javax.annotation.Nullable;
22-
2321
import org.slf4j.Logger;
2422
import org.slf4j.LoggerFactory;
2523

2624
import accord.api.ViolationHandler;
25+
import accord.local.Command;
26+
import accord.local.SafeCommandStore;
2727
import accord.primitives.Participants;
28+
import accord.primitives.Route;
2829
import accord.primitives.Timestamp;
2930
import accord.primitives.TxnId;
3031

32+
import static accord.utils.Invariants.illegalState;
33+
3134
public class AccordViolationHandler implements ViolationHandler
3235
{
3336
private static final Logger logger = LoggerFactory.getLogger(AccordViolationHandler.class);
3437

35-
static
38+
public static void setup()
3639
{
3740
ViolationHandlerHolder.set(AccordViolationHandler::new);
3841
}
3942

4043
@Override
41-
public void onViolation(String message, Participants<?> participants, @Nullable TxnId notWitnessed, @Nullable Timestamp notWitnessedExecuteAt, @Nullable TxnId by, @Nullable Timestamp byEexecuteAt)
44+
public void onTimestampViolation(SafeCommandStore safeStore, Command command, Participants<?> otherParticipants, Route<?> otherRoute, Timestamp otherExecuteAt)
45+
{
46+
throw illegalState(ViolationHandler.timestampViolationMessage(safeStore, command, otherParticipants, otherRoute, otherExecuteAt));
47+
}
48+
49+
@Override
50+
public void onDependencyViolation(Participants<?> participants, TxnId notWitnessed, Timestamp notWitnessedExecuteAt, TxnId by, Timestamp byExecuteAt)
4251
{
43-
logger.error(message);
52+
logger.error(ViolationHandler.dependencyViolationMessage(participants, notWitnessed, notWitnessedExecuteAt, by, byExecuteAt));
4453
}
4554
}

0 commit comments

Comments
 (0)