Skip to content

Commit 59e1a0a

Browse files
author
Michael
authored
Move row storage logic to separate package. (#68)
Row data message can be a simple byte slice.
1 parent b6aa471 commit 59e1a0a

File tree

7 files changed

+320
-136
lines changed

7 files changed

+320
-136
lines changed

connection.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -355,12 +355,14 @@ func (v *connection) initializeSession() error {
355355
return err
356356
}
357357

358-
if len(result.Columns()) != 1 && result.Columns()[1] != "now" || len(result.resultData) != 1 {
358+
firstRow := result.resultData.Peek()
359+
360+
if len(result.Columns()) != 1 && result.Columns()[1] != "now" || firstRow == nil {
359361
return fmt.Errorf("unable to initialize session; functionality may be unreliable")
360362
}
361363

362364
// Peek into the results manually.
363-
colData := result.resultData[0].Columns()
365+
colData := firstRow.Columns()
364366
str := string(colData.Chunk())
365367

366368
if len(str) < 23 {

msgs/bedatarowmsg.go

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,12 @@ package msgs
3333
// THE SOFTWARE.
3434

3535
import (
36-
"bytes"
3736
"encoding/binary"
3837
"fmt"
3938
)
4039

4140
// BEDataRowMsg holds the raw bytes of a data row message.
42-
type BEDataRowMsg struct {
43-
rowBuffer *bytes.Buffer
44-
}
41+
type BEDataRowMsg []byte
4542

4643
// ColumnExtractor pulls columns out of a row
4744
type ColumnExtractor struct {
@@ -64,24 +61,22 @@ func (c *ColumnExtractor) Chunk() []byte {
6461

6562
// CreateFromMsgBody builds a data row message from the bytes held in buf and then resets buf.
6663
func (b *BEDataRowMsg) CreateFromMsgBody(buf *msgBuffer) (BackEndMsg, error) {
67-
newBuf := bytes.NewBuffer(buf.buf.Bytes())
64+
res := BEDataRowMsg(buf.buf.Bytes())
6865
buf.buf.Reset()
69-
res := &BEDataRowMsg{rowBuffer: newBuf}
70-
return res, nil
66+
return &res, nil
7167
}
7268

7369
// Columns provides an extractor to begin reading columns
7470
func (b *BEDataRowMsg) Columns() ColumnExtractor {
75-
rowData := b.rowBuffer.Bytes()
7671
return ColumnExtractor{
77-
NumCols: binary.BigEndian.Uint16(rowData[0:2]),
78-
data: rowData[2:],
72+
NumCols: binary.BigEndian.Uint16((*b)[0:2]),
73+
data: (*b)[2:],
7974
}
8075
}
8176

8277
// RevertToBytes dumps the message back into plain bytes
8378
func (b *BEDataRowMsg) RevertToBytes() []byte {
84-
return b.rowBuffer.Bytes()
79+
return *b
8580
}
8681

8782
func (b *BEDataRowMsg) String() string {

rowcache/file.go

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
package rowcache
2+
3+
import (
4+
"bufio"
5+
"encoding/binary"
6+
"io"
7+
"io/ioutil"
8+
"os"
9+
10+
"github.com/vertica/vertica-sql-go/msgs"
11+
)
12+
13+
// FileCache stores rows from the wire and puts excess into a temporary file
14+
type FileCache struct {
15+
maxInMemory int
16+
rowCount int
17+
readIdx int
18+
resultData []*msgs.BEDataRowMsg
19+
file *os.File
20+
rwbuffer *bufio.ReadWriter
21+
scratch [512]byte
22+
}
23+
24+
// NewFileCache returns a file cache with a set row limit
25+
func NewFileCache(rowLimit int) (*FileCache, error) {
26+
file, err := ioutil.TempFile("", ".vertica-sql-go.*.dat")
27+
if err != nil {
28+
return nil, err
29+
}
30+
return &FileCache{
31+
maxInMemory: rowLimit,
32+
resultData: make([]*msgs.BEDataRowMsg, 0, rowLimit),
33+
file: file,
34+
rwbuffer: bufio.NewReadWriter(bufio.NewReader(file), bufio.NewWriterSize(file, 1<<16)),
35+
}, nil
36+
}
37+
38+
func (f *FileCache) writeCached(msg *msgs.BEDataRowMsg) {
39+
sizeBuf := f.scratch[:4]
40+
binary.LittleEndian.PutUint32(sizeBuf, uint32(len(*msg)))
41+
f.rwbuffer.Write(sizeBuf)
42+
f.rwbuffer.Write(*msg)
43+
}
44+
45+
// AddRow adds a row to the cache
46+
func (f *FileCache) AddRow(msg *msgs.BEDataRowMsg) {
47+
f.rowCount++
48+
if len(f.resultData) >= f.maxInMemory {
49+
f.writeCached(msg)
50+
return
51+
}
52+
f.resultData = append(f.resultData, msg)
53+
}
54+
55+
// Finalize signals the end of rows from the wire and readies the cache for reading
56+
func (f *FileCache) Finalize() error {
57+
var err error
58+
name := f.file.Name()
59+
f.rwbuffer.Flush()
60+
f.file.Close()
61+
f.file, err = os.OpenFile(name, os.O_RDONLY|os.O_EXCL, 0600)
62+
if err != nil {
63+
return err
64+
}
65+
f.rwbuffer = bufio.NewReadWriter(bufio.NewReader(f.file), bufio.NewWriter(f.file))
66+
return err
67+
}
68+
69+
// GetRow pulls a row message out of the cache, returning nil of none remain
70+
func (f *FileCache) GetRow() *msgs.BEDataRowMsg {
71+
if f.readIdx >= len(f.resultData) {
72+
if !f.reloadFromCache() {
73+
return nil
74+
}
75+
}
76+
result := f.resultData[f.readIdx]
77+
f.readIdx++
78+
return result
79+
}
80+
81+
// Peek returns the next row without changing the state
82+
func (f *FileCache) Peek() *msgs.BEDataRowMsg {
83+
if len(f.resultData) == 0 {
84+
return nil
85+
}
86+
return f.resultData[0]
87+
}
88+
89+
// Close clears resources associated with the cache, deleting the temp file
90+
func (f *FileCache) Close() error {
91+
name := f.file.Name()
92+
f.rwbuffer.Flush()
93+
f.file.Close()
94+
return os.Remove(name)
95+
}
96+
97+
func (f *FileCache) reloadFromCache() bool {
98+
hadData := false
99+
100+
f.readIdx = 0
101+
indexCount := 0
102+
103+
for {
104+
sizeBuf := f.scratch[:4]
105+
106+
if _, err := io.ReadFull(f.rwbuffer, sizeBuf); err != nil {
107+
if err == io.EOF {
108+
if indexCount == 0 {
109+
return false
110+
}
111+
f.resultData = f.resultData[0:indexCount]
112+
return true
113+
}
114+
return false
115+
}
116+
117+
rowDataSize := binary.LittleEndian.Uint32(sizeBuf)
118+
119+
var rowBuf []byte
120+
rowBytes := f.scratch[4:]
121+
if rowDataSize <= uint32(len(rowBytes)) {
122+
rowBuf = rowBytes[:rowDataSize]
123+
} else {
124+
rowBuf = make([]byte, rowDataSize)
125+
}
126+
if _, err := io.ReadFull(f.rwbuffer, rowBuf); err != nil {
127+
return false
128+
}
129+
130+
msgBuf := msgs.NewMsgBufferFromBytes(rowBuf)
131+
132+
drm := &msgs.BEDataRowMsg{}
133+
134+
msg, _ := drm.CreateFromMsgBody(msgBuf)
135+
136+
f.resultData[indexCount] = msg.(*msgs.BEDataRowMsg)
137+
indexCount++
138+
139+
hadData = true
140+
141+
// If we've reached the original capacity of the slice, we're done.
142+
if indexCount == len(f.resultData) {
143+
break
144+
}
145+
}
146+
147+
return hadData
148+
}

rowcache/file_test.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package rowcache
2+
3+
import (
4+
"testing"
5+
6+
"github.com/vertica/vertica-sql-go/msgs"
7+
)
8+
9+
func TestFileCache(t *testing.T) {
10+
t.Run("without enough rows to switch to a file", func(t *testing.T) {
11+
rowCount := 100
12+
cache, err := NewFileCache(1000)
13+
if err != nil {
14+
t.Fatalf("Unable to create temp file")
15+
}
16+
for i := 0; i < rowCount; i++ {
17+
row := msgs.BEDataRowMsg([]byte("testRow"))
18+
cache.AddRow(&row)
19+
}
20+
cache.Finalize()
21+
if cache.Peek() == nil {
22+
t.Error("Expected a row with Peek")
23+
}
24+
for i := 0; i < rowCount; i++ {
25+
row := cache.GetRow()
26+
if row == nil {
27+
t.Errorf("Ran out of rows at %d", i)
28+
}
29+
}
30+
cache.Close()
31+
})
32+
t.Run("with file writes", func(t *testing.T) {
33+
rowCount := 10000
34+
cache, err := NewFileCache(100)
35+
if err != nil {
36+
t.Fatalf("Unable to create temp file")
37+
}
38+
for i := 0; i < rowCount; i++ {
39+
row := msgs.BEDataRowMsg([]byte("testRow"))
40+
cache.AddRow(&row)
41+
}
42+
cache.Finalize()
43+
if cache.Peek() == nil {
44+
t.Error("Expected a row with Peek")
45+
}
46+
for i := 0; i < rowCount; i++ {
47+
row := cache.GetRow()
48+
if row == nil {
49+
t.Errorf("Ran out of rows at %d", i)
50+
}
51+
}
52+
cache.Close()
53+
})
54+
55+
}

rowcache/memory.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package rowcache
2+
3+
import (
4+
"github.com/vertica/vertica-sql-go/msgs"
5+
)
6+
7+
// MemoryCache is a simple in memory row store
8+
type MemoryCache struct {
9+
resultData []*msgs.BEDataRowMsg
10+
readIdx int
11+
}
12+
13+
// NewMemoryCache initializes the memory store with a given size, but it can continue
14+
// to grow
15+
func NewMemoryCache(size int) *MemoryCache {
16+
return &MemoryCache{
17+
resultData: make([]*msgs.BEDataRowMsg, 0, size),
18+
}
19+
}
20+
21+
// AddRow adds a row to the store
22+
func (m *MemoryCache) AddRow(msg *msgs.BEDataRowMsg) {
23+
m.resultData = append(m.resultData, msg)
24+
}
25+
26+
// Finalize signals the end of new rows, a noop for the memory cache
27+
func (m *MemoryCache) Finalize() error {
28+
return nil
29+
}
30+
31+
// GetRow pulls a row from the cache, returning nil if none remain
32+
func (m *MemoryCache) GetRow() *msgs.BEDataRowMsg {
33+
if m.readIdx >= len(m.resultData) {
34+
return nil
35+
}
36+
result := m.resultData[m.readIdx]
37+
m.readIdx++
38+
return result
39+
}
40+
41+
// Peek returns the next row without changing the state
42+
func (m *MemoryCache) Peek() *msgs.BEDataRowMsg {
43+
if len(m.resultData) == 0 {
44+
return nil
45+
}
46+
return m.resultData[0]
47+
}
48+
49+
// Close provides an opportunity to free resources, a noop for the memory cache
50+
func (m *MemoryCache) Close() error {
51+
return nil
52+
}

rowcache/memory_test.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package rowcache
2+
3+
import (
4+
"testing"
5+
6+
"github.com/vertica/vertica-sql-go/msgs"
7+
)
8+
9+
func TestMemoryCache(t *testing.T) {
10+
rowCount := 100
11+
cache := NewMemoryCache(16)
12+
for i := 0; i < rowCount; i++ {
13+
row := msgs.BEDataRowMsg([]byte("testRow"))
14+
cache.AddRow(&row)
15+
}
16+
cache.Finalize()
17+
if cache.Peek() == nil {
18+
t.Error("Expected a row with Peek")
19+
}
20+
for i := 0; i < rowCount; i++ {
21+
row := cache.GetRow()
22+
if row == nil {
23+
t.Errorf("Ran out of rows at %d", i)
24+
}
25+
}
26+
cache.Close()
27+
}

0 commit comments

Comments
 (0)