Skip to content

Commit 4c8d1d5

Browse files
committed
Fast read, bucker constructor/insert/compact need further tuning
1 parent 8f53ea2 commit 4c8d1d5

File tree

4 files changed

+247
-95
lines changed

4 files changed

+247
-95
lines changed

bucket.go

Lines changed: 239 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -1,87 +1,272 @@
11
package table
22

33
import "github.com/neurlang/quaternary"
4-
import "fmt"
4+
import "sync"
5+
import "math/bits"
6+
import "runtime"
57

68
type bucket struct {
7-
data [][]string
8-
filters [][]byte
9-
loglen int
9+
data [][]string
10+
index map[[2]int][][]byte
11+
//blooms [][]byte
12+
loglen int
1013
}
1114

12-
func newBucket(rows [][]string) (ret *bucket) {
15+
func (b *bucket) filter(j, c int, val string) uint64 {
16+
if b.index[[2]int{j, c}] == nil {
17+
return 0
18+
}
19+
var f = quaternary.Filters(b.index[[2]int{j, c}])
20+
return f.GetStringMulti(val)
21+
}
22+
23+
func (ret *bucket) presentBucket(col int, val string) bool {
24+
return true
25+
26+
}
27+
func newBucket(rows [][]string) *bucket {
28+
// Initialize bucket and handle empty input
29+
ret := &bucket{
30+
data: rows,
31+
index: make(map[[2]int][][]byte),
32+
}
33+
if len(rows) == 0 {
34+
ret.loglen = 0
35+
return ret
36+
}
37+
38+
// Compute number of bits
39+
loglen := bits.Len(uint(len(rows) - 1))
40+
ret.loglen = loglen
41+
42+
// Precompute masks
43+
bitMasks := make([]uint64, loglen)
44+
for b := 0; b < loglen; b++ {
45+
bitMasks[b] = 1 << b
46+
}
47+
48+
// Sequential counter pass + collect all partials
49+
type countKey struct {
50+
b, x int
51+
s string
52+
}
53+
counter := make(map[countKey]int, len(rows)*len(rows[0])*loglen)
54+
type partial struct {
55+
ikey [2]int
56+
key string
57+
bits uint64
58+
}
59+
parts := make([]partial, 0, len(rows)*len(rows[0])*loglen)
60+
61+
for y, row := range rows {
62+
rowMask := uint64(y)
63+
for x, key := range row {
64+
for b := 0; b < loglen; b++ {
65+
ck := countKey{b, x, key}
66+
cnt := counter[ck]
67+
counter[ck] = cnt + 1
68+
69+
ik := [2]int{cnt + 1, x}
70+
var bitsVal uint64
71+
if rowMask&bitMasks[b] != 0 {
72+
bitsVal = bitMasks[b]
73+
}
74+
parts = append(parts, partial{ik, key, bitsVal})
75+
}
76+
}
77+
}
78+
79+
// Zero‑count entries
80+
for k, tot := range counter {
81+
ik := [2]int{0, k.x}
82+
boolVal := uint64(tot-1) & bitMasks[k.b]
83+
parts = append(parts, partial{ik, k.s, boolVal})
84+
}
85+
86+
// Shard parts for parallel collection
87+
nWorkers := runtime.GOMAXPROCS(0)
88+
shardSize := (len(parts) + nWorkers - 1) / nWorkers
89+
workerCols := make([]map[[2]int]map[string]uint64, nWorkers)
90+
for i := range workerCols {
91+
workerCols[i] = make(map[[2]int]map[string]uint64)
92+
}
93+
94+
var wg sync.WaitGroup
95+
wg.Add(nWorkers)
96+
for w := 0; w < nWorkers; w++ {
97+
go func(w int) {
98+
defer wg.Done()
99+
start := w * shardSize
100+
if start >= len(parts) {
101+
return // nothing to do
102+
}
103+
end := start + shardSize
104+
if end > len(parts) {
105+
end = len(parts)
106+
}
107+
cols := workerCols[w]
108+
for _, p := range parts[start:end] {
109+
m := cols[p.ikey]
110+
if m == nil {
111+
m = make(map[string]uint64)
112+
cols[p.ikey] = m
113+
}
114+
m[p.key] |= p.bits
115+
}
116+
}(w)
117+
}
118+
wg.Wait()
119+
120+
// Merge workerCols into global
121+
global := make(map[[2]int]map[string]uint64, len(workerCols))
122+
for _, cols := range workerCols {
123+
for ik, km := range cols {
124+
gm := global[ik]
125+
if gm == nil {
126+
gm = make(map[string]uint64, len(km))
127+
global[ik] = gm
128+
}
129+
for k, v := range km {
130+
gm[k] |= v
131+
}
132+
}
133+
}
134+
135+
// Phase 3: build quaternary filters in parallel
136+
type task struct {
137+
ikey [2]int
138+
val map[string]uint64
139+
}
140+
tasks := make(chan task, len(global))
141+
results := make(chan struct {
142+
ik [2]int
143+
fss [][]byte
144+
}, nWorkers)
145+
146+
go func() {
147+
for ik, val := range global {
148+
tasks <- task{ik, val}
149+
}
150+
close(tasks)
151+
}()
152+
153+
wg.Add(nWorkers)
154+
for w := 0; w < nWorkers; w++ {
155+
go func() {
156+
defer wg.Done()
157+
for t := range tasks {
158+
var fs [][]byte
159+
for _, f := range quaternary.MakeStringMulti(byte(loglen), t.val) {
160+
if len(f) > 0 {
161+
fs = append(fs, f)
162+
}
163+
}
164+
results <- struct {
165+
ik [2]int
166+
fss [][]byte
167+
}{t.ikey, fs}
168+
}
169+
}()
170+
}
171+
172+
go func() {
173+
wg.Wait()
174+
close(results)
175+
}()
176+
177+
for r := range results {
178+
ret.index[r.ik] = append(ret.index[r.ik], r.fss...)
179+
}
180+
181+
return ret
182+
}
183+
184+
func newBucketslow(rows [][]string) (ret *bucket) {
13185
var loglen = 0
14186
for i := 0; 1<<i < len(rows); i++ {
15187
loglen++
16188
}
17189
ret = &bucket{
18-
data: rows,
19-
filters: [][]byte{},
20-
loglen: loglen,
190+
data: rows,
191+
index: make(map[[2]int][][]byte),
192+
loglen: loglen,
21193
}
194+
var counter = make(map[struct {
195+
b int
196+
n int
197+
s string
198+
}]int)
199+
var collection = make(map[[2]int]map[string]uint64)
200+
for y := range rows {
201+
for x := range rows[y] {
22202

23-
for b := 0; b < loglen; b++ {
24-
var counter = make(map[string]int)
25-
var collection = make(map[string]bool)
26-
var maxcols = 0
27-
for y := range rows {
28-
if maxcols < len(rows[y]) {
29-
maxcols = len(rows[y])
30-
}
31-
for x := range rows[y] {
32-
key := fmt.Sprint(x) + ":" + rows[y][x]
33-
cnt := counter[key]
34-
strkey := fmt.Sprint(cnt+1) + ":" + key
35-
boolval := (y >> b) & 1
36-
collection[strkey] = boolval == 1
203+
// do other stuff
204+
var bval uint64
205+
key := rows[y][x]
206+
for b := 0; b < loglen; b++ {
207+
cnt := counter[struct {
208+
b int
209+
n int
210+
s string
211+
}{b, x, key}]
212+
counter[struct {
213+
b int
214+
n int
215+
s string
216+
}{b, x, key}]++
217+
intkey := [2]int{cnt + 1, x}
218+
boolval := uint64(y) & (uint64(1) << b)
219+
bval |= boolval
37220
//println(strkey, "=>", boolval)
38-
counter[key]++
221+
if collection[intkey] == nil {
222+
collection[intkey] = make(map[string]uint64)
223+
}
224+
collection[intkey][key] = bval
39225
}
40226
}
41-
for k, v := range counter {
42-
strkey := "0:" + k
43-
boolval := ((v - 1) >> b) & 1
44-
collection[strkey] = boolval == 1
45-
//println(strkey, "=>", boolval)
227+
}
228+
for k, w := range counter {
229+
intkey := [2]int{0, k.n}
230+
strkey := k.s
231+
boolval := uint64(w-1) & (uint64(1) << k.b)
232+
if collection[intkey] == nil {
233+
collection[intkey] = make(map[string]uint64)
234+
}
235+
collection[intkey][strkey] |= boolval
236+
//println(strkey, "=>", boolval)
237+
}
238+
for key, val := range collection {
239+
for _, f := range quaternary.MakeStringMulti(byte(loglen), val) {
240+
if len(f) > 0 {
241+
ret.index[key] = append(ret.index[key], f)
242+
}
46243
}
47-
ret.filters = append(ret.filters, []byte(quaternary.MakeString(collection)))
48244
}
49245
return
50246
}
51247

52248
func (b *bucket) countExisting(col int, val string) (out int) {
53-
key := "0:" + fmt.Sprint(col) + ":" + val
54-
for i := 0; i < b.loglen; i++ {
55-
if quaternary.Filter(b.filters[i]).GetString(key) {
56-
out |= 1 << i
57-
}
249+
if !b.presentBucket(col, val) {
250+
return 0
58251
}
252+
out = int(b.filter(0, col, val))
59253
out++
60254
return
61255
}
62256
func (b *bucket) count(col int, val string) (out int) {
63257
if len(b.data) == 0 {
64258
return 0
65259
}
66-
key1 := "0:" + fmt.Sprint(col) + ":" + val
67-
key2 := "1:" + fmt.Sprint(col) + ":" + val
68260
var pos int
69-
for i := 0; i < b.loglen; i++ {
70-
if quaternary.Filter(b.filters[i]).GetString(key2) {
71-
pos |= 1 << i
72-
}
73-
}
74-
if col >= len(b.data[pos%len(b.data)]) {
261+
pos = int(b.filter(1, col, val))
262+
idx := pos % len(b.data)
263+
if col >= len(b.data[idx]) {
75264
return 0
76265
}
77-
if b.data[pos%len(b.data)][col] != val {
266+
if b.data[idx][col] != val {
78267
return 0
79268
}
80-
for i := 0; i < b.loglen; i++ {
81-
if quaternary.Filter(b.filters[i]).GetString(key1) {
82-
out |= 1 << i
83-
}
84-
}
269+
out = int(b.filter(0, col, val))
85270
out++
86271
return
87272
}
@@ -99,13 +284,8 @@ func (b *bucket) getAll(col int, val string) (data [][]string) {
99284
return nil
100285
}
101286
for j := 1; j <= cnt; j++ {
102-
key := fmt.Sprint(j) + ":" + fmt.Sprint(col) + ":" + val
103287
var pos int
104-
for i := 0; i < b.loglen; i++ {
105-
if quaternary.Filter(b.filters[i]).GetString(key) {
106-
pos |= 1 << i
107-
}
108-
}
288+
pos = int(b.filter(j, col, val))
109289
//println(key, pos)
110290
fetched := b.data[pos%len(b.data)]
111291
if col < len(fetched) && fetched[col] == val {
@@ -123,17 +303,13 @@ func (b *bucket) remove(col int, val string) {
123303
return
124304
}
125305
for j := 1; j <= cnt; j++ {
126-
key := fmt.Sprint(j) + ":" + fmt.Sprint(col) + ":" + val
127306
var pos int
128-
for i := 0; i < b.loglen; i++ {
129-
if quaternary.Filter(b.filters[i]).GetString(key) {
130-
pos |= 1 << i
131-
}
132-
}
307+
pos = int(b.filter(j, col, val))
308+
idx := pos % len(b.data)
133309
//println(key, pos)
134-
fetched := b.data[pos%len(b.data)]
310+
fetched := b.data[idx]
135311
if col < len(fetched) && fetched[col] == val {
136-
b.data[pos%len(b.data)] = nil
312+
b.data[idx] = nil
137313
}
138314
}
139315
return
@@ -147,13 +323,8 @@ func (b *bucket) get(col int, val string) (data []string) {
147323
return nil
148324
}
149325
for j := 1; j <= cnt; j++ {
150-
key := fmt.Sprint(j) + ":" + fmt.Sprint(col) + ":" + val
151326
var pos int
152-
for i := 0; i < b.loglen; i++ {
153-
if quaternary.Filter(b.filters[i]).GetString(key) {
154-
pos |= 1 << i
155-
}
156-
}
327+
pos = int(b.filter(j, col, val))
157328
//println(key, pos)
158329
fetched := b.data[pos%len(b.data)]
159330
if col < len(fetched) && fetched[col] == val {

0 commit comments

Comments
 (0)