Skip to content

Commit acf9e49

Browse files
committed
AggSpill 1: Adding save intermediate to agg exec.
1 parent 0bddd6f commit acf9e49

34 files changed

+1293
-213
lines changed

pkg/container/batch/batch.go

Lines changed: 29 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -107,59 +107,8 @@ func (bat *Batch) Slice(from, to int) *Batch {
107107
}
108108

109109
func (bat *Batch) MarshalBinary() ([]byte, error) {
110-
// --------------------------------------------------------------------
111-
// | len | Zs... | len | Vecs... | len | Attrs... | len | AggInfos... |
112-
// --------------------------------------------------------------------
113110
var w bytes.Buffer
114-
115-
// row count.
116-
rl := int64(bat.rowCount)
117-
w.Write(types.EncodeInt64(&rl))
118-
119-
// Vecs
120-
l := int32(len(bat.Vecs))
121-
w.Write(types.EncodeInt32(&l))
122-
for i := 0; i < int(l); i++ {
123-
data, err := bat.Vecs[i].MarshalBinary()
124-
if err != nil {
125-
return nil, err
126-
}
127-
size := int32(len(data))
128-
w.Write(types.EncodeInt32(&size))
129-
w.Write(data)
130-
}
131-
132-
// Attrs
133-
l = int32(len(bat.Attrs))
134-
w.Write(types.EncodeInt32(&l))
135-
for i := 0; i < int(l); i++ {
136-
size := int32(len(bat.Attrs[i]))
137-
w.Write(types.EncodeInt32(&size))
138-
w.WriteString(bat.Attrs[i])
139-
}
140-
141-
// AggInfos
142-
aggInfos := make([][]byte, len(bat.Aggs))
143-
for i, exec := range bat.Aggs {
144-
data, err := aggexec.MarshalAggFuncExec(exec)
145-
if err != nil {
146-
return nil, err
147-
}
148-
aggInfos[i] = data
149-
}
150-
151-
l = int32(len(aggInfos))
152-
w.Write(types.EncodeInt32(&l))
153-
for i := 0; i < int(l); i++ {
154-
size := int32(len(aggInfos[i]))
155-
w.Write(types.EncodeInt32(&size))
156-
w.Write(aggInfos[i])
157-
}
158-
159-
w.Write(types.EncodeInt32(&bat.Recursive))
160-
w.Write(types.EncodeInt32(&bat.ShuffleIDX))
161-
162-
return w.Bytes(), nil
111+
return bat.MarshalBinaryWithBuffer(&w)
163112
}
164113

165114
func (bat *Batch) MarshalBinaryWithBuffer(w *bytes.Buffer) ([]byte, error) {
@@ -196,6 +145,20 @@ func (bat *Batch) MarshalBinaryWithBuffer(w *bytes.Buffer) ([]byte, error) {
196145
}
197146
}
198147

