Skip to content

Commit 259ca37

Browse files
Merge pull request #103 from AxonIQ/fix/latency-correctness
Correct latency metrics in AxonIQ Console
2 parents 24bd71e + 178402c commit 259ca37

File tree

2 files changed

+5
-4
lines changed

2 files changed

+5
-4
lines changed

console-framework-client/src/main/java/io/axoniq/console/framework/eventprocessor/metrics/AxoniqConsoleProcessorInterceptor.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,17 +49,18 @@ class AxoniqConsoleProcessorInterceptor(
4949
it.reportProcessorName(processorName)
5050
}
5151
val segment = unitOfWork.resources()["Processor[$processorName]/SegmentId"] as? Int ?: -1
52+
val ingestTimestamp = Instant.now()
5253
processorMetricsRegistry.registerIngested(
5354
processorName,
5455
segment,
55-
ChronoUnit.NANOS.between(message.timestamp, Instant.now())
56+
ChronoUnit.NANOS.between(message.timestamp, ingestTimestamp)
5657
)
5758
if (unitOfWork !is BatchingUnitOfWork<*> || unitOfWork.isFirstMessage) {
5859
unitOfWork.afterCommit {
5960
processorMetricsRegistry.registerCommitted(
6061
processorName,
6162
segment,
62-
ChronoUnit.NANOS.between(message.timestamp, Instant.now())
63+
ChronoUnit.NANOS.between(ingestTimestamp, Instant.now())
6364
)
6465
}
6566
}

console-framework-client/src/main/java/io/axoniq/console/framework/eventprocessor/metrics/ProcessorMetricsRegistry.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ class ProcessorMetricsRegistry {
4848
} finally {
4949
val uow = CurrentUnitOfWork.get()
5050
if(uow !is BatchingUnitOfWork || uow.isFirstMessage) {
51-
uow.afterCommit {
51+
uow.onCleanup {
5252
getProcessingLatencySegmentMap(processor)
5353
.remove(segment)
5454
}
@@ -81,7 +81,7 @@ class ProcessorMetricsRegistry {
8181
.computeIfAbsentWithRetry(processor) { ConcurrentHashMap() }
8282

8383
class ExpiringLatencyValue(
84-
private val expiryTime: Long = 30 * 60 * 1000 // Default to 1 hour
84+
private val expiryTime: Long = 2 * 60 * 1000 // Default to 2 minutes
8585
) {
8686
private val clock = Clock.systemUTC()
8787
private val value: AtomicReference<Double> = AtomicReference(-1.0)

0 commit comments

Comments
 (0)