Skip to content

Commit f73fe6c

Browse files
authored
[ISSUE #9885] Fix tiered store cache count and bytes metrics (#9886)
1 parent 1f387b2 commit f73fe6c

File tree

3 files changed

+115
-6
lines changed

3 files changed

+115
-6
lines changed

tieredstore/README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,12 @@ Tiered storage provides some useful metrics, see [RIP-46](https://github.com/apa
4545
| Histogram | rocketmq_tiered_store_provider_upload_bytes | byte |
4646
| Histogram | rocketmq_tiered_store_provider_download_bytes | byte |
4747
| Gauge | rocketmq_tiered_store_dispatch_behind | |
48-
| Gauge | rocketmq_tiered_store_dispatch_latency | byte |
48+
| Gauge | rocketmq_tiered_store_dispatch_latency | milliseconds |
4949
| Counter | rocketmq_tiered_store_messages_dispatch_total | |
5050
| Counter | rocketmq_tiered_store_messages_out_total | |
5151
| Counter | rocketmq_tiered_store_get_message_fallback_total | |
5252
| Gauge | rocketmq_tiered_store_read_ahead_cache_count | |
53-
| Gauge | rocketmq_tiered_store_read_ahead_cache_bytes | byte |
53+
| Gauge | rocketmq_tiered_store_read_ahead_cache_bytes | bytes |
5454
| Counter | rocketmq_tiered_store_read_ahead_cache_access_total | |
5555
| Counter | rocketmq_tiered_store_read_ahead_cache_hit_total | |
5656
| Gauge | rocketmq_storage_message_reserve_time | milliseconds |

tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ public static void init(Meter meter, Supplier<AttributesBuilder> attributesBuild
207207

208208
dispatchLatency = meter.gaugeBuilder(GAUGE_DISPATCH_LATENCY)
209209
.setDescription("Tiered store dispatch latency")
210-
.setUnit("seconds")
210+
.setUnit("milliseconds")
211211
.ofLongs()
212212
.buildWithCallback(measurement -> {
213213
for (FlatMessageFile flatFile : flatFileStore.deepCopyFlatFileToList()) {
@@ -261,7 +261,7 @@ public static void init(Meter meter, Supplier<AttributesBuilder> attributesBuild
261261
.ofLongs()
262262
.buildWithCallback(measurement -> {
263263
if (fetcher instanceof MessageStoreFetcherImpl) {
264-
long count = ((MessageStoreFetcherImpl) fetcher).getFetcherCache().stats().loadCount();
264+
long count = ((MessageStoreFetcherImpl) fetcher).getFetcherCache().estimatedSize();
265265
measurement.record(count, newAttributesBuilder().build());
266266
}
267267
});
@@ -272,8 +272,10 @@ public static void init(Meter meter, Supplier<AttributesBuilder> attributesBuild
272272
.ofLongs()
273273
.buildWithCallback(measurement -> {
274274
if (fetcher instanceof MessageStoreFetcherImpl) {
275-
long count = ((MessageStoreFetcherImpl) fetcher).getFetcherCache().estimatedSize();
276-
measurement.record(count, newAttributesBuilder().build());
275+
long bytes = ((MessageStoreFetcherImpl) fetcher).getFetcherCache().policy().eviction()
276+
.map(eviction -> eviction.weightedSize().orElse(0L))
277+
.orElse(0L);
278+
measurement.record(bytes, newAttributesBuilder().build());
277279
}
278280
});
279281

tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManagerTest.java

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,26 @@
1616
*/
1717
package org.apache.rocketmq.tieredstore.metrics;
1818

19+
import com.github.benmanes.caffeine.cache.Cache;
20+
import io.opentelemetry.api.common.Attributes;
21+
import io.opentelemetry.api.metrics.DoubleGaugeBuilder;
22+
import io.opentelemetry.api.metrics.Meter;
23+
import io.opentelemetry.api.metrics.ObservableLongGauge;
24+
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
1925
import io.opentelemetry.sdk.OpenTelemetrySdk;
26+
import java.nio.ByteBuffer;
27+
import java.util.Arrays;
28+
import java.util.concurrent.atomic.AtomicLong;
29+
import java.util.function.Consumer;
30+
import org.apache.rocketmq.common.message.MessageQueue;
2031
import org.apache.rocketmq.store.DefaultMessageStore;
2132
import org.apache.rocketmq.tieredstore.MessageStoreConfig;
2233
import org.apache.rocketmq.tieredstore.TieredMessageStore;
34+
import org.apache.rocketmq.tieredstore.common.SelectBufferResult;
2335
import org.apache.rocketmq.tieredstore.core.MessageStoreFetcherImpl;
2436
import org.apache.rocketmq.tieredstore.file.FlatFileStore;
2537
import org.apache.rocketmq.tieredstore.provider.PosixFileSegment;
38+
import org.junit.Assert;
2639
import org.junit.Test;
2740
import org.mockito.Mockito;
2841

@@ -52,4 +65,98 @@ public void init() {
5265
public void newAttributesBuilder() {
5366
TieredStoreMetricsManager.newAttributesBuilder();
5467
}
68+
69+
@Test
70+
public void testCacheCountMetric() {
71+
MessageStoreConfig storeConfig = new MessageStoreConfig();
72+
TieredMessageStore messageStore = Mockito.mock(TieredMessageStore.class);
73+
Mockito.when(messageStore.getStoreConfig()).thenReturn(storeConfig);
74+
Mockito.when(messageStore.getFlatFileStore()).thenReturn(Mockito.mock(FlatFileStore.class));
75+
// The fetcher will create real cache
76+
MessageStoreFetcherImpl fetcher = new MessageStoreFetcherImpl(messageStore);
77+
78+
AtomicLong capturedCacheCount = new AtomicLong(-1);
79+
Meter mockMeter = createMockMeter(TieredStoreMetricsConstant.GAUGE_CACHE_COUNT, capturedCacheCount);
80+
81+
// Prepare cache before init so the gauge callback sees a populated cache instead of an empty one.
82+
int[] bufferSizes = prepareTestCache(fetcher);
83+
84+
TieredStoreMetricsManager.init(mockMeter,
85+
null, storeConfig, fetcher,
86+
Mockito.mock(FlatFileStore.class), Mockito.mock(DefaultMessageStore.class));
87+
88+
// CacheCount gauge should report the number of cached entries.
89+
Assert.assertEquals(bufferSizes.length, capturedCacheCount.get());
90+
}
91+
92+
@Test
93+
public void testCacheBytesMetric() {
94+
MessageStoreConfig storeConfig = new MessageStoreConfig();
95+
TieredMessageStore messageStore = Mockito.mock(TieredMessageStore.class);
96+
Mockito.when(messageStore.getStoreConfig()).thenReturn(storeConfig);
97+
Mockito.when(messageStore.getFlatFileStore()).thenReturn(Mockito.mock(FlatFileStore.class));
98+
// The fetcher will create real cache
99+
MessageStoreFetcherImpl fetcher = new MessageStoreFetcherImpl(messageStore);
100+
101+
AtomicLong capturedCacheBytes = new AtomicLong(-1);
102+
Meter mockMeter = createMockMeter(TieredStoreMetricsConstant.GAUGE_CACHE_BYTES, capturedCacheBytes);
103+
104+
// Prepare cache before init so the gauge callback sees a populated cache instead of an empty one.
105+
int[] bufferSizes = prepareTestCache(fetcher);
106+
107+
TieredStoreMetricsManager.init(mockMeter,
108+
null, storeConfig, fetcher,
109+
Mockito.mock(FlatFileStore.class), Mockito.mock(DefaultMessageStore.class));
110+
111+
// CacheBytes gauge should report the sum of all cached buffer sizes.
112+
int expectedSum = Arrays.stream(bufferSizes).sum();
113+
Assert.assertEquals(expectedSum, capturedCacheBytes.get());
114+
}
115+
116+
private Meter createMockMeter(String targetMetricName, AtomicLong capturedValue) {
117+
Meter mockMeter = Mockito.mock(Meter.class, Mockito.RETURNS_DEEP_STUBS);
118+
119+
// Setup target gauge builder chain to capture the callback value
120+
DoubleGaugeBuilder targetGaugeBuilder = Mockito.mock(DoubleGaugeBuilder.class, Mockito.RETURNS_DEEP_STUBS);
121+
Mockito.when(mockMeter.gaugeBuilder(targetMetricName)).thenReturn(targetGaugeBuilder);
122+
Mockito.when(targetGaugeBuilder.setDescription(Mockito.anyString())).thenReturn(targetGaugeBuilder);
123+
Mockito.when(targetGaugeBuilder.setUnit(Mockito.anyString())).thenReturn(targetGaugeBuilder);
124+
Mockito.when(targetGaugeBuilder.ofLongs().buildWithCallback(Mockito.any(Consumer.class)))
125+
.thenAnswer(invocation -> {
126+
Consumer<ObservableLongMeasurement> callback = invocation.getArgument(0);
127+
// Immediately invoke the callback to capture the current cache state
128+
callback.accept(new ObservableLongMeasurement() {
129+
@Override
130+
public void record(long value) {
131+
capturedValue.set(value);
132+
}
133+
134+
@Override
135+
public void record(long value, Attributes attributes) {
136+
capturedValue.set(value);
137+
}
138+
});
139+
return Mockito.mock(ObservableLongGauge.class);
140+
});
141+
142+
return mockMeter;
143+
}
144+
145+
private int[] prepareTestCache(MessageStoreFetcherImpl fetcher) {
146+
Cache<String, SelectBufferResult> cache = fetcher.getFetcherCache();
147+
String topic = "TestTopic";
148+
MessageQueue mq1 = new MessageQueue(topic, "broker", 0);
149+
MessageQueue mq2 = new MessageQueue(topic, "broker", 1);
150+
151+
int[] bufferSizes = {100, 200, 150, 300};
152+
for (int i = 0; i < bufferSizes.length; i++) {
153+
SelectBufferResult result = new SelectBufferResult(
154+
ByteBuffer.allocate(bufferSizes[i]), 0L, bufferSizes[i], 0L);
155+
MessageQueue mq = i < 2 ? mq1 : mq2;
156+
String key = String.format("%s@%d@%d", mq.getTopic(), mq.getQueueId(), (i + 1) * 100L);
157+
cache.put(key, result);
158+
}
159+
return bufferSizes;
160+
}
161+
55162
}

0 commit comments

Comments
 (0)