Skip to content

Commit 0151131

Browse files
ifesdjeen authored and belliottsmith committed
Split out Topologies into Partitions
patch by Alex Petrov; reviewed by Benedict for CASSANDRA-20838
1 parent ae673a3 commit 0151131

File tree

11 files changed, with 178 additions and 212 deletions.

11 files changed

+178
-212
lines changed

src/java/org/apache/cassandra/db/compaction/CompactionIterator.java

Lines changed: 44 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -28,18 +28,14 @@
2828
import java.util.concurrent.TimeUnit;
2929
import java.util.function.LongPredicate;
3030
import java.util.function.Supplier;
31-
32-
import com.google.common.annotations.VisibleForTesting;
33-
import com.google.common.collect.ImmutableSet;
34-
import com.google.common.collect.Ordering;
35-
import org.slf4j.Logger;
36-
import org.slf4j.LoggerFactory;
37-
3831
import accord.local.Cleanup;
3932
import accord.local.DurableBefore;
4033
import accord.local.RedundantBefore;
4134
import accord.utils.Invariants;
4235
import accord.utils.UnhandledEnum;
36+
import com.google.common.annotations.VisibleForTesting;
37+
import com.google.common.collect.ImmutableSet;
38+
import com.google.common.collect.Ordering;
4339
import org.apache.cassandra.config.DatabaseDescriptor;
4440
import org.apache.cassandra.cql3.ColumnIdentifier;
4541
import org.apache.cassandra.db.AbstractCompactionController;
@@ -108,6 +104,9 @@
108104
import org.apache.cassandra.utils.TimeUUID;
109105
import org.apache.cassandra.utils.btree.BTree;
110106
import org.apache.cassandra.utils.btree.UpdateFunction;
107+
import org.slf4j.Logger;
108+
import org.slf4j.LoggerFactory;
109+
111110

112111
import static accord.local.Cleanup.ERASE;
113112
import static accord.local.Cleanup.Input.PARTIAL;
@@ -864,8 +863,6 @@ class AccordJournalPurger extends AbstractPurger
864863

865864
JournalKey key;
866865
AccordRowCompactor<?> compactor;
867-
// Initialize topology serializer during compaction to avoid deserializing redundant epochs
868-
FlyweightSerializer<AccordTopologyUpdate, FlyweightImage> topologySerializer;
869866
final Version userVersion;
870867

871868
public AccordJournalPurger(AccordCompactionInfos compactionInfos, Version version, ColumnFamilyStore cfs)
@@ -875,7 +872,6 @@ public AccordJournalPurger(AccordCompactionInfos compactionInfos, Version versio
875872
this.infos = compactionInfos;
876873
this.recordColumn = cfs.metadata().getColumn(ColumnIdentifier.getInterned("record", false));
877874
this.versionColumn = cfs.metadata().getColumn(ColumnIdentifier.getInterned("user_version", false));
878-
this.topologySerializer = (FlyweightSerializer<AccordTopologyUpdate, FlyweightImage>) (FlyweightSerializer) new AccordTopologyUpdate.AccumulatingSerializer(() -> infos.minEpoch);
879875
}
880876

881877
@SuppressWarnings("unchecked")
@@ -891,7 +887,7 @@ protected void beginPartition(UnfilteredRowIterator partition)
891887
compactor = new AccordCommandRowCompactor(infos, userVersion, nowInSec);
892888
break;
893889
case TOPOLOGY_UPDATE:
894-
compactor = new AccordMergingCompactor(topologySerializer, userVersion);
890+
compactor = new TopologyCompactor((FlyweightSerializer<Object, AccordTopologyUpdate.Accumulator>) key.type.serializer, userVersion, infos.minEpoch);
895891
break;
896892
default:
897893
compactor = new AccordMergingCompactor(key.type.serializer, userVersion);
@@ -945,6 +941,43 @@ static abstract class AccordRowCompactor<T extends FlyweightImage>
945941
abstract UnfilteredRowIterator result(JournalKey journalKey, DecoratedKey partitionKey) throws IOException;
946942
}
947943

