Skip to content

Commit aeaf45b

Browse files
authored
Merge pull request #586 from modelix/fix-value-changed-notification
Fix value changed notification
2 parents 5d85602 + df9c53f commit aeaf45b

File tree

3 files changed

+156
-22
lines changed

3 files changed

+156
-22
lines changed

model-server/src/main/kotlin/org/modelix/model/server/store/IgniteStoreClient.kt

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import mu.KotlinLogging
1818
import org.apache.ignite.Ignite
1919
import org.apache.ignite.IgniteCache
2020
import org.apache.ignite.Ignition
21+
import org.modelix.kotlin.utils.ContextValue
2122
import org.modelix.model.IKeyListener
2223
import org.modelix.model.persistent.HashUtil
2324
import java.io.File
@@ -29,6 +30,7 @@ import java.util.stream.Collectors
2930
private val LOG = KotlinLogging.logger { }
3031

3132
class IgniteStoreClient(jdbcConfFile: File? = null, inmemory: Boolean = false) : IStoreClient, AutoCloseable {
33+
3234
private val ENTRY_CHANGED_TOPIC = "entryChanged"
3335
private lateinit var ignite: Ignite
3436
private val cache: IgniteCache<String, String?>
@@ -38,7 +40,7 @@ class IgniteStoreClient(jdbcConfFile: File? = null, inmemory: Boolean = false) :
3840
}
3941

