Skip to content

Commit 1c516bb

Browse files
committed
fix(model-server): transaction isolation in listeners
When listening to an entry in the store, a listener should only be notified after a transaction is committed and the notification should happen outside a transaction. A listener should not get notified about changes that where made during a failed transaction.
1 parent 94d28f8 commit 1c516bb

File tree

3 files changed

+154
-50
lines changed

3 files changed

+154
-50
lines changed

model-server/src/main/kotlin/org/modelix/model/server/store/IgniteStoreClient.kt

Lines changed: 90 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -14,23 +14,28 @@
1414
*/
1515
package org.modelix.model.server.store
1616

17-
import com.google.common.collect.MultimapBuilder
17+
import mu.KotlinLogging
1818
import org.apache.ignite.Ignite
1919
import org.apache.ignite.IgniteCache
2020
import org.apache.ignite.Ignition
2121
import org.modelix.model.IKeyListener
22+
import org.modelix.model.persistent.HashUtil
2223
import java.io.File
2324
import java.io.FileReader
2425
import java.io.IOException
2526
import java.util.*
26-
import java.util.concurrent.Executors
2727
import java.util.stream.Collectors
2828

29+
private val LOG = KotlinLogging.logger { }
30+
2931
class IgniteStoreClient(jdbcConfFile: File? = null, inmemory: Boolean = false) : IStoreClient, AutoCloseable {
30-
private val ignite: Ignite
32+
private val ENTRY_CHANGED_TOPIC = "entryChanged"
33+
private lateinit var ignite: Ignite
3134
private val cache: IgniteCache<String, String?>
32-
private val timer = Executors.newScheduledThreadPool(1)
33-
private val listeners = MultimapBuilder.hashKeys().hashSetValues().build<String, IKeyListener>()
35+
private val changeNotifier = ChangeNotifier(this)
36+
private val pendingChangeMessages = PendingChangeMessages {
37+
ignite.message().send(ENTRY_CHANGED_TOPIC, it)
38+
}
3439

3540
/**
3641
* Istantiate an IgniteStoreClient
@@ -68,6 +73,13 @@ class IgniteStoreClient(jdbcConfFile: File? = null, inmemory: Boolean = false) :
6873
// timer.scheduleAtFixedRate(() -> {
6974
// System.out.println("stats: " + cache.metrics());
7075
// }, 10, 10, TimeUnit.SECONDS);
76+
77+
ignite.message().localListen(ENTRY_CHANGED_TOPIC) { nodeId: UUID?, key: Any? ->
78+
if (key is String) {
79+
changeNotifier.notifyListeners(key)
80+
}
81+
true
82+
}
7183
}
7284

7385
override fun get(key: String): String? {
@@ -94,44 +106,27 @@ class IgniteStoreClient(jdbcConfFile: File? = null, inmemory: Boolean = false) :
94106
override fun putAll(entries: Map<String, String?>, silent: Boolean) {
95107
val deletes = entries.filterValues { it == null }
96108
val puts = entries.filterValues { it != null }
97-
if (deletes.isNotEmpty()) cache.removeAll(deletes.keys)
98-
if (puts.isNotEmpty()) cache.putAll(puts)
99-
if (!silent) {
100-
for ((key, value) in entries) {
101-
ignite.message().send(key, value ?: IKeyListener.NULL_VALUE)
109+
runTransaction {
110+
if (deletes.isNotEmpty()) cache.removeAll(deletes.keys)
111+
if (puts.isNotEmpty()) cache.putAll(puts)
112+
if (!silent) {
113+
for (key in entries.keys) {
114+
if (HashUtil.isSha256(key)) continue
115+
pendingChangeMessages.entryChanged(key)
116+
}
102117
}
103118
}
104119
}
105120

106121
override fun listen(key: String, listener: IKeyListener) {
107-
synchronized(listeners) {
108-
val wasSubscribed = listeners.containsKey(key)
109-
listeners.put(key, listener)
110-
if (!wasSubscribed) {
111-
ignite.message()
112-
.localListen(
113-
key,
114-
) { nodeId: UUID?, value: Any? ->
115-
if (value is String) {
116-
synchronized(listeners) {
117-
for (l in listeners[key].toList()) {
118-
try {
119-
l.changed(key, if (value == IKeyListener.NULL_VALUE) null else value)
120-
} catch (ex: Exception) {
121-
println(ex.message)
122-
ex.printStackTrace()
123-
}
124-
}
125-
}
126-
}
127-
true
128-
}
129-
}
130-
}
122+
// Entries where the key is the SHA hash over the value are not expected to change and listening is unnecessary.
123+
require(!HashUtil.isSha256(key)) { "Listener for $key will never get notified." }
124+
125+
changeNotifier.addListener(key, listener)
131126
}
132127

133128
override fun removeListener(key: String, listener: IKeyListener) {
134-
synchronized(listeners) { listeners.remove(key, listener) }
129+
changeNotifier.removeListener(key, listener)
135130
}
136131

137132
override fun generateId(key: String): Long {
@@ -144,6 +139,7 @@ class IgniteStoreClient(jdbcConfFile: File? = null, inmemory: Boolean = false) :
144139
transactions.txStart().use { tx ->
145140
val result = body()
146141
tx.commit()
142+
pendingChangeMessages.flushChangeMessages()
147143
return result
148144
}
149145
} else {
@@ -160,3 +156,62 @@ class IgniteStoreClient(jdbcConfFile: File? = null, inmemory: Boolean = false) :
160156
dispose()
161157
}
162158
}
159+
160+
class PendingChangeMessages(private val notifier: (String) -> Unit) {
161+
private val pendingChangeMessages = Collections.synchronizedSet(HashSet<String>())
162+
163+
@Synchronized
164+
fun flushChangeMessages() {
165+
for (pendingChangeMessage in pendingChangeMessages) {
166+
notifier(pendingChangeMessage)
167+
}
168+
pendingChangeMessages.clear()
169+
}
170+
171+
@Synchronized
172+
fun entryChanged(key: String) {
173+
pendingChangeMessages += key
174+
}
175+
}
176+
177+
class ChangeNotifier(val store: IStoreClient) {
178+
private val changeNotifiers = HashMap<String, EntryChangeNotifier>()
179+
180+
@Synchronized
181+
fun notifyListeners(key: String) {
182+
changeNotifiers[key]?.notifyIfChanged()
183+
}
184+
185+
@Synchronized
186+
fun addListener(key: String, listener: IKeyListener) {
187+
changeNotifiers.getOrPut(key) { EntryChangeNotifier(key) }.listeners.add(listener)
188+
}
189+
190+
@Synchronized
191+
fun removeListener(key: String, listener: IKeyListener) {
192+
val notifier = changeNotifiers[key] ?: return
193+
notifier.listeners.remove(listener)
194+
if (notifier.listeners.isEmpty()) {
195+
changeNotifiers.remove(key)
196+
}
197+
}
198+
199+
private inner class EntryChangeNotifier(val key: String) {
200+
val listeners = HashSet<IKeyListener>()
201+
private var lastNotifiedValue: String? = null
202+
203+
fun notifyIfChanged() {
204+
val value = store.get(key)
205+
if (value == lastNotifiedValue) return
206+
lastNotifiedValue = value
207+
208+
for (listener in listeners) {
209+
try {
210+
listener.changed(key, value)
211+
} catch (ex: Exception) {
212+
LOG.error("Exception in listener of $key", ex)
213+
}
214+
}
215+
}
216+
}
217+
}

model-server/src/main/kotlin/org/modelix/model/server/store/InMemoryStoreClient.kt

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ class InMemoryStoreClient : IStoreClient {
3838

3939
private val values: MutableMap<String, String?> = HashMap()
4040
private var transactionValues: MutableMap<String, String?>? = null
41-
private val listeners: MutableMap<String?, MutableSet<IKeyListener>> = HashMap()
41+
private val changeNotifier = ChangeNotifier(this)
42+
private val pendingChangeMessages = PendingChangeMessages(changeNotifier::notifyListeners)
4243

4344
@Synchronized
4445
override fun get(key: String): String? {
@@ -62,35 +63,31 @@ class InMemoryStoreClient : IStoreClient {
6263

6364
@Synchronized
6465
override fun put(key: String, value: String?, silent: Boolean) {
65-
(transactionValues ?: values)[key] = value
66-
if (!silent) {
67-
listeners[key]?.toList()?.forEach {
68-
try {
69-
it.changed(key, value)
70-
} catch (ex: Exception) {
71-
println(ex.message)
72-
ex.printStackTrace()
73-
LOG.error("Failed to notify listeners after put '$key' = '$value'", ex)
74-
}
66+
runTransaction {
67+
(transactionValues ?: values)[key] = value
68+
if (!silent) {
69+
pendingChangeMessages.entryChanged(key)
7570
}
7671
}
7772
}
7873

7974
@Synchronized
8075
override fun putAll(entries: Map<String, String?>, silent: Boolean) {
81-
for ((key, value) in entries) {
82-
put(key, value, silent)
76+
runTransaction {
77+
for ((key, value) in entries) {
78+
put(key, value, silent)
79+
}
8380
}
8481
}
8582

8683
@Synchronized
8784
override fun listen(key: String, listener: IKeyListener) {
88-
listeners.getOrPut(key) { LinkedHashSet() }.add(listener)
85+
changeNotifier.addListener(key, listener)
8986
}
9087

9188
@Synchronized
9289
override fun removeListener(key: String, listener: IKeyListener) {
93-
listeners[key]?.remove(listener)
90+
changeNotifier.removeListener(key, listener)
9491
}
9592

9693
@Synchronized
@@ -110,6 +107,7 @@ class InMemoryStoreClient : IStoreClient {
110107
return result
111108
} finally {
112109
transactionValues = null
110+
pendingChangeMessages.flushChangeMessages()
113111
}
114112
} else {
115113
return body()

model-server/src/test/kotlin/org/modelix/model/server/StoreClientTest.kt

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,14 @@
1616

1717
package org.modelix.model.server
1818

19+
import kotlinx.coroutines.coroutineScope
1920
import kotlinx.coroutines.launch
2021
import kotlinx.coroutines.test.runTest
22+
import org.modelix.model.IKeyListener
2123
import org.modelix.model.server.store.IStoreClient
2224
import org.modelix.model.server.store.IgniteStoreClient
2325
import org.modelix.model.server.store.InMemoryStoreClient
26+
import java.util.Collections
2427
import kotlin.random.Random
2528
import kotlin.test.AfterTest
2629
import kotlin.test.Test
@@ -92,4 +95,52 @@ abstract class StoreClientTest(val store: IStoreClient) {
9295
}
9396
assertEquals(value1, store.get(key)) // failed transaction should be rolled back
9497
}
98+
99+
@Test
100+
fun `listeners don't see incomplete transaction`() = runTest {
101+
val key = "nbmndsyr"
102+
val value1 = "a"
103+
val value2 = "b"
104+
val value3 = "c"
105+
106+
val valuesSeenByListener = Collections.synchronizedSet(HashSet<String?>())
107+
store.listen(
108+
key,
109+
object : IKeyListener {
110+
override fun changed(key: String, value: String?) {
111+
valuesSeenByListener += value
112+
valuesSeenByListener += store.get(key)
113+
}
114+
},
115+
)
116+
117+
store.put(key, value1)
118+
assertEquals(value1, store.get(key))
119+
120+
assertEquals(setOf<String?>(value1), valuesSeenByListener)
121+
valuesSeenByListener.clear()
122+
123+
coroutineScope {
124+
launch {
125+
assertFailsWith(NullPointerException::class) {
126+
store.runTransaction {
127+
assertEquals(value1, store.get(key))
128+
store.put(key, value2, silent = false)
129+
assertEquals(value2, store.get(key))
130+
throw NullPointerException()
131+
}
132+
}
133+
}
134+
135+
launch {
136+
store.runTransaction {
137+
assertEquals(value1, store.get(key))
138+
store.put(key, value3, silent = false)
139+
assertEquals(value3, store.get(key))
140+
}
141+
}
142+
}
143+
144+
assertEquals(setOf<String?>(value3), valuesSeenByListener)
145+
}
95146
}

0 commit comments

Comments
 (0)