Skip to content

Commit 2b236db

Browse files
committed
interface(Datastore): added Sync function to the Datastore, along with implementations in helper Datastores
1 parent 00a16ea commit 2b236db

File tree

12 files changed

+316
-0
lines changed

12 files changed

+316
-0
lines changed

autobatch/autobatch.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,35 @@ func (d *Datastore) Put(k ds.Key, val []byte) error {
6464
return nil
6565
}
6666

67+
// Sync flushes all operations on keys at or under the prefix
68+
// from the current batch to the underlying datastore
69+
func (d *Datastore) Sync(prefix ds.Key) error {
70+
b, err := d.child.Batch()
71+
if err != nil {
72+
return err
73+
}
74+
75+
for k, o := range d.buffer {
76+
if !(k.Equal(prefix) || k.IsDescendantOf(prefix)) {
77+
continue
78+
}
79+
80+
var err error
81+
if o.delete {
82+
err = b.Delete(k)
83+
} else {
84+
err = b.Put(k, o.value)
85+
}
86+
if err != nil {
87+
return err
88+
}
89+
90+
delete(d.buffer, k)
91+
}
92+
93+
return b.Commit()
94+
}
95+
6796
// Flush flushes the current batch to the underlying datastore.
6897
func (d *Datastore) Flush() error {
6998
b, err := d.child.Batch()

autobatch/autobatch_test.go

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,3 +103,149 @@ func TestFlushing(t *testing.T) {
103103
t.Fatal("wrong value")
104104
}
105105
}
106+
107+
func TestSync(t *testing.T) {
108+
child := ds.NewMapDatastore()
109+
d := NewAutoBatching(child, 100)
110+
111+
put := func(key ds.Key) {
112+
if err := d.Put(key, []byte(key.String())); err != nil {
113+
t.Fatal(err)
114+
}
115+
}
116+
del := func(key ds.Key) {
117+
if err := d.Delete(key); err != nil {
118+
t.Fatal(err)
119+
}
120+
}
121+
122+
get := func(d ds.Datastore, key ds.Key) {
123+
val, err := d.Get(key)
124+
if err != nil {
125+
t.Fatal(err)
126+
}
127+
128+
if !bytes.Equal(val, []byte(key.String())) {
129+
t.Fatal("wrong value")
130+
}
131+
}
132+
invalidGet := func(d ds.Datastore, key ds.Key) {
133+
if _, err := d.Get(key); err != ds.ErrNotFound {
134+
t.Fatal("should not have found value")
135+
}
136+
}
137+
138+
// Test if Syncing Puts works
139+
internalSyncTest(t, d, child, put, del, get, invalidGet)
140+
141+
// Test if Syncing Deletes works
142+
internalSyncTest(t, d, child, del, put, invalidGet, get)
143+
}
144+
145+
// This function can be used to test Sync Puts and Deletes
146+
// For clarity comments are written as if op = Put and undoOp = Delete
147+
func internalSyncTest(t *testing.T, d, child ds.Datastore, op, undoOp func(ds.Key),
148+
checkOp, checkUndoOp func(ds.Datastore, ds.Key)) {
149+
var keys []ds.Key
150+
keymap := make(map[ds.Key]int)
151+
for i := 0; i < 4; i++ {
152+
k := ds.NewKey(fmt.Sprintf("%d", i))
153+
keymap[k] = len(keys)
154+
keys = append(keys, k)
155+
for j := 0; j < 2; j++ {
156+
k := ds.NewKey(fmt.Sprintf("%d/%d", i, j))
157+
keymap[k] = len(keys)
158+
keys = append(keys, k)
159+
for k := 0; k < 2; k++ {
160+
k := ds.NewKey(fmt.Sprintf("%d/%d/%d", i, j, k))
161+
keymap[k] = len(keys)
162+
keys = append(keys, k)
163+
}
164+
}
165+
}
166+
167+
for _, k := range keys {
168+
op(k)
169+
}
170+
171+
// Get works normally.
172+
for _, k := range keys {
173+
checkOp(d, k)
174+
}
175+
176+
// Put not flushed
177+
checkUndoOp(child, ds.NewKey("0"))
178+
179+
// Delete works.
180+
deletedKey := ds.NewKey("2/1/1")
181+
undoOp(deletedKey)
182+
checkUndoOp(d, deletedKey)
183+
184+
// Put still not flushed
185+
checkUndoOp(child, ds.NewKey("0"))
186+
187+
// Sync the tree "0/*/*"
188+
if err := d.Sync(ds.NewKey("0")); err != nil {
189+
t.Fatal(err)
190+
}
191+
192+
// Try to get keys "0/*/*" from the child datastore
193+
checkKeyRange(t, keymap, keys, d, [][]string{{"0", "0/1/1"}}, checkOp)
194+
195+
// Verify no other keys were synchronized
196+
checkKeyRange(t, keymap, keys, child, [][]string{{"1", "3/1/1"}}, checkUndoOp)
197+
198+
// Sync the tree "1/1/*"
199+
if err := d.Sync(ds.NewKey("1/1")); err != nil {
200+
t.Fatal(err)
201+
}
202+
203+
// Try to get keys "0/*/*" and "1/1/*" from the child datastore
204+
checkKeyRange(t, keymap, keys, d, [][]string{{"0", "0/1/1"}, {"1/1", "1/1/1"}}, checkOp)
205+
206+
// Verify no other keys were synchronized
207+
checkKeyRange(t, keymap, keys, child, [][]string{{"1", "1/0/1"}, {"2", "3/1/1"}}, checkUndoOp)
208+
209+
// Sync the tree "3/1/1"
210+
if err := d.Sync(ds.NewKey("3/1/1")); err != nil {
211+
t.Fatal(err)
212+
}
213+
214+
// Try to get keys "0/*/*", "1/1/*", "3/1/1" from the child datastore
215+
checkKeyRange(t, keymap, keys, d, [][]string{{"0", "0/1/1"}, {"1/1", "1/1/1"}, {"3/1/1", "3/1/1"}}, checkOp)
216+
217+
// Verify no other keys were synchronized
218+
checkKeyRange(t, keymap, keys, child, [][]string{{"1", "1/0/1"}, {"2", "3/1/0"}}, checkUndoOp)
219+
220+
if err := d.Sync(ds.Key{}); err != nil {
221+
t.Fatal(err)
222+
}
223+
224+
// Never flushed the deleted key.
225+
checkUndoOp(child, deletedKey)
226+
227+
// Try to get all keys except the deleted key from the child datastore
228+
checkKeyRange(t, keymap, keys, d, [][]string{{"0", "2/1/0"}, {"3", "3/1/1"}}, checkOp)
229+
230+
// Add the deleted key into the datastore
231+
op(deletedKey)
232+
233+
// Sync it
234+
if err := d.Sync(deletedKey); err != nil {
235+
t.Fatal(err)
236+
}
237+
238+
// Check it
239+
checkOp(d, deletedKey)
240+
}
241+
242+
func checkKeyRange(t *testing.T, keymap map[ds.Key]int, keys []ds.Key,
243+
d ds.Datastore, validKeyRanges [][]string, checkFn func(ds.Datastore, ds.Key)) {
244+
t.Helper()
245+
for _, validKeyBoundaries := range validKeyRanges {
246+
start, end := keymap[ds.NewKey(validKeyBoundaries[0])], keymap[ds.NewKey(validKeyBoundaries[1])]
247+
for _, k := range keys[start:end] {
248+
checkFn(d, k)
249+
}
250+
}
251+
}

