Skip to content

Commit 26ccb0d

Browse files
committed
Update/Add Required Metrics to match latest design doc
Added - redis.connections.total - redis.connection.drops.total - redis.pubsub.operations.total - redis.operations.total - redis.operation.duration Not implemented - redis_active_connections Ref : - https://redislabs.atlassian.net/wiki/spaces/CAE/pages/5011111938/High-level+design+Lettuce-based+test+application#Required-Metrics - https://github.com/redis-developer/observability-stack/blob/main/README.md
1 parent 96c32c8 commit 26ccb0d

File tree

10 files changed

+190
-33
lines changed

10 files changed

+190
-33
lines changed

application.properties

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,12 @@ management.influx.metrics.export.bucket=BUCKET
1515
management.influx.metrics.export.token=TOKEN
1616
management.influx.metrics.export.auto-create-bucket=true
1717
management.influx.metrics.export.consistency=one
18-
management.influx.metrics.export.step=PT5S
18+
management.influx.metrics.export.step=PT1S
19+
20+
# OpenTelemetry Configuration for Micrometer (Spring Boot 3.x)
21+
management.otlp.metrics.export.enabled=false
22+
management.otlp.metrics.export.url=http://localhost:4318/v1/metrics
23+
management.otlp.metrics.export.step=PT1S
1924

2025
#logging.pattern.console=%d{yyyy-MM-dd HH:mm:ss} - %msg%n
2126
#logging.level.org.springframework=DEBUG

pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@
3333
<groupId>io.micrometer</groupId>
3434
<artifactId>micrometer-registry-influx</artifactId>
3535
</dependency>
36+
<dependency>
37+
<groupId>io.micrometer</groupId>
38+
<artifactId>micrometer-registry-otlp</artifactId>
39+
</dependency>
3640
<dependency>
3741
<groupId>org.springframework.boot</groupId>
3842
<artifactId>spring-boot-starter-test</artifactId>

