Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 27 additions & 14 deletions src/java/org/apache/cassandra/journal/ActiveSegment.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,19 @@ public final class ActiveSegment<K, V> extends Segment<K, V>
private final Ref<Segment<K, V>> selfRef;

final InMemoryIndex<K> index;
final KeyStats.Active<K> keyStats;

private ActiveSegment(
Descriptor descriptor, Params params, InMemoryIndex<K> index, Metadata metadata, KeySupport<K> keySupport)
Descriptor descriptor,
Params params,
InMemoryIndex<K> index,
Metadata metadata,
KeyStats.Active<K> keyStats,
KeySupport<K> keySupport)
{
super(descriptor, metadata, keySupport);
this.index = index;
this.keyStats = keyStats;
try
{
channel = FileChannel.open(file.toPath(), StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE);
Expand All @@ -98,16 +105,12 @@ private ActiveSegment(
}
}

public CommitLogPosition currentPosition()
{
return new CommitLogPosition(id(), (int) allocateOffset);
}

static <K, V> ActiveSegment<K, V> create(Descriptor descriptor, Params params, KeySupport<K> keySupport)
static <K, V> ActiveSegment<K, V> create(
Descriptor descriptor, Params params, KeySupport<K> keySupport, KeyStats.Factory<K> keyStatsFactory)
{
InMemoryIndex<K> index = InMemoryIndex.create(keySupport);
Metadata metadata = Metadata.empty();
return new ActiveSegment<>(descriptor, params, index, metadata, keySupport);
return new ActiveSegment<>(descriptor, params, index, metadata, keyStatsFactory.create(), keySupport);
}

@Override
Expand All @@ -116,6 +119,16 @@ public InMemoryIndex<K> index()
return index;
}

public KeyStats.Active<K> keyStats()
{
return keyStats;
}

public CommitLogPosition currentPosition()
{
return new CommitLogPosition(id(), (int) allocateOffset);
}

boolean isEmpty()
{
return allocateOffset == 0;
Expand Down Expand Up @@ -225,6 +238,7 @@ void persistComponents()
{
index.persist(descriptor);
metadata.persist(descriptor);
keyStats.persist(descriptor);
SyncUtil.trySyncDir(descriptor.directory);
}

Expand All @@ -236,6 +250,7 @@ private void discard()
descriptor.fileFor(Component.DATA).deleteIfExists();
descriptor.fileFor(Component.INDEX).deleteIfExists();
descriptor.fileFor(Component.METADATA).deleteIfExists();
descriptor.fileFor(Component.KEYSTATS).deleteIfExists();
}

@Override
Expand Down Expand Up @@ -290,6 +305,7 @@ public String name()
}
}

@Override
public boolean isFlushed(long position)
{
return writtenTo >= position;
Expand Down Expand Up @@ -465,18 +481,14 @@ final class Allocation extends RecordPointer
this.buffer = buffer;
}

Segment<K, V> segment()
{
return ActiveSegment.this;
}

void write(K id, ByteBuffer record)
{
try
{
EntrySerializer.write(id, record, keySupport, buffer, descriptor.userVersion);
metadata.update();
index.update(id, position, length);
keyStats.update(id);
metadata.update();
}
catch (IOException e)
{
Expand Down Expand Up @@ -508,6 +520,7 @@ void writeInternal(K id, ByteBuffer record)
{
EntrySerializer.write(id, record, keySupport, buffer, descriptor.userVersion);
index.update(id, position, length);
keyStats.update(id);
metadata.update();
}
catch (IOException e)
Expand Down
10 changes: 6 additions & 4 deletions src/java/org/apache/cassandra/journal/Component.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,17 @@

import static accord.utils.SortedArrays.SortedArrayList.ofSorted;

enum Component
public enum Component
{
DATA ("data"),
INDEX ("indx"),
METADATA ("meta");
DATA ("data"),
INDEX ("indx"),
METADATA ("meta"),
KEYSTATS ("keys");
//OFFSET_MAP (".offs"),
//INVLALIDATIONS (".invl");

public static final List<Component> VALUES = ofSorted(values());

final String extension;

Component(String extension)
Expand Down
9 changes: 6 additions & 3 deletions src/java/org/apache/cassandra/journal/Descriptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.google.common.annotations.VisibleForTesting;

import org.apache.cassandra.io.util.File;

import static java.lang.String.format;
Expand Down Expand Up @@ -90,7 +92,8 @@ public final class Descriptor implements Comparable<Descriptor>
this.userVersion = userVersion;
}

static Descriptor create(File directory, long timestamp, int userVersion)
@VisibleForTesting
public static Descriptor create(File directory, long timestamp, int userVersion)
{
return new Descriptor(directory, timestamp, 1, CURRENT_JOURNAL_VERSION, userVersion);
}
Expand All @@ -114,12 +117,12 @@ public static Descriptor fromFile(File file)
return fromName(file.parent(), file.name());
}

File fileFor(Component component)
public File fileFor(Component component)
{
return new File(directory, formatFileName(component));
}

File tmpFileFor(Component component)
public File tmpFileFor(Component component)
{
return new File(directory, formatFileName(component) + '.' + TMP_SUFFIX);
}
Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/journal/DumpUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,6 @@ public static void dumpMetadata(Descriptor descriptor, Consumer<String> out)

public static <K, V> StaticSegment<K, V> open(Descriptor descriptor, KeySupport<K> keySupport)
{
return StaticSegment.open(descriptor, keySupport);
return StaticSegment.open(descriptor, keySupport, KeyStats.Factory.noop());
}
}
11 changes: 4 additions & 7 deletions src/java/org/apache/cassandra/journal/EntrySerializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,8 @@
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

import static org.apache.cassandra.journal.Journal.validateCRC;

/**
* Entry format:
*
* [Total Size (4 bytes)]
* [Header (variable size)]
* [Header CRC (4 bytes)]
Expand Down Expand Up @@ -95,10 +92,10 @@ static <K> void read(EntryHolder<K> into,
CRC32 crc = Crc.crc32();
int headerSize = EntrySerializer.headerSize(keySupport, userVersion);
int headerCrc = readAndUpdateHeaderCrc(crc, from, headerSize);
validateCRC(crc, headerCrc);
Crc.validate(crc, headerCrc);

int recordCrc = readAndUpdateRecordCrc(crc, from, start + totalSize);
validateCRC(crc, recordCrc);
Crc.validate(crc, recordCrc);
}

readValidated(into, from, start, keySupport, userVersion);
Expand Down Expand Up @@ -142,7 +139,7 @@ static <K> int tryRead(EntryHolder<K> into,
int headerCrc = readAndUpdateHeaderCrc(crc, from, headerSize);
try
{
validateCRC(crc, headerCrc);
Crc.validate(crc, headerCrc);
}
catch (IOException e)
{
Expand All @@ -152,7 +149,7 @@ static <K> int tryRead(EntryHolder<K> into,
int recordCrc = readAndUpdateRecordCrc(crc, from, start + totalSize);
try
{
validateCRC(crc, recordCrc);
Crc.validate(crc, recordCrc);
}
catch (IOException e)
{
Expand Down
Loading