@@ -4,6 +4,7 @@ import com.badoo.reaktive.maybe.Maybe
4
4
import com.badoo.reaktive.maybe.asObservable
5
5
import com.badoo.reaktive.maybe.filter
6
6
import com.badoo.reaktive.maybe.flatMap
7
+ import com.badoo.reaktive.maybe.flatMapObservable
7
8
import com.badoo.reaktive.maybe.map
8
9
import com.badoo.reaktive.maybe.maybeOf
9
10
import com.badoo.reaktive.maybe.maybeOfEmpty
@@ -12,16 +13,19 @@ import com.badoo.reaktive.observable.Observable
12
13
import com.badoo.reaktive.observable.asObservable
13
14
import com.badoo.reaktive.observable.filter
14
15
import com.badoo.reaktive.observable.flatMap
15
- import com.badoo.reaktive.observable.flatMapMaybe
16
16
import com.badoo.reaktive.observable.flatMapSingle
17
17
import com.badoo.reaktive.observable.flatten
18
18
import com.badoo.reaktive.observable.map
19
19
import com.badoo.reaktive.observable.observableOf
20
20
import com.badoo.reaktive.observable.observableOfEmpty
21
+ import com.badoo.reaktive.observable.toList
21
22
import com.badoo.reaktive.single.Single
22
23
import com.badoo.reaktive.single.asMaybe
23
24
import com.badoo.reaktive.single.flatMapMaybe
25
+ import com.badoo.reaktive.single.flatMapObservable
24
26
import com.badoo.reaktive.single.flatten
27
+ import com.badoo.reaktive.single.singleOf
28
+ import com.badoo.reaktive.single.toSingle
25
29
import com.badoo.reaktive.single.zipWith
26
30
import org.modelix.model.async.IAsyncObjectStore
27
31
import org.modelix.model.bitCount
@@ -81,6 +85,39 @@ class CPHamtInternal(
81
85
}
82
86
}
83
87
88
+ override fun putAll (entries : List <Pair <Long , KVEntryReference <CPNode >? >>, shift : Int , store : IAsyncObjectStore ): Maybe <CPHamtNode > {
89
+ val groups = entries.groupBy { indexFromKey(it.first, shift) }
90
+ val logicalIndices = groups.keys.toIntArray()
91
+ val newChildrenLists = groups.values.toList()
92
+ return getChildren(logicalIndices, store).flatMapObservable { children: List <CPHamtNode ?> ->
93
+ children.withIndex().asObservable().flatMapSingle { (i, oldChild) ->
94
+ val newChildren = newChildrenLists[i]
95
+ if (oldChild == null ) {
96
+ val nonNullChildren = newChildren.filter { it.second != null }
97
+ when (nonNullChildren.size) {
98
+ 0 -> null .toSingle()
99
+ 1 -> {
100
+ val singleChild = nonNullChildren.single()
101
+ CPHamtLeaf .create(singleChild.first, singleChild.second).toSingle()
102
+ }
103
+ else -> {
104
+ createEmpty().putAll(nonNullChildren, shift + BITS_PER_LEVEL , store).orNull()
105
+ }
106
+ }
107
+ } else {
108
+ oldChild.putAll(newChildren, shift + BITS_PER_LEVEL , store).orNull()
109
+ }
110
+ }
111
+ }.toList().flatMapMaybe { updatedChildren ->
112
+ setChildren(
113
+ logicalIndices,
114
+ updatedChildren.map { it?.let { KVEntryReference (it) } },
115
+ shift,
116
+ store,
117
+ )
118
+ }
119
+ }
120
+
84
121
override fun remove (key : Long , shift : Int , store : IAsyncObjectStore ): Maybe <CPHamtNode > {
85
122
require(shift <= CPHamtNode .MAX_SHIFT ) { " $shift > ${CPHamtNode .MAX_SHIFT } " }
86
123
val childIndex = CPHamtNode .indexFromKey(key, shift)
@@ -103,6 +140,19 @@ class CPHamtInternal(
103
140
}
104
141
}
105
142
143
+ override fun getAll (
144
+ keys : LongArray ,
145
+ shift : Int ,
146
+ store : IAsyncObjectStore ,
147
+ ): Observable <Pair <Long , KVEntryReference <CPNode >? >> {
148
+ val groups = keys.groupBy { indexFromKey(it, shift) }
149
+ return groups.entries.asObservable().flatMap { group ->
150
+ getChild(group.key, store).flatMapObservable { child ->
151
+ child.getAll(group.value.toLongArray(), shift + BITS_PER_LEVEL , store)
152
+ }
153
+ }
154
+ }
155
+
106
156
protected fun getChild (logicalIndex : Int , store : IAsyncObjectStore ): Maybe <CPHamtNode > {
107
157
if (isBitNotSet(data.bitmap, logicalIndex)) {
108
158
return maybeOfEmpty()
@@ -113,10 +163,60 @@ class CPHamtInternal(
113
163
return getChild(childHash, store).asMaybe()
114
164
}
115
165
166
+ private fun getChildren (logicalIndices : IntArray , store : IAsyncObjectStore ): Single <List <CPHamtNode ?>> {
167
+ val childHashes = logicalIndices.map { logicalIndex ->
168
+ if (isBitNotSet(data.bitmap, logicalIndex)) {
169
+ null
170
+ } else {
171
+ val physicalIndex = logicalToPhysicalIndex(data.bitmap, logicalIndex)
172
+ data.children[physicalIndex]
173
+ }
174
+ }
175
+ return childHashes.asObservable().flatMapSingle { it?.getValue(store) ? : singleOf(null ) }.toList()
176
+ }
177
+
116
178
protected fun getChild (childHash : KVEntryReference <CPHamtNode >, store : IAsyncObjectStore ): Single <CPHamtNode > {
117
179
return childHash.getValue(store)
118
180
}
119
181
182
+ fun setChildren (logicalIndices : IntArray , children : List <KVEntryReference <CPHamtNode >? >, shift : Int , store : IAsyncObjectStore ): Maybe <CPHamtNode > {
183
+ var oldBitmap = data.bitmap
184
+ var newBitmap = data.bitmap
185
+ val oldChildren = data.children
186
+ var newChildren = data.children
187
+ for (i in logicalIndices.indices) {
188
+ val logicalIndex = logicalIndices[i]
189
+ val newChild = children[i]
190
+ val oldChild = if (isBitNotSet(oldBitmap, logicalIndex)) {
191
+ null
192
+ } else {
193
+ oldChildren[logicalToPhysicalIndex(oldBitmap, logicalIndex)]
194
+ }
195
+ if (newChild == null ) {
196
+ if (oldChild == null ) {
197
+ // nothing changed
198
+ } else {
199
+ newChildren = COWArrays .removeAt(newChildren, logicalToPhysicalIndex(newBitmap, logicalIndex))
200
+ newBitmap = newBitmap and (1 shl logicalIndex).inv () // clear bit
201
+ }
202
+ } else {
203
+ if (oldChild == null ) {
204
+ newChildren = COWArrays .insert(newChildren, logicalToPhysicalIndex(newBitmap, logicalIndex), newChild)
205
+ newBitmap = newBitmap or (1 shl logicalIndex) // set bit
206
+ } else {
207
+ newChildren = COWArrays .set(newChildren, logicalToPhysicalIndex(newBitmap, logicalIndex), newChild)
208
+ }
209
+ }
210
+ }
211
+
212
+ val newNode = create(newBitmap, newChildren)
213
+ return if (shift < MAX_BITS - BITS_PER_LEVEL ) {
214
+ CPHamtSingle .replaceIfSingleChild(newNode, store).asMaybe()
215
+ } else {
216
+ newNode.toMaybe()
217
+ }
218
+ }
219
+
120
220
fun setChild (logicalIndex : Int , child : CPHamtNode ? , shift : Int , store : IAsyncObjectStore ): Maybe <CPHamtNode > {
121
221
if (child == null ) {
122
222
return deleteChild(logicalIndex, store)
0 commit comments