diff --git a/tieredstore/README.md b/tieredstore/README.md index 6b5ecc8c8d4..1532fc3b5fd 100644 --- a/tieredstore/README.md +++ b/tieredstore/README.md @@ -45,12 +45,12 @@ Tiered storage provides some useful metrics, see [RIP-46](https://github.com/apa | Histogram | rocketmq_tiered_store_provider_upload_bytes | byte | | Histogram | rocketmq_tiered_store_provider_download_bytes | byte | | Gauge | rocketmq_tiered_store_dispatch_behind | | -| Gauge | rocketmq_tiered_store_dispatch_latency | byte | +| Gauge | rocketmq_tiered_store_dispatch_latency | milliseconds | | Counter | rocketmq_tiered_store_messages_dispatch_total | | | Counter | rocketmq_tiered_store_messages_out_total | | | Counter | rocketmq_tiered_store_get_message_fallback_total | | | Gauge | rocketmq_tiered_store_read_ahead_cache_count | | -| Gauge | rocketmq_tiered_store_read_ahead_cache_bytes | byte | +| Gauge | rocketmq_tiered_store_read_ahead_cache_bytes | bytes | | Counter | rocketmq_tiered_store_read_ahead_cache_access_total | | | Counter | rocketmq_tiered_store_read_ahead_cache_hit_total | | | Gauge | rocketmq_storage_message_reserve_time | milliseconds | diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java index 4d083284834..e0ebff08cb0 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java @@ -207,7 +207,7 @@ public static void init(Meter meter, Supplier attributesBuild dispatchLatency = meter.gaugeBuilder(GAUGE_DISPATCH_LATENCY) .setDescription("Tiered store dispatch latency") - .setUnit("seconds") + .setUnit("milliseconds") .ofLongs() .buildWithCallback(measurement -> { for (FlatMessageFile flatFile : flatFileStore.deepCopyFlatFileToList()) { @@ -261,7 +261,7 @@ public static void init(Meter meter, Supplier attributesBuild .ofLongs() .buildWithCallback(measurement -> { if (fetcher instanceof MessageStoreFetcherImpl) { - long count = ((MessageStoreFetcherImpl) fetcher).getFetcherCache().stats().loadCount(); + long count = ((MessageStoreFetcherImpl) fetcher).getFetcherCache().estimatedSize(); measurement.record(count, newAttributesBuilder().build()); } }); @@ -272,8 +272,10 @@ public static void init(Meter meter, Supplier attributesBuild .ofLongs() .buildWithCallback(measurement -> { if (fetcher instanceof MessageStoreFetcherImpl) { - long count = ((MessageStoreFetcherImpl) fetcher).getFetcherCache().estimatedSize(); - measurement.record(count, newAttributesBuilder().build()); + long bytes = ((MessageStoreFetcherImpl) fetcher).getFetcherCache().policy().eviction() + .map(eviction -> eviction.weightedSize().orElse(0L)) + .orElse(0L); + measurement.record(bytes, newAttributesBuilder().build()); } }); diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManagerTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManagerTest.java index cc4d9e2c68b..04341389610 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManagerTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManagerTest.java @@ -16,13 +16,26 @@ */ package org.apache.rocketmq.tieredstore.metrics; +import com.github.benmanes.caffeine.cache.Cache; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.DoubleGaugeBuilder; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.api.metrics.ObservableLongGauge; +import io.opentelemetry.api.metrics.ObservableLongMeasurement; import io.opentelemetry.sdk.OpenTelemetrySdk; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.tieredstore.MessageStoreConfig; import org.apache.rocketmq.tieredstore.TieredMessageStore; +import org.apache.rocketmq.tieredstore.common.SelectBufferResult; import org.apache.rocketmq.tieredstore.core.MessageStoreFetcherImpl; import org.apache.rocketmq.tieredstore.file.FlatFileStore; import org.apache.rocketmq.tieredstore.provider.PosixFileSegment; +import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; @@ -52,4 +65,98 @@ public void init() { public void newAttributesBuilder() { TieredStoreMetricsManager.newAttributesBuilder(); } + + @Test + public void testCacheCountMetric() { + MessageStoreConfig storeConfig = new MessageStoreConfig(); + TieredMessageStore messageStore = Mockito.mock(TieredMessageStore.class); + Mockito.when(messageStore.getStoreConfig()).thenReturn(storeConfig); + Mockito.when(messageStore.getFlatFileStore()).thenReturn(Mockito.mock(FlatFileStore.class)); + // The fetcher will create real cache + MessageStoreFetcherImpl fetcher = new MessageStoreFetcherImpl(messageStore); + + AtomicLong capturedCacheCount = new AtomicLong(-1); + Meter mockMeter = createMockMeter(TieredStoreMetricsConstant.GAUGE_CACHE_COUNT, capturedCacheCount); + + // Prepare cache before init so the gauge callback sees a populated cache instead of an empty one. + int[] bufferSizes = prepareTestCache(fetcher); + + TieredStoreMetricsManager.init(mockMeter, + null, storeConfig, fetcher, + Mockito.mock(FlatFileStore.class), Mockito.mock(DefaultMessageStore.class)); + + // CacheCount gauge should report the number of cached entries. + Assert.assertEquals(bufferSizes.length, capturedCacheCount.get()); + } + + @Test + public void testCacheBytesMetric() { + MessageStoreConfig storeConfig = new MessageStoreConfig(); + TieredMessageStore messageStore = Mockito.mock(TieredMessageStore.class); + Mockito.when(messageStore.getStoreConfig()).thenReturn(storeConfig); + Mockito.when(messageStore.getFlatFileStore()).thenReturn(Mockito.mock(FlatFileStore.class)); + // The fetcher will create real cache + MessageStoreFetcherImpl fetcher = new MessageStoreFetcherImpl(messageStore); + + AtomicLong capturedCacheBytes = new AtomicLong(-1); + Meter mockMeter = createMockMeter(TieredStoreMetricsConstant.GAUGE_CACHE_BYTES, capturedCacheBytes); + + // Prepare cache before init so the gauge callback sees a populated cache instead of an empty one. + int[] bufferSizes = prepareTestCache(fetcher); + + TieredStoreMetricsManager.init(mockMeter, + null, storeConfig, fetcher, + Mockito.mock(FlatFileStore.class), Mockito.mock(DefaultMessageStore.class)); + + // CacheBytes gauge should report the sum of all cached buffer sizes. + int expectedSum = Arrays.stream(bufferSizes).sum(); + Assert.assertEquals(expectedSum, capturedCacheBytes.get()); + } + + private Meter createMockMeter(String targetMetricName, AtomicLong capturedValue) { + Meter mockMeter = Mockito.mock(Meter.class, Mockito.RETURNS_DEEP_STUBS); + + // Setup target gauge builder chain to capture the callback value + DoubleGaugeBuilder targetGaugeBuilder = Mockito.mock(DoubleGaugeBuilder.class, Mockito.RETURNS_DEEP_STUBS); + Mockito.when(mockMeter.gaugeBuilder(targetMetricName)).thenReturn(targetGaugeBuilder); + Mockito.when(targetGaugeBuilder.setDescription(Mockito.anyString())).thenReturn(targetGaugeBuilder); + Mockito.when(targetGaugeBuilder.setUnit(Mockito.anyString())).thenReturn(targetGaugeBuilder); + Mockito.when(targetGaugeBuilder.ofLongs().buildWithCallback(Mockito.any(Consumer.class))) + .thenAnswer(invocation -> { + Consumer callback = invocation.getArgument(0); + // Immediately invoke the callback to capture the current cache state + callback.accept(new ObservableLongMeasurement() { + @Override + public void record(long value) { + capturedValue.set(value); + } + + @Override + public void record(long value, Attributes attributes) { + capturedValue.set(value); + } + }); + return Mockito.mock(ObservableLongGauge.class); + }); + + return mockMeter; + } + + private int[] prepareTestCache(MessageStoreFetcherImpl fetcher) { + Cache cache = fetcher.getFetcherCache(); + String topic = "TestTopic"; + MessageQueue mq1 = new MessageQueue(topic, "broker", 0); + MessageQueue mq2 = new MessageQueue(topic, "broker", 1); + + int[] bufferSizes = {100, 200, 150, 300}; + for (int i = 0; i < bufferSizes.length; i++) { + SelectBufferResult result = new SelectBufferResult( + ByteBuffer.allocate(bufferSizes[i]), 0L, bufferSizes[i], 0L); + MessageQueue mq = i < 2 ? mq1 : mq2; + String key = String.format("%s@%d@%d", mq.getTopic(), mq.getQueueId(), (i + 1) * 100L); + cache.put(key, result); + } + return bufferSizes; + } + }