diff --git a/build.gradle.kts b/build.gradle.kts index 1869508..33af969 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -63,6 +63,8 @@ dependencies { implementation("com.google.protobuf:protobuf-java:4.32.0") + implementation("io.etcd:jetcd-core:0.8.5") + implementation("io.nats:jnats:2.17.3") implementation(platform("io.opentelemetry:opentelemetry-bom:1.38.0")) diff --git a/src/main/java/at/ac/uibk/dps/cirrina/cirrina/Cirrina.kt b/src/main/java/at/ac/uibk/dps/cirrina/cirrina/Cirrina.kt index ef600ca..3cd1716 100644 --- a/src/main/java/at/ac/uibk/dps/cirrina/cirrina/Cirrina.kt +++ b/src/main/java/at/ac/uibk/dps/cirrina/cirrina/Cirrina.kt @@ -1,7 +1,7 @@ package at.ac.uibk.dps.cirrina.cirrina import at.ac.uibk.dps.cirrina.execution.`object`.context.Context -import at.ac.uibk.dps.cirrina.execution.`object`.context.NatsContext +import at.ac.uibk.dps.cirrina.execution.`object`.context.EtcdContext import at.ac.uibk.dps.cirrina.execution.`object`.event.EventHandler import at.ac.uibk.dps.cirrina.execution.`object`.event.NatsEventHandler import at.ac.uibk.dps.cirrina.execution.service.RandomServiceImplementationSelector @@ -21,6 +21,7 @@ private val logger: FluentLogger = FluentLogger.forEnclosingClass() class Cirrina { companion object { const val NATS_CONNECTION_TIMEOUT = 60000L + const val ETCD_CONNECTION_TIMEOUT = 60000L init { ToStringBuilder.setDefaultStyle(SIMPLE_STYLE) @@ -59,9 +60,9 @@ class Cirrina { logger.atFine().log("Creating the persistent context") newPersistentContext() .apply { - if (this is NatsContext) { - logger.atFine().log("Awaiting connection to NATS as the persistent context") - awaitInitialConnection(NATS_CONNECTION_TIMEOUT) + if (this is EtcdContext) { + logger.atFine().log("Awaiting connection to Etcd as the persistent context") + awaitInitialConnection(ETCD_CONNECTION_TIMEOUT) } } .use { persistentContext -> @@ -116,7 +117,7 @@ class Cirrina { // Construct a new persistent context based as configured. private fun newPersistentContext(): Context = when (EnvironmentVariables.contextProvider.get()) { - PersistentContextProvider.NATS -> newNatsPersistentContext() + PersistentContextProvider.ETCD -> newEtcdPersistentContext() else -> throw ConfigurationError.Unknown( "persistent context", @@ -124,13 +125,9 @@ class Cirrina { ) } - // Construct a new NATS persistent context as configured. - private fun newNatsPersistentContext(): NatsContext = - NatsContext( - false, - EnvironmentVariables.natsContextUrl.get(), - EnvironmentVariables.natsContextBucket.get(), - ) + // Construct a new Etcd persistent context as configured. + private fun newEtcdPersistentContext(): EtcdContext = + EtcdContext(false, listOf(EnvironmentVariables.etcdContextUrl.get())) // Construct a new OpenTelemetry instance as configured. private fun getOpenTelemetry(): OpenTelemetry = diff --git a/src/main/java/at/ac/uibk/dps/cirrina/cirrina/EnvironmentVariables.kt b/src/main/java/at/ac/uibk/dps/cirrina/cirrina/EnvironmentVariables.kt index f5a7f12..5070158 100644 --- a/src/main/java/at/ac/uibk/dps/cirrina/cirrina/EnvironmentVariables.kt +++ b/src/main/java/at/ac/uibk/dps/cirrina/cirrina/EnvironmentVariables.kt @@ -7,7 +7,7 @@ enum class EventProvider { /** Provider for the persistent context. */ enum class PersistentContextProvider { - NATS + ETCD } /** An environment variable, which can be converted from a string to the required type. */ @@ -38,11 +38,8 @@ object EnvironmentVariables { /** The NATS event server URL. */ val natsEventUrl = EnvironmentVariable("NATS_EVENT_URL", default = "nats://localhost:4222/") - /** The NATS context server URL. */ - val natsContextUrl = EnvironmentVariable("NATS_CONTEXT_URL", default = "nats://localhost:4222") - - /** The NATS context bucket. */ - val natsContextBucket = EnvironmentVariable("NATS_CONTEXT_BUCKET", default = "persistent") + /** The Etcd context server URL. */ + val etcdContextUrl = EnvironmentVariable("ETCD_CONTEXT_URL", default = "http://localhost:2379") /** The path to the CSML application. */ val appPath = EnvironmentVariable("APP_PATH", required = true) @@ -80,7 +77,7 @@ object EnvironmentVariables { val contextProvider = EnvironmentVariable( name = "CONTEXT_PROVIDER", - default = PersistentContextProvider.NATS, + default = PersistentContextProvider.ETCD, mapper = { value -> try { PersistentContextProvider.valueOf(value.uppercase()) diff --git a/src/main/java/at/ac/uibk/dps/cirrina/execution/object/context/Context.java b/src/main/java/at/ac/uibk/dps/cirrina/execution/object/context/Context.java index 4f5b3a6..47736f3 100644 --- a/src/main/java/at/ac/uibk/dps/cirrina/execution/object/context/Context.java +++ b/src/main/java/at/ac/uibk/dps/cirrina/execution/object/context/Context.java @@ -13,7 +13,7 @@ public abstract class Context implements AutoCloseable { /** * Initializes this context object. * - * @param isLocal True if this context is local, otherwise false. + * @param isLocal true if this context is local, otherwise false */ public Context(boolean isLocal) { this.isLocal = isLocal; @@ -22,29 +22,29 @@ public Context(boolean isLocal) { /** * Retrieve a context variable. * - * @param name Name of the context variable. - * @return The retrieved context variable. - * @throws IOException If the context variable could not be retrieved. + * @param name name of the context variable + * @return The retrieved context variable + * @throws IOException if the context variable could not be retrieved */ public abstract Object get(String name) throws IOException; /** * Creates a context variable. * - * @param name Name of the context variable. - * @param value Value of the context variable. - * @return Byte size of stored data. - * @throws IOException If the variable could not be created. + * @param name name of the context variable + * @param value value of the context variable + * @return byte size of stored data + * @throws IOException if the variable could not be created */ public abstract int create(String name, Object value) throws IOException; /** * Assigns to a context variable. * - * @param name Name of the context variable. - * @param value New value of the context variable. - * @return Byte size of stored data. - * @throws IOException If the variable could not be assigned to. + * @param name name of the context variable + * @param value new value of the context variable + * @return byte size of stored data + * @throws IOException if the variable could not be assigned to */ public abstract int assign(String name, Object value) throws IOException; @@ -56,18 +56,25 @@ public Context(boolean isLocal) { */ public abstract void delete(String name) throws IOException; + /** + * Deletes all context variables. + * + * @throws IOException if the variable could not be deleted + */ + public abstract void deleteAll() throws IOException; + /** * Returns all context variables. * - * @return Context variables. - * @throws IOException If the variables could not be retrieved. + * @return context variables. + * @throws IOException if the variables could not be retrieved */ public abstract List getAll() throws IOException; /** * Returns a flag that indicates if this context is local. * - * @return True if local, otherwise false. + * @return true if local, otherwise false */ public boolean isLocal() { return isLocal; diff --git a/src/main/java/at/ac/uibk/dps/cirrina/execution/object/context/ContextBuilder.java b/src/main/java/at/ac/uibk/dps/cirrina/execution/object/context/ContextBuilder.java index 736095f..146d82f 100644 --- a/src/main/java/at/ac/uibk/dps/cirrina/execution/object/context/ContextBuilder.java +++ b/src/main/java/at/ac/uibk/dps/cirrina/execution/object/context/ContextBuilder.java @@ -61,23 +61,6 @@ public ContextBuilder inMemoryContext(boolean isLocal) { return this; } - /** - * Build a NATS context. - * - * @param isLocal True if this context is local, otherwise false. - * @param natsUrl NATS url. - * @param bucketName NATS bucket name. - * @return This builder. - * @throws IOException If the context could not be built. - * @see NatsContext - */ - public ContextBuilder natsContext(boolean isLocal, String natsUrl, String bucketName) - throws IOException { - context = new NatsContext(isLocal, natsUrl, bucketName, false); - - return this; - } - /** * Builds the current context. * diff --git a/src/main/java/at/ac/uibk/dps/cirrina/execution/object/context/EtcdContext.kt b/src/main/java/at/ac/uibk/dps/cirrina/execution/object/context/EtcdContext.kt new file mode 100644 index 0000000..e6a6ea6 --- /dev/null +++ b/src/main/java/at/ac/uibk/dps/cirrina/execution/object/context/EtcdContext.kt @@ -0,0 +1,230 @@ +package at.ac.uibk.dps.cirrina.execution.`object`.context + +import at.ac.uibk.dps.cirrina.execution.`object`.exchange.ValueExchange +import com.google.common.flogger.FluentLogger +import com.google.protobuf.ByteString +import io.etcd.jetcd.ByteSequence +import io.etcd.jetcd.Client +import io.etcd.jetcd.op.Cmp +import io.etcd.jetcd.op.CmpTarget +import io.etcd.jetcd.op.Op +import io.etcd.jetcd.options.DeleteOption +import io.etcd.jetcd.options.GetOption +import io.etcd.jetcd.options.OptionsUtil +import io.etcd.jetcd.options.PutOption +import java.io.Closeable +import java.io.IOException +import java.util.concurrent.CompletableFuture +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicBoolean + +private val logger: FluentLogger = FluentLogger.forEnclosingClass() + +private class AsyncEtcdConnection( + private val endpoints: List, // Etcd endpoints + private val retryDelayMs: Long = 1000, // Delay between retries in ms +) : Closeable { + // Executor for retry loop. + private val executor = Executors.newSingleThreadExecutor() + + // Controls retry loop. + private val running = AtomicBoolean(true) + + // Future holding the Etcd client. + private val clientFuture = CompletableFuture() + + init { + executor.submit { + while (running.get() && !clientFuture.isDone) { + try { + logger.atFiner().log("Attempting to connect to Etcd") + clientFuture.complete(Client.builder().endpoints(*endpoints.toTypedArray()).build()) + break + } catch (_: Exception) { + logger.atWarning().log("Failed to connect to Etcd") + Thread.sleep(retryDelayMs) + } + } + } + } + + // Returns the CompletableFuture that completes when the client is connected. + fun getClientFuture(): CompletableFuture = clientFuture + + /** Closes the connection manager and the underlying Etcd client. */ + override fun close() { + running.set(false) + executor.shutdownNow() + clientFuture.thenAccept { it.close() } + } +} + +class EtcdContext(isLocal: Boolean, endpoints: List) : Context(isLocal), AutoCloseable { + // Async Etcd connection manager. + private val asyncConn = AsyncEtcdConnection(endpoints) + + // Converts value to ByteArray. + private fun toBytes(value: Any?) = ValueExchange(value).toBytes() + + // Converts ByteArray back to value. + private fun fromBytes(bytes: ByteArray) = ValueExchange.fromBytes(bytes).value + + // Returns the connected Etcd client or throws IOException if not connected. + private fun client(): Client = + asyncConn.getClientFuture().getNow(null) ?: throw IOException("Etcd client not connected") + + /** + * Retrieves a variable by name. + * + * @param name name of the variable + * @return value of the variable + * @throws IOException if the variable does not exist or retrieval fails + */ + override fun get(name: String): Any = + runCatching { + fromBytes( + client() + .kvClient + .get(ByteSequence.from(name.toByteArray())) + .get() + .kvs + .firstOrNull() + ?.value + ?.bytes ?: throw IOException("A variable with the name '$name' does not exist") + ) + } + .getOrElse { e -> throw IOException("Failed to retrieve variable '$name'", e) } + + /** + * Creates a new context variable. + * + * The byte size is only returned for binary (byte array) data and is 0 otherwise. + * + * @param name name of the context variable + * @param value value of the context variable + * @return byte size of stored data + * @throws IOException if a variable with the same name already exists + */ + override fun create(name: String, value: Any?): Int = + runCatching { + val bytes = toBytes(value) + val key = ByteSequence.from(name.toByteArray()) + client() + .kvClient + .txn() + .If(Cmp(key, Cmp.Op.EQUAL, CmpTarget.createRevision(0))) + .Then(Op.put(key, ByteSequence.from(bytes), PutOption.DEFAULT)) + .Else(Op.get(key, GetOption.DEFAULT)) + .commit() + .get() + .takeIf { it.isSucceeded } + ?.let { bytes.size } + ?: throw IOException("A variable with the name '$name' already exists") + } + .getOrElse { e -> throw IOException("Failed to create variable '$name'", e) } + + /** + * Assigns a new value to a variable. + * + * @param name name of the variable + * @param value new value + * @return byte size of the stored data + * @throws IOException if the assignment fails + */ + override fun assign(name: String, value: Any?): Int = + runCatching { + val bytes = toBytes(value) + val key = ByteSequence.from(name.toByteArray()) + client() + .kvClient + .txn() + .If(Cmp(key, Cmp.Op.GREATER, CmpTarget.createRevision(0))) + .Then(Op.put(key, ByteSequence.from(bytes), PutOption.DEFAULT)) + .Else(Op.get(key, GetOption.DEFAULT)) + .commit() + .get() + .takeIf { it.isSucceeded } + ?.let { bytes.size } + ?: throw IOException("A variable with the name '$name' does not exist") + } + .getOrElse { e -> throw IOException("Failed to assign variable '$name'", e) } + + /** + * Deletes a variable. + * + * @param name name of the variable + * @throws IOException if the variable does not exist or deletion fails + */ + override fun delete(name: String) { + runCatching { + client().kvClient.delete(ByteSequence.from(name.toByteArray())).get().takeIf { + it.deleted > 0 + } ?: throw IOException("A variable with the name '$name' does not exist") + } + .getOrElse { e -> throw IOException("Failed to delete variable '$name'", e) } + } + + /** + * Deletes all context variables. + * + * @throws IOException if the variable could not be deleted + */ + override fun deleteAll() { + runCatching { + client() + .kvClient + .delete( + ByteSequence.from(ByteString.copyFromUtf8("*")), + DeleteOption.builder() + .isPrefix(true) + .withRange(OptionsUtil.prefixEndOf(ByteSequence.from(byteArrayOf()))) + .build(), + ) + .get() + } + .getOrElse { e -> throw IOException("Failed to delete all variables", e) } + } + + /** + * Retrieves all variables. + * + * @return list of all context variables + * @throws IOException if retrieval fails + */ + override fun getAll(): List = + runCatching { + client() + .kvClient + .get( + ByteSequence.from(ByteString.copyFromUtf8("*")), + GetOption.builder() + .isPrefix(true) + .withRange(OptionsUtil.prefixEndOf(ByteSequence.from(byteArrayOf()))) + .build(), + ) + .get() + .kvs + .map { ContextVariable(it.key.toString(), fromBytes(it.value.bytes)) } + } + .getOrElse { e -> throw IOException("Failed to retrieve all variables", e) } + + /** Closes the context and underlying Etcd connection. */ + override fun close() { + asyncConn.close() + } + + /** + * Blocks until the initial connection is established or the timeout expires. + * + * @param timeoutMs timeout in milliseconds + * @return true if the connection was successfully established, false if timed out + */ + fun awaitInitialConnection(timeoutMs: Long = 5000): Boolean = + try { + asyncConn.getClientFuture().get(timeoutMs, TimeUnit.MILLISECONDS) + true + } catch (_: Exception) { + false + } +} diff --git a/src/main/java/at/ac/uibk/dps/cirrina/execution/object/context/InMemoryContext.kt b/src/main/java/at/ac/uibk/dps/cirrina/execution/object/context/InMemoryContext.kt index ef4532f..455dbd7 100644 --- a/src/main/java/at/ac/uibk/dps/cirrina/execution/object/context/InMemoryContext.kt +++ b/src/main/java/at/ac/uibk/dps/cirrina/execution/object/context/InMemoryContext.kt @@ -25,20 +25,23 @@ open class InMemoryContext(isLocal: Boolean) : Context(isLocal) { /** * Creates a new context variable. * - * The byte size is only returned for binary (byte array) data, and is 0 otherwise. + * The byte size is only returned for binary (byte array) data and is 0 otherwise. * * @param name name of the context variable * @param value value of the context variable * @return byte size of stored data * @throws IOException if a variable with the same name already exists */ - override fun create(name: String, value: Any?): Int { - if (values.containsKey(name)) { - throw IOException("Variable '$name' already exists") - } - values[name] = value - return if (value is ByteArray) value.size else 0 - } + override fun create(name: String, value: Any?): Int = + runCatching { + values + .compute(name) { _, old -> + if (old != null) throw IOException("Variable '$name' already exists") + value + } + ?.let { if (it is ByteArray) it.size else 0 } ?: if (value is ByteArray) value.size else 0 + } + .getOrElse { e -> throw IOException("Failed to create variable '$name'", e) } /** * Assigns a new value to an existing context variable. @@ -50,13 +53,13 @@ open class InMemoryContext(isLocal: Boolean) : Context(isLocal) { * @return byte size of stored data * @throws IOException if the variable does not exist */ - override fun assign(name: String, value: Any?): Int { - if (!values.containsKey(name)) { - throw IOException("Variable '$name' does not exist") - } - values[name] = value - return if (value is ByteArray) value.size else 0 - } + override fun assign(name: String, value: Any?): Int = + runCatching { + if (!values.containsKey(name)) throw IOException("Variable '$name' does not exist") + values[name] = value + if (value is ByteArray) value.size else 0 + } + .getOrElse { e -> throw IOException("Failed to assign variable '$name'", e) } /** * Deletes a context variable. @@ -64,13 +67,22 @@ open class InMemoryContext(isLocal: Boolean) : Context(isLocal) { * @param name name of the context variable * @throws IOException if the variable does not exist */ - override fun delete(name: String) { - if (values.remove(name) == null) { - throw IOException("Variable '$name' does not exist") - } + override fun delete(name: String) = + runCatching { + if (values.remove(name) == null) throw IOException("Variable '$name' does not exist") + } + .getOrElse { e -> throw IOException("Failed to delete variable '$name'", e) } + + /** Deletes all context variables. */ + override fun deleteAll() { + values.clear() } - /** Returns all context variables. */ + /** + * Retrieves all variables. + * + * @return list of all context variables + */ override fun getAll(): List = values.map { (key, value) -> ContextVariable(key, value) } diff --git a/src/main/java/at/ac/uibk/dps/cirrina/execution/object/context/NatsContext.kt b/src/main/java/at/ac/uibk/dps/cirrina/execution/object/context/NatsContext.kt deleted file mode 100644 index 7c00115..0000000 --- a/src/main/java/at/ac/uibk/dps/cirrina/execution/object/context/NatsContext.kt +++ /dev/null @@ -1,269 +0,0 @@ -package at.ac.uibk.dps.cirrina.execution.`object`.context - -import at.ac.uibk.dps.cirrina.execution.`object`.exchange.ValueExchange -import com.google.common.flogger.FluentLogger -import io.nats.client.* -import io.nats.client.api.KeyValueConfiguration -import io.nats.client.api.KeyValueEntry -import io.nats.client.api.KeyValueOperation -import io.nats.client.api.KeyValueWatcher -import java.io.IOException -import java.util.concurrent.CountDownLatch - -private val logger: FluentLogger = FluentLogger.forEnclosingClass() - -/** - * A persistent context stored in a NATS key-value bucket. - * - * Bucket is expected to exist already, but if missing, it is created. - * - * @param isLocal true if this context is local, false otherwise - * @param natsUrl NATS server URL - * @param bucketName name of the key-value bucket - * @param deleteBucket if true, the bucket will be deleted upon closing the context - */ -class NatsContext( - isLocal: Boolean, - private val natsUrl: String, - private val bucketName: String, - private val deleteBucket: Boolean = false, -) : Context(isLocal), AutoCloseable, KeyValueWatcher { - // NATS connection can be null if not connected. - private var connection: Connection? = null - - // NATs key-value can be null if not connected. - private var keyValue: KeyValue? = null - - // Known keys, used to check if a key has been created. - private var knownKeys: Set = emptySet() - - // Lock for the connection, key-value, and known keys. - private val lock = Any() - - // Latch for initial connection. - private val connectedLatch = CountDownLatch(1) - - init { - val options = - Options.Builder() - .server(natsUrl) - .connectionListener( - object : ConnectionListener { - override fun connectionEvent(conn: Connection, type: ConnectionListener.Events?) { - when (type) { - ConnectionListener.Events.CONNECTED, - ConnectionListener.Events.RECONNECTED -> { - logger.atFiner().log("Received NATS connected event") - synchronized(lock) { - try { - keyValue = getOrCreateBucket(conn).apply { watchAll(this@NatsContext) } - knownKeys = keyValue!!.keys().toSet() - connection = conn - } catch (e: Exception) { - logger - .atSevere() - .withCause(e) - .log("Failed to setup the persistent context bucket") - } finally { - connectedLatch.countDown() - } - } - logger.atFine().log("Connected to the NATS server") - } - - ConnectionListener.Events.DISCONNECTED -> { - logger.atFiner().log("Received NATS disconnected event") - synchronized(lock) { - connection = null - keyValue = null - } - logger.atFine().log("Disconnected from the NATS server") - } - - else -> {} - } - } - } - ) - .build() - - try { - Nats.connectAsynchronously(options, true) - } catch (e: InterruptedException) { - Thread.currentThread().interrupt() - logger - .atSevere() - .withCause(e) - .log("Interrupted while initiating NATS connection, will never connect") - } - } - - // Creates a bucket if it does not exist and returns the created bucket. - private fun getOrCreateBucket(conn: Connection): KeyValue = - runCatching { - conn.keyValueManagement().apply { - if (!bucketNames.contains(bucketName)) { - logger.atInfo().log("Persistent bucket '$bucketName' does not exist, creating it") - create(KeyValueConfiguration.Builder().name(bucketName).build()) - } - } - conn.keyValue(bucketName) - } - .getOrElse { e -> - throw IOException( - "Failed to create or retrieve bucket '$bucketName', is JetStream enabled?", - e, - ) - } - - private fun toBytes(value: Any?): ByteArray = ValueExchange(value).toBytes() - - private fun fromBytes(bytes: ByteArray): Any = ValueExchange.fromBytes(bytes).value - - /** - * Retrieve a context variable. - * - * @param name name of the context variable - * @return the retrieved context variable - * @throws IOException if the variable does not exist - * @throws IOException if the variable could not be retrieved - */ - override fun get(name: String): Any = - synchronized(lock) { - runCatching { - val kv = keyValue ?: throw IOException("Not connected to the NATS server") - if (!knownKeys.contains(name)) { - throw IOException("A variable with the name '$name' does not exist") - } - - val entry = kv.get(name) - fromBytes(entry.value) - } - .getOrElse { e -> throw IOException("Failed to retrieve variable '$name'", e) } - } - - /** - * Creates a new context variable. - * - * @param name name of the context variable - * @param value value of the context variable - * @return byte size of stored data - * @throws IOException if a variable with the same name already exists - * @throws IOException if the variable could not be created - */ - override fun create(name: String, value: Any?): Int = - synchronized(lock) { - runCatching { - val kv = keyValue ?: throw IOException("Not connected to the NATS server") - if (knownKeys.contains(name)) { - throw IOException("A variable with the name '$name' already exists") - } - - val data = toBytes(value) - kv.create(name, data) - data.size - } - .getOrElse { e -> throw IOException("Failed to create variable '$name'", e) } - } - - /** - * Assigns a new value to an existing context variable. - * - * @param name name of the context variable - * @param value new value of the context variable - * @return byte size of stored data - * @throws IOException if the variable does not exist - * @throws IOException if the variable could not be assigned - */ - override fun assign(name: String, value: Any?): Int = - synchronized(lock) { - runCatching { - val kv = keyValue ?: throw IOException("Not connected to the NATS server") - if (!knownKeys.contains(name)) { - throw IOException("A variable with the name '$name' does not exist") - } - - val data = toBytes(value) - kv.put(name, data) - data.size - } - .getOrElse { e -> throw IOException("Failed to assign variable '$name'", e) } - } - - /** - * Deletes a context variable. - * - * @param name name of the context variable - * @throws IOException if the variable does not exist - * @throws IOException if the variable could not be deleted - */ - override fun delete(name: String) { - synchronized(lock) { - runCatching { - val kv = keyValue ?: throw IOException("Not connected to the NATS server") - if (!knownKeys.contains(name)) { - throw IOException("A variable with the name '$name' does not exist") - } - - kv.delete(name) - } - .onFailure { e -> throw IOException("Failed to delete variable '$name'", e) } - } - } - - /** Returns all context variables. */ - override fun getAll(): List = - synchronized(lock) { - runCatching { - val kv = keyValue ?: throw IOException("Not connected to the NATS server") - knownKeys.map { key -> - val entry = kv.get(key) - ContextVariable(entry.key, entry.value) - } - } - .getOrElse { e -> throw IOException("Failed to retrieve variables from context", e) } - } - - override fun close() { - synchronized(lock) { - runCatching { - connection?.let { conn -> - if (deleteBucket) { - conn.keyValueManagement().delete(bucketName) - } - conn.close() - } - } - .onFailure { _ -> logger.atWarning().log("Failed to close the NATS") } - } - } - - /** - * Waits until the initial connection is established or the timeout expires. - * - * @param timeoutMs maximum time to wait in milliseconds - * @return true if connection was established, false otherwise - */ - fun awaitInitialConnection(timeoutMs: Long = 5000): Boolean = - connectedLatch.await(timeoutMs, java.util.concurrent.TimeUnit.MILLISECONDS) - - /** - * Watches for changes to the context. - * - * @param entry the KeyValueEntry that has been changed - */ - override fun watch(entry: KeyValueEntry?) { - synchronized(lock) { - when (entry?.operation) { - KeyValueOperation.PUT -> knownKeys += entry.key - KeyValueOperation.DELETE, - KeyValueOperation.PURGE -> knownKeys -= entry.key - - else -> {} - } - } - } - - /** Not used. */ - override fun endOfData() {} -} diff --git a/src/test/java/at/ac/uibk/dps/cirrina/execution/object/context/ContextTest.kt b/src/test/java/at/ac/uibk/dps/cirrina/execution/object/context/ContextTest.kt index b670404..ebba9a1 100644 --- a/src/test/java/at/ac/uibk/dps/cirrina/execution/object/context/ContextTest.kt +++ b/src/test/java/at/ac/uibk/dps/cirrina/execution/object/context/ContextTest.kt @@ -14,6 +14,8 @@ abstract class ContextTest { @Throws(Exception::class) fun testOperations() { createContext().use { context -> + assertDoesNotThrow { context.deleteAll() } + // Create a variable assertDoesNotThrow { context.create("testVar", 42) } @@ -37,11 +39,16 @@ abstract class ContextTest { // Delete the variable assertDoesNotThrow { context.delete("testVar") } - assertThrows { context.delete("nonExistentVar") } + + // Deleting it again should fail + assertThrows { context.delete("testVar") } // It should not exist anymore assertThrows { context.get("testVar") } + // Assigning should fail + assertThrows { context.assign("testVar", 42) } + // Get all variables assertDoesNotThrow { context.create("var1", 1) @@ -57,6 +64,8 @@ abstract class ContextTest { @Throws(Exception::class) fun testMultiThreadedCreateGet() { createContext().use { context -> + assertDoesNotThrow { context.deleteAll() } + val threadCount = 10 val iterationsPerThread = 100 @@ -87,6 +96,8 @@ abstract class ContextTest { @Throws(Exception::class) fun testMultiThreadedSetValueGetValue() { createContext().use { context -> + assertDoesNotThrow { context.deleteAll() } + val threadCount = 10 val iterationsPerThread = 100 diff --git a/src/test/java/at/ac/uibk/dps/cirrina/execution/object/context/NatsContextTest.kt b/src/test/java/at/ac/uibk/dps/cirrina/execution/object/context/EtcdContextTest.kt similarity index 51% rename from src/test/java/at/ac/uibk/dps/cirrina/execution/object/context/NatsContextTest.kt rename to src/test/java/at/ac/uibk/dps/cirrina/execution/object/context/EtcdContextTest.kt index a47b30f..7793aa0 100644 --- a/src/test/java/at/ac/uibk/dps/cirrina/execution/object/context/NatsContextTest.kt +++ b/src/test/java/at/ac/uibk/dps/cirrina/execution/object/context/EtcdContextTest.kt @@ -3,14 +3,14 @@ package at.ac.uibk.dps.cirrina.execution.`object`.context import org.junit.jupiter.api.Assumptions.assumeFalse import org.junit.jupiter.api.assertDoesNotThrow -class NatsContextTest : ContextTest() { +class EtcdContextTest : ContextTest() { override fun createContext(): Context { - val natsServerURL = System.getenv("NATS_SERVER_URL") + val etcdServerURL = System.getenv("ETCD_CONTEXT_URL") - assumeFalse(natsServerURL == null, "Skipping NATS persistent context test") + assumeFalse(etcdServerURL == null, "Skipping Etcd persistent context test") return assertDoesNotThrow { - NatsContext(true, natsServerURL, "test", true).apply { awaitInitialConnection() } + EtcdContext(true, listOf(etcdServerURL)).apply { awaitInitialConnection() } } } } diff --git a/src/test/java/at/ac/uibk/dps/cirrina/execution/object/event/NatsEventHandlerTest.kt b/src/test/java/at/ac/uibk/dps/cirrina/execution/object/event/NatsEventHandlerTest.kt index 0964457..1f90ab6 100644 --- a/src/test/java/at/ac/uibk/dps/cirrina/execution/object/event/NatsEventHandlerTest.kt +++ b/src/test/java/at/ac/uibk/dps/cirrina/execution/object/event/NatsEventHandlerTest.kt @@ -16,7 +16,7 @@ internal class NatsEventHandlerTest { @ParameterizedTest @EnumSource(Csml.EventChannel::class) fun testNatsEventHandlerSendReceive(channel: Csml.EventChannel) { - val natsServerURL = System.getenv("NATS_SERVER_URL") + val natsServerURL = System.getenv("NATS_EVENT_URL") assumeTrue(natsServerURL != null, "Skipping NATS event handler test") // Test should finish in 5 seconds