Skip to content

Commit bd006ec

Browse files
authored
Merge pull request #10 from weaviate/merge_optimizations_concurrency
Merge optimizations - support for concurrency
2 parents ed969e0 + 8288f7e commit bd006ec

File tree

2 files changed

+285
-192
lines changed

2 files changed

+285
-192
lines changed

bitmap_opt.go

Lines changed: 124 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,125 @@
11
package sroar
22

3+
import (
4+
"sync"
5+
)
6+
7+
// minContainersForConcurrency is the minimum number of containers per worker
// that concurrentlyOnRange requires before it splits a range across
// goroutines; smaller ranges are handled by a single sequential call.
const minContainersForConcurrency = 16
8+
39
// AndToSuperset calculates intersection of current and incoming bitmap
410
// It reuses containers present in current bitmap
5-
// and utilize container buffer provided.
11+
// and utilize container buffers provided.
12+
// Number of passed buffers indicates concurrency level
13+
// (e.g. 4 buffers = merge will be performed by 4 goroutines).
14+
//
15+
// CAUTION: should be used only when current bitmap contained before
16+
// all elements present in incoming bitmap
17+
func (dst *Bitmap) AndToSuperset(src *Bitmap, containerBufs ...[]uint16) {
18+
conc := len(containerBufs)
19+
assert(conc > 0)
20+
21+
dstNumKeys := dst.keys.numKeys()
22+
if src == nil {
23+
concurrentlyOnRange(conc, dstNumKeys, func(_, from, to int) {
24+
zeroOutSelectedContainers(dst, from, to)
25+
})
26+
return
27+
}
28+
29+
srcNumKeys := src.keys.numKeys()
30+
concurrentlyOnRange(conc, dstNumKeys, func(i, from, to int) {
31+
andSelectedContainers(dst, src, from, to, 0, srcNumKeys, containerBufs[i])
32+
})
33+
}
34+
35+
// OrToSuperset calculates union of current and incoming bitmap
36+
// It reuses containers present in current bitmap
37+
// and utilize containers buffers provided.
38+
// Number of passed buffers indicates concurrency level
39+
// (e.g. 4 buffers = merge will be performed by 4 goroutines).
40+
//
41+
// CAUTION: should be used only when current bitmap contained before
42+
// all elements present in incoming bitmap
43+
func (dst *Bitmap) OrToSuperset(src *Bitmap, containerBufs ...[]uint16) {
44+
conc := len(containerBufs)
45+
assert(conc > 0)
46+
47+
if src == nil {
48+
return
49+
}
50+
51+
srcNumKeys := src.keys.numKeys()
52+
concurrentlyOnRange(conc, srcNumKeys, func(i, from, to int) {
53+
orSelectedContainers(dst, src, from, to, containerBufs[i])
54+
})
55+
}
56+
57+
// AndNotToSuperset calculates difference between current and incoming bitmap
58+
// It reuses containers present in current bitmap
59+
// and utilize containers buffers provided.
60+
// Number of passed buffers indicates concurrency level
61+
// (e.g. 4 buffers = merge will be performed by 4 goroutines).
662
//
763
// CAUTION: should be used only when current bitmap contained before
864
// all elements present in incoming bitmap
9-
func (dst *Bitmap) AndToSuperset(src *Bitmap, containerBuf []uint16) {
65+
func (dst *Bitmap) AndNotToSuperset(src *Bitmap, containerBufs ...[]uint16) {
66+
conc := len(containerBufs)
67+
assert(conc > 0)
68+
1069
if src == nil {
11-
for ai, an := 0, dst.keys.numKeys(); ai < an; ai++ {
12-
off := dst.keys.val(ai)
13-
zeroOutContainer(dst.getContainer(off))
70+
return
71+
}
72+
73+
dstNumKeys := dst.keys.numKeys()
74+
srcNumKeys := src.keys.numKeys()
75+
concurrentlyOnRange(conc, dstNumKeys, func(i, from, to int) {
76+
andNotSelectedContainers(dst, src, from, to, 0, srcNumKeys, containerBufs[i])
77+
})
78+
}
79+
80+
func (ra *Bitmap) ConvertToBitmapContainers() {
81+
for ai, an := 0, ra.keys.numKeys(); ai < an; ai++ {
82+
ak := ra.keys.key(ai)
83+
off := ra.keys.val(ai)
84+
ac := ra.getContainer(off)
85+
86+
if ac[indexType] == typeArray {
87+
c := array(ac).toBitmapContainer(nil)
88+
offset := ra.newContainer(uint16(len(c)))
89+
copy(ra.data[offset:], c)
90+
ra.setKey(ak, offset)
1491
}
92+
}
93+
}
94+
95+
func concurrentlyOnRange(conc, max int, callback func(i, from, to int)) {
96+
if conc == 1 || max < conc*minContainersForConcurrency {
97+
callback(0, 0, max)
1598
return
1699
}
17100

18-
a, b := dst, src
19-
ai, an := 0, a.keys.numKeys()
20-
bi, bn := 0, b.keys.numKeys()
101+
delta := max / conc
102+
103+
wg := new(sync.WaitGroup)
104+
wg.Add(conc - 1)
105+
for i := 0; i < conc-1; i++ {
106+
go func(i int) {
107+
callback(i, delta*i, delta*(i+1))
108+
wg.Done()
109+
}(i)
110+
}
111+
callback(conc-1, delta*(conc-1), max)
112+
wg.Wait()
113+
}
114+
115+
func zeroOutSelectedContainers(a *Bitmap, ai, an int) {
116+
for ; ai < an; ai++ {
117+
off := a.keys.val(ai)
118+
zeroOutContainer(a.getContainer(off))
119+
}
120+
}
21121

122+
func andSelectedContainers(a, b *Bitmap, ai, an, bi, bn int, containerBuf []uint16) {
22123
for ai < an && bi < bn {
23124
ak := a.keys.key(ai)
24125
bk := b.keys.key(bi)
@@ -49,64 +150,38 @@ func (dst *Bitmap) AndToSuperset(src *Bitmap, containerBuf []uint16) {
49150
}
50151
}
51152

52-
// OrToSuperset calculates union of current and incoming bitmap
53-
// It reuses containers present in current bitmap
54-
// and utilize container buffer provided.
55-
//
56-
// CAUTION: should be used only when current bitmap contained before
57-
// all elements present in incoming bitmap
58-
func (dst *Bitmap) OrToSuperset(src *Bitmap, containerBuf []uint16) {
59-
if src == nil {
60-
return
61-
}
62-
63-
srcIdx, numKeys := 0, src.keys.numKeys()
64-
for ; srcIdx < numKeys; srcIdx++ {
65-
srcCont := src.getContainer(src.keys.val(srcIdx))
66-
if getCardinality(srcCont) == 0 {
153+
func orSelectedContainers(a, b *Bitmap, bi, bn int, containerBuf []uint16) {
154+
for ; bi < bn; bi++ {
155+
off := b.keys.val(bi)
156+
bc := b.getContainer(off)
157+
if getCardinality(bc) == 0 {
67158
continue
68159
}
69160

70-
key := src.keys.key(srcIdx)
71-
72-
dstIdx := dst.keys.search(key)
73-
if dstIdx >= dst.keys.numKeys() || dst.keys.key(dstIdx) != key {
161+
bk := b.keys.key(bi)
162+
ai := a.keys.search(bk)
163+
if ai >= a.keys.numKeys() || a.keys.key(ai) != bk {
74164
// Container does not exist in dst.
75165
panic("Current bitmap should have all containers of incoming bitmap")
76166
} else {
77167
// Container exists in dst as well. Do an inline containerOr.
78-
offset := dst.keys.val(dstIdx)
79-
dstCont := dst.getContainer(offset)
80-
containerOrToSuperset(dstCont, srcCont, containerBuf)
168+
off = a.keys.val(ai)
169+
ac := a.getContainer(off)
170+
containerOrToSuperset(ac, bc, containerBuf)
81171
}
82172
}
83173
}
84174

85-
// AndNotToSuperset calculates difference between current and incoming bitmap
86-
// It reuses containers present in current bitmap
87-
// and utilize container buffer provided.
88-
//
89-
// CAUTION: should be used only when current bitmap contained before
90-
// all elements present in incoming bitmap
91-
func (dst *Bitmap) AndNotToSuperset(src *Bitmap, containerBuf []uint16) {
92-
if src == nil {
93-
return
94-
}
95-
96-
a, b := dst, src
97-
ai, an := 0, a.keys.numKeys()
98-
bi, bn := 0, b.keys.numKeys()
99-
175+
func andNotSelectedContainers(a, b *Bitmap, ai, an, bi, bn int, containerBuf []uint16) {
100176
for ai < an && bi < bn {
101177
ak := a.keys.key(ai)
102178
bk := b.keys.key(bi)
103179
if ak == bk {
104-
off := a.keys.val(ai)
105-
ac := a.getContainer(off)
106-
off = b.keys.val(bi)
180+
off := b.keys.val(bi)
107181
bc := b.getContainer(off)
108-
109182
if getCardinality(bc) != 0 {
183+
off = a.keys.val(ai)
184+
ac := a.getContainer(off)
110185
containerAndNotToSuperset(ac, bc, containerBuf)
111186
}
112187
ai++
@@ -118,18 +193,3 @@ func (dst *Bitmap) AndNotToSuperset(src *Bitmap, containerBuf []uint16) {
118193
}
119194
}
120195
}
121-
122-
func (ra *Bitmap) ConvertToBitmapContainers() {
123-
for ai, an := 0, ra.keys.numKeys(); ai < an; ai++ {
124-
ak := ra.keys.key(ai)
125-
off := ra.keys.val(ai)
126-
ac := ra.getContainer(off)
127-
128-
if ac[indexType] == typeArray {
129-
c := array(ac).toBitmapContainer(nil)
130-
offset := ra.newContainer(uint16(len(c)))
131-
copy(ra.data[offset:], c)
132-
ra.setKey(ak, offset)
133-
}
134-
}
135-
}

0 commit comments

Comments
 (0)