basic_ds.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@ func (d *MapDatastore) Put(key Key, value []byte) (err error) {
2828
return nil
2929
}
3030

31+
// Sync implements Datastore.Sync
32+
func (d *MapDatastore) Sync(prefix Key) error {
33+
return nil
34+
}
35+
3136
// Get implements Datastore.Get
3237
func (d *MapDatastore) Get(key Key) (value []byte, err error) {
3338
val, found := d.values[key]
@@ -95,6 +100,11 @@ func (d *NullDatastore) Put(key Key, value []byte) (err error) {
95100
return nil
96101
}
97102

103+
// Sync implements Datastore.Sync
104+
func (d *NullDatastore) Sync(prefix Key) error {
105+
return nil
106+
}
107+
98108
// Get implements Datastore.Get
99109
func (d *NullDatastore) Get(key Key) (value []byte, err error) {
100110
return nil, ErrNotFound

datastore.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,13 @@ should be checked by callers.
3434
type Datastore interface {
3535
Read
3636
Write
37+
// Sync guarantees that any Put or Delete calls under prefix that returned
38+
// before Sync(prefix) was called will be observed after Sync(prefix)
39+
// returns, even if the program crashes. If Put/Delete operations already
40+
// satisfy these requirements then Sync may be a no-op.
41+
//
42+
// If the prefix fails to Sync this method returns an error.
43+
Sync(prefix Key) error
3744
io.Closer
3845
}
3946

delayed/delayed.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,12 @@ func (dds *Delayed) Put(key ds.Key, value []byte) (err error) {
3030
return dds.ds.Put(key, value)
3131
}
3232

33+
// Sync implements Datastore.Sync
34+
func (dds *Delayed) Sync(prefix ds.Key) error {
35+
dds.delay.Wait()
36+
return dds.ds.Sync(prefix)
37+
}
38+
3339
// Get implements the ds.Datastore interface.
3440
func (dds *Delayed) Get(key ds.Key) (value []byte, err error) {
3541
dds.delay.Wait()

examples/fs.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,12 @@ func (d *Datastore) Put(key ds.Key, value []byte) (err error) {
6363
return ioutil.WriteFile(fn, value, 0666)
6464
}
6565

66+
// Sync would ensure that any previous Puts under the prefix are written to disk.
67+
// However, they already are.
68+
func (d *Datastore) Sync(prefix ds.Key) error {
69+
return nil
70+
}
71+
6672
// Get returns the value for given key
6773
func (d *Datastore) Get(key ds.Key) (value []byte, err error) {
6874
fn := d.KeyFilename(key)

failstore/failstore.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,16 @@ func (d *Failstore) Put(k ds.Key, val []byte) error {
3636
return d.child.Put(k, val)
3737
}
3838

39+
// Sync implements Datastore.Sync
40+
func (d *Failstore) Sync(prefix ds.Key) error {
41+
err := d.errfunc("sync")
42+
if err != nil {
43+
return err
44+
}
45+
46+
return d.child.Sync(prefix)
47+
}
48+
3949
// Get retrieves a value from the datastore.
4050
func (d *Failstore) Get(k ds.Key) ([]byte, error) {
4151
err := d.errfunc("get")

keytransform/keytransform.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,11 @@ func (d *Datastore) Put(key ds.Key, value []byte) (err error) {
3737
return d.child.Put(d.ConvertKey(key), value)
3838
}
3939

40+
// Sync implements Datastore.Sync
41+
func (d *Datastore) Sync(prefix ds.Key) error {
42+
return d.child.Sync(d.ConvertKey(prefix))
43+
}
44+
4045
// Get returns the value for given key, transforming the key first.
4146
func (d *Datastore) Get(key ds.Key) (value []byte, err error) {
4247
return d.child.Get(d.ConvertKey(key))

mount/mount.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,24 @@ func (d *Datastore) Put(key ds.Key, value []byte) error {
189189
return cds.Put(k, value)
190190
}
191191

192+
// Sync implements Datastore.Sync
193+
func (d *Datastore) Sync(prefix ds.Key) error {
194+
// Sync all mount points below the prefix
195+
// Sync the mount point right at (or above) the prefix
196+
dstores, mountPts, rest := d.lookupAll(prefix)
197+
for i, suffix := range rest {
198+
if err := dstores[i].Sync(suffix); err != nil {
199+
return err
200+
}
201+
202+
if mountPts[i].Equal(prefix) || suffix.String() != "/" {
203+
return nil
204+
}
205+
}
206+
207+
return nil
208+
}
209+
192210
func (d *Datastore) Get(key ds.Key) (value []byte, err error) {
193211
cds, _, k := d.lookup(key)
194212
if cds == nil {

mount/mount_test.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"testing"
66

77
datastore "github.com/ipfs/go-datastore"
8+
autobatch "github.com/ipfs/go-datastore/autobatch"
89
mount "github.com/ipfs/go-datastore/mount"
910
query "github.com/ipfs/go-datastore/query"
1011
sync "github.com/ipfs/go-datastore/sync"
@@ -641,6 +642,70 @@ func TestLookupPrio(t *testing.T) {
641642
}
642643
}
643644

645+
func TestNestedMountSync(t *testing.T) {
646+
internalDSRoot := datastore.NewMapDatastore()
647+
internalDSFoo := datastore.NewMapDatastore()
648+
internalDSFooBar := datastore.NewMapDatastore()
649+
650+
m := mount.New([]mount.Mount{
651+
{Prefix: datastore.NewKey("/foo"), Datastore: autobatch.NewAutoBatching(internalDSFoo, 10)},
652+
{Prefix: datastore.NewKey("/foo/bar"), Datastore: autobatch.NewAutoBatching(internalDSFooBar, 10)},
653+
{Prefix: datastore.NewKey("/"), Datastore: autobatch.NewAutoBatching(internalDSRoot, 10)},
654+
})
655+
656+
// Testing scenarios
657+
// 1) Make sure child(ren) sync
658+
// 2) Make sure parent syncs
659+
// 3) Make sure parent only syncs the relevant subtree (instead of fully syncing)
660+
661+
addToDS := func(str string) {
662+
t.Helper()
663+
if err := m.Put(datastore.NewKey(str), []byte(str)); err != nil {
664+
t.Fatal(err)
665+
}
666+
}
667+
668+
checkVal := func(d datastore.Datastore, str string, expectFound bool) {
669+
t.Helper()
670+
res, err := d.Has(datastore.NewKey(str))
671+
if err != nil {
672+
t.Fatal(err)
673+
}
674+
if res != expectFound {
675+
if expectFound {
676+
t.Fatal("datastore is missing key")
677+
}
678+
t.Fatal("datastore has key it should not have")
679+
}
680+
}
681+
682+
// Add /foo/bar/0, Add /foo/bar/0/1, Add /foo/baz, Add /beep/bop, Sync /foo: all added except last - checks 1 and 2
683+
addToDS("/foo/bar/0")
684+
addToDS("/foo/bar/1")
685+
addToDS("/foo/baz")
686+
addToDS("/beep/bop")
687+
688+
if err := m.Sync(datastore.NewKey("/foo")); err != nil {
689+
t.Fatal(err)
690+
}
691+
692+
checkVal(internalDSFooBar, "/0", true)
693+
checkVal(internalDSFooBar, "/1", true)
694+
checkVal(internalDSFoo, "/baz", true)
695+
checkVal(internalDSRoot, "/beep/bop", false)
696+
697+
// Add /fwop Add /bloop Sync /fwop, both added - checks 3
698+
addToDS("/fwop")
699+
addToDS("/bloop")
700+
701+
if err := m.Sync(datastore.NewKey("/fwop")); err != nil {
702+
t.Fatal(err)
703+
}
704+
705+
checkVal(internalDSRoot, "/fwop", true)
706+
checkVal(internalDSRoot, "/bloop", false)
707+
}
708+
644709
type errQueryDS struct {
645710
datastore.NullDatastore
646711
}

0 commit comments

Comments
 (0)