Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions oak-segment-tar/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,12 @@
<artifactId>org.apache.sling.testing.osgi-mock.junit4</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>4.2.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.osgi</groupId>
<artifactId>org.osgi.service.event</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@
import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Supplier;

import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean;
import org.apache.jackrabbit.oak.commons.Buffer;
import org.apache.jackrabbit.oak.commons.conditions.Validate;
import org.apache.jackrabbit.oak.commons.pio.Closer;
import org.apache.jackrabbit.oak.segment.CachingSegmentReader;
import org.apache.jackrabbit.oak.segment.RecordType;
import org.apache.jackrabbit.oak.segment.Revisions;
Expand All @@ -48,13 +51,16 @@
import org.apache.jackrabbit.oak.segment.SegmentStore;
import org.apache.jackrabbit.oak.segment.SegmentTracker;
import org.apache.jackrabbit.oak.segment.SegmentWriter;
import org.apache.jackrabbit.oak.segment.file.preloader.SegmentPreloader;
import org.apache.jackrabbit.oak.segment.file.tar.EntryRecovery;
import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration;
import org.apache.jackrabbit.oak.segment.file.tar.TarFiles;
import org.apache.jackrabbit.oak.segment.file.tar.TarRecovery;
import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor;
import org.apache.jackrabbit.oak.segment.spi.monitor.RemoteStoreMonitor;
import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.PersistentCache;
import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.PersistentCachePreloadingConfiguration;
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.apache.jackrabbit.oak.stats.StatsOptions;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -89,6 +95,8 @@ public abstract class AbstractFileStore implements SegmentStore, Closeable {
*/
private static final int MAX_STORE_VERSION = 2;

protected final @Nullable PersistentCache persistentCache;

static ManifestChecker newManifestChecker(SegmentNodeStorePersistence persistence, boolean strictVersionCheck) throws IOException {
return ManifestChecker.newManifestChecker(
persistence.getManifestFile(),
Expand Down Expand Up @@ -130,7 +138,7 @@ public void recoverEntry(UUID uuid, byte[] data, EntryRecovery entryRecovery) th
protected final IOMonitor ioMonitor;

protected final RemoteStoreMonitor remoteStoreMonitor;

protected final int binariesInlineThreshold;

AbstractFileStore(final FileStoreBuilder builder) {
Expand All @@ -156,6 +164,18 @@ public SegmentId newSegmentId(long msb, long lsb) {
this.remoteStoreMonitor = builder.getRemoteStoreMonitor();
this.segmentBufferMonitor = new SegmentBufferMonitor(builder.getStatsProvider());
this.binariesInlineThreshold = builder.getBinariesInlineThreshold();
this.persistentCache = initializePersistentCache(builder, this::getTarFiles);
}

private static @Nullable PersistentCache initializePersistentCache(FileStoreBuilder builder, Supplier<TarFiles> tarFilesSupplier) {
PersistentCache persistentCache = builder.getPersistentCache();
PersistentCachePreloadingConfiguration preloadingConfig = builder.getPreloadingConfiguration();
if (preloadingConfig != null) {
Validate.checkState(persistentCache != null,
"PersistentCache must be configured when using a PersistentCachePreloadConfiguration");
persistentCache = SegmentPreloader.decorate(persistentCache, preloadingConfig, tarFilesSupplier);
}
return persistentCache;
}

static SegmentNotFoundException asSegmentNotFoundException(Exception e, SegmentId id) {
Expand All @@ -165,6 +185,8 @@ static SegmentNotFoundException asSegmentNotFoundException(Exception e, SegmentI
return new SegmentNotFoundException(id, e);
}

abstract TarFiles getTarFiles();

@NotNull
public CacheStatsMBean getSegmentCacheStats() {
return segmentCache.getCacheStats();
Expand Down Expand Up @@ -192,7 +214,7 @@ public SegmentReader getReader() {
public SegmentIdProvider getSegmentIdProvider() {
return tracker;
}

public int getBinariesInlineThreshold() {
return binariesInlineThreshold;
}
Expand Down Expand Up @@ -281,6 +303,25 @@ public void consume(int number, RecordType type, int offset) {
return binaryReferences;
}

@Override
public void close() {
doClose();
System.gc(); // for any memory-mappings that are no longer used
log.info("TarMK closed: {}", directory);
}

protected void doClose() {
Closer closer = Closer.create();
registerCloseables(closer);
closeAndLogOnFail(closer);
}

protected void registerCloseables(Closer closer) {
if (persistentCache instanceof Closeable) {
closer.register((Closeable) persistentCache);
}
}

static void closeAndLogOnFail(Closeable closeable) {
if (closeable != null) {
try {
Expand All @@ -292,11 +333,19 @@ static void closeAndLogOnFail(Closeable closeable) {
}
}

Segment readSegmentUncached(TarFiles tarFiles, SegmentId id) {
Buffer buffer = tarFiles.readSegment(id.getMostSignificantBits(), id.getLeastSignificantBits());
Segment readSegmentUncached(SegmentId id) {
Buffer buffer;
if (persistentCache != null) {
buffer = persistentCache.readSegment(id.getMostSignificantBits(), id.getLeastSignificantBits(),
() -> getTarFiles().readSegment(id.getMostSignificantBits(), id.getLeastSignificantBits()));
} else {
buffer = getTarFiles().readSegment(id.getMostSignificantBits(), id.getLeastSignificantBits());
}

if (buffer == null) {
throw new SegmentNotFoundException(id);
}

segmentBufferMonitor.trackAllocation(buffer);
return new Segment(tracker, id, buffer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,11 @@ FileStore bind(TarRevisions revisions) throws IOException {
}
}

@Override
TarFiles getTarFiles() {
return tarFiles;
}

@NotNull
private Supplier<RecordId> initialNode() {
return new Supplier<RecordId>() {
Expand Down Expand Up @@ -483,21 +488,23 @@ public void close() {
log.warn("Unable to flush the store", e);
}

Closer closer = Closer.create();
closer.register(repositoryLock::unlock);
closer.register(tarFiles) ;
closer.register(revisions);

closeAndLogOnFail(closer);
doClose();
}

// Try removing pending files in case the scheduler didn't have a chance to run yet
System.gc(); // for any memory-mappings that are no longer used
fileReaper.reap();

log.info("TarMK closed: {}", directory);
}

@Override
protected void registerCloseables(Closer closer) {
closer.register(repositoryLock::unlock);
closer.register(tarFiles) ;
closer.register(revisions);
super.registerCloseables(closer);
}

@Override
public boolean containsSegment(SegmentId id) {
try (ShutDownCloser ignored = shutDown.keepAlive()) {
Expand All @@ -509,7 +516,7 @@ public boolean containsSegment(SegmentId id) {
@NotNull
public Segment readSegment(final SegmentId id) {
try (ShutDownCloser ignored = shutDown.keepAlive()) {
return segmentCache.getSegment(id, () -> readSegmentUncached(tarFiles, id));
return segmentCache.getSegment(id, () -> readSegmentUncached(id));
} catch (ExecutionException | UncheckedExecutionException e) {
if (e.getCause() instanceof RepositoryNotReachableException) {
RepositoryNotReachableException re = (RepositoryNotReachableException) e.getCause();
Expand Down Expand Up @@ -573,6 +580,10 @@ public void writeSegment(SegmentId id, byte[] buffer, int offset, int length) th
if (!eagerSegmentCaching && segment != null) {
segmentCache.putSegment(segment);
}

if (persistentCache != null) {
persistentCache.writeSegment(id.getMostSignificantBits(), id.getLeastSignificantBits(), Buffer.wrap(buffer, offset, length));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
import org.apache.jackrabbit.oak.segment.file.tar.TarPersistence;
import org.apache.jackrabbit.oak.segment.spi.monitor.*;
import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.PersistentCache;
import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.PersistentCachePreloadingConfiguration;
import org.apache.jackrabbit.oak.segment.tool.iotrace.IOTraceLogWriter;
import org.apache.jackrabbit.oak.segment.tool.iotrace.IOTraceMonitor;
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
Expand Down Expand Up @@ -94,7 +96,7 @@ public class FileStoreBuilder {
private boolean memoryMapping = MEMORY_MAPPING_DEFAULT;

private boolean offHeapAccess = getBoolean("access.off.heap");

private int binariesInlineThreshold = Segment.MEDIUM_LIMIT;

private SegmentNodeStorePersistence persistence;
Expand All @@ -108,6 +110,12 @@ public class FileStoreBuilder {
@Nullable
private EvictingWriteCacheManager cacheManager;

@Nullable
private PersistentCache persistentCache;

@Nullable
private PersistentCachePreloadingConfiguration preloadingConfig;

private class FileStoreGCListener extends DelegatingGCMonitor implements GCListener {
@Override
public void compactionSucceeded(@NotNull GCGeneration newGeneration) {
Expand Down Expand Up @@ -256,6 +264,34 @@ public FileStoreBuilder withNodeDeduplicationCacheSize(int nodeDeduplicationCach
return this;
}

/**
* The {@link PersistentCache} instance to be used, if any. By default no {@code PersistentCache}
* is configured.
*
* @param persistentCache the persistent cache
* @return this instance
*/
@NotNull
public FileStoreBuilder withPersistentCache(@NotNull PersistentCache persistentCache) {
this.persistentCache = requireNonNull(persistentCache);
return this;
}

/**
* The {@link PersistentCachePreloadingConfiguration} to be used for preloading segments.
* This configuration must be used together with a {@link PersistentCache}, configured
* via {@link #withPersistentCache(PersistentCache)}. By default, no segment preloading
* is configured.
*
* @param preloadingConfig the configuration for persistent cache preloading
* @return this instance
*/
@NotNull
public FileStoreBuilder withPersistentCachePreloading(@NotNull PersistentCachePreloadingConfiguration preloadingConfig) {
this.preloadingConfig = requireNonNull(preloadingConfig);
return this;
}

/**
* Turn memory mapping on or off
*
Expand Down Expand Up @@ -397,7 +433,7 @@ public FileStoreBuilder withEagerSegmentCaching(boolean eagerSegmentCaching) {
this.eagerSegmentCaching = eagerSegmentCaching;
return this;
}

/**
* Sets the threshold under which binaries are inlined in data segments.
* @param binariesInlineThreshold the threshold
Expand Down Expand Up @@ -585,11 +621,19 @@ boolean getStrictVersionCheck() {
boolean getEagerSegmentCaching() {
return eagerSegmentCaching;
}

int getBinariesInlineThreshold() {
return binariesInlineThreshold;
}

@Nullable PersistentCache getPersistentCache() {
return persistentCache;
}

@Nullable PersistentCachePreloadingConfiguration getPreloadingConfiguration() {
return preloadingConfig;
}

@Override
public String toString() {
return "FileStoreBuilder{" +
Expand All @@ -607,6 +651,8 @@ public String toString() {
", memoryMapping=" + memoryMapping +
", offHeapAccess=" + offHeapAccess +
", gcOptions=" + gcOptions +
", persistentCache=" + persistentCache +
", persistentCachePreloadingConfiguration=" + preloadingConfig +
'}';
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;

Expand Down Expand Up @@ -94,6 +93,11 @@ ReadOnlyFileStore bind(@NotNull ReadOnlyRevisions revisions) throws IOException
return this;
}

@Override
TarFiles getTarFiles() {
return tarFiles;
}

/**
* Go to the specified {@code revision}
*
Expand All @@ -120,25 +124,17 @@ public boolean containsSegment(SegmentId id) {
@NotNull
public Segment readSegment(final SegmentId id) {
try {
return segmentCache.getSegment(id, new Callable<Segment>() {
@Override
public Segment call() throws Exception {
return readSegmentUncached(tarFiles, id);
}
});
return segmentCache.getSegment(id, () -> readSegmentUncached(id));
} catch (ExecutionException | UncheckedExecutionException e) {
throw asSegmentNotFoundException(e, id);
}
}

@Override
public void close() {
Closer closer = Closer.create();
protected void registerCloseables(Closer closer) {
closer.register(tarFiles);
closer.register(revisions);
closeAndLogOnFail(closer);
System.gc(); // for any memory-mappings that are no longer used
log.info("TarMK closed: {}", directory);
super.registerCloseables(closer);
}

@NotNull
Expand Down
Loading
Loading