Skip to content

Commit 5f58c1a

Browse files
committed
chore: upgrade kernel record struct
Signed-off-by: Young Xu <[email protected]>
1 parent 17d0cc6 commit 5f58c1a

File tree

3 files changed

+151
-2
lines changed

3 files changed

+151
-2
lines changed

lib/record/column_util.go

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package record
1616

1717
import (
18+
"fmt"
1819
"unsafe"
1920
)
2021

@@ -226,3 +227,116 @@ func (cv *ColVal) Marshal(buf []byte) ([]byte, error) {
226227
buf = AppendUint32Slice(buf, cv.Offset)
227228
return buf, nil
228229
}
230+
231+
func resize(dst []ColVal, segs int) []ColVal {
232+
if cap(dst) < segs {
233+
delta := segs - cap(dst)
234+
dst = dst[:cap(dst)]
235+
dst = append(dst, make([]ColVal, delta)...)
236+
}
237+
dst = dst[:segs]
238+
return dst
239+
}
240+
241+
func (cv *ColVal) Split(dst []ColVal, maxRows int, refType int) []ColVal {
242+
if maxRows <= 0 {
243+
panic(fmt.Sprintf("maxRows is %v, must grater than 0", maxRows))
244+
}
245+
segs := (cv.Len + maxRows - 1) / maxRows
246+
dst = resize(dst, segs)
247+
248+
start, offset, validCount := 0, 0, 0
249+
for i := 0; i < segs; i++ {
250+
end := start + maxRows
251+
if end > cv.Len {
252+
end = cv.Len
253+
}
254+
offset, validCount = dst[i].sliceValAndOffset(cv, start, end, refType, offset)
255+
dst[i].sliceBitMap(cv, start, end)
256+
dst[i].Len = end - start
257+
dst[i].NilCount = dst[i].Len - validCount
258+
start = end
259+
}
260+
261+
return dst
262+
}
263+
264+
func (cv *ColVal) Length() int {
265+
return cv.Len
266+
}
267+
268+
func (cv *ColVal) ValidCount(start, end int) int {
269+
validCount := 0
270+
if cv.Length()+cv.NilCount == 0 || cv.Len == cv.NilCount {
271+
return validCount
272+
}
273+
if cv.NilCount == 0 {
274+
return end - start
275+
}
276+
277+
end += cv.BitMapOffset
278+
start += cv.BitMapOffset
279+
for i := start; i < end; i++ {
280+
if cv.Bitmap[i>>3]&BitMask[i&0x07] != 0 {
281+
validCount++
282+
}
283+
}
284+
return validCount
285+
}
286+
287+
func (cv *ColVal) sliceValAndOffset(srcCol *ColVal, start, end, colType, valOffset int) (offset int, valueValidCount int) {
288+
var validCount, endOffset int
289+
if colType == FieldTypeInt {
290+
validCount = srcCol.ValidCount(start, end)
291+
endOffset = valOffset + Int64SizeBytes*validCount
292+
cv.Val = srcCol.Val[valOffset:endOffset]
293+
} else if colType == FieldTypeFloat {
294+
validCount = srcCol.ValidCount(start, end)
295+
endOffset = valOffset + Float64SizeBytes*validCount
296+
cv.Val = srcCol.Val[valOffset:endOffset]
297+
} else if colType == FieldTypeString {
298+
offsetStart := srcCol.Offset[start]
299+
if end == srcCol.Len {
300+
endOffset = len(srcCol.Val)
301+
} else {
302+
endOffset = int(srcCol.Offset[end])
303+
}
304+
cv.Val = srcCol.Val[offsetStart:endOffset]
305+
cv.reserveValOffset(end - start)
306+
for index, pos := 0, start; pos < end; pos++ {
307+
cv.Offset[index] = srcCol.Offset[pos] - offsetStart
308+
index++
309+
bitOffset := srcCol.BitMapOffset + pos
310+
if srcCol.Bitmap[bitOffset>>3]&BitMask[bitOffset&0x07] != 0 {
311+
validCount++
312+
}
313+
}
314+
} else if colType == FieldTypeBoolean {
315+
validCount = srcCol.ValidCount(start, end)
316+
endOffset = valOffset + BooleanSizeBytes*validCount
317+
cv.Val = srcCol.Val[valOffset:endOffset]
318+
} else {
319+
panic("error type")
320+
}
321+
return endOffset, validCount
322+
}
323+
324+
func (cv *ColVal) reserveValOffset(size int) {
325+
if cap(cv.Offset) < size {
326+
cv.Offset = make([]uint32, size)
327+
}
328+
cv.Offset = cv.Offset[:size]
329+
}
330+
331+
func (cv *ColVal) sliceBitMap(srcCol *ColVal, start, end int) {
332+
s := srcCol.BitMapOffset + start
333+
offset := s % 8
334+
s = s / 8
335+
e := (srcCol.BitMapOffset + end) / 8
336+
if (srcCol.BitMapOffset+end)%8 != 0 {
337+
e++
338+
}
339+
340+
cv.Bitmap = srcCol.Bitmap[s:e]
341+
cv.BitMapOffset = offset
342+
}

