Skip to content

Commit eb654e4

Browse files
craig[bot]kyle-a-wong
andcommitted
Merge #153296
153296: sql: add txn diagnostics collection to instrumentation helper r=kyle-a-wong a=kyle-a-wong commit 1/2: When calling the setup method on instrumentationhelper (ih), ih will now check if transaction diagnostics should should be collected. Transaction diagnostics collection uses a prefix based matching system to determine whether or not transaction diagnostics collection should start, continue, or stop. This works by creating a list of statement fingerprint ids, that make up a transaction fingerprint, and comparing the currently executing statement fingerprint id against this list. If at any point the expected statement fingerprint id doesn't match, diagnostics collection is short circuited and no longer collected. This change also causes transaction diagnostics collection to take precedence over statement transaction diagnostics, meaning if a transaction diagnostics request exists that contains a statement that also has a statement diagnostics request, the transaction diagnostic request gets priority over collecting the bundle. When a transaction diagnostics request is being fulfilled, statement bundles for every statement in the transaction are built and collected in memory. Only after a transaction is finalized and it is determined that the request is fulfilled are all the statement bundles and the transaction trace stored in the corresponding system tables. Resolves: [CRDB-54321](https://cockroachlabs.atlassian.net/browse/CRDB-54321) Epic: [CRDB-53541](https://cockroachlabs.atlassian.net/browse/CRDB-53541) Release note: None ---- commit 2/2: sql: replace statement fingerprint id construction with fingerprint id from instrumentation helper The instrumentation helper now generates a statement fingerprint id in its setup function and stores it in its state. Now, when fingerprint ids are needed for sql stats collection or telemetry logging, the fingerprint id from the instrumentation helper is used. Epic: None Release note: None Co-authored-by: Kyle Wong <[email protected]>
2 parents cc94980 + f8a4192 commit eb654e4

11 files changed

+628
-138
lines changed

