Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2023 Sonu Kumar
* Copyright (c) 2019-2025 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* You may not use this file except in compliance with the License.
Expand All @@ -18,11 +18,14 @@

import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;

import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.StringUtils;

/**
* RqueueMetrics provides all possible configurations available in Rqueue library for metrics.
Expand All @@ -46,6 +49,11 @@ public abstract class MetricsProperties {

private Tags metricTags = Tags.empty();

/*
* Prefix to be used while publishing metrics.
*/
private String prefix = "";

/**
* Get Tags object that can be used in metric. Tags can be either configured manually or using
* properties or XML file.
Expand Down Expand Up @@ -74,6 +82,13 @@ public boolean countFailure() {
return count.isFailure();
}

public String getMetricName(String name) {
if (StringUtils.isEmpty(prefix)) {
return name;
}
return prefix + name;
}

@Getter
@Setter
public static class Count {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2023 Sonu Kumar
* Copyright (c) 2019-2025 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* You may not use this file except in compliance with the License.
Expand Down Expand Up @@ -60,15 +60,15 @@ void registerQueue(
QueueDetail queueDetail) {
if (metricsProperties.countFailure()) {
Counter.Builder builder =
Counter.builder(FAILURE_COUNT)
Counter.builder(metricsProperties.getMetricName(FAILURE_COUNT))
.tags(queueTags.and(QUEUE_KEY, queueDetail.getQueueName()))
.description("Failure count");
Counter counter = builder.register(registry);
queueNameToFailureCounter.put(queueDetail.getName(), counter);
}
if (metricsProperties.countExecution()) {
Counter.Builder builder =
Counter.builder(EXECUTION_COUNT)
Counter.builder(metricsProperties.getMetricName(EXECUTION_COUNT))
.tags(queueTags.and(QUEUE_KEY, queueDetail.getQueueName()))
.description("Task execution count");
Counter counter = builder.register(registry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,32 +67,25 @@ private long size(String name, boolean isZset) {

private void monitor() {
for (QueueDetail queueDetail : EndpointRegistry.getActiveQueueDetails()) {
Tags queueTags =
Tags.concat(metricsProperties.getMetricTags(), "queue", queueDetail.getName());
Gauge.builder(QUEUE_SIZE, queueDetail, c -> size(queueDetail.getQueueName(), false))
Tags queueTags = Tags.concat(metricsProperties.getMetricTags(), "queue",
queueDetail.getName());
Gauge.builder(metricsProperties.getMetricName(QUEUE_SIZE), queueDetail,
c -> size(queueDetail.getQueueName(), false))
.tags(queueTags.and(QUEUE_KEY, queueDetail.getQueueName()))
.description("The number of entries in this queue")
.register(meterRegistry);
Gauge.builder(
PROCESSING_QUEUE_SIZE,
queueDetail,
.description("The number of entries in this queue").register(meterRegistry);
Gauge.builder(metricsProperties.getMetricName(PROCESSING_QUEUE_SIZE), queueDetail,
c -> size(queueDetail.getProcessingQueueName(), true))
.tags(queueTags.and(QUEUE_KEY, queueDetail.getProcessingQueueName()))
.description("The number of entries in the processing queue")
.register(meterRegistry);
Gauge.builder(
SCHEDULED_QUEUE_SIZE,
queueDetail,
.description("The number of entries in the processing queue").register(meterRegistry);
Gauge.builder(metricsProperties.getMetricName(SCHEDULED_QUEUE_SIZE), queueDetail,
c -> size(queueDetail.getScheduledQueueName(), true))
.tags(queueTags.and(QUEUE_KEY, queueDetail.getScheduledQueueName()))
.description("The number of entries waiting in the scheduled queue")
.register(meterRegistry);
if (queueDetail.isDlqSet()) {
Builder<QueueDetail> builder =
Gauge.builder(
DEAD_LETTER_QUEUE_SIZE,
queueDetail,
c -> size(queueDetail.getDeadLetterQueueName(), false));
Builder<QueueDetail> builder = Gauge.builder(
metricsProperties.getMetricName(DEAD_LETTER_QUEUE_SIZE), queueDetail,
c -> size(queueDetail.getDeadLetterQueueName(), false));
builder.tags(queueTags);
builder.description("The number of entries in the dead letter queue");
builder.register(meterRegistry);
Expand Down
2 changes: 1 addition & 1 deletion rqueue-spring-boot-reactive-example/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ plugins {
id "war"
}
dependencies {
implementation project(":rqueue-spring-boot-starter")
implementation "com.github.sonus21:rqueue-spring-boot-starter:3.3.0-RELEASE"
implementation "org.springframework.boot:spring-boot-starter-data-redis-reactive"
implementation "org.springframework.boot:spring-boot-starter-webflux"
implementation "io.lettuce:lettuce-core"
Expand Down