944+
static class TopologyCompactor extends AccordMergingCompactor<AccordTopologyUpdate.Accumulator>
945+
{
946+
AccordTopologyUpdate.TopologyImage lastChangedTopology;
947+
final long minEpoch;
948+
TopologyCompactor(FlyweightSerializer<Object, AccordTopologyUpdate.Accumulator> serializer, Version userVersion, long minEpoch)
949+
{
950+
super(serializer, userVersion);
951+
this.minEpoch = minEpoch;
952+
}
953+
954+
@Override
955+
void reset(JournalKey key, UnfilteredRowIterator partition)
956+
{
957+
super.reset(key, partition);
958+
}
959+
960+
@Override
961+
UnfilteredRowIterator result(JournalKey journalKey, DecoratedKey partitionKey) throws IOException
962+
{
963+
AccordTopologyUpdate.TopologyImage current = builder.get();
964+
965+
if (lastChangedTopology != null && current.getUpdate() != null && lastChangedTopology.getUpdate().isEquivalent(current.getUpdate()))
966+
builder.update(current.asNoOp());
967+
968+
if (builder.get().kind() != AccordTopologyUpdate.Kind.NoOp)
969+
{
970+
lastChangedTopology = builder.get();
971+
Invariants.nonNull(lastChangedTopology.getUpdate());
972+
}
973+
974+
if (builder.get().epoch() >= minEpoch)
975+
return super.result(journalKey, partitionKey);
976+
else
977+
return null;
978+
}
979+
}
980+
948981
static class AccordMergingCompactor<T extends FlyweightImage> extends AccordRowCompactor<T>
949982
{
950983
final T builder;

src/java/org/apache/cassandra/journal/Journal.java

Lines changed: 0 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -960,12 +960,6 @@ private KeyRefs(int maxSize)
960960
this.segments = new long[maxSize];
961961
}
962962

