Skip to content

Commit 286bbbb

Browse files
Fix double invocation on processor handler error
One of our clients reported an issue where, when the invocation of and event handler fails, the handler is invoked a second tme. This was due to a second `InterceptorChain` invocation in the catch block. The code has been adjusted for the catch of the metrics to be independent from the InterceptorChain call. In addition, the `ProcessorMetricsRegistry` has been made more defensive to also catch any exceptions resulting from measuring code.
1 parent b03e54f commit 286bbbb

File tree

2 files changed

+21
-14
lines changed

2 files changed

+21
-14
lines changed

console-framework-client/src/main/java/io/axoniq/console/framework/eventprocessor/metrics/AxoniqConsoleProcessorInterceptor.kt

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2022-2025. AxonIQ B.V.
2+
* Copyright (c) 2022-2026. AxonIQ B.V.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -44,11 +44,13 @@ class AxoniqConsoleProcessorInterceptor(
4444
if (message !is EventMessage) {
4545
return interceptorChain.proceed()
4646
}
47+
48+
var segment = -1
4749
try {
4850
AxoniqConsoleSpanFactory.onTopLevelSpanIfActive {
4951
it.reportProcessorName(processorName)
5052
}
51-
val segment = unitOfWork.resources()["Processor[$processorName]/SegmentId"] as? Int ?: -1
53+
segment = unitOfWork.resources()["Processor[$processorName]/SegmentId"] as? Int ?: -1
5254
val ingestTimestamp = Instant.now()
5355
processorMetricsRegistry.registerIngested(
5456
processorName,
@@ -64,13 +66,12 @@ class AxoniqConsoleProcessorInterceptor(
6466
)
6567
}
6668
}
67-
68-
return processorMetricsRegistry.doWithActiveMessageForSegment(processorName, segment, message.timestamp) {
69-
interceptorChain.proceed()
70-
}
7169
} catch (e: Exception) {
7270
logger.debug("AxonIQ Console could not register metrics for processor $processorName", e)
73-
return interceptorChain.proceed()
71+
}
72+
73+
return processorMetricsRegistry.doWithActiveMessageForSegment(processorName, segment, message.timestamp) {
74+
interceptorChain.proceed()
7475
}
7576
}
7677
}

console-framework-client/src/main/java/io/axoniq/console/framework/eventprocessor/metrics/ProcessorMetricsRegistry.kt

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2022-2025. AxonIQ B.V.
2+
* Copyright (c) 2022-2026. AxonIQ B.V.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@ package io.axoniq.console.framework.eventprocessor.metrics
1919
import io.axoniq.console.framework.computeIfAbsentWithRetry
2020
import org.axonframework.messaging.unitofwork.BatchingUnitOfWork
2121
import org.axonframework.messaging.unitofwork.CurrentUnitOfWork
22+
import org.slf4j.LoggerFactory
2223
import java.time.Clock
2324
import java.time.Instant
2425
import java.time.temporal.ChronoUnit
@@ -27,6 +28,7 @@ import java.util.concurrent.atomic.AtomicLong
2728
import java.util.concurrent.atomic.AtomicReference
2829

2930
class ProcessorMetricsRegistry {
31+
private val logger = LoggerFactory.getLogger(this::class.java)
3032
private val ingestLatencyRegistry: MutableMap<String, MutableMap<Int, ExpiringLatencyValue>> = ConcurrentHashMap()
3133
private val commitLatencyRegistry: MutableMap<String, MutableMap<Int, ExpiringLatencyValue>> = ConcurrentHashMap()
3234
private val processingLatencyRegistry: MutableMap<String, MutableMap<Int, Instant?>> = ConcurrentHashMap()
@@ -40,20 +42,24 @@ class ProcessorMetricsRegistry {
4042
}
4143

4244
fun <T> doWithActiveMessageForSegment(processor: String, segment: Int, messageTimestamp: Instant, action: () -> T?): T? {
43-
val processingMessageTimestampsForSegment = getProcessingLatencySegmentMap(processor)
44-
4545
try {
46+
val processingMessageTimestampsForSegment = getProcessingLatencySegmentMap(processor)
4647
processingMessageTimestampsForSegment[segment] = messageTimestamp
47-
return action()
48-
} finally {
48+
} catch (e: Exception) {
49+
logger.debug("AxonIQ Console could not track active message for processor $processor segment $segment", e)
50+
}
51+
try {
4952
val uow = CurrentUnitOfWork.get()
50-
if(uow !is BatchingUnitOfWork || uow.isFirstMessage) {
53+
if (uow !is BatchingUnitOfWork || uow.isFirstMessage) {
5154
uow.onCleanup {
5255
getProcessingLatencySegmentMap(processor)
5356
.remove(segment)
5457
}
5558
}
59+
} catch (e: Exception) {
60+
logger.debug("AxonIQ Console could not register cleanup for processor $processor segment $segment", e)
5661
}
62+
return action()
5763
}
5864

5965
fun ingestLatencyForProcessor(processor: String, segment: Int): ExpiringLatencyValue {
@@ -81,7 +87,7 @@ class ProcessorMetricsRegistry {
8187
.computeIfAbsentWithRetry(processor) { ConcurrentHashMap() }
8288

8389
class ExpiringLatencyValue(
84-
private val expiryTime: Long = 2 * 60 * 1000 // Default to 2 minutes
90+
private val expiryTime: Long = 2 * 60 * 1000 // Default to 2 minutes
8591
) {
8692
private val clock = Clock.systemUTC()
8793
private val value: AtomicReference<Double> = AtomicReference(-1.0)

0 commit comments

Comments
 (0)