Skip to content

Commit adaf942

Browse files
author
Hongchao Deng
committed
add log abstraction to store records in disk
1 parent b2448de commit adaf942

File tree

14 files changed

+289
-213
lines changed

14 files changed

+289
-213
lines changed

db/backend.go

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,45 +1,35 @@
11
package db
22

33
import (
4-
"io/ioutil"
5-
"os"
64
"strings"
75

8-
"github.com/go-distributed/xtree/db/recordio"
6+
"github.com/go-distributed/xtree/db/log"
7+
"github.com/go-distributed/xtree/db/message"
98
"github.com/go-distributed/xtree/third-party/github.com/google/btree"
109
)
1110

1211
type backend struct {
1312
bt *btree.BTree
1413
cache *cache
1514
rev int
16-
fc recordio.Fetcher
17-
ap recordio.Appender
15+
log *log.Log
1816
}
1917

2018
func newBackend() *backend {
2119
bt := btree.New(10)
22-
23-
// temporary file IO to test in-disk values
24-
writeFile, err := ioutil.TempFile("", "backend")
25-
if err != nil {
26-
panic("can't create temp file")
27-
}
28-
readFile, err := os.Open(writeFile.Name())
20+
log, err := log.Create()
2921
if err != nil {
30-
panic("can't open temp file")
22+
panic("Not implemented")
3123
}
32-
3324
return &backend{
3425
bt: bt,
3526
cache: newCache(),
36-
fc: recordio.NewFetcher(readFile),
37-
ap: recordio.NewAppender(writeFile),
27+
log: log,
3828
}
3929
}
4030

