Skip to content

Commit 5fc79ea

Browse files
author
Hongchao Deng
committed
add log init and restore
1 parent cbf9f1c commit 5fc79ea

File tree

6 files changed

+228
-52
lines changed

6 files changed

+228
-52
lines changed

db/backend.go

Lines changed: 68 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,61 @@
11
package db
22

33
import (
4+
"fmt"
5+
"io/ioutil"
6+
"os"
47
"strings"
58

6-
"github.com/go-distributed/xtree/db/log"
9+
dblog "github.com/go-distributed/xtree/db/log"
710
"github.com/go-distributed/xtree/db/message"
811
"github.com/go-distributed/xtree/third-party/github.com/google/btree"
912
)
1013

1114
type backend struct {
12-
bt *btree.BTree
13-
cache *cache
14-
rev int
15-
log *log.Log
15+
bt *btree.BTree
16+
cache *cache
17+
rev int
18+
dblog *dblog.DBLog
19+
config *DBConfig
1620
}
1721

1822
func newBackend() *backend {
19-
bt := btree.New(10)
20-
log, err := log.Create()
23+
dataDir, err := ioutil.TempDir("", "backend")
24+
if err != nil {
25+
panic("not implemented")
26+
}
27+
28+
config := &DBConfig{
29+
DataDir: dataDir,
30+
}
31+
b, err := newBackendWithConfig(config)
2132
if err != nil {
22-
panic("Not implemented")
33+
panic("not implemented")
34+
}
35+
return b
36+
}
37+
38+
func newBackendWithConfig(config *DBConfig) (b *backend, err error) {
39+
bt := btree.New(10)
40+
b = &backend{
41+
bt: bt,
42+
cache: newCache(),
43+
config: config,
2344
}
24-
return &backend{
25-
bt: bt,
26-
cache: newCache(),
27-
log: log,
45+
haveLog := dblog.Exist(config.DataDir)
46+
switch haveLog {
47+
case false:
48+
fmt.Println("didn't have log file. Init...")
49+
err = b.init(config)
50+
case true:
51+
fmt.Println("had log file. Restore...")
52+
err = b.restore(config)
2853
}
54+
return
2955
}
3056

3157
func (b *backend) getData(offset int64) []byte {
32-
rec, err := b.log.GetRecord(offset)
58+
rec, err := b.dblog.GetRecord(offset)
3359
if err != nil {
3460
panic("unimplemented")
3561
}
@@ -77,7 +103,7 @@ func (b *backend) Put(rev int, path Path, data []byte) {
77103
}
78104

79105
b.rev++
80-
offset, err := b.log.Append(&message.Record{
106+
offset, err := b.dblog.Append(&message.Record{
81107
Key: path.p,
82108
Data: data,
83109
})
@@ -102,5 +128,33 @@ func (b *backend) Ls(pathname string) (paths []Path) {
102128
paths = append(paths, *p)
103129
return true
104130
})
131+
132+
return
133+
}
134+
135+
// init() creates a new log file
136+
func (b *backend) init(config *DBConfig) (err error) {
137+
b.dblog, err = dblog.Create(config.DataDir)
105138
return
106139
}
140+
141+
// restore() restores database from the log file.
142+
func (b *backend) restore(config *DBConfig) (err error) {
143+
rev := 0
144+
return dblog.Reuse(config.DataDir,
145+
func(l *dblog.DBLog) {
146+
b.dblog = l
147+
},
148+
func(r *message.Record) (err error) {
149+
rev++
150+
p := newPath(r.Key)
151+
b.Put(rev, *p, r.Data)
152+
return
153+
})
154+
}
155+
156+
// clean up resource after testing
157+
func (b *backend) testableCleanupResource() (err error) {
158+
b.dblog.Close()
159+
return os.RemoveAll(b.config.DataDir)
160+
}

db/backend_test.go

Lines changed: 52 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ func TestPut(t *testing.T) {
1717
}
1818

1919
b := newBackend()
20+
defer b.testableCleanupResource()
2021
for i, tt := range tests {
2122
b.Put(tt.rev, tt.path, tt.data)
2223
v := b.Get(tt.rev, tt.path)
@@ -27,6 +28,7 @@ func TestPut(t *testing.T) {
2728
t.Errorf("#%d: data = %s, want %s", i, v.data, tt.data)
2829
}
2930
}
31+
3032
}
3133

3234
func TestPutOnExistingPath(t *testing.T) {
@@ -40,6 +42,7 @@ func TestPutOnExistingPath(t *testing.T) {
4042
}
4143

4244
b := newBackend()
45+
defer b.testableCleanupResource()
4346
for i, tt := range tests {
4447
b.Put(2*i+1, tt.path, tt.data1)
4548
v := b.Get(2*i+1, tt.path)
@@ -65,6 +68,8 @@ func TestPutOnExistingPath(t *testing.T) {
6568

6669
func TestGetMVCC(t *testing.T) {
6770
b := newBackend()
71+
defer b.testableCleanupResource()
72+
6873
b.Put(1, *newPath("/a"), []byte("1"))
6974
b.Put(2, *newPath("/b"), []byte("2"))
7075
b.Put(3, *newPath("/a"), []byte("3"))
@@ -99,12 +104,15 @@ func TestGetMVCC(t *testing.T) {
99104
}
100105

101106
func TestLs(t *testing.T) {
102-
back := newBackend()
103107
d := []byte("somedata")
104-
back.Put(1, *newPath("/a"), d)
105-
back.Put(2, *newPath("/a/b"), d)
106-
back.Put(3, *newPath("/a/c"), d)
107-
back.Put(4, *newPath("/b"), d)
108+
109+
b := newBackend()
110+
defer b.testableCleanupResource()
111+
112+
b.Put(1, *newPath("/a"), d)
113+
b.Put(2, *newPath("/a/b"), d)
114+
b.Put(3, *newPath("/a/c"), d)
115+
b.Put(4, *newPath("/b"), d)
108116

109117
tests := []struct {
110118
p string
@@ -118,7 +126,7 @@ func TestLs(t *testing.T) {
118126
{"/c", []string{}},
119127
}
120128
for i, tt := range tests {
121-
ps := back.Ls(tt.p)
129+
ps := b.Ls(tt.p)
122130
if len(ps) != len(tt.wps) {
123131
t.Fatalf("#%d: len(ps) = %d, want %d", i, len(ps), len(tt.wps))
124132
}
@@ -130,9 +138,44 @@ func TestLs(t *testing.T) {
130138
}
131139
}
132140

141+
func TestRestore(t *testing.T) {
142+
tests := []struct {
143+
rev int
144+
path Path
145+
data []byte
146+
}{
147+
{1, *newPath("/foo/bar"), []byte("somedata")},
148+
{2, *newPath("/bar/foo"), []byte("datasome")},
149+
}
150+
151+
b := newBackend()
152+
for _, tt := range tests {
153+
// append records to the log
154+
b.Put(tt.rev, tt.path, tt.data)
155+
}
156+
b.dblog.Close()
157+
158+
// simulate restoring log in another backend
159+
b2, err := newBackendWithConfig(b.config)
160+
defer b2.testableCleanupResource()
161+
if err != nil {
162+
t.Errorf("newBackendWithConfig failed: %v", err)
163+
}
164+
for i, tt := range tests {
165+
v := b2.Get(tt.rev, tt.path)
166+
if v.rev != tt.rev {
167+
t.Errorf("#%d: rev = %d, want %d", i, v.rev, tt.rev)
168+
}
169+
if !reflect.DeepEqual(v.data, tt.data) {
170+
t.Errorf("#%d: data = %s, want %s", i, v.data, tt.data)
171+
}
172+
}
173+
}
174+
133175
func BenchmarkPut(b *testing.B) {
134176
b.StopTimer()
135177
back := newBackend()
178+
defer back.testableCleanupResource()
136179
d := []byte("somedata")
137180
path := make([]Path, b.N)
138181
for i := range path {
@@ -148,6 +191,8 @@ func BenchmarkPut(b *testing.B) {
148191
func BenchmarkGetWithCache(b *testing.B) {
149192
b.StopTimer()
150193
back := newBackend()
194+
defer back.testableCleanupResource()
195+
151196
d := []byte("somedata")
152197
path := make([]Path, b.N)
153198
for i := range path {
@@ -168,6 +213,7 @@ func BenchmarkGetWithCache(b *testing.B) {
168213
func BenchmarkGetWithOutCache(b *testing.B) {
169214
b.StopTimer()
170215
back := newBackend()
216+
defer back.testableCleanupResource()
171217
back.cache = nil
172218
d := []byte("somedata")
173219
path := make([]Path, b.N)

db/config.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package db
2+
3+
type DBConfig struct {
4+
DataDir string
5+
}

db/log/encoder_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ func TestEncoderDecoder(t *testing.T) {
3030
t.Fatalf("#%d: cannot flush encode, err: %v", i, err)
3131
}
3232

33-
rec := &message.Record{}
33+
rec := new(message.Record)
3434
dBuf := bytes.NewBuffer(eBuf.Bytes())
3535
decoder := newDecoder(dBuf)
3636

db/log/log.go

Lines changed: 84 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,49 +1,112 @@
11
package log
22

33
import (
4-
"io/ioutil"
4+
"fmt"
5+
"io"
56
"os"
7+
"path"
68

79
"github.com/go-distributed/xtree/db/message"
810
)
911

10-
type Log struct {
11-
f *os.File
12+
const (
13+
logFilename = "records.log"
14+
)
15+
16+
type DBLog struct {
17+
writeFile, readFile *os.File
18+
encoder *encoder
19+
}
20+
21+
func Create(dataDir string) (l *DBLog, err error) {
22+
l, err = newDBLog(path.Join(dataDir, logFilename), true)
23+
return
1224
}
1325

14-
func Create() (*Log, error) {
15-
f, err := ioutil.TempFile("", "backend")
16-
if err != nil {
17-
return nil, err
26+
func newDBLog(logPath string, needCreate bool) (l *DBLog, err error) {
27+
var writeFile, readFile *os.File
28+
flag := os.O_WRONLY | os.O_APPEND | os.O_SYNC
29+
if needCreate {
30+
flag |= os.O_CREATE
31+
}
32+
if writeFile, err = os.OpenFile(logPath, flag, 0600); err != nil {
33+
return
34+
}
35+
if !needCreate {
36+
writeFile.Seek(0, os.SEEK_END)
37+
}
38+
if readFile, err = os.Open(logPath); err != nil {
39+
writeFile.Close()
40+
return
41+
}
42+
l = &DBLog{
43+
writeFile: writeFile,
44+
readFile: readFile,
45+
encoder: newEncoder(writeFile),
1846
}
19-
return &Log{f}, nil
47+
return
2048
}
2149

22-
func (l *Log) Destroy() error {
23-
if err := l.f.Close(); err != nil {
24-
return err
50+
func Reuse(dataDir string,
51+
setLog func(*DBLog),
52+
replayRecord func(*message.Record) error) (err error) {
53+
var l *DBLog
54+
if l, err = newDBLog(path.Join(dataDir, logFilename),
55+
false); err != nil {
56+
return
2557
}
26-
return os.Remove(l.f.Name())
58+
setLog(l)
59+
decoder := newDecoder(l.readFile)
60+
// TODO: parallel?
61+
fi, _ := os.Stat(l.readFile.Name())
62+
fmt.Println(fi.Size())
63+
return
64+
for {
65+
r := new(message.Record)
66+
if err = decoder.decode(r); err != nil {
67+
if err == io.EOF {
68+
return nil
69+
}
70+
return
71+
}
72+
ret, _ := l.readFile.Seek(0, os.SEEK_CUR)
73+
fmt.Printf("%v %#v\n", ret, r)
74+
if err = replayRecord(r); err != nil {
75+
return
76+
}
77+
}
78+
}
79+
80+
func Exist(dataDir string) bool {
81+
p := path.Join(dataDir, logFilename)
82+
_, err := os.Stat(p)
83+
return err == nil
2784
}
2885

29-
func (l *Log) GetRecord(offset int64) (r *message.Record, err error) {
30-
if _, err = l.f.Seek(offset, 0); err != nil {
86+
func (l *DBLog) GetRecord(offset int64) (r *message.Record, err error) {
87+
if _, err = l.readFile.Seek(offset, 0); err != nil {
3188
return
3289
}
33-
decoder := newDecoder(l.f)
34-
r = &message.Record{}
90+
decoder := newDecoder(l.readFile)
91+
r = new(message.Record)
3592
err = decoder.decode(r)
3693
return
3794
}
3895

39-
func (l *Log) Append(r *message.Record) (offset int64, err error) {
40-
if offset, err = l.f.Seek(0, 2); err != nil {
96+
func (l *DBLog) Append(r *message.Record) (offset int64, err error) {
97+
if offset, err = l.writeFile.Seek(0, os.SEEK_CUR); err != nil {
4198
return
4299
}
43-
encoder := newEncoder(l.f)
44-
if err = encoder.encode(r); err != nil {
100+
if err = l.encoder.encode(r); err != nil {
45101
return
46102
}
47-
err = encoder.flush()
103+
err = l.encoder.flush()
48104
return offset, err
49105
}
106+
107+
func (l *DBLog) Close() (err error) {
108+
if err = l.readFile.Close(); err != nil {
109+
return
110+
}
111+
return l.writeFile.Close()
112+
}

0 commit comments

Comments
 (0)