Skip to content

Commit fbab357

Browse files
[fix][broker]pulsar_ml_reads_inflight_bytes and pulsar_ml_reads_available_inflight_bytes are 0 at the same time (#25105)
1 parent 4495525 commit fbab357

File tree

2 files changed

+17
-2
lines changed

2 files changed

+17
-2
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ public class InflightReadsLimiter implements AutoCloseable {
4242

4343
@PulsarDeprecatedMetric(newMetricName = INFLIGHT_READS_LIMITER_LIMIT_METRIC_NAME)
4444
@Deprecated
45-
private static final Gauge PULSAR_ML_READS_BUFFER_SIZE = Gauge
45+
@VisibleForTesting
46+
static final Gauge PULSAR_ML_READS_BUFFER_SIZE = Gauge
4647
.build()
4748
.name("pulsar_ml_reads_inflight_bytes")
4849
.help("Estimated number of bytes retained by data read from storage or cache")
@@ -55,7 +56,8 @@ public class InflightReadsLimiter implements AutoCloseable {
5556

5657
@PulsarDeprecatedMetric(newMetricName = INFLIGHT_READS_LIMITER_USAGE_METRIC_NAME)
5758
@Deprecated
58-
private static final Gauge PULSAR_ML_READS_AVAILABLE_BUFFER_SIZE = Gauge
59+
@VisibleForTesting
60+
static final Gauge PULSAR_ML_READS_AVAILABLE_BUFFER_SIZE = Gauge
5961
.build()
6062
.name("pulsar_ml_reads_available_inflight_bytes")
6163
.help("Available space for inflight data read from storage or cache")
@@ -87,6 +89,7 @@ public InflightReadsLimiter(long maxReadsInFlightSize, int maxReadsInFlightAcqui
8789
if (maxReadsInFlightSize > 0) {
8890
enabled = true;
8991
this.queuedHandles = new ArrayDeque<>();
92+
updateMetrics();
9093
} else {
9194
enabled = false;
9295
this.queuedHandles = null;

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiterTest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -538,6 +538,18 @@ public void testAcquireExceedingMaxReadsWhenAllPermitsAvailable() throws Excepti
538538
.isEqualTo(maxReadsInFlightSize);
539539
}
540540

541+
@Test
542+
public void testPrometheusMetrics() throws Exception {
543+
long maxReadsInFlightSize = 100;
544+
@Cleanup
545+
InflightReadsLimiter limiter = new InflightReadsLimiter(maxReadsInFlightSize, ACQUIRE_QUEUE_SIZE,
546+
ACQUIRE_TIMEOUT_MILLIS, mock(ScheduledExecutorService.class), OpenTelemetry.noop());
547+
548+
Assertions.assertThat(limiter.PULSAR_ML_READS_BUFFER_SIZE.get()).isZero();
549+
Assertions.assertThat(limiter.PULSAR_ML_READS_AVAILABLE_BUFFER_SIZE.get())
550+
.isEqualTo(maxReadsInFlightSize);
551+
}
552+
541553
private Pair<OpenTelemetrySdk, InMemoryMetricReader> buildOpenTelemetryAndReader() {
542554
var metricReader = InMemoryMetricReader.create();
543555
var openTelemetry = AutoConfiguredOpenTelemetrySdk.builder()

0 commit comments

Comments
 (0)