Skip to content

Commit cb61238

Browse files
committed
Dynamic runtime selection logic
This commit changes how the runtime (vam or sam) is selected. If a query reads a vectorized file (CSUP or Parquet), the vector runtime (vam) is selected automatically. This commit also introduces the -sam and -vam flags, each of which, when set, forces the use of the specified runtime.
1 parent 37d3e4e commit cb61238

File tree

8 files changed

+169
-40
lines changed

8 files changed

+169
-40
lines changed

cli/runtimeflags/flags.go

Lines changed: 63 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,11 @@ package runtimeflags
33
import (
44
"errors"
55
"flag"
6+
"fmt"
7+
"os"
68

79
"github.com/brimdata/super/cli/auto"
10+
"github.com/brimdata/super/runtime/exec"
811
"github.com/brimdata/super/runtime/sam/expr/agg"
912
"github.com/brimdata/super/runtime/sam/op/fuse"
1013
"github.com/brimdata/super/runtime/sam/op/sort"
@@ -28,35 +31,80 @@ func defaultMemMaxBytes() uint64 {
2831
}
2932
}
3033

34+
type EngineFlags struct {
35+
sam bool
36+
vam bool
37+
Runtime exec.Runtime
38+
}
39+
3140
type Flags struct {
3241
// these memory limits should be based on a shared resource model
3342
aggMemMax auto.Bytes
3443
sortMemMax auto.Bytes
3544
fuseMemMax auto.Bytes
45+
EngineFlags
46+
}
47+
48+
func (e *EngineFlags) SetFlags(fs *flag.FlagSet) {
49+
fs.BoolVar(&e.sam, "sam", false, "execute query in sequential runtime")
50+
fs.BoolVar(&e.vam, "vam", false, "execute query in vector runtime")
3651
}
3752

