Skip to content

Commit df9c53f

Browse files
committed
refactor(model-server): encapsulate ThreadLocal in PendingChangeMessages
1 parent 77a9a5b commit df9c53f

File tree

3 files changed

+35
-35
lines changed

3 files changed

+35
-35
lines changed

model-server/src/main/kotlin/org/modelix/model/server/store/IgniteStoreClient.kt

Lines changed: 18 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import mu.KotlinLogging
1818
import org.apache.ignite.Ignite
1919
import org.apache.ignite.IgniteCache
2020
import org.apache.ignite.Ignition
21+
import org.modelix.kotlin.utils.ContextValue
2122
import org.modelix.model.IKeyListener
2223
import org.modelix.model.persistent.HashUtil
2324
import java.io.File
@@ -29,14 +30,14 @@ import java.util.stream.Collectors
2930
private val LOG = KotlinLogging.logger { }
3031

3132
class IgniteStoreClient(jdbcConfFile: File? = null, inmemory: Boolean = false) : IStoreClient, AutoCloseable {
32-
companion object {
33-
private val threadLocalPendingChangeMessages: ThreadLocal<PendingChangeMessages> = ThreadLocal()
34-
}
3533

3634
private val ENTRY_CHANGED_TOPIC = "entryChanged"
37-
private var ignite: Ignite
35+
private lateinit var ignite: Ignite
3836
private val cache: IgniteCache<String, String?>
3937
private val changeNotifier = ChangeNotifier(this)
38+
private val pendingChangeMessages = PendingChangeMessages {
39+
ignite.message().send(ENTRY_CHANGED_TOPIC, it)
40+
}
4041

4142
/**
4243
* Instantiate an IgniteStoreClient
@@ -113,7 +114,7 @@ class IgniteStoreClient(jdbcConfFile: File? = null, inmemory: Boolean = false) :
113114
if (!silent) {
114115
for (key in entries.keys) {
115116
if (HashUtil.isSha256(key)) continue
116-
threadLocalPendingChangeMessages.get().entryChanged(key)
117+
pendingChangeMessages.entryChanged(key)
117118
}
118119
}
119120
}
@@ -138,17 +139,10 @@ class IgniteStoreClient(jdbcConfFile: File? = null, inmemory: Boolean = false) :
138139
val transactions = ignite.transactions()
139140
if (transactions.tx() == null) {
140141
transactions.txStart().use { tx ->
141-
try {
142-
val pendingChangeMessages = PendingChangeMessages {
143-
ignite.message().send(ENTRY_CHANGED_TOPIC, it)
144-
}
145-
threadLocalPendingChangeMessages.set(pendingChangeMessages)
142+
return pendingChangeMessages.runAndFlush {
146143
val result = body()
147144
tx.commit()
148-
pendingChangeMessages.flushChangeMessages()
149-
return result
150-
} finally {
151-
threadLocalPendingChangeMessages.remove()
145+
result
152146
}
153147
}
154148
} else {
@@ -167,19 +161,20 @@ class IgniteStoreClient(jdbcConfFile: File? = null, inmemory: Boolean = false) :
167161
}
168162

169163
class PendingChangeMessages(private val notifier: (String) -> Unit) {
170-
private val pendingChangeMessages = Collections.synchronizedSet(HashSet<String>())
171-
172-
@Synchronized
173-
fun flushChangeMessages() {
174-
for (pendingChangeMessage in pendingChangeMessages) {
175-
notifier(pendingChangeMessage)
164+
private val pendingChangeMessages = ContextValue<MutableSet<String>>()
165+
166+
fun <R> runAndFlush(body: () -> R): R {
167+
val messages = HashSet<String>()
168+
return pendingChangeMessages.computeWith(messages) {
169+
val result = body()
170+
messages.forEach { notifier(it) }
171+
result
176172
}
177-
pendingChangeMessages.clear()
178173
}
179174

180-
@Synchronized
181175
fun entryChanged(key: String) {
182-
pendingChangeMessages += key
176+
val messages = checkNotNull(pendingChangeMessages.getValueOrNull()) { "Only allowed inside PendingChangeMessages.runAndFlush" }
177+
messages.add(key)
183178
}
184179
}
185180

model-server/src/main/kotlin/org/modelix/model/server/store/InMemoryStoreClient.kt

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -100,14 +100,15 @@ class InMemoryStoreClient : IStoreClient {
100100
@Synchronized
101101
override fun <T> runTransaction(body: () -> T): T {
102102
if (transactionValues == null) {
103-
try {
104-
transactionValues = HashMap()
105-
val result = body()
106-
values.putAll(transactionValues!!)
107-
return result
108-
} finally {
109-
transactionValues = null
110-
pendingChangeMessages.flushChangeMessages()
103+
return pendingChangeMessages.runAndFlush {
104+
try {
105+
transactionValues = HashMap()
106+
val result = body()
107+
values.putAll(transactionValues!!)
108+
result
109+
} finally {
110+
transactionValues = null
111+
}
111112
}
112113
} else {
113114
return body()

model-server/src/test/kotlin/org/modelix/model/server/IgniteStoreClientParallelTransactionsTest.kt

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,19 +20,23 @@ import kotlinx.coroutines.Dispatchers
2020
import kotlinx.coroutines.launch
2121
import kotlinx.coroutines.test.runTest
2222
import org.modelix.model.IKeyListener
23+
import org.modelix.model.server.store.IStoreClient
2324
import org.modelix.model.server.store.IgniteStoreClient
25+
import org.modelix.model.server.store.InMemoryStoreClient
2426
import org.slf4j.LoggerFactory
2527
import java.util.concurrent.CompletableFuture
2628
import java.util.concurrent.TimeUnit
2729
import kotlin.test.AfterTest
30+
import kotlin.test.Ignore
2831
import kotlin.test.Test
2932
import kotlin.test.assertEquals
3033

31-
class IgniteStoreClientParallelTransactionsTest {
34+
@Ignore("Doesn't support parallel transactions (yet)")
35+
class MabBasedStoreClientParallelTransactionsTest : StoreClientParallelTransactionsTest(InMemoryStoreClient())
3236

33-
// This test is currently not relevant for the InMemoryClient,
34-
// because the in memory client does not open multiple transactions in parallel.
35-
val store = IgniteStoreClient(inmemory = true)
37+
class IgniteStoreClientParallelTransactionsTest : StoreClientParallelTransactionsTest(IgniteStoreClient(inmemory = true))
38+
39+
abstract class StoreClientParallelTransactionsTest(val store: IStoreClient) {
3640

3741
@AfterTest
3842
fun cleanup() {

0 commit comments

Comments
 (0)