Skip to content

Commit b06d88e

Browse files
authored
Add buffer configurations to readers and writers (#40)
1 parent 68806d7 commit b06d88e

File tree

12 files changed

+169
-44
lines changed

12 files changed

+169
-44
lines changed

_examples/recordio.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,13 @@ package main
22

33
import (
44
"errors"
5-
"github.com/thomasjungblut/go-sstables/_examples/proto"
6-
"github.com/thomasjungblut/go-sstables/recordio"
7-
rProto "github.com/thomasjungblut/go-sstables/recordio/proto"
85
"io"
96
"log"
107
"os"
8+
9+
"github.com/thomasjungblut/go-sstables/_examples/proto"
10+
"github.com/thomasjungblut/go-sstables/recordio"
11+
rProto "github.com/thomasjungblut/go-sstables/recordio/proto"
1112
)
1213

1314
func main() {
@@ -21,7 +22,7 @@ func main() {
2122
}
2223

2324
func simpleRead(path string) {
24-
reader, err := rProto.NewProtoReaderWithPath(path)
25+
reader, err := rProto.NewReader(rProto.ReaderPath(path))
2526
if err != nil {
2627
log.Fatalf("error: %v", err)
2728
}

recordio/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ import (
130130
rProto "github.com/thomasjungblut/go-sstables/recordio/proto"
131131
)
132132

133-
reader, err := rProto.NewProtoReaderWithPath(path)
133+
reader, err := rProto.NewProtoReader(rProto.ReaderPath(path))
134134
if err != nil { log.Fatalf("error: %v", err) }
135135

136136
err = reader.Open()

recordio/file_reader.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,10 @@ func NewFileReader(readerOptions ...FileReaderOption) (ReaderI, error) {
404404
readOption(opts)
405405
}
406406

407+
if (opts.file == nil) == (opts.path == "") {
408+
return nil, errors.New("NewFileReader: either os.File or string path must be supplied, never both")
409+
}
410+
407411
f, r, err := opts.factory.CreateNewReader(opts.path, opts.bufferSizeBytes)
408412
if err != nil {
409413
return nil, err

recordio/file_reader_test.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
package recordio
22

33
import (
4-
"github.com/stretchr/testify/assert"
5-
"github.com/stretchr/testify/require"
4+
"errors"
65
"io"
6+
"os"
77
"testing"
8+
9+
"github.com/stretchr/testify/assert"
10+
"github.com/stretchr/testify/require"
811
)
912

1013
func TestReaderHappyPathSingleRecord(t *testing.T) {
@@ -187,6 +190,21 @@ func TestReaderForbidsDoubleOpens(t *testing.T) {
187190
expectErrorStringOnOpen(t, reader, "already opened")
188191
}
189192

193+
func TestReaderInitNoPath(t *testing.T) {
194+
_, err := NewFileReader()
195+
assert.Equal(t, errors.New("NewFileReader: either os.File or string path must be supplied, never both"), err)
196+
}
197+
198+
func TestReaderInitPathAndFile(t *testing.T) {
199+
f, err := os.OpenFile("test_files/readerTestFile", os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666)
200+
assert.NoError(t, err)
201+
defer os.Remove("test_files/readerTestFile")
202+
defer f.Close()
203+
reader, err := NewFileReader(ReaderFile(f), ReaderPath("test_files/readerTestFile2"))
204+
assert.Equal(t, errors.New("NewFileReader: either os.File or string path must be supplied, never both"), err)
205+
assert.Nil(t, reader)
206+
}
207+
190208
func expectErrorStringOnOpen(t *testing.T, reader OpenClosableI, expectedError string) {
191209
err := reader.Open()
192210
defer closeOpenClosable(t, reader)

recordio/proto/proto_reader.go

Lines changed: 61 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package proto
22

33
import (
4+
"errors"
5+
"os"
6+
47
"github.com/thomasjungblut/go-sstables/recordio"
58
"google.golang.org/protobuf/proto"
6-
"os"
79
)
810

911
type Reader struct {
@@ -36,27 +38,75 @@ func (r *Reader) Close() error {
3638
return r.reader.Close()
3739
}
3840

39-
func NewProtoReaderWithPath(path string) (ReaderI, error) {
40-
f, err := os.Open(path)
41-
if err != nil {
42-
return nil, err
41+
// options
42+
43+
type ReaderOptions struct {
44+
path string
45+
file *os.File
46+
bufSizeBytes int
47+
}
48+
49+
type ReaderOption func(*ReaderOptions)
50+
51+
func ReaderPath(p string) ReaderOption {
52+
return func(args *ReaderOptions) {
53+
args.path = p
4354
}
55+
}
4456

45-
r, err := NewProtoReaderWithFile(f)
46-
if err != nil {
47-
return nil, err
57+
func ReaderFile(p *os.File) ReaderOption {
58+
return func(args *ReaderOptions) {
59+
args.file = p
4860
}
61+
}
4962

50-
return r, nil
63+
func ReadBufferSizeBytes(p int) ReaderOption {
64+
return func(args *ReaderOptions) {
65+
args.bufSizeBytes = p
66+
}
5167
}
5268

53-
func NewProtoReaderWithFile(file *os.File) (ReaderI, error) {
54-
reader, err := recordio.NewFileReaderWithFile(file)
69+
// create a new reader with the given options. Either Path or File must be supplied
70+
func NewReader(readerOptions ...ReaderOption) (ReaderI, error) {
71+
opts := &ReaderOptions{
72+
path: "",
73+
file: nil,
74+
bufSizeBytes: 1024 * 1024 * 4,
75+
}
76+
77+
for _, readerOption := range readerOptions {
78+
readerOption(opts)
79+
}
80+
81+
if (opts.file != nil) && (opts.path != "") {
82+
return nil, errors.New("either os.File or string path must be supplied, never both")
83+
}
84+
85+
if opts.file == nil {
86+
if opts.path == "" {
87+
return nil, errors.New("path was not supplied")
88+
}
89+
}
90+
reader, err := recordio.NewFileReader(
91+
recordio.ReaderPath(opts.path),
92+
recordio.ReaderFile(opts.file),
93+
recordio.ReaderBufferSizeBytes(opts.bufSizeBytes))
5594
if err != nil {
5695
return nil, err
5796
}
5897

5998
return &Reader{
6099
reader: reader,
61100
}, nil
101+
102+
}
103+
104+
// Deprecated: use the NewProtoReader with options.
105+
func NewProtoReaderWithPath(path string) (ReaderI, error) {
106+
return NewReader(ReaderPath(path))
107+
}
108+
109+
// Deprecated: use the NewProtoReader with options.
110+
func NewProtoReaderWithFile(file *os.File) (ReaderI, error) {
111+
return NewReader(ReaderFile(file))
62112
}

recordio/proto/recordio_proto_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,14 @@ package proto
22

33
import (
44
"bufio"
5+
"math/rand"
6+
"os"
7+
"testing"
8+
59
"github.com/stretchr/testify/assert"
610
"github.com/stretchr/testify/require"
711
"github.com/thomasjungblut/go-sstables/recordio"
812
"github.com/thomasjungblut/go-sstables/recordio/test_files"
9-
"math/rand"
10-
"os"
11-
"testing"
1213
)
1314

1415
const TestFile = "../test_files/berlin52.tsp"
@@ -63,7 +64,7 @@ func endToEndReadWriteProtobuf(writer WriterI, t *testing.T, tmpFile *os.File) {
6364
require.NoError(t, writer.Close())
6465
require.NoError(t, inFile.Close())
6566

66-
reader, err := NewProtoReaderWithPath(tmpFile.Name())
67+
reader, err := NewReader(ReaderPath(tmpFile.Name()))
6768
require.NoError(t, err)
6869
require.NoError(t, reader.Open())
6970

simpledb/compaction.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,11 @@ func executeCompaction(db *DB) (compactionMetadata *proto.CompactionMetadata, er
149149
}
150150

151151
func saveCompactionMetadata(writeFolder string, compactionMetadata *proto.CompactionMetadata) (err error) {
152-
metaWriter, err := rProto.NewWriter(rProto.Path(filepath.Join(writeFolder, CompactionFinishedSuccessfulFileName)))
152+
metaWriter, err := rProto.NewWriter(
153+
rProto.Path(filepath.Join(writeFolder, CompactionFinishedSuccessfulFileName)),
154+
rProto.WriteBufferSizeBytes(4*1024),
155+
)
156+
153157
if err != nil {
154158
return err
155159
}

simpledb/db.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ const MemStoreMaxSizeBytes uint64 = 1024 * 1024 * 1024 // 1gb
2525
const NumSSTablesToTriggerCompaction int = 10
2626
const DefaultCompactionMaxSizeBytes uint64 = 5 * 1024 * 1024 * 1024 // 5gb
2727
const DefaultCompactionInterval = 5 * time.Second
28+
const DefaultWriteBufferSizeBytes uint64 = 4 * 1024 * 1024 // 4Mb
29+
const DefaultReadBufferSizeBytes uint64 = 4 * 1024 * 1024 // 4Mb
2830

2931
var ErrNotFound = errors.New("ErrNotFound")
3032
var ErrNotOpenedYet = errors.New("database has not been opened yet, please call Open() first")
@@ -90,6 +92,9 @@ type DB struct {
9092
compactionTicker *time.Ticker
9193
compactionTickerStopChannel chan interface{}
9294
doneCompactionChannel chan bool
95+
96+
writeBufferSizeBytes uint64
97+
readBufferSizeBytes uint64
9398
}
9499

95100
func (db *DB) Open() error {
@@ -327,6 +332,8 @@ func NewSimpleDB(basePath string, extraOptions ...ExtraOption) (*DB, error) {
327332
NumSSTablesToTriggerCompaction,
328333
DefaultCompactionMaxSizeBytes,
329334
DefaultCompactionInterval,
335+
DefaultWriteBufferSizeBytes,
336+
DefaultReadBufferSizeBytes,
330337
}
331338

332339
for _, extraOption := range extraOptions {
@@ -364,6 +371,8 @@ func NewSimpleDB(basePath string, extraOptions ...ExtraOption) (*DB, error) {
364371
doneFlushChannel: doneFlushChan,
365372
compactionTickerStopChannel: compactionTimerStopChannel,
366373
doneCompactionChannel: doneCompactionChan,
374+
readBufferSizeBytes: extraOpts.readBufferSizeBytes,
375+
writeBufferSizeBytes: extraOpts.writeBufferSizeBytes,
367376
}, nil
368377
}
369378

@@ -377,6 +386,8 @@ type ExtraOptions struct {
377386
compactionFileThreshold int
378387
compactionMaxSizeBytes uint64
379388
compactionRunInterval time.Duration
389+
writeBufferSizeBytes uint64
390+
readBufferSizeBytes uint64
380391
}
381392

382393
type ExtraOption func(options *ExtraOptions)
@@ -433,3 +444,17 @@ func CompactionMaxSizeBytes(n uint64) ExtraOption {
433444
args.compactionMaxSizeBytes = n
434445
}
435446
}
447+
448+
// WriteBufferSizeBytes is the write buffer size for all buffer used by simple db.
449+
func WriteBufferSizeBytes(n uint64) ExtraOption {
450+
return func(args *ExtraOptions) {
451+
args.writeBufferSizeBytes = n
452+
}
453+
}
454+
455+
// ReadBufferSizeBytes is the read buffer size for all buffer used by simple db.
456+
func ReadBufferSizeBytes(n uint64) ExtraOption {
457+
return func(args *ExtraOptions) {
458+
args.readBufferSizeBytes = n
459+
}
460+
}

simpledb/flush.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,14 @@ package simpledb
22

33
import (
44
"fmt"
5-
"github.com/thomasjungblut/go-sstables/memstore"
6-
"github.com/thomasjungblut/go-sstables/sstables"
75
"log"
86
"os"
97
"path/filepath"
108
"sync/atomic"
119
"time"
10+
11+
"github.com/thomasjungblut/go-sstables/memstore"
12+
"github.com/thomasjungblut/go-sstables/sstables"
1213
)
1314

1415
func flushMemstoreContinuously(db *DB) {
@@ -50,6 +51,7 @@ func executeFlush(db *DB, flushAction memStoreFlushAction) error {
5051
err = memStoreToFlush.FlushWithTombstones(
5152
sstables.WriteBasePath(writePath),
5253
sstables.WithKeyComparator(db.cmp),
54+
sstables.WriteBufferSizeBytes(int(db.writeBufferSizeBytes)),
5355
sstables.BloomExpectedNumberOfElements(numElements))
5456
if err != nil {
5557
return err
@@ -65,6 +67,7 @@ func executeFlush(db *DB, flushAction memStoreFlushAction) error {
6567
reader, err := sstables.NewSSTableReader(
6668
sstables.ReadBasePath(writePath),
6769
sstables.ReadWithKeyComparator(db.cmp),
70+
sstables.ReadBufferSizeBytes(int(db.readBufferSizeBytes)),
6871
)
6972
if err != nil {
7073
return err

simpledb/flush_test.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,17 @@ package simpledb
22

33
import (
44
"fmt"
5-
"github.com/stretchr/testify/assert"
6-
"github.com/thomasjungblut/go-sstables/memstore"
7-
"github.com/thomasjungblut/go-sstables/skiplist"
85
"os"
96
"path/filepath"
107
"sort"
118
"strconv"
129
"strings"
1310
"sync"
1411
"testing"
12+
13+
"github.com/stretchr/testify/assert"
14+
"github.com/thomasjungblut/go-sstables/memstore"
15+
"github.com/thomasjungblut/go-sstables/skiplist"
1516
)
1617

1718
func TestFlushHappyPath(t *testing.T) {
@@ -33,11 +34,13 @@ func TestFlushHappyPath(t *testing.T) {
3334
}
3435

3536
db := &DB{
36-
cmp: skiplist.BytesComparator{},
37-
basePath: tmpDir,
38-
currentGeneration: 42,
39-
rwLock: &sync.RWMutex{},
40-
sstableManager: NewSSTableManager(skiplist.BytesComparator{}, &sync.RWMutex{}, tmpDir),
37+
cmp: skiplist.BytesComparator{},
38+
basePath: tmpDir,
39+
currentGeneration: 42,
40+
rwLock: &sync.RWMutex{},
41+
sstableManager: NewSSTableManager(skiplist.BytesComparator{}, &sync.RWMutex{}, tmpDir),
42+
readBufferSizeBytes: DefaultReadBufferSizeBytes,
43+
writeBufferSizeBytes: DefaultWriteBufferSizeBytes,
4144
}
4245
err = executeFlush(db, action)
4346
assert.Nil(t, err)

0 commit comments

Comments
 (0)