pkg/sql/conn_executor.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1171,6 +1171,7 @@ func (s *Server) newConnExecutor(
11711171
connCtx: ctx,
11721172
testingForceRealTracingSpans: s.cfg.TestingKnobs.ForceRealTracingSpans,
11731173
execType: executorType,
1174+
txnInstrumentationHelper: txnInstrumentationHelper{TxnDiagnosticsRecorder: s.cfg.TxnDiagnosticsRecorder},
11741175
},
11751176
transitionCtx: transitionCtx{
11761177
db: s.cfg.DB,
@@ -1195,6 +1196,7 @@ func (s *Server) newConnExecutor(
11951196
executorType: executorType,
11961197
hasCreatedTemporarySchema: false,
11971198
stmtDiagnosticsRecorder: s.cfg.StmtDiagnosticsRecorder,
1199+
txnDiagnosticsRecorder: s.cfg.TxnDiagnosticsRecorder,
11981200
indexUsageStats: s.indexUsageStats,
11991201
txnIDCacheWriter: s.txnIDCache,
12001202
totalActiveTimeStopWatch: timeutil.NewStopWatch(),
@@ -1871,6 +1873,7 @@ type connExecutor struct {
18711873
// stmtDiagnosticsRecorder is used to track which queries need to have
18721874
// information collected.
18731875
stmtDiagnosticsRecorder *stmtdiagnostics.Registry
1876+
txnDiagnosticsRecorder *stmtdiagnostics.TxnRegistry
18741877

18751878
// indexUsageStats is used to track index usage stats.
18761879
indexUsageStats *idxusage.LocalIndexUsageStats

pkg/sql/conn_executor_exec.go

Lines changed: 6 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -565,13 +565,8 @@ func (ex *connExecutor) execStmtInOpenState(
565565

566566
// This goroutine is the only one that can modify txnState.mu.priority and
567567
// txnState.mu.autoRetryCounter, so we don't need to get a mutex here.
568-
ctx = ih.Setup(
569-
ctx, ex.server.cfg, ex.statsCollector, p, ex.stmtDiagnosticsRecorder,
570-
&stmt, os.ImplicitTxn.Get(),
571-
ex.state.mu.priority,
572-
ex.extraTxnState.shouldCollectTxnExecutionStats,
573-
ex.state.mu.autoRetryCounter,
574-
)
568+
ctx = ih.Setup(ctx, ex, p, &stmt, os.ImplicitTxn.Get(),
569+
ex.state.mu.priority, ex.state.mu.autoRetryCounter)
575570

576571
// Note that here we always unconditionally defer a function that takes care
577572
// of finishing the instrumentation helper. This is needed since in order to
@@ -580,9 +575,7 @@ func (ex *connExecutor) execStmtInOpenState(
580575
defer func() {
581576
if ih.needFinish {
582577
retErr = ih.Finish(
583-
ex.server.cfg,
584-
ex.statsCollector,
585-
&ex.extraTxnState.accumulatedStats,
578+
ex,
586579
ih.collectExecStats,
587580
p,
588581
ast,
@@ -1462,13 +1455,8 @@ func (ex *connExecutor) execStmtInOpenStateWithPausablePortal(
14621455
// This goroutine is the only one that can modify txnState.mu.priority and
14631456
// txnState.mu.autoRetryCounter, so we don't need to get a mutex here.
14641457
if !portal.isPausable() || portal.pauseInfo.execStmtInOpenState.ihWrapper == nil {
1465-
ctx = ih.Setup(
1466-
ctx, ex.server.cfg, ex.statsCollector, p, ex.stmtDiagnosticsRecorder,
1467-
&vars.stmt, os.ImplicitTxn.Get(),
1468-
ex.state.mu.priority,
1469-
ex.extraTxnState.shouldCollectTxnExecutionStats,
1470-
ex.state.mu.autoRetryCounter,
1471-
)
1458+
ctx = ih.Setup(ctx, ex, p, &vars.stmt, os.ImplicitTxn.Get(),
1459+
ex.state.mu.priority, ex.state.mu.autoRetryCounter)
14721460
} else {
14731461
ctx = portal.pauseInfo.execStmtInOpenState.ihWrapper.ctx
14741462
}
@@ -1509,9 +1497,7 @@ func (ex *connExecutor) execStmtInOpenStateWithPausablePortal(
15091497
}
15101498
if ihToFinish.needFinish {
15111499
retErr = ihToFinish.Finish(
1512-
ex.server.cfg,
1513-
ex.statsCollector,
1514-
&ex.extraTxnState.accumulatedStats,
1500+
ex,
15151501
ihToFinish.collectExecStats,
15161502
p,
15171503
vars.ast,

pkg/sql/conn_executor_internal_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import (
3939
"github.com/cockroachdb/cockroach/pkg/util/log"
4040
"github.com/cockroachdb/cockroach/pkg/util/mon"
4141
"github.com/cockroachdb/cockroach/pkg/util/stop"
42+
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
4243
"github.com/cockroachdb/cockroach/pkg/util/uuid"
4344
"github.com/stretchr/testify/require"
4445
)
@@ -293,6 +294,7 @@ func startConnExecutor(
293294
// This pool should never be Stop()ed because, if the test is failing, memory
294295
// is not properly released.
295296
collectionFactory := descs.NewBareBonesCollectionFactory(st, keys.SystemSQLCodec)
297+
registry := stmtdiagnostics.NewRegistry(nil, st)
296298
cfg := &ExecutorConfig{
297299
AmbientCtx: ambientCtx,
298300
Settings: st,
@@ -339,7 +341,8 @@ func startConnExecutor(
339341
),
340342
QueryCache: querycache.New(0),
341343
TestingKnobs: ExecutorTestingKnobs{},
342-
StmtDiagnosticsRecorder: stmtdiagnostics.NewRegistry(nil, st),
344+
StmtDiagnosticsRecorder: registry,
345+
TxnDiagnosticsRecorder: stmtdiagnostics.NewTxnRegistry(nil, st, registry, timeutil.DefaultTimeSource{}),
343346
HistogramWindowInterval: base.DefaultHistogramWindowInterval(),
344347
CollectionFactory: collectionFactory,
345348
LicenseEnforcer: license.NewEnforcer(nil),

pkg/sql/exec_log.go

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -347,23 +347,7 @@ func (p *planner) maybeLogStatementInternal(
347347
// overhead latency: txn/retry management, error checking, etc
348348
execOverheadNanos := svcLatNanos - processingLatNanos
349349

350-
// If the statement was recorded by the stats collector, we can extract
351-
// the statement fingerprint ID. Otherwise, we'll need to compute it from the AST.
352-
stmtFingerprintID := statsCollector.StatementFingerprintID()
353-
if stmtFingerprintID == 0 {
354-
repQuery := p.stmt.StmtNoConstants
355-
if repQuery == "" {
356-
flags := tree.FmtFlags(tree.QueryFormattingForFingerprintsMask.Get(&p.execCfg.Settings.SV))
357-
f := tree.NewFmtCtx(flags)
358-
f.FormatNode(p.stmt.AST)
359-
repQuery = f.CloseAndGetString()
360-
}
361-
stmtFingerprintID = appstatspb.ConstructStatementFingerprintID(
362-
repQuery,
363-
implicitTxn,
364-
p.CurrentDatabase(),
365-
)
366-
}
350+
stmtFingerprintID := p.instrumentation.fingerprintId
367351

368352
sampledQuery := getSampledQuery()
369353
defer releaseSampledQuery(sampledQuery)

pkg/sql/executor_statement_metrics.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,8 +191,7 @@ func (ex *connExecutor) recordStatementSummary(
191191
}
192192
startTime := phaseTimes.GetSessionPhaseTime(sessionphase.PlannerStartExecStmt).ToUTC()
193193
implicitTxn := flags.IsSet(planFlagImplicitTxn)
194-
stmtFingerprintID := appstatspb.ConstructStatementFingerprintID(
195-
stmt.StmtNoConstants, implicitTxn, planner.SessionData().Database)
194+
stmtFingerprintID := planner.instrumentation.fingerprintId
196195
autoRetryReason := ex.state.mu.autoRetryReason
197196
if automaticRetryStmtCount > 0 {
198197
autoRetryReason = planner.autoRetryStmtReason

pkg/sql/instrumentation.go

Lines changed: 88 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/cockroachdb/cockroach/pkg/roachpb"
1818
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
1919
"github.com/cockroachdb/cockroach/pkg/settings"
20+
"github.com/cockroachdb/cockroach/pkg/sql/appstatspb"
2021
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
2122
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
2223
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
@@ -34,7 +35,6 @@ import (
3435
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
3536
"github.com/cockroachdb/cockroach/pkg/sql/sessionphase"
3637
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
37-
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats/sslocal"
3838
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
3939
"github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics"
4040
"github.com/cockroachdb/cockroach/pkg/util"
@@ -46,6 +46,7 @@ import (
4646
"github.com/cockroachdb/cockroach/pkg/util/tracing"
4747
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
4848
"github.com/cockroachdb/errors"
49+
"go.opentelemetry.io/otel/attribute"
4950
)
5051

5152
var collectTxnStatsSampleRate = settings.RegisterFloatSetting(
@@ -77,6 +78,8 @@ type instrumentationHelper struct {
7778
// Query fingerprint (anonymized statement).
7879
fingerprint string
7980

81+
fingerprintId appstatspb.StmtFingerprintID
82+
8083
// Transaction information.
8184
implicitTxn bool
8285
txnPriority roachpb.UserPriority
@@ -410,16 +413,18 @@ func (ih *instrumentationHelper) finalizeSetup(ctx context.Context, cfg *Executo
410413
// which case Finish() is a no-op).
411414
func (ih *instrumentationHelper) Setup(
412415
ctx context.Context,
413-
cfg *ExecutorConfig,
414-
statsCollector *sslocal.StatsCollector,
416+
ex *connExecutor,
415417
p *planner,
416-
stmtDiagnosticsRecorder *stmtdiagnostics.Registry,
417418
stmt *Statement,
418419
implicitTxn bool,
419420
txnPriority roachpb.UserPriority,
420-
collectTxnExecStats bool,
421421
retryCount int32,
422422
) (newCtx context.Context) {
423+
cfg := ex.server.cfg
424+
statsCollector := ex.statsCollector
425+
stmtDiagnosticsRecorder := ex.stmtDiagnosticsRecorder
426+
collectTxnExecStats := ex.extraTxnState.shouldCollectTxnExecutionStats
427+
423428
ih.fingerprint = stmt.StmtNoConstants
424429
ih.implicitTxn = implicitTxn
425430
ih.txnPriority = txnPriority
@@ -431,6 +436,12 @@ func (ih *instrumentationHelper) Setup(
431436
ih.isTenant = execinfra.IncludeRUEstimateInExplainAnalyze.Get(cfg.SV()) && cfg.DistSQLSrv != nil &&
432437
cfg.DistSQLSrv.TenantCostController != nil
433438
ih.topLevelStats = topLevelQueryStats{}
439+
stmtFingerprintId := appstatspb.ConstructStatementFingerprintID(
440+
stmt.StmtNoConstants, implicitTxn, p.SessionData().Database)
441+
ih.fingerprintId = stmtFingerprintId
442+
ih.stmtDiagnosticsRecorder = stmtDiagnosticsRecorder
443+
ih.withStatementTrace = cfg.TestingKnobs.WithStatementTrace
444+
defer func() { ih.finalizeSetup(newCtx, cfg) }()
434445

435446
switch ih.outputMode {
436447
case explainAnalyzeDebugOutput:
@@ -445,17 +456,28 @@ func (ih *instrumentationHelper) Setup(
445456
ih.discardRows = true
446457

447458
default:
448-
ih.collectBundle, ih.diagRequestID, ih.diagRequest =
449-
stmtDiagnosticsRecorder.ShouldCollectDiagnostics(ctx, stmt.StmtNoConstants, "" /* planGist */)
450-
// IsRedacted will be false when ih.collectBundle is false.
451-
ih.explainFlags.RedactValues = ih.explainFlags.RedactValues || ih.diagRequest.IsRedacted()
459+
// Handle transaction-level diagnostics
460+
var collectingDiagnostics bool
461+
if ctx, collectingDiagnostics = ih.handleTransactionDiagnostics(
462+
ctx,
463+
&ex.state,
464+
ex.transitionCtx.tracer,
465+
stmt,
466+
stmtFingerprintId,
467+
); collectingDiagnostics {
468+
// If collectingDiagnostics is true, the instrumentationHelper should
469+
// already be set up for tracing, so we can return.
470+
return ctx
471+
} else {
472+
// If no transaction diagnostics are in progress, check for statement-level
473+
// diagnostics
474+
ih.collectBundle, ih.diagRequestID, ih.diagRequest =
475+
stmtDiagnosticsRecorder.ShouldCollectDiagnostics(ctx, stmt.StmtNoConstants, "" /* planGist */)
476+
// IsRedacted will be false when ih.collectBundle is false.
477+
ih.explainFlags.RedactValues = ih.explainFlags.RedactValues || ih.diagRequest.IsRedacted()
478+
}
452479
}
453480

454-
ih.stmtDiagnosticsRecorder = stmtDiagnosticsRecorder
455-
ih.withStatementTrace = cfg.TestingKnobs.WithStatementTrace
456-
457-
defer func() { ih.finalizeSetup(newCtx, cfg) }()
458-
459481
if sp := tracing.SpanFromContext(ctx); sp != nil {
460482
if sp.IsVerbose() {
461483
// If verbose tracing was enabled at a higher level, stats
@@ -477,7 +499,7 @@ func (ih *instrumentationHelper) Setup(
477499
} else {
478500
collectTxnExecStats = func() bool {
479501
if stmt.AST.StatementType() == tree.TypeTCL {
480-
// We don't collect stats for statements so there's no need
502+
// We don't collect stats for statements, so there's no need
481503
//to trace them.
482504
return false
483505
}
@@ -602,9 +624,7 @@ func (ih *instrumentationHelper) setupWithPlanGist(
602624
}
603625

604626
func (ih *instrumentationHelper) Finish(
605-
cfg *ExecutorConfig,
606-
statsCollector *sslocal.StatsCollector,
607-
txnStats *execstats.QueryLevelStats,
627+
ex *connExecutor,
608628
collectExecStats bool,
609629
p *planner,
610630
ast tree.Statement,
@@ -614,6 +634,10 @@ func (ih *instrumentationHelper) Finish(
614634
retErr error,
615635
) error {
616636
ctx := ih.origCtx
637+
cfg := ex.server.cfg
638+
statsCollector := ex.statsCollector
639+
txnStats := &ex.extraTxnState.accumulatedStats
640+
txnHelper := &ex.state.txnInstrumentationHelper
617641
if _, ok := ih.Tracing(); !ok {
618642
return retErr
619643
}
@@ -711,17 +735,28 @@ func (ih *instrumentationHelper) Finish(
711735
stmtRawSQL, &p.curPlan, planString, trace, placeholders, res.ErrAllowReleased(),
712736
payloadErr, retErr, &p.extendedEvalCtx.Settings.SV, ih.inFlightTraceCollector,
713737
)
714-
// Include all non-critical errors as warnings. Note that these
715-
// error strings might contain PII, but the warnings are only shown
716-
// to the current user and aren't included into the bundle.
717-
warnings = append(warnings, bundle.errorStrings...)
718-
bundle.insert(
719-
bundleCtx, ih.fingerprint, ast, cfg.StmtDiagnosticsRecorder, ih.diagRequestID, ih.diagRequest,
720-
)
738+
739+
if !txnHelper.DiagnosticsInProgress() {
740+
// Include all non-critical errors as warnings. Note that these
741+
// error strings might contain PII, but the warnings are only shown
742+
// to the current user and aren't included into the bundle.
743+
warnings = append(warnings, bundle.errorStrings...)
744+
bundle.insert(
745+
bundleCtx, ih.fingerprint, ast, cfg.StmtDiagnosticsRecorder, ih.diagRequestID, ih.diagRequest,
746+
)
747+
}
721748
telemetry.Inc(sqltelemetry.StatementDiagnosticsCollectedCounter)
722749
}
723750
}
724751

752+
if txnHelper.DiagnosticsInProgress() {
753+
// If we're collecting a transaction bundle, add the statement to the
754+
// current txn diagnostic bundle. These will be persisted at the end
755+
// of the transaction if the transaction matches the request.
756+
// NB: It is safe for bundle to be empty here.
757+
txnHelper.AddStatementBundle(ctx, ast, uint64(ih.fingerprintId), ih.fingerprint, bundle)
758+
}
759+
725760
// If there was a communication error already, no point in setting any
726761
// results.
727762
if retErr != nil {
@@ -1214,3 +1249,31 @@ func (ih *instrumentationHelper) SetIndexRecommendations(
12141249
reset,
12151250
)
12161251
}
1252+
1253+
// handleTransactionDiagnostics manages transaction-level diagnostics
1254+
// collection. If transaction diagnostics are being collected, the
1255+
// instrumentationHelper is set up to collect the bundle for the statement.
1256+
func (ih *instrumentationHelper) handleTransactionDiagnostics(
1257+
ctx context.Context,
1258+
txnState *txnState,
1259+
tracer *tracing.Tracer,
1260+
stmt *Statement,
1261+
stmtFingerprintId appstatspb.StmtFingerprintID,
1262+
) (newCtx context.Context, collectingDiagnostics bool) {
1263+
newCtx, collectingDiagnostics = txnState.shouldCollectTxnDiagnostics(ctx, uint64(stmtFingerprintId), stmt, tracer)
1264+
1265+
if collectingDiagnostics {
1266+
var stmtSp *tracing.Span
1267+
newCtx, stmtSp = tracing.ChildSpan(newCtx, "txn-diag-sql-query")
1268+
stmtSp.SetTag("statement", attribute.StringValue(stmt.SQL))
1269+
ih.collectBundle = true
1270+
ih.diagRequestID = 0
1271+
ih.diagRequest = stmtdiagnostics.Request{}
1272+
ih.explainFlags.RedactValues = txnState.txnInstrumentationHelper.ShouldRedact()
1273+
ih.shouldFinishSpan = true
1274+
ih.needFinish = true
1275+
ih.sp = stmtSp
1276+
}
1277+
1278+
return newCtx, collectingDiagnostics
1279+
}

pkg/sql/stmtdiagnostics/BUILD.bazel

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ go_library(
2828

2929
go_test(
3030
name = "stmtdiagnostics_test",
31-
size = "medium",
31+
size = "large",
3232
srcs = [
3333
"main_test.go",
3434
"statement_diagnostics_helpers_test.go",
@@ -37,6 +37,7 @@ go_test(
3737
"txn_diagnostics_test.go",
3838
],
3939
embed = [":stmtdiagnostics"],
40+
exec_properties = {"test.Pool": "large"},
4041
tags = ["no-remote-exec"],
4142
deps = [
4243
"//pkg/base",
@@ -53,6 +54,7 @@ go_test(
5354
"//pkg/sql/catalog/systemschema",
5455
"//pkg/sql/isql",
5556
"//pkg/sql/sqlerrors",
57+
"//pkg/sql/sqlstats",
5658
"//pkg/testutils",
5759
"//pkg/testutils/serverutils",
5860
"//pkg/testutils/skip",

0 commit comments

Comments
 (0)