14
14
*/
15
15
package org.modelix.model.server.store
16
16
17
- import com.google.common.collect.MultimapBuilder
17
+ import mu.KotlinLogging
18
18
import org.apache.ignite.Ignite
19
19
import org.apache.ignite.IgniteCache
20
20
import org.apache.ignite.Ignition
21
21
import org.modelix.model.IKeyListener
22
+ import org.modelix.model.persistent.HashUtil
22
23
import java.io.File
23
24
import java.io.FileReader
24
25
import java.io.IOException
25
26
import java.util.*
26
- import java.util.concurrent.Executors
27
27
import java.util.stream.Collectors
28
28
29
- class IgniteStoreClient (jdbcConfFile : File ? ) : IStoreClient {
30
- private val ignite: Ignite
29
+ private val LOG = KotlinLogging .logger { }
30
+
31
+ class IgniteStoreClient (jdbcConfFile : File ? = null , inmemory : Boolean = false ) : IStoreClient, AutoCloseable {
32
+ private val ENTRY_CHANGED_TOPIC = " entryChanged"
33
+ private lateinit var ignite: Ignite
31
34
private val cache: IgniteCache <String , String ?>
32
- private val timer = Executors .newScheduledThreadPool(1 )
33
- private val listeners = MultimapBuilder .hashKeys().hashSetValues().build<String , IKeyListener >()
35
+ private val changeNotifier = ChangeNotifier (this )
36
+ private val pendingChangeMessages = PendingChangeMessages {
37
+ ignite.message().send(ENTRY_CHANGED_TOPIC , it)
38
+ }
34
39
35
40
/* *
36
41
* Istantiate an IgniteStoreClient
@@ -63,11 +68,18 @@ class IgniteStoreClient(jdbcConfFile: File?) : IStoreClient {
63
68
)
64
69
}
65
70
}
66
- ignite = Ignition .start(javaClass.getResource(" ignite.xml" ))
71
+ ignite = Ignition .start(javaClass.getResource(if (inmemory) " ignite-inmemory.xml " else " ignite.xml" ))
67
72
cache = ignite.getOrCreateCache(" model" )
68
73
// timer.scheduleAtFixedRate(() -> {
69
74
// System.out.println("stats: " + cache.metrics());
70
75
// }, 10, 10, TimeUnit.SECONDS);
76
+
77
+ ignite.message().localListen(ENTRY_CHANGED_TOPIC ) { nodeId: UUID ? , key: Any? ->
78
+ if (key is String ) {
79
+ changeNotifier.notifyListeners(key)
80
+ }
81
+ true
82
+ }
71
83
}
72
84
73
85
override fun get (key : String ): String? {
@@ -83,51 +95,38 @@ class IgniteStoreClient(jdbcConfFile: File?) : IStoreClient {
83
95
return cache.getAll(keys)
84
96
}
85
97
98
+ override fun getAll (): Map <String , String ?> {
99
+ return cache.associate { it.key to it.value }
100
+ }
101
+
86
102
override fun put (key : String , value : String? , silent : Boolean ) {
87
103
putAll(Collections .singletonMap(key, value), silent)
88
104
}
89
105
90
106
override fun putAll (entries : Map <String , String ?>, silent : Boolean ) {
91
107
val deletes = entries.filterValues { it == null }
92
108
val puts = entries.filterValues { it != null }
93
- if (deletes.isNotEmpty()) cache.removeAll(deletes.keys)
94
- if (puts.isNotEmpty()) cache.putAll(puts)
95
- if (! silent) {
96
- for ((key, value) in entries) {
97
- ignite.message().send(key, value ? : IKeyListener .NULL_VALUE )
109
+ runTransaction {
110
+ if (deletes.isNotEmpty()) cache.removeAll(deletes.keys)
111
+ if (puts.isNotEmpty()) cache.putAll(puts)
112
+ if (! silent) {
113
+ for (key in entries.keys) {
114
+ if (HashUtil .isSha256(key)) continue
115
+ pendingChangeMessages.entryChanged(key)
116
+ }
98
117
}
99
118
}
100
119
}
101
120
102
121
override fun listen (key : String , listener : IKeyListener ) {
103
- synchronized(listeners) {
104
- val wasSubscribed = listeners.containsKey(key)
105
- listeners.put(key, listener)
106
- if (! wasSubscribed) {
107
- ignite.message()
108
- .localListen(
109
- key,
110
- ) { nodeId: UUID ? , value: Any? ->
111
- if (value is String ) {
112
- synchronized(listeners) {
113
- for (l in listeners[key].toList()) {
114
- try {
115
- l.changed(key, if (value == IKeyListener .NULL_VALUE ) null else value)
116
- } catch (ex: Exception ) {
117
- println (ex.message)
118
- ex.printStackTrace()
119
- }
120
- }
121
- }
122
- }
123
- true
124
- }
125
- }
126
- }
122
+ // Entries where the key is the SHA hash over the value are not expected to change and listening is unnecessary.
123
+ require(! HashUtil .isSha256(key)) { " Listener for $key will never get notified." }
124
+
125
+ changeNotifier.addListener(key, listener)
127
126
}
128
127
129
128
override fun removeListener (key : String , listener : IKeyListener ) {
130
- synchronized(listeners) { listeners.remove (key, listener) }
129
+ changeNotifier.removeListener (key, listener)
131
130
}
132
131
133
132
override fun generateId (key : String ): Long {
@@ -136,14 +135,83 @@ class IgniteStoreClient(jdbcConfFile: File?) : IStoreClient {
136
135
137
136
override fun <T > runTransaction (body : () -> T ): T {
138
137
val transactions = ignite.transactions()
139
- transactions.txStart().use { tx ->
140
- val result = body()
141
- tx.commit()
142
- return result
138
+ if (transactions.tx() == null ) {
139
+ transactions.txStart().use { tx ->
140
+ val result = body()
141
+ tx.commit()
142
+ pendingChangeMessages.flushChangeMessages()
143
+ return result
144
+ }
145
+ } else {
146
+ // already in a transaction
147
+ return body()
143
148
}
144
149
}
145
150
146
151
fun dispose () {
147
152
ignite.close()
148
153
}
154
+
155
+ override fun close () {
156
+ dispose()
157
+ }
158
+ }
159
+
160
+ class PendingChangeMessages (private val notifier : (String ) -> Unit ) {
161
+ private val pendingChangeMessages = Collections .synchronizedSet(HashSet <String >())
162
+
163
+ @Synchronized
164
+ fun flushChangeMessages () {
165
+ for (pendingChangeMessage in pendingChangeMessages) {
166
+ notifier(pendingChangeMessage)
167
+ }
168
+ pendingChangeMessages.clear()
169
+ }
170
+
171
+ @Synchronized
172
+ fun entryChanged (key : String ) {
173
+ pendingChangeMessages + = key
174
+ }
175
+ }
176
+
177
+ class ChangeNotifier (val store : IStoreClient ) {
178
+ private val changeNotifiers = HashMap <String , EntryChangeNotifier >()
179
+
180
+ @Synchronized
181
+ fun notifyListeners (key : String ) {
182
+ changeNotifiers[key]?.notifyIfChanged()
183
+ }
184
+
185
+ @Synchronized
186
+ fun addListener (key : String , listener : IKeyListener ) {
187
+ changeNotifiers.getOrPut(key) { EntryChangeNotifier (key) }.listeners.add(listener)
188
+ }
189
+
190
+ @Synchronized
191
+ fun removeListener (key : String , listener : IKeyListener ) {
192
+ val notifier = changeNotifiers[key] ? : return
193
+ notifier.listeners.remove(listener)
194
+ if (notifier.listeners.isEmpty()) {
195
+ changeNotifiers.remove(key)
196
+ }
197
+ }
198
+
199
+ private inner class EntryChangeNotifier (val key : String ) {
200
+ val listeners = HashSet <IKeyListener >()
201
+ private var lastNotifiedValue: String? = null
202
+
203
+ fun notifyIfChanged () {
204
+ val value = store.get(key)
205
+ if (value == lastNotifiedValue) return
206
+ lastNotifiedValue = value
207
+
208
+ for (listener in listeners) {
209
+ try {
210
+ listener.changed(key, value)
211
+ } catch (ex: Exception ) {
212
+ LOG .error(" Exception in listener of $key " , ex)
213
+ }
214
+ }
215
+ }
216
+ }
149
217
}
0 commit comments