38-
func (f *Flags) SetFlags(fs *flag.FlagSet) {
39-
f.aggMemMax = auto.NewBytes(uint64(agg.MaxValueSize))
40-
fs.Var(&f.aggMemMax, "aggmem", "maximum memory used per aggregate function value in MiB, MB, etc")
53+
func (e *Flags) SetFlags(fs *flag.FlagSet) {
54+
e.aggMemMax = auto.NewBytes(uint64(agg.MaxValueSize))
55+
fs.Var(&e.aggMemMax, "aggmem", "maximum memory used per aggregate function value in MiB, MB, etc")
4156
def := defaultMemMaxBytes()
42-
f.sortMemMax = auto.NewBytes(def)
43-
fs.Var(&f.sortMemMax, "sortmem", "maximum memory used by sort in MiB, MB, etc")
44-
f.fuseMemMax = auto.NewBytes(def)
45-
fs.Var(&f.fuseMemMax, "fusemem", "maximum memory used by fuse in MiB, MB, etc")
57+
e.sortMemMax = auto.NewBytes(def)
58+
fs.Var(&e.sortMemMax, "sortmem", "maximum memory used by sort in MiB, MB, etc")
59+
e.fuseMemMax = auto.NewBytes(def)
60+
fs.Var(&e.fuseMemMax, "fusemem", "maximum memory used by fuse in MiB, MB, etc")
61+
e.EngineFlags.SetFlags(fs)
4662
}
4763

48-
func (f *Flags) Init() error {
49-
if f.aggMemMax.Bytes <= 0 {
64+
func (e *EngineFlags) Init() error {
65+
var err error
66+
e.Runtime, err = e.getRuntime()
67+
return err
68+
}
69+
70+
func (e *Flags) Init() error {
71+
if e.aggMemMax.Bytes <= 0 {
5072
return errors.New("aggmem value must be greater than zero")
5173
}
52-
agg.MaxValueSize = int(f.aggMemMax.Bytes)
53-
if f.sortMemMax.Bytes <= 0 {
74+
agg.MaxValueSize = int(e.aggMemMax.Bytes)
75+
if e.sortMemMax.Bytes <= 0 {
5476
return errors.New("sortmem value must be greater than zero")
5577
}
56-
sort.MemMaxBytes = int(f.sortMemMax.Bytes)
57-
if f.fuseMemMax.Bytes <= 0 {
78+
sort.MemMaxBytes = int(e.sortMemMax.Bytes)
79+
if e.fuseMemMax.Bytes <= 0 {
5880
return errors.New("fusemem value must be greater than zero")
5981
}
60-
fuse.MemMaxBytes = int(f.fuseMemMax.Bytes)
61-
return nil
82+
fuse.MemMaxBytes = int(e.fuseMemMax.Bytes)
83+
if e.sam && e.vam {
84+
return errors.New("sam and vam flags cannot both be enabled")
85+
}
86+
return e.EngineFlags.Init()
87+
}
88+
89+
func (e *EngineFlags) getRuntime() (exec.Runtime, error) {
90+
// Flags take precedence.
91+
if e.sam {
92+
return exec.RuntimeSAM, nil
93+
}
94+
if e.vam {
95+
return exec.RuntimeVAM, nil
96+
}
97+
// Then environment variable.
98+
if rt := os.Getenv("SUPER_RUNTIME"); rt != "" {
99+
switch rt {
100+
case "sam":
101+
return exec.RuntimeSAM, nil
102+
case "vam":
103+
return exec.RuntimeVAM, nil
104+
default:
105+
return exec.RuntimeAuto, fmt.Errorf("invalid SUPER_RUNTIME value: %q (must be \"vam\" or \"sam\")", rt)
106+
}
107+
}
108+
return exec.RuntimeAuto, nil
109+
62110
}

cmd/super/compile/shared.go

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@ import (
55
"errors"
66
"flag"
77
"fmt"
8+
"os"
89

910
"github.com/brimdata/super"
1011
"github.com/brimdata/super/cli/dbflags"
1112
"github.com/brimdata/super/cli/outputflags"
1213
"github.com/brimdata/super/cli/queryflags"
14+
"github.com/brimdata/super/cli/runtimeflags"
1315
"github.com/brimdata/super/compiler"
1416
"github.com/brimdata/super/compiler/describe"
1517
"github.com/brimdata/super/compiler/parser"
@@ -25,13 +27,15 @@ import (
2527
)
2628

2729
type Shared struct {
28-
dag bool
29-
dynamic bool
30-
optimize bool
31-
parallel int
32-
query bool
33-
queryFlags queryflags.QueryTextFlags
34-
OutputFlags outputflags.Flags
30+
dag bool
31+
dynamic bool
32+
optimize bool
33+
parallel int
34+
query bool
35+
runtime bool
36+
queryFlags queryflags.QueryTextFlags
37+
runtimeFlags runtimeflags.EngineFlags
38+
OutputFlags outputflags.Flags
3539
}
3640

3741
func (s *Shared) SetFlags(fs *flag.FlagSet) {
@@ -40,11 +44,16 @@ func (s *Shared) SetFlags(fs *flag.FlagSet) {
4044
fs.BoolVar(&s.optimize, "O", false, "display optimized DAG")
4145
fs.IntVar(&s.parallel, "P", 0, "display parallelized DAG")
4246
fs.BoolVar(&s.query, "C", false, "display DAG or AST as query text")
47+
fs.BoolVar(&s.runtime, "runtime", false, "print selected runtime to stderr")
4348
s.OutputFlags.SetFlags(fs)
4449
s.queryFlags.SetFlags(fs)
50+
s.runtimeFlags.SetFlags(fs)
4551
}
4652

4753
func (s *Shared) Run(ctx context.Context, args []string, dbFlags *dbflags.Flags, desc bool) error {
54+
if err := s.runtimeFlags.Init(); err != nil {
55+
return err
56+
}
4857
if len(s.queryFlags.Query) == 0 && len(args) == 0 {
4958
return errors.New("no query specified")
5059
}
@@ -72,6 +81,9 @@ func (s *Shared) Run(ctx context.Context, args []string, dbFlags *dbflags.Flags,
7281
s.dag = true
7382
}
7483
if !s.dag {
84+
if s.runtime {
85+
defer printRuntime(s.runtimeFlags.Runtime)
86+
}
7587
if s.query {
7688
fmt.Println(sfmt.AST(ast.Parsed()))
7789
return nil
@@ -83,11 +95,15 @@ func (s *Shared) Run(ctx context.Context, args []string, dbFlags *dbflags.Flags,
8395
}
8496
rctx := runtime.DefaultContext()
8597
env := exec.NewEnvironment(storage.NewLocalEngine(), root)
98+
env.Runtime = s.runtimeFlags.Runtime
8699
env.Dynamic = s.dynamic
87100
dag, err := compiler.Analyze(rctx, ast, env, false)
88101
if err != nil {
89102
return err
90103
}
104+
if s.runtime {
105+
defer printRuntime(env.Runtime)
106+
}
91107
if desc {
92108
description, err := describe.AnalyzeDAG(ctx, dag, env)
93109
if err != nil {
@@ -122,3 +138,7 @@ func (s *Shared) writeValue(ctx context.Context, v any) error {
122138
}
123139
return err
124140
}
141+
142+
func printRuntime(r exec.Runtime) {
143+
fmt.Fprintf(os.Stderr, "runtime: %s\n", r)
144+
}

cmd/super/root/command.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ func (c *Command) Run(args []string) error {
148148
env.Dynamic = c.inputFlags.Dynamic
149149
env.IgnoreOpenErrors = !c.stopErr
150150
env.ReaderOpts = c.inputFlags.ReaderOpts
151+
env.Runtime = c.runtimeFlags.Runtime
151152
comp := compiler.NewCompilerWithEnv(env)
152153
query, err := runtime.CompileQuery(ctx, super.NewContext(), comp, ast, nil)
153154
if err != nil {

compiler/semantic/analyzer.go

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ func Analyze(ctx context.Context, p *parser.AST, env *exec.Environment, extInput
4343
}
4444
}
4545
main := newDagen(t.reporter).assemble(seq, t.resolver.funcs)
46+
if env.Runtime == exec.RuntimeAuto && t.hasVectorizedInput {
47+
env.Runtime = exec.RuntimeVAM
48+
}
4649
return main, t.Error()
4750
}
4851

@@ -52,15 +55,16 @@ func Analyze(ctx context.Context, p *parser.AST, env *exec.Environment, extInput
5255
// to dataflow.
5356
type translator struct {
5457
reporter
55-
ctx context.Context
56-
resolver *resolver
57-
checker *checker
58-
opCnt map[*ast.OpDecl]int
59-
opStack []string
60-
cteStack []*ast.SQLCTE
61-
env *exec.Environment
62-
scope *Scope
63-
sctx *super.Context
58+
ctx context.Context
59+
resolver *resolver
60+
checker *checker
61+
hasVectorizedInput bool
62+
opCnt map[*ast.OpDecl]int
63+
opStack []string
64+
cteStack []*ast.SQLCTE
65+
env *exec.Environment
66+
scope *Scope
67+
sctx *super.Context
6468
}
6569

6670
func newTranslator(ctx context.Context, r reporter, env *exec.Environment) *translator {

compiler/semantic/op.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,9 @@ func (t *translator) file(n ast.Node, name string, args []ast.OpArg) sem.Op {
284284
if format == "" {
285285
format = sio.FormatFromPath(name)
286286
}
287+
if format == "csup" || format == "parquet" {
288+
t.hasVectorizedInput = true
289+
}
287290
typ, err := t.fileType(name, format)
288291
if err != nil {
289292
t.error(n, err)
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
script: |
2+
# Check the environment variable works.
3+
SUPER_RUNTIME=vam super compile -dag -C -runtime 'values "foo"'
4+
>&2 echo "// ==="
5+
# Check the flag overrides environment variable.
6+
SUPER_RUNTIME=vam super compile -sam -dag -C -runtime 'values "foo"'
7+
>&2 echo "// ==="
8+
# A vectorized file runs in vam.
9+
super -f parquet -o test.parquet -c 'values {x:1}'
10+
super compile -dag -C -runtime 'from test.parquet'
11+
>&2 echo "// ==="
12+
# Normal file leaves the runtime as auto (no vectorized input to select vam).
13+
super -f sup -o test.sup -c 'values {y:1}'
14+
super compile -dag -C -runtime 'from test.sup'
15+
>&2 echo "// ==="
16+
# A query that reads both a vectorized file and a normal file runs in vam.
17+
super compile -dag -C -runtime 'fork ( from test.parquet ) ( from test.sup )'
18+
>&2 echo "// ==="
19+
# Check -sam with a vector input still runs in sam.
20+
super compile -dag -sam -C -runtime 'from test.parquet'
21+
>&2 echo "// ==="
22+
# Error if both -sam and -vam are enabled.
23+
! super -vam -sam
24+
25+
outputs:
26+
- name: stderr
27+
data: |
28+
runtime: vam
29+
// ===
30+
runtime: sam
31+
// ===
32+
runtime: vam
33+
// ===
34+
runtime: auto
35+
// ===
36+
runtime: vam
37+
// ===
38+
runtime: sam
39+
// ===
40+
sam and vam flags cannot both be enabled

runtime/exec/environment.go

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"fmt"
66
"io"
77
"net/http"
8-
"os"
98

109
"github.com/brimdata/super"
1110
"github.com/brimdata/super/compiler/dag"
@@ -28,21 +27,39 @@ type VectorConcurrentPuller interface {
2827
ConcurrentPull(done bool, id int) (vector.Any, error)
2928
}
3029

30+
type Runtime int
31+
32+
const (
33+
RuntimeAuto Runtime = iota
34+
RuntimeSAM
35+
RuntimeVAM
36+
)
37+
38+
func (r Runtime) String() string {
39+
switch r {
40+
case RuntimeSAM:
41+
return "sam"
42+
case RuntimeVAM:
43+
return "vam"
44+
default:
45+
return "auto"
46+
}
47+
}
48+
3149
type Environment struct {
3250
engine storage.Engine
3351
db *db.Root
34-
useVAM bool
3552

3653
Dynamic bool
3754
IgnoreOpenErrors bool
3855
ReaderOpts anyio.ReaderOpts
56+
Runtime Runtime
3957
}
4058

4159
func NewEnvironment(engine storage.Engine, d *db.Root) *Environment {
4260
return &Environment{
4361
engine: engine,
4462
db: d,
45-
useVAM: os.Getenv("SUPER_VAM") != "",
4663
}
4764
}
4865

@@ -51,11 +68,7 @@ func (e *Environment) Engine() storage.Engine {
5168
}
5269

5370
func (e *Environment) UseVAM() bool {
54-
return e.useVAM
55-
}
56-
57-
func (e *Environment) SetUseVAM() {
58-
e.useVAM = true
71+
return e.Runtime == RuntimeVAM
5972
}
6073

6174
func (e *Environment) IsAttached() bool {

ztest/ztest.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -508,7 +508,7 @@ func runInternal(ctx context.Context, query string, input *string, outputFlags,
508508
}
509509
env := exec.NewEnvironment(nil, nil)
510510
if vector {
511-
env.SetUseVAM()
511+
env.Runtime = exec.RuntimeVAM
512512
}
513513
q, err := runtime.CompileQuery(ctx, sctx, compiler.NewCompilerWithEnv(env), ast, readers)
514514
if err != nil {

0 commit comments

Comments
 (0)