package org.elasticsearch.blobcache;

+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import org.elasticsearch.telemetry.TelemetryProvider;
+import org.elasticsearch.telemetry.metric.DoubleHistogram;
import org.elasticsearch.telemetry.metric.LongCounter;
import org.elasticsearch.telemetry.metric.LongHistogram;
import org.elasticsearch.telemetry.metric.MeterRegistry;

+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
public class BlobCacheMetrics {
+    private static final Logger logger = LogManager.getLogger(BlobCacheMetrics.class);
+
+    private static final double BYTES_PER_NANOSECONDS_TO_MEBIBYTES_PER_SECOND = 1e9D / (1 << 20);
+    public static final String CACHE_POPULATION_REASON_ATTRIBUTE_KEY = "reason";
+    public static final String CACHE_POPULATION_SOURCE_ATTRIBUTE_KEY = "source";
+    public static final String SHARD_ID_ATTRIBUTE_KEY = "shard_id";
+    public static final String INDEX_ATTRIBUTE_KEY = "index_name";
+
    private final LongCounter cacheMissCounter;
    private final LongCounter evictedCountNonZeroFrequency;
    private final LongHistogram cacheMissLoadTimes;
+    private final DoubleHistogram cachePopulationThroughput;
+    private final LongCounter cachePopulationBytes;
+    private final LongCounter cachePopulationTime;
+
+    public enum CachePopulationReason {
+        /**
+         * When warming the cache
+         */
+        Warming,
+        /**
+         * When the data we need is not in the cache
+         */
+        CacheMiss
+    }

    public BlobCacheMetrics(MeterRegistry meterRegistry) {
        this(
@@ -33,14 +61,39 @@ public BlobCacheMetrics(MeterRegistry meterRegistry) {
                "es.blob_cache.cache_miss_load_times.histogram",
                "The time in milliseconds for populating entries in the blob store resulting from a cache miss, expressed as a histogram.",
                "ms"
+            ),
+            meterRegistry.registerDoubleHistogram(
+                "es.blob_cache.population.throughput.histogram",
+ "The throughput observed when populating the the cache" ,
+                "MiB/second"
+            ),
+            meterRegistry.registerLongCounter(
+                "es.blob_cache.population.bytes.total",
+                "The number of bytes that have been copied into the cache",
+                "bytes"
+            ),
+            meterRegistry.registerLongCounter(
+                "es.blob_cache.population.time.total",
+                "The time spent copying data into the cache",
+                "milliseconds"
            )
        );
    }

-    BlobCacheMetrics(LongCounter cacheMissCounter, LongCounter evictedCountNonZeroFrequency, LongHistogram cacheMissLoadTimes) {
+    BlobCacheMetrics(
+        LongCounter cacheMissCounter,
+        LongCounter evictedCountNonZeroFrequency,
+        LongHistogram cacheMissLoadTimes,
+        DoubleHistogram cachePopulationThroughput,
+        LongCounter cachePopulationBytes,
+        LongCounter cachePopulationTime
+    ) {
        this.cacheMissCounter = cacheMissCounter;
        this.evictedCountNonZeroFrequency = evictedCountNonZeroFrequency;
        this.cacheMissLoadTimes = cacheMissLoadTimes;
+        this.cachePopulationThroughput = cachePopulationThroughput;
+        this.cachePopulationBytes = cachePopulationBytes;
+        this.cachePopulationTime = cachePopulationTime;
    }

    public static BlobCacheMetrics NOOP = new BlobCacheMetrics(TelemetryProvider.NOOP.getMeterRegistry());
@@ -56,4 +109,55 @@ public LongCounter getEvictedCountNonZeroFrequency() {
    public LongHistogram getCacheMissLoadTimes() {
        return cacheMissLoadTimes;
    }
+
+    /**
+     * Record the various cache population metrics after a chunk is copied to the cache
+     *
+     * @param bytesCopied The number of bytes copied
+     * @param copyTimeNanos The time taken to copy the bytes in nanoseconds
+     * @param index The index being loaded
+     * @param shardId The ID of the shard being loaded
+     * @param cachePopulationReason The reason for the cache being populated
+     * @param cachePopulationSource The source from which the data is being loaded
+     */
+    public void recordCachePopulationMetrics(
+        int bytesCopied,
+        long copyTimeNanos,
+        String index,
+        int shardId,
+        CachePopulationReason cachePopulationReason,
+        CachePopulationSource cachePopulationSource
+    ) {
+        Map<String, Object> metricAttributes = Map.of(
+            INDEX_ATTRIBUTE_KEY,
+            index,
+            SHARD_ID_ATTRIBUTE_KEY,
+            shardId,
+            CACHE_POPULATION_REASON_ATTRIBUTE_KEY,
+            cachePopulationReason.name(),
+            CACHE_POPULATION_SOURCE_ATTRIBUTE_KEY,
+            cachePopulationSource.name()
+        );
+        assert bytesCopied > 0 : "We shouldn't be recording zero-sized copies";
+        cachePopulationBytes.incrementBy(bytesCopied, metricAttributes);
+
+        // This is almost certainly paranoid, but if we had a very fast/small copy with a very coarse nanosecond timer it might happen?
+        if (copyTimeNanos > 0) {
+            cachePopulationThroughput.record(toMebibytesPerSecond(bytesCopied, copyTimeNanos), metricAttributes);
+            cachePopulationTime.incrementBy(TimeUnit.NANOSECONDS.toMillis(copyTimeNanos), metricAttributes);
+        } else {
+            logger.warn("Zero-time copy being reported, ignoring");
+        }
+    }
+
+    /**
+     * Calculate throughput as MiB/second
+     *
+     * @param numberOfBytes The number of bytes transferred
+     * @param timeInNanoseconds The time taken to transfer in nanoseconds
+     * @return The throughput as MiB/second
+     */
+    private double toMebibytesPerSecond(int numberOfBytes, long timeInNanoseconds) {
+        return ((double) numberOfBytes / timeInNanoseconds) * BYTES_PER_NANOSECONDS_TO_MEBIBYTES_PER_SECOND;
+    }
}
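
For readers puzzling over `BYTES_PER_NANOSECONDS_TO_MEBIBYTES_PER_SECOND`: the throughput histogram takes the raw rate in bytes per nanosecond and rescales it to MiB/second by multiplying by 1e9 (nanoseconds per second) and dividing by 2^20 (bytes per MiB). A minimal standalone sketch of that conversion is below; the `ThroughputConversionCheck` class and its `main` method are illustrative only and not part of this change.

```java
// Standalone check of the bytes/ns -> MiB/s conversion (illustrative, mirrors the constant above).
public class ThroughputConversionCheck {

    // Nanoseconds per second divided by bytes per MiB, as in BlobCacheMetrics.
    private static final double BYTES_PER_NANOSECONDS_TO_MEBIBYTES_PER_SECOND = 1e9D / (1 << 20);

    // Same arithmetic as BlobCacheMetrics#toMebibytesPerSecond.
    static double toMebibytesPerSecond(int numberOfBytes, long timeInNanoseconds) {
        return ((double) numberOfBytes / timeInNanoseconds) * BYTES_PER_NANOSECONDS_TO_MEBIBYTES_PER_SECOND;
    }

    public static void main(String[] args) {
        // 1 MiB copied in 1 ms: (1_048_576 / 1_000_000) * (1e9 / 1_048_576) = 1e9 / 1e6 = 1000 MiB/s
        System.out.println(toMebibytesPerSecond(1 << 20, 1_000_000L)); // prints 1000.0
    }
}
```

Note that when `copyTimeNanos` is zero, `recordCachePopulationMetrics` still increments the byte counter but skips both the throughput sample and the time counter, so a coarse nanosecond clock cannot produce a division by zero or an infinite throughput value.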