Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright (c) 2022-2026. AxonIQ B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.axoniq.platform.framework.api


/**
* Represents the status of the client.
*
* @param enabled whether the client is enabled to process reports and retrieve a license.
*/
enum class ClientStatus(val enabled: Boolean) {
/**
* The connection has just been established, but has not yet been validated by the server to be able to send reports
* nor have a license. This is the default status of a connection, and will be changed to one of the other statuses after validation.
*/
PENDING(false),

/**
* The connection has been accepted and this connection is part of the provisioned connections.
* This means that the connection is able to process reports and retrieve a license, and does not use credits.
*/
PROVISIONED(true),

/**
* The connection has been accepted, but the pool of provisioned connections is full.
* This means that the connection is able to process reports and retrieve a license, but is using credits to do so.
* Upon depletion of credits, the connection will be BLOCKED.
* Can be changed to PROVISIONED when a provisioned connection is available.
*/
USING_CREDITS(true),

/**
* The connection has been blocked, and is not able to process reports.
* Once a provisioned connection opens up, or credits are available, the connection will be unblocked.
*/
BLOCKED(false)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright (c) 2022-2026. AxonIQ B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.axoniq.platform.framework.api

data class ClientStatusUpdate(
val newStatus: ClientStatus,
)
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2025. AxonIQ B.V.
* Copyright (c) 2022-2026. AxonIQ B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,6 +32,8 @@ object Routes {
const val START_REPORTS = "client-reporting-start"
// Request to send a thread dump
const val THREAD_DUMP = "client-thread-dump"
// Update pushed to the client whenever its status changes
const val STATUS = "client-status"
}

object EventProcessor {
Expand Down Expand Up @@ -76,4 +78,8 @@ object Routes {
object MessageFlow {
const val STATS = "message-flow-stats"
}

object License {
const val LICENSE = "license"
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2025. AxonIQ B.V.
* Copyright (c) 2022-2026. AxonIQ B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,7 +28,7 @@
* The token looks like the following:
* @code Bearer WORK_SPACE_ID:ENVIRONMENT_ID:COMPONENT_NAME:NODE_ID:ACCESS_TOKEN}
*/
data class ConsoleClientAuthentication(
data class PlatformClientAuthentication(
val identification: ConsoleClientIdentifier,
val accessToken: String,
) {
Expand All @@ -45,13 +45,13 @@
private const val BEARER_PREFIX: String = "Bearer "
private const val TOKEN_ERROR: String = "Not a valid Bearer token!"

fun fromToken(token: String): ConsoleClientAuthentication {
fun fromToken(token: String): PlatformClientAuthentication {
assert(token.startsWith(BEARER_PREFIX)) { TOKEN_ERROR }

val tokenParts = token.removePrefix(BEARER_PREFIX).split(":")
if (tokenParts.size == 5) {
val (_, environmentId, applicationName, nodeName, accessToken) = tokenParts
return ConsoleClientAuthentication(
return PlatformClientAuthentication(
ConsoleClientIdentifier(
environmentId = environmentId,
applicationName = applicationName.decode(),
Expand All @@ -63,7 +63,7 @@

assert(tokenParts.size == 4) { TOKEN_ERROR }
val (environmentId, applicationName, nodeName, accessToken) = tokenParts
return ConsoleClientAuthentication(
return PlatformClientAuthentication(
ConsoleClientIdentifier(
environmentId = environmentId,
applicationName = applicationName.decode(),
Expand Down Expand Up @@ -99,14 +99,17 @@
val heartbeat: Boolean? = false,
/* Whether the client supports direct logging.*/
val logDirect: Boolean? = false,
@Deprecated("Was never used, accidentally, will be removed in 2.1.0")
/* Whether the client supports pause/resume of reports.*/
val pauseReports: Boolean? = false,

Check warning on line 104 in framework-client-api/src/main/java/io/axoniq/platform/framework/api/clientIdentification.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Do not forget to remove this deprecated code someday.

See more on https://sonarcloud.io/project/issues?id=AxonIQ_console-framework-client&issues=AZzYR92-32CG6XB0R4Uc&open=AZzYR92-32CG6XB0R4Uc&pullRequest=122
/* Whether the client supports thread dumps.*/
val threadDump: Boolean? = false,
/* Whether the client supports DLQ insights. Can be FULL, LIMITED, MASKED, or NONE (default).*/
val deadLetterQueuesInsights: AxoniqConsoleDlqMode = AxoniqConsoleDlqMode.NONE,
/* Whether the client supports domain events insights. Can be FULL, LOAD_DOMAIN_STATE_ONLY, PREVIEW_PAYLOAD_ONLY, or NONE (default).*/
val domainEventsInsights: DomainEventAccessMode = DomainEventAccessMode.NONE,
/* Whether the client supports client status updates .*/
val clientStatusUpdates: Boolean? = false,
)

data class Versions(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2025. AxonIQ B.V.
* Copyright (c) 2022-2026. AxonIQ B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package io.axoniq.platform.framework.application
import io.axoniq.platform.framework.api.ClientSettingsV2
import io.axoniq.platform.framework.api.Routes
import io.axoniq.platform.framework.AxoniqPlatformConfiguration
import io.axoniq.platform.framework.api.ClientStatus
import io.axoniq.platform.framework.client.AxoniqConsoleRSocketClient
import io.axoniq.platform.framework.client.ClientSettingsObserver
import io.axoniq.platform.framework.client.ClientSettingsService
Expand All @@ -27,10 +28,10 @@ import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit

class ApplicationMetricReporter(
private val client: AxoniqConsoleRSocketClient,
private val reportCreator: ApplicationReportCreator,
private val clientSettingsService: ClientSettingsService,
private val properties: AxoniqPlatformConfiguration,
private val client: AxoniqConsoleRSocketClient,
private val reportCreator: ApplicationReportCreator,
private val clientSettingsService: ClientSettingsService,
private val properties: AxoniqPlatformConfiguration,
) : ClientSettingsObserver {
private var reportTask: ScheduledFuture<*>? = null
private val logger = KotlinLogging.logger { }
Expand All @@ -40,7 +41,10 @@ class ApplicationMetricReporter(
clientSettingsService.subscribeToSettings(this)
}

override fun onConnectedWithSettings(settings: ClientSettingsV2) {
override fun onConnectionUpdate(clientStatus: ClientStatus, settings: ClientSettingsV2) {
if (!clientStatus.enabled || reportTask != null) {
return
}
logger.debug { "Sending application information every ${settings.applicationReportInterval}ms to Axoniq Platform" }
this.reportTask = executor.scheduleWithFixedDelay({
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
package io.axoniq.platform.framework.client

import io.axoniq.platform.framework.api.ClientSettingsV2
import io.axoniq.platform.framework.api.ConsoleClientAuthentication
import io.axoniq.platform.framework.api.PlatformClientAuthentication
import io.axoniq.platform.framework.api.ConsoleClientIdentifier
import io.axoniq.platform.framework.api.Routes
import io.axoniq.platform.framework.api.notifications.Notification
import io.axoniq.platform.framework.api.notifications.NotificationLevel
import io.axoniq.platform.framework.api.notifications.NotificationList
import io.axoniq.platform.framework.AxoniqPlatformConfiguration
import io.axoniq.platform.framework.api.ClientStatus
import io.axoniq.platform.framework.api.ClientStatusUpdate
import io.axoniq.platform.framework.client.strategy.RSocketPayloadEncodingStrategy
import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.CompositeByteBuf
Expand All @@ -34,6 +36,7 @@ import io.rsocket.metadata.WellKnownMimeType
import io.rsocket.transport.netty.client.TcpClientTransport
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.netty.tcp.TcpClient
import java.time.Instant
Expand Down Expand Up @@ -79,12 +82,14 @@ class AxoniqConsoleRSocketClient(
private val logger = LoggerFactory.getLogger(this::class.java)

private val connectionLock = ReentrantLock()

@Volatile
private var rsocket: RSocket? = null
private var lastConnectionTry = Instant.EPOCH
private var connectionRetryCount = 0

@Volatile
private var pausedReports = false
private var status: ClientStatus = ClientStatus.PENDING
private var supressConnectMessage = false

init {
Expand All @@ -95,14 +100,11 @@ class AxoniqConsoleRSocketClient(
clientSettingsService.updateSettings(it)
}

// Server can block and unblock reports
registrar.registerHandlerWithoutPayload(Routes.Management.STOP_REPORTS) {
pausedReports = true
true
}
registrar.registerHandlerWithoutPayload(Routes.Management.START_REPORTS) {
pausedReports = false
true
// Server sends client status updates
registrar.registerHandlerWithPayload(Routes.Management.STATUS, ClientStatusUpdate::class.java) {
logger.debug("Received status update from Axoniq Platform. New status: {}", it.newStatus)
status = it.newStatus
clientSettingsService.updateClientStatus(status)
}

// Server can send log requests
Expand All @@ -115,7 +117,7 @@ class AxoniqConsoleRSocketClient(
* Sends a report to Axoniq Platform. If reports are paused, does nothing silently.
*/
fun sendReport(route: String, payload: Any): Mono<Unit> {
if (pausedReports) {
if (!status.enabled) {
return Mono.empty()
}
return sendMessage(payload, route)
Expand All @@ -133,6 +135,15 @@ class AxoniqConsoleRSocketClient(
}
?: Mono.empty())

fun <R> retrieve(payload: Any, route: String, responseType: Class<R>): Mono<R> {
return rsocket
?.requestResponse(encodingStrategy.encode(payload, createRoutingMetadata(route)))
?.map { responsePayload ->
encodingStrategy.decode(responsePayload, responseType)
}
?: Mono.empty()
}

/**
* Starts the connection, and starts the maintenance task.
* The task will ensure that if heartbeats are missed the connection is killed, as well as re-setup in case
Expand Down Expand Up @@ -186,7 +197,7 @@ class AxoniqConsoleRSocketClient(
}

private fun createRSocket(): RSocket {
val authentication = ConsoleClientAuthentication(
val authentication = PlatformClientAuthentication(
identification = ConsoleClientIdentifier(
environmentId = environmentId,
applicationName = applicationName,
Expand Down Expand Up @@ -217,7 +228,7 @@ class AxoniqConsoleRSocketClient(
return metadata
}

private fun createSetupMetadata(auth: ConsoleClientAuthentication): CompositeByteBuf {
private fun createSetupMetadata(auth: PlatformClientAuthentication): CompositeByteBuf {
val metadata: CompositeByteBuf = ByteBufAllocator.DEFAULT.compositeBuffer()
metadata.addRouteMetadata("client")
metadata.addAuthMetadata(auth)
Expand Down Expand Up @@ -280,7 +291,10 @@ class AxoniqConsoleRSocketClient(
}
}

override fun onConnectedWithSettings(settings: ClientSettingsV2) {
override fun onConnectionUpdate(clientStatus: ClientStatus, settings: ClientSettingsV2) {
if (this.heartbeatSendTask != null) {
return
}
lastReceivedHeartbeat = Instant.now()
this.heartbeatSendTask = executor.scheduleWithFixedDelay(
{ sendHeartbeat().subscribe() },
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2025. AxonIQ B.V.
* Copyright (c) 2022-2026. AxonIQ B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,19 +17,20 @@
package io.axoniq.platform.framework.client

import io.axoniq.platform.framework.api.ClientSettingsV2
import io.axoniq.platform.framework.api.ClientStatus

/**
* Observes the established connection and the settings provided by the server.
* The [onDisconnected] method is called when the connection is lost, or just before new settings
* are being updated to provide cleanup. The [onConnectedWithSettings] method is called when the connection is
* are being updated to provide cleanup. The [onConnectionUpdate] method is called when the connection is
* established or the settings are updated
*/
interface ClientSettingsObserver {
/**
* Called when the connection is established or the settings are updated.
* Called when the connection is established, the settings are updated, or the client's status changes.
* @param settings the settings provided by the server
*/
fun onConnectedWithSettings(settings: ClientSettingsV2)
fun onConnectionUpdate(clientStatus: ClientStatus, settings: ClientSettingsV2)

/**
* Called when the connection is lost, or just before new settings are being updated to provide cleanup.
Expand Down
Loading
Loading