Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import at.ac.uibk.dps.cirrina.execution.`object`.exchange.ValueExchange
import com.google.common.flogger.FluentLogger
import io.nats.client.*
import io.nats.client.api.KeyValueConfiguration
import io.nats.client.api.KeyValueEntry
import io.nats.client.api.KeyValueOperation
import io.nats.client.api.KeyValueWatcher
import java.io.IOException
import java.util.concurrent.CountDownLatch

Expand All @@ -24,14 +27,17 @@ class NatsContext(
private val natsUrl: String,
private val bucketName: String,
private val deleteBucket: Boolean = false,
) : Context(isLocal), AutoCloseable {
) : Context(isLocal), AutoCloseable, KeyValueWatcher {
// NATS connection can be null if not connected.
private var connection: Connection? = null

// NATs key-value can be null if not connected.
private var keyValue: KeyValue? = null

// Lock for the connection and key-value.
// Known keys, used to check if a key has been created.
private var knownKeys: Set<String> = emptySet()

// Lock for the connection, key-value, and known keys.
private val lock = Any()

// Latch for initial connection.
Expand All @@ -50,7 +56,8 @@ class NatsContext(
logger.atFiner().log("Received NATS connected event")
synchronized(lock) {
try {
keyValue = getOrCreateBucket(conn)
keyValue = getOrCreateBucket(conn).apply { watchAll(this@NatsContext) }
knownKeys = keyValue!!.keys().toSet()
connection = conn
} catch (e: Exception) {
logger
Expand Down Expand Up @@ -125,7 +132,7 @@ class NatsContext(
synchronized(lock) {
runCatching {
val kv = keyValue ?: throw IOException("Not connected to the NATS server")
if (!kv.keys().contains(name)) {
if (!knownKeys.contains(name)) {
throw IOException("A variable with the name '$name' does not exist")
}

Expand All @@ -148,7 +155,7 @@ class NatsContext(
synchronized(lock) {
runCatching {
val kv = keyValue ?: throw IOException("Not connected to the NATS server")
if (kv.keys().contains(name)) {
if (knownKeys.contains(name)) {
throw IOException("A variable with the name '$name' already exists")
}

Expand All @@ -172,7 +179,7 @@ class NatsContext(
synchronized(lock) {
runCatching {
val kv = keyValue ?: throw IOException("Not connected to the NATS server")
if (!kv.keys().contains(name)) {
if (!knownKeys.contains(name)) {
throw IOException("A variable with the name '$name' does not exist")
}

Expand All @@ -194,7 +201,7 @@ class NatsContext(
synchronized(lock) {
runCatching {
val kv = keyValue ?: throw IOException("Not connected to the NATS server")
if (!kv.keys().contains(name)) {
if (!knownKeys.contains(name)) {
throw IOException("A variable with the name '$name' does not exist")
}

Expand All @@ -209,7 +216,7 @@ class NatsContext(
synchronized(lock) {
runCatching {
val kv = keyValue ?: throw IOException("Not connected to the NATS server")
kv.keys().map { key ->
knownKeys.map { key ->
val entry = kv.get(key)
ContextVariable(entry.key, entry.value)
}
Expand Down Expand Up @@ -239,4 +246,24 @@ class NatsContext(
*/
fun awaitInitialConnection(timeoutMs: Long = 5000): Boolean =
connectedLatch.await(timeoutMs, java.util.concurrent.TimeUnit.MILLISECONDS)

/**
* Watches for changes to the context.
*
* @param entry the KeyValueEntry that has been changed
*/
override fun watch(entry: KeyValueEntry?) {
synchronized(lock) {
when (entry?.operation) {
KeyValueOperation.PUT -> knownKeys += entry.key
KeyValueOperation.DELETE,
KeyValueOperation.PURGE -> knownKeys -= entry.key

else -> {}
}
}
}

/** Not used. */
override fun endOfData() {}
}