4131
func (b *backend) getData(offset int64) []byte {
42-
rec, err := b.fc.Fetch(offset)
32+
rec, err := b.log.GetRecord(offset)
4333
if err != nil {
4434
panic("unimplemented")
4535
}
@@ -87,7 +77,11 @@ func (b *backend) Put(rev int, path Path, data []byte) {
8777
}
8878

8979
b.rev++
90-
offset, err := b.ap.Append(recordio.Record{data})
80+
offset, err := b.log.Append(&message.Record{
81+
Rev: b.rev,
82+
Key: path.p,
83+
Data: data,
84+
})
9185
if err != nil {
9286
panic("unimplemented")
9387
}

db/db.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ type DB interface {
2323
// Otherwise, it lists recursively all paths.
2424
//
2525
// if count is >= 0, it is the number of paths we want in the list.
26-
// if count is -1, it means any.
26+
// if count is -1, it means all.
2727
//
2828
// if it failed, an error is returned.
2929
Ls(rev int, path string, recursive bool, count int) ([]Path, error)

db/log/decoder.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package log
2+
3+
import (
4+
"bufio"
5+
"encoding/binary"
6+
"io"
7+
8+
"github.com/go-distributed/xtree/db/message"
9+
)
10+
11+
type decoder struct {
12+
br *bufio.Reader
13+
}
14+
15+
func newDecoder(r io.Reader) *decoder {
16+
return &decoder{bufio.NewReader(r)}
17+
}
18+
19+
func (d *decoder) decode(r *message.Record) (err error) {
20+
var l int64
21+
if l, err = readInt64(d.br); err != nil {
22+
return
23+
}
24+
data := make([]byte, l)
25+
if _, err = io.ReadFull(d.br, data); err != nil {
26+
return
27+
}
28+
return r.Unmarshal(data)
29+
}
30+
31+
func readInt64(r io.Reader) (n int64, err error) {
32+
err = binary.Read(r, binary.LittleEndian, &n)
33+
return
34+
}

db/log/encoder.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package log
2+
3+
import (
4+
"bufio"
5+
"encoding/binary"
6+
"io"
7+
8+
"github.com/go-distributed/xtree/db/message"
9+
)
10+
11+
type encoder struct {
12+
bw *bufio.Writer
13+
}
14+
15+
func newEncoder(w io.Writer) *encoder {
16+
return &encoder{bufio.NewWriter(w)}
17+
}
18+
19+
func (e *encoder) encode(r *message.Record) (err error) {
20+
var data []byte
21+
if data, err = r.Marshal(); err != nil {
22+
return
23+
}
24+
if err = writeInt64(e.bw, int64(len(data))); err != nil {
25+
return
26+
}
27+
_, err = e.bw.Write(data)
28+
return
29+
}
30+
31+
func (e *encoder) flush() error {
32+
return e.bw.Flush()
33+
}
34+
35+
func writeInt64(w io.Writer, n int64) error {
36+
return binary.Write(w, binary.LittleEndian, n)
37+
}

db/log/encoder_test.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package log
2+
3+
import (
4+
"bytes"
5+
"reflect"
6+
"testing"
7+
8+
"github.com/go-distributed/xtree/db/message"
9+
)
10+
11+
func TestEncoderDecoder(t *testing.T) {
12+
tests := []struct {
13+
rec *message.Record
14+
}{
15+
{&message.Record{
16+
Rev: 1,
17+
Key: "/test",
18+
Data: []byte("some data"),
19+
}},
20+
}
21+
22+
for i, tt := range tests {
23+
var err error
24+
eBuf := new(bytes.Buffer)
25+
encoder := newEncoder(eBuf)
26+
27+
if err = encoder.encode(tt.rec); err != nil {
28+
t.Fatalf("#%d: cannot encode, err: %v", i, err)
29+
}
30+
if err = encoder.flush(); err != nil {
31+
t.Fatalf("#%d: cannot flush encode, err: %v", i, err)
32+
}
33+
34+
rec := &message.Record{}
35+
dBuf := bytes.NewBuffer(eBuf.Bytes())
36+
decoder := newDecoder(dBuf)
37+
38+
if err = decoder.decode(rec); err != nil {
39+
t.Fatalf("#%d: cannot decode, err: %v", i, err)
40+
}
41+
42+
if !reflect.DeepEqual(tt.rec, rec) {
43+
t.Fatalf("#%d: records are not the same, want: %v, get: %v",
44+
i, tt.rec, rec)
45+
}
46+
}
47+
}

db/log/log.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package log
2+
3+
import (
4+
"io/ioutil"
5+
"os"
6+
7+
"github.com/go-distributed/xtree/db/message"
8+
)
9+
10+
type Log struct {
11+
f *os.File
12+
}
13+
14+
func Create() (*Log, error) {
15+
f, err := ioutil.TempFile("", "backend")
16+
if err != nil {
17+
return nil, err
18+
}
19+
20+
return &Log{f}, nil
21+
}
22+
23+
func (l *Log) Destroy() error {
24+
return os.Remove(l.f.Name())
25+
}
26+
27+
func (l *Log) GetRecord(offset int64) (*message.Record, error) {
28+
_, err := l.f.Seek(offset, 0)
29+
if err != nil {
30+
return nil, err
31+
}
32+
33+
decoder := newDecoder(l.f)
34+
r := &message.Record{}
35+
decoder.decode(r)
36+
37+
return r, nil
38+
}
39+
40+
func (l *Log) Append(r *message.Record) (offset int64, err error) {
41+
offset, err = l.f.Seek(0, 2)
42+
if err != nil {
43+
return -1, err
44+
}
45+
46+
encoder := newEncoder(l.f)
47+
48+
err = encoder.encode(r)
49+
encoder.flush()
50+
return offset, err
51+
}

db/log/log_test.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package log
2+
3+
import (
4+
"reflect"
5+
"testing"
6+
7+
"github.com/go-distributed/xtree/db/message"
8+
)
9+
10+
func TestAppendAndGetRecord(t *testing.T) {
11+
var err error
12+
var log *Log
13+
if log, err = Create(); err != nil {
14+
t.Errorf("Create failed: %v", err)
15+
}
16+
17+
defer log.Destroy()
18+
19+
tests := []struct {
20+
offset int64
21+
rec *message.Record
22+
}{
23+
{-1, &message.Record{
24+
Rev: 1,
25+
Key: "/test",
26+
Data: []byte("some data"),
27+
}},
28+
{-1, &message.Record{
29+
Rev: 2,
30+
Key: "/test2",
31+
Data: []byte("some other data"),
32+
}},
33+
}
34+
35+
for i, tt := range tests {
36+
tests[i].offset, err = log.Append(tt.rec)
37+
if err != nil {
38+
t.Errorf("#%d: Append failed: %v", i, err)
39+
}
40+
}
41+
42+
for i, tt := range tests {
43+
var rec *message.Record
44+
if rec, err = log.GetRecord(tt.offset); err != nil {
45+
t.Errorf("#%d: GetRecord failed: %v", i, err)
46+
}
47+
48+
if !reflect.DeepEqual(tt.rec, rec) {
49+
t.Errorf("#%d: records not the same, want: %v, get %v",
50+
i, tt.rec, rec)
51+
}
52+
53+
}
54+
}

db/message/record.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package message
2+
3+
import "encoding/json"
4+
5+
type Record struct {
6+
Rev int
7+
Key string
8+
Data []byte
9+
}
10+
11+
func (r *Record) Marshal() ([]byte, error) {
12+
return json.Marshal(r)
13+
}
14+
15+
func (r *Record) Unmarshal(data []byte) error {
16+
return json.Unmarshal(data, r)
17+
}

db/message/record_test.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package message
2+
3+
import (
4+
"reflect"
5+
"testing"
6+
)
7+
8+
func TestLogRecord(t *testing.T) {
9+
tests := []struct {
10+
rec *Record
11+
}{
12+
{&Record{
13+
Rev: 1,
14+
Key: "/test",
15+
Data: []byte("some data"),
16+
}},
17+
}
18+
19+
for i, tt := range tests {
20+
var data []byte
21+
var err error
22+
if data, err = tt.rec.Marshal(); err != nil {
23+
t.Fatalf("#%d: cannot marshal, err: %v", i, err)
24+
}
25+
26+
rec := &Record{}
27+
if err = rec.Unmarshal(data); err != nil {
28+
t.Fatalf("#%d: cannot unmarshal, err: %v", i, err)
29+
}
30+
31+
if !reflect.DeepEqual(tt.rec, rec) {
32+
t.Fatalf("#%d: records are not the same, want: %v, get: %v",
33+
i, tt.rec, rec)
34+
}
35+
}
36+
}

db/recordio/appender.go

Lines changed: 0 additions & 30 deletions
This file was deleted.

0 commit comments

Comments
 (0)