Skip to content
Merged
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,22 @@ For actual configuration, please consult the setup instructions that will be pro
only the keys set in the `axoniq.console.dlq-diagnostics-whitelist` will be shown.
* `full` - all the message payloads, aggregate identifier, and the diagnostics data will be visible.


* `axoniq.console.dlq-diagnostics-whitelist` - a comma-separated list of messages that will be shown in the DLQ
diagnostics.


* `axoniq.console.domain-event-access-mode` – change the visibility and access level of Domain Events in the system,
default: `none`. Several options are available:

* `none` – no payloads are visible, and aggregate loading is disabled.

* `preview_payload_only` – payloads of events are visible via the API or UI, but snapshot loading is disabled.

* `load_domain_state_only` – event payloads are not shown, but domain state loading is enabled for the domain entity.

* `full` – full access: event payloads are visible, and aggregate loading is supported.

## Data sent to AxonIQ

AxonIQ Console is an [AxonIQ](https://axoniq.io) SaaS product. Your application will periodically, or upon request, send
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ object Routes {
const val REPORT = "application-info-report"
}

object Enity {
const val DOMAIN_EVENTS = "domain-events"
const val ENTITY_STATE_AT_SEQUENCE = "entity-state-at-sequence"
}

object MessageFlow {
const val STATS = "message-flow-stats"
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package io.axoniq.console.framework.api

import java.time.Instant

data class DomainEventsResult(
val entityId: String,
val entityType: String,
val domainEvents: List<DomainEvent>,
val page: Int,
val pageSize: Int,
val totalCount: Long,
)

data class DomainEvent(
val sequenceNumber: Long,
val timestamp: Instant,
val payloadType: String,
val payload: String?
)

data class DomainEventsQuery(
val entityId: String,
val page: Int = 0,
val pageSize: Int = 10,
)

data class EntityStateResult(
val type: String,
val entityId: String,
val maxSequenceNumber: Long = 0,
val state: String,
)

data class EntityStateAtSequenceQuery(
val type: String,
val entityId: String,
val maxSequenceNumber: Long = 0,
)
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class AxoniqConsoleAutoConfiguration {
.initialDelay(properties.initialDelay)
.disableSpanFactoryInConfiguration()
.managementMaxThreadPoolSize(properties.maxConcurrentManagementTasks)
.domainEventAccessMode(properties.domainEventAccessMode)
properties.dlqDiagnosticsWhitelist.forEach { builder.addDlqDiagnosticsWhitelistKey(it) }
return builder.build()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.axoniq.console.framework.starter;

import io.axoniq.console.framework.AxoniqConsoleDlqMode;
import io.axoniq.console.framework.DomainEventAccessMode;
import org.springframework.boot.context.properties.ConfigurationProperties;

import java.util.ArrayList;
Expand Down Expand Up @@ -54,6 +55,12 @@ public class AxoniqConsoleSpringProperties {
* the list as the keys to filter on.
*/
private List<String> dlqDiagnosticsWhitelist = new ArrayList<>();
/**
* The access mode for Domain Events. Defaults to {@code NONE}, meaning no event payload is visible and loading
* for aggregate reconstruction is disabled. If payload inspection is required, consider {@code PAYLOAD_ONLY}.
* For full functionality, use {@code FULL}, which enables both payload visibility and aggregate snapshot loading.
*/
private DomainEventAccessMode domainEventAccessMode = DomainEventAccessMode.NONE;
/**
* Whether the connection should use SSL/TLs. Defaults to {@code true}.
*/
Expand Down Expand Up @@ -117,6 +124,14 @@ public void setDlqDiagnosticsWhitelist(List<String> dlqDiagnosticsWhitelist) {
this.dlqDiagnosticsWhitelist = dlqDiagnosticsWhitelist;
}

public DomainEventAccessMode getDomainEventAccessMode() {
return domainEventAccessMode;
}

public void setDomainEventAccessMode(DomainEventAccessMode domainEventAccessMode) {
this.domainEventAccessMode = domainEventAccessMode;
}

public boolean isSecure() {
return secure;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,15 @@

package io.axoniq.console.framework;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.axoniq.console.framework.application.DomainEventStreamProvider;
import io.axoniq.console.framework.application.ApplicationMetricRegistry;
import io.axoniq.console.framework.application.ApplicationMetricReporter;
import io.axoniq.console.framework.application.ApplicationReportCreator;
import io.axoniq.console.framework.application.ApplicationThreadDumpProvider;
import io.axoniq.console.framework.application.RSocketDomainEntityDataResponder;
import io.axoniq.console.framework.application.RSocketThreadDumpResponder;
import io.axoniq.console.framework.client.AxoniqConsoleRSocketClient;
import io.axoniq.console.framework.client.ClientSettingsService;
Expand Down Expand Up @@ -83,13 +88,15 @@ public class AxoniqConsoleConfigurerModule implements ConfigurerModule {
private final Long initialDelay;
private final AxoniqConsoleDlqMode dlqMode;
private final List<String> dlqDiagnosticsWhitelist;
private final DomainEventAccessMode domainEventAccessMode;
private final ScheduledExecutorService reportingTaskExecutor;
private final ExecutorService managementTaskExecutor;
private final boolean configureSpanFactory;
private final SpanMatcherPredicateMap spanMatcherPredicateMap;
private final EventScheduler eventScheduler;
private final MeterRegistry meterRegistry = new SimpleMeterRegistry();
private final String instanceName;
private final ObjectMapper objectMapper;

/**
* Creates the {@link AxoniqConsoleConfigurerModule} with the given {@code builder}.
Expand All @@ -107,11 +114,13 @@ protected AxoniqConsoleConfigurerModule(Builder builder) {
this.initialDelay = builder.initialDelay;
this.dlqMode = builder.dlqMode;
this.dlqDiagnosticsWhitelist = builder.dlqDiagnosticsWhitelist;
this.domainEventAccessMode = builder.domainEventAccessMode;
this.reportingTaskExecutor = builder.reportingTaskExecutor;
this.managementTaskExecutor = builder.managementTaskExecutor;
this.configureSpanFactory = !builder.disableSpanFactoryInConfiguration;
this.spanMatcherPredicateMap = builder.spanMatcherPredicateMap;
this.eventScheduler = builder.eventScheduler;
this.objectMapper = builder.objectMapper;
}

/**
Expand Down Expand Up @@ -223,6 +232,12 @@ public void configureModule(@NotNull Configurer configurer) {
.registerComponent(ApplicationThreadDumpProvider.class,
c -> new ApplicationThreadDumpProvider()
)
.registerComponent(DomainEventStreamProvider.class,
c -> new DomainEventStreamProvider(
configurer.buildConfiguration(),
objectMapper
)
)
.registerComponent(RSocketDlqResponder.class,
c -> new RSocketDlqResponder(
c.getComponent(DeadLetterManager.class),
Expand All @@ -233,6 +248,13 @@ public void configureModule(@NotNull Configurer configurer) {
c.getComponent(ApplicationThreadDumpProvider.class),
c.getComponent(RSocketHandlerRegistrar.class)
))
.registerComponent(RSocketDomainEntityDataResponder.class,
c -> new RSocketDomainEntityDataResponder(
c.getComponent(DomainEventStreamProvider.class),
c.getComponent(RSocketHandlerRegistrar.class),
domainEventAccessMode,
c.eventSerializer()
))
.eventProcessing()
.registerDefaultHandlerInterceptor((
c, name) -> new AxoniqConsoleProcessorInterceptor(
Expand All @@ -259,6 +281,7 @@ public void configureModule(@NotNull Configurer configurer) {
c.getComponent(RSocketDlqResponder.class);
c.getComponent(HandlerMetricsRegistry.class);
c.getComponent(RSocketThreadDumpResponder.class);
c.getComponent(RSocketDomainEntityDataResponder.class);
});

configurer.onStart(() -> {
Expand Down Expand Up @@ -300,6 +323,7 @@ public static class Builder {
private String nodeId = randomNodeId();
private AxoniqConsoleDlqMode dlqMode = AxoniqConsoleDlqMode.NONE;
private final List<String> dlqDiagnosticsWhitelist = new ArrayList<>();
private DomainEventAccessMode domainEventAccessMode = DomainEventAccessMode.NONE;
private Long initialDelay = 0L;
private boolean disableSpanFactoryInConfiguration = false;
private final SpanMatcherPredicateMap spanMatcherPredicateMap = getSpanMatcherPredicateMap();
Expand All @@ -311,6 +335,9 @@ public static class Builder {
private Integer managementMaxThreadPoolSize = 5;
private EventScheduler eventScheduler;

private ObjectMapper objectMapper = new ObjectMapper().findAndRegisterModules()
.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);

/**
* Constructor to instantiate a {@link Builder} based on the fields contained in the
* {@link AxoniqConsoleConfigurerModule.Builder}. Requires the {@code environmentId}, {@code accessToken} and
Expand Down Expand Up @@ -405,6 +432,18 @@ public Builder addDlqDiagnosticsWhitelistKey(String key) {
return this;
}

/**
* The mode of domain event access. Defaults to {@link DomainEventAccessMode#NONE}, which means
* that no domain event payload is visible and aggregate reconstruction is not supported.
*
* @param domainEventAccessMode The access mode to set for domain events
* @return The builder for fluent interfacing
*/
public Builder domainEventAccessMode(DomainEventAccessMode domainEventAccessMode) {
BuilderUtils.assertNonNull(domainEventAccessMode, "Domain event access mode may not be null");
this.domainEventAccessMode = domainEventAccessMode;
return this;
}

/**
* The initial delay before attempting to establish a connection. Defaults to {@code 0}.
Expand Down Expand Up @@ -524,6 +563,19 @@ public Builder eventScheduler(EventScheduler eventScheduler) {
return this;
}

/**
* Set the object mapper to be used for serialization and deserialization of domain events.
* Defaults to a new {@link ObjectMapper} with all modules registered and field visibility set to any.
*
* @param objectMapper the object mapper to use
* @return The builder for fluent interfacing
*/
public Builder objectMapper(ObjectMapper objectMapper) {
BuilderUtils.assertNonNull(objectMapper, "Object mapper must be non-null");
this.objectMapper = objectMapper;
return this;
}

/**
* Builds the {@link AxoniqConsoleConfigurerModule} based on the fields set in this {@link Builder}.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.axoniq.console.framework;

public enum DomainEventAccessMode {
/**
* Full access: payload is visible and loading domain state is supported.
*/
FULL,

/**
* Payload is hidden (e.g., masked), but loading domain state is still supported.
*/
LOAD_DOMAIN_STATE_ONLY,

/**
* Payload is visible, but loading domain state is not supported.
*/
PREVIEW_PAYLOAD_ONLY,

/**
* No access: payload is hidden and loading domain state is not supported.
*/
NONE
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package io.axoniq.console.framework.application

import com.fasterxml.jackson.databind.ObjectMapper
import org.axonframework.common.ReflectionUtils
import org.axonframework.config.AggregateConfiguration
import org.axonframework.config.Configuration
import org.axonframework.eventhandling.DomainEventMessage
import org.axonframework.eventhandling.EventMessage
import org.axonframework.eventsourcing.AggregateFactory
import org.axonframework.eventsourcing.EventSourcedAggregate
import org.axonframework.eventsourcing.SnapshotTrigger
import org.axonframework.eventsourcing.eventstore.AbstractEventStorageEngine
import org.axonframework.eventsourcing.eventstore.DomainEventStream
import org.axonframework.modelling.command.Repository
import org.axonframework.modelling.command.RepositoryProvider
import org.axonframework.modelling.command.inspection.AggregateModel

class DomainEventStreamProvider(
private val configuration: Configuration,
private val objectMapper: ObjectMapper
) {

fun getDomainEventStream(entityIdentifier: String, firstSequenceNumber: Long = 0): List<DomainEventMessage<*>>? =
configuration.eventStore()
.readEvents(entityIdentifier, firstSequenceNumber)
.iterator()
.asSequence()
.toList()
.takeIf { it.isNotEmpty() }

fun <T> loadDomainStateAtSequence(type: String, entityIdentifier: String, maxSequenceNumber: Long): String? {
val entityConfiguration = configuration.modules
.filterIsInstance<AggregateConfiguration<*>>()
.firstOrNull { it.aggregateType().simpleName == type }
?: throw IllegalArgumentException("No domain entity found for type $type")

val eventStore = configuration.eventStore()

val factory: AggregateFactory<T> = entityConfiguration.aggregateFactory() as AggregateFactory<T>
val model: AggregateModel<T> = entityConfiguration.repository().getPropertyValue<AggregateModel<T>>("aggregateModel")
?: throw IllegalArgumentException("No domain entity model found for type $type")

val stream = readEvents(entityIdentifier)
val loadingEntity: EventSourcedAggregate<T> = EventSourcedAggregate
.initialize(factory.createAggregateRoot(entityIdentifier, stream.peek()),
model,
eventStore,
object : RepositoryProvider {
override fun <T> repositoryFor(aggregateType: Class<T>): Repository<T> {
return entityConfiguration.repository() as Repository<T>
}

},
object : SnapshotTrigger {
override fun eventHandled(p0: EventMessage<*>) {
// Do nothing
}

override fun initializationFinished() {
// Do nothing
}
})

loadingEntity.initializeState(stream.filter { it.sequenceNumber <= maxSequenceNumber })

return objectMapper.writeValueAsString(loadingEntity.aggregateRoot)
}

private fun readEvents(identifier: String, firstSequenceNumber: Long = 0): DomainEventStream =
configuration.eventStore()
.getPropertyValue<AbstractEventStorageEngine>("storageEngine")
?.readEvents(identifier, firstSequenceNumber)
?: throw IllegalStateException("Unable to find AbstractEventStorageEngine in event store")

@Suppress("UNCHECKED_CAST")
private inline fun <reified T> Any.getPropertyValue(fieldName: String): T? =
ReflectionUtils.fieldsOf(this::class.java)
.firstOrNull { it.name == fieldName }
?.let { ReflectionUtils.getMemberValue(it, this) as? T }
}
Loading
Loading