963-
public void segments(LongConsumer consumer)
964-
{
965-
for (int i = 0; i < size; i++)
966-
consumer.accept(segments[i]);
967-
}
968-
969963
public long[] copyOfSegments()
970964
{
971965
return segments == null ? new long[0] : Arrays.copyOf(segments, size);

src/java/org/apache/cassandra/service/accord/AccordJournal.java

Lines changed: 46 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -24,7 +24,6 @@
2424
import java.util.Collection;
2525
import java.util.Collections;
2626
import java.util.Deque;
27-
import java.util.Iterator;
2827
import java.util.List;
2928
import java.util.NavigableMap;
3029
import java.util.Queue;
@@ -85,7 +84,6 @@
8584
import org.apache.cassandra.service.accord.AccordJournalValueSerializers.FlyweightImage;
8685
import org.apache.cassandra.service.accord.AccordJournalValueSerializers.IdentityAccumulator;
8786
import org.apache.cassandra.service.accord.JournalKey.JournalKeySupport;
88-
import org.apache.cassandra.service.accord.journal.AccordTopologyUpdate;
8987
import org.apache.cassandra.service.accord.serializers.CommandSerializers;
9088
import org.apache.cassandra.service.accord.serializers.CommandSerializers.ExecuteAtSerializer;
9189
import org.apache.cassandra.service.accord.serializers.DepsSerializers;
@@ -114,6 +112,10 @@
114112
import static accord.local.Cleanup.Input.FULL;
115113
import static org.apache.cassandra.service.accord.AccordJournalValueSerializers.DurableBeforeAccumulator;
116114
import static org.apache.cassandra.service.accord.JournalKey.Type.COMMAND_DIFF;
115+
import static org.apache.cassandra.service.accord.journal.AccordTopologyUpdate.Accumulator;
116+
import static org.apache.cassandra.service.accord.journal.AccordTopologyUpdate.Kind;
117+
import static org.apache.cassandra.service.accord.journal.AccordTopologyUpdate.TopologyImage;
118+
import static org.apache.cassandra.service.accord.journal.AccordTopologyUpdate.newTopology;
117119
import static org.apache.cassandra.utils.FBUtilities.getAvailableProcessors;
118120

119121
public class AccordJournal implements accord.api.Journal, RangeSearcher.Supplier, Shutdownable
@@ -367,33 +369,58 @@ public void saveCommand(int commandStoreId, CommandUpdate update, @Nullable Runn
367369
journal.onDurable(pointer, onFlush);
368370
}
369371

370-
public void patchCommand(int commandStoreId, TxnId txnId, Cleanup cleanup, @Nullable Runnable onFlush)
372+
@Override
373+
public CloseableIterator<TopologyUpdate> replayTopologies()
371374
{
372-
Builder change = new Builder(txnId);
373-
change.maybeCleanup(false, cleanup);
375+
return new CloseableIterator<>()
376+
{
377+
final CloseableIterator<Journal.KeyRefs<JournalKey>> iter = journalTable.keyIterator(topologyUpdateKey(0L),
378+
topologyUpdateKey(Timestamp.MAX_EPOCH));
379+
TopologyImage prev = null;
374380

375-
JournalKey key = new JournalKey(txnId, JournalKey.Type.COMMAND_DIFF, commandStoreId);
376-
RecordPointer pointer = journal.asyncWrite(key, (out, userVersion) -> change.serialize(out, Version.fromVersion(configuration().userVersion())));
377-
if (onFlush != null)
378-
journal.onDurable(pointer, onFlush);
379-
}
381+
@Override
382+
public boolean hasNext()
383+
{
384+
return iter.hasNext();
385+
}
380386

381-
@Override
382-
public Iterator<AccordTopologyUpdate.ImmutableTopoloyImage> replayTopologies()
383-
{
384-
AccordTopologyUpdate.Accumulator accumulator = readAll(TopologyUpdateKey);
385-
return accumulator.images();
387+
@Override
388+
public TopologyUpdate next()
389+
{
390+
Journal.KeyRefs<JournalKey> ref = iter.next();
391+
Accumulator read = readAll(ref.key());
392+
if (read.accumulated.kind() == Kind.NoOp)
393+
prev = read.accumulated.asImage(Invariants.nonNull(prev.getUpdate()));
394+
else
395+
prev = read.accumulated;
396+
397+
return new TopologyUpdate(prev.getUpdate().commandStores,
398+
prev.getUpdate().global);
399+
}
400+
401+
@Override
402+
public void close()
403+
{
404+
iter.close();
405+
}
406+
};
386407
}
387408

388-
private static final JournalKey TopologyUpdateKey = new JournalKey(TxnId.NONE, JournalKey.Type.TOPOLOGY_UPDATE, 0);
389409
@Override
390410
public void saveTopology(TopologyUpdate topologyUpdate, Runnable onFlush)
391411
{
392-
RecordPointer pointer = appendInternal(TopologyUpdateKey, AccordTopologyUpdate.newTopology(topologyUpdate));
412+
RecordPointer pointer = appendInternal(topologyUpdateKey(topologyUpdate.global.epoch()),
413+
newTopology(topologyUpdate));
393414
if (onFlush != null)
394415
journal.onDurable(pointer, onFlush);
395416
}
396417

418+
private static JournalKey topologyUpdateKey(long epoch)
419+
{
420+
return new JournalKey(TxnId.fromValues(epoch, 0L, Node.Id.NONE),
421+
JournalKey.Type.TOPOLOGY_UPDATE, Integer.MAX_VALUE);
422+
}
423+
397424
private static final JournalKey DURABLE_BEFORE_KEY = new JournalKey(TxnId.NONE, JournalKey.Type.DURABLE_BEFORE, 0);
398425

399426
@Override
@@ -460,6 +487,7 @@ public Builder load(int commandStoreId, TxnId txnId)
460487
public <BUILDER extends FlyweightImage> BUILDER readAll(JournalKey key)
461488
{
462489
BUILDER builder = (BUILDER) key.type.serializer.mergerFor();
490+
builder.reset(key);
463491
// TODO (expected): this can be further improved to avoid allocating lambdas
464492
AccordJournalValueSerializers.FlyweightSerializer<?, BUILDER> serializer = (AccordJournalValueSerializers.FlyweightSerializer<?, BUILDER>) key.type.serializer;
465493
// TODO (expected): for those where we store an image, read only the first entry we find in DESC order
@@ -1170,4 +1198,4 @@ private static void skip(TxnId txnId, Field field, DataInputPlus in, Version use
11701198
}
11711199
}
11721200
}
1173-
}
1201+
}

src/java/org/apache/cassandra/service/accord/AccordJournalValueSerializers.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -111,7 +111,7 @@ public Accumulator(A initial)
111111
this.accumulated = initial;
112112
}
113113

114-
protected void update(V newValue)
114+
public void update(V newValue)
115115
{
116116
accumulated = accumulate(accumulated, newValue);
117117
}

src/java/org/apache/cassandra/service/accord/AccordKeyspace.java

