Skip to content

Commit 4ec6aed

Browse files
authored
Add file extension metadata to cache miss counter from SharedBlobCacheService (elastic#134374)
1 parent 81d63bc commit 4ec6aed

File tree

6 files changed

+131
-3
lines changed

6 files changed

+131
-3
lines changed

docs/changelog/134374.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 134374
2+
summary: Add file extension metadata to cache miss counter from `SharedBlobCacheService`
3+
area: Search
4+
type: enhancement
5+
issues: []

qa/evil-tests/src/test/java/org/elasticsearch/index/store/LuceneFilesExtensionsTests.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,13 @@
99

1010
package org.elasticsearch.index.store;
1111

12+
import org.apache.lucene.index.IndexFileNames;
1213
import org.elasticsearch.core.Assertions;
1314
import org.elasticsearch.core.SuppressForbidden;
1415
import org.elasticsearch.test.ESTestCase;
1516

17+
import java.util.Locale;
18+
1619
import static org.hamcrest.Matchers.containsString;
1720

1821
public class LuceneFilesExtensionsTests extends ESTestCase {
@@ -21,6 +24,7 @@ public void testUnknownFileExtension() {
2124
if (Assertions.ENABLED) {
2225
AssertionError e = expectThrows(AssertionError.class, () -> LuceneFilesExtensions.fromExtension("abc"));
2326
assertThat(e.getMessage(), containsString("unknown Lucene file extension [abc]"));
27+
assertFalse(LuceneFilesExtensions.isLuceneExtension("abc"));
2428

2529
setEsAllowUnknownLuceneFileExtensions("true");
2630
try {
@@ -41,4 +45,19 @@ public void setEsAllowUnknownLuceneFileExtensions(final String value) {
4145
System.setProperty("es.allow_unknown_lucene_file_extensions", value);
4246
}
4347
}
48+
49+
public void testIsLuceneExtension() {
50+
assertFalse(LuceneFilesExtensions.isLuceneExtension(null));
51+
assertFalse(LuceneFilesExtensions.isLuceneExtension("bcde"));
52+
String randomStringWithLuceneExtension = randomAlphanumericOfLength(10)
53+
+ "."
54+
+ LuceneFilesExtensions.values()[randomInt(LuceneFilesExtensions.values().length) - 1].getExtension();
55+
String extension = IndexFileNames.getExtension(randomStringWithLuceneExtension);
56+
assertTrue(extension + " should be considered a Lucene extension", LuceneFilesExtensions.isLuceneExtension(extension));
57+
String upperCaseExtension = extension.toUpperCase(Locale.ROOT);
58+
assertFalse(
59+
upperCaseExtension + " (uppercase) should not be considered a Lucene extension",
60+
LuceneFilesExtensions.isLuceneExtension(upperCaseExtension)
61+
);
62+
}
4463
}

server/src/main/java/org/elasticsearch/index/store/LuceneFilesExtensions.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,11 @@ public static LuceneFilesExtensions fromExtension(String ext) {
165165
return null;
166166
}
167167

168+
@Nullable
169+
public static boolean isLuceneExtension(String ext) {
170+
return extensions.containsKey(ext);
171+
}
172+
168173
@Nullable
169174
public static LuceneFilesExtensions fromFile(String fileName) {
170175
return fromExtension(IndexFileNames.getExtension(fileName));

x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import org.apache.logging.log4j.LogManager;
1111
import org.apache.logging.log4j.Logger;
12+
import org.apache.lucene.index.IndexFileNames;
1213
import org.apache.lucene.store.AlreadyClosedException;
1314
import org.elasticsearch.action.ActionListener;
1415
import org.elasticsearch.action.ActionRunnable;
@@ -37,6 +38,7 @@
3738
import org.elasticsearch.core.TimeValue;
3839
import org.elasticsearch.env.Environment;
3940
import org.elasticsearch.env.NodeEnvironment;
41+
import org.elasticsearch.index.store.LuceneFilesExtensions;
4042
import org.elasticsearch.monitor.fs.FsProbe;
4143
import org.elasticsearch.node.NodeRoleSettings;
4244
import org.elasticsearch.threadpool.ThreadPool;
@@ -67,6 +69,9 @@
6769
import java.util.function.Predicate;
6870
import java.util.stream.Collectors;
6971

72+
import static org.elasticsearch.blobcache.BlobCacheMetrics.LUCENE_FILE_EXTENSION_ATTRIBUTE_KEY;
73+
import static org.elasticsearch.blobcache.BlobCacheMetrics.NON_LUCENE_EXTENSION_TO_RECORD;
74+
7075
/**
7176
* A caching layer on a local node to minimize network roundtrips to the remote blob store.
7277
*/
@@ -1254,7 +1259,8 @@ public int populateAndRead(
12541259
final ByteRange rangeToWrite,
12551260
final ByteRange rangeToRead,
12561261
final RangeAvailableHandler reader,
1257-
final RangeMissingHandler writer
1262+
final RangeMissingHandler writer,
1263+
String resourceDescription
12581264
) throws Exception {
12591265
// some cache files can grow after being created, so rangeToWrite can be larger than the initial {@code length}
12601266
assert rangeToWrite.start() >= 0 : rangeToWrite;
@@ -1273,6 +1279,7 @@ public void fillCacheRange(
12731279
IntConsumer progressUpdater,
12741280
ActionListener<Void> completionListener
12751281
) throws IOException {
1282+
String blobFileExtension = getFileExtension(resourceDescription);
12761283
writer.fillCacheRange(
12771284
channel,
12781285
channelPos,
@@ -1283,7 +1290,8 @@ public void fillCacheRange(
12831290
completionListener.map(unused -> {
12841291
var elapsedTime = TimeUnit.NANOSECONDS.toMillis(relativeTimeInNanosSupplier.getAsLong() - startTime);
12851292
blobCacheMetrics.getCacheMissLoadTimes().record(elapsedTime);
1286-
blobCacheMetrics.getCacheMissCounter().increment();
1293+
blobCacheMetrics.getCacheMissCounter()
1294+
.incrementBy(1L, Map.of(LUCENE_FILE_EXTENSION_ATTRIBUTE_KEY, blobFileExtension));
12871295
return null;
12881296
})
12891297
);
@@ -2075,4 +2083,17 @@ public void close() {
20752083
}
20762084
}
20772085
}
2086+
2087+
private static String getFileExtension(String resourceDescription) {
2088+
// TODO: consider introspecting resourceDescription for compound files
2089+
if (resourceDescription.endsWith(LuceneFilesExtensions.CFS.getExtension())) {
2090+
return LuceneFilesExtensions.CFS.getExtension();
2091+
}
2092+
String extension = IndexFileNames.getExtension(resourceDescription);
2093+
if (LuceneFilesExtensions.isLuceneExtension(extension)) {
2094+
return extension;
2095+
} else {
2096+
return NON_LUCENE_EXTENSION_TO_RECORD;
2097+
}
2098+
}
20782099
}

x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@
4343

4444
import java.io.IOException;
4545
import java.io.InputStream;
46+
import java.nio.ByteBuffer;
47+
import java.nio.file.Files;
48+
import java.nio.file.Path;
4649
import java.util.Collection;
4750
import java.util.HashMap;
4851
import java.util.HashSet;
@@ -51,6 +54,8 @@
5154
import java.util.Set;
5255
import java.util.concurrent.BrokenBarrierException;
5356
import java.util.concurrent.CyclicBarrier;
57+
import java.util.concurrent.ExecutorService;
58+
import java.util.concurrent.Executors;
5459
import java.util.concurrent.TimeUnit;
5560
import java.util.concurrent.atomic.AtomicBoolean;
5661
import java.util.concurrent.atomic.AtomicInteger;
@@ -171,6 +176,78 @@ public void testBasicEviction() throws IOException {
171176
}
172177
}
173178

179+
public void testCacheMissOnPopulate() throws Exception {
180+
Settings settings = Settings.builder()
181+
.put(NODE_NAME_SETTING.getKey(), "node")
182+
.put(SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(50)).getStringRep())
183+
.put(SharedBlobCacheService.SHARED_CACHE_REGION_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(10)).getStringRep())
184+
.put("path.home", createTempDir())
185+
.build();
186+
final DeterministicTaskQueue taskQueue = new DeterministicTaskQueue();
187+
RecordingMeterRegistry recordingMeterRegistry = new RecordingMeterRegistry();
188+
BlobCacheMetrics metrics = new BlobCacheMetrics(recordingMeterRegistry);
189+
ExecutorService ioExecutor = Executors.newCachedThreadPool();
190+
try (
191+
NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings));
192+
var cacheService = new SharedBlobCacheService<>(environment, settings, taskQueue.getThreadPool(), ioExecutor, metrics)
193+
) {
194+
ByteRange rangeRead = ByteRange.of(0L, 1L);
195+
ByteRange rangeWrite = ByteRange.of(0L, 1L);
196+
Path tempFile = createTempFile("test", "other");
197+
String resourceDescription = tempFile.toAbsolutePath().toString();
198+
final var cacheKey = generateCacheKey();
199+
SharedBlobCacheService<Object>.CacheFile cacheFile = cacheService.getCacheFile(cacheKey, 1L);
200+
201+
ByteBuffer writeBuffer = ByteBuffer.allocate(1);
202+
203+
final int bytesRead = cacheFile.populateAndRead(
204+
rangeRead,
205+
rangeWrite,
206+
(channel, pos, relativePos, len) -> len,
207+
(channel, channelPos, streamFactory, relativePos, len, progressUpdater, completionListener) -> {
208+
try (var in = Files.newInputStream(tempFile)) {
209+
SharedBytes.copyToCacheFileAligned(channel, in, channelPos, progressUpdater, writeBuffer.clear());
210+
}
211+
ActionListener.completeWith(completionListener, () -> null);
212+
},
213+
resourceDescription
214+
);
215+
assertThat(bytesRead, is(1));
216+
List<Measurement> measurements = recordingMeterRegistry.getRecorder()
217+
.getMeasurements(InstrumentType.LONG_COUNTER, "es.blob_cache.miss_that_triggered_read.total");
218+
Measurement first = measurements.getFirst();
219+
assertThat(first.attributes().get("file_extension"), is("other"));
220+
assertThat(first.value(), is(1L));
221+
222+
Path tempFile2 = createTempFile("test", "cfs");
223+
resourceDescription = tempFile2.toAbsolutePath().toString();
224+
cacheFile = cacheService.getCacheFile(generateCacheKey(), 1L);
225+
226+
ByteBuffer writeBuffer2 = ByteBuffer.allocate(1);
227+
228+
final int bytesRead2 = cacheFile.populateAndRead(
229+
rangeRead,
230+
rangeWrite,
231+
(channel, pos, relativePos, len) -> len,
232+
(channel, channelPos, streamFactory, relativePos, len, progressUpdater, completionListener) -> {
233+
try (var in = Files.newInputStream(tempFile2)) {
234+
SharedBytes.copyToCacheFileAligned(channel, in, channelPos, progressUpdater, writeBuffer2.clear());
235+
}
236+
ActionListener.completeWith(completionListener, () -> null);
237+
},
238+
resourceDescription
239+
);
240+
assertThat(bytesRead2, is(1));
241+
242+
measurements = recordingMeterRegistry.getRecorder()
243+
.getMeasurements(InstrumentType.LONG_COUNTER, "es.blob_cache.miss_that_triggered_read.total");
244+
Measurement measurement = measurements.get(1);
245+
assertThat(measurement.attributes().get("file_extension"), is("cfs"));
246+
assertThat(measurement.value(), is(1L));
247+
}
248+
ioExecutor.shutdown();
249+
}
250+
174251
private static boolean tryEvict(SharedBlobCacheService.CacheFileRegion<Object> region1) {
175252
if (randomBoolean()) {
176253
return region1.tryEvict();

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/input/FrozenIndexInput.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,8 @@ private void readWithoutBlobCacheSlow(ByteBuffer b, long position, int length) t
177177
return null;
178178
}
179179
}
180-
)
180+
),
181+
fileInfo.physicalName()
181182
);
182183
assert bytesRead == length : bytesRead + " vs " + length;
183184
byteBufferReference.finish(bytesRead);

0 commit comments

Comments
 (0)