-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathrecordbuf.go
More file actions
242 lines (195 loc) · 5.23 KB
/
recordbuf.go
File metadata and controls
242 lines (195 loc) · 5.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
package pkg
import (
"encoding/binary"
"io"
)
// WriteColumnSet is one node in a tree of write-side columns: it holds the
// bytes of a single column plus an ordered list of child column sets.
type WriteColumnSet struct {
	// data is the content of this column as last stored by SetBits or
	// SetBytes.
	data []byte
	// subColumns are the child column sets, in the order they were added.
	subColumns []*WriteColumnSet
}

// TotalCount returns the number of column sets in the tree rooted at s,
// counting s itself.
func (s *WriteColumnSet) TotalCount() uint {
	total := uint(1) // s itself
	for i := range s.subColumns {
		total += s.subColumns[i].TotalCount()
	}
	return total
}

// AddSubColumn appends a new, empty child column set to s and returns it.
func (s *WriteColumnSet) AddSubColumn() *WriteColumnSet {
	child := &WriteColumnSet{}
	s.subColumns = append(s.subColumns, child)
	return child
}
// SetBits closes b, stores the bytes returned by b.Bytes() as this column's
// data, and then resets b.
//
// NOTE(review): s.data is exactly the slice b.Bytes() returns. Whether that
// slice stays intact after b.Reset() and later writes depends on BitsWriter,
// which is not visible here - confirm before keeping the column data across
// writing cycles.
func (s *WriteColumnSet) SetBits(b *BitsWriter) {
// Close the BitsWriter to make sure all pending bits are written.
b.Close()
s.data = b.Bytes()
// We got the data. Prepare the buffer for the next writing cycle.
b.Reset()
}
// SetBytes stores the bytes returned by b.Bytes() as this column's data and
// then resets b.
//
// NOTE(review): s.data is exactly the slice b.Bytes() returns. Whether that
// slice stays intact after b.Reset() and later writes depends on BytesWriter,
// which is not visible here - confirm before keeping the column data across
// writing cycles.
func (s *WriteColumnSet) SetBytes(b *BytesWriter) {
s.data = b.Bytes()
// We got the data. Prepare the buffer for the next writing cycle.
b.Reset()
}
// WriteSizesTo records the byte length of this column's data in buf (via
// buf.WriteUvarintCompact) and then, depth-first and in subcolumn order, the
// lengths of its subcolumns.
//
// Nothing is recorded for the subcolumns of an empty column: an empty column
// is expected to have only empty subcolumns.
func (s *WriteColumnSet) WriteSizesTo(buf *BitsWriter) {
	size := len(s.data)
	buf.WriteUvarintCompact(uint64(size))

	if size > 0 {
		// Only a non-empty column descends into its subcolumns.
		for i := range s.subColumns {
			s.subColumns[i].WriteSizesTo(buf)
		}
	}
}
// WriteDataTo writes this column's data to buf, followed depth-first by the
// data of its subcolumns in order. The subcolumns of an empty column are
// skipped, since an empty column is expected to have only empty subcolumns.
//
// The first error returned by buf.Write is returned unchanged and stops the
// traversal.
func (s *WriteColumnSet) WriteDataTo(buf io.Writer) error {
	_, err := buf.Write(s.data)
	if err != nil {
		return err
	}

	if len(s.data) > 0 {
		// Only a non-empty column descends into its subcolumns.
		for i := range s.subColumns {
			if err = s.subColumns[i].WriteDataTo(buf); err != nil {
				return err
			}
		}
	}
	return nil
}
// PrintSchema is a debugging hook that currently does nothing: the code that
// printed the number of subcolumns at each nesting level is commented out.
// indent is the nesting depth of s; it is unused while the body is disabled.
func (s *WriteColumnSet) PrintSchema(indent int) {
// Disabled debug output, kept for reference:
//fmt.Printf("%s%d\n", strings.Repeat("-", indent), len(s.subColumns))
//for _, subColumn := range s.subColumns {
// subColumn.PrintSchema(indent + 1)
//}
}
// At returns the subcolumn stored at index i. It panics if i is out of
// range.
func (s *WriteColumnSet) At(i int) *WriteColumnSet {
	children := s.subColumns
	return children[i]
}
// WriteBufs serializes a tree of write columns as a single frame.
type WriteBufs struct {
	// Columns is the root of the column tree to be written.
	Columns WriteColumnSet
	// tempBuf is scratch space in which the sizes section is built.
	tempBuf BitsWriter
	// bytes is scratch space for the uvarint length prefix; it is reused
	// between WriteTo calls.
	bytes []byte
}

