20 changes: 20 additions & 0 deletions src/java/org/apache/cassandra/db/Mutation.java
@@ -44,6 +44,7 @@
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.rows.DeserializationHelper;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.io.util.DataOutputPlus;
@@ -56,6 +57,7 @@
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.AbstractWriteResponseHandler;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.concurrent.Future;

import static com.google.common.base.Preconditions.checkState;
@@ -656,6 +658,24 @@ public Mutation deserialize(DataInputPlus in, int version, DeserializationHelper
}
}

/**
* Partially deserializes a serialized mutation to extract its partition key and table metadata. This does not
* skip the remainder of the mutation, hence the in argument being a DataInputBuffer rather than a DataInputPlus.
*/
public Pair<DecoratedKey, TableMetadata> deserializeKeyAndTableMetadata(DataInputBuffer in, int version, DeserializationHelper.Flag flag) throws IOException
{
if (version >= VERSION_51)
in.skipBytes(1); // potentialTxnConflicts

if (version >= VERSION_52)
MutationId.serializer.skip(in, version);

int size = in.readUnsignedVInt32();
assert size > 0;

return PartitionUpdate.serializer.deserializeMetadataAndKey(in, version, flag);
}

public Mutation deserialize(DataInputPlus in, int version) throws IOException
{
return deserialize(in, version, DeserializationHelper.Flag.FROM_REMOTE);
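A hedged usage sketch of the peek method above (the mutation and version variables, and the choice of FROM_REMOTE, are illustrative rather than part of this change). The serialized bytes are wrapped in a DataInputBuffer so the key and table metadata can be read without touching the rest of the payload; because the buffer is backed by the whole byte array, the partially consumed stream can simply be discarded afterwards.

// Hypothetical caller-side sketch, not part of the patch.
DataOutputBuffer out = new DataOutputBuffer();
Mutation.serializer.serialize(mutation, out, version);

try (DataInputBuffer in = new DataInputBuffer(out.buffer(), false))
{
    Pair<DecoratedKey, TableMetadata> keyAndTable =
        Mutation.serializer.deserializeKeyAndTableMetadata(in, version, DeserializationHelper.Flag.FROM_REMOTE);
    DecoratedKey key = keyAndTable.left;      // partition key, e.g. for routing or indexing
    TableMetadata table = keyAndTable.right;  // table the mutation targets
}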
34 changes: 27 additions & 7 deletions src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -81,6 +81,7 @@
import org.apache.cassandra.service.accord.serializers.TableMetadatas;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.Epoch;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.btree.BTree;
import org.apache.cassandra.utils.btree.UpdateFunction;
import org.apache.cassandra.utils.vint.VIntCoding;
@@ -788,16 +789,11 @@ public void serializeWithoutKey(PartitionUpdate update, TableMetadatas tables, D
}
}

public PartitionUpdate deserialize(DataInputPlus in, int version, DeserializationHelper.Flag flag) throws IOException
private TableMetadata getTableMetadata(TableId tableId, Epoch remoteVersion) throws IOException
{
TableId tableId = TableId.deserialize(in);
Epoch remoteVersion = null;
if (version >= MessagingService.VERSION_51)
remoteVersion = Epoch.serializer.deserialize(in);
TableMetadata tableMetadata;
try
{
tableMetadata = Schema.instance.getExistingTableMetadata(tableId);
return Schema.instance.getExistingTableMetadata(tableId);
}
catch (UnknownTableException e)
{
@@ -810,10 +806,34 @@ public PartitionUpdate deserialize(DataInputPlus in, int version, Deserializatio
}
throw e;
}
}

public PartitionUpdate deserialize(DataInputPlus in, int version, DeserializationHelper.Flag flag) throws IOException
{
TableId tableId = TableId.deserialize(in);
Epoch remoteVersion = null;
if (version >= MessagingService.VERSION_51)
remoteVersion = Epoch.serializer.deserialize(in);
TableMetadata tableMetadata = getTableMetadata(tableId, remoteVersion);
UnfilteredRowIteratorSerializer.Header header = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(tableMetadata, null, in, version, flag);
return deserialize(header, remoteVersion, tableMetadata, in, version, flag);
}

/**
* Partially deserializes a serialized partition update to get the partition key and table metadata. This does not
* skip the remainder of the update, hence the in argument being a DataInputBuffer rather than a DataInputPlus.
*/
public Pair<DecoratedKey, TableMetadata> deserializeMetadataAndKey(DataInputBuffer in, int version, DeserializationHelper.Flag flag) throws IOException
{
TableId tableId = TableId.deserialize(in);
Epoch remoteVersion = null;
if (version >= MessagingService.VERSION_51)
remoteVersion = Epoch.serializer.deserialize(in);
TableMetadata tableMetadata = getTableMetadata(tableId, remoteVersion);
UnfilteredRowIteratorSerializer.Header header = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(tableMetadata, null, in, version, flag);
return Pair.create(header.key, tableMetadata);
}

public PartitionUpdate deserialize(PartitionKey key, TableMetadatas tables, DataInputPlus in, int version, DeserializationHelper.Flag flag) throws IOException
{
TableMetadata tableMetadata = tables.deserialize(in);
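The PartitionUpdate-level method can also be called directly; a hedged sketch follows (serializedUpdate is a hypothetical ByteBuffer holding a single serialized partition update). It reads only the table id, the schema epoch (for VERSION_51 and later) and the row-iterator header, then stops.

// Hypothetical caller-side sketch, not part of the patch.
try (DataInputBuffer in = new DataInputBuffer(serializedUpdate, false))
{
    Pair<DecoratedKey, TableMetadata> peeked =
        PartitionUpdate.serializer.deserializeMetadataAndKey(in, MessagingService.current_version,
                                                             DeserializationHelper.Flag.FROM_REMOTE);
    DecoratedKey key = peeked.left;
    TableMetadata table = peeked.right;
}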
Original file line number Diff line number Diff line change
@@ -21,8 +21,10 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;

import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableList;
@@ -42,6 +44,9 @@
import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
import org.apache.cassandra.locator.RangesAtEndpoint;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.replication.ImmutableCoordinatorLogOffsets;
import org.apache.cassandra.replication.Offsets;
import org.apache.cassandra.replication.ReconciledKeyspaceOffsets;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.streaming.IncomingStream;
import org.apache.cassandra.streaming.OutgoingStream;
@@ -86,8 +91,34 @@ public StreamReceiver createStreamReceiver(StreamSession session, List<Range<Tok
return new CassandraStreamReceiver(cfs, session, ranges, totalStreams);
}

public Predicate<SSTableReader> getSSTablePredicateForKeyspaceRanges(ReconciledKeyspaceOffsets reconciledKeyspaceOffsets)
{
if (reconciledKeyspaceOffsets == null)
return sstable -> true;

return sstable -> {
if (sstable.isRepaired())
return false;

ImmutableCoordinatorLogOffsets sstableOffsets = sstable.getSSTableMetadata().coordinatorLogOffsets;

// if it's not repaired and there are no offsets, it was probably written before the table was using
// mutation tracking and therefore should be considered unreconciled
if (sstableOffsets.isEmpty())
return true;

for (Map.Entry<Long, Offsets.Immutable> entry : sstableOffsets.entries())
{
if (!reconciledKeyspaceOffsets.isFullyReconciled(entry.getKey(), entry.getValue()))
return true;
}

return false;
};
}

@Override
public Collection<OutgoingStream> createOutgoingStreams(StreamSession session, RangesAtEndpoint replicas, TimeUUID pendingRepair, PreviewKind previewKind)
public Collection<OutgoingStream> createOutgoingStreams(StreamSession session, RangesAtEndpoint replicas, TimeUUID pendingRepair, PreviewKind previewKind, ReconciledKeyspaceOffsets reconciledKeyspaceOffsets)
{
Refs<SSTableReader> refs = new Refs<>();
try
@@ -99,7 +130,14 @@ public Collection<OutgoingStream> createOutgoingStreams(StreamSession session, R
Set<SSTableReader> sstables = Sets.newHashSet();
SSTableIntervalTree intervalTree = buildSSTableIntervalTree(ImmutableList.copyOf(view.select(SSTableSet.CANONICAL)));
Predicate<SSTableReader> predicate;
if (previewKind.isPreview())
if (reconciledKeyspaceOffsets != null)
{
// TODO: relax these restrictions as repair support is added
Preconditions.checkArgument(previewKind == PreviewKind.NONE);
Preconditions.checkArgument(pendingRepair == ActiveRepairService.NO_PENDING_REPAIR);
predicate = getSSTablePredicateForKeyspaceRanges(reconciledKeyspaceOffsets);
}
else if (previewKind.isPreview())
{
predicate = previewKind.predicate();
}
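A hedged usage sketch of the predicate above (the manager, reconciledOffsets and view variables are illustrative). Repaired sstables are always excluded, sstables with no coordinator log offsets are always included, and otherwise an sstable is selected only if at least one of its log offset ranges is not yet fully reconciled.

// Hypothetical caller-side sketch, not part of the patch.
Predicate<SSTableReader> needsStreaming =
    manager.getSSTablePredicateForKeyspaceRanges(reconciledOffsets);

List<SSTableReader> toStream = new ArrayList<>();
for (SSTableReader sstable : view.select(SSTableSet.CANONICAL))
{
    if (needsStreaming.apply(sstable)) // Guava Predicate, as imported in this file
        toStream.add(sstable);
}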
17 changes: 17 additions & 0 deletions src/java/org/apache/cassandra/journal/ActiveSegment.java
@@ -153,6 +153,23 @@ boolean read(int offset, int size, EntrySerializer.EntryHolder<K> into)
return true;
}

@Override
public void readAll(RecordConsumer<K> consumer)
{
// Ensure all in-progress writes are completed and flushed to the buffer
updateWrittenTo();

// Use the forEach method to iterate through all key-offset pairs
index.forEach((key, offset, size) -> {
// Read the record at this offset
EntrySerializer.EntryHolder<K> holder = new EntrySerializer.EntryHolder<>();
if (read(offset, size, holder))
{
consumer.accept(descriptor.timestamp, offset, key, holder.value, descriptor.userVersion);
}
});
}

/**
* Stop writing to this file, flush and close it. Does nothing if the file is already closed.
*/
20 changes: 20 additions & 0 deletions src/java/org/apache/cassandra/journal/InMemoryIndex.java
@@ -155,6 +155,26 @@ static <K> InMemoryIndex<K> rebuild(Descriptor descriptor, KeySupport<K> keySupp
return index;
}

public interface EntryConsumer<K>
{
void accept(K key, int offset, int size);
}

/**
* Iterate through all key-offset pairs in the index.
* For each key, iterates through all its offsets and calls the consumer
* with the key and individual offset/size values.
*/
public void forEach(EntryConsumer<K> consumer)
{
index.forEach((key, offsetsAndSizes) -> {
for (long offsetAndSize : offsetsAndSizes)
{
consumer.accept(key, Index.readOffset(offsetAndSize), Index.readSize(offsetAndSize));
}
});
}

@Override
public void close()
{
61 changes: 60 additions & 1 deletion src/java/org/apache/cassandra/journal/Journal.java
@@ -33,6 +33,7 @@
import java.util.function.*;
import java.util.zip.CRC32;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import com.google.common.annotations.VisibleForTesting;
@@ -174,6 +175,36 @@ public void run()
}
}

public static class Snapshot<K, V> implements AutoCloseable
{
private final Segments.ReferencedSegments<K, V> segments;

private Snapshot(@Nonnull ReferencedSegments<K, V> segments)
{
this.segments = segments;
}

/**
* Read all records from all segments referenced by this snapshot, iterating through each segment in turn.
* The segments were referenced atomically when the snapshot was created and remain readable until close().
*
* @param consumer function to consume each record found
*/
public void readAll(RecordConsumer<K> consumer)
{
for (Segment<K, V> segment : segments.allSorted(false))
{
segment.readAll(consumer);
}
}

@Override
public void close()
{
segments.close();
}
}

public Journal(String name,
File directory,
Params params,
@@ -370,6 +401,28 @@ public List<V> readAll(K id)
return res;
}

/**
* Read all records from all segments in the journal.
* This method provides an atomic snapshot of all segments and iterates through each one.
*
* @param consumer function to consume each record found
*/
public void readAll(RecordConsumer<K> consumer)
{
try (OpOrder.Group group = readOrder.start())
{
for (Segment<K, V> segment : segments.get().allSorted(false))
{
segment.readAll(consumer);
}
}
}

public Snapshot<K, V> snapshot(Predicate<Segment<K, V>> predicate)
{
return new Snapshot<>(selectAndReference(predicate));
}

/**
* Looks up a record by the provided id, if the value satisfies the provided condition.
* <p/>
@@ -566,6 +619,12 @@ private ActiveSegment<K, V>.Allocation allocate(int entrySize)
return alloc;
}

@VisibleForTesting
public void advanceSegment()
{
advanceSegment(currentSegment);
}

/*
* Segment allocation logic.
*/
@@ -742,7 +801,7 @@ private void closeAllSegments()
}

@SuppressWarnings("unused")
ReferencedSegments<K, V> selectAndReference(Predicate<Segment<K,V>> selector)
public ReferencedSegments<K, V> selectAndReference(Predicate<Segment<K,V>> selector)
{
while (true)
{
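A hedged usage sketch of the snapshot API added above (MyKey, MyValue, journal and process are illustrative; the consumer's parameter list follows the accept(...) calls in the segment implementations). Taking a snapshot references the selected segments so they cannot be released mid-iteration, and close() drops those references.

// Hypothetical caller-side sketch, not part of the patch.
try (Journal.Snapshot<MyKey, MyValue> snapshot = journal.snapshot(segment -> true))
{
    snapshot.readAll((segmentTimestamp, offset, key, value, userVersion) -> {
        // value is the raw serialized record (ByteBuffer); deserialize it with the journal's value serializer if needed
        process(key, value, userVersion);
    });
}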
6 changes: 6 additions & 0 deletions src/java/org/apache/cassandra/journal/Segment.java
@@ -133,6 +133,12 @@ void readAll(K id, EntrySerializer.EntryHolder<K> into, RecordConsumer<K> onEntr
}
}

/**
* Read all records from this segment using the RecordConsumer interface.
* This provides access to deserialized keys and raw value ByteBuffers for streaming scenarios.
*/
public abstract void readAll(RecordConsumer<K> consumer);

@Override
public int compareTo(Segment<K, V> that)
{
12 changes: 12 additions & 0 deletions src/java/org/apache/cassandra/journal/StaticSegment.java
@@ -302,6 +302,18 @@ public void forEachRecord(RecordConsumer<K> consumer)
}
}

@Override
public void readAll(RecordConsumer<K> consumer)
{
try (SequentialReader<K> reader = sequentialReader(descriptor, keySupport, fsyncLimit))
{
while (reader.advance())
{
consumer.accept(descriptor.timestamp, reader.offset(), reader.key(), reader.record(), descriptor.userVersion);
}
}
}

/*
* Sequential and in-key order reading (replay and components rebuild)
*/
Original file line number Diff line number Diff line change
@@ -59,7 +59,16 @@ boolean isEmpty()
@Override
public String toString()
{
return "ShardReplicatedOffsets{" + keyspace + ", " + range + ", " + replicatedOffsets + ", " + durable + '}';
StringBuilder sb = new StringBuilder("["); // use a String: the char overload would only set the initial capacity
boolean isFirst = true;
for (Offsets.Immutable logOffsets : replicatedOffsets)
{
if (!isFirst) sb.append(", ");
sb.append('(').append(logOffsets.logId()).append(" -> ").append(logOffsets).append(')');
isFirst = false;
}
sb.append(']');
return "ShardReplicatedOffsets{" + keyspace + ", " + range + ", " + sb + ", " + durable + '}';
}

public static final IVerbHandler<BroadcastLogOffsets> verbHandler = message -> {