lib/record/field.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,12 @@ func (f *Field) String() string {
5454

5555
type Schemas []Field
5656

57+
func (sh Schemas) Len() int { return len(sh) }
58+
59+
func (sh Schemas) Swap(i, j int) { sh[i], sh[j] = sh[j], sh[i] }
60+
61+
func (sh Schemas) Less(i, j int) bool { return sh[i].Name < sh[j].Name }
62+
5763
func (sh *Schemas) String() string {
5864
sb := strings.Builder{}
5965
for _, f := range *sh {

lib/record/record.go

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,6 @@ func (rec *Record) AppendTime(time ...int64) {
181181

182182
func (rec *Record) Marshal(buf []byte) ([]byte, error) {
183183
var err error
184-
// Schema
185184
buf = AppendUint32(buf, uint32(len(rec.Schema)))
186185
for i := 0; i < len(rec.Schema); i++ {
187186
buf = AppendUint32(buf, uint32(rec.Schema[i].Size()))
@@ -191,7 +190,6 @@ func (rec *Record) Marshal(buf []byte) ([]byte, error) {
191190
}
192191
}
193192

194-
// ColVal
195193
buf = AppendUint32(buf, uint32(len(rec.ColVals)))
196194
for i := 0; i < len(rec.ColVals); i++ {
197195
buf = AppendUint32(buf, uint32(rec.ColVals[i].Size()))
@@ -202,3 +200,34 @@ func (rec *Record) Marshal(buf []byte) ([]byte, error) {
202200
}
203201
return buf, nil
204202
}
203+
204+
func (rec *Record) Split(dst []Record, maxRows int) []Record {
205+
rows := rec.RowNums()
206+
segs := (rows + maxRows - 1) / maxRows
207+
if cap(dst) < segs {
208+
delta := segs - cap(dst)
209+
dst = dst[:cap(dst)]
210+
dst = append(dst, make([]Record, delta)...)
211+
}
212+
dst = dst[:segs]
213+
214+
if segs == 1 {
215+
dst[0] = *rec
216+
return dst
217+
}
218+
219+
for i := range dst {
220+
dst[i].Schema = append(dst[i].Schema[:0], rec.Schema...)
221+
dst[i].ColVals = resize(dst[i].ColVals, rec.Schema.Len())
222+
}
223+
224+
for i := range rec.Schema {
225+
col := rec.Column(i)
226+
dstCol := col.Split(nil, maxRows, rec.Schema[i].Type)
227+
for j := range dstCol {
228+
dst[j].ColVals[i] = dstCol[j]
229+
}
230+
}
231+
232+
return dst
233+
}

0 commit comments

Comments
 (0)