@@ -2,7 +2,6 @@ package org.modelix.model.lazy
2
2
3
3
import com.badoo.reaktive.single.notNull
4
4
import kotlinx.coroutines.sync.Mutex
5
- import kotlinx.coroutines.sync.withLock
6
5
import org.modelix.kotlin.utils.AtomicBoolean
7
6
import org.modelix.model.persistent.IKVValue
8
7
import org.modelix.streams.CompletableObservable
@@ -17,63 +16,38 @@ class BulkQuery(private val store: IDeserializingKeyValueStore, config: BulkQuer
17
16
private val queue: MutableMap <String , QueueElement <out IKVValue >> = LinkedHashMap ()
18
17
private var processing = AtomicBoolean (false )
19
18
private val batchSize: Int = config.requestBatchSize
20
- private val prefetchSize: Int = config.prefetchBatchSize ? : (this .batchSize / 2 )
21
- private val prefetchQueueSizeLimit: Int = (this .prefetchSize * 10 ).coerceAtLeast(this .batchSize * 2 )
22
- private val prefetchQueue: PrefetchQueue = PrefetchQueue (this , prefetchQueueSizeLimit).also {
23
- it.requestFilter = { ! queue.contains(it.getHash()) }
24
- }
25
19
private val processingMutex = Mutex ()
26
20
27
- init {
28
- require(this .prefetchSize <= this .batchSize) { " prefetch size ${this .prefetchSize} is greater than the batch size ${this .batchSize} " }
29
- }
30
-
31
21
private fun <T : IKVValue > getValueInstance (ref : IKVEntryReference <T >): CompletableObservable <T ?>? {
32
22
return queue[ref.getHash()]?.let { it.value as CompletableObservable <T ?>? }
33
- ? : (prefetchQueue.getValueInstance(ref) as CompletableObservable <T ?>? )
34
23
}
35
24
36
- private fun executeBulkQuery (regular : List <IKVEntryReference <IKVValue >>, prefetch : List < IKVEntryReference < IKVValue >> ): Map <String , IKVValue ?> {
37
- return store.getAll(regular, prefetch )
25
+ private fun executeBulkQuery (regular : List <IKVEntryReference <IKVValue >>): Map <String , IKVValue ?> {
26
+ return store.getAll(regular)
38
27
}
39
28
40
29
override fun <T : IKVValue > query (hash : IKVEntryReference <T >): IStream .ZeroOrOne <T > {
41
30
if (! hash.isWritten()) return IStream .of(hash.getValue(store))
42
31
43
- val cachedValue = store.getIfCached(hash.getHash(), hash.getDeserializer(), prefetchQueue.isLoadingGoal() )
32
+ val cachedValue = store.getIfCached(hash.getHash(), hash.getDeserializer(), false )
44
33
if (cachedValue != null ) {
45
34
return IStream .of(cachedValue)
46
35
}
47
36
48
37
val existingValue = getValueInstance(hash)
49
38
if (existingValue != null && existingValue.isDone()) return ReaktiveStreamBuilder .WrapperMaybe (existingValue.single.notNull())
50
39
51
- if (prefetchQueue.isLoadingGoal()) {
52
- prefetchQueue.addRequest(hash, getValueInstance(hash) ? : CompletableObservable (::executeQuery))
53
- return IStream .empty() // transitive objects are loaded when the prefetch queue is processed the next time
54
- } else {
55
- if (queue.size >= batchSize && ! processing.get()) executeQuery()
40
+ if (queue.size >= batchSize && ! processing.get()) executeQuery()
56
41
57
- val existingQueueElement = queue[hash.getHash()] as QueueElement <T >?
58
- val result = if (existingQueueElement != null ) {
59
- existingQueueElement.value
60
- } else {
61
- val result: CompletableObservable <T ?> = getValueInstance(hash) ? : CompletableObservable (::executeQuery)
62
- queue.put(hash.getHash(), QueueElement <T >(hash, result))
63
- result
64
- }
65
- return ReaktiveStreamBuilder .WrapperMaybe (result.single.notNull())
66
- }
67
- }
68
-
69
- override fun offerPrefetch (goal : IPrefetchGoal ) {
70
- prefetchQueue.addGoal(goal)
71
- }
72
-
73
- private suspend fun executeQuerySuspending () {
74
- processingMutex.withLock {
75
- executeQuery()
42
+ val existingQueueElement = queue[hash.getHash()] as QueueElement <T >?
43
+ val result = if (existingQueueElement != null ) {
44
+ existingQueueElement.value
45
+ } else {
46
+ val result: CompletableObservable <T ?> = getValueInstance(hash) ? : CompletableObservable (::executeQuery)
47
+ queue.put(hash.getHash(), QueueElement <T >(hash, result))
48
+ result
76
49
}
50
+ return ReaktiveStreamBuilder .WrapperMaybe (result.single.notNull())
77
51
}
78
52
79
53
override fun executeQuery () {
@@ -86,17 +60,12 @@ class BulkQuery(private val store: IDeserializingKeyValueStore, config: BulkQuer
86
60
val regularRequests: List <Pair <IKVEntryReference <* >, CompletableObservable <* >>> = queue.entries.tailSequence(batchSize)
87
61
.map { it.value.hash to it.value.value }
88
62
.toList()
89
- if (queue.size < prefetchSize) {
90
- prefetchQueue.fillRequestsQueue(prefetchSize - regularRequests.size)
91
- }
92
- val prefetchRequests: List <Pair <IKVEntryReference <* >, CompletableObservable <* >>> = prefetchQueue.getRequests(prefetchSize - regularRequests.size)
93
63
regularRequests.forEach { queue.remove(it.first.getHash()) }
94
64
95
- val allRequests: List <Pair <IKVEntryReference <* >, CompletableObservable <* >>> = regularRequests + prefetchRequests
65
+ val allRequests: List <Pair <IKVEntryReference <* >, CompletableObservable <* >>> = regularRequests
96
66
97
67
val entries: Map <String , IKVValue ?> = executeBulkQuery(
98
68
regularRequests.asSequence().map { obj -> obj.first }.toSet().toList(),
99
- prefetchRequests.asSequence().map { obj -> obj.first }.toSet().toList(),
100
69
)
101
70
for (request in allRequests) {
102
71
(request.second as CompletableObservable <IKVValue ?>).complete(entries[request.first.getHash()])
@@ -125,95 +94,3 @@ private fun <T> Sequence<T>.tailSequence(size: Int, tailSize: Int): Sequence<T>
125
94
private fun <T > Collection<T>.tailSequence (tailSize : Int ): Sequence <T > {
126
95
return asSequence().tailSequence(size, tailSize)
127
96
}
128
-
129
- @Deprecated(" Prefetching will be replaced by usages of IAsyncNode" )
130
- interface IPrefetchGoal {
131
- fun loadRequest (bulkQuery : IBulkQuery ): IStream .Many <Any ?>
132
- }
133
-
134
- @Deprecated(" Prefetching will be replaced by usages of IAsyncNode" )
135
- private class PrefetchQueue (val bulkQuery : IBulkQuery , val queueSizeLimit : Int ) {
136
- private val goals: MutableMap <IPrefetchGoal , QueuedGoal > = LinkedHashMap ()
137
- private var previousRequests: MutableMap <String , PrefetchRequest <* >> = LinkedHashMap ()
138
- private var nextRequests: MutableMap <String , PrefetchRequest <* >> = LinkedHashMap ()
139
- private var currentGoal: QueuedGoal ? = null
140
- private var anyEntryRequested = false
141
- var requestFilter: (IKVEntryReference <* >) -> Boolean = { true }
142
-
143
- fun isLoadingGoal () = currentGoal != null
144
-
145
- fun fillRequestsQueue (requestLimit : Int ) {
146
- if (requestLimit <= 0 ) return
147
-
148
- previousRequests = nextRequests
149
- nextRequests = LinkedHashMap ()
150
-
151
- for (goal in goals.values.toList().sortedByDescending { it.prefetchLevel }.asReversed()) {
152
- if (nextRequests.size >= requestLimit) break
153
- executeRequests(goal)
154
- }
155
- }
156
-
157
- fun getRequests (limit : Int ): List <Pair <IKVEntryReference <* >, CompletableObservable<*>>> {
158
- return nextRequests.entries.tailSequence(limit)
159
- .map { it.value.hash to it.value.result }
160
- .toList()
161
- }
162
-
163
- fun addGoal (goal : IPrefetchGoal ) {
164
- val newLevel = currentGoal?.prefetchLevel?.let { it + 1 } ? : 0
165
- // remove and re-add to move it to the end of the queue
166
- val queuedGoal = goals.remove(goal)?.also { it.prefetchLevel = minOf(it.prefetchLevel, newLevel) } ? : QueuedGoal (goal, newLevel)
167
- goals[goal] = queuedGoal
168
- trimQueue()
169
- }
170
-
171
- fun <T : IKVValue > addRequest (hash : IKVEntryReference <T >, result : CompletableObservable <T ?>) {
172
- addRequest(hash, checkNotNull(currentGoal) { " Not loading any goal" }, result)
173
- }
174
-
175
- private fun <T : IKVValue > addRequest (hash : IKVEntryReference <T >, goal : QueuedGoal , result : CompletableObservable <T ?>) {
176
- anyEntryRequested = true
177
-
178
- val request = (previousRequests[hash.getHash()] ? : nextRequests[hash.getHash()])?.also {
179
- require(result == it.result)
180
- it.prefetchLevel = minOf(it.prefetchLevel, goal.prefetchLevel)
181
- } ? : PrefetchRequest (hash, result, goal.prefetchLevel)
182
-
183
- if (! request.result.isDone() && requestFilter(request.hash)) {
184
- nextRequests[hash.getHash()] = request
185
- }
186
- trimQueue()
187
- }
188
-
189
- fun <T : IKVValue > getValueInstance (hash : IKVEntryReference <T >): CompletableObservable <T ?>? {
190
- return ((nextRequests[hash.getHash()] ? : previousRequests[hash.getHash()]) as PrefetchRequest <T >? )?.result
191
- }
192
-
193
- private fun trimQueue () {
194
- if (goals.size > queueSizeLimit * 2 ) {
195
- val toRemove = goals.entries.sortedBy { it.value.prefetchLevel }.drop(goals.size - queueSizeLimit).map { it.key }
196
- toRemove.forEach { goals.remove(it) }
197
- }
198
- }
199
-
200
- private fun executeRequests (goal : QueuedGoal ) {
201
- val previousGoal = currentGoal
202
- val previousAnyEntryRequested = anyEntryRequested
203
- try {
204
- currentGoal = goal
205
- anyEntryRequested = false
206
- goal.goal.loadRequest(bulkQuery).iterateSynchronous { }
207
- if (! anyEntryRequested) {
208
- goals.remove(goal.goal)
209
- }
210
- } finally {
211
- anyEntryRequested = previousAnyEntryRequested
212
- currentGoal = previousGoal
213
- }
214
- }
215
-
216
- private inner class QueuedGoal (val goal : IPrefetchGoal , var prefetchLevel : Int )
217
-
218
- private inner class PrefetchRequest <E : IKVValue >(val hash : IKVEntryReference <E >, val result : CompletableObservable <E ?>, var prefetchLevel : Int )
219
- }
0 commit comments