4042
/**
41-
* Istantiate an IgniteStoreClient
43+
* Instantiate an IgniteStoreClient
4244
*
4345
* @param jdbcConfFile adopt the configuration specified. If it is not specified, configuration
4446
* from ignite.xml is used
@@ -137,10 +139,11 @@ class IgniteStoreClient(jdbcConfFile: File? = null, inmemory: Boolean = false) :
137139
val transactions = ignite.transactions()
138140
if (transactions.tx() == null) {
139141
transactions.txStart().use { tx ->
140-
val result = body()
141-
tx.commit()
142-
pendingChangeMessages.flushChangeMessages()
143-
return result
142+
return pendingChangeMessages.runAndFlush {
143+
val result = body()
144+
tx.commit()
145+
result
146+
}
144147
}
145148
} else {
146149
// already in a transaction
@@ -158,19 +161,20 @@ class IgniteStoreClient(jdbcConfFile: File? = null, inmemory: Boolean = false) :
158161
}
159162

160163
class PendingChangeMessages(private val notifier: (String) -> Unit) {
161-
private val pendingChangeMessages = Collections.synchronizedSet(HashSet<String>())
162-
163-
@Synchronized
164-
fun flushChangeMessages() {
165-
for (pendingChangeMessage in pendingChangeMessages) {
166-
notifier(pendingChangeMessage)
164+
private val pendingChangeMessages = ContextValue<MutableSet<String>>()
165+
166+
fun <R> runAndFlush(body: () -> R): R {
167+
val messages = HashSet<String>()
168+
return pendingChangeMessages.computeWith(messages) {
169+
val result = body()
170+
messages.forEach { notifier(it) }
171+
result
167172
}
168-
pendingChangeMessages.clear()
169173
}
170174

171-
@Synchronized
172175
fun entryChanged(key: String) {
173-
pendingChangeMessages += key
176+
val messages = checkNotNull(pendingChangeMessages.getValueOrNull()) { "Only allowed inside PendingChangeMessages.runAndFlush" }
177+
messages.add(key)
174178
}
175179
}
176180

model-server/src/main/kotlin/org/modelix/model/server/store/InMemoryStoreClient.kt

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -100,14 +100,15 @@ class InMemoryStoreClient : IStoreClient {
100100
@Synchronized
101101
override fun <T> runTransaction(body: () -> T): T {
102102
if (transactionValues == null) {
103-
try {
104-
transactionValues = HashMap()
105-
val result = body()
106-
values.putAll(transactionValues!!)
107-
return result
108-
} finally {
109-
transactionValues = null
110-
pendingChangeMessages.flushChangeMessages()
103+
return pendingChangeMessages.runAndFlush {
104+
try {
105+
transactionValues = HashMap()
106+
val result = body()
107+
values.putAll(transactionValues!!)
108+
result
109+
} finally {
110+
transactionValues = null
111+
}
111112
}
112113
} else {
113114
return body()
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/*
2+
* Copyright (c) 2023.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.modelix.model.server
18+
19+
import kotlinx.coroutines.Dispatchers
20+
import kotlinx.coroutines.launch
21+
import kotlinx.coroutines.test.runTest
22+
import org.modelix.model.IKeyListener
23+
import org.modelix.model.server.store.IStoreClient
24+
import org.modelix.model.server.store.IgniteStoreClient
25+
import org.modelix.model.server.store.InMemoryStoreClient
26+
import org.slf4j.LoggerFactory
27+
import java.util.concurrent.CompletableFuture
28+
import java.util.concurrent.TimeUnit
29+
import kotlin.test.AfterTest
30+
import kotlin.test.Ignore
31+
import kotlin.test.Test
32+
import kotlin.test.assertEquals
33+
34+
@Ignore("Doesn't support parallel transactions (yet)")
35+
class MabBasedStoreClientParallelTransactionsTest : StoreClientParallelTransactionsTest(InMemoryStoreClient())
36+
37+
class IgniteStoreClientParallelTransactionsTest : StoreClientParallelTransactionsTest(IgniteStoreClient(inmemory = true))
38+
39+
abstract class StoreClientParallelTransactionsTest(val store: IStoreClient) {
40+
41+
@AfterTest
42+
fun cleanup() {
43+
store.close()
44+
}
45+
46+
@Test
47+
fun `notifications are not lost because of parallel transactions`() = runTest {
48+
val threadBlocker = ThreadBlocker()
49+
val notifiedValueFuture = CompletableFuture<String>()
50+
51+
store.listen(
52+
"key2",
53+
object : IKeyListener {
54+
override fun changed(key: String, value: String?) {
55+
notifiedValueFuture.complete(store[key]!!)
56+
}
57+
},
58+
)
59+
60+
launch(Dispatchers.IO) {
61+
store.runTransaction {
62+
store.put("key1", "valueA")
63+
threadBlocker.reachPointInTime(1)
64+
threadBlocker.sleepUntilPointInTime(2)
65+
// The bug was originally caused here.
66+
// After the first transaction finished,
67+
// it tried nothing about pending changes from the second transaction
68+
// that was not finished yet.
69+
}
70+
threadBlocker.reachPointInTime(3)
71+
}
72+
73+
launch(Dispatchers.IO) {
74+
store.runTransaction {
75+
threadBlocker.sleepUntilPointInTime(1)
76+
store.put("key2", "valueB")
77+
threadBlocker.reachPointInTime(2)
78+
threadBlocker.sleepUntilPointInTime(3)
79+
}
80+
}
81+
82+
threadBlocker.sleepUntilPointInTime(3)
83+
val notifiedValue = notifiedValueFuture.get(10, TimeUnit.SECONDS)
84+
assertEquals("valueB", notifiedValue)
85+
}
86+
}
87+
88+
class ThreadBlocker {
89+
90+
companion object {
91+
private val LOG = LoggerFactory.getLogger(ThreadBlocker::class.java)
92+
}
93+
94+
private var reachedPointInTime = 0
95+
96+
@Synchronized
97+
fun reachPointInTime(pointInTime: Int) {
98+
reachedPointInTime = pointInTime
99+
LOG.debug("Reached point in time {}", reachedPointInTime)
100+
}
101+
102+
fun sleepUntilPointInTime(pointInTime: Int) {
103+
LOG.debug("Waiting for point in time {}", pointInTime)
104+
sleepUntil {
105+
reachedPointInTime >= pointInTime
106+
}
107+
}
108+
}
109+
110+
fun sleepUntil(
111+
checkIntervalMilliseconds: Long = 10,
112+
timeoutMilliseconds: Long = 1000,
113+
condition: () -> Boolean,
114+
) {
115+
check(checkIntervalMilliseconds > 0) {
116+
"checkIntervalMilliseconds must be positive."
117+
}
118+
check(timeoutMilliseconds > 0) {
119+
"timeoutMilliseconds must be positive."
120+
}
121+
var remainingDelays = timeoutMilliseconds / checkIntervalMilliseconds
122+
while (!condition()) {
123+
if (remainingDelays == 0L) {
124+
error("Waited too long.")
125+
}
126+
Thread.sleep(checkIntervalMilliseconds)
127+
remainingDelays--
128+
}
129+
}

0 commit comments

Comments
 (0)