src/main/java/io/lettuce/test/WorkloadRunnerBase.java

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent;
1010
import io.lettuce.core.event.Event;
1111
import io.lettuce.core.event.EventBus;
12+
import io.lettuce.core.event.connection.DisconnectedEvent;
1213
import io.lettuce.core.event.connection.ReconnectAttemptEvent;
1314
import io.lettuce.core.event.connection.ReconnectEventHelper;
1415
import io.lettuce.core.event.connection.ReconnectFailedEvent;
@@ -24,6 +25,8 @@
2425
import io.lettuce.test.config.WorkloadRunnerConfig.WorkloadConfig;
2526
import io.lettuce.test.metrics.ConnectionKey;
2627
import io.lettuce.test.metrics.MetricsReporter;
28+
import io.lettuce.test.metrics.MetricsReporter.ReconnectAttemptKey;
29+
import io.lettuce.test.metrics.OperationStatus;
2730
import io.lettuce.test.workloads.BaseWorkload;
2831
import io.micrometer.core.instrument.MeterRegistry;
2932
import io.micrometer.core.instrument.Timer;
@@ -172,15 +175,15 @@ private BaseWorkload withErrorHandler(BaseWorkload task, C client, Conn conn, Wo
172175

173176
@Override
174177
public void run() {
175-
Timer.Sample timer = metricsReporter.startTimer();
178+
Timer.Sample timer = WorkloadRunnerBase.this.metricsReporter.startTimer();
176179
try {
177180
task.run();
178-
metricsReporter.recordWorkloadExecutionDuration(timer, config.getType(), BaseWorkload.Status.SUCCESSFUL);
181+
WorkloadRunnerBase.this.metricsReporter.recordWorkloadExecutionDuration(timer, config.getType(), BaseWorkload.Status.SUCCESSFUL);
179182
} catch (Exception e) {
180183
// Note: Use client and conn reference to track, which client and connection caused the error
181184
// could not find other means to identify the client and connection
182185
log.error("Error client: {} conn: {}", client, conn, e);
183-
metricsReporter.recordWorkloadExecutionDuration(timer, config.getType(),
186+
WorkloadRunnerBase.this.metricsReporter.recordWorkloadExecutionDuration(timer, config.getType(),
184187
BaseWorkload.Status.COMPLETED_WITH_ERRORS);
185188
throw e;
186189
}
@@ -237,14 +240,19 @@ private void subscribeToReconnectEvents(EventBus eventBus) {
237240
Flux<Event> events = eventBus.get();
238241

239242
events.subscribe(event -> {
240-
if (event instanceof ReconnectAttemptEvent) {
241-
ReconnectAttemptEvent reconnectEvent = (ReconnectAttemptEvent) event;
243+
if (event instanceof ReconnectAttemptEvent reconnectEvent) {
242244
ConnectionKey connectionKey = ReconnectEventHelper.connectionKey(reconnectEvent);
245+
ReconnectAttemptKey reconnectionKey = new ReconnectAttemptKey(connectionKey, OperationStatus.INITIATED);
243246
metricsReporter.incrementReconnectAttempt(connectionKey);
244-
} else if (event instanceof ReconnectFailedEvent) {
245-
ReconnectFailedEvent reconnectEvent = (ReconnectFailedEvent) event;
247+
metricsReporter.incrementRedisConnectionAttempt(reconnectionKey);
248+
} else if (event instanceof ReconnectFailedEvent reconnectEvent) {
246249
ConnectionKey connectionKey = ReconnectEventHelper.connectionKey(reconnectEvent);
250+
ReconnectAttemptKey reconnectionKey = new ReconnectAttemptKey(connectionKey, OperationStatus.ERROR);
247251
metricsReporter.incrementReconnectFailure(connectionKey);
252+
metricsReporter.incrementRedisConnectionAttempt(reconnectionKey);
253+
} else if (event instanceof DisconnectedEvent disconnectedEvent) {
254+
ConnectionKey connectionKey = ReconnectEventHelper.connectionKey(disconnectedEvent);
255+
metricsReporter.incrementRedisConnectionDrops(connectionKey);
248256
} else if (event instanceof ClusterTopologyChangedEvent) {
249257
ClusterTopologyChangedEvent ctcEvent = (ClusterTopologyChangedEvent) event;
250258
log.info("ClusterTopologyChangedEvent: before={}, after={}", ctcEvent.before(), ctcEvent.after());
@@ -337,7 +345,8 @@ private void applyTimeoutOptions(TimeoutOptions.Builder builder, TimeoutOptionsC
337345
timeoutsRelaxingDuringMaintenance.invoke(builder, config.getTimeoutsRelaxingDuringMaintenance());
338346
log.info("ProactiveTimeoutsRelaxingMethod enabled successfully.");
339347
} catch (NoSuchMethodException e) {
340-
throw new IllegalStateException("The method 'timeoutsRelaxingDuringMaintenance' is not available in this build.", e);
348+
throw new IllegalStateException(
349+
"The method 'timeoutsRelaxingDuringMaintenance' is not available in this build.", e);
341350
} catch (IllegalAccessException | InvocationTargetException e) {
342351
throw new RuntimeException("Failed to invoke 'timeoutsRelaxingDuringMaintenance' method.", e);
343352
}

src/main/java/io/lettuce/test/metrics/MeterRegistryConfiguration.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.lettuce.test.metrics;
22

3+
import io.lettuce.core.LettuceVersion;
34
import io.micrometer.core.instrument.MeterRegistry;
45
import io.micrometer.core.instrument.config.MeterFilter;
56
import io.micrometer.core.instrument.logging.LoggingRegistryConfig;
@@ -17,9 +18,24 @@ public class MeterRegistryConfiguration {
1718
@Value("${runId:${runner.test.workload.type}-#{T(org.apache.commons.lang3.RandomStringUtils).randomAlphanumeric(8)}}")
1819
private String runId;
1920

21+
@Value("${instanceId:${spring.application.name}-#{T(org.apache.commons.lang3.RandomStringUtils).randomAlphanumeric(8)}}")
22+
private String instanceId;
23+
24+
@Value("${appName:lettuce-test-app}")
25+
private String appName;
26+
2027
@Bean
2128
public MeterRegistryCustomizer<MeterRegistry> metricsCommonTags() {
22-
return (registry) -> registry.config().commonTags("runId", runId);
29+
return (registry) -> {
30+
registry.config().commonTags("app_name", appName).commonTags("run_id", runId).commonTags("instance_id", instanceId)
31+
.commonTags("version", LettuceVersion.getVersion());
32+
33+
// Ensure OTLP registry accepts all redis metrics
34+
if (registry.getClass().getSimpleName().contains("Otlp")) {
35+
registry.config().meterFilter(MeterFilter.acceptNameStartsWith("redis"))
36+
.meterFilter(MeterFilter.acceptNameStartsWith("lettuce"));
37+
}
38+
};
2339
}
2440

2541
@Bean
@@ -56,8 +72,7 @@ public SimpleMeterRegistry simpleMeterRegistry() {
5672

5773
// * Deny all meters by default and accept only specific meters for metrics stored in logs
5874
private void meterFilters(MeterRegistry registry) {
59-
registry.config().meterFilter(MeterFilter.acceptNameStartsWith("redis.command"))
60-
.meterFilter(MeterFilter.acceptNameStartsWith("redis.workload"))
75+
registry.config().meterFilter(MeterFilter.acceptNameStartsWith("redis"))
6176
.meterFilter(MeterFilter.acceptNameStartsWith("lettuce.connect"))
6277
.meterFilter(MeterFilter.acceptNameStartsWith("lettuce.reconnect"))
6378
.meterFilter(MeterFilter.acceptNameStartsWith("lettuce.reconnection")).meterFilter(MeterFilter.deny());

src/main/java/io/lettuce/test/metrics/MetricsProxy.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@
99
import java.lang.reflect.InvocationTargetException;
1010
import java.lang.reflect.Method;
1111

12+
import static io.lettuce.test.metrics.MetricsReporter.cmdKeyError;
13+
import static io.lettuce.test.metrics.MetricsReporter.cmdKeyOk;
14+
1215
public class MetricsProxy<T> implements InvocationHandler {
1316

1417
private static final Logger log = LoggerFactory.getLogger(MetricsProxy.class);
@@ -39,17 +42,20 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
3942
command.whenComplete((res, ex) -> {
4043
if (ex != null) {
4144
metricsReporter.incrementCommandError(commandName);
45+
metricsReporter.recordCommandLatency(cmdKeyError(commandName, ex), sample);
4246
log.error("Command failed", ex);
47+
} else {
48+
metricsReporter.recordCommandLatency(cmdKeyOk(commandName), sample);
4349
}
44-
metricsReporter.recordCommandLatency(commandName, sample);
4550
});
4651
return result;
4752
}
4853

49-
metricsReporter.recordCommandLatency(commandName, sample);
54+
metricsReporter.recordCommandLatency(cmdKeyOk(commandName), sample);
5055
return result;
5156
} catch (InvocationTargetException ex) {
5257
metricsReporter.incrementCommandError(commandName);
58+
metricsReporter.recordCommandLatency(cmdKeyError(commandName, ex.getCause()), sample);
5359
log.error("Command failed", ex.getCause());
5460
throw ex.getCause();
5561
}

src/main/java/io/lettuce/test/metrics/MetricsReporter.java

Lines changed: 89 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,14 @@ public class MetricsReporter {
3737
@Value("${simple.metrics.dumpRate:PT5S}")
3838
private Duration dumpRate;
3939

40-
private final Map<String, Timer> commandLatencyTimers = new ConcurrentHashMap<>();
40+
private final Map<CommandKey, Timer> commandLatencyTimers = new ConcurrentHashMap<>();
4141

4242
private final Timer commandLatencyTotalTimer;
4343

4444
private final Map<String, Counter> commandErrorCounters = new ConcurrentHashMap<>();
4545

46+
private final Map<CommandKey, Counter> commandTotalCounter = new ConcurrentHashMap<>();
47+
4648
private final Counter commandErrorTotalCounter;
4749

4850
private final Timer connectionSuccessTimer;
@@ -51,6 +53,12 @@ public class MetricsReporter {
5153

5254
private final Map<ConnectionKey, Counter> reconnectAttemptCounter = new ConcurrentHashMap<>();
5355

56+
private final Map<ReconnectAttemptKey, Counter> redisConnectionsTotal = new ConcurrentHashMap<>();
57+
58+
private final Map<ConnectionKey, Counter> redisConnectionsDrops = new ConcurrentHashMap<>();
59+
60+
private final Map<PubSubOpKey, Counter> pubSubOperationCounter = new ConcurrentHashMap<>();
61+
5462
private final Counter reconnectAttemptTotalCounter;
5563

5664
private final Map<ConnectionKey, Counter> reconnectFailureCounter = new ConcurrentHashMap<>();
@@ -87,11 +95,18 @@ Timer.Sample startCommandTimer() {
8795
return Timer.start(meterRegistry);
8896
}
8997

90-
void recordCommandLatency(String commandName, Timer.Sample sample) {
91-
Timer timer = commandLatencyTimers.computeIfAbsent(commandName, this::createCommandLatencyTimer);
98+
public record CommandKey(String commandName, OperationStatus status) {
99+
100+
}
101+
102+
void recordCommandLatency(CommandKey commandKey, Timer.Sample sample) {
103+
Timer timer = commandLatencyTimers.computeIfAbsent(commandKey, this::createCommandLatencyTimer);
92104
long timeNs = sample.stop(timer);
93105

94106
commandLatencyTotalTimer.record(Duration.ofNanos(timeNs));
107+
108+
Counter counter = commandTotalCounter.computeIfAbsent(commandKey, this::createCommandTotalCounter);
109+
counter.increment();
95110
}
96111

97112
void incrementCommandError(String commandName) {
@@ -101,8 +116,8 @@ void incrementCommandError(String commandName) {
101116

102117
public void recordWorkloadExecutionDuration(Timer.Sample sample, String workloadName, BaseWorkload.Status status) {
103118
Timer workloadDuration = Timer.builder("redis.workload.execution.duration")
104-
.description("Time taken to complete a workload").tag("workload", workloadName).tag("status", status.toString())
105-
.register(meterRegistry);
119+
.description("Time taken to complete a workload").tag("workload", workloadName)
120+
.tag("status", status.name().toLowerCase()).register(meterRegistry);
106121
sample.stop(workloadDuration);
107122
}
108123

@@ -119,21 +134,50 @@ public void incrementReconnectAttempt(ConnectionKey connectionKey) {
119134
reconnectAttemptTotalCounter.increment();
120135
}
121136

137+
public void incrementRedisConnectionAttempt(ReconnectAttemptKey reconnectAttempt) {
138+
redisConnectionsTotal.computeIfAbsent(reconnectAttempt, this::createRedisConnectionsTotal).increment();
139+
}
140+
141+
public void incrementRedisConnectionDrops(ConnectionKey connectionKey) {
142+
redisConnectionsDrops.computeIfAbsent(connectionKey, this::createRedisConnectionsDrops).increment();
143+
}
144+
122145
public void incrementReconnectFailure(ConnectionKey connectionKey) {
123146
reconnectFailureCounter.computeIfAbsent(connectionKey, this::createReconnectFailedAttempCounter).increment();
124147
reconnectFailureTotalCounter.increment();
125148
}
126149

127-
private Timer createCommandLatencyTimer(String commandName) {
128-
return Timer.builder("redis.command.latency").description(
150+
public enum PubSubOperation {
151+
SUBSCRIBE, UNSUBSCRIBE, PUBLISH, RECEIVE
152+
}
153+
154+
public record PubSubOpKey(String channel, PubSubOperation operation, String subsriberId, OperationStatus status) {
155+
156+
}
157+
158+
public void incrementPubSubOperation(PubSubOpKey pubSubOpKey) {
159+
pubSubOperationCounter.computeIfAbsent(pubSubOpKey, this::createPubSubOperationCounter).increment();
160+
}
161+
162+
private Counter createPubSubOperationCounter(PubSubOpKey pubSubOpKey) {
163+
return Counter.builder("redis.pubsub.operations.total")
164+
.description("Total number of Redis pub/sub operations (publish and receive)")
165+
.tag("channel", pubSubOpKey.channel).tag("operation_type", pubSubOpKey.operation.name().toLowerCase())
166+
.tag("subscriber_id", pubSubOpKey.subsriberId).tag("status", pubSubOpKey.status.name().toLowerCase())
167+
.register(meterRegistry);
168+
}
169+
170+
private Timer createCommandLatencyTimer(CommandKey commandKey) {
171+
return Timer.builder("redis.operation.duration").description(
129172
"Measures the execution time of Redis commands from API invocation until command completion per command")
130-
.tag("command", commandName).register(meterRegistry);
173+
.tag("command", commandKey.commandName).tag("status", commandKey.status.name().toLowerCase())
174+
.publishPercentileHistogram(true).publishPercentiles(0.5, 0.95, 0.99).register(meterRegistry);
131175
}
132176

133177
private Timer createCommandLatencyTotalTimer() {
134-
return Timer.builder("redis.command.total.latency")
178+
return Timer.builder("redis.operation.total.duration")
135179
.description("Measures the execution time of Redis commands from API invocation until command completion")
136-
.register(meterRegistry);
180+
.publishPercentileHistogram(true).publishPercentiles(0.5, 0.95, 0.99).register(meterRegistry);
137181
}
138182

139183
private Counter createCommandErrorCounter(String commandName) {
@@ -142,12 +186,39 @@ private Counter createCommandErrorCounter(String commandName) {
142186
.tag("command", commandName).register(meterRegistry);
143187
}
144188

189+
private Counter createCommandTotalCounter(CommandKey commandKey) {
190+
return Counter.builder("redis.operations.total")
191+
.description("Counts the number of total Redis command API calls completed successfully or with an error")
192+
.tag("command", commandKey.commandName).tag("status", commandKey.status.name().toLowerCase())
193+
.register(meterRegistry);
194+
}
195+
145196
private Counter createCommandErrorTotalCounter() {
146197
return Counter.builder("redis.command.total.errors")
147198
.description("Counts the number of failed Redis command API calls that completed with an exception")
148199
.register(meterRegistry);
149200
}
150201

202+
public record ReconnectAttemptKey(ConnectionKey connectionKey, OperationStatus status) {
203+
}
204+
205+
private Counter createRedisConnectionsTotal(ReconnectAttemptKey reconnectAttempt) {
206+
ConnectionKey connectionKey = reconnectAttempt.connectionKey;
207+
OperationStatus status = reconnectAttempt.status;
208+
209+
return Counter.builder("redis.connections.total")
210+
.description("Counts the number of Redis reconnect attempts per connection")
211+
.tag("status", status.name().toLowerCase()).tag("epid", connectionKey.getEpId())
212+
.tag("local", connectionKey.getLocalAddress().toString())
213+
.tag("remote", connectionKey.getRemoteAddress().toString()).register(meterRegistry);
214+
}
215+
216+
private Counter createRedisConnectionsDrops(ConnectionKey connectionKey) {
217+
return Counter.builder("redis.connection.drops.total").description("Counts the number of disconnects")
218+
.tag("epid", connectionKey.getEpId()).tag("local", connectionKey.getLocalAddress().toString())
219+
.tag("remote", connectionKey.getRemoteAddress().toString()).register(meterRegistry);
220+
}
221+
151222
private Counter createReconnectAttemptCounter(ConnectionKey connectionKey) {
152223
return Counter.builder("lettuce.reconnect.attempts")
153224
.description("Counts the number of Redis reconnect attempts per connection")
@@ -214,4 +285,12 @@ public void shutdown() {
214285
}
215286
}
216287

288+
public static CommandKey cmdKeyOk(String commandName) {
289+
return new CommandKey(commandName, OperationStatus.SUCCESS);
290+
}
291+
292+
public static CommandKey cmdKeyError(String commandName, Throwable throwable) {
293+
return new CommandKey(commandName, OperationStatus.ERROR);
294+
}
295+
217296
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package io.lettuce.test.metrics;
2+
3+
public enum OperationStatus {
4+
SUCCESS, ERROR, INITIATED
5+
}

src/main/java/io/lettuce/test/workloads/BaseWorkload.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public enum Status {
3131

3232
private static final Logger log = LoggerFactory.getLogger(BaseWorkload.class);
3333

34-
private MetricsReporter metricsReporter;
34+
protected MetricsReporter metricsReporter;
3535

3636
private final CommonWorkloadOptions options;
3737

0 commit comments

Comments
 (0)