// WriteTo writes one frame to buf, made of three consecutive parts:
//
//  1. the byte length of the sizes section, as a uvarint;
//  2. the sizes section produced by Columns.WriteSizesTo;
//  3. the column data produced by Columns.WriteDataTo.
//
// The first write error is returned unchanged. Note that, despite its name,
// this method returns only an error and therefore does not implement
// io.WriterTo.
func (w *WriteBufs) WriteTo(buf io.Writer) error {
	w.Columns.PrintSchema(0)

	// Build the sizes section in the scratch writer.
	w.tempBuf.Reset()
	w.Columns.WriteSizesTo(&w.tempBuf)
	w.tempBuf.Close()

	// Part 1: length prefix of the sizes section.
	sizesLen := len(w.tempBuf.Bytes())
	w.bytes = binary.AppendUvarint(w.bytes[:0], uint64(sizesLen))
	_, err := buf.Write(w.bytes)
	if err != nil {
		return err
	}

	// Part 2: the sizes section itself.
	_, err = buf.Write(w.tempBuf.Bytes())
	if err != nil {
		return err
	}

	// Part 3: the column data.
	return w.Columns.WriteDataTo(buf)
}
// ReadableColumn holds the bytes of a single column on the read side.
type ReadableColumn struct {
	// data is the column content; it is nil after BorrowData.
	data []byte
}

// Data returns the column's bytes without copying them; the returned slice
// is the column's own buffer.
func (c *ReadableColumn) Data() []byte {
	return c.data
}

// BorrowData returns the column's bytes and clears the column's own
// reference to them (the internal data becomes nil). A caller that wants
// exclusive ownership of the bytes can use it to avoid a copy.
func (c *ReadableColumn) BorrowData() []byte {
	var taken []byte
	taken, c.data = c.data, nil
	return taken
}
// ReadColumnSet is one node in a tree of read-side columns: a single column
// plus an ordered list of child column sets.
type ReadColumnSet struct {
	// column holds this node's own data.
	column ReadableColumn
	// subColumns are the child column sets, in the order they were added.
	subColumns []*ReadColumnSet
}

// Column returns a pointer to this node's own column.
func (s *ReadColumnSet) Column() *ReadableColumn {
	col := &s.column
	return col
}

// AddSubColumn appends a new, empty child column set to s and returns it.
func (s *ReadColumnSet) AddSubColumn() *ReadColumnSet {
	child := new(ReadColumnSet)
	s.subColumns = append(s.subColumns, child)
	return child
}

