Skip to content

Commit ce2a0fe

Browse files
Merge pull request #97 from AxonIQ/feature/processing-latency
Add processing latency to the client
2 parents 5965062 + 56dbb94 commit ce2a0fe

File tree

8 files changed

+66
-29
lines changed

8 files changed

+66
-29
lines changed

console-framework-client-api/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
<?xml version="1.0" encoding="UTF-8"?>
22
<!--
3-
~ Copyright (c) 2022-2024. AxonIQ B.V.
3+
~ Copyright (c) 2022-2025. AxonIQ B.V.
44
~
55
~ Licensed under the Apache License, Version 2.0 (the "License");
66
~ you may not use this file except in compliance with the License.
@@ -21,7 +21,7 @@
2121
<parent>
2222
<groupId>io.axoniq.console</groupId>
2323
<artifactId>console-framework-client-parent</artifactId>
24-
<version>1.8.2-SNAPSHOT</version>
24+
<version>1.9.0-SNAPSHOT</version>
2525
</parent>
2626
<modelVersion>4.0.0</modelVersion>
2727

console-framework-client-api/src/main/java/io/axoniq/console/framework/api/eventProcessorApi.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2022-2023. AxonIQ B.V.
2+
* Copyright (c) 2022-2025. AxonIQ B.V.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -57,6 +57,7 @@ data class SegmentStatus(
5757
val errorMessage: String?,
5858
val ingestLatency: Double?,
5959
val commitLatency: Double?,
60+
val processingLatency: Double?,
6061
val position: Long? = -1,
6162
val resetPosition: Long? = -1,
6263
)

console-framework-client-spring-boot-starter/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
<?xml version="1.0" encoding="UTF-8"?>
22
<!--
3-
~ Copyright (c) 2022-2024. AxonIQ B.V.
3+
~ Copyright (c) 2022-2025. AxonIQ B.V.
44
~
55
~ Licensed under the Apache License, Version 2.0 (the "License");
66
~ you may not use this file except in compliance with the License.
@@ -21,7 +21,7 @@
2121
<parent>
2222
<groupId>io.axoniq.console</groupId>
2323
<artifactId>console-framework-client-parent</artifactId>
24-
<version>1.8.2-SNAPSHOT</version>
24+
<version>1.9.0-SNAPSHOT</version>
2525
</parent>
2626
<modelVersion>4.0.0</modelVersion>
2727

console-framework-client/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
<?xml version="1.0" encoding="UTF-8"?>
22
<!--
3-
~ Copyright (c) 2022-2024. AxonIQ B.V.
3+
~ Copyright (c) 2022-2025. AxonIQ B.V.
44
~
55
~ Licensed under the Apache License, Version 2.0 (the "License");
66
~ you may not use this file except in compliance with the License.
@@ -21,7 +21,7 @@
2121
<parent>
2222
<groupId>io.axoniq.console</groupId>
2323
<artifactId>console-framework-client-parent</artifactId>
24-
<version>1.8.2-SNAPSHOT</version>
24+
<version>1.9.0-SNAPSHOT</version>
2525
</parent>
2626
<modelVersion>4.0.0</modelVersion>
2727

console-framework-client/src/main/java/io/axoniq/console/framework/eventprocessor/ProcessorReportCreator.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2022-2023. AxonIQ B.V.
2+
* Copyright (c) 2022-2025. AxonIQ B.V.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -121,6 +121,7 @@ class ProcessorReportCreator(
121121
errorMessage = this.error?.message,
122122
ingestLatency = metricsRegistry.ingestLatencyForProcessor(name, this.segment.segmentId).getValue(),
123123
commitLatency = metricsRegistry.commitLatencyForProcessor(name, this.segment.segmentId).getValue(),
124+
processingLatency = metricsRegistry.processingMessageLatencyForProcessor(name, this.segment.segmentId)?.toDouble() ?: -1.0,
124125
position = this.currentPosition?.orElse(-1) ?: -1,
125126
resetPosition = this.resetPosition?.orElse(-1) ?: -1,
126127
)

