diff --git a/build.gradle.kts b/build.gradle.kts index 84d44288..18695089 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -56,6 +56,9 @@ dependencies { implementation("com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.15.1") implementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.15.1") + implementation("com.google.flogger:flogger:0.9") + implementation("com.google.flogger:flogger-system-backend:0.9") + implementation("com.google.guava:guava:33.0.0-jre") implementation("com.google.protobuf:protobuf-java:4.32.0") @@ -73,11 +76,10 @@ dependencies { implementation("jakarta.annotation:jakarta.annotation-api:3.0.0") implementation("org.apache.commons:commons-jexl3:3.3") + implementation("org.apache.commons:commons-lang3:3.19.0") implementation("org.apache.httpcomponents.client5:httpclient5:5.3.1") - implementation("org.apache.logging.log4j:log4j-core:2.23.1") - implementation("org.glassfish.expressly:expressly:5.0.0") implementation("org.hibernate.validator:hibernate-validator:8.0.1.Final") diff --git a/src/main/java/at/ac/uibk/dps/cirrina/cirrina/Cirrina.kt b/src/main/java/at/ac/uibk/dps/cirrina/cirrina/Cirrina.kt index ac35cd63..328ce900 100644 --- a/src/main/java/at/ac/uibk/dps/cirrina/cirrina/Cirrina.kt +++ b/src/main/java/at/ac/uibk/dps/cirrina/cirrina/Cirrina.kt @@ -7,88 +7,80 @@ import at.ac.uibk.dps.cirrina.execution.`object`.event.NatsEventHandler import at.ac.uibk.dps.cirrina.execution.service.RandomServiceImplementationSelector import at.ac.uibk.dps.cirrina.execution.service.ServiceImplementationBuilder import at.ac.uibk.dps.cirrina.io.parsing.CsmParser +import com.google.common.flogger.FluentLogger import io.opentelemetry.api.OpenTelemetry import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk import java.net.URI -import org.apache.logging.log4j.Level -import org.apache.logging.log4j.LogManager -import org.apache.logging.log4j.core.LoggerContext + +private val logger: FluentLogger = FluentLogger.forEnclosingClass() /** Cirrina entry class. */ class Cirrina { companion object { const val NATS_CONNECTION_TIMEOUT = 60000L - val logger = LogManager.getLogger() - init { - setupLogging() - setupnewHealthService() - } - - // Set up the logger. - private fun setupLogging() { - (LogManager.getContext(false) as LoggerContext).apply { - configuration.getLoggerConfig(logger.name).level = Level.INFO - updateLoggers() - } - } - - // Create the health service. - private fun setupnewHealthService(): HealthService = + logger.atFine().log("Starting health service") runCatching { HealthService(EnvironmentVariables.healthPort.get()) } - .getOrElse { e -> throw RuntimeException("Failed to start the health service: $e", e) } + .getOrElse { e -> logger.atSevere().withCause(e).log("Could not start the health service") } + } } /** Run Cirrina as configured. */ fun run() { try { + logger.atFine().log("Creating the event handler") newEventHandler() .apply { if (this is NatsEventHandler) { + logger.atFine().log("Awaiting connection to NATS as the event handler") awaitInitialConnection(NATS_CONNECTION_TIMEOUT) } } .use { eventHandler -> + logger.atFiner().log("Subscribing to event sources") eventHandler.subscribe(NatsEventHandler.GLOBAL_SOURCE, "*") eventHandler.subscribe(NatsEventHandler.PERIPHERAL_SOURCE, "*") + logger.atFine().log("Creating the persistent context") newPersistentContext() .apply { if (this is NatsContext) { + logger.atFine().log("Awaiting connection to NATS as the persistent context") awaitInitialConnection(NATS_CONNECTION_TIMEOUT) } } .use { persistentContext -> val openTelemetry = getOpenTelemetry() - val serviceImplementationSelector = - RandomServiceImplementationSelector( - ServiceImplementationBuilder.from( - CsmParser.parseServiceImplementationBindings( - URI(EnvironmentVariables.serviceBindingsPath.get()) - ) - .bindings - ) - .build() - ) - Runtime( + logger.atFine().log("Loading service implementation bindings") + var serviceImplementationBindings = + CsmParser.parseServiceImplementationBindings( + URI(EnvironmentVariables.serviceBindingsPath.get()) + ) + .bindings + + logger.atFine().log("Creating the runtime") + val runtime = + Runtime( URI(EnvironmentVariables.appPath.get()), EnvironmentVariables.instantiate.get(), openTelemetry, - serviceImplementationSelector, + RandomServiceImplementationSelector( + ServiceImplementationBuilder.from(serviceImplementationBindings).build() + ), eventHandler, persistentContext, ) - .run() - logger.info("Done running") + logger.atFine().log("Running the runtime") + runtime.run() } } - } catch (e: EnvironmentVariableError) { - logger.error("There is an error in the current configuration", e) + } catch (e: ConfigurationError) { + logger.atSevere().withCause(e).log("There is an error in the current configuration") } catch (e: Exception) { - logger.error("There was an unknown in the runtime execution", e) + logger.atSevere().withCause(e).log("There was an unknown in the runtime execution") } } diff --git a/src/main/java/at/ac/uibk/dps/cirrina/cirrina/Error.kt b/src/main/java/at/ac/uibk/dps/cirrina/cirrina/Error.kt index 90dcb3e9..9c833371 100644 --- a/src/main/java/at/ac/uibk/dps/cirrina/cirrina/Error.kt +++ b/src/main/java/at/ac/uibk/dps/cirrina/cirrina/Error.kt @@ -1,6 +1,10 @@ package at.ac.uibk.dps.cirrina.cirrina -sealed class EnvironmentVariableError(message: String) : RuntimeException(message) { +sealed class ConfigurationError(message: String) : RuntimeException(message) { + class Unknown(what: String, `is`: Any) : ConfigurationError("Unknown $what which is '$`is`'") +} + +sealed class EnvironmentVariableError(message: String) : ConfigurationError(message) { class Missing(name: String) : EnvironmentVariableError("Missing required environment variable '$name'") @@ -9,7 +13,3 @@ sealed class EnvironmentVariableError(message: String) : RuntimeException(messag "Invalid value for environment variable '$name', the value is '$value', allowed values are '${allowed}'" ) } - -sealed class ConfigurationError(message: String) : RuntimeException(message) { - class Unknown(what: String, `is`: Any) : ConfigurationError("Unknown $what which is '$`is`'") -} diff --git a/src/main/java/at/ac/uibk/dps/cirrina/cirrina/HealthService.java b/src/main/java/at/ac/uibk/dps/cirrina/cirrina/HealthService.java index 71323bce..e99c635e 100644 --- a/src/main/java/at/ac/uibk/dps/cirrina/cirrina/HealthService.java +++ b/src/main/java/at/ac/uibk/dps/cirrina/cirrina/HealthService.java @@ -47,7 +47,7 @@ public HealthService(int port) { * Stops the health service. */ @Override - public void close() throws Exception { + public void close() { httpServer.stop(0); } } diff --git a/src/main/java/at/ac/uibk/dps/cirrina/cirrina/Runtime.kt b/src/main/java/at/ac/uibk/dps/cirrina/cirrina/Runtime.kt index e4f637c3..6571387c 100644 --- a/src/main/java/at/ac/uibk/dps/cirrina/cirrina/Runtime.kt +++ b/src/main/java/at/ac/uibk/dps/cirrina/cirrina/Runtime.kt @@ -9,15 +9,15 @@ import at.ac.uibk.dps.cirrina.execution.`object`.statemachine.StateMachine import at.ac.uibk.dps.cirrina.execution.service.ServiceImplementationSelector import at.ac.uibk.dps.cirrina.io.parsing.CsmParser import at.ac.uibk.dps.cirrina.utils.Id +import com.google.common.flogger.FluentLogger import io.opentelemetry.api.OpenTelemetry -import java.io.IOException import java.net.URI import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.async import kotlinx.coroutines.awaitAll import kotlinx.coroutines.runBlocking -import org.apache.logging.log4j.LogManager -import org.apache.logging.log4j.Logger + +private val logger: FluentLogger = FluentLogger.forEnclosingClass() /** * Runtime for executing state machines defined in a Cirrina CSML project. @@ -37,11 +37,7 @@ class Runtime( val eventHandler: EventHandler, val persistentContext: Context, ) { - companion object { - private val logger: Logger = LogManager.getLogger() - } - - // Instantiated state machines. + /** Instantiated state machines. */ val stateMachines: List /** Top-level extent. */ @@ -51,24 +47,33 @@ class Runtime( val collaborativeStateMachineClass = CollaborativeStateMachineClassBuilder.from(CsmParser.parseCsml(main)).build() + logger.atFine().log("Creating persistent context variables") collaborativeStateMachineClass.persistentContextVariables.forEach { variable -> - try { - logger.info("Creating persistent context variable '{}'", variable.name()) - persistentContext.create(variable.name(), variable.value()) - } catch (_: IOException) { - logger.info( - "Did not create persistent context variable '{}', does it already exist?", - variable.name(), - ) - } + runCatching { + logger.atFiner().log("Creating persistent context variable '${variable.name()}'") + persistentContext.create(variable.name(), variable.value()) + } + .onFailure { _ -> + logger + .atWarning() + .log( + "Did not create persistent context variable '${variable.name()}', does it already exist?" + ) + } } stateMachines = stateMachineNames - .map { name -> - collaborativeStateMachineClass.findStateMachineClassByName(name).orElseThrow { - IllegalArgumentException("No state machine found with name: $name") - } + .mapNotNull { name -> + collaborativeStateMachineClass.findStateMachineClassByName(name) + ?: run { + logger + .atWarning() + .log( + "A state machine with name '$name' could not be instantiated, because it does not exist" + ) + null + } } .flatMap { buildInstances(it, null) } } diff --git a/src/main/java/at/ac/uibk/dps/cirrina/classes/collaborativestatemachine/CollaborativeStateMachineClass.java b/src/main/java/at/ac/uibk/dps/cirrina/classes/collaborativestatemachine/CollaborativeStateMachineClass.java deleted file mode 100644 index 0cd175c8..00000000 --- a/src/main/java/at/ac/uibk/dps/cirrina/classes/collaborativestatemachine/CollaborativeStateMachineClass.java +++ /dev/null @@ -1,90 +0,0 @@ -package at.ac.uibk.dps.cirrina.classes.collaborativestatemachine; - -import at.ac.uibk.dps.cirrina.classes.statemachine.StateMachineClass; -import at.ac.uibk.dps.cirrina.execution.object.context.ContextVariable; -import at.ac.uibk.dps.cirrina.execution.object.event.Event; -import java.util.List; -import java.util.Optional; -import org.jgrapht.graph.DirectedPseudograph; - -/** - * Collaborative state machine class, describes the structure of a collaborative state machine. - *

- * A collaborative state machine is a graph with state machine classes as vertices and events as edges. An edge in the collaborative state - * machine graphs represents how a state machine can be "activated" by another state machine based on an event. - */ -public final class CollaborativeStateMachineClass - extends DirectedPseudograph { - - /** - * The name of the collaborative state machine. - */ - private final String name; - - /** - * The collection of persistent context variables. - */ - private final List persistentContextVariables; - - /** - * Initializes this collaborative state machine class. - * - * @param name Name. - */ - CollaborativeStateMachineClass(String name, List persistentContextVariables) { - super(Event.class); - this.name = name; - this.persistentContextVariables = persistentContextVariables; - } - - /** - * Return a string representation. - * - * @return String representation. - */ - @Override - public String toString() { - return name; - } - - /** - * Returns a state machine class by its name. - *

- * If no state machine class is known with the supplied name, empty is returned. - * - * @param name Name of the state machine to return. - * @return The state machine with the supplied name or empty. - */ - public Optional findStateMachineClassByName(String name) { - // Attempt to match the provided name to a known state machine - var states = vertexSet() - .stream() - .filter(state -> state.getName().equals(name)) - .toList(); - - // Expect precisely one state machine with the provided name - if (states.size() != 1) { - return Optional.empty(); - } - - return Optional.of(states.getFirst()); - } - - /** - * Returns the collection of state machine classes. - * - * @return State machine classes. - */ - public List getStateMachineClasses() { - return List.copyOf(vertexSet()); - } - - /** - * Returns the collection of persistent context variables. - * - * @return Persistent context variables. - */ - public List getPersistentContextVariables() { - return persistentContextVariables; - } -} diff --git a/src/main/java/at/ac/uibk/dps/cirrina/classes/collaborativestatemachine/CollaborativeStateMachineClass.kt b/src/main/java/at/ac/uibk/dps/cirrina/classes/collaborativestatemachine/CollaborativeStateMachineClass.kt new file mode 100644 index 00000000..b6f0d318 --- /dev/null +++ b/src/main/java/at/ac/uibk/dps/cirrina/classes/collaborativestatemachine/CollaborativeStateMachineClass.kt @@ -0,0 +1,40 @@ +package at.ac.uibk.dps.cirrina.classes.collaborativestatemachine + +import at.ac.uibk.dps.cirrina.classes.statemachine.StateMachineClass +import at.ac.uibk.dps.cirrina.execution.`object`.context.ContextVariable +import at.ac.uibk.dps.cirrina.execution.`object`.event.Event +import org.jgrapht.graph.DirectedPseudograph + +/** + * Collaborative state machine class, describes the structure of a collaborative state machine. + * + * A collaborative state machine is a graph with state machine classes as vertices and events as + * edges. An edge in the collaborative state machine graph represents how a state machine can be + * activated by another state machine based on an event. + */ +class CollaborativeStateMachineClass +internal constructor( + // The name of the collaborative state machine. + private val name: String, + + /** The collection of persistent context variables. */ + val persistentContextVariables: MutableList, +) : DirectedPseudograph(Event::class.java) { + + /** + * Returns a state machine class by its name. + * + * @param name Name of the state machine to return. + * @return The state machine with the supplied name or null if none or multiple are found. + */ + fun findStateMachineClassByName(name: String): StateMachineClass? { + val matches = vertexSet().filter { it.name == name } + return matches.singleOrNull() + } + + /** Returns the collection of state machine classes. */ + val stateMachineClasses: List + get() = vertexSet().toList() + + override fun toString(): String = name +} diff --git a/src/main/java/at/ac/uibk/dps/cirrina/execution/command/ActionAssignCommand.java b/src/main/java/at/ac/uibk/dps/cirrina/execution/command/ActionAssignCommand.java index f25760c2..89ea034b 100644 --- a/src/main/java/at/ac/uibk/dps/cirrina/execution/command/ActionAssignCommand.java +++ b/src/main/java/at/ac/uibk/dps/cirrina/execution/command/ActionAssignCommand.java @@ -5,15 +5,14 @@ import at.ac.uibk.dps.cirrina.execution.object.action.AssignAction; import at.ac.uibk.dps.cirrina.execution.object.expression.Expression; import at.ac.uibk.dps.cirrina.utils.Time; +import com.google.common.flogger.FluentLogger; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; public final class ActionAssignCommand extends ActionCommand { - private static final Logger logger = LogManager.getLogger(); + private static final FluentLogger logger = FluentLogger.forEnclosingClass(); private final AssignAction assignAction; @@ -65,7 +64,7 @@ public List execute() throws UnsupportedOperationException { ) ); } catch (IOException e) { - logger.error("Data assignment failed", e); + logger.atWarning().withCause(e).log("Data assignment failed"); } return commands; diff --git a/src/main/java/at/ac/uibk/dps/cirrina/execution/command/ActionCreateCommand.java b/src/main/java/at/ac/uibk/dps/cirrina/execution/command/ActionCreateCommand.java index 73be4a25..c627e9e9 100644 --- a/src/main/java/at/ac/uibk/dps/cirrina/execution/command/ActionCreateCommand.java +++ b/src/main/java/at/ac/uibk/dps/cirrina/execution/command/ActionCreateCommand.java @@ -5,14 +5,13 @@ import at.ac.uibk.dps.cirrina.execution.object.action.CreateAction; import at.ac.uibk.dps.cirrina.execution.object.expression.Expression; import at.ac.uibk.dps.cirrina.utils.Time; +import com.google.common.flogger.FluentLogger; import java.util.ArrayList; import java.util.List; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; public final class ActionCreateCommand extends ActionCommand { - private static final Logger logger = LogManager.getLogger(); + private static final FluentLogger logger = FluentLogger.forEnclosingClass(); private final CreateAction createAction; @@ -69,7 +68,7 @@ public List execute() throws UnsupportedOperationException { gauges.attributesForData("create", !isPersistent ? "local" : "persistent", size) ); } catch (Exception e) { - logger.error("Data creation failed", e); + logger.atWarning().withCause(e).log("Data creation failed"); } return commands; diff --git a/src/main/java/at/ac/uibk/dps/cirrina/execution/command/ActionInvokeCommand.java b/src/main/java/at/ac/uibk/dps/cirrina/execution/command/ActionInvokeCommand.java index c761973e..e7ea0a9d 100644 --- a/src/main/java/at/ac/uibk/dps/cirrina/execution/command/ActionInvokeCommand.java +++ b/src/main/java/at/ac/uibk/dps/cirrina/execution/command/ActionInvokeCommand.java @@ -12,11 +12,10 @@ import at.ac.uibk.dps.cirrina.execution.object.statemachine.StateMachineEventHandler; import at.ac.uibk.dps.cirrina.execution.service.ServiceImplementation; import at.ac.uibk.dps.cirrina.utils.Time; +import com.google.common.flogger.FluentLogger; import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; /** * Action invoke command, performs a service type invocation. @@ -28,7 +27,7 @@ */ public final class ActionInvokeCommand extends ActionCommand { - private static final Logger logger = LogManager.getLogger(); + private static final FluentLogger logger = FluentLogger.forEnclosingClass(); private final InvokeAction invokeAction; @@ -63,11 +62,13 @@ public List execute() throws UnsupportedOperationException { serviceImplementation .invoke(input, executionContext.scope().getId()) .exceptionally(e -> { - logger.error( - "Service invocation failed for service '{}'", - serviceImplementation.getInformationString(), - e - ); + logger + .atWarning() + .withCause(e) + .log( + "Service invocation failed for service '%s'", + serviceImplementation.getInformationString() + ); return null; }) .thenAccept(output -> { @@ -141,18 +142,16 @@ private void assignServiceOutput(List output, Extent extent) { try { extent.trySet(outputReference.getReference(), outputVariable.value()); } catch (Exception e) { - logger.error( - "Failed to assign service output to variable '{}'", - outputReference.getReference(), - e - ); + logger + .atWarning() + .withCause(e) + .log("Failed to assign service output to variable '%s'", outputReference); } }, () -> - logger.warn( - "Service output does not contain expected variable '{}'", - outputReference.getReference() - ) + logger + .atWarning() + .log("Service output does not contain expected variable '%s'", outputReference) ); } } diff --git a/src/main/java/at/ac/uibk/dps/cirrina/execution/command/ActionMatchCommand.java b/src/main/java/at/ac/uibk/dps/cirrina/execution/command/ActionMatchCommand.java index a78415d7..d2b75605 100644 --- a/src/main/java/at/ac/uibk/dps/cirrina/execution/command/ActionMatchCommand.java +++ b/src/main/java/at/ac/uibk/dps/cirrina/execution/command/ActionMatchCommand.java @@ -1,14 +1,13 @@ package at.ac.uibk.dps.cirrina.execution.command; import at.ac.uibk.dps.cirrina.execution.object.action.MatchAction; +import com.google.common.flogger.FluentLogger; import java.util.ArrayList; import java.util.List; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; public final class ActionMatchCommand extends ActionCommand { - private static final Logger logger = LogManager.getLogger(); + private static final FluentLogger logger = FluentLogger.forEnclosingClass(); private final MatchAction matchAction; @@ -40,7 +39,7 @@ public List execute() throws UnsupportedOperationException { } } } catch (UnsupportedOperationException e) { - logger.error("Could not execute match action", e); + logger.atWarning().withCause(e).log("Could not execute match action"); } return commands; diff --git a/src/main/java/at/ac/uibk/dps/cirrina/execution/command/ActionRaiseCommand.java b/src/main/java/at/ac/uibk/dps/cirrina/execution/command/ActionRaiseCommand.java index 07057580..7afe8841 100644 --- a/src/main/java/at/ac/uibk/dps/cirrina/execution/command/ActionRaiseCommand.java +++ b/src/main/java/at/ac/uibk/dps/cirrina/execution/command/ActionRaiseCommand.java @@ -6,13 +6,9 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicReference; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; public final class ActionRaiseCommand extends ActionCommand { - private static final Logger logger = LogManager.getLogger(); - private final RaiseAction raiseAction; private final AtomicReference latency = new AtomicReference<>(); diff --git a/src/main/java/at/ac/uibk/dps/cirrina/execution/object/context/NatsContext.kt b/src/main/java/at/ac/uibk/dps/cirrina/execution/object/context/NatsContext.kt index 7473d577..9bb65ee9 100644 --- a/src/main/java/at/ac/uibk/dps/cirrina/execution/object/context/NatsContext.kt +++ b/src/main/java/at/ac/uibk/dps/cirrina/execution/object/context/NatsContext.kt @@ -1,12 +1,13 @@ package at.ac.uibk.dps.cirrina.execution.`object`.context import at.ac.uibk.dps.cirrina.execution.`object`.exchange.ValueExchange +import com.google.common.flogger.FluentLogger import io.nats.client.* import io.nats.client.api.KeyValueConfiguration import java.io.IOException import java.util.concurrent.CountDownLatch -import org.apache.logging.log4j.LogManager -import org.apache.logging.log4j.Logger + +private val logger: FluentLogger = FluentLogger.forEnclosingClass() /** * A persistent context stored in a NATS key-value bucket. @@ -24,21 +25,16 @@ class NatsContext( private val bucketName: String, private val deleteBucket: Boolean = false, ) : Context(isLocal), AutoCloseable { - - companion object { - private val logger: Logger = LogManager.getLogger() - } - - // NATS connection can be null if not connected + // NATS connection can be null if not connected. private var connection: Connection? = null - // NATs key-value can be null if not connected + // NATs key-value can be null if not connected. private var keyValue: KeyValue? = null - // Lock for the connection and key-value + // Lock for the connection and key-value. private val lock = Any() - // Latch for initial connection + // Latch for initial connection. private val connectedLatch = CountDownLatch(1) init { @@ -51,25 +47,30 @@ class NatsContext( when (type) { ConnectionListener.Events.CONNECTED, ConnectionListener.Events.RECONNECTED -> { + logger.atFiner().log("Received NATS connected event") synchronized(lock) { try { keyValue = getOrCreateBucket(conn) connection = conn } catch (e: Exception) { - logger.error("Failed to setup the persistent context bucket", e) + logger + .atWarning() + .withCause(e) + .log("Failed to setup the persistent context bucket") } finally { connectedLatch.countDown() } } - logger.info("(Re)connected to the NATS server for persistent context") + logger.atFine().log("Connected to the NATS server") } ConnectionListener.Events.DISCONNECTED -> { + logger.atFiner().log("Received NATS disconnected event") synchronized(lock) { connection = null keyValue = null } - logger.warn("Disconnected from the NATS server for persistent context") + logger.atFine().log("Disconnected from the NATS server") } else -> {} @@ -81,9 +82,12 @@ class NatsContext( try { Nats.connectAsynchronously(options, true) - } catch (_: InterruptedException) { + } catch (e: InterruptedException) { Thread.currentThread().interrupt() - logger.error("Interrupted while initiating NATS connection, will never connect") + logger + .atSevere() + .withCause(e) + .log("Interrupted while initiating NATS connection, will never connect") } } @@ -92,14 +96,17 @@ class NatsContext( runCatching { conn.keyValueManagement().apply { if (!bucketNames.contains(bucketName)) { - logger.warn("Persistent bucket '$bucketName' does not exist, creating it") + logger.atInfo().log("Persistent bucket '$bucketName' does not exist, creating it") create(KeyValueConfiguration.Builder().name(bucketName).build()) } } conn.keyValue(bucketName) } .getOrElse { e -> - throw IOException("Failed to create or retrieve bucket '$bucketName': ${e.message}", e) + throw IOException( + "Failed to create or retrieve bucket '$bucketName', is JetStream enabled?", + e, + ) } private fun toBytes(value: Any?): ByteArray = ValueExchange(value).toBytes() @@ -117,8 +124,7 @@ class NatsContext( override fun get(name: String): Any = synchronized(lock) { runCatching { - val kv = - keyValue ?: throw IOException("Not connected to the NATS server for persistent context") + val kv = keyValue ?: throw IOException("Not connected to the NATS server") if (!kv.keys().contains(name)) { throw IOException("A variable with the name '$name' does not exist") } @@ -141,8 +147,7 @@ class NatsContext( override fun create(name: String, value: Any?): Int = synchronized(lock) { runCatching { - val kv = - keyValue ?: throw IOException("Not connected to the NATS server for persistent context") + val kv = keyValue ?: throw IOException("Not connected to the NATS server") if (kv.keys().contains(name)) { throw IOException("A variable with the name '$name' already exists") } @@ -166,8 +171,7 @@ class NatsContext( override fun assign(name: String, value: Any?): Int = synchronized(lock) { runCatching { - val kv = - keyValue ?: throw IOException("Not connected to the NATS server for persistent context") + val kv = keyValue ?: throw IOException("Not connected to the NATS server") if (!kv.keys().contains(name)) { throw IOException("A variable with the name '$name' does not exist") } @@ -189,8 +193,7 @@ class NatsContext( override fun delete(name: String) { synchronized(lock) { runCatching { - val kv = - keyValue ?: throw IOException("Not connected to the NATS server for persistent context") + val kv = keyValue ?: throw IOException("Not connected to the NATS server") if (!kv.keys().contains(name)) { throw IOException("A variable with the name '$name' does not exist") } @@ -205,8 +208,7 @@ class NatsContext( override fun getAll(): List = synchronized(lock) { runCatching { - val kv = - keyValue ?: throw IOException("Not connected to the NATS server for persistent context") + val kv = keyValue ?: throw IOException("Not connected to the NATS server") kv.keys().map { key -> val entry = kv.get(key) ContextVariable(entry.key, entry.value) @@ -225,7 +227,7 @@ class NatsContext( conn.close() } } - .onFailure { e -> throw IOException("Failed to close the NATS persistent context", e) } + .onFailure { e -> logger.atWarning().withCause(e).log("Failed to close the NATS") } } } diff --git a/src/main/java/at/ac/uibk/dps/cirrina/execution/object/event/Event.java b/src/main/java/at/ac/uibk/dps/cirrina/execution/object/event/Event.java index 6ba168cb..3c27f722 100644 --- a/src/main/java/at/ac/uibk/dps/cirrina/execution/object/event/Event.java +++ b/src/main/java/at/ac/uibk/dps/cirrina/execution/object/event/Event.java @@ -9,6 +9,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; +import org.apache.commons.lang3.builder.ToStringBuilder; /** * Event, resembles an event as it is sent to state machine instances. @@ -131,7 +132,12 @@ public Event withData(List data) { */ @Override public String toString() { - return name; + return new ToStringBuilder(this) + .append("createdTime", createdTime) + .append("id", id) + .append("name", name) + .append("channel", channel) + .build(); } /** diff --git a/src/main/java/at/ac/uibk/dps/cirrina/execution/object/event/NatsEventHandler.kt b/src/main/java/at/ac/uibk/dps/cirrina/execution/object/event/NatsEventHandler.kt index 8789ac49..8db718bd 100644 --- a/src/main/java/at/ac/uibk/dps/cirrina/execution/object/event/NatsEventHandler.kt +++ b/src/main/java/at/ac/uibk/dps/cirrina/execution/object/event/NatsEventHandler.kt @@ -2,10 +2,11 @@ package at.ac.uibk.dps.cirrina.execution.`object`.event import at.ac.uibk.dps.cirrina.csm.Csml import at.ac.uibk.dps.cirrina.execution.`object`.exchange.EventExchange +import com.google.common.flogger.FluentLogger import io.nats.client.* import java.util.concurrent.CountDownLatch -import org.apache.logging.log4j.LogManager -import org.apache.logging.log4j.Logger + +private val logger: FluentLogger = FluentLogger.forEnclosingClass() /** * Event handler that uses NATS for communication. @@ -17,13 +18,21 @@ class NatsEventHandler(natsUrl: String) : EventHandler() { companion object { const val GLOBAL_SOURCE = "global" const val PERIPHERAL_SOURCE = "peripheral" - private val logger: Logger = LogManager.getLogger() } + // NATS connection can be null if not connected. private var connection: Connection? = null + + // NATs dispatcher can be null if not connected. private var dispatcher: Dispatcher? = null + + // Lock for the connection and dispatcher. private val lock = Any() + + // Subjects subscribed to, used to re-subscribe after reconnect. private val subscriptions = mutableSetOf() + + // Latch for initial connection. private val connectedLatch = CountDownLatch(1) init { @@ -45,12 +54,12 @@ class NatsEventHandler(natsUrl: String) : EventHandler() { } connection = conn } catch (e: Exception) { - logger.error("Failed to setup the NATS event handler", e) + logger.atWarning().withCause(e).log("Failed to setup the NATS event handler") } finally { connectedLatch.countDown() } } - logger.info("(Re)connected to the NATS server for event handling") + logger.atFine().log("Connected to the NATS server") } ConnectionListener.Events.DISCONNECTED -> { @@ -58,7 +67,7 @@ class NatsEventHandler(natsUrl: String) : EventHandler() { connection = null dispatcher = null } - logger.warn("Disconnected from the NATS server for event handling") + logger.atFine().log("Disconnected from the NATS server") } else -> {} @@ -70,9 +79,12 @@ class NatsEventHandler(natsUrl: String) : EventHandler() { try { Nats.connectAsynchronously(options, true) - } catch (_: InterruptedException) { + } catch (e: InterruptedException) { Thread.currentThread().interrupt() - logger.error("Interrupted while initiating NATS connection, will never connect") + logger + .atSevere() + .withCause(e) + .log("Interrupted while initiating NATS connection, will never connect") } } @@ -85,8 +97,8 @@ class NatsEventHandler(natsUrl: String) : EventHandler() { .onFailure { e -> when (e) { is UnsupportedOperationException -> - logger.debug("A message could not be read as an event: ${e.message}") - else -> logger.error("Unexpected error handling a NATS message", e) + logger.atFiner().withCause(e).log("A message could not be read as an event") + else -> logger.atWarning().withCause(e).log("Unexpected error while handling a message") } } } @@ -106,14 +118,14 @@ class NatsEventHandler(natsUrl: String) : EventHandler() { Csml.EventChannel.EXTERNAL -> source?.let { "$it.${event.name}" } ?: run { - logger.warn("Event source is null, cannot send event '${event.name}'") + logger.atWarning().log("Event source is null, cannot send event '${event.name}'") return } Csml.EventChannel.GLOBAL -> "$GLOBAL_SOURCE.${event.name}" else -> { - logger.warn("Unsupported channel '${event.channel}', event not sent") + logger.atWarning().log("Unsupported channel '${event.channel}', event not sent") return } } @@ -121,8 +133,10 @@ class NatsEventHandler(natsUrl: String) : EventHandler() { synchronized(lock) { connection?.let { conn -> runCatching { conn.publish(subject, EventExchange(event).toBytes()) } - .onFailure { e -> logger.error("Failed to publish event '$subject' through NATS", e) } - } ?: logger.warn("Not sending event, not connected to the NATS server") + .onFailure { e -> + logger.atWarning().withCause(e).log("Failed to publish event '$subject'", e) + } + } ?: logger.atWarning().log("Not sending event, not connected to the NATS server") } } @@ -137,9 +151,9 @@ class NatsEventHandler(natsUrl: String) : EventHandler() { subscriptions.add(subject) runCatching { dispatcher?.subscribe(subject) - ?: logger.debug("Dispatcher unavailable; queued subscription: $subject") + ?: logger.atFiner().log("Dispatcher unavailable; queued subscription: $subject") } - .onFailure { e -> logger.error("Could not subscribe to $eventName", e) } + .onFailure { e -> logger.atWarning().withCause(e).log("Could not subscribe to $eventName") } } } @@ -153,7 +167,9 @@ class NatsEventHandler(natsUrl: String) : EventHandler() { synchronized(lock) { subscriptions.remove(subject) runCatching { dispatcher?.unsubscribe(subject) } - .onFailure { e -> logger.error("Could not unsubscribe from $eventName", e) } + .onFailure { e -> + logger.atWarning().withCause(e).log("Could not unsubscribe from $eventName", e) + } } } @@ -169,9 +185,11 @@ class NatsEventHandler(natsUrl: String) : EventHandler() { subscriptions.add(subject) runCatching { dispatcher?.subscribe(subject) - ?: logger.debug("Dispatcher unavailable; queued subscription: $subject") + ?: logger.atFiner().log("Dispatcher unavailable; queued subscription: $subject") + } + .onFailure { e -> + logger.atWarning().withCause(e).log("Could not subscribe to $subject", e) } - .onFailure { e -> logger.error("Could not subscribe to $subject", e) } } } @@ -186,7 +204,9 @@ class NatsEventHandler(natsUrl: String) : EventHandler() { synchronized(lock) { subscriptions.remove(subject) runCatching { dispatcher?.unsubscribe(subject) } - .onFailure { e -> logger.error("Could not unsubscribe from $subject", e) } + .onFailure { e -> + logger.atWarning().withCause(e).log("Could not unsubscribe from $subject", e) + } } } @@ -197,16 +217,11 @@ class NatsEventHandler(natsUrl: String) : EventHandler() { */ override fun close() { synchronized(lock) { - try { - runCatching { - dispatcher?.let { connection?.closeDispatcher(it) } - connection?.close() - } - .onFailure { e -> logger.error("Failed to close the NATS event handler", e) } - } catch (_: InterruptedException) { - Thread.currentThread().interrupt() - logger.error("Failed to close the NATS event handler due to interruption") - } + runCatching { + dispatcher?.let { connection?.closeDispatcher(it) } + connection?.close() + } + .onFailure { e -> logger.atWarning().withCause(e).log("Failed to close the NATS") } } } diff --git a/src/main/java/at/ac/uibk/dps/cirrina/execution/object/statemachine/StateMachine.java b/src/main/java/at/ac/uibk/dps/cirrina/execution/object/statemachine/StateMachine.java index 7be7d823..0cdafef1 100644 --- a/src/main/java/at/ac/uibk/dps/cirrina/execution/object/statemachine/StateMachine.java +++ b/src/main/java/at/ac/uibk/dps/cirrina/execution/object/statemachine/StateMachine.java @@ -35,6 +35,7 @@ import at.ac.uibk.dps.cirrina.tracing.Gauges; import at.ac.uibk.dps.cirrina.utils.Id; import at.ac.uibk.dps.cirrina.utils.Time; +import com.google.common.flogger.FluentLogger; import io.opentelemetry.api.OpenTelemetry; import jakarta.annotation.Nullable; import jakarta.validation.constraints.NotNull; @@ -46,8 +47,7 @@ import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.stream.Collectors; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; +import org.apache.commons.lang3.builder.ToStringBuilder; public final class StateMachine implements Runnable, EventListener, Scope { @@ -59,7 +59,7 @@ public final class StateMachine implements Runnable, EventListener, Scope { /** * State machine logger. */ - private static final Logger logger = LogManager.getLogger(); + private static final FluentLogger logger = FluentLogger.forEnclosingClass(); /** * State machine ID. @@ -486,6 +486,10 @@ private void switchActiveState(State state) throws IllegalArgumentException { throw new IllegalArgumentException("A state '%s' does not exist".formatted(stateName)); } + logger + .atFine() + .log("State machine '%s': Switching state '%s' to '%s'", this, activeState, state); + // Update the active state activeState = state; } @@ -616,6 +620,8 @@ private void handleInternalTransition( @NotNull Transition transition, @Nullable Event raisingEvent ) throws UnsupportedOperationException { + logger.atFine().log("State machine '%s': Handling internal transition '%s'", this, transition); + // Only perform the transition doTransition(transition, raisingEvent); } @@ -631,6 +637,8 @@ private void handleExternalTransition( @NotNull Transition transition, @Nullable Event raisingEvent ) throws UnsupportedOperationException { + logger.atFine().log("State machine '%s': Handling external transition '%s'", this, transition); + final var targetStateName = transition.getTargetStateName().get(); // Acquire the target state instance @@ -680,12 +688,13 @@ private void handleTransition(@NotNull Transition transition, @Nullable Event ra */ private Optional handleEvent(Event event) throws InterruptedException, UnsupportedOperationException { + logger.atFiner().log("State machine '%s': Handling event '%s'", this, event); + // Increment events received counter counters .getCounter(COUNTER_EVENTS_HANDLED) .add(1, counters.attributesForEvent(event.getChannel().toString())); - // Find a matching transition try { // Create a temporary in-memory context containing the event data final var eventDataContext = new InMemoryContext(true); @@ -703,18 +712,22 @@ private Optional handleEvent(Event event) final var onTransition = trySelectOnTransition(event, extent); // Set the event data in the actual extent - onTransition.ifPresent(transition -> { - try { - for (var contextVariable : event.getData()) { - getExtent().setOrCreate( - EVENT_DATA_VARIABLE_PREFIX + contextVariable.name(), - contextVariable.value() - ); + onTransition.ifPresentOrElse( + transition -> { + logger.atFiner().log("State machine '%s': Selected on transition '%s'", this, transition); + try { + for (var contextVariable : event.getData()) { + getExtent().setOrCreate( + EVENT_DATA_VARIABLE_PREFIX + contextVariable.name(), + contextVariable.value() + ); + } + } catch (IOException e) { + logger.atWarning().withCause(e).log("Failed to set event data"); } - } catch (IOException e) { - logger.error("Failed to set event data", e); - } - }); + }, + () -> logger.atFiner().log("State machine '%s': No on transition selected", this) + ); return onTransition; } catch (IllegalStateException e) { @@ -729,6 +742,8 @@ private Optional handleEvent(Event event) */ @Override public void run() { + logger.atInfo().log("State machine '%s': Starting", this); + // Increment state machine instances counter counters.getCounter(COUNTER_STATE_MACHINE_INSTANCES).add(1, counters.attributesForInstances()); @@ -776,14 +791,13 @@ public void run() { } } } catch (InterruptedException e) { - logger.info("{} is interrupted", stateMachineId.toString()); - Thread.currentThread().interrupt(); + logger.atSevere().withCause(e).log("State machine '%s': Interrupted", this); } catch (Exception e) { - logger.error("{} received a fatal error", stateMachineId.toString(), e); + logger.atSevere().withCause(e).log("State machine '%s': Received a fatal error", this); } - logger.info("{} has stopped", stateMachineId.toString()); + logger.atInfo().log("State machine '%s': Stopped", this); // Decrement state machine instances counter counters.getCounter(COUNTER_STATE_MACHINE_INSTANCES).add(-1, counters.attributesForInstances()); @@ -806,6 +820,11 @@ public String getId() { return getStateMachineInstanceId().toString(); } + @Override + public String toString() { + return new ToStringBuilder(this).append("id", stateMachineId).toString(); + } + /** * Returns the state machine instance ID. * diff --git a/src/main/java/at/ac/uibk/dps/cirrina/execution/object/transition/Transition.java b/src/main/java/at/ac/uibk/dps/cirrina/execution/object/transition/Transition.java index 0094f38b..e9b0c4b8 100644 --- a/src/main/java/at/ac/uibk/dps/cirrina/execution/object/transition/Transition.java +++ b/src/main/java/at/ac/uibk/dps/cirrina/execution/object/transition/Transition.java @@ -6,6 +6,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Optional; +import org.apache.commons.lang3.builder.ToStringBuilder; import org.jgrapht.traverse.TopologicalOrderIterator; public final class Transition { @@ -46,4 +47,12 @@ public List getActionCommands(CommandFactory commandFactory) { public boolean isElse() { return isElse; } + + @Override + public String toString() { + return new ToStringBuilder(this) + .append("internal", isInternalTransition()) + .append("targetStateName", getTargetStateName()) + .toString(); + } } diff --git a/src/main/java/at/ac/uibk/dps/cirrina/utils/Id.java b/src/main/java/at/ac/uibk/dps/cirrina/utils/Id.java index 4d0a6125..1528c20b 100644 --- a/src/main/java/at/ac/uibk/dps/cirrina/utils/Id.java +++ b/src/main/java/at/ac/uibk/dps/cirrina/utils/Id.java @@ -2,6 +2,7 @@ import java.util.Objects; import java.util.UUID; +import org.apache.commons.lang3.builder.ToStringBuilder; public final class Id { @@ -9,7 +10,7 @@ public final class Id { @Override public String toString() { - return uuid.toString(); + return new ToStringBuilder(this).append("uuid", uuid).toString(); } @Override diff --git a/src/test/java/at/ac/uibk/dps/cirrina/classes/statemachine/StateMachineClassTest.kt b/src/test/java/at/ac/uibk/dps/cirrina/classes/statemachine/StateMachineClassTest.kt index a3d70e62..e7dc2f28 100644 --- a/src/test/java/at/ac/uibk/dps/cirrina/classes/statemachine/StateMachineClassTest.kt +++ b/src/test/java/at/ac/uibk/dps/cirrina/classes/statemachine/StateMachineClassTest.kt @@ -22,8 +22,7 @@ class StateMachineClassTest { CsmParser.parseCsml(DefaultDescriptions.complete) ) .build() - .findStateMachineClassByName("stateMachine1") - .get() + .findStateMachineClassByName("stateMachine1")!! } }