Skip to content

Commit e99ed9d

Browse files
Merge pull request #121 from AxonIQ/fix/double-handler-invocation
Fix double invocation on processor handler error
2 parents b03e54f + 286bbbb commit e99ed9d

File tree

2 files changed

+21
-14
lines changed

2 files changed

+21
-14
lines changed

console-framework-client/src/main/java/io/axoniq/console/framework/eventprocessor/metrics/AxoniqConsoleProcessorInterceptor.kt

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2022-2025. AxonIQ B.V.
2+
* Copyright (c) 2022-2026. AxonIQ B.V.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -44,11 +44,13 @@ class AxoniqConsoleProcessorInterceptor(
4444
if (message !is EventMessage) {
4545
return interceptorChain.proceed()
4646
}
47+
48+
var segment = -1
4749
try {
4850
AxoniqConsoleSpanFactory.onTopLevelSpanIfActive {
4951
it.reportProcessorName(processorName)
5052
}
51-
val segment = unitOfWork.resources()["Processor[$processorName]/SegmentId"] as? Int ?: -1
53+
segment = unitOfWork.resources()["Processor[$processorName]/SegmentId"] as? Int ?: -1
5254
val ingestTimestamp = Instant.now()
5355
processorMetricsRegistry.registerIngested(
5456
processorName,
@@ -64,13 +66,12 @@ class AxoniqConsoleProcessorInterceptor(
6466
)
6567
}
6668
}
67-
68-
return processorMetricsRegistry.doWithActiveMessageForSegment(processorName, segment, message.timestamp) {
69-
interceptorChain.proceed()
70-
}
7169
} catch (e: Exception) {
7270
logger.debug("AxonIQ Console could not register metrics for processor $processorName", e)
73-
return interceptorChain.proceed()
71+
}
72+
73+
return processorMetricsRegistry.doWithActiveMessageForSegment(processorName, segment, message.timestamp) {
74+
interceptorChain.proceed()
7475
}
7576
}
7677
}

console-framework-client/src/main/java/io/axoniq/console/framework/eventprocessor/metrics/ProcessorMetricsRegistry.kt

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2022-2025. AxonIQ B.V.
2+
* Copyright (c) 2022-2026. AxonIQ B.V.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@ package io.axoniq.console.framework.eventprocessor.metrics
1919
import io.axoniq.console.framework.computeIfAbsentWithRetry
2020
import org.axonframework.messaging.unitofwork.BatchingUnitOfWork
2121
import org.axonframework.messaging.unitofwork.CurrentUnitOfWork
22+
import org.slf4j.LoggerFactory
2223
import java.time.Clock
2324
import java.time.Instant
2425
import java.time.temporal.ChronoUnit
@@ -27,6 +28,7 @@ import java.util.concurrent.atomic.AtomicLong
2728
import java.util.concurrent.atomic.AtomicReference
2829

2930
class ProcessorMetricsRegistry {
31+
private val logger = LoggerFactory.getLogger(this::class.java)
3032
private val ingestLatencyRegistry: MutableMap<String, MutableMap<Int, ExpiringLatencyValue>> = ConcurrentHashMap()
3133
private val commitLatencyRegistry: MutableMap<String, MutableMap<Int, ExpiringLatencyValue>> = ConcurrentHashMap()
3234
private val processingLatencyRegistry: MutableMap<String, MutableMap<Int, Instant?>> = ConcurrentHashMap()
@@ -40,20 +42,24 @@ class ProcessorMetricsRegistry {
4042
}
4143

4244
fun <T> doWithActiveMessageForSegment(processor: String, segment: Int, messageTimestamp: Instant, action: () -> T?): T? {
43-
val processingMessageTimestampsForSegment = getProcessingLatencySegmentMap(processor)
44-
4545
try {
46+
val processingMessageTimestampsForSegment = getProcessingLatencySegmentMap(processor)
4647
processingMessageTimestampsForSegment[segment] = messageTimestamp
47-
return action()
48-
} finally {
48+
} catch (e: Exception) {
49+
logger.debug("AxonIQ Console could not track active message for processor $processor segment $segment", e)
50+
}
51+
try {
4952
val uow = CurrentUnitOfWork.get()
50-
if(uow !is BatchingUnitOfWork || uow.isFirstMessage) {
53+
if (uow !is BatchingUnitOfWork || uow.isFirstMessage) {
5154
uow.onCleanup {
5255
getProcessingLatencySegmentMap(processor)
5356
.remove(segment)
5457
}
5558
}
59+
} catch (e: Exception) {
60+
logger.debug("AxonIQ Console could not register cleanup for processor $processor segment $segment", e)
5661
}
62+
return action()
5763
}
5864

5965
fun ingestLatencyForProcessor(processor: String, segment: Int): ExpiringLatencyValue {
@@ -81,7 +87,7 @@ class ProcessorMetricsRegistry {
8187
.computeIfAbsentWithRetry(processor) { ConcurrentHashMap() }
8288

8389
class ExpiringLatencyValue(
84-
private val expiryTime: Long = 2 * 60 * 1000 // Default to 2 minutes
90+
private val expiryTime: Long = 2 * 60 * 1000 // Default to 2 minutes
8591
) {
8692
private val clock = Clock.systemUTC()
8793
private val value: AtomicReference<Double> = AtomicReference(-1.0)

0 commit comments

Comments
 (0)