Lines changed: 9 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -544,12 +544,12 @@ public static DecoratedKey decorate(JournalKey key)
544544
{
545545
int commandStoreIdBytes = VIntCoding.computeUnsignedVIntSize(key.commandStoreId);
546546
int length = commandStoreIdBytes + 1;
547-
if (key.type == JournalKey.Type.COMMAND_DIFF)
547+
if (key.type.usesTxnId)
548548
length += CommandSerializers.txnId.serializedSize(key.id);
549549
ByteBuffer pk = ByteBuffer.allocate(length);
550550
ByteBufferAccessor.instance.putUnsignedVInt32(pk, 0, key.commandStoreId);
551551
pk.put(commandStoreIdBytes, (byte)key.type.id);
552-
if (key.type == JournalKey.Type.COMMAND_DIFF)
552+
if (key.type.usesTxnId)
553553
CommandSerializers.txnId.serializeComparable(key.id, pk, ByteBufferAccessor.instance, commandStoreIdBytes + 1);
554554
return Journal.partitioner.decorateKey(pk);
555555
}
@@ -569,7 +569,13 @@ public static JournalKey getJournalKey(ByteBuffer bb)
569569
int storeId = ByteBufferAccessor.instance.getUnsignedVInt32(bb, 0);
570570
int offset = VIntCoding.readLengthOfVInt(bb, 0);
571571
JournalKey.Type type = JournalKey.Type.fromId(bb.get(offset));
572-
TxnId txnId = type != JournalKey.Type.COMMAND_DIFF ? TxnId.NONE : CommandSerializers.txnId.deserializeComparable(bb, ByteBufferAccessor.instance, offset + 1);
572+
TxnId txnId;
573+
574+
if (type.usesTxnId)
575+
txnId = CommandSerializers.txnId.deserializeComparable(bb, ByteBufferAccessor.instance, offset + 1);
576+
else
577+
txnId = TxnId.NONE;
578+
573579
return new JournalKey(txnId, type, storeId);
574580
}
575581
}

src/java/org/apache/cassandra/service/accord/AccordService.java

Lines changed: 15 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -23,7 +23,6 @@
2323
import java.util.Collection;
2424
import java.util.Collections;
2525
import java.util.HashSet;
26-
import java.util.Iterator;
2726
import java.util.List;
2827
import java.util.Set;
2928
import java.util.concurrent.ExecutionException;
@@ -45,7 +44,6 @@
4544
import org.slf4j.Logger;
4645
import org.slf4j.LoggerFactory;
4746

48-
import accord.api.Journal;
4947
import accord.api.ProtocolModifiers;
5048
import accord.coordinate.CoordinateMaxConflict;
5149
import accord.coordinate.CoordinateTransaction;
@@ -130,12 +128,14 @@
130128
import org.apache.cassandra.tcm.listeners.ChangeListener;
131129
import org.apache.cassandra.tcm.membership.NodeId;
132130
import org.apache.cassandra.transport.Dispatcher;
131+
import org.apache.cassandra.utils.CloseableIterator;
133132
import org.apache.cassandra.utils.ExecutorUtils;
134133
import org.apache.cassandra.utils.FBUtilities;
135134
import org.apache.cassandra.utils.concurrent.AsyncPromise;
136135
import org.apache.cassandra.utils.concurrent.Future;
137136
import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
138137

138+
import static accord.api.Journal.TopologyUpdate;
139139
import static accord.api.ProtocolModifiers.Toggles.FastExec.MAY_BYPASS_SAFESTORE;
140140
import static accord.local.LoadKeys.SYNC;
141141
import static accord.local.LoadKeysFor.READ_WRITE;
@@ -157,7 +157,6 @@
157157
import static org.apache.cassandra.journal.Params.ReplayMode.RESET;
158158
import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.accordReadBookkeeping;
159159
import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.accordWriteBookkeeping;
160-
import static org.apache.cassandra.service.accord.journal.AccordTopologyUpdate.ImmutableTopoloyImage;
161160
import static org.apache.cassandra.service.consensus.migration.ConsensusRequestRouter.getTableMetadata;
162161
import static org.apache.cassandra.utils.Clock.Global.nanoTime;
163162

@@ -429,20 +428,22 @@ public synchronized void startup()
429428
ClusterMetadata metadata = ClusterMetadata.current();
430429
configService.updateMapping(metadata);
431430

