Skip to content

Commit 3aa0b3a

Browse files
committed
sql: add ability to redirect first statement to srf result set
Similar to the existing method to direct the result of the first body statement of a routine to a cursor, this commit adds the ability to redirect to the result buffer of an SRF. This will be used in a later commit to implement PL/pgSQL `RETURN NEXT` and `RETURN QUERY` statements. This commit also adds an option to prevent a routine from adding the result of its *last* body statement to its result set - this will be used by set-returning PL/pgSQL functions that rely on sub-routines to fill in the result set during execution. Informs #105240 Release note: None
1 parent fad7247 commit 3aa0b3a

File tree

11 files changed

+255
-95
lines changed

11 files changed

+255
-95
lines changed

pkg/sql/opt/exec/execbuilder/builder.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,10 @@ type Builder struct {
9494
// rather than scans.
9595
withExprs []builtWithExpr
9696

97+
// routineResultBuffers allows expressions within the body of a set-returning
98+
// PL/pgSQL function to add to the result set during execution.
99+
routineResultBuffers map[memo.RoutineResultBufferID]tree.RoutineResultWriter
100+
97101
// allowAutoCommit is passed through to factory methods for mutation
98102
// operators. It allows execution to commit the transaction as part of the
99103
// mutation itself. See canAutoCommit().

pkg/sql/opt/exec/execbuilder/relational.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1325,6 +1325,7 @@ func (b *Builder) buildApplyJoin(join memo.RelExpr) (_ execPlan, outputCols colO
13251325
eb := New(ctx, ef, &o, f.Memo(), b.catalog, newRightSide, b.semaCtx, b.evalCtx, false /* allowAutoCommit */, b.IsANSIDML)
13261326
eb.disableTelemetry = true
13271327
eb.withExprs = withExprs
1328+
eb.routineResultBuffers = b.routineResultBuffers
13281329
plan, err := eb.Build()
13291330
if err != nil {
13301331
if errors.IsAssertionFailure(err) {
@@ -3550,6 +3551,7 @@ func (b *Builder) buildCall(c *memo.CallExpr) (_ execPlan, outputCols colOrdMap,
35503551
udf.Def.BodyStmts,
35513552
false, /* allowOuterWithRefs */
35523553
nil, /* wrapRootExpr */
3554+
0, /* resultBufferID */
35533555
)
35543556

35553557
r := tree.NewTypedRoutineExpr(
@@ -3561,12 +3563,14 @@ func (b *Builder) buildCall(c *memo.CallExpr) (_ execPlan, outputCols colOrdMap,
35613563
udf.Def.CalledOnNullInput,
35623564
udf.Def.MultiColDataSource,
35633565
udf.Def.SetReturning,
3566+
false, /* discardLastStmtResult */
35643567
false, /* tailCall */
35653568
true, /* procedure */
35663569
false, /* triggerFunc */
35673570
false, /* blockStart */
35683571
nil, /* blockState */
35693572
nil, /* cursorDeclaration */
3573+
nil, /* firstStmtResultWriter */
35703574
)
35713575

35723576
var ep execPlan

pkg/sql/opt/exec/execbuilder/scalar.go

Lines changed: 78 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -694,6 +694,7 @@ func (b *Builder) buildExistsSubquery(
694694
nil, /* stmtStr */
695695
true, /* allowOuterWithRefs */
696696
wrapRootExpr,
697+
0, /* resultBufferID */
697698
)
698699
return tree.NewTypedCoalesceExpr(tree.TypedExprs{
699700
tree.NewTypedRoutineExpr(
@@ -705,12 +706,14 @@ func (b *Builder) buildExistsSubquery(
705706
true, /* calledOnNullInput */
706707
false, /* multiColOutput */
707708
false, /* generator */
709+
false, /* discardLastStmtResult */
708710
false, /* tailCall */
709711
false, /* procedure */
710712
false, /* triggerFunc */
711713
false, /* blockStart */
712714
nil, /* blockState */
713715
nil, /* cursorDeclaration */
716+
nil, /* firstStmtResultWriter */
714717
),
715718
tree.DBoolFalse,
716719
}, types.Bool), nil
@@ -817,6 +820,7 @@ func (b *Builder) buildSubquery(
817820
nil, /* stmtStr */
818821
true, /* allowOuterWithRefs */
819822
nil, /* wrapRootExpr */
823+
0, /* resultBufferID */
820824
)
821825
_, tailCall := b.tailCalls[subquery]
822826
return tree.NewTypedRoutineExpr(
@@ -828,12 +832,14 @@ func (b *Builder) buildSubquery(
828832
true, /* calledOnNullInput */
829833
false, /* multiColOutput */
830834
false, /* generator */
835+
false, /* discardLastStmtResult */
831836
tailCall,
832837
false, /* procedure */
833838
false, /* triggerFunc */
834839
false, /* blockStart */
835840
nil, /* blockState */
836841
nil, /* cursorDeclaration */
842+
nil, /* firstStmtResultWriter */
837843
), nil
838844
}
839845

@@ -847,7 +853,11 @@ func (b *Builder) buildSubquery(
847853
withExprs := make([]builtWithExpr, len(b.withExprs))
848854
copy(withExprs, b.withExprs)
849855
planGen := func(
850-
ctx context.Context, ref tree.RoutineExecFactory, args tree.Datums, fn tree.RoutinePlanGeneratedFunc,
856+
ctx context.Context,
857+
ref tree.RoutineExecFactory,
858+
_ tree.RoutineResultWriter,
859+
args tree.Datums,
860+
fn tree.RoutinePlanGeneratedFunc,
851861
) error {
852862
// Analyze the input of the subquery to find tail calls, which will allow
853863
// nested routines (including lazy subqueries) to be executed in the same
@@ -858,6 +868,7 @@ func (b *Builder) buildSubquery(
858868
ef := ref.(exec.Factory)
859869
eb := New(ctx, ef, b.optimizer, b.mem, b.catalog, input, b.semaCtx, b.evalCtx, false /* allowAutoCommit */, b.IsANSIDML)
860870
eb.withExprs = withExprs
871+
eb.routineResultBuffers = b.routineResultBuffers
861872
eb.disableTelemetry = true
862873
eb.planLazySubqueries = true
863874
eb.tailCalls = tailCalls
@@ -901,12 +912,14 @@ func (b *Builder) buildSubquery(
901912
true, /* calledOnNullInput */
902913
false, /* multiColOutput */
903914
false, /* generator */
915+
false, /* discardLastStmtResult */
904916
tailCall,
905917
false, /* procedure */
906918
false, /* triggerFunc */
907919
false, /* blockStart */
908920
nil, /* blockState */
909921
nil, /* cursorDeclaration */
922+
nil, /* firstStmtResultWriter */
910923
), nil
911924
}
912925

@@ -977,29 +990,43 @@ func (b *Builder) buildUDF(ctx *buildScalarCtx, scalar opt.ScalarExpr) (tree.Typ
977990
}
978991

979992
// Execution expects there to be more than one body statement if a cursor is
980-
// opened.
981-
if udf.Def.CursorDeclaration != nil && len(udf.Def.Body) <= 1 {
993+
// opened or the result of the first statement is directed to a buffer.
994+
firstStmtOut := udf.Def.FirstStmtOutput
995+
if len(udf.Def.Body) <= 1 &&
996+
(firstStmtOut.CursorDeclaration != nil || firstStmtOut.TargetBufferID != 0) {
982997
panic(errors.AssertionFailedf(
983-
"expected more than one body statement for a routine that opens a cursor",
998+
"expected more than one body statement for a routine that " +
999+
"redirects the output of the first statement",
9841000
))
9851001
}
1002+
var firstStmtResultWriter tree.RoutineResultWriter
1003+
if firstStmtOut.TargetBufferID != 0 {
1004+
// The first statement of this routine is writing to the result set of an
1005+
// ancestor set-returning PL/pgSQL function.
1006+
firstStmtResultWriter = b.routineResultBuffers[firstStmtOut.TargetBufferID]
1007+
}
9861008

9871009
// Create a tree.RoutinePlanFn that can plan the statements in the UDF body.
988-
// TODO(mgartner): Add support for WITH expressions inside UDF bodies.
9891010
planGen := b.buildRoutinePlanGenerator(
9901011
udf.Def.Params,
9911012
udf.Def.Body,
9921013
udf.Def.BodyProps,
9931014
udf.Def.BodyStmts,
9941015
false, /* allowOuterWithRefs */
9951016
nil, /* wrapRootExpr */
1017+
udf.Def.ResultBufferID,
9961018
)
9971019

9981020
// Enable stepping for volatile functions so that statements within the UDF
9991021
// see mutations made by the invoking statement and by previously executed
10001022
// statements.
10011023
enableStepping := udf.Def.Volatility == volatility.Volatile
10021024

1025+
// A non-zero ResultBufferID indicates that the UDF is a set-returning
1026+
// function, with sub-routines adding to the result set. In this case, the
1027+
// last body statement does not directly contribute to the result set.
1028+
discardLastStmtResult := udf.Def.ResultBufferID != 0
1029+
10031030
// The calling routine, if any, will have already determined whether this
10041031
// routine is in tail-call position.
10051032
_, tailCall := b.tailCalls[udf]
@@ -1013,12 +1040,14 @@ func (b *Builder) buildUDF(ctx *buildScalarCtx, scalar opt.ScalarExpr) (tree.Typ
10131040
udf.Def.CalledOnNullInput,
10141041
udf.Def.MultiColDataSource,
10151042
udf.Def.SetReturning,
1043+
discardLastStmtResult,
10161044
tailCall,
10171045
false, /* procedure */
10181046
udf.Def.TriggerFunc,
10191047
udf.Def.BlockStart,
10201048
blockState,
1021-
udf.Def.CursorDeclaration,
1049+
firstStmtOut.CursorDeclaration,
1050+
firstStmtResultWriter,
10221051
), nil
10231052
}
10241053

@@ -1058,6 +1087,7 @@ func (b *Builder) initRoutineExceptionHandler(
10581087
action.BodyStmts,
10591088
false, /* allowOuterWithRefs */
10601089
nil, /* wrapRootExpr */
1090+
0, /* resultBufferID */
10611091
)
10621092
// Build a routine with no arguments for the exception handler. The actual
10631093
// arguments will be supplied when (if) the handler is invoked.
@@ -1070,12 +1100,14 @@ func (b *Builder) initRoutineExceptionHandler(
10701100
action.CalledOnNullInput,
10711101
action.MultiColDataSource,
10721102
action.SetReturning,
1103+
false, /* discardLastStmtResult */
10731104
false, /* tailCall */
10741105
false, /* procedure */
10751106
false, /* triggerFunc */
10761107
false, /* blockStart */
10771108
nil, /* blockState */
10781109
nil, /* cursorDeclaration */
1110+
nil, /* firstStmtResultWriter */
10791111
)
10801112
}
10811113
blockState.ExceptionHandler = exceptionHandler
@@ -1093,13 +1125,18 @@ type wrapRootExprFn func(f *norm.Factory, e memo.RelExpr) opt.Expr
10931125
// so that WithScans within a statement can be planned and executed.
10941126
// wrapRootExpr allows the root expression of all statements to be replaced with
10951127
// an arbitrary expression.
1128+
//
1129+
// resultBufferID, if not zero, specifies the ID of the routine's result buffer.
1130+
// This is for PL/pgSQL set-returning routines, which must allow sub-routines to
1131+
// add to the result set at any point during execution.
10961132
func (b *Builder) buildRoutinePlanGenerator(
10971133
params opt.ColList,
10981134
stmts []memo.RelExpr,
10991135
stmtProps []*physical.Required,
11001136
stmtStr []string,
11011137
allowOuterWithRefs bool,
11021138
wrapRootExpr wrapRootExprFn,
1139+
resultBufferID memo.RoutineResultBufferID,
11031140
) tree.RoutinePlanGenerator {
11041141
// argOrd returns the ordinal of the argument within the arguments list that
11051142
// can be substituted for each reference to the given function parameter
@@ -1132,7 +1169,11 @@ func (b *Builder) buildRoutinePlanGenerator(
11321169
var o xform.Optimizer
11331170
originalMemo := b.mem
11341171
planGen := func(
1135-
ctx context.Context, ref tree.RoutineExecFactory, args tree.Datums, fn tree.RoutinePlanGeneratedFunc,
1172+
ctx context.Context,
1173+
ref tree.RoutineExecFactory,
1174+
resultWriter tree.RoutineResultWriter,
1175+
args tree.Datums,
1176+
fn tree.RoutinePlanGeneratedFunc,
11361177
) (err error) {
11371178
defer func() {
11381179
if r := recover(); r != nil {
@@ -1219,9 +1260,14 @@ func (b *Builder) buildRoutinePlanGenerator(
12191260
// Identify nested routines that are in tail-call position, and cache them
12201261
// in the Builder. When a nested routine is evaluated, this information
12211262
// may be used to enable tail-call optimization.
1263+
//
1264+
// Tail-call optimization is not allowed when resultBufferID is non-zero,
1265+
// because non-zero resultBufferID means that expressions in the body will
1266+
// add directly to the result set, and the result of the last body
1267+
// statement will be ignored.
12221268
isFinalPlan := i == len(stmts)-1
12231269
var tailCalls map[opt.ScalarExpr]struct{}
1224-
if isFinalPlan {
1270+
if isFinalPlan && resultBufferID == 0 {
12251271
tailCalls = make(map[opt.ScalarExpr]struct{})
12261272
memo.ExtractTailCalls(optimizedExpr, tailCalls)
12271273
}
@@ -1233,6 +1279,13 @@ func (b *Builder) buildRoutinePlanGenerator(
12331279
eb.disableTelemetry = true
12341280
eb.planLazySubqueries = true
12351281
eb.tailCalls = tailCalls
1282+
eb.routineResultBuffers = b.routineResultBuffers
1283+
if resultBufferID != 0 {
1284+
// A PL/pgSQL set-returning function must allow expressions in the body
1285+
// to add directly to the result set. We achieve this by passing the
1286+
// RoutineResultWriter to the child Builder.
1287+
eb.addRoutineResultBuffer(resultBufferID, resultWriter)
1288+
}
12361289
plan, err := eb.Build()
12371290
if err != nil {
12381291
if errors.IsAssertionFailure(err) {
@@ -1264,6 +1317,23 @@ func (b *Builder) buildRoutinePlanGenerator(
12641317
return planGen
12651318
}
12661319

1320+
func (b *Builder) addRoutineResultBuffer(
1321+
bufferID memo.RoutineResultBufferID, resWriter tree.RoutineResultWriter,
1322+
) {
1323+
if b.routineResultBuffers == nil {
1324+
b.routineResultBuffers = make(map[memo.RoutineResultBufferID]tree.RoutineResultWriter)
1325+
} else {
1326+
// Copy the map to avoid modifying the original. Note that we expect only to
1327+
// call this method once for a given Builder.
1328+
newResultBuffers := make(map[memo.RoutineResultBufferID]tree.RoutineResultWriter)
1329+
for k, v := range b.routineResultBuffers {
1330+
newResultBuffers[k] = v
1331+
}
1332+
b.routineResultBuffers = newResultBuffers
1333+
}
1334+
b.routineResultBuffers[bufferID] = resWriter
1335+
}
1336+
12671337
func expectedLazyRoutineError(typ string) error {
12681338
return errors.AssertionFailedf("expected %s to be lazily planned as a routine", typ)
12691339
}

pkg/sql/opt/memo/expr.go

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -746,6 +746,13 @@ type UDFDefinition struct {
746746
// Body. It is only populated when verbose tracing is enabled.
747747
BodyStmts []string
748748

749+
// FirstStmtOutput allows the result of the first body statement to be
750+
// redirected. Only one of the options can be set. If one is set, there will
751+
// be at least two body statements - the first with redirected output, and the
752+
// last to produce the result of the routine. This invariant is enforced when
753+
// the routine is built.
754+
FirstStmtOutput RoutineStmtOutput
755+
749756
// ExceptionBlock contains information needed for exception-handling when the
750757
// body of this routine returns an error. It can be unset.
751758
ExceptionBlock *ExceptionBlock
@@ -755,12 +762,12 @@ type UDFDefinition struct {
755762
// handling.
756763
BlockState *tree.BlockState
757764

758-
// CursorDeclaration contains the information needed to open a SQL cursor with
759-
// the result of the *first* body statement. If it is set, there will be at
760-
// least two body statements - one to open the cursor, and one to evaluate the
761-
// result of the routine. This invariant is enforced when the PLpgSQL routine
762-
// is built. CursorDeclaration may be unset.
763-
CursorDeclaration *tree.RoutineOpenCursor
765+
// ResultBufferID, if set, identifies the buffer that stores the result for
766+
// the set-returning PL/pgSQL function that this UDFDefinition represents.
767+
// Sub-routines within the body statements may use this ID to add their
768+
// results to the same buffer. This is used to implement the PL/pgsql
769+
// RETURN NEXT and RETURN QUERY statements.
770+
ResultBufferID RoutineResultBufferID
764771
}
765772

766773
// ExceptionBlock contains the information needed to match and handle errors in
@@ -777,6 +784,27 @@ type ExceptionBlock struct {
777784
Actions []*UDFDefinition
778785
}
779786

787+
// RoutineStmtOutput allows the result of a statement in a PL/pgSQL function to
788+
// be redirected from the default output buffer. This is used to open cursors
789+
// and implement the RETURN NEXT and RETURN QUERY statements.
790+
//
791+
// Only one of the members can be set.
792+
type RoutineStmtOutput struct {
793+
// CursorDeclaration contains the information needed to open a SQL cursor
794+
// with the result of the *first* body statement.
795+
CursorDeclaration *tree.RoutineOpenCursor
796+
797+
// TargetBufferID identifies the result buffer of an ancestor set-returning
798+
// PL/pgSQL function. The result of the *first* body statement will be added
799+
// to this buffer.
800+
TargetBufferID RoutineResultBufferID
801+
}
802+
803+
// RoutineResultBufferID identifies a buffer that is used to store the result of
804+
// a set-returning PL/pgSQL function. The RoutineBufferID is unique within the
805+
// scope of a single query.
806+
type RoutineResultBufferID uint64
807+
780808
// WindowFrame denotes the definition of a window frame for an individual
781809
// window function, excluding the OFFSET expressions, if present.
782810
type WindowFrame struct {

pkg/sql/opt/memo/expr_format.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1068,12 +1068,22 @@ func (f *ExprFmtCtx) formatScalarWithLabel(
10681068
n := tp.Child("body")
10691069
for i := range def.Body {
10701070
stmtNode := n
1071-
if i == 0 && def.CursorDeclaration != nil {
1072-
// The first statement is opening a cursor.
1073-
stmtNode = n.Child("open-cursor")
1071+
if i == 0 {
1072+
if def.FirstStmtOutput.CursorDeclaration != nil {
1073+
// The first statement is opening a cursor.
1074+
stmtNode = n.Child("open-cursor")
1075+
} else if def.FirstStmtOutput.TargetBufferID != 0 {
1076+
// The first statement is writing to a target buffer.
1077+
stmtNode = n.Child("add-to-srf-result")
1078+
}
10741079
}
10751080
prevTailCalls := f.tailCalls
1076-
if i == len(def.Body)-1 {
1081+
1082+
// Routine calls in the last body statement may be tail calls if
1083+
// ResultBufferID is unset. If it is set, the result of the last body
1084+
// statement is not directly used as the result of the UDF call, so it
1085+
// cannot contain tail calls.
1086+
if i == len(def.Body)-1 && def.ResultBufferID == 0 {
10771087
f.tailCalls = make(map[opt.ScalarExpr]struct{})
10781088
ExtractTailCalls(def.Body[i], f.tailCalls)
10791089
}

0 commit comments

Comments
 (0)