Skip to content

Commit 61b32c5

Browse files
Port fixes for connections (#111) to Axon Framework 5 (main)
1 parent fe0de35 commit 61b32c5

File tree

4 files changed

+52
-12
lines changed

4 files changed

+52
-12
lines changed

framework-client-messaging/src/main/java/io/axoniq/platform/framework/application/ApplicationMetricReporter.kt

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2022-2025. AxonIQ B.V.
2+
* Copyright (c) 2022-2026. AxonIQ B.V.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -52,7 +52,15 @@ class ApplicationMetricReporter(
5252
}
5353

5454
private fun report() {
55-
client.sendReport(Routes.Application.REPORT, reportCreator.createReport()).block()
55+
if (!client.isConnected()) {
56+
return
57+
}
58+
client.sendReport(Routes.Application.REPORT, reportCreator.createReport())
59+
.doOnError { e ->
60+
logger.debug { "Failed to send application report: ${e.message}" }
61+
}
62+
.onErrorComplete()
63+
.subscribe()
5664
}
5765

5866
override fun onDisconnected() {

framework-client-messaging/src/main/java/io/axoniq/platform/framework/client/AxoniqConsoleRSocketClient.kt

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2022-2025. AxonIQ B.V.
2+
* Copyright (c) 2022-2026. AxonIQ B.V.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -41,6 +41,8 @@ import java.time.temporal.ChronoUnit
4141
import java.util.concurrent.ScheduledExecutorService
4242
import java.util.concurrent.ScheduledFuture
4343
import java.util.concurrent.TimeUnit
44+
import java.util.concurrent.locks.ReentrantLock
45+
import kotlin.concurrent.withLock
4446
import kotlin.math.pow
4547

4648
/**
@@ -76,9 +78,12 @@ class AxoniqConsoleRSocketClient(
7678
private var maintenanceTask: ScheduledFuture<*>? = null
7779
private val logger = LoggerFactory.getLogger(this::class.java)
7880

81+
private val connectionLock = ReentrantLock()
82+
@Volatile
7983
private var rsocket: RSocket? = null
8084
private var lastConnectionTry = Instant.EPOCH
8185
private var connectionRetryCount = 0
86+
@Volatile
8287
private var pausedReports = false
8388
private var supressConnectMessage = false
8489

@@ -110,7 +115,7 @@ class AxoniqConsoleRSocketClient(
110115
* Sends a report to Axoniq Platform. If reports are paused, does nothing silently.
111116
*/
112117
fun sendReport(route: String, payload: Any): Mono<Unit> {
113-
if(pausedReports) {
118+
if (pausedReports) {
114119
return Mono.empty()
115120
}
116121
return sendMessage(payload, route)
@@ -236,10 +241,20 @@ class AxoniqConsoleRSocketClient(
236241

237242
fun isConnected() = rsocket != null
238243

244+
/**
245+
* Disposes the current RSocket connection in a thread-safe manner.
246+
* This method can be called from multiple threads (e.g., TCP disconnect callback,
247+
* heartbeat checker), but will only perform the disposal once per connection.
248+
*/
239249
fun disposeCurrentConnection() {
240-
rsocket?.dispose()
241-
rsocket = null
242-
clientSettingsService.clearSettings()
250+
connectionLock.withLock {
251+
val currentRSocket = rsocket
252+
if (currentRSocket != null) {
253+
rsocket = null
254+
currentRSocket.dispose()
255+
clientSettingsService.clearSettings()
256+
}
257+
}
243258
}
244259

245260
fun disposeClient() {

framework-client-messaging/src/main/java/io/axoniq/platform/framework/client/ServerProcessorReporter.kt

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2022-2025. AxonIQ B.V.
2+
* Copyright (c) 2022-2026. AxonIQ B.V.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -50,7 +50,15 @@ class ServerProcessorReporter(
5050
}
5151

5252
private fun report() {
53-
client.sendReport(Routes.EventProcessor.REPORT, processorReportCreator.createReport()).block()
53+
if (!client.isConnected()) {
54+
return
55+
}
56+
client.sendReport(Routes.EventProcessor.REPORT, processorReportCreator.createReport())
57+
.doOnError { e ->
58+
logger.debug { "Failed to send processor report: ${e.message}" }
59+
}
60+
.onErrorComplete()
61+
.subscribe()
5462
}
5563

5664
override fun onDisconnected() {

framework-client-messaging/src/main/java/io/axoniq/platform/framework/messaging/HandlerMetricsRegistry.kt

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2022-2025. AxonIQ B.V.
2+
* Copyright (c) 2022-2026. AxonIQ B.V.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -68,12 +68,21 @@ class HandlerMetricsRegistry(
6868
override fun onConnectedWithSettings(settings: ClientSettingsV2) {
6969
logger.debug { "Sending handler information every ${settings.handlerReportInterval}ms to Axoniq Platform" }
7070
this.reportTask = executor.scheduleAtFixedRate({
71+
if (!axoniqConsoleRSocketClient.isConnected()) {
72+
return@scheduleAtFixedRate
73+
}
7174
try {
7275
val stats = getStats()
73-
logger.debug { "${"Sending metrics: {}"} $stats" }
76+
logger.debug { "Sending metrics: $stats" }
77+
axoniqConsoleRSocketClient.sendReport(Routes.MessageFlow.STATS, stats)
78+
.doOnError { e ->
79+
logger.debug { "Failed to send handler metrics: ${e.message}" }
80+
}
81+
.onErrorComplete()
82+
.subscribe()
7483
axoniqConsoleRSocketClient.sendReport(Routes.MessageFlow.STATS, stats).block()
7584
} catch (e: Exception) {
76-
logger.warn { "${"No metrics could be reported to Axoniq Platform: {}"} ${e.message}" }
85+
logger.warn { "No metrics could be reported to AxonIQ Console: ${e.message}" }
7786
}
7887
}, 0, settings.handlerReportInterval, TimeUnit.MILLISECONDS)
7988
}

0 commit comments

Comments
 (0)