
Commit 6ba9469

Add scalar aggregate for keyless aggregations (#6558)
This commit adds a separate runtime operator for scalar (keyless) aggregates. If no input is encountered, a scalar aggregate returns the default value for each aggregate function.
1 parent ab7c028 commit 6ba9469
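
For orientation, here is a minimal standalone sketch of what keyless ("scalar") aggregation with default results means. The types below are hypothetical illustrations, not the brimdata/super API: the operator folds every input value into its aggregate functions and emits exactly one output row, so an empty input still produces each function's default (e.g. 0 for count, null for sum, as in the test at the end of this commit).

```go
package main

import "fmt"

// aggFunc is a stand-in for an aggregate function: it folds input values
// and reports a result, falling back to a default when nothing was consumed.
type aggFunc interface {
	consume(v int)
	result() string
}

type countAgg struct{ n int }

func (c *countAgg) consume(int)    { c.n++ }
func (c *countAgg) result() string { return fmt.Sprint(c.n) } // default is 0

type sumAgg struct {
	sum  int
	seen bool
}

func (s *sumAgg) consume(v int) { s.sum += v; s.seen = true }
func (s *sumAgg) result() string {
	if !s.seen {
		return "null" // default when no input was encountered
	}
	return fmt.Sprint(s.sum)
}

// scalarAggregate drains its input and emits a single row of results,
// even when the input is empty -- the behavior this commit adds.
func scalarAggregate(input []int, funcs ...aggFunc) []string {
	for _, v := range input {
		for _, f := range funcs {
			f.consume(v)
		}
	}
	row := make([]string, 0, len(funcs))
	for _, f := range funcs {
		row = append(row, f.result())
	}
	return row
}

func main() {
	fmt.Println(scalarAggregate(nil, &countAgg{}, &sumAgg{}))            // [0 null]
	fmt.Println(scalarAggregate([]int{1, 2, 3}, &countAgg{}, &sumAgg{})) // [3 6]
}
```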

File tree

8 files changed (+223, -3 lines)


book/src/super-sql/aggregates/count.md

Lines changed: 2 additions & 1 deletion
@@ -53,7 +53,7 @@ count() by k | sort
 {k:2,count:1}
 ```
 
-A simple count with no input values returns no output:
+A simple count with no input values returns 0:
 ```mdtest-spq
 # spq
 where grep("bar", this) | count()
@@ -62,6 +62,7 @@ where grep("bar", this) | count()
 "foo"
 10.0.0.1
 # expected output
+0
 ```
 
 Count can return an explicit zero when using a `filter` clause in the aggregation:

book/src/tutorials/jq.md

Lines changed: 1 addition & 0 deletions
@@ -1072,6 +1072,7 @@ which produces an output like this:
 {reviewers:|["nwt","henridf","mccanne"]|}
 {reviewers:|["mccanne","mattnibs"]|}
 {reviewers:|["henridf","mccanne","mattnibs"]|}
+{reviewers:null}
 {reviewers:|["henridf","mccanne","mattnibs"]|}
 ...
 ```

compiler/rungen/aggregate.go

Lines changed: 4 additions & 1 deletion
@@ -12,7 +12,7 @@ import (
 	"github.com/brimdata/super/sbuf"
 )
 
-func (b *Builder) compileAggregate(parent sbuf.Puller, a *dag.AggregateOp) (*aggregate.Op, error) {
+func (b *Builder) compileAggregate(parent sbuf.Puller, a *dag.AggregateOp) (sbuf.Puller, error) {
 	keys, err := b.compileAssignments(a.Keys)
 	if err != nil {
 		return nil, err
@@ -22,6 +22,9 @@ func (b *Builder) compileAggregate(parent sbuf.Puller, a *dag.AggregateOp) (*agg
 		return nil, err
 	}
 	dir := order.Direction(a.InputSortDir)
+	if len(keys) == 0 {
+		return aggregate.NewScalar(b.rctx, parent, names, reducers, a.PartialsIn, a.PartialsOut)
+	}
 	return aggregate.New(b.rctx, parent, keys, names, reducers, a.Limit, dir, a.PartialsIn, a.PartialsOut)
 }

compiler/rungen/vop.go

Lines changed: 3 additions & 0 deletions
@@ -422,6 +422,9 @@ func (b *Builder) compileVamAggregate(s *dag.AggregateOp, parent vector.Puller)
 		keyNames = append(keyNames, lhs.Path)
 		keyExprs = append(keyExprs, rhs)
 	}
+	if len(keyExprs) == 0 {
+		return aggregate.NewScalar(parent, b.sctx(), aggs, aggNames, aggExprs, s.PartialsIn, s.PartialsOut)
+	}
 	return aggregate.New(parent, b.sctx(), aggNames, aggExprs, aggs, keyNames, keyExprs, s.PartialsIn, s.PartialsOut)
 }

runtime/sam/op/aggregate/aggregate.go

Lines changed: 1 addition & 1 deletion
@@ -111,7 +111,7 @@ func NewAggregator(ctx context.Context, sctx *super.Context, keyRefs, keyExprs,
 	}, nil
 }
 
-func New(rctx *runtime.Context, parent sbuf.Puller, keys []expr.Assignment, aggNames field.List, aggs []*expr.Aggregator, limit int, inputSortDir order.Direction, partialsIn, partialsOut bool) (*Op, error) {
+func New(rctx *runtime.Context, parent sbuf.Puller, keys []expr.Assignment, aggNames field.List, aggs []*expr.Aggregator, limit int, inputSortDir order.Direction, partialsIn, partialsOut bool) (sbuf.Puller, error) {
 	names := make(field.List, 0, len(keys)+len(aggNames))
 	for _, e := range keys {
 		p, ok := e.LHS.Path()

runtime/sam/op/aggregate/scalar.go

Lines changed: 90 additions & 0 deletions
@@ -0,0 +1,90 @@
+package aggregate
+
+import (
+	"github.com/brimdata/super"
+	"github.com/brimdata/super/pkg/field"
+	"github.com/brimdata/super/runtime"
+	"github.com/brimdata/super/runtime/sam/expr"
+	"github.com/brimdata/super/sbuf"
+)
+
+type scalarAggregate struct {
+	sctx        *super.Context
+	parent      sbuf.Puller
+	builder     *super.RecordBuilder
+	aggRefs     []expr.Evaluator
+	aggs        []*expr.Aggregator
+	partialsIn  bool
+	partialsOut bool
+	row         valRow
+}
+
+func NewScalar(rctx *runtime.Context, parent sbuf.Puller, aggNames []field.Path, aggs []*expr.Aggregator, partialsIn, partialsOut bool) (sbuf.Puller, error) {
+	builder, err := super.NewRecordBuilder(rctx.Sctx, aggNames)
+	if err != nil {
+		return nil, err
+	}
+	aggRefs := make([]expr.Evaluator, 0, len(aggNames))
+	for _, fieldName := range aggNames {
+		aggRefs = append(aggRefs, expr.NewDottedExpr(rctx.Sctx, fieldName))
+	}
+	return &scalarAggregate{
+		sctx:        rctx.Sctx,
+		parent:      parent,
+		builder:     builder,
+		aggRefs:     aggRefs,
+		aggs:        aggs,
+		partialsIn:  partialsIn,
+		partialsOut: partialsOut,
+		row:         newValRow(aggs),
+	}, nil
+}
+
+func (s *scalarAggregate) Pull(done bool) (sbuf.Batch, error) {
+	if done {
+		s.row = nil
+		return nil, nil
+	}
+	if s.row == nil {
+		s.row = newValRow(s.aggs)
+		return nil, nil
+	}
+	for {
+		batch, err := s.parent.Pull(false)
+		if err != nil {
+			return nil, err
+		}
+		if batch == nil {
+			return s.result(), nil
+		}
+		for _, val := range batch.Values() {
+			if s.partialsIn {
+				s.row.consumeAsPartial(val, s.aggRefs)
+			} else {
+				s.row.apply(s.sctx, s.aggs, val)
+			}
+		}
+	}
+}
+
+func (s *scalarAggregate) result() sbuf.Batch {
+	var typs []super.Type
+	s.builder.Reset()
+	for _, agg := range s.row {
+		var val super.Value
+		if s.partialsOut {
+			val = agg.ResultAsPartial(s.sctx)
+		} else {
+			val = agg.Result(s.sctx)
+		}
+		typs = append(typs, val.Type())
+		s.builder.Append(val.Bytes())
+	}
+	s.row = nil
+	typ := s.builder.Type(typs)
+	b, err := s.builder.Encode()
+	if err != nil {
+		panic(err)
+	}
+	return sbuf.NewBatch([]super.Value{super.NewValue(typ, b)})
+}
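
The Pull(done) branches above follow the runtime's puller protocol, in which a nil batch marks end of stream and done asks the operator to reset. As a rough, simplified sketch (hypothetical types, not the actual sbuf interfaces), a keyless aggregate drains its parent, emits one result batch per input stream, answers the following Pull with nil, and re-arms so it can run again:

```go
package main

import "fmt"

// puller is a stand-in for the batch-pulling interface: a nil batch means
// end of stream, and done=true asks the operator to reset early.
type puller interface {
	Pull(done bool) ([]int, error)
}

// emptySource never yields a batch, like a filter that drops everything.
type emptySource struct{}

func (emptySource) Pull(bool) ([]int, error) { return nil, nil }

// scalarCount mirrors the shape of scalarAggregate.Pull: drain the parent,
// emit one result batch, answer the following Pull with nil (end of stream),
// and re-arm so the operator can be reused for another stream.
type scalarCount struct {
	parent puller
	count  *int // nil means the result for the current stream was emitted
}

func (s *scalarCount) Pull(done bool) ([]int, error) {
	if done {
		s.count = nil
		return nil, nil
	}
	if s.count == nil {
		s.count = new(int) // re-arm for the next stream
		return nil, nil
	}
	for {
		batch, err := s.parent.Pull(false)
		if err != nil {
			return nil, err
		}
		if batch == nil {
			result := []int{*s.count}
			s.count = nil
			return result, nil // even an empty stream yields one row
		}
		*s.count += len(batch)
	}
}

func main() {
	op := &scalarCount{parent: emptySource{}, count: new(int)}
	first, _ := op.Pull(false)
	second, _ := op.Pull(false)
	fmt.Println(first, second) // [0] [] -- a zero count, then end of stream
}
```

This re-arming appears to be what the unnest-subquery test at the end of this commit exercises, producing a fresh zero count for each input value.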

runtime/vam/op/aggregate/scalar.go

Lines changed: 98 additions & 0 deletions
@@ -0,0 +1,98 @@
+package aggregate
+
+import (
+	"github.com/brimdata/super"
+	"github.com/brimdata/super/pkg/field"
+	"github.com/brimdata/super/runtime/vam/expr"
+	"github.com/brimdata/super/runtime/vam/expr/agg"
+	"github.com/brimdata/super/vector"
+	"github.com/brimdata/super/vector/bitvec"
+)
+
+type scalarAggregate struct {
+	parent      vector.Puller
+	sctx        *super.Context
+	aggExprs    []expr.Evaluator
+	aggs        []*expr.Aggregator
+	builder     *vector.RecordBuilder
+	partialsIn  bool
+	partialsOut bool
+
+	funcs []agg.Func
+}
+
+func NewScalar(parent vector.Puller, sctx *super.Context, aggs []*expr.Aggregator, aggNames []field.Path, aggExprs []expr.Evaluator, partialsIn, partialsOut bool) (vector.Puller, error) {
+	builder, err := vector.NewRecordBuilder(sctx, aggNames)
+	if err != nil {
+		return nil, err
+	}
+	return &scalarAggregate{
+		parent:      parent,
+		sctx:        sctx,
+		aggs:        aggs,
+		aggExprs:    aggExprs,
+		builder:     builder,
+		partialsIn:  partialsIn,
+		partialsOut: partialsOut,
+		funcs:       newFuncs(aggs),
+	}, nil
+}
+
+func (s *scalarAggregate) Pull(done bool) (vector.Any, error) {
+	if s.funcs == nil {
+		s.funcs = newFuncs(s.aggs)
+		return nil, nil
+	}
+	for {
+		vec, err := s.parent.Pull(done)
+		if err != nil {
+			return nil, err
+		}
+		if vec == nil {
+			return s.result(), nil
+		}
+		var vals []vector.Any
+		if s.partialsIn {
+			for _, e := range s.aggExprs {
+				vals = append(vals, e.Eval(vec))
+			}
+		} else {
+			for _, e := range s.aggs {
+				vals = append(vals, e.Eval(vec))
+			}
+		}
+		vector.Apply(false, func(vecs ...vector.Any) vector.Any {
+			for i, vec := range vecs {
+				if s.partialsIn {
+					s.funcs[i].ConsumeAsPartial(vec)
+				} else {
+					s.funcs[i].Consume(vec)
+				}
+			}
+			return vector.NewConst(super.Null, vecs[0].Len(), bitvec.Zero)
+		}, vals...)
+	}
+}
+
+func newFuncs(aggs []*expr.Aggregator) []agg.Func {
+	var funcs []agg.Func
+	for _, agg := range aggs {
+		funcs = append(funcs, agg.Pattern())
+	}
+	return funcs
+}
+
+func (s *scalarAggregate) result() vector.Any {
+	var vecs []vector.Any
+	for _, f := range s.funcs {
+		b := vector.NewDynamicBuilder()
+		if s.partialsOut {
+			b.Write(f.ResultAsPartial(s.sctx))
+		} else {
+			b.Write(f.Result(s.sctx))
+		}
+		vecs = append(vecs, b.Build())
+	}
+	s.funcs = nil
+	return s.builder.New(vecs, bitvec.Zero)
+}
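
The vectorized (vam) variant differs from the sam operator mainly in that aggregate functions consume whole vectors per call rather than one value at a time. A rough standalone sketch of that difference, again with hypothetical types rather than the vector package API:

```go
package main

import "fmt"

// vecCount is a stand-in for a vectorized aggregate function: rather than
// being applied once per value, it consumes an entire column of values per
// call, similar in spirit to the Consume calls in the operator above.
type vecCount struct{ n uint64 }

func (c *vecCount) consumeVector(col []int64) { c.n += uint64(len(col)) }
func (c *vecCount) result() uint64            { return c.n }

func main() {
	c := &vecCount{}
	// Each pull from the parent yields a column; there is one consume call
	// per column rather than one per value.
	for _, col := range [][]int64{{1, 2, 3}, {4, 5}} {
		c.consumeVector(col)
	}
	fmt.Println(c.result()) // 5

	// With no columns at all, the result is still the default, 0.
	fmt.Println((&vecCount{}).result()) // 0
}
```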
Lines changed: 24 additions & 0 deletions
@@ -0,0 +1,24 @@
+spq: |
+  where false
+  | count(*), sum(this), and(this), or(this), min(this), avg(this), collect(this)
+
+vector: true
+
+output: |
+  {count:0,sum:null,and:null::bool,or:null::bool,min:null,avg:null::float64,collect:null}
+
+---
+
+# Test that scalar aggregations with no inputs work in an unnest subquery.
+spq: |
+  unnest this into ( where false | count(*) )
+
+vector: true
+
+input: |
+  [null]
+  [null]
+
+output: |
+  0
+  0
