Skip to content

Commit 13c8c18

Browse files
authored
Merge pull request #17 from raulk/txn
Add transactional support to leveldb datastore.
2 parents b71f76e + bdf88d6 commit 13c8c18

File tree

4 files changed

+165
-28
lines changed

4 files changed

+165
-28
lines changed

.gx/lastpubver

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1.1.2: QmPnXsHj9W8WpDDwj2iogRcnVL6d5ANtK9SAJLgKpeBMq8
1+
1.2.0: QmcxDvw8NnJsfdEcfrypwHkLeVxZY2rT8iiWsUuBnw93gb

datastore.go

Lines changed: 64 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,21 @@ import (
99
"github.com/jbenet/goprocess"
1010
"github.com/syndtr/goleveldb/leveldb"
1111
"github.com/syndtr/goleveldb/leveldb/errors"
12+
"github.com/syndtr/goleveldb/leveldb/iterator"
1213
"github.com/syndtr/goleveldb/leveldb/opt"
1314
"github.com/syndtr/goleveldb/leveldb/storage"
1415
"github.com/syndtr/goleveldb/leveldb/util"
1516
)
1617

1718
type datastore struct {
19+
*accessor
1820
DB *leveldb.DB
1921
path string
2022
}
2123

24+
var _ ds.Datastore = (*datastore)(nil)
25+
var _ ds.TxnDatastore = (*datastore)(nil)
26+
2227
// Options is an alias of syndtr/goleveldb/opt.Options which might be extended
2328
// in the future.
2429
type Options opt.Options
@@ -49,21 +54,34 @@ func NewDatastore(path string, opts *Options) (*datastore, error) {
4954
}
5055

5156
return &datastore{
52-
DB: db,
53-
path: path,
57+
accessor: &accessor{ldb: db},
58+
DB: db,
59+
path: path,
5460
}, nil
5561
}
5662

