Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/132547.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 132547
summary: Add epoch blob-cache metric
area: Searchable Snapshots
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class BlobCacheMetrics {

private final LongAdder missCount = new LongAdder();
private final LongAdder readCount = new LongAdder();
private final LongCounter epochChanges;

public enum CachePopulationReason {
/**
Expand Down Expand Up @@ -98,7 +99,8 @@ public BlobCacheMetrics(MeterRegistry meterRegistry) {
"es.blob_cache.population.time.total",
"The time spent copying data into the cache",
"milliseconds"
)
),
meterRegistry.registerLongCounter("es.blob_cache.epoch.total", "The epoch changes of the LFU cache", "count")
);

meterRegistry.registerLongGauge(
Expand Down Expand Up @@ -134,7 +136,8 @@ public BlobCacheMetrics(MeterRegistry meterRegistry) {
LongHistogram cacheMissLoadTimes,
DoubleHistogram cachePopulationThroughput,
LongCounter cachePopulationBytes,
LongCounter cachePopulationTime
LongCounter cachePopulationTime,
LongCounter epochChanges
) {
this.cacheMissCounter = cacheMissCounter;
this.evictedCountNonZeroFrequency = evictedCountNonZeroFrequency;
Expand All @@ -143,6 +146,7 @@ public BlobCacheMetrics(MeterRegistry meterRegistry) {
this.cachePopulationThroughput = cachePopulationThroughput;
this.cachePopulationBytes = cachePopulationBytes;
this.cachePopulationTime = cachePopulationTime;
this.epochChanges = epochChanges;
}

public static final BlobCacheMetrics NOOP = new BlobCacheMetrics(TelemetryProvider.NOOP.getMeterRegistry());
Expand Down Expand Up @@ -201,6 +205,10 @@ public void recordCachePopulationMetrics(
}
}

public void recordEpochChange() {
epochChanges.increment();
}

public void recordRead() {
readCount.increment();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2049,6 +2049,7 @@ public void onFailure(Exception e) {
public void onAfter() {
assert pendingEpoch.get() == epoch.get() + 1;
epoch.incrementAndGet();
blobCacheMetrics.recordEpochChange();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.node.NodeRoleSettings;
import org.elasticsearch.telemetry.InstrumentType;
import org.elasticsearch.telemetry.Measurement;
import org.elasticsearch.telemetry.RecordingMeterRegistry;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
Expand Down Expand Up @@ -332,6 +333,8 @@ public void testAsynchronousEviction() throws Exception {
}

public void testDecay() throws IOException {
RecordingMeterRegistry recordingMeterRegistry = new RecordingMeterRegistry();
BlobCacheMetrics metrics = new BlobCacheMetrics(recordingMeterRegistry);
// we have 8 regions
Settings settings = Settings.builder()
.put(NODE_NAME_SETTING.getKey(), "node")
Expand All @@ -347,7 +350,7 @@ public void testDecay() throws IOException {
settings,
taskQueue.getThreadPool(),
taskQueue.getThreadPool().executor(ThreadPool.Names.GENERIC),
BlobCacheMetrics.NOOP
metrics
)
) {
assertEquals(4, cacheService.freeRegionCount());
Expand Down Expand Up @@ -375,6 +378,8 @@ public void testDecay() throws IOException {
assertThat(taskQueue.hasRunnableTasks(), is(true));
taskQueue.runAllRunnableTasks();
assertThat(cacheService.epoch(), equalTo(expectedEpoch.incrementAndGet()));
long epochs = recordedEpochs(recordingMeterRegistry);
assertEquals(cacheService.epoch(), epochs);
};

triggerDecay.run();
Expand Down Expand Up @@ -435,11 +440,22 @@ public void testDecay() throws IOException {
}
}

private static long recordedEpochs(RecordingMeterRegistry recordingMeterRegistry) {
long epochs = recordingMeterRegistry.getRecorder()
.getMeasurements(InstrumentType.LONG_COUNTER, "es.blob_cache.epoch.total")
.stream()
.mapToLong(Measurement::getLong)
.sum();
return epochs;
}

/**
* Test when many objects need to decay, in particular useful to measure how long the decay task takes.
* For 1M objects (with no assertions) it took 26ms locally.
*/
public void testMassiveDecay() throws IOException {
RecordingMeterRegistry recordingMeterRegistry = new RecordingMeterRegistry();
BlobCacheMetrics metrics = new BlobCacheMetrics(recordingMeterRegistry);
int regions = 1024; // to measure decay time, increase to 1024*1024 and disable assertions.
Settings settings = Settings.builder()
.put(NODE_NAME_SETTING.getKey(), "node")
Expand All @@ -455,7 +471,7 @@ public void testMassiveDecay() throws IOException {
settings,
taskQueue.getThreadPool(),
taskQueue.getThreadPool().executor(ThreadPool.Names.GENERIC),
BlobCacheMetrics.NOOP
metrics
)
) {
Runnable decay = () -> {
Expand Down Expand Up @@ -496,6 +512,9 @@ public void testMassiveDecay() throws IOException {
}
}
assertThat(freqs.get(4), equalTo(regions - maxRounds + 1));

long epochs = recordedEpochs(recordingMeterRegistry);
assertEquals(cacheService.epoch(), epochs);
}
}

Expand Down