Skip to content

Commit 5bb1488

Browse files
committed
key transform wrapper for ds.Txn
1 parent a0ab686 commit 5bb1488

File tree

1 file changed

+165
-21
lines changed

1 file changed

+165
-21
lines changed

keytransform/txndatastore.go

Lines changed: 165 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -225,26 +225,6 @@ func (d *TxnDatastore) Batch(ctx context.Context) (ds.Batch, error) {
225225
}, nil
226226
}
227227

228-
type transformBatch struct {
229-
dst ds.Batch
230-
231-
f KeyMapping
232-
}
233-
234-
var _ ds.Batch = (*transformBatch)(nil)
235-
236-
func (t *transformBatch) Put(ctx context.Context, key ds.Key, val []byte) error {
237-
return t.dst.Put(ctx, t.f(key), val)
238-
}
239-
240-
func (t *transformBatch) Delete(ctx context.Context, key ds.Key) error {
241-
return t.dst.Delete(ctx, t.f(key))
242-
}
243-
244-
func (t *transformBatch) Commit(ctx context.Context) error {
245-
return t.dst.Commit(ctx)
246-
}
247-
248228
func (d *TxnDatastore) Check(ctx context.Context) error {
249229
if c, ok := d.child.(ds.CheckedDatastore); ok {
250230
return c.Check(ctx)
@@ -267,5 +247,169 @@ func (d *TxnDatastore) CollectGarbage(ctx context.Context) error {
267247
}
268248

269249
func (d *TxnDatastore) NewTransaction(ctx context.Context, readOnly bool) (ds.Txn, error) {
270-
panic("implement me")
250+
childTxn, err := d.child.NewTransaction(ctx, readOnly)
251+
if err != nil {
252+
return nil, err
253+
}
254+
return &txnWrapper{child: childTxn, KeyTransform: d.KeyTransform}, nil
255+
}
256+
257+
type txnWrapper struct {
258+
child ds.Txn
259+
260+
KeyTransform
261+
}
262+
263+
var _ ds.Txn = (*txnWrapper)(nil)
264+
265+
func (t *txnWrapper) Get(ctx context.Context, key ds.Key) (value []byte, err error) {
266+
return t.child.Get(ctx, t.ConvertKey(key))
267+
}
268+
269+
func (t *txnWrapper) Has(ctx context.Context, key ds.Key) (exists bool, err error) {
270+
return t.child.Has(ctx, t.ConvertKey(key))
271+
}
272+
273+
func (t *txnWrapper) GetSize(ctx context.Context, key ds.Key) (size int, err error) {
274+
return t.child.GetSize(ctx, t.ConvertKey(key))
275+
}
276+
277+
func (t *txnWrapper) Query(ctx context.Context, q dsq.Query) (dsq.Results, error) {
278+
nq, cq := t.prepareQuery(q)
279+
280+
cqr, err := t.child.Query(ctx, cq)
281+
if err != nil {
282+
return nil, err
283+
}
284+
285+
qr := dsq.ResultsFromIterator(q, dsq.Iterator{
286+
Next: func() (dsq.Result, bool) {
287+
r, ok := cqr.NextSync()
288+
if !ok {
289+
return r, false
290+
}
291+
if r.Error == nil {
292+
r.Entry.Key = t.InvertKey(ds.RawKey(r.Entry.Key)).String()
293+
}
294+
return r, true
295+
},
296+
Close: func() error {
297+
return cqr.Close()
298+
},
299+
})
300+
return dsq.NaiveQueryApply(nq, qr), nil
301+
}
302+
303+
// Split the query into a child query and a naive query. That way, we can make
304+
// the child datastore do as much work as possible.
305+
func (t *txnWrapper) prepareQuery(q dsq.Query) (naive, child dsq.Query) {
306+
307+
// First, put everything in the child query. Then, start taking things
308+
// out.
309+
child = q
310+
311+
// Always let the child handle the key prefix.
312+
child.Prefix = t.ConvertKey(ds.NewKey(child.Prefix)).String()
313+
314+
// Check if the key transform is order-preserving so we can use the
315+
// child datastore's built-in ordering.
316+
orderPreserving := false
317+
switch t.KeyTransform.(type) {
318+
case PrefixTransform, *PrefixTransform:
319+
orderPreserving = true
320+
}
321+
322+
// Try to let the child handle ordering.
323+
orders:
324+
for i, o := range child.Orders {
325+
switch o.(type) {
326+
case dsq.OrderByValue, *dsq.OrderByValue,
327+
dsq.OrderByValueDescending, *dsq.OrderByValueDescending:
328+
// Key doesn't matter.
329+
continue
330+
case dsq.OrderByKey, *dsq.OrderByKey,
331+
dsq.OrderByKeyDescending, *dsq.OrderByKeyDescending:
332+
// if the key transform preserves order, we can delegate
333+
// to the child datastore.
334+
if orderPreserving {
335+
// When sorting, we compare with the first
336+
// Order, then, if equal, we compare with the
337+
// second Order, etc. However, keys are _unique_
338+
// so we'll never apply any additional orders
339+
// after ordering by key.
340+
child.Orders = child.Orders[:i+1]
341+
break orders
342+
}
343+
}
344+
345+
// Can't handle this order under transform, punt it to a naive
346+
// ordering.
347+
naive.Orders = q.Orders
348+
child.Orders = nil
349+
naive.Offset = q.Offset
350+
child.Offset = 0
351+
naive.Limit = q.Limit
352+
child.Limit = 0
353+
break
354+
}
355+
356+
// Try to let the child handle the filters.
357+
358+
// don't modify the original filters.
359+
child.Filters = append([]dsq.Filter(nil), child.Filters...)
360+
361+
for i, f := range child.Filters {
362+
switch f := f.(type) {
363+
case dsq.FilterValueCompare, *dsq.FilterValueCompare:
364+
continue
365+
case dsq.FilterKeyCompare:
366+
child.Filters[i] = dsq.FilterKeyCompare{
367+
Op: f.Op,
368+
Key: t.ConvertKey(ds.NewKey(f.Key)).String(),
369+
}
370+
continue
371+
case *dsq.FilterKeyCompare:
372+
child.Filters[i] = &dsq.FilterKeyCompare{
373+
Op: f.Op,
374+
Key: t.ConvertKey(ds.NewKey(f.Key)).String(),
375+
}
376+
continue
377+
case dsq.FilterKeyPrefix:
378+
child.Filters[i] = dsq.FilterKeyPrefix{
379+
Prefix: t.ConvertKey(ds.NewKey(f.Prefix)).String(),
380+
}
381+
continue
382+
case *dsq.FilterKeyPrefix:
383+
child.Filters[i] = &dsq.FilterKeyPrefix{
384+
Prefix: t.ConvertKey(ds.NewKey(f.Prefix)).String(),
385+
}
386+
continue
387+
}
388+
389+
// Not a known filter, defer to the naive implementation.
390+
naive.Filters = q.Filters
391+
child.Filters = nil
392+
naive.Offset = q.Offset
393+
child.Offset = 0
394+
naive.Limit = q.Limit
395+
child.Limit = 0
396+
break
397+
}
398+
return
399+
}
400+
401+
func (t txnWrapper) Put(ctx context.Context, key ds.Key, value []byte) error {
402+
return t.child.Put(ctx, t.ConvertKey(key), value)
403+
}
404+
405+
func (t txnWrapper) Delete(ctx context.Context, key ds.Key) error {
406+
return t.child.Delete(ctx, t.ConvertKey(key))
407+
}
408+
409+
func (t txnWrapper) Commit(ctx context.Context) error {
410+
return t.child.Commit(ctx)
411+
}
412+
413+
func (t txnWrapper) Discard(ctx context.Context) {
414+
t.child.Discard(ctx)
271415
}

0 commit comments

Comments
 (0)