148+
// ExtraBuf1
149+
size := int32(len(bat.ExtraBuf1))
150+
w.Write(types.EncodeInt32(&size))
151+
if size > 0 {
152+
w.Write(bat.ExtraBuf1)
153+
}
154+
155+
// ExtraBuf2
156+
size = int32(len(bat.ExtraBuf2))
157+
w.Write(types.EncodeInt32(&size))
158+
if size > 0 {
159+
w.Write(bat.ExtraBuf2)
160+
}
161+
199162
// AggInfos
200163
aggInfos := make([][]byte, len(bat.Aggs))
201164
for i, exec := range bat.Aggs {
@@ -270,6 +233,20 @@ func (bat *Batch) UnmarshalBinaryWithAnyMp(data []byte, mp *mpool.MPool) (err er
270233
data = data[size:]
271234
}
272235

236+
// ExtraBuf1
237+
l = types.DecodeInt32(data[:4])
238+
data = data[4:]
239+
bat.ExtraBuf1 = nil
240+
bat.ExtraBuf1 = append(bat.ExtraBuf1, data[:l]...)
241+
data = data[l:]
242+
243+
// ExtraBuf2
244+
l = types.DecodeInt32(data[:4])
245+
data = data[4:]
246+
bat.ExtraBuf2 = nil
247+
bat.ExtraBuf2 = append(bat.ExtraBuf2, data[:l]...)
248+
data = data[l:]
249+
273250
l = types.DecodeInt32(data[:4])
274251
aggs := make([][]byte, l)
275252

pkg/container/batch/batch_test.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,10 @@ package batch
1616

1717
import (
1818
"bytes"
19-
"github.com/matrixorigin/matrixone/pkg/sql/colexec/aggexec"
2019
"testing"
2120

21+
"github.com/matrixorigin/matrixone/pkg/sql/colexec/aggexec"
22+
2223
"github.com/matrixorigin/matrixone/pkg/common/mpool"
2324
"github.com/matrixorigin/matrixone/pkg/container/types"
2425
"github.com/matrixorigin/matrixone/pkg/container/vector"
@@ -53,6 +54,10 @@ func TestBatchMarshalAndUnmarshal(t *testing.T) {
5354
rbat := new(Batch)
5455
err = rbat.UnmarshalBinary(data)
5556
require.NoError(t, err)
57+
58+
require.Equal(t, tc.bat.ExtraBuf1, rbat.ExtraBuf1)
59+
require.Equal(t, tc.bat.ExtraBuf2, rbat.ExtraBuf2)
60+
5661
for i, vec := range rbat.Vecs {
5762
require.Equal(t, vector.MustFixedColWithTypeCheck[int8](tc.bat.Vecs[i]), vector.MustFixedColWithTypeCheck[int8](vec))
5863
}
@@ -140,6 +145,9 @@ func newBatch(ts []types.Type, rows int) *Batch {
140145
}
141146
}
142147

148+
bat.ExtraBuf1 = []byte("extra buf 1")
149+
bat.ExtraBuf2 = []byte("extra buf 2")
150+
143151
aggexec.RegisterGroupConcatAgg(0, ",")
144152
agg0, _ := aggexec.MakeAgg(aggexec.NewSimpleAggMemoryManager(mp), 0, false, []types.Type{types.T_varchar.ToType()}...)
145153
bat.Aggs = []aggexec.AggFuncExec{agg0}

pkg/container/batch/types.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,16 @@ type Batch struct {
5252
// Vecs col data
5353
Vecs []*vector.Vector
5454

55-
Aggs []aggexec.AggFuncExec
55+
// Ideally all data would go through vectors, but Aggs is too
56+
// unwieldy to convert, so it is kept as is, and two extra
57+
// buffers are added for other code to use instead.
58+
//
59+
// XXX: this MUST be removed.
60+
Aggs []aggexec.AggFuncExec
61+
ExtraBuf1 []byte
62+
ExtraBuf2 []byte
5663

5764
// row count of the batch; used instead of the old len(Zs).
5865
rowCount int
59-
60-
offHeap bool
66+
offHeap bool
6167
}

pkg/sql/colexec/aggexec/aggContext.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,13 @@ func (a *AggContext) getGroupContext(i int) AggGroupExecContext {
9696
return nil
9797
}
9898

99+
func (a *AggContext) getGroupContextBinaryMarshaller() []AggGroupExecContext {
100+
if !a.hasGroupContext {
101+
return nil
102+
}
103+
return a.groupContext
104+
}
105+
99106
func (a *AggContext) getGroupContextEncodings() [][]byte {
100107
if !a.hasGroupContext {
101108
return nil

pkg/sql/colexec/aggexec/aggFrame_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,10 @@ type avgDemoCtx struct {
7575
count int64
7676
}
7777

78-
func (d *avgDemoCtx) Marshal() []byte { return types.EncodeInt64(&d.count) }
79-
func (d *avgDemoCtx) Unmarshal(b []byte) { d.count = types.DecodeInt64(b) }
80-
func (d *avgDemoCtx) Size() int64 { return 8 } // size of count
78+
func (d *avgDemoCtx) Marshal() []byte { return types.EncodeInt64(&d.count) }
79+
func (d *avgDemoCtx) MarshalBinary() ([]byte, error) { return d.Marshal(), nil }
80+
func (d *avgDemoCtx) Unmarshal(b []byte) { d.count = types.DecodeInt64(b) }
81+
func (d *avgDemoCtx) Size() int64 { return 8 } // size of count
8182
var _ AggGroupExecContext = &avgDemoCtx{}
8283

8384
func fromIdxListToNullList(start, end int, idxList []int) []bool {

pkg/sql/colexec/aggexec/aggMethod.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ type (
127127

128128
type AggCanMarshal interface {
129129
Marshal() []byte
130+
MarshalBinary() ([]byte, error)
130131
Unmarshal([]byte)
131132
Size() int64
132133
}

pkg/sql/colexec/aggexec/approx_count.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
package aggexec
1616

1717
import (
18+
"bytes"
19+
1820
hll "github.com/axiomhq/hyperloglog"
1921
"github.com/matrixorigin/matrixone/pkg/common/mpool"
2022
"github.com/matrixorigin/matrixone/pkg/container/types"
@@ -60,6 +62,18 @@ func (exec *approxCountFixedExec[T]) marshal() ([]byte, error) {
6062
return encoded.Marshal()
6163
}
6264

65+
func (exec *approxCountFixedExec[T]) SaveIntermediateResult(bucketIdx []int64, bucket int64, buf *bytes.Buffer) error {
66+
return marshalRetAndGroupsToBuffers(
67+
bucketIdx, bucket, buf,
68+
&exec.ret.optSplitResult, exec.groups)
69+
}
70+
71+
func (exec *approxCountFixedExec[T]) SaveIntermediateResultOfChunk(chunk int, buf *bytes.Buffer) error {
72+
return marshalChunkRetAndGroupsToBuffer(
73+
chunk, buf,
74+
&exec.ret.optSplitResult, exec.groups)
75+
}
76+
6377
func (exec *approxCountFixedExec[T]) unmarshal(_ *mpool.MPool, result, empties, groups [][]byte) error {
6478
err := exec.ret.unmarshalFromBytes(result, empties)
6579
if err != nil {
@@ -115,6 +129,17 @@ func (exec *approxCountVarExec) marshal() ([]byte, error) {
115129
return encoded.Marshal()
116130
}
117131

132+
func (exec *approxCountVarExec) SaveIntermediateResult(bucketIdx []int64, bucket int64, buf *bytes.Buffer) error {
133+
return marshalRetAndGroupsToBuffers(
134+
bucketIdx, bucket, buf,
135+
&exec.ret.optSplitResult, exec.groups)
136+
}
137+
138+
func (exec *approxCountVarExec) SaveIntermediateResultOfChunk(chunk int, buf *bytes.Buffer) error {
139+
return marshalChunkRetAndGroupsToBuffer(chunk, buf,
140+
&exec.ret.optSplitResult, exec.groups)
141+
}
142+
118143
func (exec *approxCountVarExec) unmarshal(_ *mpool.MPool, result, empties, groups [][]byte) error {
119144
err := exec.ret.unmarshalFromBytes(result, empties)
120145
if err != nil {

pkg/sql/colexec/aggexec/concat.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package aggexec
1616

1717
import (
18+
"bytes"
1819
"fmt"
1920
"math"
2021

@@ -73,6 +74,19 @@ func (exec *groupConcatExec) unmarshal(_ *mpool.MPool, result, empties, groups [
7374
return exec.ret.unmarshalFromBytes(result, empties)
7475
}
7576

77+
func (exec *groupConcatExec) SaveIntermediateResult(bucketIdx []int64, bucket int64, buf *bytes.Buffer) error {
78+
return marshalRetAndGroupsAndDistinctHashToBuffers[dummyBinaryMarshaler](
79+
bucketIdx, bucket, buf,
80+
&exec.ret.optSplitResult, nil,
81+
exec.IsDistinct(), &exec.distinctHash)
82+
}
83+
84+
func (exec *groupConcatExec) SaveIntermediateResultOfChunk(chunk int, buf *bytes.Buffer) error {
85+
return marshalChunkToBuffer[dummyBinaryMarshaler](chunk, buf,
86+
&exec.ret.optSplitResult, nil,
87+
exec.IsDistinct(), &exec.distinctHash)
88+
}
89+
7690
func GroupConcatReturnType(args []types.Type) types.Type {
7791
for _, p := range args {
7892
if p.Oid == types.T_binary || p.Oid == types.T_varbinary || p.Oid == types.T_blob {

pkg/sql/colexec/aggexec/count.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
package aggexec
1616

1717
import (
18+
"bytes"
19+
1820
"github.com/matrixorigin/matrixone/pkg/common/mpool"
1921
"github.com/matrixorigin/matrixone/pkg/container/types"
2022
"github.com/matrixorigin/matrixone/pkg/container/vector"
@@ -63,6 +65,20 @@ func (exec *countColumnExec) marshal() ([]byte, error) {
6365
return encoded.Marshal()
6466
}
6567

68+
func (exec *countColumnExec) SaveIntermediateResult(bucketIdx []int64, bucket int64, buf *bytes.Buffer) error {
69+
return marshalRetAndGroupsAndDistinctHashToBuffers[dummyBinaryMarshaler](
70+
bucketIdx, bucket, buf,
71+
&exec.ret.optSplitResult, nil,
72+
exec.IsDistinct(), &exec.distinctHash)
73+
}
74+
75+
func (exec *countColumnExec) SaveIntermediateResultOfChunk(chunk int, buf *bytes.Buffer) error {
76+
return marshalChunkToBuffer[dummyBinaryMarshaler](
77+
chunk, buf,
78+
&exec.ret.optSplitResult, nil,
79+
exec.IsDistinct(), &exec.distinctHash)
80+
}
81+
6682
func (exec *countColumnExec) unmarshal(_ *mpool.MPool, result, empties, groups [][]byte) error {
6783
if exec.IsDistinct() {
6884
if len(groups) > 0 {
@@ -301,6 +317,17 @@ func (exec *countStarExec) marshal() ([]byte, error) {
301317
return encoded.Marshal()
302318
}
303319

320+
func (exec *countStarExec) SaveIntermediateResult(bucketIdx []int64, bucket int64, buf *bytes.Buffer) error {
321+
return marshalRetAndGroupsToBuffers[dummyBinaryMarshaler](
322+
bucketIdx, bucket, buf,
323+
&exec.ret.optSplitResult, nil)
324+
}
325+
func (exec *countStarExec) SaveIntermediateResultOfChunk(chunk int, buf *bytes.Buffer) error {
326+
return marshalChunkRetAndGroupsToBuffer[dummyBinaryMarshaler](
327+
chunk, buf,
328+
&exec.ret.optSplitResult, nil)
329+
}
330+
304331
func (exec *countStarExec) unmarshal(_ *mpool.MPool, result, empties, _ [][]byte) error {
305332
return exec.ret.unmarshalFromBytes(result, empties)
306333
}

pkg/sql/colexec/aggexec/distinct.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,30 @@ func (d *distinctHash) marshal() ([]byte, error) {
202202
return buf.Bytes(), nil
203203
}
204204

205+
func (d *distinctHash) marshalToBuffers(bucketIdx []int64, bucket int64, buf *bytes.Buffer) error {
206+
if len(d.maps) != len(bucketIdx) {
207+
return moerr.NewInternalErrorNoCtx("distinct hash: the length of the bucket index is not equal to the length of the maps")
208+
}
209+
210+
for i := range d.maps {
211+
if bucketIdx[i] == bucket {
212+
if _, err := d.maps[i].WriteTo(buf); err != nil {
213+
return err
214+
}
215+
}
216+
}
217+
return nil
218+
}
219+
220+
func (d *distinctHash) marshalChunkToBuffer(start, cnt int, buf *bytes.Buffer) error {
221+
for i := 0; i < cnt; i++ {
222+
if _, err := d.maps[start+i].WriteTo(buf); err != nil {
223+
return err
224+
}
225+
}
226+
return nil
227+
}
228+
205229
func (d *distinctHash) unmarshal(data []byte) error {
206230
if len(data) == 0 {
207231
return nil

0 commit comments

Comments
 (0)