Skip to content

Commit bec8934

Browse files
authored
add support for re-entering subqueries in recursive functions (#6325)
This commit addresses an unfinished aspect of subqueries when they are invoked from recursive functions. Previously, the subquery would incorrectly block as Eval() is called concurrently violating our invariant. The fix is to keep a stack of non-shared subquery instances that can be used when recursion is detected. This involves calling the rungen builder from the runtime whenever we need a new instance, which turned out to be surprisingly easy to implement.
1 parent 4c16757 commit bec8934

File tree

3 files changed

+56
-13
lines changed

3 files changed

+56
-13
lines changed

compiler/rungen/expr.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -398,13 +398,17 @@ func (b *Builder) compileSubquery(query *dag.SubqueryExpr) (expr.Evaluator, erro
398398
}
399399
return subquery.NewCachedSubquery(b.rctx, body), nil
400400
}
401-
subquery := subquery.NewSubquery(b.rctx)
402-
body, err := b.compileSeqAndCombine(query.Body, []sbuf.Puller{subquery})
403-
if err != nil {
404-
return nil, err
401+
var create func() *subquery.Subquery
402+
create = func() *subquery.Subquery {
403+
subquery := subquery.NewSubquery(b.rctx, create)
404+
body, err := b.compileSeqAndCombine(query.Body, []sbuf.Puller{subquery})
405+
if err != nil {
406+
panic(err)
407+
}
408+
subquery.SetBody(body)
409+
return subquery
405410
}
406-
subquery.SetBody(body)
407-
return subquery, nil
411+
return create(), nil
408412
}
409413

410414
func (b *Builder) compileSeqAndCombine(seq dag.Seq, parents []sbuf.Puller) (sbuf.Puller, error) {

runtime/sam/op/subquery/subquery.go

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -51,14 +51,19 @@ type Subquery struct {
5151
sctx *super.Context
5252
batchCh chan sbuf.Batch
5353
eos bool
54+
tos int
55+
stack []*Subquery
56+
create func() *Subquery
5457

5558
body sbuf.Puller
5659
}
5760

58-
func NewSubquery(rctx *runtime.Context) *Subquery {
61+
func NewSubquery(rctx *runtime.Context, create func() *Subquery) *Subquery {
5962
return &Subquery{
6063
ctx: rctx.Context,
6164
sctx: rctx.Sctx,
65+
tos: -2,
66+
create: create,
6267
batchCh: make(chan sbuf.Batch, 1),
6368
}
6469
}
@@ -81,22 +86,43 @@ func (s *Subquery) Pull(done bool) (sbuf.Batch, error) {
8186
}
8287
}
8388

84-
func (q *Subquery) Eval(this super.Value) super.Value {
89+
const MaxSubqueryRecursion = 10000
90+
91+
func (s *Subquery) Eval(this super.Value) super.Value {
92+
s.tos++
93+
defer func() {
94+
s.tos--
95+
}()
96+
if s.tos >= 0 {
97+
// We're re-entering this subquery instance before it's done evaluating
98+
// the previous invocation. This happens when a subquery is invoked
99+
// inside of a recursive function so the same instance ends up being
100+
// called by different call frames. To deal with this, we keep a stack
101+
// of Subquery duplicates where each duplicate is not shared and extend
102+
// the stack as needed. If the stack overflows, we return an error.
103+
if s.tos >= MaxSubqueryRecursion {
104+
return s.sctx.WrapError("subquery recursion depth exceeded", this)
105+
}
106+
if s.tos >= len(s.stack) {
107+
s.stack = append(s.stack, s.create())
108+
}
109+
return s.stack[s.tos].Eval(this)
110+
}
85111
select {
86-
case q.batchCh <- sbuf.NewArray([]super.Value{this}):
87-
case <-q.ctx.Done():
88-
return q.sctx.NewError(q.ctx.Err())
112+
case s.batchCh <- sbuf.NewArray([]super.Value{this}):
113+
case <-s.ctx.Done():
114+
return s.sctx.NewError(s.ctx.Err())
89115
}
90116
val := super.Null
91117
var count int
92118
for {
93-
b, err := q.body.Pull(false)
119+
b, err := s.body.Pull(false)
94120
if err != nil {
95121
panic(err)
96122
}
97123
if b == nil {
98124
if count > 1 {
99-
return q.sctx.NewErrorf("query expression produced multiple values (consider [subquery])")
125+
return s.sctx.NewErrorf("query expression produced multiple values (consider [subquery])")
100126
}
101127
return val
102128
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
spq: |
2+
fn walk(v):
3+
case kind(v)
4+
when "array" then
5+
[unnest v | walk(this)]
6+
when "record" then
7+
unflatten([unnest flatten(v) | {key,value:walk(value)}])
8+
else v+1
9+
end
10+
values walk([{x:[1,2]},{y:3}])
11+
12+
output: |
13+
[{x:[2,3]},{y:4}]

0 commit comments

Comments
 (0)