432-
List<ImmutableTopoloyImage> images = new ArrayList<>();
431+
List<TopologyUpdate> images = new ArrayList<>();
433432

433+
TopologyUpdate prev = null;
434434
// Collect locally known topologies
435-
Iterator<ImmutableTopoloyImage> iter = journal.replayTopologies();
436-
Journal.TopologyUpdate prev = null;
437-
while (iter.hasNext())
435+
try (CloseableIterator<TopologyUpdate> iter = journal.replayTopologies())
438436
{
439-
ImmutableTopoloyImage next = iter.next();
440-
// Due to partial compaction, we can clean up only some of the old epochs, creating gaps. We skip these epochs here.
441-
if (prev != null && next.global.epoch() > prev.global.epoch() + 1)
442-
images.clear();
437+
while (iter.hasNext())
438+
{
439+
TopologyUpdate next = iter.next();
440+
// Due to partial compaction, we can clean up only some of the old epochs, creating gaps. We skip these epochs here.
441+
if (prev != null && next.global.epoch() > prev.global.epoch() + 1)
442+
images.clear();
443443

444-
images.add(next);
445-
prev = next;
444+
images.add(next);
445+
prev = next;
446+
}
446447
}
447448

448449
// Instantiate latest topology from the log, if known
@@ -452,7 +453,7 @@ public synchronized void startup()
452453
}
453454

454455
// Replay local epochs
455-
for (ImmutableTopoloyImage image : images)
456+
for (TopologyUpdate image : images)
456457
configService.reportTopology(image.global);
457458

458459
// Subscribe to TCM events
@@ -524,9 +525,6 @@ private TopologyRange fetchTopologies(long from) throws ExecutionException, Inte
524525
try
525526
{
526527
logger.info("Fetching topologies for epochs [{}, {}] from {}", from, metadata.epoch.getEpoch(), peers);
527-
Invariants.require(from <= metadata.epoch.getEpoch(),
528-
"Accord epochs should never be ahead of TCM ones, but %d was ahead of %d", from, metadata.epoch.getEpoch());
529-
530528
Future<TopologyRange> futures = FetchTopologies.fetch(SharedContext.Global.instance,
531529
peers,
532530
from,

src/java/org/apache/cassandra/service/accord/JournalKey.java

Lines changed: 10 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -257,22 +257,24 @@ public String toString()
257257

258258
public enum Type
259259
{
260-
COMMAND_DIFF (0, new CommandDiffSerializer()),
261-
REDUNDANT_BEFORE (1, new RedundantBeforeSerializer()),
262-
DURABLE_BEFORE (2, new DurableBeforeSerializer()),
263-
SAFE_TO_READ (3, new SafeToReadSerializer()),
264-
BOOTSTRAP_BEGAN_AT (4, new BootstrapBeganAtSerializer()),
265-
RANGES_FOR_EPOCH (5, new RangesForEpochSerializer()),
266-
TOPOLOGY_UPDATE (6, AccordTopologyUpdate.AccumulatingSerializer.defaultInstance),
260+
COMMAND_DIFF (0, new CommandDiffSerializer(), true),
261+
REDUNDANT_BEFORE (1, new RedundantBeforeSerializer(), false),
262+
DURABLE_BEFORE (2, new DurableBeforeSerializer(), false),
263+
SAFE_TO_READ (3, new SafeToReadSerializer(), false),
264+
BOOTSTRAP_BEGAN_AT (4, new BootstrapBeganAtSerializer(), false),
265+
RANGES_FOR_EPOCH (5, new RangesForEpochSerializer(), false),
266+
TOPOLOGY_UPDATE (6, new AccordTopologyUpdate.FlyweightSerializer(), true),
267267
;
268268

269269
public final int id;
270270
public final FlyweightSerializer<?, ?> serializer;
271+
public final boolean usesTxnId;
271272

272-
Type(int id, FlyweightSerializer<?, ?> serializer)
273+
Type(int id, FlyweightSerializer<?, ?> serializer, boolean usesTxnId)
273274
{
274275
this.id = id;
275276
this.serializer = serializer;
277+
this.usesTxnId = usesTxnId;
276278
}
277279

278280
private static final Type[] idToTypeMapping;

0 commit comments

Comments
 (0)