diff --git a/src/main/java/at/ac/uibk/dps/cirrina/execution/object/context/NatsContext.kt b/src/main/java/at/ac/uibk/dps/cirrina/execution/object/context/NatsContext.kt index 9bb65ee..8dde274 100644 --- a/src/main/java/at/ac/uibk/dps/cirrina/execution/object/context/NatsContext.kt +++ b/src/main/java/at/ac/uibk/dps/cirrina/execution/object/context/NatsContext.kt @@ -4,6 +4,9 @@ import at.ac.uibk.dps.cirrina.execution.`object`.exchange.ValueExchange import com.google.common.flogger.FluentLogger import io.nats.client.* import io.nats.client.api.KeyValueConfiguration +import io.nats.client.api.KeyValueEntry +import io.nats.client.api.KeyValueOperation +import io.nats.client.api.KeyValueWatcher import java.io.IOException import java.util.concurrent.CountDownLatch @@ -24,14 +27,17 @@ class NatsContext( private val natsUrl: String, private val bucketName: String, private val deleteBucket: Boolean = false, -) : Context(isLocal), AutoCloseable { +) : Context(isLocal), AutoCloseable, KeyValueWatcher { // NATS connection can be null if not connected. private var connection: Connection? = null // NATs key-value can be null if not connected. private var keyValue: KeyValue? = null - // Lock for the connection and key-value. + // Known keys, used to check if a key has been created. + private var knownKeys: Set = emptySet() + + // Lock for the connection, key-value, and known keys. private val lock = Any() // Latch for initial connection. @@ -50,7 +56,8 @@ class NatsContext( logger.atFiner().log("Received NATS connected event") synchronized(lock) { try { - keyValue = getOrCreateBucket(conn) + keyValue = getOrCreateBucket(conn).apply { watchAll(this@NatsContext) } + knownKeys = keyValue!!.keys().toSet() connection = conn } catch (e: Exception) { logger @@ -125,7 +132,7 @@ class NatsContext( synchronized(lock) { runCatching { val kv = keyValue ?: throw IOException("Not connected to the NATS server") - if (!kv.keys().contains(name)) { + if (!knownKeys.contains(name)) { throw IOException("A variable with the name '$name' does not exist") } @@ -148,7 +155,7 @@ class NatsContext( synchronized(lock) { runCatching { val kv = keyValue ?: throw IOException("Not connected to the NATS server") - if (kv.keys().contains(name)) { + if (knownKeys.contains(name)) { throw IOException("A variable with the name '$name' already exists") } @@ -172,7 +179,7 @@ class NatsContext( synchronized(lock) { runCatching { val kv = keyValue ?: throw IOException("Not connected to the NATS server") - if (!kv.keys().contains(name)) { + if (!knownKeys.contains(name)) { throw IOException("A variable with the name '$name' does not exist") } @@ -194,7 +201,7 @@ class NatsContext( synchronized(lock) { runCatching { val kv = keyValue ?: throw IOException("Not connected to the NATS server") - if (!kv.keys().contains(name)) { + if (!knownKeys.contains(name)) { throw IOException("A variable with the name '$name' does not exist") } @@ -209,7 +216,7 @@ class NatsContext( synchronized(lock) { runCatching { val kv = keyValue ?: throw IOException("Not connected to the NATS server") - kv.keys().map { key -> + knownKeys.map { key -> val entry = kv.get(key) ContextVariable(entry.key, entry.value) } @@ -239,4 +246,24 @@ class NatsContext( */ fun awaitInitialConnection(timeoutMs: Long = 5000): Boolean = connectedLatch.await(timeoutMs, java.util.concurrent.TimeUnit.MILLISECONDS) + + /** + * Watches for changes to the context. + * + * @param entry the KeyValueEntry that has been changed + */ + override fun watch(entry: KeyValueEntry?) { + synchronized(lock) { + when (entry?.operation) { + KeyValueOperation.PUT -> knownKeys += entry.key + KeyValueOperation.DELETE, + KeyValueOperation.PURGE -> knownKeys -= entry.key + + else -> {} + } + } + } + + /** Not used. */ + override fun endOfData() {} }