@@ -18,19 +18,29 @@ import mu.KotlinLogging
1818import org.apache.ignite.Ignite
1919import org.apache.ignite.IgniteCache
2020import org.apache.ignite.Ignition
21+ import org.apache.ignite.cache.query.ScanQuery
22+ import org.apache.ignite.lang.IgniteBiPredicate
23+ import org.apache.ignite.lang.IgniteClosure
2124import org.modelix.kotlin.utils.ContextValue
2225import org.modelix.model.IGenericKeyListener
26+ import org.modelix.model.lazy.RepositoryId
2327import org.modelix.model.persistent.HashUtil
2428import org.modelix.model.server.SqlUtils
2529import java.io.File
2630import java.io.FileReader
2731import java.io.IOException
32+ import java.sql.SQLException
2833import java.util.*
34+ import javax.cache.Cache
2935import javax.sql.DataSource
3036
3137private val LOG = KotlinLogging .logger { }
3238
33- class IgniteStoreClient (jdbcConfFile : File ? = null , inmemory : Boolean = false ) : IsolatingStore, AutoCloseable {
39+ /* *
40+ * Store client implementation with an ignite cache.
41+ * If [inmemory] is true, the data is not persisted in a database.
42+ */
43+ class IgniteStoreClient (jdbcConfFile : File ? = null , private val inmemory : Boolean = false ) : IsolatingStore, AutoCloseable {
3444
3545 companion object {
3646 private const val ENTRY_CHANGED_TOPIC = " entryChanged"
@@ -43,6 +53,14 @@ class IgniteStoreClient(jdbcConfFile: File? = null, inmemory: Boolean = false) :
4353 ignite.message().send(ENTRY_CHANGED_TOPIC , it)
4454 }
4555
56+ private val igniteConfigName: String = if (inmemory) " ignite-inmemory.xml" else " ignite.xml"
57+ private val dataSource: DataSource by lazy {
58+ Ignition .loadSpringBean(
59+ IgniteStoreClient ::class .java.getResource(igniteConfigName),
60+ " dataSource" ,
61+ )
62+ }
63+
4664 /* *
4765 * Instantiate an IgniteStoreClient
4866 *
@@ -74,8 +92,7 @@ class IgniteStoreClient(jdbcConfFile: File? = null, inmemory: Boolean = false) :
7492 )
7593 }
7694 }
77- val igniteConfigName = if (inmemory) " ignite-inmemory.xml" else " ignite.xml"
78- if (! inmemory) updateDatabaseSchema(igniteConfigName)
95+ if (! inmemory) updateDatabaseSchema()
7996 ignite = Ignition .start(javaClass.getResource(igniteConfigName))
8097 cache = ignite.getOrCreateCache(" model" )
8198
@@ -87,11 +104,7 @@ class IgniteStoreClient(jdbcConfFile: File? = null, inmemory: Boolean = false) :
87104 }
88105 }
89106
90- private fun updateDatabaseSchema (igniteConfigName : String ) {
91- val dataSource: DataSource = Ignition .loadSpringBean<DataSource >(
92- IgniteStoreClient ::class .java.getResource(igniteConfigName),
93- " dataSource" ,
94- )
107+ private fun updateDatabaseSchema () {
95108 SqlUtils (dataSource.connection).ensureSchemaInitialization()
96109 }
97110
@@ -103,6 +116,44 @@ class IgniteStoreClient(jdbcConfFile: File? = null, inmemory: Boolean = false) :
103116 return cache.associate { it.key to it.value }
104117 }
105118
119+ override fun removeRepositoryObjects (repositoryId : RepositoryId ) {
120+ if (! inmemory) {
121+ // Not all entries are in the cache. We delete them directly instead of loading them into the cache first.
122+ // This should be safe as the repository has already been removed from the list of available ones.
123+ removeRepositoryObjectsFromDatabase(repositoryId)
124+ }
125+
126+ val filter = IgniteBiPredicate <ObjectInRepository , String ?> { key, _ ->
127+ key.getRepositoryId() == repositoryId.id
128+ }
129+ val transformer = IgniteClosure <Cache .Entry <ObjectInRepository , String ?>, ObjectInRepository ? > { entry ->
130+ entry.key
131+ }
132+ val query = ScanQuery (filter)
133+
134+ // sorting is necessary to avoid deadlocks, see documentation of IgniteCache::removeAllAsync
135+ val toDelete = cache.query(query, transformer).all.asSequence().filterNotNull().toSortedSet()
136+ LOG .info { " Deleting cache entries asynchronously. [numberOfEntries=${toDelete.size} ]" }
137+ cache.removeAllAsync(toDelete).listen { LOG .info { " Cache entries deleted." } }
138+ }
139+
140+ private fun removeRepositoryObjectsFromDatabase (repositoryId : RepositoryId ) {
141+ require(! inmemory) { " Cannot remove from database in in-memory mode." }
142+ LOG .info { " Removing repository objects from database." }
143+
144+ dataSource.connection.use { connection ->
145+ connection.prepareStatement(" DELETE from model WHERE repository = ?" ).use { stmt ->
146+ stmt.setString(1 , repositoryId.id)
147+ try {
148+ val deletedRows = stmt.executeUpdate()
149+ LOG .info { " Deleted rows from database. [deletedRows=$deletedRows ]" }
150+ } catch (e: SQLException ) {
151+ LOG .error { e }
152+ }
153+ }
154+ }
155+ }
156+
106157 override fun putAll (entries : Map <ObjectInRepository , String ?>, silent : Boolean ) {
107158 // Sorting is important to avoid deadlocks (lock ordering).
108159 // The documentation of IgniteCache.putAll also states that this a requirement.
@@ -174,7 +225,8 @@ class PendingChangeMessages(private val notifier: (ObjectInRepository) -> Unit)
174225 }
175226
176227 fun entryChanged (key : ObjectInRepository ) {
177- val messages = checkNotNull(pendingChangeMessages.getValueOrNull()) { " Only allowed inside PendingChangeMessages.runAndFlush" }
228+ val messages =
229+ checkNotNull(pendingChangeMessages.getValueOrNull()) { " Only allowed inside PendingChangeMessages.runAndFlush" }
178230 messages.add(key)
179231 }
180232}
0 commit comments