57-
// Returns ErrInvalidType if value is not of type []byte.
63+
// An extraction of the common interface between LevelDB Transactions and the DB itself.
5864
//
59-
// Note: using sync = false.
60-
// see http://godoc.org/github.com/syndtr/goleveldb/leveldb/opt#WriteOptions
61-
func (d *datastore) Put(key ds.Key, value []byte) (err error) {
62-
return d.DB.Put(key.Bytes(), value, nil)
65+
// It allows to plug in either inside the `accessor`.
66+
type levelDbOps interface {
67+
Put(key, value []byte, wo *opt.WriteOptions) error
68+
Get(key []byte, ro *opt.ReadOptions) (value []byte, err error)
69+
Has(key []byte, ro *opt.ReadOptions) (ret bool, err error)
70+
Delete(key []byte, wo *opt.WriteOptions) error
71+
NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator
72+
}
73+
74+
// Datastore operations using either the DB or a transaction as the backend.
75+
type accessor struct {
76+
ldb levelDbOps
77+
}
78+
79+
func (a *accessor) Put(key ds.Key, value []byte) (err error) {
80+
return a.ldb.Put(key.Bytes(), value, nil)
6381
}
6482

65-
func (d *datastore) Get(key ds.Key) (value []byte, err error) {
66-
val, err := d.DB.Get(key.Bytes(), nil)
83+
func (a *accessor) Get(key ds.Key) (value []byte, err error) {
84+
val, err := a.ldb.Get(key.Bytes(), nil)
6785
if err != nil {
6886
if err == leveldb.ErrNotFound {
6987
return nil, ds.ErrNotFound
@@ -73,40 +91,40 @@ func (d *datastore) Get(key ds.Key) (value []byte, err error) {
7391
return val, nil
7492
}
7593

76-
func (d *datastore) Has(key ds.Key) (exists bool, err error) {
77-
return d.DB.Has(key.Bytes(), nil)
94+
func (a *accessor) Has(key ds.Key) (exists bool, err error) {
95+
return a.ldb.Has(key.Bytes(), nil)
7896
}
7997

80-
func (d *datastore) Delete(key ds.Key) (err error) {
98+
func (a *accessor) Delete(key ds.Key) (err error) {
8199
// leveldb Delete will not return an error if the key doesn't
82100
// exist (see https://github.com/syndtr/goleveldb/issues/109),
83101
// so check that the key exists first and if not return an
84102
// error
85-
exists, err := d.DB.Has(key.Bytes(), nil)
103+
exists, err := a.ldb.Has(key.Bytes(), nil)
86104
if !exists {
87105
return ds.ErrNotFound
88106
} else if err != nil {
89107
return err
90108
}
91-
return d.DB.Delete(key.Bytes(), nil)
109+
return a.ldb.Delete(key.Bytes(), nil)
92110
}
93111

94-
func (d *datastore) Query(q dsq.Query) (dsq.Results, error) {
95-
return d.QueryNew(q)
112+
func (a *accessor) Query(q dsq.Query) (dsq.Results, error) {
113+
return a.queryNew(q)
96114
}
97115

98-
func (d *datastore) QueryNew(q dsq.Query) (dsq.Results, error) {
116+
func (a *accessor) queryNew(q dsq.Query) (dsq.Results, error) {
99117
if len(q.Filters) > 0 ||
100118
len(q.Orders) > 0 ||
101119
q.Limit > 0 ||
102120
q.Offset > 0 {
103-
return d.QueryOrig(q)
121+
return a.queryOrig(q)
104122
}
105123
var rnge *util.Range
106124
if q.Prefix != "" {
107125
rnge = util.BytesPrefix([]byte(q.Prefix))
108126
}
109-
i := d.DB.NewIterator(rnge, nil)
127+
i := a.ldb.NewIterator(rnge, nil)
110128
return dsq.ResultsFromIterator(q, dsq.Iterator{
111129
Next: func() (dsq.Result, bool) {
112130
ok := i.Next()
@@ -130,7 +148,7 @@ func (d *datastore) QueryNew(q dsq.Query) (dsq.Results, error) {
130148
}), nil
131149
}
132150

133-
func (d *datastore) QueryOrig(q dsq.Query) (dsq.Results, error) {
151+
func (a *accessor) queryOrig(q dsq.Query) (dsq.Results, error) {
134152
// we can use multiple iterators concurrently. see:
135153
// https://godoc.org/github.com/syndtr/goleveldb/leveldb#DB.NewIterator
136154
// advance the iterator only if the reader reads
@@ -140,7 +158,7 @@ func (d *datastore) QueryOrig(q dsq.Query) (dsq.Results, error) {
140158
// that resources should be reclaimed.
141159
qrb := dsq.NewResultBuilder(q)
142160
qrb.Process.Go(func(worker goprocess.Process) {
143-
d.runQuery(worker, qrb)
161+
a.runQuery(worker, qrb)
144162
})
145163

146164
// go wait on the worker (without signaling close)
@@ -157,13 +175,12 @@ func (d *datastore) QueryOrig(q dsq.Query) (dsq.Results, error) {
157175
return qr, nil
158176
}
159177

160-
func (d *datastore) runQuery(worker goprocess.Process, qrb *dsq.ResultBuilder) {
161-
178+
func (a *accessor) runQuery(worker goprocess.Process, qrb *dsq.ResultBuilder) {
162179
var rnge *util.Range
163180
if qrb.Query.Prefix != "" {
164181
rnge = util.BytesPrefix([]byte(qrb.Query.Prefix))
165182
}
166-
i := d.DB.NewIterator(rnge, nil)
183+
i := a.ldb.NewIterator(rnge, nil)
167184
defer i.Release()
168185

169186
// advance iterator for offset
@@ -261,3 +278,26 @@ func (b *leveldbBatch) Delete(key ds.Key) error {
261278
b.b.Delete(key.Bytes())
262279
return nil
263280
}
281+
282+
// A leveldb transaction embedding the accessor backed by the transaction.
283+
type transaction struct {
284+
*accessor
285+
tx *leveldb.Transaction
286+
}
287+
288+
func (t *transaction) Commit() error {
289+
return t.tx.Commit()
290+
}
291+
292+
func (t *transaction) Discard() {
293+
t.tx.Discard()
294+
}
295+
296+
func (d *datastore) NewTransaction(readOnly bool) (ds.Txn, error) {
297+
tx, err := d.DB.OpenTransaction()
298+
if err != nil {
299+
return nil, err
300+
}
301+
accessor := &accessor{tx}
302+
return &transaction{accessor, tx}, nil
303+
}

ds_test.go

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package leveldb
22

33
import (
4+
"bytes"
5+
"fmt"
46
"io/ioutil"
57
"os"
68
"testing"
@@ -223,3 +225,98 @@ func TestDiskUsageInMem(t *testing.T) {
223225
t.Fatal("inmem dbs have 0 disk usage")
224226
}
225227
}
228+
229+
func TestTransactionCommit(t *testing.T) {
230+
key := ds.NewKey("/test/key1")
231+
232+
d, done := newDS(t)
233+
defer done()
234+
235+
txn, err := d.NewTransaction(false)
236+
if err != nil {
237+
t.Fatal(err)
238+
}
239+
defer txn.Discard()
240+
241+
if err := txn.Put(key, []byte("hello")); err != nil {
242+
t.Fatal(err)
243+
}
244+
if val, err := d.Get(key); err != ds.ErrNotFound {
245+
t.Fatalf("expected ErrNotFound, got err: %v, value: %v", err, val)
246+
}
247+
if err := txn.Commit(); err != nil {
248+
t.Fatal(err)
249+
}
250+
if val, err := d.Get(key); err != nil || !bytes.Equal(val, []byte("hello")) {
251+
t.Fatalf("expected entry present after commit, got err: %v, value: %v", err, val)
252+
}
253+
}
254+
255+
func TestTransactionDiscard(t *testing.T) {
256+
key := ds.NewKey("/test/key1")
257+
258+
d, done := newDS(t)
259+
defer done()
260+
261+
txn, err := d.NewTransaction(false)
262+
if err != nil {
263+
t.Fatal(err)
264+
}
265+
defer txn.Discard()
266+
267+
if err := txn.Put(key, []byte("hello")); err != nil {
268+
t.Fatal(err)
269+
}
270+
if val, err := d.Get(key); err != ds.ErrNotFound {
271+
t.Fatalf("expected ErrNotFound, got err: %v, value: %v", err, val)
272+
}
273+
if txn.Discard(); err != nil {
274+
t.Fatal(err)
275+
}
276+
if val, err := d.Get(key); err != ds.ErrNotFound {
277+
t.Fatalf("expected ErrNotFound, got err: %v, value: %v", err, val)
278+
}
279+
}
280+
281+
func TestTransactionManyOperations(t *testing.T) {
282+
keys := []ds.Key{ds.NewKey("/test/key1"), ds.NewKey("/test/key2"), ds.NewKey("/test/key3"), ds.NewKey("/test/key4"), ds.NewKey("/test/key5")}
283+
284+
d, done := newDS(t)
285+
defer done()
286+
287+
txn, err := d.NewTransaction(false)
288+
if err != nil {
289+
t.Fatal(err)
290+
}
291+
defer txn.Discard()
292+
293+
// Insert all entries.
294+
for i := 0; i < 5; i++ {
295+
if err := txn.Put(keys[i], []byte(fmt.Sprintf("hello%d", i))); err != nil {
296+
t.Fatal(err)
297+
}
298+
}
299+
300+
// Remove the third entry.
301+
if err := txn.Delete(keys[2]); err != nil {
302+
t.Fatal(err)
303+
}
304+
305+
// Check existences.
306+
if has, err := txn.Has(keys[1]); err != nil || !has {
307+
t.Fatalf("expected key[1] to be present, err: %v, has: %v", err, has)
308+
}
309+
if has, err := txn.Has(keys[2]); err != nil || has {
310+
t.Fatalf("expected key[2] to be absent, err: %v, has: %v", err, has)
311+
}
312+
313+
var res dsq.Results
314+
if res, err = txn.Query(dsq.Query{Prefix: "/test"}); err != nil {
315+
t.Fatalf("query failed, err: %v", err)
316+
}
317+
if entries, err := res.Rest(); err != nil || len(entries) != 4 {
318+
t.Fatalf("query failed or contained unexpected number of entries, err: %v, results: %v", err, entries)
319+
}
320+
321+
txn.Discard()
322+
}

package.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,16 @@
2121
},
2222
{
2323
"author": "jbenet",
24-
"hash": "QmUyz7JTJzgegC6tiJrfby3mPhzcdswVtG4x58TQ6pq8jV",
24+
"hash": "QmbQshXLNcCPRUGZv4sBGxnZNAHREA6MKeomkwihNXPZWP",
2525
"name": "go-datastore",
26-
"version": "3.2.0"
26+
"version": "3.3.0"
2727
}
2828
],
2929
"gxVersion": "0.8.0",
3030
"language": "go",
3131
"license": "",
3232
"name": "go-ds-leveldb",
3333
"releaseCmd": "git commit -a -m \"gx publish $VERSION\"",
34-
"version": "1.1.2"
34+
"version": "1.2.0"
3535
}
3636

0 commit comments

Comments
 (0)