Skip to content

Commit a0ab686

Browse files
committed
TxnDatastore namespace and keytransform wrappers
1 parent 1caa244 commit a0ab686

File tree

2 files changed

+279
-0
lines changed

2 files changed

+279
-0
lines changed

keytransform/txndatastore.go

Lines changed: 271 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,271 @@
1+
package keytransform
2+
3+
import (
4+
"context"
5+
6+
ds "github.com/ipfs/go-datastore"
7+
dsq "github.com/ipfs/go-datastore/query"
8+
)
9+
10+
// WrapTxnDatastore wraps a given datastore with a KeyTransform function.
11+
// The resulting wrapped datastore will use the transform on all TxnDatastore
12+
// operations.
13+
func WrapTxnDatastore(child ds.TxnDatastore, t KeyTransform) *TxnDatastore {
14+
if t == nil {
15+
panic("t (KeyTransform) is nil")
16+
}
17+
18+
if child == nil {
19+
panic("child (ds.TxnDatastore) is nil")
20+
}
21+
22+
return &TxnDatastore{child: child, KeyTransform: t}
23+
}
24+
25+
// TxnDatastore keeps a KeyTransform function
26+
type TxnDatastore struct {
27+
child ds.TxnDatastore
28+
29+
KeyTransform
30+
}
31+
32+
var _ ds.Datastore = (*TxnDatastore)(nil)
33+
var _ ds.Batching = (*TxnDatastore)(nil)
34+
var _ ds.Shim = (*TxnDatastore)(nil)
35+
var _ ds.PersistentDatastore = (*TxnDatastore)(nil)
36+
var _ ds.CheckedDatastore = (*TxnDatastore)(nil)
37+
var _ ds.ScrubbedDatastore = (*TxnDatastore)(nil)
38+
var _ ds.GCDatastore = (*TxnDatastore)(nil)
39+
var _ ds.TxnDatastore = (*TxnDatastore)(nil)
40+
41+
// Children implements ds.Shim
42+
func (d *TxnDatastore) Children() []ds.Datastore {
43+
return []ds.Datastore{d.child}
44+
}
45+
46+
// Put stores the given value, transforming the key first.
47+
func (d *TxnDatastore) Put(ctx context.Context, key ds.Key, value []byte) (err error) {
48+
return d.child.Put(ctx, d.ConvertKey(key), value)
49+
}
50+
51+
// Sync implements Datastore.Sync
52+
func (d *TxnDatastore) Sync(ctx context.Context, prefix ds.Key) error {
53+
return d.child.Sync(ctx, d.ConvertKey(prefix))
54+
}
55+
56+
// Get returns the value for given key, transforming the key first.
57+
func (d *TxnDatastore) Get(ctx context.Context, key ds.Key) (value []byte, err error) {
58+
return d.child.Get(ctx, d.ConvertKey(key))
59+
}
60+
61+
// Has returns whether the datastore has a value for a given key, transforming
62+
// the key first.
63+
func (d *TxnDatastore) Has(ctx context.Context, key ds.Key) (exists bool, err error) {
64+
return d.child.Has(ctx, d.ConvertKey(key))
65+
}
66+
67+
// GetSize returns the size of the value named by the given key, transforming
68+
// the key first.
69+
func (d *TxnDatastore) GetSize(ctx context.Context, key ds.Key) (size int, err error) {
70+
return d.child.GetSize(ctx, d.ConvertKey(key))
71+
}
72+
73+
// Delete removes the value for given key
74+
func (d *TxnDatastore) Delete(ctx context.Context, key ds.Key) (err error) {
75+
return d.child.Delete(ctx, d.ConvertKey(key))
76+
}
77+
78+
// Query implements Query, inverting keys on the way back out.
79+
func (d *TxnDatastore) Query(ctx context.Context, q dsq.Query) (dsq.Results, error) {
80+
nq, cq := d.prepareQuery(q)
81+
82+
cqr, err := d.child.Query(ctx, cq)
83+
if err != nil {
84+
return nil, err
85+
}
86+
87+
qr := dsq.ResultsFromIterator(q, dsq.Iterator{
88+
Next: func() (dsq.Result, bool) {
89+
r, ok := cqr.NextSync()
90+
if !ok {
91+
return r, false
92+
}
93+
if r.Error == nil {
94+
r.Entry.Key = d.InvertKey(ds.RawKey(r.Entry.Key)).String()
95+
}
96+
return r, true
97+
},
98+
Close: func() error {
99+
return cqr.Close()
100+
},
101+
})
102+
return dsq.NaiveQueryApply(nq, qr), nil
103+
}
104+
105+
// Split the query into a child query and a naive query. That way, we can make
106+
// the child datastore do as much work as possible.
107+
func (d *TxnDatastore) prepareQuery(q dsq.Query) (naive, child dsq.Query) {
108+
109+
// First, put everything in the child query. Then, start taking things
110+
// out.
111+
child = q
112+
113+
// Always let the child handle the key prefix.
114+
child.Prefix = d.ConvertKey(ds.NewKey(child.Prefix)).String()
115+
116+
// Check if the key transform is order-preserving so we can use the
117+
// child datastore's built-in ordering.
118+
orderPreserving := false
119+
switch d.KeyTransform.(type) {
120+
case PrefixTransform, *PrefixTransform:
121+
orderPreserving = true
122+
}
123+
124+
// Try to let the child handle ordering.
125+
orders:
126+
for i, o := range child.Orders {
127+
switch o.(type) {
128+
case dsq.OrderByValue, *dsq.OrderByValue,
129+
dsq.OrderByValueDescending, *dsq.OrderByValueDescending:
130+
// Key doesn't matter.
131+
continue
132+
case dsq.OrderByKey, *dsq.OrderByKey,
133+
dsq.OrderByKeyDescending, *dsq.OrderByKeyDescending:
134+
// if the key transform preserves order, we can delegate
135+
// to the child datastore.
136+
if orderPreserving {
137+
// When sorting, we compare with the first
138+
// Order, then, if equal, we compare with the
139+
// second Order, etc. However, keys are _unique_
140+
// so we'll never apply any additional orders
141+
// after ordering by key.
142+
child.Orders = child.Orders[:i+1]
143+
break orders
144+
}
145+
}
146+
147+
// Can't handle this order under transform, punt it to a naive
148+
// ordering.
149+
naive.Orders = q.Orders
150+
child.Orders = nil
151+
naive.Offset = q.Offset
152+
child.Offset = 0
153+
naive.Limit = q.Limit
154+
child.Limit = 0
155+
break
156+
}
157+
158+
// Try to let the child handle the filters.
159+
160+
// don't modify the original filters.
161+
child.Filters = append([]dsq.Filter(nil), child.Filters...)
162+
163+
for i, f := range child.Filters {
164+
switch f := f.(type) {
165+
case dsq.FilterValueCompare, *dsq.FilterValueCompare:
166+
continue
167+
case dsq.FilterKeyCompare:
168+
child.Filters[i] = dsq.FilterKeyCompare{
169+
Op: f.Op,
170+
Key: d.ConvertKey(ds.NewKey(f.Key)).String(),
171+
}
172+
continue
173+
case *dsq.FilterKeyCompare:
174+
child.Filters[i] = &dsq.FilterKeyCompare{
175+
Op: f.Op,
176+
Key: d.ConvertKey(ds.NewKey(f.Key)).String(),
177+
}
178+
continue
179+
case dsq.FilterKeyPrefix:
180+
child.Filters[i] = dsq.FilterKeyPrefix{
181+
Prefix: d.ConvertKey(ds.NewKey(f.Prefix)).String(),
182+
}
183+
continue
184+
case *dsq.FilterKeyPrefix:
185+
child.Filters[i] = &dsq.FilterKeyPrefix{
186+
Prefix: d.ConvertKey(ds.NewKey(f.Prefix)).String(),
187+
}
188+
continue
189+
}
190+
191+
// Not a known filter, defer to the naive implementation.
192+
naive.Filters = q.Filters
193+
child.Filters = nil
194+
naive.Offset = q.Offset
195+
child.Offset = 0
196+
naive.Limit = q.Limit
197+
child.Limit = 0
198+
break
199+
}
200+
return
201+
}
202+
203+
func (d *TxnDatastore) Close() error {
204+
return d.child.Close()
205+
}
206+
207+
// DiskUsage implements the PersistentTxnDatastore interface.
208+
func (d *TxnDatastore) DiskUsage(ctx context.Context) (uint64, error) {
209+
return ds.DiskUsage(ctx, d.child)
210+
}
211+
212+
func (d *TxnDatastore) Batch(ctx context.Context) (ds.Batch, error) {
213+
bds, ok := d.child.(ds.Batching)
214+
if !ok {
215+
return nil, ds.ErrBatchUnsupported
216+
}
217+
218+
childbatch, err := bds.Batch(ctx)
219+
if err != nil {
220+
return nil, err
221+
}
222+
return &transformBatch{
223+
dst: childbatch,
224+
f: d.ConvertKey,
225+
}, nil
226+
}
227+
228+
type transformBatch struct {
229+
dst ds.Batch
230+
231+
f KeyMapping
232+
}
233+
234+
var _ ds.Batch = (*transformBatch)(nil)
235+
236+
func (t *transformBatch) Put(ctx context.Context, key ds.Key, val []byte) error {
237+
return t.dst.Put(ctx, t.f(key), val)
238+
}
239+
240+
func (t *transformBatch) Delete(ctx context.Context, key ds.Key) error {
241+
return t.dst.Delete(ctx, t.f(key))
242+
}
243+
244+
func (t *transformBatch) Commit(ctx context.Context) error {
245+
return t.dst.Commit(ctx)
246+
}
247+
248+
func (d *TxnDatastore) Check(ctx context.Context) error {
249+
if c, ok := d.child.(ds.CheckedDatastore); ok {
250+
return c.Check(ctx)
251+
}
252+
return nil
253+
}
254+
255+
func (d *TxnDatastore) Scrub(ctx context.Context) error {
256+
if c, ok := d.child.(ds.ScrubbedDatastore); ok {
257+
return c.Scrub(ctx)
258+
}
259+
return nil
260+
}
261+
262+
func (d *TxnDatastore) CollectGarbage(ctx context.Context) error {
263+
if c, ok := d.child.(ds.GCDatastore); ok {
264+
return c.CollectGarbage(ctx)
265+
}
266+
return nil
267+
}
268+
269+
func (d *TxnDatastore) NewTransaction(ctx context.Context, readOnly bool) (ds.Txn, error) {
270+
panic("implement me")
271+
}

namespace/namespace.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,3 +24,11 @@ func Wrap(child ds.Datastore, prefix ds.Key) *ktds.Datastore {
2424

2525
return ktds.Wrap(child, PrefixTransform(prefix))
2626
}
27+
28+
func WrapTxnDatastore(child ds.TxnDatastore, prefix ds.Key) *ktds.TxnDatastore {
29+
if child == nil {
30+
panic("child (ds.TxnDatastore) is nil")
31+
}
32+
33+
return ktds.WrapTxnDatastore(child, PrefixTransform(prefix))
34+
}

0 commit comments

Comments
 (0)