Skip to content

Commit 3479e89

Browse files
authored
Bloom filter virtual table (#103)
1 parent 58e9105 commit 3479e89

File tree

11 files changed

+445
-10
lines changed

11 files changed

+445
-10
lines changed

README.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ Go, wazero and [`x/sys`](https://pkg.go.dev/golang.org/x/sys) are the _only_ run
3333
provides the [`array`](https://sqlite.org/carray.html) table-valued function.
3434
- [`github.com/ncruces/go-sqlite3/ext/blobio`](https://pkg.go.dev/github.com/ncruces/go-sqlite3/ext/blobio)
3535
simplifies [incremental BLOB I/O](https://sqlite.org/c3ref/blob_open.html).
36+
- [`github.com/ncruces/go-sqlite3/ext/bloom`](https://pkg.go.dev/github.com/ncruces/go-sqlite3/ext/bloom)
37+
provides the [`bloom_filter`](https://github.com/nalgeon/sqlean/issues/27#issuecomment-1002267134) virtual table.
3638
- [`github.com/ncruces/go-sqlite3/ext/csv`](https://pkg.go.dev/github.com/ncruces/go-sqlite3/ext/csv)
3739
reads [comma-separated values](https://sqlite.org/csv.html).
3840
- [`github.com/ncruces/go-sqlite3/ext/fileio`](https://pkg.go.dev/github.com/ncruces/go-sqlite3/ext/fileio)
@@ -51,12 +53,12 @@ Go, wazero and [`x/sys`](https://pkg.go.dev/golang.org/x/sys) are the _only_ run
5153
provides [Unicode aware](https://sqlite.org/src/dir/ext/icu) functions.
5254
- [`github.com/ncruces/go-sqlite3/ext/zorder`](https://pkg.go.dev/github.com/ncruces/go-sqlite3/ext/zorder)
5355
maps multidimensional data to one dimension.
56+
- [`github.com/ncruces/go-sqlite3/vfs/adiantum`](https://pkg.go.dev/github.com/ncruces/go-sqlite3/vfs/adiantum)
57+
wraps a VFS to offer encryption at rest.
5458
- [`github.com/ncruces/go-sqlite3/vfs/memdb`](https://pkg.go.dev/github.com/ncruces/go-sqlite3/vfs/memdb)
5559
implements an in-memory VFS.
5660
- [`github.com/ncruces/go-sqlite3/vfs/readervfs`](https://pkg.go.dev/github.com/ncruces/go-sqlite3/vfs/readervfs)
5761
implements a VFS for immutable databases.
58-
- [`github.com/ncruces/go-sqlite3/vfs/adiantum`](https://pkg.go.dev/github.com/ncruces/go-sqlite3/vfs/adiantum)
59-
wraps a VFS to offer encryption at rest.
6062

6163
### Advanced features
6264

ext/array/array.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import (
1616
// ints, floats, bools, strings or byte slices,
1717
// using [sqlite3.BindPointer] or [sqlite3.Pointer].
1818
func Register(db *sqlite3.Conn) {
19-
sqlite3.CreateModule[array](db, "array", nil,
19+
sqlite3.CreateModule(db, "array", nil,
2020
func(db *sqlite3.Conn, _, _, _ string, _ ...string) (array, error) {
2121
err := db.DeclareVTab(`CREATE TABLE x(value, array HIDDEN)`)
2222
return array{}, err

ext/bloom/bloom.go

Lines changed: 300 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,300 @@
1+
// Package bloom provides a Bloom filter virtual table.
2+
//
3+
// A Bloom filter is a space-efficient probabilistic data structure
4+
// used to test whether an element is a member of a set.
5+
//
6+
// https://github.com/nalgeon/sqlean/issues/27#issuecomment-1002267134
7+
package bloom
8+
9+
import (
10+
"errors"
11+
"fmt"
12+
"io"
13+
"math"
14+
"strconv"
15+
16+
"github.com/dchest/siphash"
17+
"github.com/ncruces/go-sqlite3"
18+
"github.com/ncruces/go-sqlite3/internal/util"
19+
)
20+
21+
// Register registers the bloom_filter virtual table:
22+
//
23+
// CREATE VIRTUAL TABLE foo USING bloom_filter(nElements, falseProb, kHashes)
24+
func Register(db *sqlite3.Conn) {
25+
sqlite3.CreateModule(db, "bloom_filter", create, connect)
26+
}
27+
28+
type bloom struct {
29+
db *sqlite3.Conn
30+
schema string
31+
storage string
32+
prob float64
33+
nfilter int64
34+
hashes int
35+
}
36+
37+
func create(db *sqlite3.Conn, _, schema, table string, arg ...string) (_ *bloom, err error) {
38+
t := bloom{
39+
db: db,
40+
schema: schema,
41+
storage: table + "_storage",
42+
}
43+
44+
nelem := 100
45+
if len(arg) > 0 {
46+
nelem, err = strconv.Atoi(arg[0])
47+
if err != nil {
48+
return nil, err
49+
}
50+
if nelem <= 0 {
51+
return nil, errors.New("bloom: number of elements in filter must be positive")
52+
}
53+
}
54+
55+
if len(arg) > 1 {
56+
t.prob, err = strconv.ParseFloat(arg[1], 64)
57+
if err != nil {
58+
return nil, err
59+
}
60+
if t.prob <= 0 || t.prob >= 1 {
61+
return nil, errors.New("bloom: probability must be in the range (0,1)")
62+
}
63+
} else {
64+
t.prob = 0.01
65+
}
66+
67+
if len(arg) > 2 {
68+
t.hashes, err = strconv.Atoi(arg[2])
69+
if err != nil {
70+
return nil, err
71+
}
72+
if t.hashes <= 0 {
73+
return nil, errors.New("bloom: number of hash functions must be positive")
74+
}
75+
} else {
76+
t.hashes = int(math.Round(-math.Log2(t.prob)))
77+
}
78+
79+
t.nfilter = computeLength(nelem, t.prob)
80+
81+
err = db.Exec(fmt.Sprintf(
82+
`CREATE TABLE %s.%s (data BLOB, p REAL, n INTEGER, m INTEGER, k INTEGER)`,
83+
sqlite3.QuoteIdentifier(t.schema), sqlite3.QuoteIdentifier(t.storage)))
84+
if err != nil {
85+
return nil, err
86+
}
87+
88+
err = db.Exec(fmt.Sprintf(
89+
`INSERT INTO %s.%s (rowid, data, p, n, m, k)
90+
VALUES (1, zeroblob(%d), %f, %d, %d, %d)`,
91+
sqlite3.QuoteIdentifier(t.schema), sqlite3.QuoteIdentifier(t.storage),
92+
t.nfilter, t.prob, nelem, t.nfilter*8, t.hashes))
93+
if err != nil {
94+
return nil, err
95+
}
96+
97+
err = db.DeclareVTab(
98+
`CREATE TABLE x(present, word HIDDEN NOT NULL PRIMARY KEY) WITHOUT ROWID`)
99+
if err != nil {
100+
t.Destroy()
101+
return nil, err
102+
}
103+
return &t, nil
104+
}
105+
106+
func connect(db *sqlite3.Conn, _, schema, table string, arg ...string) (_ *bloom, err error) {
107+
t := bloom{
108+
db: db,
109+
schema: schema,
110+
storage: table + "_storage",
111+
}
112+
113+
err = db.DeclareVTab(
114+
`CREATE TABLE x(present, word HIDDEN NOT NULL PRIMARY KEY) WITHOUT ROWID`)
115+
if err != nil {
116+
return nil, err
117+
}
118+
119+
load, _, err := db.Prepare(fmt.Sprintf(
120+
`SELECT m/8, p, k FROM %s.%s WHERE rowid = 1`,
121+
sqlite3.QuoteIdentifier(t.schema), sqlite3.QuoteIdentifier(t.storage)))
122+
if err != nil {
123+
return nil, err
124+
}
125+
defer load.Close()
126+
127+
if !load.Step() {
128+
if err = load.Err(); err == nil {
129+
err = sqlite3.CORRUPT_VTAB
130+
}
131+
return nil, err
132+
}
133+
134+
t.nfilter = load.ColumnInt64(0)
135+
t.prob = load.ColumnFloat(1)
136+
t.hashes = load.ColumnInt(2)
137+
return &t, nil
138+
}
139+
140+
func (b *bloom) Destroy() error {
141+
return b.db.Exec(fmt.Sprintf(`DROP TABLE %s.%s`,
142+
sqlite3.QuoteIdentifier(b.schema),
143+
sqlite3.QuoteIdentifier(b.storage)))
144+
}
145+
146+
func (b *bloom) Rename(new string) error {
147+
new += "_storage"
148+
err := b.db.Exec(fmt.Sprintf(`ALTER TABLE %s.%s RENAME TO %s`,
149+
sqlite3.QuoteIdentifier(b.schema),
150+
sqlite3.QuoteIdentifier(b.storage),
151+
sqlite3.QuoteIdentifier(new),
152+
))
153+
if err == nil {
154+
b.storage = new
155+
}
156+
return err
157+
}
158+
159+
func (b *bloom) BestIndex(idx *sqlite3.IndexInfo) error {
160+
for n, cst := range idx.Constraint {
161+
if cst.Usable && cst.Column == 1 &&
162+
cst.Op == sqlite3.INDEX_CONSTRAINT_EQ {
163+
idx.ConstraintUsage[n].ArgvIndex = 1
164+
}
165+
}
166+
idx.OrderByConsumed = true
167+
idx.EstimatedRows = 1
168+
idx.EstimatedCost = float64(b.hashes)
169+
idx.IdxFlags = sqlite3.INDEX_SCAN_UNIQUE
170+
return nil
171+
}
172+
173+
func (b *bloom) Update(arg ...sqlite3.Value) (rowid int64, err error) {
174+
if arg[0].Type() != sqlite3.NULL {
175+
if len(arg) == 1 {
176+
return 0, errors.New("bloom: elements cannot be deleted")
177+
}
178+
return 0, errors.New("bloom: elements cannot be updated")
179+
}
180+
181+
blob := arg[2].RawBlob()
182+
183+
f, err := b.db.OpenBlob(b.schema, b.storage, "data", 1, true)
184+
if err != nil {
185+
return 0, err
186+
}
187+
defer f.Close()
188+
189+
for n := 0; n < b.hashes; n++ {
190+
hash := calcHash(n, blob)
191+
hash %= uint64(b.nfilter * 8)
192+
bitpos := byte(hash % 8)
193+
bytepos := int64(hash / 8)
194+
195+
var buf [1]byte
196+
_, err = f.Seek(bytepos, io.SeekStart)
197+
if err != nil {
198+
return 0, err
199+
}
200+
_, err = f.Read(buf[:])
201+
if err != nil {
202+
return 0, err
203+
}
204+
205+
buf[0] |= (1 << bitpos)
206+
207+
_, err = f.Seek(bytepos, io.SeekStart)
208+
if err != nil {
209+
return 0, err
210+
}
211+
_, err = f.Write(buf[:])
212+
if err != nil {
213+
return 0, err
214+
}
215+
}
216+
return 0, nil
217+
}
218+
219+
func (b *bloom) Open() (sqlite3.VTabCursor, error) {
220+
return &cursor{bloom: b}, nil
221+
}
222+
223+
type cursor struct {
224+
*bloom
225+
eof bool
226+
arg *sqlite3.Value
227+
}
228+
229+
func (c *cursor) Filter(idxNum int, idxStr string, arg ...sqlite3.Value) error {
230+
if len(arg) != 1 {
231+
return nil
232+
}
233+
234+
c.eof = false
235+
c.arg = &arg[0]
236+
blob := arg[0].RawBlob()
237+
238+
f, err := c.db.OpenBlob(c.schema, c.storage, "data", 1, false)
239+
if err != nil {
240+
return err
241+
}
242+
defer f.Close()
243+
244+
for n := 0; n < c.hashes; n++ {
245+
hash := calcHash(n, blob)
246+
hash %= uint64(c.nfilter * 8)
247+
bitpos := byte(hash % 8)
248+
bytepos := int64(hash / 8)
249+
250+
var buf [1]byte
251+
_, err = f.Seek(bytepos, io.SeekStart)
252+
if err != nil {
253+
return err
254+
}
255+
_, err = f.Read(buf[:])
256+
if err != nil {
257+
return err
258+
}
259+
260+
c.eof = (buf[0] & (1 << bitpos)) == 0
261+
if c.eof {
262+
break
263+
}
264+
}
265+
return nil
266+
}
267+
268+
func (c *cursor) Column(ctx *sqlite3.Context, n int) error {
269+
switch n {
270+
case 0:
271+
ctx.ResultBool(true)
272+
case 1:
273+
ctx.ResultValue(*c.arg)
274+
default:
275+
panic(util.AssertErr())
276+
}
277+
return nil
278+
}
279+
280+
func (c *cursor) Next() error {
281+
c.eof = true
282+
return nil
283+
}
284+
285+
func (c *cursor) EOF() bool {
286+
return c.eof
287+
}
288+
289+
func (c *cursor) RowID() (int64, error) {
290+
return 0, nil
291+
}
292+
293+
func calcHash(k int, b []byte) uint64 {
294+
return siphash.Hash(^uint64(k), uint64(k), b)
295+
}
296+
297+
// computeLength returns the size, in bytes, of a filter meant to hold
// n elements with false positive probability p.
// The number of bits is -n·ln(p)/ln(2)², rounded up,
// which is then rounded up to a whole number of bytes.
func computeLength(n int, p float64) int64 {
	const ln2Squared = math.Ln2 * math.Ln2

	bits := -((float64(n) * math.Log(p)) / ln2Squared)
	nbits := int64(math.Ceil(bits))

	const bitsPerByte = 8
	return (nbits + bitsPerByte - 1) / bitsPerByte
}

0 commit comments

Comments
 (0)