// SubColumnLen returns the number of direct child column sets of s.
func (s *ReadColumnSet) SubColumnLen() int {
	children := s.subColumns
	return len(children)
}
// ReadSizesFrom reads the size of this column and, recursively, of its
// subcolumns from buf, and sets each column's data to a slice of that size
// (via EnsureLen) so that it can be filled afterwards.
//
// readLimit is the number of data bytes that may still be accepted. Every
// size that is read is subtracted from it. ErrColumnSizeLimitExceeded is
// returned when a size is larger than the remaining limit or is too large to
// be represented as an int.
//
// When a column's size is zero, no sizes are read for its subcolumns; their
// data is reset instead, because an empty column is expected to have only
// empty subcolumns.
func (s *ReadColumnSet) ReadSizesFrom(buf *BitsReader, readLimit *uint64) error {
	// Largest size that survives the uint64 -> int conversion below. The
	// size comes from the input, and a caller may pass a readLimit above
	// this value, so the limit check alone does not keep int(dataSize)
	// from wrapping to a negative number.
	const maxInt = uint64(^uint(0) >> 1)

	dataSize := buf.ReadUvarintCompact()
	if dataSize > *readLimit || dataSize > maxInt {
		return ErrColumnSizeLimitExceeded
	}
	*readLimit -= dataSize
	s.column.data = EnsureLen(s.column.data, int(dataSize))

	if dataSize == 0 {
		// Empty column: nothing was encoded for the subcolumns.
		for _, sub := range s.subColumns {
			sub.ResetData()
		}
		return nil
	}

	// Non-empty column: sizes of the subcolumns follow, in order.
	for _, sub := range s.subColumns {
		if err := sub.ReadSizesFrom(buf, readLimit); err != nil {
			return err
		}
	}
	return nil
}
// ReadDataFrom fills this column's data slice completely from buf (using
// io.ReadFull, so the slice must already have its final length) and then
// does the same, depth-first and in order, for every subcolumn.
//
// The first read error is returned unchanged and stops the traversal.
func (s *ReadColumnSet) ReadDataFrom(buf ByteAndBlockReader) error {
	_, err := io.ReadFull(buf, s.column.data)
	if err != nil {
		return err
	}

	for i := range s.subColumns {
		if err = s.subColumns[i].ReadDataFrom(buf); err != nil {
			return err
		}
	}
	return nil
}
// ResetData sets the data of this column and of every column below it in
// the tree to nil. The tree structure itself is left untouched.
func (s *ReadColumnSet) ResetData() {
	s.column.data = nil
	for _, sub := range s.subColumns {
		sub.ResetData()
	}
}
// ReadBufs reads one frame of column data into a tree of read columns.
type ReadBufs struct {
	// Columns is the root of the column tree that ReadFrom populates.
	Columns ReadColumnSet
	// tempBuf decodes the sizes section of the frame.
	tempBuf BitsReader
	// tempBufBytes is the backing storage for the sizes section; it is
	// passed back to EnsureLen on every ReadFrom call.
	tempBufBytes []byte
	// readLimit is the number of bytes that may still be accepted during
	// the current ReadFrom call.
	readLimit uint64
}

// ReadFrom reads one frame from buf: a uvarint with the byte length of the
// sizes section, the sizes section itself, and then the column data.
//
// readLimit bounds the total of the sizes section length and all column
// sizes. ErrTotalColumnSizeLimitExceeded is returned when the sizes section
// is larger than readLimit or too large to be represented as an int. Errors
// from reading buf and from the column tree are returned unchanged.
func (s *ReadBufs) ReadFrom(buf ByteAndBlockReader, readLimit uint64) error {
	// Largest length that survives the uint64 -> int conversion below. The
	// length comes from the input, and a caller may pass a readLimit above
	// this value, so the limit check alone does not keep int(bufSize) from
	// wrapping to a negative number.
	const maxInt = uint64(^uint(0) >> 1)

	bufSize, err := binary.ReadUvarint(buf)
	if err != nil {
		return err
	}
	if bufSize > readLimit || bufSize > maxInt {
		return ErrTotalColumnSizeLimitExceeded
	}

	// Load the sizes section and point the bit reader at it.
	s.tempBufBytes = EnsureLen(s.tempBufBytes, int(bufSize))
	if _, err := io.ReadFull(buf, s.tempBufBytes); err != nil {
		return err
	}
	s.tempBuf.Reset(s.tempBufBytes)

	// What is left of the limit applies to the column sizes.
	s.readLimit = readLimit - bufSize
	if err := s.Columns.ReadSizesFrom(&s.tempBuf, &s.readLimit); err != nil {
		return err
	}
	return s.Columns.ReadDataFrom(buf)
}