Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,7 @@ ALL_TESTS = [
"//pkg/sql/sqlstats/sslocal:sslocal_test",
"//pkg/sql/sqlstats/ssmemstorage:ssmemstorage_test",
"//pkg/sql/sqlstats/ssremote:ssremote_test",
"//pkg/sql/sqlstats:sqlstats_test",
"//pkg/sql/sqltestutils:sqltestutils_test",
"//pkg/sql/stats:stats_test",
"//pkg/sql/stmtdiagnostics:stmtdiagnostics_test",
Expand Down Expand Up @@ -2424,6 +2425,7 @@ GO_TARGETS = [
"//pkg/sql/sqlstats/ssremote:ssremote",
"//pkg/sql/sqlstats/ssremote:ssremote_test",
"//pkg/sql/sqlstats:sqlstats",
"//pkg/sql/sqlstats:sqlstats_test",
"//pkg/sql/sqltelemetry:sqltelemetry",
"//pkg/sql/sqltestutils:sqltestutils",
"//pkg/sql/sqltestutils:sqltestutils_test",
Expand Down
4 changes: 1 addition & 3 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1311,8 +1311,6 @@ func (s *Server) newConnExecutor(

ex.applicationName.Store(ex.sessionData().ApplicationName)
ex.applicationStats = applicationStats
// We ignore statements and transactions run by the internal executor by
// passing a nil writer.
ex.statsCollector = sslocal.NewStatsCollector(
s.cfg.Settings,
applicationStats,
Expand Down Expand Up @@ -4020,7 +4018,7 @@ func (ex *connExecutor) initPlanner(ctx context.Context, p *planner) {
p.cancelChecker.Reset(ctx)

ex.initEvalCtx(ctx, &p.extendedEvalCtx, p)

p.statsCollector = ex.statsCollector
p.sessionDataMutatorIterator = ex.dataMutatorIterator
p.noticeSender = nil
p.preparedStatements = ex.getPrepStmtsAccessor()
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/opt/exec/execbuilder/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ go_library(
deps = [
"//pkg/kv/kvserver/concurrency/isolation",
"//pkg/server/telemetry",
"//pkg/sql/appstatspb",
"//pkg/sql/catalog/colinfo",
"//pkg/sql/catalog/descpb",
"//pkg/sql/lexbase",
Expand Down Expand Up @@ -45,6 +46,7 @@ go_library(
"//pkg/sql/sem/tree/treewindow",
"//pkg/sql/sem/volatility",
"//pkg/sql/sqlerrors",
"//pkg/sql/sqlstats",
"//pkg/sql/sqltelemetry",
"//pkg/sql/types",
"//pkg/util/buildutil",
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/opt/exec/execbuilder/relational.go
Original file line number Diff line number Diff line change
Expand Up @@ -3731,6 +3731,7 @@ func (b *Builder) buildCall(c *memo.CallExpr) (_ execPlan, outputCols colOrdMap,
udf.Def.BodyProps,
udf.Def.BodyStmts,
udf.Def.BodyTags,
udf.Def.BodyASTs,
false, /* allowOuterWithRefs */
nil, /* wrapRootExpr */
0, /* resultBufferID */
Expand Down
37 changes: 33 additions & 4 deletions pkg/sql/opt/exec/execbuilder/scalar.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package execbuilder
import (
"context"

"github.com/cockroachdb/cockroach/pkg/sql/appstatspb"
"github.com/cockroachdb/cockroach/pkg/sql/opt"
"github.com/cockroachdb/cockroach/pkg/sql/opt/exec"
"github.com/cockroachdb/cockroach/pkg/sql/opt/memo"
Expand All @@ -24,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree/treecmp"
"github.com/cockroachdb/cockroach/pkg/sql/sem/volatility"
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/errorutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -695,6 +697,7 @@ func (b *Builder) buildExistsSubquery(
stmtProps,
nil, /* stmtStr */
make([]string, len(stmts)),
nil, /* stmtASTs */
true, /* allowOuterWithRefs */
wrapRootExpr,
0, /* resultBufferID */
Expand Down Expand Up @@ -822,6 +825,7 @@ func (b *Builder) buildSubquery(
stmtProps,
nil, /* stmtStr */
make([]string, len(stmts)),
nil, /* stmtASTs */
true, /* allowOuterWithRefs */
nil, /* wrapRootExpr */
0, /* resultBufferID */
Expand Down Expand Up @@ -900,7 +904,7 @@ func (b *Builder) buildSubquery(
if err != nil {
return err
}
err = fn(plan, "" /* stmtForDistSQLDiagram */, true /* isFinalPlan */)
err = fn(plan, nil, "" /* stmtForDistSQLDiagram */, true /* isFinalPlan */)
if err != nil {
return err
}
Expand Down Expand Up @@ -1017,8 +1021,9 @@ func (b *Builder) buildUDF(ctx *buildScalarCtx, scalar opt.ScalarExpr) (tree.Typ
udf.Def.BodyProps,
udf.Def.BodyStmts,
udf.Def.BodyTags,
false, /* allowOuterWithRefs */
nil, /* wrapRootExpr */
udf.Def.BodyASTs, /* stmtASTs */
false, /* allowOuterWithRefs */
nil, /* wrapRootExpr */
udf.Def.ResultBufferID,
)

Expand Down Expand Up @@ -1091,6 +1096,7 @@ func (b *Builder) initRoutineExceptionHandler(
action.BodyProps,
action.BodyStmts,
action.BodyTags,
nil, /* stmtASTs */
false, /* allowOuterWithRefs */
nil, /* wrapRootExpr */
0, /* resultBufferID */
Expand Down Expand Up @@ -1141,6 +1147,7 @@ func (b *Builder) buildRoutinePlanGenerator(
stmtProps []*physical.Required,
stmtStr []string,
stmtTags []string,
stmtASTs []tree.Statement,
allowOuterWithRefs bool,
wrapRootExpr wrapRootExprFn,
resultBufferID memo.RoutineResultBufferID,
Expand Down Expand Up @@ -1207,7 +1214,12 @@ func (b *Builder) buildRoutinePlanGenerator(
dbName := b.evalCtx.SessionData().Database
appName := b.evalCtx.SessionData().ApplicationName

format := tree.FmtHideConstants | tree.FmtFlags(tree.QueryFormattingForFingerprintsMask.Get(&b.evalCtx.Settings.SV))
for i := range stmts {
var statsBuilder *sqlstats.StatsBuilderWithLatencyRecorder
latencyRecorder := sqlstats.NewStatementLatencyRecorder()
sqlstats.RecordStatementPhase(latencyRecorder, sqlstats.StatementStarted)
sqlstats.RecordStatementPhase(latencyRecorder, sqlstats.StatementStartParsing)
stmt := stmts[i]
props := stmtProps[i]
var tag string
Expand All @@ -1216,6 +1228,22 @@ func (b *Builder) buildRoutinePlanGenerator(
if i < len(stmtTags) {
tag = stmtTags[i]
}
if i < len(stmtASTs) {
fingerprint := tree.FormatStatementHideConstants(stmtASTs[i], format)
fpId := appstatspb.ConstructStatementFingerprintID(fingerprint, b.evalCtx.TxnImplicit, dbName)
summary := tree.FormatStatementSummary(stmtASTs[i], format)
stmtType := stmtASTs[i].StatementType()
builder := sqlstats.NewRecordedStatementStatsBuilder(
fpId, dbName, fingerprint, summary, stmtType, appName,
)

statsBuilder = &sqlstats.StatsBuilderWithLatencyRecorder{
StatsBuilder: builder,
LatencyRecorder: latencyRecorder,
}
}
sqlstats.RecordStatementPhase(latencyRecorder, sqlstats.StatementEndParsing)
sqlstats.RecordStatementPhase(latencyRecorder, sqlstats.StatementStartPlanning)
o.Init(ctx, b.evalCtx, b.catalog)
f := o.Factory()

Expand Down Expand Up @@ -1324,7 +1352,8 @@ func (b *Builder) buildRoutinePlanGenerator(
stmtForDistSQLDiagram = stmtStr[i]
}
incrementRoutineStmtCounter(b.evalCtx.StartedRoutineStatementCounters, dbName, appName, tag)
err = fn(plan, stmtForDistSQLDiagram, isFinalPlan)
sqlstats.RecordStatementPhase(latencyRecorder, sqlstats.StatementEndPlanning)
err = fn(plan, statsBuilder, stmtForDistSQLDiagram, isFinalPlan)
if err != nil {
return err
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/opt/memo/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,8 @@ type UDFDefinition struct {
// at the same position in Body.
BodyProps []*physical.Required

BodyASTs []tree.Statement

// BodyStmts, if set, is the string representation of each statement in
// Body. It is only populated when verbose tracing is enabled.
BodyStmts []string
Expand Down
41 changes: 22 additions & 19 deletions pkg/sql/opt/optbuilder/plpgsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -678,7 +678,7 @@ func (b *plpgsqlBuilder) buildPLpgSQLStatements(stmts []ast.Statement, s *scope)
if b.options.insideDataSource && b.setReturnType.Family() == types.TupleFamily {
retNextScope = b.ob.expandRoutineTupleIntoCols(retNextScope)
}
b.appendBodyStmtFromScope(&retCon, retNextScope, "" /* stmtTag */)
b.appendBodyStmtFromScope(&retCon, retNextScope, nil /* stmt */)
b.appendPlpgSQLStmts(&retCon, stmts[i+1:])
return b.callContinuation(&retCon, s)

Expand Down Expand Up @@ -720,7 +720,7 @@ func (b *plpgsqlBuilder) buildPLpgSQLStatements(stmts []ast.Statement, s *scope)
if !b.options.insideDataSource && b.setReturnType.Family() == types.TupleFamily {
retQueryScope = b.ob.combineRoutineColsIntoTuple(retQueryScope)
}
b.appendBodyStmtFromScope(&retCon, retQueryScope, t.SqlStmt.StatementTag())
b.appendBodyStmtFromScope(&retCon, retQueryScope, t.SqlStmt)
b.appendPlpgSQLStmts(&retCon, stmts[i+1:])
return b.callContinuation(&retCon, s)

Expand Down Expand Up @@ -949,7 +949,7 @@ func (b *plpgsqlBuilder) buildPLpgSQLStatements(stmts []ast.Statement, s *scope)
// crdb_internal.plpgsql_raise builtin function.
con := b.makeContinuation("_stmt_raise")
con.def.Volatility = volatility.Volatile
b.appendBodyStmtFromScope(&con, b.buildPLpgSQLRaise(con.s, b.getRaiseArgs(con.s, t)), "" /* stmtTag */)
b.appendBodyStmtFromScope(&con, b.buildPLpgSQLRaise(con.s, b.getRaiseArgs(con.s, t)), nil /* stmt */)
b.appendPlpgSQLStmts(&con, stmts[i+1:])
return b.callContinuation(&con, s)

Expand All @@ -970,7 +970,7 @@ func (b *plpgsqlBuilder) buildPLpgSQLStatements(stmts []ast.Statement, s *scope)
if len(t.Target) == 0 {
// When there is no INTO target, build the SQL statement into a body
// statement that is only executed for its side effects.
b.appendBodyStmtFromScope(&execCon, stmtScope, t.SqlStmt.StatementTag())
b.appendBodyStmtFromScope(&execCon, stmtScope, t.SqlStmt)
b.appendPlpgSQLStmts(&execCon, stmts[i+1:])
return b.callContinuation(&execCon, s)
}
Expand Down Expand Up @@ -1028,7 +1028,7 @@ func (b *plpgsqlBuilder) buildPLpgSQLStatements(stmts []ast.Statement, s *scope)
intoScope = b.callContinuation(&retCon, intoScope)

// Step 3: call the INTO continuation from the parent scope.
b.appendBodyStmtFromScope(&execCon, intoScope, t.SqlStmt.StatementTag())
b.appendBodyStmtFromScope(&execCon, intoScope, t.SqlStmt)
return b.callContinuation(&execCon, s)

case *ast.Open:
Expand Down Expand Up @@ -1068,7 +1068,7 @@ func (b *plpgsqlBuilder) buildPLpgSQLStatements(stmts []ast.Statement, s *scope)
// Cursors with mutations are invalid.
panic(cursorMutationErr)
}
b.appendBodyStmtFromScope(&openCon, openScope, query.StatementTag())
b.appendBodyStmtFromScope(&openCon, openScope, query)
b.appendPlpgSQLStmts(&openCon, stmts[i+1:])

// Build a statement to generate a unique name for the cursor if one
Expand All @@ -1078,7 +1078,7 @@ func (b *plpgsqlBuilder) buildPLpgSQLStatements(stmts []ast.Statement, s *scope)
nameCon := b.makeContinuation("_gen_cursor_name")
nameCon.def.Volatility = volatility.Volatile
nameScope := b.buildCursorNameGen(&nameCon, t.CurVar)
b.appendBodyStmtFromScope(&nameCon, b.callContinuation(&openCon, nameScope), "" /* stmtTag */)
b.appendBodyStmtFromScope(&nameCon, b.callContinuation(&openCon, nameScope), nil /* stmt */)
return b.callContinuation(&nameCon, s)

case *ast.Close:
Expand Down Expand Up @@ -1118,7 +1118,7 @@ func (b *plpgsqlBuilder) buildPLpgSQLStatements(stmts []ast.Statement, s *scope)
closeScope := closeCon.s.push()
b.ob.synthesizeColumn(closeScope, closeColName, types.Int, nil /* expr */, closeCall)
b.ob.constructProjectForScope(closeCon.s, closeScope)
b.appendBodyStmtFromScope(&closeCon, closeScope, "" /* stmtTag */)
b.appendBodyStmtFromScope(&closeCon, closeScope, nil /* stmt */)
b.appendPlpgSQLStmts(&closeCon, stmts[i+1:])
return b.callContinuation(&closeCon, s)

Expand All @@ -1142,7 +1142,7 @@ func (b *plpgsqlBuilder) buildPLpgSQLStatements(stmts []ast.Statement, s *scope)
fetchCon.def.Volatility = volatility.Volatile
fetchScope := b.buildFetch(fetchCon.s, t)
if t.IsMove {
b.appendBodyStmtFromScope(&fetchCon, fetchScope, "" /* stmtTag */)
b.appendBodyStmtFromScope(&fetchCon, fetchScope, nil /* stmt */)
b.appendPlpgSQLStmts(&fetchCon, stmts[i+1:])
return b.callContinuation(&fetchCon, s)
}
Expand Down Expand Up @@ -1173,7 +1173,7 @@ func (b *plpgsqlBuilder) buildPLpgSQLStatements(stmts []ast.Statement, s *scope)
intoScope = b.callContinuation(&retCon, intoScope)

// Add the built statement to the FETCH continuation.
b.appendBodyStmtFromScope(&fetchCon, intoScope, "" /* stmtTag */)
b.appendBodyStmtFromScope(&fetchCon, intoScope, nil /* stmt */)
return b.callContinuation(&fetchCon, s)

case *ast.Null:
Expand Down Expand Up @@ -1275,7 +1275,7 @@ func (b *plpgsqlBuilder) buildPLpgSQLStatements(stmts []ast.Statement, s *scope)
if len(target) == 0 {
// When there is no INTO target, build the nested procedure call into a
// body statement that is only executed for its side effects.
b.appendBodyStmtFromScope(&callCon, callScope, "" /* stmtTag */)
b.appendBodyStmtFromScope(&callCon, callScope, nil /* stmt */)
b.appendPlpgSQLStmts(&callCon, stmts[i+1:])
return b.callContinuation(&callCon, s)
}
Expand All @@ -1290,7 +1290,7 @@ func (b *plpgsqlBuilder) buildPLpgSQLStatements(stmts []ast.Statement, s *scope)
intoScope = b.callContinuation(&retCon, intoScope)

// Add the built statement to the CALL continuation.
b.appendBodyStmtFromScope(&callCon, intoScope, "" /* stmtTag */)
b.appendBodyStmtFromScope(&callCon, intoScope, nil /* stmt */)
return b.callContinuation(&callCon, s)

case *ast.DoBlock:
Expand All @@ -1306,7 +1306,7 @@ func (b *plpgsqlBuilder) buildPLpgSQLStatements(stmts []ast.Statement, s *scope)
doCon := b.makeContinuation("_stmt_do")
doCon.def.Volatility = volatility.Volatile
bodyScope := b.ob.buildPLpgSQLDoBody(t)
b.appendBodyStmtFromScope(&doCon, bodyScope, "" /* stmtTag */)
b.appendBodyStmtFromScope(&doCon, bodyScope, nil /* stmt */)
b.appendPlpgSQLStmts(&doCon, stmts[i+1:])
return b.callContinuation(&doCon, s)

Expand Down Expand Up @@ -1456,7 +1456,7 @@ func (b *plpgsqlBuilder) handleIntForLoop(
)
// Call recursively into the loop body continuation.
incScope = b.callContinuation(&loopCon, incScope)
b.appendBodyStmtFromScope(&incrementCon, incScope, "" /* stmtTag */)
b.appendBodyStmtFromScope(&incrementCon, incScope, nil /* stmt */)

// Notably, we call the loop body continuation here, rather than the
// increment continuation, because the counter should not be incremented
Expand Down Expand Up @@ -2020,7 +2020,7 @@ func (b *plpgsqlBuilder) buildEndOfFunctionRaise(con *continuation) {
pgcode.RoutineExceptionFunctionExecutedNoReturnStatement.String(), /* code */
)
con.def.Volatility = volatility.Volatile
b.appendBodyStmtFromScope(con, b.buildPLpgSQLRaise(con.s, args), "" /* stmtTag */)
b.appendBodyStmtFromScope(con, b.buildPLpgSQLRaise(con.s, args), nil /* stmt */)

// Build a dummy statement that returns NULL. It won't be executed, but
// ensures that the continuation routine's return type is correct.
Expand All @@ -2029,7 +2029,7 @@ func (b *plpgsqlBuilder) buildEndOfFunctionRaise(con *continuation) {
typedNull := b.ob.factory.ConstructNull(b.returnType)
b.ob.synthesizeColumn(eofScope, eofColName, b.returnType, nil /* expr */, typedNull)
b.ob.constructProjectForScope(con.s, eofScope)
b.appendBodyStmtFromScope(con, eofScope, "" /* stmtTag */)
b.appendBodyStmtFromScope(con, eofScope, nil /* stmt */)
}

// addOneRowCheck handles INTO STRICT, where a SQL statement is required to
Expand Down Expand Up @@ -2280,7 +2280,7 @@ func (b *plpgsqlBuilder) makeContinuationWithTyp(
// routine definitions, which need to push the continuation before it is
// finished. The separation also allows for appending multiple body statements.
func (b *plpgsqlBuilder) appendBodyStmtFromScope(
con *continuation, bodyScope *scope, stmtTag string,
con *continuation, bodyScope *scope, stmt tree.Statement,
) {
// Set the volatility of the continuation routine to the least restrictive
// volatility level in the Relational properties of the body statements.
Expand All @@ -2290,8 +2290,11 @@ func (b *plpgsqlBuilder) appendBodyStmtFromScope(
con.def.Volatility = vol
}
con.def.Body = append(con.def.Body, bodyExpr)
con.def.BodyTags = append(con.def.BodyTags, stmtTag)
con.def.BodyProps = append(con.def.BodyProps, bodyScope.makePhysicalProps())
if stmt != nil {
con.def.BodyTags = append(con.def.BodyTags, stmt.StatementTag())
con.def.BodyASTs = append(con.def.BodyASTs, stmt)
}
}

// appendPlpgSQLStmts builds the given PLpgSQL statements into a relational
Expand All @@ -2301,7 +2304,7 @@ func (b *plpgsqlBuilder) appendPlpgSQLStmts(con *continuation, stmts []ast.State
// Make sure to push s before constructing the continuation scope to ensure
// that the parameter columns are not projected.
continuationScope := b.buildPLpgSQLStatements(stmts, con.s.push())
b.appendBodyStmtFromScope(con, continuationScope, "" /* stmtTag */)
b.appendBodyStmtFromScope(con, continuationScope, nil /* stmt */)
}

// callContinuation adds a column that projects the result of calling the
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/opt/optbuilder/routine.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,10 +420,15 @@ func (b *Builder) buildRoutine(
var bodyProps []*physical.Required
var bodyStmts []string
var bodyTags []string
var bodyASTs []tree.Statement
switch o.Language {
case tree.RoutineLangSQL:
// Parse the function body.
stmts, err := parser.Parse(o.Body)
bodyASTs = make([]tree.Statement, len(stmts))
for i := range stmts {
bodyASTs[i] = stmts[i].AST
}
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -539,6 +544,7 @@ func (b *Builder) buildRoutine(
BodyProps: bodyProps,
BodyStmts: bodyStmts,
BodyTags: bodyTags,
BodyASTs: bodyASTs,
Params: params,
ResultBufferID: resultBufferID,
},
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,8 @@ type planner struct {

instrumentation instrumentationHelper

statsCollector *sslocal.StatsCollector

// Contexts for different stages of planning and execution.
semaCtx tree.SemaContext
extendedEvalCtx extendedEvalContext
Expand Down Expand Up @@ -495,6 +497,7 @@ func newInternalPlanner(
p.schemaResolver.authAccessor = p
p.evalCatalogBuiltins.Init(execCfg.Codec, p.txn, p.Descriptors())
p.extendedEvalCtx.CatalogBuiltins = &p.evalCatalogBuiltins
p.statsCollector = &sslocal.StatsCollector{}

return p, func() {
// Note that we capture ctx here. This is only valid as long as we create
Expand Down
Loading
Loading