Skip to content

Commit 77a9a5b

Browse files
author
Oleksandr Dzhychko
committed
fix(model-server): fix lost notifications because of parallel transactions
On finished transaction tried notifying about changes from another unfinished transaction. Because the new value was not yet committed, the notification was lost. Also, the notification was tried too early.
1 parent 5fc4f9c commit 77a9a5b

File tree

2 files changed

+144
-10
lines changed

2 files changed

+144
-10
lines changed

model-server/src/main/kotlin/org/modelix/model/server/store/IgniteStoreClient.kt

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,16 +29,17 @@ import java.util.stream.Collectors
2929
private val LOG = KotlinLogging.logger { }
3030

3131
class IgniteStoreClient(jdbcConfFile: File? = null, inmemory: Boolean = false) : IStoreClient, AutoCloseable {
32+
companion object {
33+
private val threadLocalPendingChangeMessages: ThreadLocal<PendingChangeMessages> = ThreadLocal()
34+
}
35+
3236
private val ENTRY_CHANGED_TOPIC = "entryChanged"
33-
private lateinit var ignite: Ignite
37+
private var ignite: Ignite
3438
private val cache: IgniteCache<String, String?>
3539
private val changeNotifier = ChangeNotifier(this)
36-
private val pendingChangeMessages = PendingChangeMessages {
37-
ignite.message().send(ENTRY_CHANGED_TOPIC, it)
38-
}
3940

4041
/**
41-
* Istantiate an IgniteStoreClient
42+
* Instantiate an IgniteStoreClient
4243
*
4344
* @param jdbcConfFile adopt the configuration specified. If it is not specified, configuration
4445
* from ignite.xml is used
@@ -112,7 +113,7 @@ class IgniteStoreClient(jdbcConfFile: File? = null, inmemory: Boolean = false) :
112113
if (!silent) {
113114
for (key in entries.keys) {
114115
if (HashUtil.isSha256(key)) continue
115-
pendingChangeMessages.entryChanged(key)
116+
threadLocalPendingChangeMessages.get().entryChanged(key)
116117
}
117118
}
118119
}
@@ -137,10 +138,18 @@ class IgniteStoreClient(jdbcConfFile: File? = null, inmemory: Boolean = false) :
137138
val transactions = ignite.transactions()
138139
if (transactions.tx() == null) {
139140
transactions.txStart().use { tx ->
140-
val result = body()
141-
tx.commit()
142-
pendingChangeMessages.flushChangeMessages()
143-
return result
141+
try {
142+
val pendingChangeMessages = PendingChangeMessages {
143+
ignite.message().send(ENTRY_CHANGED_TOPIC, it)
144+
}
145+
threadLocalPendingChangeMessages.set(pendingChangeMessages)
146+
val result = body()
147+
tx.commit()
148+
pendingChangeMessages.flushChangeMessages()
149+
return result
150+
} finally {
151+
threadLocalPendingChangeMessages.remove()
152+
}
144153
}
145154
} else {
146155
// already in a transaction
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
* Copyright (c) 2023.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.modelix.model.server
18+
19+
import kotlinx.coroutines.Dispatchers
20+
import kotlinx.coroutines.launch
21+
import kotlinx.coroutines.test.runTest
22+
import org.modelix.model.IKeyListener
23+
import org.modelix.model.server.store.IgniteStoreClient
24+
import org.slf4j.LoggerFactory
25+
import java.util.concurrent.CompletableFuture
26+
import java.util.concurrent.TimeUnit
27+
import kotlin.test.AfterTest
28+
import kotlin.test.Test
29+
import kotlin.test.assertEquals
30+
31+
class IgniteStoreClientParallelTransactionsTest {
32+
33+
// This test is currently not relevant for the InMemoryClient,
34+
// because the in memory client does not open multiple transactions in parallel.
35+
val store = IgniteStoreClient(inmemory = true)
36+
37+
@AfterTest
38+
fun cleanup() {
39+
store.close()
40+
}
41+
42+
@Test
43+
fun `notifications are not lost because of parallel transactions`() = runTest {
44+
val threadBlocker = ThreadBlocker()
45+
val notifiedValueFuture = CompletableFuture<String>()
46+
47+
store.listen(
48+
"key2",
49+
object : IKeyListener {
50+
override fun changed(key: String, value: String?) {
51+
notifiedValueFuture.complete(store[key]!!)
52+
}
53+
},
54+
)
55+
56+
launch(Dispatchers.IO) {
57+
store.runTransaction {
58+
store.put("key1", "valueA")
59+
threadBlocker.reachPointInTime(1)
60+
threadBlocker.sleepUntilPointInTime(2)
61+
// The bug was originally caused here.
62+
// After the first transaction finished,
63+
// it tried nothing about pending changes from the second transaction
64+
// that was not finished yet.
65+
}
66+
threadBlocker.reachPointInTime(3)
67+
}
68+
69+
launch(Dispatchers.IO) {
70+
store.runTransaction {
71+
threadBlocker.sleepUntilPointInTime(1)
72+
store.put("key2", "valueB")
73+
threadBlocker.reachPointInTime(2)
74+
threadBlocker.sleepUntilPointInTime(3)
75+
}
76+
}
77+
78+
threadBlocker.sleepUntilPointInTime(3)
79+
val notifiedValue = notifiedValueFuture.get(10, TimeUnit.SECONDS)
80+
assertEquals("valueB", notifiedValue)
81+
}
82+
}
83+
84+
class ThreadBlocker {
85+
86+
companion object {
87+
private val LOG = LoggerFactory.getLogger(ThreadBlocker::class.java)
88+
}
89+
90+
private var reachedPointInTime = 0
91+
92+
@Synchronized
93+
fun reachPointInTime(pointInTime: Int) {
94+
reachedPointInTime = pointInTime
95+
LOG.debug("Reached point in time {}", reachedPointInTime)
96+
}
97+
98+
fun sleepUntilPointInTime(pointInTime: Int) {
99+
LOG.debug("Waiting for point in time {}", pointInTime)
100+
sleepUntil {
101+
reachedPointInTime >= pointInTime
102+
}
103+
}
104+
}
105+
106+
fun sleepUntil(
107+
checkIntervalMilliseconds: Long = 10,
108+
timeoutMilliseconds: Long = 1000,
109+
condition: () -> Boolean,
110+
) {
111+
check(checkIntervalMilliseconds > 0) {
112+
"checkIntervalMilliseconds must be positive."
113+
}
114+
check(timeoutMilliseconds > 0) {
115+
"timeoutMilliseconds must be positive."
116+
}
117+
var remainingDelays = timeoutMilliseconds / checkIntervalMilliseconds
118+
while (!condition()) {
119+
if (remainingDelays == 0L) {
120+
error("Waited too long.")
121+
}
122+
Thread.sleep(checkIntervalMilliseconds)
123+
remainingDelays--
124+
}
125+
}

0 commit comments

Comments
 (0)