@@ -17,6 +17,7 @@ import (
17
17
"github.com/cockroachdb/cockroach/pkg/roachpb"
18
18
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
19
19
"github.com/cockroachdb/cockroach/pkg/settings"
20
+ "github.com/cockroachdb/cockroach/pkg/sql/appstatspb"
20
21
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
21
22
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
22
23
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
@@ -34,7 +35,6 @@ import (
34
35
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
35
36
"github.com/cockroachdb/cockroach/pkg/sql/sessionphase"
36
37
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
37
- "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/sslocal"
38
38
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
39
39
"github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics"
40
40
"github.com/cockroachdb/cockroach/pkg/util"
@@ -46,6 +46,7 @@ import (
46
46
"github.com/cockroachdb/cockroach/pkg/util/tracing"
47
47
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
48
48
"github.com/cockroachdb/errors"
49
+ "go.opentelemetry.io/otel/attribute"
49
50
)
50
51
51
52
var collectTxnStatsSampleRate = settings .RegisterFloatSetting (
@@ -77,6 +78,8 @@ type instrumentationHelper struct {
77
78
// Query fingerprint (anonymized statement).
78
79
fingerprint string
79
80
81
+ fingerprintId appstatspb.StmtFingerprintID
82
+
80
83
// Transaction information.
81
84
implicitTxn bool
82
85
txnPriority roachpb.UserPriority
@@ -410,16 +413,18 @@ func (ih *instrumentationHelper) finalizeSetup(ctx context.Context, cfg *Executo
410
413
// which case Finish() is a no-op).
411
414
func (ih * instrumentationHelper ) Setup (
412
415
ctx context.Context ,
413
- cfg * ExecutorConfig ,
414
- statsCollector * sslocal.StatsCollector ,
416
+ ex * connExecutor ,
415
417
p * planner ,
416
- stmtDiagnosticsRecorder * stmtdiagnostics.Registry ,
417
418
stmt * Statement ,
418
419
implicitTxn bool ,
419
420
txnPriority roachpb.UserPriority ,
420
- collectTxnExecStats bool ,
421
421
retryCount int32 ,
422
422
) (newCtx context.Context ) {
423
+ cfg := ex .server .cfg
424
+ statsCollector := ex .statsCollector
425
+ stmtDiagnosticsRecorder := ex .stmtDiagnosticsRecorder
426
+ collectTxnExecStats := ex .extraTxnState .shouldCollectTxnExecutionStats
427
+
423
428
ih .fingerprint = stmt .StmtNoConstants
424
429
ih .implicitTxn = implicitTxn
425
430
ih .txnPriority = txnPriority
@@ -431,6 +436,12 @@ func (ih *instrumentationHelper) Setup(
431
436
ih .isTenant = execinfra .IncludeRUEstimateInExplainAnalyze .Get (cfg .SV ()) && cfg .DistSQLSrv != nil &&
432
437
cfg .DistSQLSrv .TenantCostController != nil
433
438
ih .topLevelStats = topLevelQueryStats {}
439
+ stmtFingerprintId := appstatspb .ConstructStatementFingerprintID (
440
+ stmt .StmtNoConstants , implicitTxn , p .SessionData ().Database )
441
+ ih .fingerprintId = stmtFingerprintId
442
+ ih .stmtDiagnosticsRecorder = stmtDiagnosticsRecorder
443
+ ih .withStatementTrace = cfg .TestingKnobs .WithStatementTrace
444
+ defer func () { ih .finalizeSetup (newCtx , cfg ) }()
434
445
435
446
switch ih .outputMode {
436
447
case explainAnalyzeDebugOutput :
@@ -445,17 +456,28 @@ func (ih *instrumentationHelper) Setup(
445
456
ih .discardRows = true
446
457
447
458
default :
448
- ih .collectBundle , ih .diagRequestID , ih .diagRequest =
449
- stmtDiagnosticsRecorder .ShouldCollectDiagnostics (ctx , stmt .StmtNoConstants , "" /* planGist */ )
450
- // IsRedacted will be false when ih.collectBundle is false.
451
- ih .explainFlags .RedactValues = ih .explainFlags .RedactValues || ih .diagRequest .IsRedacted ()
459
+ // Handle transaction-level diagnostics
460
+ var collectingDiagnostics bool
461
+ if ctx , collectingDiagnostics = ih .handleTransactionDiagnostics (
462
+ ctx ,
463
+ & ex .state ,
464
+ ex .transitionCtx .tracer ,
465
+ stmt ,
466
+ stmtFingerprintId ,
467
+ ); collectingDiagnostics {
468
+ // If collectingDiagnostics is true, the instrumentationHelper should
469
+ // already be set up for tracing, so we can return.
470
+ return ctx
471
+ } else {
472
+ // If no transaction diagnostics are in progress, check for statement-level
473
+ // diagnostics
474
+ ih .collectBundle , ih .diagRequestID , ih .diagRequest =
475
+ stmtDiagnosticsRecorder .ShouldCollectDiagnostics (ctx , stmt .StmtNoConstants , "" /* planGist */ )
476
+ // IsRedacted will be false when ih.collectBundle is false.
477
+ ih .explainFlags .RedactValues = ih .explainFlags .RedactValues || ih .diagRequest .IsRedacted ()
478
+ }
452
479
}
453
480
454
- ih .stmtDiagnosticsRecorder = stmtDiagnosticsRecorder
455
- ih .withStatementTrace = cfg .TestingKnobs .WithStatementTrace
456
-
457
- defer func () { ih .finalizeSetup (newCtx , cfg ) }()
458
-
459
481
if sp := tracing .SpanFromContext (ctx ); sp != nil {
460
482
if sp .IsVerbose () {
461
483
// If verbose tracing was enabled at a higher level, stats
@@ -477,7 +499,7 @@ func (ih *instrumentationHelper) Setup(
477
499
} else {
478
500
collectTxnExecStats = func () bool {
479
501
if stmt .AST .StatementType () == tree .TypeTCL {
480
- // We don't collect stats for statements so there's no need
502
+ // We don't collect stats for statements, so there's no need
481
503
//to trace them.
482
504
return false
483
505
}
@@ -602,9 +624,7 @@ func (ih *instrumentationHelper) setupWithPlanGist(
602
624
}
603
625
604
626
func (ih * instrumentationHelper ) Finish (
605
- cfg * ExecutorConfig ,
606
- statsCollector * sslocal.StatsCollector ,
607
- txnStats * execstats.QueryLevelStats ,
627
+ ex * connExecutor ,
608
628
collectExecStats bool ,
609
629
p * planner ,
610
630
ast tree.Statement ,
@@ -614,6 +634,10 @@ func (ih *instrumentationHelper) Finish(
614
634
retErr error ,
615
635
) error {
616
636
ctx := ih .origCtx
637
+ cfg := ex .server .cfg
638
+ statsCollector := ex .statsCollector
639
+ txnStats := & ex .extraTxnState .accumulatedStats
640
+ txnHelper := & ex .state .txnInstrumentationHelper
617
641
if _ , ok := ih .Tracing (); ! ok {
618
642
return retErr
619
643
}
@@ -711,17 +735,28 @@ func (ih *instrumentationHelper) Finish(
711
735
stmtRawSQL , & p .curPlan , planString , trace , placeholders , res .ErrAllowReleased (),
712
736
payloadErr , retErr , & p .extendedEvalCtx .Settings .SV , ih .inFlightTraceCollector ,
713
737
)
714
- // Include all non-critical errors as warnings. Note that these
715
- // error strings might contain PII, but the warnings are only shown
716
- // to the current user and aren't included into the bundle.
717
- warnings = append (warnings , bundle .errorStrings ... )
718
- bundle .insert (
719
- bundleCtx , ih .fingerprint , ast , cfg .StmtDiagnosticsRecorder , ih .diagRequestID , ih .diagRequest ,
720
- )
738
+
739
+ if ! txnHelper .DiagnosticsInProgress () {
740
+ // Include all non-critical errors as warnings. Note that these
741
+ // error strings might contain PII, but the warnings are only shown
742
+ // to the current user and aren't included into the bundle.
743
+ warnings = append (warnings , bundle .errorStrings ... )
744
+ bundle .insert (
745
+ bundleCtx , ih .fingerprint , ast , cfg .StmtDiagnosticsRecorder , ih .diagRequestID , ih .diagRequest ,
746
+ )
747
+ }
721
748
telemetry .Inc (sqltelemetry .StatementDiagnosticsCollectedCounter )
722
749
}
723
750
}
724
751
752
+ if txnHelper .DiagnosticsInProgress () {
753
+ // If we're collecting a transaction bundle, add the statement to the
754
+ // current txn diagnostic bundle. These will be persisted at the end
755
+ // of the transaction if the transaction matches the request.
756
+ // NB: It is safe for bundle to be empty here.
757
+ txnHelper .AddStatementBundle (ctx , ast , uint64 (ih .fingerprintId ), ih .fingerprint , bundle )
758
+ }
759
+
725
760
// If there was a communication error already, no point in setting any
726
761
// results.
727
762
if retErr != nil {
@@ -1214,3 +1249,31 @@ func (ih *instrumentationHelper) SetIndexRecommendations(
1214
1249
reset ,
1215
1250
)
1216
1251
}
1252
+
1253
+ // handleTransactionDiagnostics manages transaction-level diagnostics
1254
+ // collection. If transaction diagnostics are being collected, the
1255
+ // instrumentationHelper is set up to collect the bundle for the statement.
1256
+ func (ih * instrumentationHelper ) handleTransactionDiagnostics (
1257
+ ctx context.Context ,
1258
+ txnState * txnState ,
1259
+ tracer * tracing.Tracer ,
1260
+ stmt * Statement ,
1261
+ stmtFingerprintId appstatspb.StmtFingerprintID ,
1262
+ ) (newCtx context.Context , collectingDiagnostics bool ) {
1263
+ newCtx , collectingDiagnostics = txnState .shouldCollectTxnDiagnostics (ctx , uint64 (stmtFingerprintId ), stmt , tracer )
1264
+
1265
+ if collectingDiagnostics {
1266
+ var stmtSp * tracing.Span
1267
+ newCtx , stmtSp = tracing .ChildSpan (newCtx , "txn-diag-sql-query" )
1268
+ stmtSp .SetTag ("statement" , attribute .StringValue (stmt .SQL ))
1269
+ ih .collectBundle = true
1270
+ ih .diagRequestID = 0
1271
+ ih .diagRequest = stmtdiagnostics.Request {}
1272
+ ih .explainFlags .RedactValues = txnState .txnInstrumentationHelper .ShouldRedact ()
1273
+ ih .shouldFinishSpan = true
1274
+ ih .needFinish = true
1275
+ ih .sp = stmtSp
1276
+ }
1277
+
1278
+ return newCtx , collectingDiagnostics
1279
+ }
0 commit comments