Skip to content

Commit 368b487

Browse files
authored
feat: close meta store (#114)
- fix: unmarshal typo - feat: close meta store - test: add ut for meta store
1 parent f4f8af0 commit 368b487

File tree

7 files changed

+256
-37
lines changed

7 files changed

+256
-37
lines changed

internal/store/meta/async.go

Lines changed: 41 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ type AsyncStore struct {
3535
pending *skiplist.SkipList
3636

3737
commitc chan struct{}
38+
closec chan struct{}
3839
donec chan struct{}
3940
}
4041

@@ -49,6 +50,7 @@ func newAsyncStore(wal *walog.WAL, committed *skiplist.SkipList, version, snapsh
4950
},
5051
pending: skiplist.New(skiplist.Bytes),
5152
commitc: make(chan struct{}, 1),
53+
closec: make(chan struct{}),
5254
donec: make(chan struct{}),
5355
}
5456

@@ -57,10 +59,16 @@ func newAsyncStore(wal *walog.WAL, committed *skiplist.SkipList, version, snapsh
5759
return s
5860
}
5961

60-
func (s *AsyncStore) Stop() {
61-
// TODO(james.yin): stop WAL
62-
close(s.commitc)
62+
func (s *AsyncStore) Close() {
63+
s.mu.Lock()
64+
close(s.closec)
65+
s.mu.Unlock()
66+
6367
<-s.donec
68+
69+
// Close WAL.
70+
s.wal.Close()
71+
s.wal.Wait()
6472
}
6573

6674
func (s *AsyncStore) Load(key []byte) (interface{}, bool) {
@@ -76,17 +84,38 @@ func (s *AsyncStore) Load(key []byte) (interface{}, bool) {
7684
}
7785

7886
func (s *AsyncStore) Store(key []byte, value interface{}) {
79-
s.mu.Lock()
80-
defer s.mu.Unlock()
81-
s.pending.Set(key, value)
82-
s.tryCommit()
87+
_ = s.set(KVRange(key, value))
88+
}
89+
90+
func (s *AsyncStore) BatchStore(kvs Ranger) {
91+
_ = s.set(kvs)
8392
}
8493

8594
func (s *AsyncStore) Delete(key []byte) {
95+
_ = s.set(KVRange(key, deletedMark))
96+
}
97+
98+
func (s *AsyncStore) set(kvs Ranger) error {
8699
s.mu.Lock()
87100
defer s.mu.Unlock()
88-
s.pending.Set(key, deletedMark)
101+
102+
select {
103+
case <-s.closec:
104+
return ErrClosed
105+
default:
106+
}
107+
108+
err := kvs.Range(func(key []byte, value interface{}) error {
109+
s.pending.Set(key, value)
110+
return nil
111+
})
112+
if err != nil {
113+
return err
114+
}
115+
89116
s.tryCommit()
117+
118+
return nil
90119
}
91120

92121
func (s *AsyncStore) tryCommit() {
@@ -113,10 +142,9 @@ func (s *AsyncStore) runCommit() {
113142

114143
for {
115144
select {
116-
case _, ok := <-s.commitc:
117-
if !ok {
118-
return
119-
}
145+
case <-s.closec:
146+
return
147+
case <-s.commitc:
120148
case <-ticker.C:
121149
}
122150
s.commit()
@@ -163,7 +191,7 @@ func RecoverAsyncStore(walDir string) (*AsyncStore, error) {
163191
version := snapshot
164192
wal, err := walog.RecoverWithVisitor(walDir, snapshot, func(data []byte, offset int64) error {
165193
m := skiplist.New(skiplist.Bytes)
166-
err2 := defaultCodec.Unmarshl(data, func(key []byte, value interface{}) error {
194+
err2 := defaultCodec.Unmarshal(data, func(key []byte, value interface{}) error {
167195
m.Set(key, value)
168196
return nil
169197
})

internal/store/meta/async_test.go

Lines changed: 81 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,87 @@
1414

1515
package meta
1616

17-
import "testing"
17+
import (
18+
// standard libraries.
19+
"os"
20+
"testing"
1821

19-
func TestAsyncStore_Load(t *testing.T) {
22+
// third-party libraries.
23+
. "github.com/smartystreets/goconvey/convey"
24+
)
2025

26+
func TestAsyncStore(t *testing.T) {
27+
Convey("AsyncStore", t, func() {
28+
walDir, err := os.MkdirTemp("", "async-*")
29+
So(err, ShouldBeNil)
30+
31+
Convey("new empty AsyncStore by recovery", func() {
32+
ss, err := RecoverAsyncStore(walDir)
33+
34+
So(err, ShouldBeNil)
35+
So(ss, ShouldNotBeNil)
36+
37+
ss.Close()
38+
})
39+
40+
Convey("setup AsyncStore", func() {
41+
ss, err := RecoverAsyncStore(walDir)
42+
So(err, ShouldBeNil)
43+
ss.Store(key0, "value0")
44+
ss.Store(key1, "value1")
45+
ss.Close()
46+
47+
Convey("recover AsyncStore", func() {
48+
ss, err = RecoverAsyncStore(walDir)
49+
So(err, ShouldBeNil)
50+
51+
value0, ok0 := ss.Load(key0)
52+
So(ok0, ShouldBeTrue)
53+
So(value0, ShouldResemble, "value0")
54+
55+
value1, ok1 := ss.Load(key1)
56+
So(ok1, ShouldBeTrue)
57+
So(value1, ShouldResemble, "value1")
58+
59+
_, ok2 := ss.Load(key2)
60+
So(ok2, ShouldBeFalse)
61+
62+
Convey("modify AsyncStore", func() {
63+
ss.Delete(key1)
64+
_, ok1 = ss.Load(key1)
65+
So(ok1, ShouldBeFalse)
66+
67+
ss.Store(key2, "value2")
68+
value2, ok2 := ss.Load(key2)
69+
So(ok2, ShouldBeTrue)
70+
So(value2, ShouldResemble, "value2")
71+
72+
ss.Close()
73+
74+
Convey("recover AsyncStore again", func() {
75+
ss, err = RecoverAsyncStore(walDir)
76+
So(err, ShouldBeNil)
77+
78+
value0, ok0 := ss.Load(key0)
79+
So(ok0, ShouldBeTrue)
80+
So(value0, ShouldResemble, "value0")
81+
82+
_, ok1 := ss.Load(key1)
83+
So(ok1, ShouldBeFalse)
84+
85+
value2, ok2 := ss.Load(key2)
86+
So(ok2, ShouldBeTrue)
87+
So(value2, ShouldResemble, "value2")
88+
89+
ss.Close()
90+
})
91+
})
92+
})
93+
})
94+
95+
Reset(func() {
96+
err := os.RemoveAll(walDir)
97+
So(err, ShouldBeNil)
98+
})
99+
})
21100
}

internal/store/meta/codec.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ type Marshaler interface {
3434
}
3535

3636
type Unmarshaler interface {
37-
Unmarshl(data []byte, cb RangeCallback) error
37+
Unmarshal(data []byte, cb RangeCallback) error
3838
}
3939

4040
type deletedMarkType struct{}
@@ -176,7 +176,7 @@ func toKind(k reflect.Kind) Kind {
176176
return Invalid
177177
}
178178

179-
func (codec) Unmarshl(data []byte, cb RangeCallback) error {
179+
func (codec) Unmarshal(data []byte, cb RangeCallback) error {
180180
var last []byte
181181
for so := 0; so < len(data); {
182182
// Decode key.

internal/store/meta/snapshot.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ func recoverLatestSnopshot(dir string, unmarshaler Unmarshaler) (*skiplist.SkipL
9797
}
9898

9999
m := skiplist.New(skiplist.Bytes)
100-
err = unmarshaler.Unmarshl(data, func(key []byte, value interface{}) error {
100+
err = unmarshaler.Unmarshal(data, func(key []byte, value interface{}) error {
101101
m.Set(key, value)
102102
return nil
103103
})

internal/store/meta/store.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package meta
1616

1717
import (
1818
// standard libraries.
19+
"errors"
1920
"sync"
2021

2122
// third-party libraries.
@@ -25,6 +26,8 @@ import (
2526
walog "github.com/linkall-labs/vanus/internal/store/wal"
2627
)
2728

29+
var ErrClosed = errors.New("MetaStore: closed")
30+
2831
type store struct {
2932
mu sync.RWMutex
3033

internal/store/meta/sync.go

Lines changed: 37 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package meta
1616

1717
import (
1818
// standard libraries.
19+
"errors"
1920
"time"
2021

2122
// third-party libraries.
@@ -33,6 +34,7 @@ type SyncStore struct {
3334
store
3435

3536
snapshotc chan struct{}
37+
donec chan struct{}
3638
}
3739

3840
func newSyncStore(wal *walog.WAL, committed *skiplist.SkipList, version, snapshot int64) *SyncStore {
@@ -45,15 +47,23 @@ func newSyncStore(wal *walog.WAL, committed *skiplist.SkipList, version, snapsho
4547
marshaler: defaultCodec,
4648
},
4749
snapshotc: make(chan struct{}, 1),
50+
donec: make(chan struct{}),
4851
}
4952

5053
go s.runSnapshot()
5154

5255
return s
5356
}
5457

55-
func (s *SyncStore) Stop() {
56-
// TODO(james.yin): stop WAL
58+
func (s *SyncStore) Close() {
59+
// Close WAL.
60+
s.wal.Close()
61+
s.wal.Wait()
62+
63+
// NOTE: Can not close the snapshotc before close the WAL,
64+
// because write to snapshotc in callback of WAL append.
65+
close(s.snapshotc)
66+
<-s.donec
5767
}
5868

5969
func (s *SyncStore) Load(key []byte) (interface{}, bool) {
@@ -87,27 +97,25 @@ func (s *SyncStore) set(kvs Ranger) error {
8797
}
8898

8999
ch := make(chan error, 1)
100+
// Use callbacks for ordering guarantees.
90101
s.wal.AppendOne(entry, walog.WithCallback(func(re walog.Result) {
91102
if re.Err != nil {
92103
ch <- re.Err
93104
return
94105
}
95106

96107
// Update state.
97-
func() {
98-
s.mu.Lock()
99-
defer s.mu.Unlock()
100-
_ = kvs.Range(func(key []byte, value interface{}) error {
101-
if value == deletedMark {
102-
s.committed.Remove(key)
103-
} else {
104-
s.committed.Set(key, value)
105-
}
106-
return nil
107-
})
108-
109-
s.version = re.Offset()
110-
}()
108+
s.mu.Lock()
109+
_ = kvs.Range(func(key []byte, value interface{}) error {
110+
if value == deletedMark {
111+
s.committed.Remove(key)
112+
} else {
113+
s.committed.Set(key, value)
114+
}
115+
return nil
116+
})
117+
s.version = re.Offset()
118+
s.mu.Unlock()
111119

112120
close(ch)
113121

@@ -116,12 +124,22 @@ func (s *SyncStore) set(kvs Ranger) error {
116124
default:
117125
}
118126
}))
119-
return <-ch
127+
err = <-ch
128+
129+
// Convert ErrClosed.
130+
if err != nil && errors.Is(err, walog.ErrClosed) {
131+
return ErrClosed
132+
}
133+
134+
return err
120135
}
121136

122137
func (s *SyncStore) runSnapshot() {
123138
ticker := time.NewTicker(runSnapshotInterval)
124-
defer ticker.Stop()
139+
defer func() {
140+
ticker.Stop()
141+
close(s.donec)
142+
}()
125143

126144
for {
127145
select {
@@ -143,7 +161,7 @@ func RecoverSyncStore(walDir string) (*SyncStore, error) {
143161

144162
version := snapshot
145163
wal, err := walog.RecoverWithVisitor(walDir, snapshot, func(data []byte, offset int64) error {
146-
err2 := defaultCodec.Unmarshl(data, func(key []byte, value interface{}) error {
164+
err2 := defaultCodec.Unmarshal(data, func(key []byte, value interface{}) error {
147165
set(committed, key, value)
148166
return nil
149167
})

0 commit comments

Comments
 (0)