console-framework-client/src/main/java/io/axoniq/console/framework/eventprocessor/metrics/AxoniqConsoleProcessorInterceptor.kt

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2022-2024. AxonIQ B.V.
2+
* Copyright (c) 2022-2025. AxonIQ B.V.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -30,8 +30,8 @@ import java.time.Instant
3030
import java.time.temporal.ChronoUnit
3131

3232
class AxoniqConsoleProcessorInterceptor(
33-
private val processorMetricsRegistry: ProcessorMetricsRegistry,
34-
private val processorName: String,
33+
private val processorMetricsRegistry: ProcessorMetricsRegistry,
34+
private val processorName: String,
3535
) : MessageHandlerInterceptor<Message<*>> {
3636
private val logger = LoggerFactory.getLogger(this::class.java)
3737

@@ -40,31 +40,36 @@ class AxoniqConsoleProcessorInterceptor(
4040
if (uow == null || unitOfWork.message.payload is UnknownSerializedType) {
4141
return interceptorChain.proceed()
4242
}
43+
val message = unitOfWork.message
44+
if (message !is EventMessage) {
45+
return interceptorChain.proceed()
46+
}
4347
try {
4448
AxoniqConsoleSpanFactory.onTopLevelSpanIfActive {
4549
it.reportProcessorName(processorName)
4650
}
47-
val message = unitOfWork.message
48-
if (message is EventMessage) {
49-
val segment = unitOfWork.resources()["Processor[$processorName]/SegmentId"] as? Int ?: -1
50-
processorMetricsRegistry.registerIngested(
51+
val segment = unitOfWork.resources()["Processor[$processorName]/SegmentId"] as? Int ?: -1
52+
processorMetricsRegistry.registerIngested(
5153
processorName,
5254
segment,
5355
ChronoUnit.NANOS.between(message.timestamp, Instant.now())
54-
)
55-
if (unitOfWork !is BatchingUnitOfWork<*> || unitOfWork.isLastMessage) {
56-
unitOfWork.afterCommit {
57-
processorMetricsRegistry.registerCommitted(
56+
)
57+
if (unitOfWork !is BatchingUnitOfWork<*> || unitOfWork.isFirstMessage) {
58+
unitOfWork.afterCommit {
59+
processorMetricsRegistry.registerCommitted(
5860
processorName,
5961
segment,
6062
ChronoUnit.NANOS.between(message.timestamp, Instant.now())
61-
)
62-
}
63+
)
6364
}
6465
}
66+
67+
return processorMetricsRegistry.doWithActiveMessageForSegment(processorName, segment, message.timestamp) {
68+
interceptorChain.proceed()
69+
}
6570
} catch (e: Exception) {
6671
logger.debug("AxonIQ Console could not register metrics for processor $processorName", e)
72+
return interceptorChain.proceed()
6773
}
68-
return interceptorChain.proceed()
6974
}
7075
}

console-framework-client/src/main/java/io/axoniq/console/framework/eventprocessor/metrics/ProcessorMetricsRegistry.kt

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2022-2023. AxonIQ B.V.
2+
* Copyright (c) 2022-2025. AxonIQ B.V.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,14 +17,18 @@
1717
package io.axoniq.console.framework.eventprocessor.metrics
1818

1919
import io.axoniq.console.framework.computeIfAbsentWithRetry
20+
import org.axonframework.messaging.unitofwork.CurrentUnitOfWork
2021
import java.time.Clock
22+
import java.time.Instant
23+
import java.time.temporal.ChronoUnit
2124
import java.util.concurrent.ConcurrentHashMap
2225
import java.util.concurrent.atomic.AtomicLong
2326
import java.util.concurrent.atomic.AtomicReference
2427

2528
class ProcessorMetricsRegistry {
2629
private val ingestLatencyRegistry: MutableMap<String, MutableMap<Int, ExpiringLatencyValue>> = ConcurrentHashMap()
2730
private val commitLatencyRegistry: MutableMap<String, MutableMap<Int, ExpiringLatencyValue>> = ConcurrentHashMap()
31+
private val processingLatencyRegistry: MutableMap<String, MutableMap<Int, Instant?>> = ConcurrentHashMap()
2832

2933
fun registerIngested(processor: String, segment: Int, latencyInNanos: Long) {
3034
ingestLatencyForProcessor(processor, segment).setValue(latencyInNanos.toDouble() / 1000000)
@@ -34,18 +38,44 @@ class ProcessorMetricsRegistry {
3438
commitLatencyForProcessor(processor, segment).setValue(latencyInNanos.toDouble() / 1000000)
3539
}
3640

41+
fun <T> doWithActiveMessageForSegment(processor: String, segment: Int, messageTimestamp: Instant, action: () -> T?): T? {
42+
val processingMessageTimestampsForSegment = getProcessingLatencySegmentMap(processor)
43+
44+
try {
45+
processingMessageTimestampsForSegment[segment] = messageTimestamp
46+
return action()
47+
} finally {
48+
CurrentUnitOfWork.get().afterCommit {
49+
getProcessingLatencySegmentMap(processor)
50+
.remove(segment)
51+
}
52+
}
53+
}
54+
3755
fun ingestLatencyForProcessor(processor: String, segment: Int): ExpiringLatencyValue {
3856
return ingestLatencyRegistry
39-
.computeIfAbsentWithRetry(processor) { mutableMapOf() }
40-
.computeIfAbsentWithRetry(segment) { ExpiringLatencyValue() }
57+
.computeIfAbsentWithRetry(processor) { ConcurrentHashMap() }
58+
.computeIfAbsentWithRetry(segment) { ExpiringLatencyValue() }
4159
}
4260

4361
fun commitLatencyForProcessor(processor: String, segment: Int): ExpiringLatencyValue {
4462
return commitLatencyRegistry
45-
.computeIfAbsentWithRetry(processor) { mutableMapOf() }
46-
.computeIfAbsentWithRetry(segment) { ExpiringLatencyValue() }
63+
.computeIfAbsentWithRetry(processor) { ConcurrentHashMap() }
64+
.computeIfAbsentWithRetry(segment) { ExpiringLatencyValue() }
4765
}
4866

67+
fun processingMessageLatencyForProcessor(processor: String, segment: Int): Long? {
68+
val processingTimestamp = getProcessingLatencySegmentMap(processor)
69+
.computeIfAbsentWithRetry(segment) { null }
70+
if (processingTimestamp == null) {
71+
return null
72+
}
73+
return ChronoUnit.MILLIS.between(processingTimestamp, Instant.now())
74+
}
75+
76+
private fun getProcessingLatencySegmentMap(processor: String) = processingLatencyRegistry
77+
.computeIfAbsentWithRetry(processor) { ConcurrentHashMap() }
78+
4979
class ExpiringLatencyValue(
5080
private val expiryTime: Long = 30 * 60 * 1000 // Default to 1 hour
5181
) {

pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
<?xml version="1.0" encoding="UTF-8"?>
22
<!--
3-
~ Copyright (c) 2022-2024. AxonIQ B.V.
3+
~ Copyright (c) 2022-2025. AxonIQ B.V.
44
~
55
~ Licensed under the Apache License, Version 2.0 (the "License");
66
~ you may not use this file except in compliance with the License.
@@ -22,7 +22,7 @@
2222

2323
<groupId>io.axoniq.console</groupId>
2424
<artifactId>console-framework-client-parent</artifactId>
25-
<version>1.8.2-SNAPSHOT</version>
25+
<version>1.9.0-SNAPSHOT</version>
2626

2727
<modules>
2828
<module>console-framework-client-api</module>

0 commit comments

Comments
 (0)