Skip to content

Commit 0bddd6f

Browse files
authored
Adding agg_spill_mem parameter. (#22623)
### **User description** ## What type of PR is this? - [ ] API-change - [ ] BUG - [ ] Improvement - [ ] Documentation - [ ] Feature - [ ] Test and CI - [x] Code Refactoring ## Which issue(s) this PR fixes: issue #3433 ## What this PR does / why we need it: Try to add a spill mem session var to config spill threshold. right now it is NOT auto tuned, but this can be done at either plan time (we know global resource) or run time (we know real time resource). But later. ___ ### **PR Type** Enhancement ___ ### **Description** - Add `agg_spill_mem` session variable to configure aggregate spill threshold - Propagate `agg_spill_mem` from query builder through plan nodes to runtime operators - Define memory size constants (KiB, MiB, GiB, TiB, PiB) in common package - Update protobuf definitions to include `spill_mem` field in Group and Node messages ___ ### Diagram Walkthrough ```mermaid flowchart LR A["Session Variable: agg_spill_mem"] --> B["QueryBuilder"] B --> C["Plan Nodes"] C --> D["Protobuf Messages"] D --> E["Runtime Operators"] E --> F["Group Operator"] ``` <details> <summary><h3> File Walkthrough</h3></summary> <table><thead><tr><th></th><th align="left">Relevant files</th></tr></thead><tbody><tr><td><strong>Configuration changes</strong></td><td><details><summary>1 files</summary><table> <tr> <td><strong>variables.go</strong><dd><code>Register new `agg_spill_mem` session variable with default 1GiB</code></dd></td> <td><a href="https://github.com/matrixorigin/matrixone/pull/22623/files#diff-07649adc637b0268b12ec915ec819f66db3a4e4375bfc77291116bfa06c4432f">+9/-0</a>&nbsp; &nbsp; &nbsp; </td> </tr> </table></details></td></tr><tr><td><strong>Enhancement</strong></td><td><details><summary>14 files</summary><table> <tr> <td><strong>const.go</strong><dd><code>Add memory size constants (KiB through PiB)</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></td> <td><a href="https://github.com/matrixorigin/matrixone/pull/22623/files#diff-c9fab2e2ba5d60f621f961e17a9e2bc28c84c2aa793b9849af0320188a4121ed">+23/-0</a>&nbsp; &nbsp; </td> </tr> <tr> <td><strong>types.go</strong><dd><code>Add `aggSpillMem` field to QueryBuilder struct</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></td> <td><a href="https://github.com/matrixorigin/matrixone/pull/22623/files#diff-fd44fd4ef13ca8bef0473460d6b4298a1beb5cd259562bd013e35433af2b80fb">+3/-0</a>&nbsp; &nbsp; &nbsp; </td> </tr> <tr> <td><strong>query_builder.go</strong><dd><code>Initialize `aggSpillMem` from session variable in QueryBuilder</code></dd></td> <td><a href="https://github.com/matrixorigin/matrixone/pull/22623/files#diff-144e02a38da50867dc021b9254d10a5fd131671cca344c2323337ba2141440f1">+29/-0</a>&nbsp; &nbsp; </td> </tr> <tr> <td><strong>build_dml_util.go</strong><dd><code>Set `SpillMem` on aggregate nodes in DML operations</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></td> <td><a href="https://github.com/matrixorigin/matrixone/pull/22623/files#diff-095fb233d51021791cb24454839b013236680bbc6bbc22e0d2f6741ac8fe7dff">+3/-0</a>&nbsp; &nbsp; &nbsp; </td> </tr> <tr> <td><strong>distinct_agg.go</strong><dd><code>Set `SpillMem` on aggregate nodes for distinct optimization</code></dd></td> <td><a href="https://github.com/matrixorigin/matrixone/pull/22623/files#diff-e00897e8cc6b6ff631988a32501af83198e5eece46eedf0b0e0e9771891b2613">+1/-0</a>&nbsp; &nbsp; &nbsp; </td> </tr> <tr> <td><strong>agg_pushdown_pullup.go</strong><dd><code>Set `SpillMem` on aggregate nodes during pushdown optimization</code></dd></td> <td><a href="https://github.com/matrixorigin/matrixone/pull/22623/files#diff-4a4bc94bc3fc07c48047ca086ea67a57ab19c129f298dc15a8fd0617bd625363">+1/-0</a>&nbsp; &nbsp; &nbsp; </td> </tr> <tr> <td><strong>opt_misc.go</strong><dd><code>Set `SpillMem` when rewriting DISTINCT to aggregate</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></td> <td><a href="https://github.com/matrixorigin/matrixone/pull/22623/files#diff-2efd240eb857ad87a1ed05754d9d4f908d148b92ace437b07b55e60b9a6b216f">+1/-0</a>&nbsp; &nbsp; &nbsp; </td> </tr> <tr> <td><strong>build_insert.go</strong><dd><code>Set `SpillMem` on aggregate nodes in INSERT plans</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></td> <td><a href="https://github.com/matrixorigin/matrixone/pull/22623/files#diff-9d0fd08ed4afb7ebd89ebb05190cd8900fd133473e4ef2e795dd5cea28ddce40">+1/-0</a>&nbsp; &nbsp; &nbsp; </td> </tr> <tr> <td><strong>build_constraint_util.go</strong><dd><code>Set `SpillMem` on aggregate nodes in constraint plans</code>&nbsp; &nbsp; &nbsp; &nbsp; </dd></td> <td><a href="https://github.com/matrixorigin/matrixone/pull/22623/files#diff-6eb56175b52eb3302f1d9ae1cde88ced82c25bb8dd90562238cc8d9757adb76b">+1/-0</a>&nbsp; &nbsp; &nbsp; </td> </tr> <tr> <td><strong>plan.proto</strong><dd><code>Add `spill_mem` field to Node message definition</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></td> <td><a href="https://github.com/matrixorigin/matrixone/pull/22623/files#diff-22bd2bd1e47d7acf42499abc93266a657dfd1a35f32837167470fd926564dfb5">+3/-0</a>&nbsp; &nbsp; &nbsp; </td> </tr> <tr> <td><strong>pipeline.proto</strong><dd><code>Add `spill_mem` field to Group message definition</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></td> <td><a href="https://github.com/matrixorigin/matrixone/pull/22623/files#diff-e9c91df4d87ffef63693440b86da7db2a8349c550c6c789bcc8e71f3e81309a6">+1/-0</a>&nbsp; &nbsp; &nbsp; </td> </tr> <tr> <td><strong>types.go</strong><dd><code>Add `SpillMem` field to Group operator struct</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></td> <td><a href="https://github.com/matrixorigin/matrixone/pull/22623/files#diff-8743f0c66efcf26ec44ff685c6f1a8c38e8e9a362dc8adab5a7e39ee4fc715d5">+3/-2</a>&nbsp; &nbsp; &nbsp; </td> </tr> <tr> <td><strong>operator.go</strong><dd><code>Copy `SpillMem` when duplicating and constructing Group operators</code></dd></td> <td><a href="https://github.com/matrixorigin/matrixone/pull/22623/files#diff-24c2df3f5c8c484aab6845aa35e6426ba672758aea5c19bb680c08eadec260ee">+2/-0</a>&nbsp; &nbsp; &nbsp; </td> </tr> <tr> <td><strong>remoterun.go</strong><dd><code>Convert `SpillMem` between pipeline and VM operator representations</code></dd></td> <td><a href="https://github.com/matrixorigin/matrixone/pull/22623/files#diff-5b03ee992b77eaa173e65a1858ac49c0006729e6d0180db8f62d51aada29576d">+2/-0</a>&nbsp; &nbsp; &nbsp; </td> </tr> </table></details></td></tr><tr><td><strong>Bug fix</strong></td><td><details><summary>1 files</summary><table> <tr> <td><strong>compiler_context.go</strong><dd><code>Fix return statement in `GetLowerCaseTableNames` method</code>&nbsp; &nbsp; </dd></td> <td><a href="https://github.com/matrixorigin/matrixone/pull/22623/files#diff-2e1e8cbf97f9cd34095a0a6b95d33456cb53dad2c6fec7eb9cd37893b8048b8b">+1/-1</a>&nbsp; &nbsp; &nbsp; </td> </tr> </table></details></td></tr><tr><td><strong>Additional files</strong></td><td><details><summary>2 files</summary><table> <tr> <td><strong>pipeline.pb.go</strong></td> <td><a href="https://github.com/matrixorigin/matrixone/pull/22623/files#diff-966f3a64eaa6e8c3a8815332fec622b1e79c9d999ed32198a74e0038c17a25ec">+292/-256</a></td> </tr> <tr> <td><strong>plan.pb.go</strong></td> <td><a href="https://github.com/matrixorigin/matrixone/pull/22623/files#diff-33f2cd3e1c50f35dec893cbff56b5ff6cd269b0b239e51463d4c93f7afdab924">+834/-795</a></td> </tr> </table></details></td></tr></tr></tbody></table> </details> ___
1 parent 232e202 commit 0bddd6f

39 files changed

+1876
-1178
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ require (
2424
github.com/buger/jsonparser v1.1.1
2525
github.com/bytedance/sonic v1.14.1
2626
github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5
27-
github.com/cespare/xxhash v1.1.0
2827
github.com/cespare/xxhash/v2 v2.3.0
2928
github.com/cockroachdb/errors v1.9.1
3029
github.com/colinmarc/hdfs/v2 v2.4.0
@@ -110,6 +109,7 @@ require (
110109
github.com/andybalholm/brotli v1.1.0 // indirect
111110
github.com/bytedance/gopkg v0.1.3 // indirect
112111
github.com/bytedance/sonic/loader v0.3.0 // indirect
112+
github.com/cespare/xxhash v1.1.0 // indirect
113113
github.com/cilium/ebpf v0.9.1 // indirect
114114
github.com/clbanning/mxj v1.8.4 // indirect
115115
github.com/cloudwego/base64x v0.1.6 // indirect

go.sum

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migc
4141
github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5+sAH+4kjUM=
4242
github.com/Microsoft/hcsshim v0.11.4 h1:68vKo2VN8DE9AdN4tnkWnmdhqdbpUFM8OF3Airm7fz8=
4343
github.com/Microsoft/hcsshim v0.11.4/go.mod h1:smjE4dvqPX9Zldna+t5FG3rnoHhaB7QYxPRqGcpAD9w=
44-
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
4544
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
4645
github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM=
4746
github.com/RoaringBitmap/roaring v1.2.3 h1:yqreLINqIrX22ErkKI0vY47/ivtJr6n+kMhVOVmhWBY=
@@ -771,7 +770,6 @@ github.com/smartystreets/assertions v1.13.1/go.mod h1:cXr/IwVfSo/RbCSPhoAPv73p3h
771770
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
772771
github.com/smartystreets/goconvey v1.7.2 h1:9RBaZCeXEQ3UselpuwUQHltGVXvdwm6cv1hgR6gDIPg=
773772
github.com/smartystreets/goconvey v1.7.2/go.mod h1:Vw0tHAZW6lzCRk3xgdin6fKYcG+G3Pg9vgXWeJpQFMM=
774-
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ=
775773
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
776774
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
777775
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=

pkg/common/const.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
// Copyright 2024 Matrix Origin
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package common
16+
17+
import "fmt"
18+
19+
const (
20+
KiB = 1024
21+
MiB = 1024 * KiB
22+
GiB = 1024 * MiB
23+
TiB = 1024 * GiB
24+
PiB = 1024 * TiB
25+
26+
THOUSAND = 1000
27+
MILLION = 1000 * THOUSAND
28+
BILLION = 1000 * MILLION
29+
TRILLION = 1000 * BILLION
30+
QUADRILLION = 1000 * TRILLION
31+
)
32+
33+
func ConvertBytesToHumanReadable(bytes int64) string {
34+
num := float64(bytes)
35+
if bytes < KiB {
36+
return fmt.Sprintf("%d bytes", bytes)
37+
}
38+
if bytes < MiB {
39+
return fmt.Sprintf("%.2f KiB", num/KiB)
40+
}
41+
if bytes < GiB {
42+
return fmt.Sprintf("%.2f MiB", num/MiB)
43+
}
44+
if bytes < TiB {
45+
return fmt.Sprintf("%.2f GiB", num/GiB)
46+
}
47+
return fmt.Sprintf("%.2f TiB", num/TiB)
48+
}

pkg/container/types/packer.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,11 @@ var packerAllocator = malloc.NewShardedAllocator(
4040
)
4141

4242
func NewPacker() *Packer {
43-
bs, dec, err := packerAllocator.Allocate(4096, malloc.NoClear)
43+
return NewPackerWithSize(4096)
44+
}
45+
46+
func NewPackerWithSize(size uint64) *Packer {
47+
bs, dec, err := packerAllocator.Allocate(size, malloc.NoClear)
4448
if err != nil {
4549
panic(err)
4650
}
@@ -51,9 +55,13 @@ func NewPacker() *Packer {
5155
}
5256

5357
func NewPackerArray(length int) []*Packer {
58+
return NewPackerArrayWithSize(length, 4096)
59+
}
60+
61+
func NewPackerArrayWithSize(length int, size uint64) []*Packer {
5462
ret := make([]*Packer, 0, length)
5563
for i := 0; i < length; i++ {
56-
ret = append(ret, NewPacker())
64+
ret = append(ret, NewPackerWithSize(size))
5765
}
5866
return ret
5967
}

pkg/fileservice/local_fs.go

Lines changed: 69 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import (
1818
"bytes"
1919
"context"
2020
"errors"
21-
"github.com/matrixorigin/matrixone/pkg/common/malloc"
2221
"io"
2322
"io/fs"
2423
"iter"
@@ -31,6 +30,8 @@ import (
3130
"sync/atomic"
3231
"time"
3332

33+
"github.com/matrixorigin/matrixone/pkg/common/malloc"
34+
3435
"go.uber.org/zap"
3536

3637
"github.com/matrixorigin/matrixone/pkg/common/moerr"
@@ -1218,3 +1219,70 @@ func entryIsDir(path string, name string, entry fs.FileInfo) (bool, error) {
12181219
}
12191220
return false, nil
12201221
}
1222+
1223+
// open for read and write
1224+
func (l *LocalFS) OpenFile(ctx context.Context, filePath string) (*os.File, error) {
1225+
err := ctx.Err()
1226+
if err != nil {
1227+
return nil, err
1228+
}
1229+
1230+
path, err := ParsePathAtService(filePath, l.name)
1231+
if err != nil {
1232+
return nil, err
1233+
}
1234+
nativePath := l.toNativeFilePath(path.File)
1235+
return os.OpenFile(nativePath, os.O_RDWR, 0644)
1236+
}
1237+
1238+
// create or truncate.
1239+
func (l *LocalFS) CreateFile(ctx context.Context, filePath string) (*os.File, error) {
1240+
err := ctx.Err()
1241+
if err != nil {
1242+
return nil, err
1243+
}
1244+
1245+
path, err := ParsePathAtService(filePath, l.name)
1246+
if err != nil {
1247+
return nil, err
1248+
}
1249+
nativePath := l.toNativeFilePath(path.File)
1250+
return os.Create(nativePath)
1251+
}
1252+
1253+
// remove file
1254+
func (l *LocalFS) RemoveFile(ctx context.Context, filePath string) error {
1255+
err := ctx.Err()
1256+
if err != nil {
1257+
return err
1258+
}
1259+
1260+
path, err := ParsePathAtService(filePath, l.name)
1261+
if err != nil {
1262+
return err
1263+
}
1264+
nativePath := l.toNativeFilePath(path.File)
1265+
return os.Remove(nativePath)
1266+
}
1267+
1268+
// open/create then immediately remove. the opend file is good for read/write.
1269+
func (l *LocalFS) CreateAndRemoveFile(ctx context.Context, filePath string) (*os.File, error) {
1270+
err := ctx.Err()
1271+
if err != nil {
1272+
return nil, err
1273+
}
1274+
1275+
path, err := ParsePathAtService(filePath, l.name)
1276+
if err != nil {
1277+
return nil, err
1278+
}
1279+
nativePath := l.toNativeFilePath(path.File)
1280+
f, err := os.Create(nativePath)
1281+
if err != nil {
1282+
return nil, err
1283+
}
1284+
1285+
// do not check error for this one
1286+
os.Remove(nativePath)
1287+
return f, nil
1288+
}

pkg/fileservice/local_fs_test.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,3 +278,60 @@ func TestLocalFSEmptyRootPath(t *testing.T) {
278278
assert.Nil(t, err)
279279
assert.NotNil(t, fs)
280280
}
281+
282+
func TestLocalFSOpenFile(t *testing.T) {
283+
ctx := context.Background()
284+
var counter perfcounter.CounterSet
285+
ctx = perfcounter.WithCounterSet(ctx, &counter)
286+
const (
287+
n = 32
288+
dataLen = 32
289+
)
290+
291+
// new fs
292+
fs, err := NewLocalFS(
293+
ctx,
294+
"foo",
295+
t.TempDir(),
296+
CacheConfig{
297+
DiskPath: ptrTo(t.TempDir()),
298+
DiskCapacity: ptrTo[toml.ByteSize](dataLen * n / 32),
299+
enableDiskCacheForLocalFS: true,
300+
},
301+
nil,
302+
)
303+
assert.Nil(t, err)
304+
305+
f, err := fs.CreateFile(ctx, "aaa.txt")
306+
assert.Nil(t, err)
307+
308+
f.Write([]byte("0123456789"))
309+
f.Close()
310+
311+
f, err = fs.OpenFile(ctx, "aaa.txt")
312+
assert.Nil(t, err)
313+
314+
buf := make([]byte, 10)
315+
f.Read(buf)
316+
assert.Equal(t, []byte("0123456789"), buf)
317+
f.Close()
318+
319+
err = fs.RemoveFile(ctx, "aaa.txt")
320+
assert.Nil(t, err)
321+
322+
// removed, open should fail.
323+
f, err = fs.OpenFile(ctx, "aaa.txt")
324+
assert.NotNil(t, err)
325+
326+
f2, err := fs.CreateAndRemoveFile(ctx, "aaa2.txt")
327+
assert.Nil(t, err)
328+
f2.Write([]byte("0123456789"))
329+
nn, err := f2.ReadAt(buf, 0)
330+
assert.Nil(t, err)
331+
assert.Equal(t, 10, nn)
332+
assert.Equal(t, []byte("0123456789"), buf)
333+
f2.Close()
334+
335+
f2, err = fs.OpenFile(ctx, "aaa2.txt")
336+
assert.NotNil(t, err)
337+
}

pkg/frontend/compiler_context.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ func (tcc *TxnCompilerContext) GetLowerCaseTableNames() int64 {
7777
defer tcc.mu.Unlock()
7878
val, err := tcc.execCtx.ses.GetSessionSysVar("lower_case_table_names")
7979
if err != nil || val == nil {
80-
val = int64(1)
80+
return 1
8181
}
8282
return val.(int64)
8383
}

pkg/frontend/variables.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"sync"
2525
"time"
2626

27+
"github.com/matrixorigin/matrixone/pkg/common"
2728
"github.com/matrixorigin/matrixone/pkg/common/moerr"
2829
"github.com/matrixorigin/matrixone/pkg/config"
2930
"github.com/matrixorigin/matrixone/pkg/container/types"
@@ -3801,6 +3802,14 @@ var gSysVarsDefs = map[string]SystemVariable{
38013802
Type: InitSystemVariableStringType("mo_table_stats.reset_update_time"),
38023803
Default: "",
38033804
},
3805+
"agg_spill_mem": {
3806+
Name: "agg_spill_mem",
3807+
Scope: ScopeBoth,
3808+
Dynamic: true,
3809+
SetVarHintApplies: false,
3810+
Type: InitSystemVariableIntType("agg_spill_mem", 0, common.TiB, false),
3811+
Default: int64(common.GiB),
3812+
},
38043813
}
38053814

38063815
func updateTimeZone(ctx context.Context, sess *Session, sv *SystemVariables, name string, val interface{}) error {

pkg/monlp/llm/mockllm.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ package llm
1717
import (
1818
"context"
1919

20-
"github.com/cespare/xxhash"
20+
"github.com/cespare/xxhash/v2"
2121
"github.com/matrixorigin/matrixone/pkg/common/moerr"
2222
)
2323

0 commit comments

Comments
 (0)