From 18c9a920bc32bf6a33d21fc551ee02310f7ab5fd Mon Sep 17 00:00:00 2001 From: Aleksey Yeschenko Date: Tue, 23 Sep 2025 16:44:45 +0100 Subject: [PATCH] Truncate mutation journal as logs get reconciled patch by Aleksey Yeschenko; reviewed by Alex Petrov for CASSANDRA-20710 --- .../cassandra/journal/ActiveSegment.java | 41 ++- .../apache/cassandra/journal/Component.java | 10 +- .../apache/cassandra/journal/Descriptor.java | 9 +- .../apache/cassandra/journal/DumpUtil.java | 2 +- .../cassandra/journal/EntrySerializer.java | 11 +- .../org/apache/cassandra/journal/Journal.java | 198 +++++++---- .../cassandra/journal/JournalReadError.java | 4 +- .../cassandra/journal/JournalWriteError.java | 4 +- .../apache/cassandra/journal/KeyStats.java | 108 ++++++ .../apache/cassandra/journal/KeySupport.java | 4 +- .../apache/cassandra/journal/Metadata.java | 3 +- .../apache/cassandra/journal/OnDiskIndex.java | 5 +- .../org/apache/cassandra/journal/Segment.java | 10 + .../apache/cassandra/journal/Segments.java | 34 +- .../cassandra/journal/StaticSegment.java | 25 +- .../cassandra/journal/ValueSerializer.java | 21 ++ .../cassandra/replication/CoordinatorLog.java | 5 + .../replication/CoordinatorLogId.java | 2 +- .../cassandra/replication/Log2OffsetsMap.java | 10 +- .../replication/MutationJournal.java | 332 ++++++++++++++++-- .../replication/MutationTrackingService.java | 38 +- .../apache/cassandra/replication/Offsets.java | 23 +- .../apache/cassandra/replication/Shard.java | 5 + .../service/accord/AccordJournal.java | 22 +- .../cassandra/service/accord/JournalKey.java | 4 +- .../reads/tracked/ReadReconciliations.java | 8 +- .../reads/tracked/TrackedLocalReads.java | 17 +- .../service/reads/tracked/TrackedRead.java | 10 +- src/java/org/apache/cassandra/utils/Crc.java | 20 ++ .../apache/cassandra/utils/FBUtilities.java | 2 + .../test/AccordJournalSimulationTest.java | 17 +- .../apache/cassandra/journal/JournalTest.java | 18 +- .../apache/cassandra/journal/SegmentTest.java | 12 +- .../cassandra/journal/SegmentsTest.java | 9 +- .../cassandra/journal/TimeUUIDKeySupport.java | 11 +- .../replication/MutationJournalTest.java | 181 ++++++++-- .../cassandra/replication/OffsetsTest.java | 42 +++ 37 files changed, 1032 insertions(+), 245 deletions(-) create mode 100644 src/java/org/apache/cassandra/journal/KeyStats.java diff --git a/src/java/org/apache/cassandra/journal/ActiveSegment.java b/src/java/org/apache/cassandra/journal/ActiveSegment.java index 140371f9801e..e84ca40bcffd 100644 --- a/src/java/org/apache/cassandra/journal/ActiveSegment.java +++ b/src/java/org/apache/cassandra/journal/ActiveSegment.java @@ -79,12 +79,19 @@ public final class ActiveSegment extends Segment private final Ref> selfRef; final InMemoryIndex index; + final KeyStats.Active keyStats; private ActiveSegment( - Descriptor descriptor, Params params, InMemoryIndex index, Metadata metadata, KeySupport keySupport) + Descriptor descriptor, + Params params, + InMemoryIndex index, + Metadata metadata, + KeyStats.Active keyStats, + KeySupport keySupport) { super(descriptor, metadata, keySupport); this.index = index; + this.keyStats = keyStats; try { channel = FileChannel.open(file.toPath(), StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE); @@ -98,16 +105,12 @@ private ActiveSegment( } } - public CommitLogPosition currentPosition() - { - return new CommitLogPosition(id(), (int) allocateOffset); - } - - static ActiveSegment create(Descriptor descriptor, Params params, KeySupport keySupport) + static ActiveSegment create( + 
Descriptor descriptor, Params params, KeySupport keySupport, KeyStats.Factory keyStatsFactory) { InMemoryIndex index = InMemoryIndex.create(keySupport); Metadata metadata = Metadata.empty(); - return new ActiveSegment<>(descriptor, params, index, metadata, keySupport); + return new ActiveSegment<>(descriptor, params, index, metadata, keyStatsFactory.create(), keySupport); } @Override @@ -116,6 +119,16 @@ public InMemoryIndex index() return index; } + public KeyStats.Active keyStats() + { + return keyStats; + } + + public CommitLogPosition currentPosition() + { + return new CommitLogPosition(id(), (int) allocateOffset); + } + boolean isEmpty() { return allocateOffset == 0; @@ -225,6 +238,7 @@ void persistComponents() { index.persist(descriptor); metadata.persist(descriptor); + keyStats.persist(descriptor); SyncUtil.trySyncDir(descriptor.directory); } @@ -236,6 +250,7 @@ private void discard() descriptor.fileFor(Component.DATA).deleteIfExists(); descriptor.fileFor(Component.INDEX).deleteIfExists(); descriptor.fileFor(Component.METADATA).deleteIfExists(); + descriptor.fileFor(Component.KEYSTATS).deleteIfExists(); } @Override @@ -290,6 +305,7 @@ public String name() } } + @Override public boolean isFlushed(long position) { return writtenTo >= position; @@ -465,18 +481,14 @@ final class Allocation extends RecordPointer this.buffer = buffer; } - Segment segment() - { - return ActiveSegment.this; - } - void write(K id, ByteBuffer record) { try { EntrySerializer.write(id, record, keySupport, buffer, descriptor.userVersion); - metadata.update(); index.update(id, position, length); + keyStats.update(id); + metadata.update(); } catch (IOException e) { @@ -508,6 +520,7 @@ void writeInternal(K id, ByteBuffer record) { EntrySerializer.write(id, record, keySupport, buffer, descriptor.userVersion); index.update(id, position, length); + keyStats.update(id); metadata.update(); } catch (IOException e) diff --git a/src/java/org/apache/cassandra/journal/Component.java b/src/java/org/apache/cassandra/journal/Component.java index 07da71536a6d..8e99708de031 100644 --- a/src/java/org/apache/cassandra/journal/Component.java +++ b/src/java/org/apache/cassandra/journal/Component.java @@ -24,15 +24,17 @@ import static accord.utils.SortedArrays.SortedArrayList.ofSorted; -enum Component +public enum Component { - DATA ("data"), - INDEX ("indx"), - METADATA ("meta"); + DATA ("data"), + INDEX ("indx"), + METADATA ("meta"), + KEYSTATS ("keys"); //OFFSET_MAP (".offs"), //INVLALIDATIONS (".invl"); public static final List VALUES = ofSorted(values()); + final String extension; Component(String extension) diff --git a/src/java/org/apache/cassandra/journal/Descriptor.java b/src/java/org/apache/cassandra/journal/Descriptor.java index bac6c7029fc2..e8e8556b568a 100644 --- a/src/java/org/apache/cassandra/journal/Descriptor.java +++ b/src/java/org/apache/cassandra/journal/Descriptor.java @@ -23,6 +23,8 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; +import com.google.common.annotations.VisibleForTesting; + import org.apache.cassandra.io.util.File; import static java.lang.String.format; @@ -90,7 +92,8 @@ public final class Descriptor implements Comparable this.userVersion = userVersion; } - static Descriptor create(File directory, long timestamp, int userVersion) + @VisibleForTesting + public static Descriptor create(File directory, long timestamp, int userVersion) { return new Descriptor(directory, timestamp, 1, CURRENT_JOURNAL_VERSION, userVersion); } @@ -114,12 +117,12 @@ public static Descriptor fromFile(File 
file) return fromName(file.parent(), file.name()); } - File fileFor(Component component) + public File fileFor(Component component) { return new File(directory, formatFileName(component)); } - File tmpFileFor(Component component) + public File tmpFileFor(Component component) { return new File(directory, formatFileName(component) + '.' + TMP_SUFFIX); } diff --git a/src/java/org/apache/cassandra/journal/DumpUtil.java b/src/java/org/apache/cassandra/journal/DumpUtil.java index d98e758384ab..404a688a3827 100644 --- a/src/java/org/apache/cassandra/journal/DumpUtil.java +++ b/src/java/org/apache/cassandra/journal/DumpUtil.java @@ -37,6 +37,6 @@ public static void dumpMetadata(Descriptor descriptor, Consumer out) public static StaticSegment open(Descriptor descriptor, KeySupport keySupport) { - return StaticSegment.open(descriptor, keySupport); + return StaticSegment.open(descriptor, keySupport, KeyStats.Factory.noop()); } } diff --git a/src/java/org/apache/cassandra/journal/EntrySerializer.java b/src/java/org/apache/cassandra/journal/EntrySerializer.java index 454234454ee6..6861cdae47ac 100644 --- a/src/java/org/apache/cassandra/journal/EntrySerializer.java +++ b/src/java/org/apache/cassandra/journal/EntrySerializer.java @@ -27,11 +27,8 @@ import java.nio.ByteBuffer; import java.util.zip.CRC32; -import static org.apache.cassandra.journal.Journal.validateCRC; - /** * Entry format: - * * [Total Size (4 bytes)] * [Header (variable size)] * [Header CRC (4 bytes)] @@ -95,10 +92,10 @@ static void read(EntryHolder into, CRC32 crc = Crc.crc32(); int headerSize = EntrySerializer.headerSize(keySupport, userVersion); int headerCrc = readAndUpdateHeaderCrc(crc, from, headerSize); - validateCRC(crc, headerCrc); + Crc.validate(crc, headerCrc); int recordCrc = readAndUpdateRecordCrc(crc, from, start + totalSize); - validateCRC(crc, recordCrc); + Crc.validate(crc, recordCrc); } readValidated(into, from, start, keySupport, userVersion); @@ -142,7 +139,7 @@ static int tryRead(EntryHolder into, int headerCrc = readAndUpdateHeaderCrc(crc, from, headerSize); try { - validateCRC(crc, headerCrc); + Crc.validate(crc, headerCrc); } catch (IOException e) { @@ -152,7 +149,7 @@ static int tryRead(EntryHolder into, int recordCrc = readAndUpdateRecordCrc(crc, from, start + totalSize); try { - validateCRC(crc, recordCrc); + Crc.validate(crc, recordCrc); } catch (IOException e) { diff --git a/src/java/org/apache/cassandra/journal/Journal.java b/src/java/org/apache/cassandra/journal/Journal.java index cf3135d77faa..59e2f9ee624c 100644 --- a/src/java/org/apache/cassandra/journal/Journal.java +++ b/src/java/org/apache/cassandra/journal/Journal.java @@ -24,14 +24,15 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.LockSupport; import java.util.function.*; -import java.util.zip.CRC32; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -56,7 +57,6 @@ import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.AbstractIterator; import org.apache.cassandra.utils.CloseableIterator; -import org.apache.cassandra.utils.Crc; import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.MergeIterator; import org.apache.cassandra.utils.Simulate; @@ -77,10 +77,12 @@ 
 /**
  * A generic append-only journal with some special features:
- * <p>
+ * <p>
+ * <ul>
  * <li>Records can be looked up by key
  * <li>Invalidated records get purged during segment compaction
- * <p>
+ * </ul>
+ * <p>
* * Type parameters: * @param the type of records stored in the journal @@ -98,6 +100,7 @@ public class Journal implements Shutdownable final KeySupport keySupport; final ValueSerializer valueSerializer; + final KeyStats.Factory keyStatsFactory; final Metrics metrics; @@ -170,6 +173,7 @@ public WaitingFor(RecordPointer pointer, Runnable onFlush) this.onFlush = onFlush; } + @Override public void run() { onFlush.run(); @@ -206,11 +210,60 @@ public void close() } } + public static class Builder + { + private final String name; + private final File directory; + private final Params params; + private final KeySupport keySupport; + + private ValueSerializer valueSerializer = ValueSerializer.none(); + private KeyStats.Factory keyStatsFactory = KeyStats.Factory.noop(); + private SegmentCompactor segmentCompactor = SegmentCompactor.noop(); + + public Builder(String name, File directory, Params params, KeySupport keySupport) + { + this.name = name; + this.directory = directory; + this.params = params; + this.keySupport = keySupport; + } + + public Journal build() + { + return new Journal<>(name, directory, params, keySupport, valueSerializer, keyStatsFactory, segmentCompactor); + } + + public Builder valueSerializer(ValueSerializer valueSerializer) + { + this.valueSerializer = valueSerializer; + return this; + } + + public Builder keyStatsFactory(KeyStats.Factory keyStatsFactory) + { + this.keyStatsFactory = keyStatsFactory; + return this; + } + + public Builder segmentCompactor(SegmentCompactor segmentCompactor) + { + this.segmentCompactor = segmentCompactor; + return this; + } + } + + public static Builder builder(String name, File directory, Params params, KeySupport keySupport) + { + return new Builder<>(name, directory, params, keySupport); + } + public Journal(String name, File directory, Params params, KeySupport keySupport, ValueSerializer valueSerializer, + KeyStats.Factory keyStatsFactory, SegmentCompactor segmentCompactor) { this.name = name; @@ -219,6 +272,7 @@ public Journal(String name, this.keySupport = keySupport; this.valueSerializer = valueSerializer; + this.keyStatsFactory = keyStatsFactory; this.metrics = new Metrics<>(name); this.flusherCallbacks = new FlusherCallbacks(); @@ -250,7 +304,7 @@ public void start() : descriptors.get(descriptors.size() - 1).timestamp; nextSegmentId.set(replayLimit = Math.max(currentTimeMillis(), maxTimestamp + 1)); - segments.set(Segments.of(StaticSegment.open(descriptors, keySupport))); + segments.set(Segments.of(StaticSegment.open(descriptors, keySupport, keyStatsFactory))); closer = executorFactory().sequential(name + "-closer"); releaser = executorFactory().sequential(name + "-releaser"); allocator = executorFactory().infiniteLoop(name + "-allocator", new AllocateRunnable(), SAFE, NON_DAEMON, SYNCHRONIZED); @@ -261,12 +315,6 @@ public void start() compactor.start(); } - @VisibleForTesting - public void runCompactorForTesting() - { - compactor.run(); - } - public Compactor compactor() { return compactor; @@ -287,6 +335,7 @@ public boolean isTerminated() return state.get() == State.TERMINATED; } + @Override public void shutdown() { try @@ -606,13 +655,6 @@ public RecordPointer asyncWrite(K id, Writer writer) } } - // TODO (require): Find a better way to test unwritten allocations and/or corruption - @VisibleForTesting - public void unsafeConsumeBytesForTesting(int entrySize, Consumer corrupt) - { - allocate(entrySize).consumeBufferUnsafe(corrupt); - } - private ActiveSegment.Allocation allocate(int entrySize) { ActiveSegment segment = 
currentSegment; @@ -793,7 +835,7 @@ private void shutDown() throws InterruptedException private ActiveSegment createSegment() { Descriptor descriptor = Descriptor.create(directory, nextSegmentId.getAndIncrement(), params.userVersion()); - return ActiveSegment.create(descriptor, params, keySupport); + return ActiveSegment.create(descriptor, params, keySupport, keyStatsFactory); } private void closeAllSegments() @@ -810,7 +852,7 @@ private void closeAllSegments() } @SuppressWarnings("unused") - public ReferencedSegments selectAndReference(Predicate> selector) + ReferencedSegments selectAndReference(Predicate> selector) { while (true) { @@ -847,6 +889,11 @@ private void removeEmptySegment(ActiveSegment activeSegment) swapSegments(current -> current.withoutEmptySegment(activeSegment)); } + private void removeStaticSegments(Collection> staticSegments) + { + swapSegments(current -> current.withoutStaticSegments(staticSegments)); + } + private void replaceCompletedSegment(ActiveSegment activeSegment, StaticSegment staticSegment) { swapSegments(current -> current.withCompletedSegment(activeSegment, staticSegment)); @@ -948,7 +995,7 @@ public void run() activeSegment.updateWrittenTo(); activeSegment.fsync(); activeSegment.persistComponents(); - replaceCompletedSegment(activeSegment, StaticSegment.open(activeSegment.descriptor, keySupport)); + replaceCompletedSegment(activeSegment, StaticSegment.open(activeSegment.descriptor, keySupport, keyStatsFactory)); activeSegment.release(Journal.this); if (onDone != null) onDone.run(); } @@ -971,27 +1018,16 @@ protected void closeActiveSegmentAndOpenAsStatic(ActiveSegment activeSegme closer.execute(new CloseActiveSegmentRunnable(activeSegment, onDone)); } - @VisibleForTesting - public void closeCurrentSegmentForTestingIfNonEmpty() - { - ActiveSegment segment = currentSegment; - if (segment.isEmpty()) - return; - advanceSegment(segment); - while (!segments().isSwitched(segment)) - { - LockSupport.parkNanos(1000); - } - } - - /* - * Static helper methods used by journal components - */ - - static void validateCRC(CRC32 crc, int readCRC) throws Crc.InvalidCrc + public int dropStaticSegments(Predicate> dropIf) { - if (readCRC != (int) crc.getValue()) - throw new Crc.InvalidCrc(readCRC, (int) crc.getValue()); + Set> toDrop = new HashSet<>(); + segments().selectStatic(dropIf, toDrop); + if (toDrop.isEmpty()) + return 0; + removeStaticSegments(toDrop); + for (StaticSegment segment : toDrop) + segment.discard(this); + return toDrop.size(); } /* @@ -1045,24 +1081,6 @@ private String maybeAddDiskSpaceContext(String message) message, segmentSize, availableDiskSpace, directory); } - @VisibleForTesting - public void truncateForTesting() - { - ActiveSegment discarding = currentSegment; - if (!discarding.isEmpty()) // if there is no data in the segement then ignore it - { - closeCurrentSegmentForTestingIfNonEmpty(); - //TODO (desired): wait for the ActiveSegment to get released, else can see weird race conditions; - // this thread will see the static segmenet and will release it (which will delete the file), - // and the sync thread will then try to release and will fail as the file no longer exists... 
- while (discarding.selfRef().globalCount() > 0) {} - } - - Segments statics = swapSegments(s -> s.select(Segment::isActive)).select(Segment::isStatic); - for (Segment segment : statics.all()) - ((StaticSegment) segment).discard(this); - } - public interface Writer { void write(DataOutputPlus out, int userVersion) throws IOException; @@ -1279,4 +1297,66 @@ enum State SHUTDOWN, TERMINATED } + + /* + * Test helpers + */ + + @VisibleForTesting + public void unsafeConsumeBytesForTesting(int entrySize, Consumer corrupt) + { + // TODO (require): Find a better way to test unwritten allocations and/or corruption + allocate(entrySize).consumeBufferUnsafe(corrupt); + } + + @VisibleForTesting + public void truncateForTesting() + { + ActiveSegment discarding = currentSegment; + if (!discarding.isEmpty()) // if there is no data in the segement then ignore it + { + closeCurrentSegmentForTestingIfNonEmpty(); + //TODO (desired): wait for the ActiveSegment to get released, else can see weird race conditions; + // this thread will see the static segmenet and will release it (which will delete the file), + // and the sync thread will then try to release and will fail as the file no longer exists... + while (discarding.selfRef().globalCount() > 0) {} + } + + Segments statics = swapSegments(s -> s.select(Segment::isActive)).select(Segment::isStatic); + for (Segment segment : statics.all()) + ((StaticSegment) segment).discard(this); + } + + @VisibleForTesting + public void runCompactorForTesting() + { + compactor.run(); + } + + @VisibleForTesting + public void closeCurrentSegmentForTestingIfNonEmpty() + { + ActiveSegment segment = currentSegment; + if (segment.isEmpty()) + return; + advanceSegment(segment); + while (!segments().isSwitched(segment)) + { + LockSupport.parkNanos(1000); + } + } + + @VisibleForTesting + public void clearNeedsReplayForTesting() + { + Set> toReset = new HashSet<>(); + segments().selectStatic(toReset); + toReset.forEach(s -> s.metadata().clearNeedsReplay()); + } + + @VisibleForTesting + public int countStaticSegmentsForTesting() + { + return segments.get().count(Segment::isStatic); + } } diff --git a/src/java/org/apache/cassandra/journal/JournalReadError.java b/src/java/org/apache/cassandra/journal/JournalReadError.java index 87366c8d7c6b..70ae829b0346 100644 --- a/src/java/org/apache/cassandra/journal/JournalReadError.java +++ b/src/java/org/apache/cassandra/journal/JournalReadError.java @@ -24,13 +24,13 @@ public class JournalReadError extends FSReadError { public final Descriptor descriptor; - JournalReadError(Descriptor descriptor, File file, Throwable throwable) + public JournalReadError(Descriptor descriptor, File file, Throwable throwable) { super(throwable, file); this.descriptor = descriptor; } - JournalReadError(Descriptor descriptor, Component component, Throwable throwable) + public JournalReadError(Descriptor descriptor, Component component, Throwable throwable) { super(throwable, descriptor.fileFor(component)); this.descriptor = descriptor; diff --git a/src/java/org/apache/cassandra/journal/JournalWriteError.java b/src/java/org/apache/cassandra/journal/JournalWriteError.java index 03193af5455a..ef3f278345d8 100644 --- a/src/java/org/apache/cassandra/journal/JournalWriteError.java +++ b/src/java/org/apache/cassandra/journal/JournalWriteError.java @@ -24,13 +24,13 @@ public class JournalWriteError extends FSWriteError { public final Descriptor descriptor; - JournalWriteError(Descriptor descriptor, File file, Throwable throwable) + public JournalWriteError(Descriptor 
descriptor, File file, Throwable throwable) { super(throwable, file); this.descriptor = descriptor; } - JournalWriteError(Descriptor descriptor, Component component, Throwable throwable) + public JournalWriteError(Descriptor descriptor, Component component, Throwable throwable) { super(throwable, descriptor.fileFor(component)); this.descriptor = descriptor; diff --git a/src/java/org/apache/cassandra/journal/KeyStats.java b/src/java/org/apache/cassandra/journal/KeyStats.java new file mode 100644 index 000000000000..e291f7c8adf1 --- /dev/null +++ b/src/java/org/apache/cassandra/journal/KeyStats.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.journal; + +public interface KeyStats +{ + KeyStats NOOP = (KeyStats) o -> true; + + static KeyStats noop() + { + //noinspection unchecked + return (KeyStats) NOOP; + } + + boolean mayContain(K k); + + interface Active extends KeyStats + { + Active NOOP = new Active<>() + { + @Override + public void update(Object key) + { + // no-op + } + + @Override + public boolean mayContain(Object key) + { + return true; + } + + @Override + public void persist(Descriptor descriptor) + { + // no-op + } + }; + + void update(K key); + void persist(Descriptor descriptor); + } + + interface Static extends KeyStats + { + Static NOOP = key -> true; + } + + interface Factory + { + Factory NOOP = new Factory<>() + { + @Override + public Active create() + { + return Active.NOOP; + } + + @Override + public Static load(Descriptor descriptor) + { + return Static.NOOP; + } + }; + + static Factory noop() + { + //noinspection unchecked + return (Factory) NOOP; + } + + Active create(); + Static load(Descriptor descriptor); + + default Active rebuild(Descriptor descriptor, KeySupport keySupport, int fsyncedLimit) + { + Active active = create(); + try (StaticSegment.SequentialReader reader = StaticSegment.sequentialReader(descriptor, keySupport, fsyncedLimit)) + { + while (reader.advance()) + active.update(reader.key()); + } + return active; + } + + default Static rebuildAndPersist(Descriptor descriptor, KeySupport keySupport, int fsyncedLimit) + { + Active active = rebuild(descriptor, keySupport, fsyncedLimit); + active.persist(descriptor); + return load(descriptor); + } + } +} diff --git a/src/java/org/apache/cassandra/journal/KeySupport.java b/src/java/org/apache/cassandra/journal/KeySupport.java index efc41aa6c88b..5e1a7845f256 100644 --- a/src/java/org/apache/cassandra/journal/KeySupport.java +++ b/src/java/org/apache/cassandra/journal/KeySupport.java @@ -20,7 +20,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Comparator; -import java.util.zip.Checksum; +import java.util.zip.CRC32; import org.apache.cassandra.io.util.DataInputPlus; import 
org.apache.cassandra.io.util.DataOutputPlus; @@ -43,7 +43,7 @@ public interface KeySupport extends Comparator K deserialize(ByteBuffer buffer, int position, int userVersion); K deserialize(ByteBuffer buffer, int userVersion); - void updateChecksum(Checksum crc, K key, int userVersion); + void updateChecksum(CRC32 crc, K key, int userVersion); int compareWithKeyAt(K key, ByteBuffer buffer, int position, int userVersion); } diff --git a/src/java/org/apache/cassandra/journal/Metadata.java b/src/java/org/apache/cassandra/journal/Metadata.java index 6f9552a22901..a411713c16a7 100644 --- a/src/java/org/apache/cassandra/journal/Metadata.java +++ b/src/java/org/apache/cassandra/journal/Metadata.java @@ -25,7 +25,6 @@ import org.apache.cassandra.io.util.*; import org.apache.cassandra.utils.Crc; -import static org.apache.cassandra.journal.Journal.validateCRC; import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt; /** @@ -110,7 +109,7 @@ static Metadata read(DataInputPlus in) throws IOException updateChecksumInt(crc, recordsCount); updateChecksumInt(crc, fsyncLimit); updateChecksumInt(crc, needsReplay ? 1 : 0); - validateCRC(crc, in.readInt()); + Crc.validate(crc, in.readInt()); return new Metadata(recordsCount, fsyncLimit, needsReplay); } diff --git a/src/java/org/apache/cassandra/journal/OnDiskIndex.java b/src/java/org/apache/cassandra/journal/OnDiskIndex.java index 0a2ca8cc95d7..abe94ff1f294 100644 --- a/src/java/org/apache/cassandra/journal/OnDiskIndex.java +++ b/src/java/org/apache/cassandra/journal/OnDiskIndex.java @@ -35,7 +35,6 @@ import org.apache.cassandra.utils.Crc; import org.apache.cassandra.utils.memory.MemoryUtil; -import static org.apache.cassandra.journal.Journal.validateCRC; import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt; import static org.apache.cassandra.utils.FBUtilities.updateChecksumLong; @@ -136,11 +135,11 @@ void validate() throws IOException { int entryCount = in.readInt(); updateChecksumInt(crc, entryCount); - validateCRC(crc, in.readInt()); + Crc.validate(crc, in.readInt()); Crc.updateCrc32(crc, buffer, FILE_PREFIX_SIZE, FILE_PREFIX_SIZE + entryCount * ENTRY_SIZE); in.skipBytesFully(entryCount * ENTRY_SIZE); - validateCRC(crc, in.readInt()); + Crc.validate(crc, in.readInt()); if (in.available() != 0) throw new IOException("Trailing data encountered in segment index " + descriptor.fileFor(Component.INDEX)); diff --git a/src/java/org/apache/cassandra/journal/Segment.java b/src/java/org/apache/cassandra/journal/Segment.java index 6a9e80432f60..9bc394c26ff4 100644 --- a/src/java/org/apache/cassandra/journal/Segment.java +++ b/src/java/org/apache/cassandra/journal/Segment.java @@ -63,6 +63,7 @@ public final void tidy() } abstract Index index(); + public abstract KeyStats keyStats(); abstract boolean isActive(); abstract boolean isFlushed(long position); @@ -90,6 +91,9 @@ public long id() boolean readLast(K id, RecordConsumer consumer) { + if (!keyStats().mayContain(id)) + return false; + long offsetAndSize = index().lookUpLast(id); if (offsetAndSize == -1) return false; @@ -108,6 +112,9 @@ boolean readLast(K id, RecordConsumer consumer) boolean readLast(K id, EntrySerializer.EntryHolder into) { + if (!keyStats().mayContain(id)) + return false; + long offsetAndSize = index().lookUpLast(id); if (offsetAndSize == -1 || !read(Index.readOffset(offsetAndSize), Index.readSize(offsetAndSize), into)) return false; @@ -128,6 +135,9 @@ boolean read(RecordPointer pointer, RecordConsumer consumer) void readAll(K id, EntrySerializer.EntryHolder into, 
RecordConsumer onEntry) { + if (!keyStats().mayContain(id)) + return; + long[] all = index().lookUpAll(id); int prevOffset = Integer.MAX_VALUE; for (int i = 0; i < all.length; i++) diff --git a/src/java/org/apache/cassandra/journal/Segments.java b/src/java/org/apache/cassandra/journal/Segments.java index 4e01bd47b42b..dae1bbeaacf3 100644 --- a/src/java/org/apache/cassandra/journal/Segments.java +++ b/src/java/org/apache/cassandra/journal/Segments.java @@ -67,11 +67,27 @@ Segments withNewActiveSegment(ActiveSegment activeSegment) Segments withoutEmptySegment(ActiveSegment activeSegment) { Long2ObjectHashMap> newSegments = new Long2ObjectHashMap<>(segments); - Segment oldValue = segments.remove(activeSegment.descriptor.timestamp); + Segment oldValue = newSegments.remove(activeSegment.descriptor.timestamp); + Invariants.nonNull(oldValue); Invariants.require(oldValue.asActive().isEmpty()); return new Segments<>(newSegments); } + Segments withoutStaticSegments(Collection> removeSegments) + { + if (removeSegments.isEmpty()) + return this; + + Long2ObjectHashMap> newSegments = new Long2ObjectHashMap<>(segments); + for (StaticSegment segment : removeSegments) + { + Segment oldValue = newSegments.remove(segment.descriptor.timestamp); + if (oldValue != null) + Invariants.require(oldValue.isStatic()); + } + return new Segments<>(newSegments); + } + Segments withCompletedSegment(ActiveSegment activeSegment, StaticSegment staticSegment) { Invariants.requireArgument(activeSegment.descriptor.equals(staticSegment.descriptor)); @@ -192,6 +208,13 @@ void selectStatic(Collection> into) into.add(segment.asStatic()); } + void selectStatic(Predicate> filter, Collection> into) + { + for (Segment segment : segments.values()) + if (segment.isStatic() && filter.test(segment.asStatic())) + into.add(segment.asStatic()); + } + /** * Select segments that could potentially have an entry with the specified ids and * attempt to grab references to them all. 
@@ -236,6 +259,15 @@ Segments select(Predicate> test) return new Segments<>(selectedSegments); } + int count(Predicate> filter) + { + int count = 0; + for (Segment segment : segments.values()) + if (filter.test(segment)) + count++; + return count; + } + static class ReferencedSegments extends Segments implements AutoCloseable { private final Refs> refs; diff --git a/src/java/org/apache/cassandra/journal/StaticSegment.java b/src/java/org/apache/cassandra/journal/StaticSegment.java index 0dfbaaf83831..84e3fda5b9c2 100644 --- a/src/java/org/apache/cassandra/journal/StaticSegment.java +++ b/src/java/org/apache/cassandra/journal/StaticSegment.java @@ -52,16 +52,19 @@ public final class StaticSegment extends Segment private final Ref> selfRef; private final OnDiskIndex index; + private final KeyStats.Static keyStats; private StaticSegment(Descriptor descriptor, FileChannel channel, MappedByteBuffer buffer, OnDiskIndex index, Metadata metadata, + KeyStats.Static keyStats, KeySupport keySupport) { super(descriptor, metadata, keySupport); this.index = index; + this.keyStats = keyStats; this.channel = channel; this.fsyncLimit = metadata.fsyncLimit(); @@ -76,12 +79,12 @@ private StaticSegment(Descriptor descriptor, * @param descriptors descriptors of the segments to load * @return list of the loaded segments */ - static List> open(Collection descriptors, KeySupport keySupport) + static List> open(Collection descriptors, KeySupport keySupport, KeyStats.Factory keyStatsFactory) { List> segments = new ArrayList<>(descriptors.size()); for (Descriptor descriptor : descriptors) { - StaticSegment segment = open(descriptor, keySupport); + StaticSegment segment = open(descriptor, keySupport, keyStatsFactory); segments.add(segment); } @@ -95,7 +98,7 @@ static List> open(Collection descriptors, KeySu * @return the loaded segment */ @SuppressWarnings({ "resource", "RedundantSuppression" }) - static StaticSegment open(Descriptor descriptor, KeySupport keySupport) + static StaticSegment open(Descriptor descriptor, KeySupport keySupport, KeyStats.Factory keyStatsFactory) { if (!Component.DATA.existsFor(descriptor)) throw new IllegalArgumentException("Data file for segment " + descriptor + " doesn't exist"); @@ -135,9 +138,13 @@ static StaticSegment open(Descriptor descriptor, KeySupport keyS if (index == null) index = OnDiskIndex.rebuildAndPersist(descriptor, keySupport, metadata.fsyncLimit()); + KeyStats.Static keyStats = keyStatsFactory.load(descriptor); + if (keyStats == null) + keyStats = keyStatsFactory.rebuildAndPersist(descriptor, keySupport, metadata.fsyncLimit()); + try { - return internalOpen(descriptor, index, metadata, keySupport); + return internalOpen(descriptor, index, metadata, keyStats, keySupport); } catch (IOException e) { @@ -146,13 +153,13 @@ static StaticSegment open(Descriptor descriptor, KeySupport keyS } private static StaticSegment internalOpen( - Descriptor descriptor, OnDiskIndex index, Metadata metadata, KeySupport keySupport) + Descriptor descriptor, OnDiskIndex index, Metadata metadata, KeyStats.Static keyStats, KeySupport keySupport) throws IOException { File file = descriptor.fileFor(Component.DATA); FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.READ); MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size()); - return new StaticSegment<>(descriptor, channel, buffer, index, metadata, keySupport); + return new StaticSegment<>(descriptor, channel, buffer, index, metadata, keyStats, keySupport); } public void close(Journal 
journal) @@ -242,6 +249,12 @@ OnDiskIndex index() return index; } + @Override + public KeyStats.Static keyStats() + { + return keyStats; + } + public int entryCount() { return index.entryCount(); diff --git a/src/java/org/apache/cassandra/journal/ValueSerializer.java b/src/java/org/apache/cassandra/journal/ValueSerializer.java index 69690d39b28a..b031ffff14b3 100644 --- a/src/java/org/apache/cassandra/journal/ValueSerializer.java +++ b/src/java/org/apache/cassandra/journal/ValueSerializer.java @@ -24,6 +24,27 @@ public interface ValueSerializer { + ValueSerializer NONE = new ValueSerializer<>() + { + @Override + public void serialize(Object key, Object value, DataOutputPlus out, int userVersion) + { + throw new UnsupportedOperationException(); + } + + @Override + public Object deserialize(Object key, DataInputPlus in, int userVersion) + { + throw new UnsupportedOperationException(); + } + }; + + static ValueSerializer none() + { + //noinspection unchecked + return (ValueSerializer) NONE; + } + void serialize(K key, V value, DataOutputPlus out, int userVersion) throws IOException; /** diff --git a/src/java/org/apache/cassandra/replication/CoordinatorLog.java b/src/java/org/apache/cassandra/replication/CoordinatorLog.java index cebfcb3ee23e..6f4a4aed4cd9 100644 --- a/src/java/org/apache/cassandra/replication/CoordinatorLog.java +++ b/src/java/org/apache/cassandra/replication/CoordinatorLog.java @@ -401,6 +401,11 @@ void collectRemotelyMissingMutations(Offsets localOffsets, IntArrayList remoteNo } } + void collectDurablyReconciledOffsets(Log2OffsetsMap.Mutable into) + { + into.add(reconciledPersistedOffsets); + } + boolean isDurablyReconciled(CoordinatorLogOffsets logOffsets) { lock.readLock().lock(); diff --git a/src/java/org/apache/cassandra/replication/CoordinatorLogId.java b/src/java/org/apache/cassandra/replication/CoordinatorLogId.java index 101d95db9e4e..2f87f0ecf477 100644 --- a/src/java/org/apache/cassandra/replication/CoordinatorLogId.java +++ b/src/java/org/apache/cassandra/replication/CoordinatorLogId.java @@ -71,7 +71,7 @@ public long asLong() @VisibleForTesting public static long asLong(int hostId, int hostLogId) { - return ((long) hostId << 32) | hostLogId; + return ((long) hostId << 32) | (hostLogId & 0xFFFFFFFFL); } static int hostId(long coordinatorLogId) diff --git a/src/java/org/apache/cassandra/replication/Log2OffsetsMap.java b/src/java/org/apache/cassandra/replication/Log2OffsetsMap.java index 8e3b2e1bc1e9..cd59e620507e 100644 --- a/src/java/org/apache/cassandra/replication/Log2OffsetsMap.java +++ b/src/java/org/apache/cassandra/replication/Log2OffsetsMap.java @@ -56,10 +56,15 @@ public int idCount() return count; } + public boolean contains(ShortMutationId id) + { + Offsets offsets = asMap().get(id.logId()); + return offsets != null && offsets.contains(id.offset()); + } + @Override public String toString() { - StringBuilder builder = new StringBuilder("Log2OffsetsMap{"); boolean isFirst = true; for (Map.Entry entry : asMap().entrySet()) @@ -110,10 +115,11 @@ protected T create(long logId) return create(new CoordinatorLogId(logId)); } - public void add(ShortMutationId id) + public AbstractMutable add(ShortMutationId id) { T offsets = offsetMap.computeIfAbsent(id.logId(), this::create); offsets.add(id.offset()); + return this; } public void add(Offsets offsets) diff --git a/src/java/org/apache/cassandra/replication/MutationJournal.java b/src/java/org/apache/cassandra/replication/MutationJournal.java index e56cc6c9de29..f01f00723d44 100644 --- 
a/src/java/org/apache/cassandra/replication/MutationJournal.java +++ b/src/java/org/apache/cassandra/replication/MutationJournal.java @@ -24,19 +24,16 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; -import java.util.zip.Checksum; +import java.util.zip.CRC32; + import javax.annotation.Nullable; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import org.apache.cassandra.journal.*; - -import org.cliffc.high_scale_lib.NonBlockingHashMapLong; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import accord.utils.Invariants; +import org.agrona.collections.Long2LongHashMap; +import org.agrona.collections.Long2ObjectHashMap; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.Keyspace; @@ -47,11 +44,15 @@ import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.io.util.File; +import org.apache.cassandra.io.util.FileInputStreamPlus; +import org.apache.cassandra.io.util.FileOutputStreamPlus; +import org.apache.cassandra.journal.*; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableId; -import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Crc; import org.apache.cassandra.utils.concurrent.Semaphore; +import org.jctools.maps.NonBlockingHashMapLong; import static org.apache.cassandra.utils.FBUtilities.getAvailableProcessors; @@ -59,7 +60,6 @@ public class MutationJournal { public static final MutationJournal instance = new MutationJournal(); - private static final Logger log = LoggerFactory.getLogger(MutationJournal.class); private final Journal journal; private final Map segmentStateTrackers; @@ -98,17 +98,25 @@ private MutationJournal() @VisibleForTesting MutationJournal(File directory, Params params) { - journal = new Journal<>("MutationJournal", directory, params, MutationIdSupport.INSTANCE, MutationSerializer.INSTANCE, SegmentCompactor.noop()) { - @Override - protected void closeActiveSegmentAndOpenAsStatic(ActiveSegment activeSegment, Runnable onDone) - { - super.closeActiveSegmentAndOpenAsStatic(activeSegment, - () -> { - maybeCleanupStaticSegment(Invariants.nonNull(getSegment(activeSegment.id()))); - if (onDone != null) onDone.run(); - }); - } - }; + journal = + new Journal<>("MutationJournal", + directory, + params, + MutationIdSupport.INSTANCE, + MutationSerializer.INSTANCE, + OffsetRangesFactory.INSTANCE, + SegmentCompactor.noop()) + { + // TODO (expected): a cleaner way to override it; pass a Callbacks object with sanctioned callbacks? 
+ @Override + protected void closeActiveSegmentAndOpenAsStatic(ActiveSegment activeSegment, Runnable onDone) + { + super.closeActiveSegmentAndOpenAsStatic(activeSegment, () -> { + maybeCleanupStaticSegment(Invariants.nonNull(getSegment(activeSegment.id()))); + if (onDone != null) onDone.run(); + }); + } + }; segmentStateTrackers = new NonBlockingHashMapLong<>(); } @@ -291,16 +299,12 @@ public void replay(DeserializedRecordConsumer replayO // TODO: respect SystemKeyspace.getTruncatedPosition(cfs.metadata.id); replayParallelism.acquireThrowUncheckedOnInterrupt(1); Stage.MUTATION.submit(() -> journal.readLast(key, lastSegment, replayOne)) - .addCallback(new BiConsumer() - { - @Override - public void accept(Object o, Throwable fail) - { - if (fail != null && !journal.handleError("Could not replay mutation " + key, fail)) - abort.set(true); - replayParallelism.release(1); - } - }); + .addCallback((BiConsumer) (o, fail) -> + { + if (fail != null && !journal.handleError("Could not replay mutation " + key, fail)) + abort.set(true); + replayParallelism.release(1); + }); } // Wait for all mutations to be applied before returning @@ -309,9 +313,12 @@ public void accept(Object o, Throwable fail) } @VisibleForTesting - public void closeCurrentSegmentForTestingIfNonEmpty() + public int dropReconciledSegments(Log2OffsetsMap reconciledOffsets) { - journal.closeCurrentSegmentForTestingIfNonEmpty(); + return journal.dropStaticSegments((segment) -> { + StaticOffsetRanges ranges = (StaticOffsetRanges) segment.keyStats(); + return ranges.isFullyCovered(reconciledOffsets) && !segment.metadata().needsReplay(); + }); } public void readAll(RecordConsumer consumer) @@ -429,10 +436,10 @@ public ShortMutationId deserialize(ByteBuffer buffer, int userVersion) } @Override - public void updateChecksum(Checksum crc, ShortMutationId id, int userVersion) + public void updateChecksum(CRC32 crc, ShortMutationId id, int userVersion) { - FBUtilities.updateChecksumLong(crc, id.logId()); - FBUtilities.updateChecksumInt(crc, id.offset()); + Crc.updateWithLong(crc, id.logId()); + Crc.updateWithInt(crc, id.offset()); } @Override @@ -453,6 +460,7 @@ public int compare(ShortMutationId id1, ShortMutationId id2) public static class MutationSerializer implements ValueSerializer { public static MutationSerializer INSTANCE = new MutationSerializer(); + @Override public void serialize(ShortMutationId id, Mutation mutation, DataOutputPlus out, int userVersion) throws IOException { @@ -467,4 +475,258 @@ public Mutation deserialize(ShortMutationId id, DataInputPlus in, int userVersio return Mutation.serializer.deserialize(in, userVersion); } } + + /* + * KeyStats component to track per log min and max offset in a segment + */ + + static abstract class OffsetRanges implements KeyStats + { + @Override + public abstract boolean mayContain(ShortMutationId id); + + protected static boolean mayContain(long range, ShortMutationId id) + { + return id.offset() >= minOffset(range) && id.offset() <= maxOffset(range); + } + + protected static int minOffset(long range) + { + return (int) (range >>> 32); + } + + protected static int maxOffset(long range) + { + return (int) range; + } + + protected static long range(int minOffset, int maxOffset) + { + return ((long) minOffset << 32) | (maxOffset & 0xFFFFFFFFL); + } + + abstract Map asMap(); + + @Override + public String toString() + { + StringBuilder builder = new StringBuilder(getClass().getSimpleName()); + builder.append('{'); + for (Map.Entry entry : asMap().entrySet()) + { + CoordinatorLogId logId = new 
CoordinatorLogId(entry.getKey()); + long range = entry.getValue(); + int min = minOffset(range); + int max = maxOffset(range); + builder.append(logId) + .append("->") + .append('[') + .append(min) + .append(", ") + .append(max) + .append(']') + .append(','); + } + return builder.append('}').toString(); + } + } + + // TODO (consider): an off-heap version + static class ActiveOffsetRanges extends OffsetRanges implements KeyStats.Active + { + private final NonBlockingHashMapLong ranges; + + ActiveOffsetRanges() + { + ranges = new NonBlockingHashMapLong<>(); + } + + @Override + protected Map asMap() + { + return ranges; + } + + @Override + public boolean mayContain(ShortMutationId id) + { + Long range = ranges.get(id.logId()); + return range != null && mayContain(range, id); + } + + @Override + @SuppressWarnings("WrapperTypeMayBePrimitive") + public void update(ShortMutationId id) + { + Long prev, next; + do + { + prev = ranges.get(id.logId()); + int min = prev == null ? id.offset() : Math.min(minOffset(prev), id.offset()); + int max = prev == null ? id.offset() : Math.max(maxOffset(prev), id.offset()); + next = range(min, max); + } + while (!compareAndSet(id.logId(), prev, next)); + } + + // NonBlockingHashMapLong doesn't expose putIfMatch() directly, so we need to have this logic + private boolean compareAndSet(long logId, Long prevValue, Long nextValue) + { + return prevValue == null + ? ranges.putIfAbsent(logId, nextValue) == null + : ranges.replace(logId, prevValue, nextValue); + } + + @Override + public void persist(Descriptor descriptor) + { + File tmpFile = descriptor.tmpFileFor(Component.KEYSTATS); + try (FileOutputStreamPlus out = new FileOutputStreamPlus(tmpFile)) + { + write(out); + + out.flush(); + out.sync(); + } + catch (IOException e) + { + throw new JournalWriteError(descriptor, tmpFile, e); + } + tmpFile.move(descriptor.fileFor(Component.KEYSTATS)); + } + + private void write(DataOutputPlus out) throws IOException + { + CRC32 crc = Crc.crc32(); + int count = ranges.size(); + out.writeInt(count); + Crc.updateWithInt(crc, count); + out.writeInt((int) crc.getValue()); + for (Map.Entry entry : ranges.entrySet()) + { + long logId = entry.getKey(); + long range = entry.getValue(); + out.writeLong(logId); + out.writeLong(range); + Crc.updateWithLong(crc, logId); + Crc.updateWithLong(crc, range); + } + out.writeInt((int) crc.getValue()); + } + } + + static class StaticOffsetRanges extends OffsetRanges implements KeyStats.Static + { + private static final long NO_VALUE = Long.MIN_VALUE; + + private final Long2LongHashMap ranges; + + StaticOffsetRanges(Long2LongHashMap ranges) + { + this.ranges = ranges; + } + + @Override + protected Map asMap() + { + return ranges; + } + + @Override + public boolean mayContain(ShortMutationId id) + { + long range = ranges.get(id.logId()); + return range != NO_VALUE && mayContain(range, id); + } + + static StaticOffsetRanges read(DataInputPlus in) throws IOException + { + CRC32 crc = Crc.crc32(); + int count = in.readInt(); + Crc.updateWithInt(crc, count); + Crc.validate(crc, in.readInt()); + Long2LongHashMap ranges = new Long2LongHashMap(count, 0.65f, NO_VALUE, false); + for (int i = 0; i < count; i++) + { + long logId = in.readLong(); + long range = in.readLong(); + Crc.updateWithLong(crc, logId); + Crc.updateWithLong(crc, range); + ranges.put(logId, range); + } + Crc.validate(crc, in.readInt()); + return new StaticOffsetRanges(ranges); + } + + /** + * @return whether all keys in the segment are fully covered by the specified (durably reconciled) 
offsets map + */ + boolean isFullyCovered(Log2OffsetsMap durablyReconciled) + { + Long2ObjectHashMap reconciledMap = ((Log2OffsetsMap) durablyReconciled).asMap(); + for (Long2LongHashMap.EntryIterator iter = ranges.entrySet().iterator(); iter.hasNext();) + { + iter.next(); + + long logId = iter.getLongKey(); + long range = iter.getLongValue(); + int min = minOffset(range); + int max = maxOffset(range); + + Offsets offsets = reconciledMap.get(logId); + if (offsets == null) + return false; + if (!offsets.containsRange(min, max)) + return false; + } + return true; + } + } + + static final class OffsetRangesFactory implements KeyStats.Factory + { + static final OffsetRangesFactory INSTANCE = new OffsetRangesFactory(); + + @Override + public ActiveOffsetRanges create() + { + return new ActiveOffsetRanges(); + } + + @Override + public StaticOffsetRanges load(Descriptor descriptor) + { + File file = descriptor.fileFor(Component.KEYSTATS); + try (FileInputStreamPlus in = new FileInputStreamPlus(file)) + { + return StaticOffsetRanges.read(in); + } + catch (IOException e) + { + throw new JournalReadError(descriptor, file, e); + } + } + } + + /* + * Test helpers + */ + + @VisibleForTesting + public void closeCurrentSegmentForTestingIfNonEmpty() + { + journal.closeCurrentSegmentForTestingIfNonEmpty(); + } + + @VisibleForTesting + void clearNeedsReplayForTesting() + { + journal.clearNeedsReplayForTesting(); + } + + @VisibleForTesting + public int countStaticSegmentsForTesting() + { + return journal.countStaticSegmentsForTesting(); + } } diff --git a/src/java/org/apache/cassandra/replication/MutationTrackingService.java b/src/java/org/apache/cassandra/replication/MutationTrackingService.java index 535a4100f2c5..4009dbd39a88 100644 --- a/src/java/org/apache/cassandra/replication/MutationTrackingService.java +++ b/src/java/org/apache/cassandra/replication/MutationTrackingService.java @@ -87,7 +87,7 @@ public class MutationTrackingService /** * Split ranges into this many shards. - * + *
* TODO (expected): ability to rebalance / change this constant */ private static final int SHARD_MULTIPLIER = 8; @@ -547,7 +547,6 @@ private static boolean shardUpdateNeeded(Map current, @N return false; } - private static ConcurrentHashMap applyUpdatedMetadata(Map keyspaceShardsMap, @Nullable ClusterMetadata prev, ClusterMetadata next, LongSupplier logIdProvider, BiConsumer onNewLog) { Preconditions.checkNotNull(next); @@ -619,6 +618,23 @@ private void onNewLog(Shard shard, CoordinatorLog log) } } + private void truncateMutationJournal() + { + Log2OffsetsMap.Mutable reconciledOffsets = new Log2OffsetsMap.Mutable(); + collectDurablyReconciledOffsets(reconciledOffsets); + MutationJournal.instance.dropReconciledSegments(reconciledOffsets); + } + + /** + * Collect every log's durably reconciled offsets. Every mutation covered + * by these offsets can be compacted away by the journal, assuming that all + * relevant memtables had been flushed to disk. + */ + private void collectDurablyReconciledOffsets(Log2OffsetsMap.Mutable into) + { + forEachKeyspace(keyspace -> keyspace.collectDurablyReconciledOffsets(into)); + } + public static class KeyspaceShards { private enum UpdateDecision @@ -847,6 +863,11 @@ void recordFullyReconciledOffsets(ReconciledKeyspaceOffsets keyspaceOffsets) }); } + void collectDurablyReconciledOffsets(Log2OffsetsMap.Mutable into) + { + forEachShard(shard -> shard.collectDurablyReconciledOffsets(into)); + } + void forEachShard(Consumer consumer) { for (Shard shard : shards.values()) @@ -979,8 +1000,15 @@ void start() @Override public void run() + { + run(true); + } + + private void run(boolean dropSegments) { MutationTrackingService.instance.forEachKeyspace(this::run); + if (dropSegments) + MutationTrackingService.instance.truncateMutationJournal(); } private void run(KeyspaceShards shards) @@ -1000,6 +1028,12 @@ public void persistLogStateForTesting() offsetsPersister.run(); } + @VisibleForTesting + public void persistLogStateForTesting(boolean dropSegments) + { + offsetsPersister.run(dropSegments); + } + @VisibleForTesting public void broadcastOffsetsForTesting() { diff --git a/src/java/org/apache/cassandra/replication/Offsets.java b/src/java/org/apache/cassandra/replication/Offsets.java index dfb4f3f404e3..8724c02d4ddd 100644 --- a/src/java/org/apache/cassandra/replication/Offsets.java +++ b/src/java/org/apache/cassandra/replication/Offsets.java @@ -186,12 +186,29 @@ public boolean contains(int offset) { if (size == 0) return false; - int pos = Arrays.binarySearch(bounds, 0, size, offset); if (pos >= 0) return true; // matches one of the bounds + return (-pos - 1) % 2 != 0; + } + + /** + * @return whether the entire [minOffset, maxOffset] range is contained in these Offsets + */ + public boolean containsRange(int minOffset, int maxOffset) + { + if (size == 0) + return false; + + // find the range the min offset falls under + int pos = Arrays.binarySearch(bounds, 0, size, minOffset); + if (pos < 0 && (-pos - 1) % 2 == 0) // min offset is outside any existing range + return false; + + int end = pos >= 0 + ? (pos % 2 == 0 ? 
pos + 1 : pos) + : -pos - 1; - pos = -pos - 1; - return (pos - 1) % 2 == 0; // offset falls within bounds of an existing range if the bound to the left is an open one + return maxOffset <= bounds[end]; } public void digest(Digest digest) diff --git a/src/java/org/apache/cassandra/replication/Shard.java b/src/java/org/apache/cassandra/replication/Shard.java index 0717c228178e..7274127fcb74 100644 --- a/src/java/org/apache/cassandra/replication/Shard.java +++ b/src/java/org/apache/cassandra/replication/Shard.java @@ -276,6 +276,11 @@ BroadcastLogOffsets collectReplicatedOffsets(boolean durable) return new BroadcastLogOffsets(keyspace, range, offsets, durable); } + void collectDurablyReconciledOffsets(Log2OffsetsMap.Mutable into) + { + logs.values().forEach(log -> log.collectDurablyReconciledOffsets(into)); + } + private CoordinatorLog getOrCreate(Mutation mutation) { return getOrCreate(mutation.id()); diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java b/src/java/org/apache/cassandra/service/accord/AccordJournal.java index aa568104a2fe..8669586642f4 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java +++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java @@ -78,7 +78,6 @@ import org.apache.cassandra.journal.RecordPointer; import org.apache.cassandra.journal.SegmentCompactor; import org.apache.cassandra.journal.StaticSegment; -import org.apache.cassandra.journal.ValueSerializer; import org.apache.cassandra.service.accord.AccordJournalValueSerializers.FlyweightImage; import org.apache.cassandra.service.accord.AccordJournalValueSerializers.IdentityAccumulator; import org.apache.cassandra.service.accord.JournalKey.JournalKeySupport; @@ -134,23 +133,10 @@ public AccordJournal(Params params) public AccordJournal(Params params, File directory, ColumnFamilyStore cfs) { Version userVersion = Version.fromVersion(params.userVersion()); - this.journal = new Journal<>("AccordJournal", directory, params, JournalKey.SUPPORT, - // In Accord, we are using streaming serialization, i.e. 
Reader/Writer interfaces instead of materializing objects - new ValueSerializer<>() - { - @Override - public void serialize(JournalKey key, Object value, DataOutputPlus out, int userVersion) - { - throw new UnsupportedOperationException(); - } - - @Override - public Object deserialize(JournalKey key, DataInputPlus in, int userVersion) - { - throw new UnsupportedOperationException(); - } - }, - compactor(cfs, userVersion)); + this.journal = + Journal.builder("AccordJournal", directory, params, JournalKey.SUPPORT) + .segmentCompactor(compactor(cfs, userVersion)) + .build(); this.journalTable = new AccordJournalTable<>(journal, JournalKey.SUPPORT, cfs, userVersion); this.params = params; } diff --git a/src/java/org/apache/cassandra/service/accord/JournalKey.java b/src/java/org/apache/cassandra/service/accord/JournalKey.java index 9bd42ebaf155..4bb3bf19e288 100644 --- a/src/java/org/apache/cassandra/service/accord/JournalKey.java +++ b/src/java/org/apache/cassandra/service/accord/JournalKey.java @@ -21,7 +21,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Objects; -import java.util.zip.Checksum; +import java.util.zip.CRC32; import accord.local.Node.Id; import accord.primitives.Timestamp; @@ -177,7 +177,7 @@ private TxnId deserializeTxnId(ByteBuffer buffer, int position) } @Override - public void updateChecksum(Checksum crc, JournalKey key, int userVersion) + public void updateChecksum(CRC32 crc, JournalKey key, int userVersion) { byte[] out = AccordJournal.keyCRCBytes.get(); serialize(key, out); diff --git a/src/java/org/apache/cassandra/service/reads/tracked/ReadReconciliations.java b/src/java/org/apache/cassandra/service/reads/tracked/ReadReconciliations.java index 1c57781c00cd..13e1e69d1074 100644 --- a/src/java/org/apache/cassandra/service/reads/tracked/ReadReconciliations.java +++ b/src/java/org/apache/cassandra/service/reads/tracked/ReadReconciliations.java @@ -263,19 +263,19 @@ boolean acceptMutation(ShortMutationId ignoredMutationId) /** * Remote summaries minus data node summary offsets - * + *
+ * <p>
  * This calculation combines BOTH reconciled and unreconciled mutations reported by other nodes, and
  * then subtracts mutations reported locally for correctness
- *
+ * <p>
  * If we subtracted reconciled ids from the unreconciled ids, we could violate read monotonicity in this scenario:
  * 1. Read starts locally and doesn't see mutation M.
  * 2. During reconciliation, mutation M arrives and is marked reconciled, other replicas report mutation M as reconciled
  * 3. If we filtered out reconciled mutations, this read wouldn't augment with M
  * 4. A concurrent read could see M in its initial data
  * 5. This read returns without M
- *
+ * <p>
* Instead, we include all mutations and rely on token range filtering during actual mutation - * retrieval (in PartialTrackedRead.augment) to ensure we only augment with mutations + * retrieval (in PartialTrackedRead.augment()) to ensure we only augment with mutations * relevant to this read's range/key */ private Log2OffsetsMap.Mutable augmentingOffsets() diff --git a/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReads.java b/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReads.java index 066ebc22e068..fa6c3193b377 100644 --- a/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReads.java +++ b/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReads.java @@ -25,7 +25,6 @@ import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.db.*; -import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.locator.ReplicaPlan; import org.apache.cassandra.locator.ReplicaPlans; import org.apache.cassandra.service.reads.ReadCoordinator; @@ -36,16 +35,14 @@ import org.apache.cassandra.replication.ShortMutationId; import org.apache.cassandra.service.reads.SpeculativeRetryPolicy; import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.transport.Dispatcher; import org.apache.cassandra.utils.concurrent.AsyncPromise; import org.apache.cassandra.utils.concurrent.Future; import org.jctools.maps.NonBlockingHashMap; -import org.apache.cassandra.transport.Dispatcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.function.Consumer; - /** * Since the read reconciliations don't use 2 way callbacks, maps of active reads and reconciliations * are maintained and expired here. @@ -69,8 +66,7 @@ public AsyncPromise beginRead( ReadCommand command, ConsistencyLevel consistencyLevel, int[] summaryNodes, - Dispatcher.RequestTime requestTime, - Consumer partialReadConsumer) + Dispatcher.RequestTime requestTime) { Keyspace keyspace = Keyspace.open(command.metadata().keyspace); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata().id); @@ -100,7 +96,7 @@ public AsyncPromise beginRead( } // TODO: confirm all summaryNodes are present in the replica plan AsyncPromise promise = new AsyncPromise<>(); - beginReadInternal(readId, command, replicaPlan, summaryNodes, requestTime, partialReadConsumer, promise); + beginReadInternal(readId, command, replicaPlan, summaryNodes, requestTime, promise); return promise; } @@ -111,7 +107,6 @@ private void beginReadInternal( ReplicaPlan.AbstractForRead replicaPlan, int[] summaryNodes, Dispatcher.RequestTime requestTime, - Consumer partialReadConsumer, AsyncPromise promise) { PartialTrackedRead read = null; @@ -137,7 +132,7 @@ private void beginReadInternal( } Coordinator coordinator = - new Coordinator(readId, promise, command.columnFilter(), read, replicaPlan.consistencyLevel(), requestTime); + new Coordinator(readId, promise, read, replicaPlan.consistencyLevel(), requestTime); coordinators.put(readId, coordinator); // TODO (expected): reconsider the approach to tracked mutation metrics @@ -187,7 +182,6 @@ private static class Coordinator { private final TrackedRead.Id readId; private final AsyncPromise promise; - private final ColumnFilter selection; private final PartialTrackedRead read; private final ConsistencyLevel consistencyLevel; private final Dispatcher.RequestTime requestTime; @@ -195,14 +189,12 @@ private static class Coordinator Coordinator( TrackedRead.Id readId, AsyncPromise promise, - ColumnFilter selection, 
PartialTrackedRead read, ConsistencyLevel consistencyLevel, Dispatcher.RequestTime requestTime) { this.readId = readId; this.promise = promise; - this.selection = selection; this.read = Preconditions.checkNotNull(read); this.consistencyLevel = consistencyLevel; this.requestTime = requestTime; @@ -246,7 +238,6 @@ private void complete() if (followUp != null) { - ReadCommand command = read.command(); followUp.addCallback((newResponse, error) -> { if (error != null) { diff --git a/src/java/org/apache/cassandra/service/reads/tracked/TrackedRead.java b/src/java/org/apache/cassandra/service/reads/tracked/TrackedRead.java index 267670a63ebb..975c2742fc1d 100644 --- a/src/java/org/apache/cassandra/service/reads/tracked/TrackedRead.java +++ b/src/java/org/apache/cassandra/service/reads/tracked/TrackedRead.java @@ -36,7 +36,6 @@ import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.locator.*; import org.apache.cassandra.net.*; -import org.apache.cassandra.replication.MutationSummary; import org.apache.cassandra.replication.MutationTrackingService; import org.apache.cassandra.service.reads.ReadCoordinator; import org.apache.cassandra.service.reads.SpeculativeRetryPolicy; @@ -280,7 +279,7 @@ private void start(Dispatcher.RequestTime requestTime, Consumer { AsyncPromise promise = - MutationTrackingService.instance.localReads().beginRead(readId, ClusterMetadata.current(), command, consistencyLevel, summaryNodes, requestTime, partialReadConsumer); + MutationTrackingService.instance.localReads().beginRead(readId, ClusterMetadata.current(), command, consistencyLevel, summaryNodes, requestTime); promise.addCallback((response, error) -> { if (error != null) { @@ -451,7 +450,7 @@ public void executeLocally(Message message, ClusterMetadata m AsyncPromise promise = MutationTrackingService.instance .localReads() - .beginRead(readId, metadata, command, consistencyLevel, summaryNodes, requestTime, null); + .beginRead(readId, metadata, command, consistencyLevel, summaryNodes, requestTime); promise.addCallback((response, error) -> { if (error != null) { @@ -516,11 +515,6 @@ public void executeLocally(Message message, ClusterMetadata m ReadReconciliations.instance.handleSummaryRequest((SummaryRequest) message.payload); } - public TrackedSummaryResponse makeResponse(MutationSummary summary) - { - return new TrackedSummaryResponse(readId, summary, dataNode, summaryNodes); - } - public static final IVersionedSerializer serializer = new IVersionedSerializer<>() { @Override diff --git a/src/java/org/apache/cassandra/utils/Crc.java b/src/java/org/apache/cassandra/utils/Crc.java index f1a31584f364..63b0055f2bed 100644 --- a/src/java/org/apache/cassandra/utils/Crc.java +++ b/src/java/org/apache/cassandra/utils/Crc.java @@ -83,6 +83,26 @@ public static void updateCrc32(CRC32 crc, ByteBuffer buffer, int start, int end) buffer.position(savePosition); } + public static void updateWithInt(CRC32 crc, int v) + { + crc.update((v >>> 24) & 0xFF); + crc.update((v >>> 16) & 0xFF); + crc.update((v >>> 8) & 0xFF); + crc.update((v >>> 0) & 0xFF); + } + + public static void updateWithLong(CRC32 crc, long v) + { + updateWithInt(crc, (int) (v >>> 32)); + updateWithInt(crc, (int) (v & 0xFFFFFFFFL)); + } + + public static void validate(CRC32 crc, int expectedCRC) throws InvalidCrc + { + if (expectedCRC != (int) crc.getValue()) + throw new InvalidCrc(expectedCRC, (int) crc.getValue()); + } + private static final int CRC24_INIT = 0x875060; /** * Polynomial chosen from https://users.ece.cmu.edu/~koopman/crc/index.html, 
by Philip Koopman diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java index a4493f30a201..3a3a96285d9d 100644 --- a/src/java/org/apache/cassandra/utils/FBUtilities.java +++ b/src/java/org/apache/cassandra/utils/FBUtilities.java @@ -1184,6 +1184,7 @@ public static void updateChecksumShort(Checksum checksum, short v) checksum.update((v >>> 0) & 0xFF); } + // TODO: migrate users to Crc class public static void updateChecksumInt(Checksum checksum, int v) { checksum.update((v >>> 24) & 0xFF); @@ -1192,6 +1193,7 @@ public static void updateChecksumInt(Checksum checksum, int v) checksum.update((v >>> 0) & 0xFF); } + // TODO: migrate users to Crc class public static void updateChecksumLong(Checksum checksum, long v) { updateChecksumInt(checksum, (int) (v >>> 32)); diff --git a/test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java b/test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java index 36040e115c1e..3ff5f8e8b037 100644 --- a/test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java +++ b/test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java @@ -21,7 +21,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; -import java.util.zip.Checksum; +import java.util.zip.CRC32; import com.google.common.collect.ImmutableMap; import com.google.common.jimfs.Jimfs; @@ -47,7 +47,6 @@ import org.apache.cassandra.journal.Journal; import org.apache.cassandra.journal.KeySupport; import org.apache.cassandra.journal.RecordPointer; -import org.apache.cassandra.journal.SegmentCompactor; import org.apache.cassandra.journal.ValueSerializer; import org.apache.cassandra.utils.Isolated; import org.apache.cassandra.utils.concurrent.CountDownLatch; @@ -75,14 +74,12 @@ public void simpleRWTest() AccordSpec.JournalSpec spec = new AccordSpec.JournalSpec(); spec.flushPeriod = new DurationSpec.IntSecondsBound(1); - State.journal = new Journal<>("AccordJournal", - new File("/journal"), - spec, - new IdentityKeySerializer(), - new IdentityValueSerializer(), - SegmentCompactor.noop()); + State.journal = + Journal.builder("AccordJournal", new File("/journal"), spec, new IdentityKeySerializer()) + .valueSerializer(new IdentityValueSerializer()) + .build(); }), - () -> check()); + AccordJournalSimulationTest::check); } public static void check() @@ -237,7 +234,7 @@ public String deserialize(ByteBuffer buffer, int position, int userVersion) } @Override - public void updateChecksum(Checksum crc, String key, int userVersion) + public void updateChecksum(CRC32 crc, String key, int userVersion) { crc.update(key.getBytes()); } diff --git a/test/unit/org/apache/cassandra/journal/JournalTest.java b/test/unit/org/apache/cassandra/journal/JournalTest.java index 90b394518cb1..7cdb7edabbfb 100644 --- a/test/unit/org/apache/cassandra/journal/JournalTest.java +++ b/test/unit/org/apache/cassandra/journal/JournalTest.java @@ -52,7 +52,9 @@ public void testSimpleReadWrite() throws IOException directory.deleteRecursiveOnExit(); Journal journal = - new Journal<>("TestJournal", directory, TestParams.ACCORD, TimeUUIDKeySupport.INSTANCE, LongSerializer.INSTANCE, SegmentCompactor.noop()); + Journal.builder("TestJournal", directory, TestParams.ACCORD, TimeUUIDKeySupport.INSTANCE) + .valueSerializer(LongSerializer.INSTANCE) + .build(); journal.start(); @@ -73,7 +75,10 @@ public void testSimpleReadWrite() throws IOException 
journal.shutdown(); - journal = new Journal<>("TestJournal", directory, TestParams.ACCORD, TimeUUIDKeySupport.INSTANCE, LongSerializer.INSTANCE, SegmentCompactor.noop()); + journal = + Journal.builder("TestJournal", directory, TestParams.ACCORD, TimeUUIDKeySupport.INSTANCE) + .valueSerializer(LongSerializer.INSTANCE) + .build(); journal.start(); assertEquals(1L, (long) journal.readLast(id1)); @@ -91,7 +96,9 @@ public void testReadAll() throws IOException directory.deleteRecursiveOnExit(); Journal journal = - new Journal<>("TestJournalReadAll", directory, TestParams.ACCORD, TimeUUIDKeySupport.INSTANCE, LongSerializer.INSTANCE, SegmentCompactor.noop()); + Journal.builder("TestJournalReadAll", directory, TestParams.ACCORD, TimeUUIDKeySupport.INSTANCE) + .valueSerializer(LongSerializer.INSTANCE) + .build(); journal.start(); @@ -127,11 +134,6 @@ static class LongSerializer implements ValueSerializer { static final LongSerializer INSTANCE = new LongSerializer(); - public int serializedSize(TimeUUID key, Long value, int userVersion) - { - return Long.BYTES; - } - public void serialize(TimeUUID key, Long value, DataOutputPlus out, int userVersion) throws IOException { out.writeLong(value); diff --git a/test/unit/org/apache/cassandra/journal/SegmentTest.java b/test/unit/org/apache/cassandra/journal/SegmentTest.java index 55c54870d8f0..05a0f43f54ff 100644 --- a/test/unit/org/apache/cassandra/journal/SegmentTest.java +++ b/test/unit/org/apache/cassandra/journal/SegmentTest.java @@ -54,7 +54,8 @@ public void testWriteReadActiveSegment() throws IOException Descriptor descriptor = Descriptor.create(directory, System.currentTimeMillis(), 1); - ActiveSegment segment = ActiveSegment.create(descriptor, params(), TimeUUIDKeySupport.INSTANCE); + ActiveSegment segment = + ActiveSegment.create(descriptor, params(), TimeUUIDKeySupport.INSTANCE, KeyStats.Factory.noop()); segment.allocate(record1.remaining()).write(id1, record1); segment.allocate(record2.remaining()).write(id2, record2); @@ -101,7 +102,8 @@ public void testReadClosedSegmentByID() throws IOException Descriptor descriptor = Descriptor.create(directory, System.currentTimeMillis(), 1); - ActiveSegment activeSegment = ActiveSegment.create(descriptor, params(), TimeUUIDKeySupport.INSTANCE); + ActiveSegment activeSegment = + ActiveSegment.create(descriptor, params(), TimeUUIDKeySupport.INSTANCE, KeyStats.Factory.noop()); activeSegment.allocate(record1.remaining()).write(id1, record1); activeSegment.allocate(record2.remaining()).write(id2, record2); @@ -110,7 +112,8 @@ public void testReadClosedSegmentByID() throws IOException activeSegment.close(null); - StaticSegment staticSegment = StaticSegment.open(descriptor, TimeUUIDKeySupport.INSTANCE); + StaticSegment staticSegment = + StaticSegment.open(descriptor, TimeUUIDKeySupport.INSTANCE, KeyStats.Factory.noop()); // read all 4 entries by id and compare with originals EntrySerializer.EntryHolder holder = new EntrySerializer.EntryHolder<>(); @@ -150,7 +153,8 @@ public void testReadClosedSegmentSequentially() throws IOException Descriptor descriptor = Descriptor.create(directory, System.currentTimeMillis(), 1); - ActiveSegment activeSegment = ActiveSegment.create(descriptor, params(), TimeUUIDKeySupport.INSTANCE); + ActiveSegment activeSegment = + ActiveSegment.create(descriptor, params(), TimeUUIDKeySupport.INSTANCE, KeyStats.Factory.noop()); activeSegment.allocate(record1.remaining()).write(id1, record1); activeSegment.allocate(record2.remaining()).write(id2, record2); diff --git 
a/test/unit/org/apache/cassandra/journal/SegmentsTest.java b/test/unit/org/apache/cassandra/journal/SegmentsTest.java index ecacb5e197d1..2e029f5edda0 100644 --- a/test/unit/org/apache/cassandra/journal/SegmentsTest.java +++ b/test/unit/org/apache/cassandra/journal/SegmentsTest.java @@ -35,7 +35,7 @@ public class SegmentsTest @Test public void testSelect() { - withRandom(0l, rng -> { + withRandom(0L, rng -> { // Create mock segments with different timestamps java.io.File file = File.createTempFile("segments", "test"); List> segmentList = new ArrayList<>(); @@ -108,6 +108,13 @@ public boolean isActive() } @Override Index index() { throw new UnsupportedOperationException(); } + + @Override + public KeyStats keyStats() + { + return KeyStats.noop(); + } + @Override boolean isFlushed(long position) { throw new UnsupportedOperationException(); } @Override public void persistMetadata() { throw new UnsupportedOperationException(); } @Override boolean read(int offset, int size, EntrySerializer.EntryHolder into) { throw new UnsupportedOperationException(); } diff --git a/test/unit/org/apache/cassandra/journal/TimeUUIDKeySupport.java b/test/unit/org/apache/cassandra/journal/TimeUUIDKeySupport.java index 5694a29e7ab4..c75197511ecf 100644 --- a/test/unit/org/apache/cassandra/journal/TimeUUIDKeySupport.java +++ b/test/unit/org/apache/cassandra/journal/TimeUUIDKeySupport.java @@ -19,14 +19,13 @@ import java.io.IOException; import java.nio.ByteBuffer; -import java.util.zip.Checksum; +import java.util.zip.CRC32; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.utils.Crc; import org.apache.cassandra.utils.TimeUUID; -import static org.apache.cassandra.utils.FBUtilities.updateChecksumLong; - class TimeUUIDKeySupport implements KeySupport { static final TimeUUIDKeySupport INSTANCE = new TimeUUIDKeySupport(); @@ -76,10 +75,10 @@ public TimeUUID deserialize(ByteBuffer buffer, int userVersion) } @Override - public void updateChecksum(Checksum crc, TimeUUID key, int userVersion) + public void updateChecksum(CRC32 crc, TimeUUID key, int userVersion) { - updateChecksumLong(crc, key.uuidTimestamp()); - updateChecksumLong(crc, key.lsb()); + Crc.updateWithLong(crc, key.uuidTimestamp()); + Crc.updateWithLong(crc, key.lsb()); } @Override diff --git a/test/unit/org/apache/cassandra/replication/MutationJournalTest.java b/test/unit/org/apache/cassandra/replication/MutationJournalTest.java index ba741799aa1c..72017bc8a1e0 100644 --- a/test/unit/org/apache/cassandra/replication/MutationJournalTest.java +++ b/test/unit/org/apache/cassandra/replication/MutationJournalTest.java @@ -33,13 +33,19 @@ import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.io.util.File; +import org.apache.cassandra.journal.Descriptor; import org.apache.cassandra.journal.TestParams; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.replication.MutationJournal.ActiveOffsetRanges; +import org.apache.cassandra.replication.MutationJournal.StaticOffsetRanges; +import org.apache.cassandra.replication.MutationJournal.OffsetRangesFactory; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; /** * Tests to sanity-check the integration points with Journal 
@@ -51,6 +57,7 @@ public class MutationJournalTest
     private static final String TABLE = "mjtt";
 
     private static MutationJournal journal;
+    private static File directory;
 
     @BeforeClass
     public static void setUp() throws IOException
@@ -63,7 +70,7 @@ public static void setUp() throws IOException
                        .addRegularColumn("value", UTF8Type.instance)
                        .build());
 
-        File directory = new File(Files.createTempDirectory("mutation-journal-test-simple"));
+        directory = new File(Files.createTempDirectory("mutation-journal-test-simple"));
         directory.deleteRecursiveOnExit();
 
         journal = new MutationJournal(directory, TestParams.MUTATION_JOURNAL);
@@ -79,13 +86,8 @@ public static void tearDown()
     @Test
     public void testWriteOneReadOne()
     {
-        Mutation expected =
-            new RowUpdateBuilder(Schema.instance.getTableMetadata(KEYSPACE, TABLE), 0, "key")
-            .clustering("ck")
-            .add("value", "value")
-            .build();
-
-        ShortMutationId id = new ShortMutationId(100L, 0);
+        ShortMutationId id = id(100L, 0);
+        Mutation expected = mutation("key", "ck", "value");
         journal.write(id, expected);
 
         // regular read
@@ -103,22 +105,14 @@ public void testWriteOneReadOne()
     @Test
     public void testWriteManyReadMany()
     {
-        Mutation expected1 =
-            new RowUpdateBuilder(Schema.instance.getTableMetadata(KEYSPACE, TABLE), 0, "key1")
-            .clustering("ck1")
-            .add("value", "value1")
-            .build();
-        Mutation expected2 =
-            new RowUpdateBuilder(Schema.instance.getTableMetadata(KEYSPACE, TABLE), 0, "key2")
-            .clustering("ck2")
-            .add("value", "value2")
-            .build();
-        List<Mutation> expected = List.of(expected1, expected2);
-
-        ShortMutationId id1 = new ShortMutationId(100L, 1);
-        ShortMutationId id2 = new ShortMutationId(100L, 2);
+        ShortMutationId id1 = id(100L, 1);
+        ShortMutationId id2 = id(100L, 2);
         List<ShortMutationId> ids = List.of(id1, id2);
 
+        Mutation expected1 = mutation("key1", "ck1", "value1");
+        Mutation expected2 = mutation("key2", "ck2", "value2");
+        List<Mutation> expected = List.of(expected1, expected2);
+
         journal.write(id1, expected1);
         journal.write(id2, expected2);
 
@@ -127,6 +121,149 @@ public void testWriteManyReadMany()
         assertMutationsEqual(expected, actual);
     }
 
+    @Test
+    public void testActiveOffsetRanges()
+    {
+        {
+            ActiveOffsetRanges ranges = new ActiveOffsetRanges();
+            assertFalse(ranges.mayContain(id(0, 0)));
+        }
+
+        {
+            ActiveOffsetRanges ranges = new ActiveOffsetRanges();
+            ranges.update(id(0, 1));
+            ranges.update(id(0, 9));
+            assertFalse(ranges.mayContain(id(0, 0)));
+            assertFalse(ranges.mayContain(id(0, 10)));
+            for (int i = 1; i < 10; i++)
+                assertTrue(ranges.mayContain(id(0, i)));
+        }
+    }
+
+    @Test
+    public void testStaticOffsetRanges()
+    {
+        Descriptor descriptor = Descriptor.create(directory, 0, 1);
+
+        ActiveOffsetRanges active = new ActiveOffsetRanges();
+        for (int l = 1; l < 11; l++)
+        {
+            for (int o = 5; o > 0; o--) active.update(id(l, o));
+            for (int o = 6; o < 11; o++) active.update(id(l, o));
+        }
+
+        active.persist(descriptor);
+        StaticOffsetRanges loaded = OffsetRangesFactory.INSTANCE.load(descriptor);
+        assertEquals(active.asMap(), loaded.asMap());
+
+        // absent log ids
+        for (int o = 0; o < 11; o++)
+        {
+            assertFalse(active.mayContain(id(0, o)));
+            assertFalse(loaded.mayContain(id(0, o)));
+            assertFalse(active.mayContain(id(11, o)));
+            assertFalse(loaded.mayContain(id(11, o)));
+        }
+
+        // present log ids
+        for (int l = 1; l < 11; l++)
+        {
+            assertFalse(active.mayContain(id(l, 0)));
+            assertFalse(loaded.mayContain(id(l, 0)));
+            assertFalse(active.mayContain(id(l, 11)));
+            assertFalse(loaded.mayContain(id(l, 11)));
+            for (int o = 1; o < 11; o++)
+            {
+                assertTrue(active.mayContain(id(l, o)));
+                assertTrue(loaded.mayContain(id(l, o)));
+            }
+        }
+    }
+
+    @Test
+    public void testDropReconciledSegments()
+    {
+        ShortMutationId id1 = id(100L, 0);
+        ShortMutationId id2 = id(100L, 1);
+        ShortMutationId id3 = id(200L, 2);
+        ShortMutationId id4 = id(200L, 3);
+
+        Mutation mutation1 = mutation("key1", "ck1", "value1");
+        Mutation mutation2 = mutation("key2", "ck2", "value2");
+        Mutation mutation3 = mutation("key3", "ck3", "value3");
+        Mutation mutation4 = mutation("key4", "ck4", "value4");
+
+        // write two mutations to the first segment and flush it to make it static
+        journal.write(id1, mutation1);
+        journal.write(id2, mutation2);
+        journal.closeCurrentSegmentForTestingIfNonEmpty();
+
+        // write two mutations to the second segment and flush it to make it static
+        journal.write(id3, mutation3);
+        journal.write(id4, mutation4);
+        journal.closeCurrentSegmentForTestingIfNonEmpty();
+
+        {
+            // call dropReconciledSegments() with a log2offsets map that covers both segments fully,
+            // *BUT* with the segments still marked as needing replay; nothing should be dropped
+            Log2OffsetsMap.Immutable.Builder builder = new Log2OffsetsMap.Immutable.Builder();
+            builder.add(id1);
+            builder.add(id2);
+            builder.add(id3);
+            builder.add(id4);
+            assertEquals(0, journal.dropReconciledSegments(builder.build()));
+            // confirm that no static segments have been dropped
+            assertEquals(2, journal.countStaticSegmentsForTesting());
+        }
+
+        // mark both segments as not needing replay
+        journal.clearNeedsReplayForTesting();
+
+        {
+            // call dropReconciledSegments() with a log2offsets map that doesn't cover any segment fully
+            Log2OffsetsMap.Immutable.Builder builder = new Log2OffsetsMap.Immutable.Builder();
+            builder.add(id1);
+            assertEquals(0, journal.dropReconciledSegments(builder.build()));
+            // confirm that no static segments have been dropped
+            assertEquals(2, journal.countStaticSegmentsForTesting());
+        }
+
+        {
+            // call dropReconciledSegments() with a log2offsets map that covers only the first segment fully
+            Log2OffsetsMap.Immutable.Builder builder = new Log2OffsetsMap.Immutable.Builder();
+            builder.add(id1);
+            builder.add(id2);
+            assertEquals(1, journal.dropReconciledSegments(builder.build()));
+            // confirm that only one static segment has been dropped
+            assertEquals(1, journal.countStaticSegmentsForTesting());
+        }
+
+        {
+            // call dropReconciledSegments() with a log2offsets map that covers both segments fully
+            Log2OffsetsMap.Immutable.Builder builder = new Log2OffsetsMap.Immutable.Builder();
+            builder.add(id1);
+            builder.add(id2);
+            builder.add(id3);
+            builder.add(id4);
+            assertEquals(1, journal.dropReconciledSegments(builder.build()));
+            // confirm that all static segments have now been dropped
+            assertEquals(0, journal.countStaticSegmentsForTesting());
+        }
+    }
+
+    private ShortMutationId id(long logId, int offset)
+    {
+        return new ShortMutationId(logId, offset);
+    }
+
+    private Mutation mutation(String pk, String ck, String value)
+    {
+        return new RowUpdateBuilder(Schema.instance.getTableMetadata(KEYSPACE, TABLE), 0, pk)
+               .clustering(ck)
+               .add("value", value)
+               .build();
+    }
+
     public static void assertMutationEquals(Mutation expected, Mutation actual)
     {
         if (!serialize(expected).equals(serialize(actual)))
diff --git a/test/unit/org/apache/cassandra/replication/OffsetsTest.java b/test/unit/org/apache/cassandra/replication/OffsetsTest.java
index b0d5edda0f4c..1b39ff000e0b 100644
--- a/test/unit/org/apache/cassandra/replication/OffsetsTest.java
+++ b/test/unit/org/apache/cassandra/replication/OffsetsTest.java
@@ -861,9 +861,51 @@ public void testRemoveWithExactSizedArray()
         }
     }
 
+    @Test
     public void asListFromListRoundTripTest()
    {
         for (Offsets.Mutable offsets : new Offsets.Mutable[] { offsets(), offsets(1, 2), offsets(1, 3, 7, 9) })
             assertOffsetsEqual(offsets, Offsets.fromList(LOG_ID, offsets.asList()));
     }
+
+    @Test
+    public void testContainsRange()
+    {
+        {
+            Offsets.Mutable offsets = offsets();
+            assertFalse(offsets.containsRange(0, 1));
+        }
+
+        {
+            Offsets.Mutable offsets = offsets(2, 4);
+
+            assertTrue(offsets.containsRange(2, 4));
+            assertTrue(offsets.containsRange(3, 4));
+            assertTrue(offsets.containsRange(2, 3));
+
+            assertFalse(offsets.containsRange(1, 1));
+            assertFalse(offsets.containsRange(1, 2));
+            assertFalse(offsets.containsRange(1, 3));
+            assertFalse(offsets.containsRange(1, 4));
+
+            assertFalse(offsets.containsRange(2, 5));
+            assertFalse(offsets.containsRange(3, 5));
+            assertFalse(offsets.containsRange(4, 5));
+            assertFalse(offsets.containsRange(5, 5));
+        }
+
+        {
+            Offsets.Mutable offsets = offsets(2, 4, 6, 8);
+
+            assertTrue(offsets.containsRange(2, 4));
+            assertTrue(offsets.containsRange(6, 8));
+
+            assertFalse(offsets.containsRange(0, 2));
+            assertFalse(offsets.containsRange(3, 5));
+            assertFalse(offsets.containsRange(4, 6));
+            assertFalse(offsets.containsRange(5, 7));
+            assertFalse(offsets.containsRange(7, 9));
+            assertFalse(offsets.containsRange(9, 9));
+        }
+    }
 }
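
The truncation flow exercised by testDropReconciledSegments boils down to a per-segment
coverage predicate: a static segment becomes droppable once it no longer needs replay and,
for every coordinator log it touched, the offset range it recorded is fully contained in
the durably reconciled offsets. Below is a self-contained sketch of just that predicate,
with deliberately simplified stand-ins (one contiguous range per log, no persistence, no
CRCs; every name in it is illustrative, not the patch's API):

    import java.util.HashMap;
    import java.util.Map;

    public class DropReconciledSketch
    {
        // Stand-in for a segment's KeyStats: per log, the min/max offsets written to it.
        static final class SegmentStats
        {
            final Map<Long, long[]> minMaxByLog = new HashMap<>(); // logId -> { min, max }
            boolean needsReplay = true; // cleared once the segment's mutations are applied

            void update(long logId, long offset)
            {
                minMaxByLog.merge(logId, new long[] { offset, offset },
                                  (cur, add) -> new long[] { Math.min(cur[0], add[0]), Math.max(cur[1], add[1]) });
            }
        }

        // Stand-in for the reconciled Offsets of a log, reduced to one [min, max] range.
        static final class Reconciled
        {
            final Map<Long, long[]> rangeByLog = new HashMap<>();

            void add(long logId, long min, long max) { rangeByLog.put(logId, new long[] { min, max }); }

            boolean containsRange(long logId, long min, long max)
            {
                long[] range = rangeByLog.get(logId);
                return range != null && range[0] <= min && max <= range[1];
            }
        }

        // A segment may be dropped once replay is done and all its ranges are reconciled.
        static boolean canDrop(SegmentStats segment, Reconciled reconciled)
        {
            if (segment.needsReplay)
                return false;
            return segment.minMaxByLog.entrySet().stream()
                          .allMatch(e -> reconciled.containsRange(e.getKey(), e.getValue()[0], e.getValue()[1]));
        }

        public static void main(String[] args)
        {
            SegmentStats segment = new SegmentStats();
            segment.update(100L, 0); // mirrors journal.write(id(100L, 0), mutation1) above
            segment.update(100L, 1);
            segment.needsReplay = false;

            Reconciled reconciled = new Reconciled();
            reconciled.add(100L, 0, 1);
            System.out.println(canDrop(segment, reconciled)); // true: [0..1] of log 100 is covered
        }
    }

In the patch itself the per-log ranges live in the new KeyStats component persisted next
to the data, index, and metadata files, and coverage is checked with Offsets.containsRange,
whose edge cases the testContainsRange additions above pin down.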
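
For context on the Crc helpers added above: they are meant to be used in matched
write/read pairs. The writer feeds fixed-width values into a CRC32 via updateWithInt /
updateWithLong (big-endian, one byte at a time, mirroring FBUtilities.updateChecksum*) and
records (int) crc.getValue(); the reader repeats the identical update sequence and calls
Crc.validate, which throws InvalidCrc on mismatch. A minimal sketch of that call shape
(the KeyChecksumSketch class and its field layout are hypothetical; only the Crc calls are
from the patch):

    import java.util.zip.CRC32;

    import org.apache.cassandra.utils.Crc;

    public class KeyChecksumSketch
    {
        // Writer side: checksum a hypothetical (long, int) key layout, store the result.
        static int checksum(long timestamp, int sequence)
        {
            CRC32 crc = new CRC32();
            Crc.updateWithLong(crc, timestamp); // high int first, then low int
            Crc.updateWithInt(crc, sequence);
            return (int) crc.getValue();
        }

        // Reader side: recompute with the same sequence of updates, then validate.
        static void validateKey(long timestamp, int sequence, int expectedCrc) throws Crc.InvalidCrc
        {
            CRC32 crc = new CRC32();
            Crc.updateWithLong(crc, timestamp);
            Crc.updateWithInt(crc, sequence);
            Crc.validate(crc, expectedCrc); // throws InvalidCrc if the stored CRC doesn't match
        }
    }

This is the same call shape TimeUUIDKeySupport.updateChecksum now uses (two updateWithLong
calls), just spelled out end to end.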