29 changes: 11 additions & 18 deletions db/backend.go
@@ -1,45 +1,35 @@
 package db
 
 import (
-	"io/ioutil"
-	"os"
-	"strings"
-
-	"github.com/go-distributed/xtree/db/recordio"
+	"github.com/go-distributed/xtree/db/log"
+	"github.com/go-distributed/xtree/db/message"
 	"github.com/go-distributed/xtree/third-party/github.com/google/btree"
 )
 
 type backend struct {
 	bt    *btree.BTree
 	cache *cache
 	rev   int
-	fc    recordio.Fetcher
-	ap    recordio.Appender
+	log   *log.Log
 }
 
 func newBackend() *backend {
 	bt := btree.New(10)
 
-	// temporary file IO to test in-disk values
-	writeFile, err := ioutil.TempFile("", "backend")
-	if err != nil {
-		panic("can't create temp file")
-	}
-	readFile, err := os.Open(writeFile.Name())
+	log, err := log.Create()
 	if err != nil {
-		panic("can't open temp file")
+		panic("Not implemented")
 	}
 
 	return &backend{
 		bt:    bt,
 		cache: newCache(),
-		fc:    recordio.NewFetcher(readFile),
-		ap:    recordio.NewAppender(writeFile),
+		log:   log,
 	}
 }
 
 func (b *backend) getData(offset int64) []byte {
-	rec, err := b.fc.Fetch(offset)
+	rec, err := b.log.GetRecord(offset)
 	if err != nil {
 		panic("unimplemented")
 	}
@@ -87,7 +77,10 @@ func (b *backend) Put(rev int, path Path, data []byte) {
 	}
 
 	b.rev++
-	offset, err := b.ap.Append(recordio.Record{data})
+	offset, err := b.log.Append(&message.Record{
+		Key:  path.p,
+		Data: data,
+	})
 	if err != nil {
 		panic("unimplemented")
 	}
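
The backend now persists values through db/log and keeps only offsets in memory. Below is a standalone sketch of the same round-trip using nothing but the new package's API; this is an editor's example, not code from the PR.

package main

import (
	"fmt"

	"github.com/go-distributed/xtree/db/log"
	"github.com/go-distributed/xtree/db/message"
)

func main() {
	l, err := log.Create()
	if err != nil {
		panic(err)
	}
	defer l.Destroy()

	// write path: Append returns the offset where the record landed
	offset, err := l.Append(&message.Record{Key: "/a", Data: []byte("v1")})
	if err != nil {
		panic(err)
	}

	// read path: the offset alone recovers the record, as in getData
	rec, err := l.GetRecord(offset)
	if err != nil {
		panic(err)
	}
	fmt.Println(rec.Key, string(rec.Data)) // /a v1
}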
2 changes: 1 addition & 1 deletion db/db.go
@@ -23,7 +23,7 @@ type DB interface {
 	// Otherwise, it lists recursively all paths.
 	//
 	// if count is >= 0, it is the number of paths we want in the list.
-	// if count is -1, it means any.
+	// if count is -1, it means all.
 	//
 	// if it failed, an error is returned.
 	Ls(rev int, path string, recursive bool, count int) ([]Path, error)
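
The wording fix pins down the count contract. A small same-package sketch of both cases, assuming a value d implementing DB (the revision number and paths are made up for illustration):

func lsExamples(d DB) error {
	// count >= 0: at most that many paths come back
	few, err := d.Ls(3, "/foo", false, 10)
	if err != nil {
		return err
	}
	// count == -1: all matching paths, here listed recursively
	all, err := d.Ls(3, "/foo", true, -1)
	if err != nil {
		return err
	}
	_, _ = few, all
	return nil
}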
34 changes: 34 additions & 0 deletions db/log/decoder.go
@@ -0,0 +1,34 @@
package log

import (
	"bufio"
	"encoding/binary"
	"io"

	"github.com/go-distributed/xtree/db/message"
)

type decoder struct {
	br *bufio.Reader
}

func newDecoder(r io.Reader) *decoder {
	return &decoder{bufio.NewReader(r)}
}

Contributor: Same here, no need to double-buffer reads.

func (d *decoder) decode(r *message.Record) (err error) {
	var l int64
	if l, err = readInt64(d.br); err != nil {
		return
	}
	data := make([]byte, l)
	if _, err = io.ReadFull(d.br, data); err != nil {
		return
	}
	return r.Unmarshal(data)
}

func readInt64(r io.Reader) (n int64, err error) {
	err = binary.Read(r, binary.LittleEndian, &n)
	return
}
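
The decoder mirrors the encoder's framing: each record is stored as a little-endian int64 length followed by the record's marshaled bytes. Here is a same-package sketch that builds one frame by hand and decodes it (illustrative only, not part of the PR):

package log

import (
	"bytes"
	"encoding/binary"
	"fmt"

	"github.com/go-distributed/xtree/db/message"
)

func ExampleDecode() {
	rec := &message.Record{Key: "/test", Data: []byte("some data")}
	payload, _ := rec.Marshal()

	// on-disk frame layout: (int64 length | marshaled record)
	frame := new(bytes.Buffer)
	binary.Write(frame, binary.LittleEndian, int64(len(payload))) // cannot fail on a bytes.Buffer
	frame.Write(payload)

	got := &message.Record{}
	if err := newDecoder(frame).decode(got); err != nil {
		panic(err)
	}
	fmt.Println(got.Key)
	// Output: /test
}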
37 changes: 37 additions & 0 deletions db/log/encoder.go
@@ -0,0 +1,37 @@
package log

import (
	"bufio"
	"encoding/binary"
	"io"

	"github.com/go-distributed/xtree/db/message"
)

type encoder struct {
	bw *bufio.Writer
}

func newEncoder(w io.Writer) *encoder {
	return &encoder{bufio.NewWriter(w)}
}
Contributor: No need to have bufio here, since the OS already buffers IO for you. Double buffering is evil in a database.

Author: That's interesting; we need to investigate the buffering behavior more. It currently uses bufio to write a single request: (length | record).

Contributor: Buffering should only be done in kernel space. There, the OS's page cache will buffer any incoming writes until you fsync the file or the flusher thread kicks in. There is no point buffering in user space unless you want to speed up reads of the buffered data.

Author: This isn't about buffering. It's just a way to construct a message to be written to disk.

Contributor: You can still construct the message with an io.Writer.

Contributor: How about we don't buffer stuff until we know there is a performance issue without buffering? :) The code will be much simpler.

Author: Nope. You can keep a downstream branch without the buffer. I don't understand this part of the OS, so I will keep it in upstream.

Contributor: This is not upstream; it is just a pull request towards upstream. I'm pointing out things that don't make sense in the pull request.

Author: Just a follow-up on this. According to the Kafka docs, "As a result of these factors using the filesystem and relying on pagecache is superior to maintaining an in-memory cache or other structure". It sounds like relying on the OS page cache is the better option. Thanks for the discussion here.

Contributor: Thanks for doing the due diligence before adding a feature :)

To clarify, most of what we were discussing here is buffering rather than caching. Caching is a superset of buffering, in the sense that you can read from a cache but cannot read from a bufio writer.
func (e *encoder) encode(r *message.Record) (err error) {
	var data []byte
	if data, err = r.Marshal(); err != nil {
		return
	}
	if err = writeInt64(e.bw, int64(len(data))); err != nil {
		return
	}
	_, err = e.bw.Write(data)
	return
}

func (e *encoder) flush() error {
	return e.bw.Flush()
}

func writeInt64(w io.Writer, n int64) error {
	return binary.Write(w, binary.LittleEndian, n)
}
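
For reference, here is a sketch of the bufio-free alternative argued for in the thread above: assemble the (length | record) frame in memory, then hand it to the underlying writer in a single Write, leaving all buffering to the OS page cache. The function name is hypothetical; the frame layout matches the decoder's:

package log

import (
	"bytes"
	"encoding/binary"
	"io"

	"github.com/go-distributed/xtree/db/message"
)

// encodeRecordUnbuffered frames and writes a record without bufio.
func encodeRecordUnbuffered(w io.Writer, r *message.Record) error {
	data, err := r.Marshal()
	if err != nil {
		return err
	}
	var frame bytes.Buffer
	binary.Write(&frame, binary.LittleEndian, int64(len(data))) // cannot fail on a bytes.Buffer
	frame.Write(data)

	// one Write hands the whole frame to the kernel; the page cache
	// holds it until fsync or the flusher thread writes it back
	_, err = w.Write(frame.Bytes())
	return err
}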
46 changes: 46 additions & 0 deletions db/log/encoder_test.go
@@ -0,0 +1,46 @@
package log

import (
	"bytes"
	"reflect"
	"testing"

	"github.com/go-distributed/xtree/db/message"
)

func TestEncoderDecoder(t *testing.T) {
	tests := []struct {
		rec *message.Record
	}{
		{&message.Record{
			Key:  "/test",
			Data: []byte("some data"),
		}},
	}

	for i, tt := range tests {
		var err error
		eBuf := new(bytes.Buffer)
		encoder := newEncoder(eBuf)

		if err = encoder.encode(tt.rec); err != nil {
			t.Fatalf("#%d: cannot encode, err: %v", i, err)
		}
		if err = encoder.flush(); err != nil {
			t.Fatalf("#%d: cannot flush encoder, err: %v", i, err)
		}

		rec := &message.Record{}
		dBuf := bytes.NewBuffer(eBuf.Bytes())
		decoder := newDecoder(dBuf)

		if err = decoder.decode(rec); err != nil {
			t.Fatalf("#%d: cannot decode, err: %v", i, err)
		}

		if !reflect.DeepEqual(tt.rec, rec) {
			t.Fatalf("#%d: records are not the same, want: %v, got: %v",
				i, tt.rec, rec)
		}
	}
}
51 changes: 51 additions & 0 deletions db/log/log.go
@@ -0,0 +1,51 @@
package log

import (
	"io/ioutil"
	"os"

	"github.com/go-distributed/xtree/db/message"
)

type Log struct {
	f *os.File
}

func Create() (*Log, error) {
	f, err := ioutil.TempFile("", "backend")
	if err != nil {
		return nil, err
	}

	return &Log{f}, nil
}

func (l *Log) Destroy() error {
	return os.Remove(l.f.Name())
}

func (l *Log) GetRecord(offset int64) (*message.Record, error) {
	// seek back to where the record was appended
	_, err := l.f.Seek(offset, os.SEEK_SET)
	if err != nil {
		return nil, err
	}

	decoder := newDecoder(l.f)
	r := &message.Record{}
	if err = decoder.decode(r); err != nil {
		return nil, err
	}

	return r, nil
}

func (l *Log) Append(r *message.Record) (offset int64, err error) {
	// the record's offset is the current end of the file
	offset, err = l.f.Seek(0, os.SEEK_END)
	if err != nil {
		return -1, err
	}

	encoder := newEncoder(l.f)

	if err = encoder.encode(r); err != nil {
		return -1, err
	}
	return offset, encoder.flush()
}
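
A consequence of the page-cache discussion on the encoder: Append leaves the record in the OS page cache, so a crash before writeback can lose it. When durability matters, the usual move is to fsync after appending. A hypothetical wrapper, not part of this PR:

// appendDurable is a sketch; Log.Append itself never calls Sync.
func (l *Log) appendDurable(r *message.Record) (int64, error) {
	offset, err := l.Append(r)
	if err != nil {
		return -1, err
	}
	// force the appended bytes out of the page cache onto stable storage
	return offset, l.f.Sync()
}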
52 changes: 52 additions & 0 deletions db/log/log_test.go
@@ -0,0 +1,52 @@
package log

import (
	"reflect"
	"testing"

	"github.com/go-distributed/xtree/db/message"
)

func TestAppendAndGetRecord(t *testing.T) {
	var err error
	var log *Log
	if log, err = Create(); err != nil {
		t.Fatalf("Create failed: %v", err)
	}

	defer log.Destroy()

	tests := []struct {
		offset int64
		rec    *message.Record
	}{
		{-1, &message.Record{
			Key:  "/test",
			Data: []byte("some data"),
		}},
		{-1, &message.Record{
			Key:  "/test2",
			Data: []byte("some other data"),
		}},
	}

	for i, tt := range tests {
		tests[i].offset, err = log.Append(tt.rec)
		if err != nil {
			t.Errorf("#%d: Append failed: %v", i, err)
		}
	}

	for i, tt := range tests {
		var rec *message.Record
		if rec, err = log.GetRecord(tt.offset); err != nil {
			t.Errorf("#%d: GetRecord failed: %v", i, err)
		}

		if !reflect.DeepEqual(tt.rec, rec) {
			t.Errorf("#%d: records not the same, want: %v, got: %v",
				i, tt.rec, rec)
		}
	}
}
16 changes: 16 additions & 0 deletions db/message/record.go
@@ -0,0 +1,16 @@
package message

import "encoding/json"

type Record struct {
	Key  string
	Data []byte
}

func (r *Record) Marshal() ([]byte, error) {
	return json.Marshal(r)
}

func (r *Record) Unmarshal(data []byte) error {
	return json.Unmarshal(data, r)
}

Contributor: Please don't use JSON; use a binary format instead.

Author: message/ will only contain protobuf messages in the future.

Contributor: I don't think we need protobuf for a struct with two fields.

Author: Unless we have benchmarks of the different Go serialization frameworks and come up with the best one, I would take the traditional route and use protobuf for prototyping.

Contributor: Did some research on how protobuf works internally; it should be faster than plain encoding. That's good.
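
For comparison, the hand-rolled binary format the reviewer suggests could look like the sketch below: each field length-prefixed, in the spirit of the log's own framing. The names marshalBinary and unmarshalBinary are mine, not from the PR, which keeps JSON with protobuf planned:

package message

import (
	"bytes"
	"encoding/binary"
	"io"
)

// marshalBinary writes (len(Key) | Key | len(Data) | Data), little-endian.
func (r *Record) marshalBinary() []byte {
	buf := new(bytes.Buffer)
	for _, field := range [][]byte{[]byte(r.Key), r.Data} {
		binary.Write(buf, binary.LittleEndian, int64(len(field))) // cannot fail on a bytes.Buffer
		buf.Write(field)
	}
	return buf.Bytes()
}

// unmarshalBinary reads the same layout back.
func (r *Record) unmarshalBinary(data []byte) error {
	buf := bytes.NewReader(data)
	next := func() ([]byte, error) {
		var n int64
		if err := binary.Read(buf, binary.LittleEndian, &n); err != nil {
			return nil, err
		}
		field := make([]byte, n)
		_, err := io.ReadFull(buf, field)
		return field, err
	}
	key, err := next()
	if err != nil {
		return err
	}
	val, err := next()
	if err != nil {
		return err
	}
	r.Key, r.Data = string(key), val
	return nil
}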
35 changes: 35 additions & 0 deletions db/message/record_test.go
@@ -0,0 +1,35 @@
package message

import (
	"reflect"
	"testing"
)

func TestLogRecord(t *testing.T) {
	tests := []struct {
		rec *Record
	}{
		{&Record{
			Key:  "/test",
			Data: []byte("some data"),
		}},
	}

	for i, tt := range tests {
		var data []byte
		var err error
		if data, err = tt.rec.Marshal(); err != nil {
			t.Fatalf("#%d: cannot marshal, err: %v", i, err)
		}

		rec := &Record{}
		if err = rec.Unmarshal(data); err != nil {
			t.Fatalf("#%d: cannot unmarshal, err: %v", i, err)
		}

		if !reflect.DeepEqual(tt.rec, rec) {
			t.Fatalf("#%d: records are not the same, want: %v, got: %v",
				i, tt.rec, rec)
		}
	}
}
30 changes: 0 additions & 30 deletions db/recordio/appender.go

This file was deleted.
