diff --git a/framework-client-api/src/main/java/io/axoniq/platform/framework/api/ClientStatus.kt b/framework-client-api/src/main/java/io/axoniq/platform/framework/api/ClientStatus.kt new file mode 100644 index 0000000..8033e20 --- /dev/null +++ b/framework-client-api/src/main/java/io/axoniq/platform/framework/api/ClientStatus.kt @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2022-2026. AxonIQ B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.axoniq.platform.framework.api + + +/** + * Represents the status of the client. + * + * @param enabled whether the client is enabled to process reports and retrieve a license. + */ +enum class ClientStatus(val enabled: Boolean) { + /** + * The connection has just been established, but has not yet been validated by the server to be able to send reports + * nor have a license. This is the default status of a connection, and will be changed to one of the other statuses after validation. + */ + PENDING(false), + + /** + * The connection has been accepted and this connection is part of the provisioned connections. + * This means that the connection is able to process reports and retrieve a license, and does not use credits. + */ + PROVISIONED(true), + + /** + * The connection has been accepted, but the pool of provisioned connections is full. + * This means that the connection is able to process reports and retrieve a license, but is using credits to do so. + * Upon depletion of credits, the connection will be BLOCKED. + * Can be changed to PROVISIONED when a provisioned connection is available. + */ + USING_CREDITS(true), + + /** + * The connection has been blocked, and is not able to process reports. + * Once a provisioned connection opens up, or credits are available, the connection will be unblocked. + */ + BLOCKED(false) +} \ No newline at end of file diff --git a/framework-client-api/src/main/java/io/axoniq/platform/framework/api/ClientStatusUpdate.kt b/framework-client-api/src/main/java/io/axoniq/platform/framework/api/ClientStatusUpdate.kt new file mode 100644 index 0000000..2badb26 --- /dev/null +++ b/framework-client-api/src/main/java/io/axoniq/platform/framework/api/ClientStatusUpdate.kt @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2022-2026. AxonIQ B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.axoniq.platform.framework.api + +data class ClientStatusUpdate( + val newStatus: ClientStatus, +) diff --git a/framework-client-api/src/main/java/io/axoniq/platform/framework/api/Routes.kt b/framework-client-api/src/main/java/io/axoniq/platform/framework/api/Routes.kt index 4093d5e..558359c 100644 --- a/framework-client-api/src/main/java/io/axoniq/platform/framework/api/Routes.kt +++ b/framework-client-api/src/main/java/io/axoniq/platform/framework/api/Routes.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2025. AxonIQ B.V. + * Copyright (c) 2022-2026. AxonIQ B.V. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -32,6 +32,8 @@ object Routes { const val START_REPORTS = "client-reporting-start" // Request to send a thread dump const val THREAD_DUMP = "client-thread-dump" + // Update pushed to the client whenever its status changes + const val STATUS = "client-status" } object EventProcessor { @@ -76,4 +78,8 @@ object Routes { object MessageFlow { const val STATS = "message-flow-stats" } + + object License { + const val LICENSE = "license" + } } diff --git a/framework-client-api/src/main/java/io/axoniq/platform/framework/api/clientIdentification.kt b/framework-client-api/src/main/java/io/axoniq/platform/framework/api/clientIdentification.kt index 95376ab..b891401 100644 --- a/framework-client-api/src/main/java/io/axoniq/platform/framework/api/clientIdentification.kt +++ b/framework-client-api/src/main/java/io/axoniq/platform/framework/api/clientIdentification.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2025. AxonIQ B.V. + * Copyright (c) 2022-2026. AxonIQ B.V. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,7 +28,7 @@ import java.net.URLEncoder * The token looks like the following: * @code Bearer WORK_SPACE_ID:ENVIRONMENT_ID:COMPONENT_NAME:NODE_ID:ACCESS_TOKEN} */ -data class ConsoleClientAuthentication( +data class PlatformClientAuthentication( val identification: ConsoleClientIdentifier, val accessToken: String, ) { @@ -45,13 +45,13 @@ data class ConsoleClientAuthentication( private const val BEARER_PREFIX: String = "Bearer " private const val TOKEN_ERROR: String = "Not a valid Bearer token!" - fun fromToken(token: String): ConsoleClientAuthentication { + fun fromToken(token: String): PlatformClientAuthentication { assert(token.startsWith(BEARER_PREFIX)) { TOKEN_ERROR } val tokenParts = token.removePrefix(BEARER_PREFIX).split(":") if (tokenParts.size == 5) { val (_, environmentId, applicationName, nodeName, accessToken) = tokenParts - return ConsoleClientAuthentication( + return PlatformClientAuthentication( ConsoleClientIdentifier( environmentId = environmentId, applicationName = applicationName.decode(), @@ -63,7 +63,7 @@ data class ConsoleClientAuthentication( assert(tokenParts.size == 4) { TOKEN_ERROR } val (environmentId, applicationName, nodeName, accessToken) = tokenParts - return ConsoleClientAuthentication( + return PlatformClientAuthentication( ConsoleClientIdentifier( environmentId = environmentId, applicationName = applicationName.decode(), @@ -99,6 +99,7 @@ data class SupportedFeatures( val heartbeat: Boolean? = false, /* Whether the client supports direct logging.*/ val logDirect: Boolean? = false, + @Deprecated("Was never used, accidentally, will be removed in 2.1.0") /* Whether the client supports pause/resume of reports.*/ val pauseReports: Boolean? = false, /* Whether the client supports thread dumps.*/ @@ -107,6 +108,8 @@ data class SupportedFeatures( val deadLetterQueuesInsights: AxoniqConsoleDlqMode = AxoniqConsoleDlqMode.NONE, /* Whether the client supports domain events insights. Can be FULL, LOAD_DOMAIN_STATE_ONLY, PREVIEW_PAYLOAD_ONLY, or NONE (default).*/ val domainEventsInsights: DomainEventAccessMode = DomainEventAccessMode.NONE, + /* Whether the client supports client status updates .*/ + val clientStatusUpdates: Boolean? = false, ) data class Versions( diff --git a/framework-client-messaging/src/main/java/io/axoniq/platform/framework/AxoniqPlatformConfigurerEnhancer.java b/framework-client-messaging/src/main/java/io/axoniq/platform/framework/AxoniqPlatformConfigurerEnhancer.java index 2fc901e..b2b31f4 100644 --- a/framework-client-messaging/src/main/java/io/axoniq/platform/framework/AxoniqPlatformConfigurerEnhancer.java +++ b/framework-client-messaging/src/main/java/io/axoniq/platform/framework/AxoniqPlatformConfigurerEnhancer.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2025. AxonIQ B.V. + * Copyright (c) 2022-2026. AxonIQ B.V. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/framework-client-messaging/src/main/java/io/axoniq/platform/framework/application/ApplicationMetricReporter.kt b/framework-client-messaging/src/main/java/io/axoniq/platform/framework/application/ApplicationMetricReporter.kt index d381dd2..2f5fbc6 100644 --- a/framework-client-messaging/src/main/java/io/axoniq/platform/framework/application/ApplicationMetricReporter.kt +++ b/framework-client-messaging/src/main/java/io/axoniq/platform/framework/application/ApplicationMetricReporter.kt @@ -19,6 +19,7 @@ package io.axoniq.platform.framework.application import io.axoniq.platform.framework.api.ClientSettingsV2 import io.axoniq.platform.framework.api.Routes import io.axoniq.platform.framework.AxoniqPlatformConfiguration +import io.axoniq.platform.framework.api.ClientStatus import io.axoniq.platform.framework.client.AxoniqConsoleRSocketClient import io.axoniq.platform.framework.client.ClientSettingsObserver import io.axoniq.platform.framework.client.ClientSettingsService @@ -27,10 +28,10 @@ import java.util.concurrent.ScheduledFuture import java.util.concurrent.TimeUnit class ApplicationMetricReporter( - private val client: AxoniqConsoleRSocketClient, - private val reportCreator: ApplicationReportCreator, - private val clientSettingsService: ClientSettingsService, - private val properties: AxoniqPlatformConfiguration, + private val client: AxoniqConsoleRSocketClient, + private val reportCreator: ApplicationReportCreator, + private val clientSettingsService: ClientSettingsService, + private val properties: AxoniqPlatformConfiguration, ) : ClientSettingsObserver { private var reportTask: ScheduledFuture<*>? = null private val logger = KotlinLogging.logger { } @@ -40,7 +41,10 @@ class ApplicationMetricReporter( clientSettingsService.subscribeToSettings(this) } - override fun onConnectedWithSettings(settings: ClientSettingsV2) { + override fun onConnectionUpdate(clientStatus: ClientStatus, settings: ClientSettingsV2) { + if (!clientStatus.enabled || reportTask != null) { + return + } logger.debug { "Sending application information every ${settings.applicationReportInterval}ms to Axoniq Platform" } this.reportTask = executor.scheduleWithFixedDelay({ try { diff --git a/framework-client-messaging/src/main/java/io/axoniq/platform/framework/client/AxoniqConsoleRSocketClient.kt b/framework-client-messaging/src/main/java/io/axoniq/platform/framework/client/AxoniqConsoleRSocketClient.kt index e6a6053..8081819 100644 --- a/framework-client-messaging/src/main/java/io/axoniq/platform/framework/client/AxoniqConsoleRSocketClient.kt +++ b/framework-client-messaging/src/main/java/io/axoniq/platform/framework/client/AxoniqConsoleRSocketClient.kt @@ -17,13 +17,15 @@ package io.axoniq.platform.framework.client import io.axoniq.platform.framework.api.ClientSettingsV2 -import io.axoniq.platform.framework.api.ConsoleClientAuthentication +import io.axoniq.platform.framework.api.PlatformClientAuthentication import io.axoniq.platform.framework.api.ConsoleClientIdentifier import io.axoniq.platform.framework.api.Routes import io.axoniq.platform.framework.api.notifications.Notification import io.axoniq.platform.framework.api.notifications.NotificationLevel import io.axoniq.platform.framework.api.notifications.NotificationList import io.axoniq.platform.framework.AxoniqPlatformConfiguration +import io.axoniq.platform.framework.api.ClientStatus +import io.axoniq.platform.framework.api.ClientStatusUpdate import io.axoniq.platform.framework.client.strategy.RSocketPayloadEncodingStrategy import io.netty.buffer.ByteBufAllocator import io.netty.buffer.CompositeByteBuf @@ -34,6 +36,7 @@ import io.rsocket.metadata.WellKnownMimeType import io.rsocket.transport.netty.client.TcpClientTransport import org.slf4j.Logger import org.slf4j.LoggerFactory +import reactor.core.publisher.Flux import reactor.core.publisher.Mono import reactor.netty.tcp.TcpClient import java.time.Instant @@ -79,12 +82,14 @@ class AxoniqConsoleRSocketClient( private val logger = LoggerFactory.getLogger(this::class.java) private val connectionLock = ReentrantLock() + @Volatile private var rsocket: RSocket? = null private var lastConnectionTry = Instant.EPOCH private var connectionRetryCount = 0 + @Volatile - private var pausedReports = false + private var status: ClientStatus = ClientStatus.PENDING private var supressConnectMessage = false init { @@ -95,14 +100,11 @@ class AxoniqConsoleRSocketClient( clientSettingsService.updateSettings(it) } - // Server can block and unblock reports - registrar.registerHandlerWithoutPayload(Routes.Management.STOP_REPORTS) { - pausedReports = true - true - } - registrar.registerHandlerWithoutPayload(Routes.Management.START_REPORTS) { - pausedReports = false - true + // Server sends client status updates + registrar.registerHandlerWithPayload(Routes.Management.STATUS, ClientStatusUpdate::class.java) { + logger.debug("Received status update from Axoniq Platform. New status: {}", it.newStatus) + status = it.newStatus + clientSettingsService.updateClientStatus(status) } // Server can send log requests @@ -115,7 +117,7 @@ class AxoniqConsoleRSocketClient( * Sends a report to Axoniq Platform. If reports are paused, does nothing silently. */ fun sendReport(route: String, payload: Any): Mono { - if (pausedReports) { + if (!status.enabled) { return Mono.empty() } return sendMessage(payload, route) @@ -133,6 +135,15 @@ class AxoniqConsoleRSocketClient( } ?: Mono.empty()) + fun retrieve(payload: Any, route: String, responseType: Class): Mono { + return rsocket + ?.requestResponse(encodingStrategy.encode(payload, createRoutingMetadata(route))) + ?.map { responsePayload -> + encodingStrategy.decode(responsePayload, responseType) + } + ?: Mono.empty() + } + /** * Starts the connection, and starts the maintenance task. * The task will ensure that if heartbeats are missed the connection is killed, as well as re-setup in case @@ -186,7 +197,7 @@ class AxoniqConsoleRSocketClient( } private fun createRSocket(): RSocket { - val authentication = ConsoleClientAuthentication( + val authentication = PlatformClientAuthentication( identification = ConsoleClientIdentifier( environmentId = environmentId, applicationName = applicationName, @@ -217,7 +228,7 @@ class AxoniqConsoleRSocketClient( return metadata } - private fun createSetupMetadata(auth: ConsoleClientAuthentication): CompositeByteBuf { + private fun createSetupMetadata(auth: PlatformClientAuthentication): CompositeByteBuf { val metadata: CompositeByteBuf = ByteBufAllocator.DEFAULT.compositeBuffer() metadata.addRouteMetadata("client") metadata.addAuthMetadata(auth) @@ -280,7 +291,10 @@ class AxoniqConsoleRSocketClient( } } - override fun onConnectedWithSettings(settings: ClientSettingsV2) { + override fun onConnectionUpdate(clientStatus: ClientStatus, settings: ClientSettingsV2) { + if (this.heartbeatSendTask != null) { + return + } lastReceivedHeartbeat = Instant.now() this.heartbeatSendTask = executor.scheduleWithFixedDelay( { sendHeartbeat().subscribe() }, diff --git a/framework-client-messaging/src/main/java/io/axoniq/platform/framework/client/ClientSettingsObserver.kt b/framework-client-messaging/src/main/java/io/axoniq/platform/framework/client/ClientSettingsObserver.kt index f80359e..43850ed 100644 --- a/framework-client-messaging/src/main/java/io/axoniq/platform/framework/client/ClientSettingsObserver.kt +++ b/framework-client-messaging/src/main/java/io/axoniq/platform/framework/client/ClientSettingsObserver.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2025. AxonIQ B.V. + * Copyright (c) 2022-2026. AxonIQ B.V. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,19 +17,20 @@ package io.axoniq.platform.framework.client import io.axoniq.platform.framework.api.ClientSettingsV2 +import io.axoniq.platform.framework.api.ClientStatus /** * Observes the established connection and the settings provided by the server. * The [onDisconnected] method is called when the connection is lost, or just before new settings - * are being updated to provide cleanup. The [onConnectedWithSettings] method is called when the connection is + * are being updated to provide cleanup. The [onConnectionUpdate] method is called when the connection is * established or the settings are updated */ interface ClientSettingsObserver { /** - * Called when the connection is established or the settings are updated. + * Called when the connection is established, the settings are updated, or the client's status changes. * @param settings the settings provided by the server */ - fun onConnectedWithSettings(settings: ClientSettingsV2) + fun onConnectionUpdate(clientStatus: ClientStatus, settings: ClientSettingsV2) /** * Called when the connection is lost, or just before new settings are being updated to provide cleanup. diff --git a/framework-client-messaging/src/main/java/io/axoniq/platform/framework/client/ClientSettingsService.kt b/framework-client-messaging/src/main/java/io/axoniq/platform/framework/client/ClientSettingsService.kt index c8897b7..9171b5e 100644 --- a/framework-client-messaging/src/main/java/io/axoniq/platform/framework/client/ClientSettingsService.kt +++ b/framework-client-messaging/src/main/java/io/axoniq/platform/framework/client/ClientSettingsService.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2025. AxonIQ B.V. + * Copyright (c) 2022-2026. AxonIQ B.V. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ package io.axoniq.platform.framework.client import io.axoniq.platform.framework.api.ClientSettingsV2 +import io.axoniq.platform.framework.api.ClientStatus import io.github.oshai.kotlinlogging.KotlinLogging import java.util.concurrent.CopyOnWriteArrayList @@ -25,11 +26,12 @@ import java.util.concurrent.CopyOnWriteArrayList */ class ClientSettingsService { private val observers = CopyOnWriteArrayList() + private var clientStatus: ClientStatus = ClientStatus.PENDING private var settings: ClientSettingsV2? = null private val logger = KotlinLogging.logger { } fun clearSettings() { - logger.info { "Clearing client settings" } + logger.debug { "Clearing client settings" } if (settings != null) { settings = null observers.forEach { it.onDisconnected() } @@ -37,17 +39,25 @@ class ClientSettingsService { } fun subscribeToSettings(observer: ClientSettingsObserver) { - logger.info { "Subscribing to client settings $observer" } + logger.debug { "Subscribing to client settings $observer" } this.observers.add(observer) if (settings != null) { - observer.onConnectedWithSettings(settings!!) + observer.onConnectionUpdate(clientStatus, settings!!) + } + } + + fun updateClientStatus(clientStatus: ClientStatus) { + logger.debug { "Client status changed to $clientStatus" } + this.clientStatus = clientStatus + if (settings != null) { + observers.forEach { it.onConnectionUpdate(clientStatus, settings!!) } } } fun updateSettings(settings: ClientSettingsV2) { clearSettings() - logger.info { "Client settings changed to $settings" } + logger.debug { "Client settings changed to $settings" } this.settings = settings - observers.forEach { it.onConnectedWithSettings(settings) } + observers.forEach { it.onConnectionUpdate(clientStatus, settings) } } } \ No newline at end of file diff --git a/framework-client-messaging/src/main/java/io/axoniq/platform/framework/client/RSocketHandlerRegistrar.kt b/framework-client-messaging/src/main/java/io/axoniq/platform/framework/client/RSocketHandlerRegistrar.kt index c110546..96fd081 100644 --- a/framework-client-messaging/src/main/java/io/axoniq/platform/framework/client/RSocketHandlerRegistrar.kt +++ b/framework-client-messaging/src/main/java/io/axoniq/platform/framework/client/RSocketHandlerRegistrar.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2025. AxonIQ B.V. + * Copyright (c) 2022-2026. AxonIQ B.V. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,7 @@ package io.axoniq.platform.framework.client +import io.axoniq.platform.framework.api.Routes import io.axoniq.platform.framework.client.strategy.RSocketPayloadEncodingStrategy import io.rsocket.Payload import io.rsocket.RSocket @@ -31,6 +32,13 @@ class RSocketHandlerRegistrar( private val logger = LoggerFactory.getLogger(this::class.java) private val handlers: MutableList = mutableListOf() + companion object { + /** Routes that contain sensitive information and should not have their payloads logged. */ + private val SENSITIVE_ROUTES = setOf( + Routes.License.LICENSE, + ) + } + fun registerHandlerWithoutPayload(route: String, handler: () -> Any) { logger.debug("Registered Axoniq Platform handler for route {}", route) handlers.add(PayloadlessRegisteredRsocketMessageHandler(route, handler)) @@ -79,7 +87,8 @@ class RSocketHandlerRegistrar( route: String, ): Any { val decodedPayload = encodingStrategy.decode(payload, matchingHandler.payloadType) - logger.debug("Received Axoniq Platform message for route [$route] with payload: [{}]", decodedPayload) + val payloadLog = if (route in SENSITIVE_ROUTES) "[REDACTED]" else decodedPayload + logger.debug("Received Axoniq Platform message for route [$route] with payload: [{}]", payloadLog) return matchingHandler.handler.invoke(decodedPayload) } diff --git a/framework-client-messaging/src/main/java/io/axoniq/platform/framework/client/ServerProcessorReporter.kt b/framework-client-messaging/src/main/java/io/axoniq/platform/framework/client/ServerProcessorReporter.kt index e480cad..7984373 100644 --- a/framework-client-messaging/src/main/java/io/axoniq/platform/framework/client/ServerProcessorReporter.kt +++ b/framework-client-messaging/src/main/java/io/axoniq/platform/framework/client/ServerProcessorReporter.kt @@ -19,8 +19,10 @@ package io.axoniq.platform.framework.client import io.axoniq.platform.framework.api.ClientSettingsV2 import io.axoniq.platform.framework.api.Routes import io.axoniq.platform.framework.AxoniqPlatformConfiguration +import io.axoniq.platform.framework.api.ClientStatus import io.axoniq.platform.framework.eventprocessor.ProcessorReportCreator import io.github.oshai.kotlinlogging.KotlinLogging +import reactor.core.Disposable import java.util.concurrent.ScheduledFuture import java.util.concurrent.TimeUnit @@ -38,8 +40,11 @@ class ServerProcessorReporter( clientSettingsService.subscribeToSettings(this) } - override fun onConnectedWithSettings(settings: ClientSettingsV2) { - logger.info { "Sending processor information every ${settings.processorReportInterval}ms to Axoniq Platform" } + override fun onConnectionUpdate(clientStatus: ClientStatus, settings: ClientSettingsV2) { + if (!clientStatus.enabled || reportTask != null) { + return + } + logger.debug { "Sending processor information every ${settings.processorReportInterval}ms to Axoniq Platform" } this.reportTask = executor.scheduleWithFixedDelay({ try { this.report() diff --git a/framework-client-messaging/src/main/java/io/axoniq/platform/framework/client/SetupPayloadCreator.kt b/framework-client-messaging/src/main/java/io/axoniq/platform/framework/client/SetupPayloadCreator.kt index 2e175ff..e94b825 100644 --- a/framework-client-messaging/src/main/java/io/axoniq/platform/framework/client/SetupPayloadCreator.kt +++ b/framework-client-messaging/src/main/java/io/axoniq/platform/framework/client/SetupPayloadCreator.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2025. AxonIQ B.V. + * Copyright (c) 2022-2026. AxonIQ B.V. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -79,8 +79,10 @@ class SetupPayloadCreator( versions = versionInformation(), upcasters = emptyList(), features = SupportedFeatures( + pauseReports = false, heartbeat = true, threadDump = true, + clientStatusUpdates = true, ) ) } diff --git a/framework-client-messaging/src/main/java/io/axoniq/platform/framework/client/utils.kt b/framework-client-messaging/src/main/java/io/axoniq/platform/framework/client/utils.kt index 23345bd..8c20337 100644 --- a/framework-client-messaging/src/main/java/io/axoniq/platform/framework/client/utils.kt +++ b/framework-client-messaging/src/main/java/io/axoniq/platform/framework/client/utils.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2025. AxonIQ B.V. + * Copyright (c) 2022-2026. AxonIQ B.V. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,7 +16,7 @@ package io.axoniq.platform.framework.client -import io.axoniq.platform.framework.api.ConsoleClientAuthentication +import io.axoniq.platform.framework.api.PlatformClientAuthentication import io.netty.buffer.ByteBufAllocator import io.netty.buffer.CompositeByteBuf import io.rsocket.metadata.CompositeMetadataCodec @@ -34,7 +34,7 @@ fun CompositeByteBuf.addRouteMetadata(route: String) { ) } -fun CompositeByteBuf.addAuthMetadata(auth: ConsoleClientAuthentication) { +fun CompositeByteBuf.addAuthMetadata(auth: PlatformClientAuthentication) { val authMetadata = ByteBufAllocator.DEFAULT.compositeBuffer() authMetadata.writeBytes(auth.toBearerToken().toByteArray()) CompositeMetadataCodec.encodeAndAddMetadata( diff --git a/framework-client-messaging/src/main/java/io/axoniq/platform/framework/messaging/HandlerMetricsRegistry.kt b/framework-client-messaging/src/main/java/io/axoniq/platform/framework/messaging/HandlerMetricsRegistry.kt index 9de2973..02f1396 100644 --- a/framework-client-messaging/src/main/java/io/axoniq/platform/framework/messaging/HandlerMetricsRegistry.kt +++ b/framework-client-messaging/src/main/java/io/axoniq/platform/framework/messaging/HandlerMetricsRegistry.kt @@ -19,6 +19,7 @@ package io.axoniq.platform.framework.messaging import io.axoniq.platform.framework.api.ClientSettingsV2 import io.axoniq.platform.framework.api.Routes import io.axoniq.platform.framework.AxoniqPlatformConfiguration +import io.axoniq.platform.framework.api.ClientStatus import io.axoniq.platform.framework.api.metrics.DispatcherStatisticIdentifier import io.axoniq.platform.framework.api.metrics.DispatcherStatistics import io.axoniq.platform.framework.api.metrics.DispatcherStatisticsWithIdentifier @@ -65,7 +66,10 @@ class HandlerMetricsRegistry( clientSettingsService.subscribeToSettings(this) } - override fun onConnectedWithSettings(settings: ClientSettingsV2) { + override fun onConnectionUpdate(clientStatus: ClientStatus, settings: ClientSettingsV2) { + if (!clientStatus.enabled || reportTask != null) { + return + } logger.debug { "Sending handler information every ${settings.handlerReportInterval}ms to Axoniq Platform" } this.reportTask = executor.scheduleAtFixedRate({ if (!axoniqConsoleRSocketClient.isConnected()) { diff --git a/pom.xml b/pom.xml index 912cc37..fd2723d 100644 --- a/pom.xml +++ b/pom.xml @@ -73,11 +73,13 @@ org.axonframework axon-messaging ${axon.version} + provided org.axonframework axon-modelling ${axon.version} + provided org.axonframework @@ -107,6 +109,7 @@ kotlin-reflect ${kotlin.version} +