Skip to content

Commit 2a8bdf5

Browse files
craig[bot]kyle-a-wong
andcommitted
Merge #152886
152886: sql: Add txn instrumentation helper r=kyle-a-wong a=kyle-a-wong Adds a new txnInstrumentationHelper struct for managing the collection of transaction diagnostic bundles. This struct has APIs for determining if diagnostics collection should be started and should continue, to add a statement bundle to the transaction diagnostics, and to finalize a transaction diagnostics collection. Epic: [CRDB-53541](https://cockroachlabs.atlassian.net/browse/CRDB-53541) Release note: None Co-authored-by: Kyle Wong <[email protected]>
2 parents dcad761 + 6276d5d commit 2a8bdf5

File tree

7 files changed

+863
-102
lines changed

7 files changed

+863
-102
lines changed

pkg/sql/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,7 @@ go_library(
276276
"truncate.go",
277277
"two_phase_commit.go",
278278
"txn_fingerprint_id_cache.go",
279+
"txn_instrumentation.go",
279280
"txn_state.go",
280281
"type_change.go",
281282
"unary.go",
@@ -760,6 +761,7 @@ go_test(
760761
"tenant_test.go",
761762
"trace_test.go",
762763
"txn_fingerprint_id_cache_test.go",
764+
"txn_instrumentation_test.go",
763765
"txn_restart_test.go",
764766
"txn_state_test.go",
765767
"type_change_test.go",

pkg/sql/conn_executor.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3477,6 +3477,12 @@ func isCommit(stmt tree.Statement) bool {
34773477
return ok
34783478
}
34793479

3480+
// isRollback returns true if stmt is a "ROLLBACK" statement.
3481+
func isRollback(stmt tree.Statement) bool {
3482+
_, ok := stmt.(*tree.RollbackTransaction)
3483+
return ok
3484+
}
3485+
34803486
var retryableMinTimestampBoundUnsatisfiableError = errors.Newf(
34813487
"retryable MinTimestampBoundUnsatisfiableError",
34823488
)

pkg/sql/stmtdiagnostics/statement_diagnostics.go

Lines changed: 38 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -591,6 +591,41 @@ func (r *Registry) InsertStatementDiagnostics(
591591
return diagID, err
592592
}
593593

594+
func (r *Registry) insertBundleChunks(
595+
ctx context.Context, bundle []byte, description string, txn isql.Txn,
596+
) (*tree.DArray, error) {
597+
bundleChunksVal := tree.NewDArray(types.Int)
598+
bundleToUpload := bundle
599+
for len(bundleToUpload) > 0 {
600+
chunkSize := int(bundleChunkSize.Get(&r.st.SV))
601+
chunk := bundleToUpload
602+
if len(chunk) > chunkSize {
603+
chunk = chunk[:chunkSize]
604+
}
605+
bundleToUpload = bundleToUpload[len(chunk):]
606+
607+
// Insert the chunk into system.statement_bundle_chunks.
608+
row, err := txn.QueryRowEx(
609+
ctx, "stmt-bundle-chunks-insert", txn.KV(),
610+
sessiondata.NodeUserSessionDataOverride,
611+
"INSERT INTO system.statement_bundle_chunks(description, data) VALUES ($1, $2) RETURNING id",
612+
description,
613+
tree.NewDBytes(tree.DBytes(chunk)),
614+
)
615+
if err != nil {
616+
return nil, err
617+
}
618+
if row == nil {
619+
return nil, errors.New("failed to check statement bundle chunk")
620+
}
621+
chunkID := row[0].(*tree.DInt)
622+
if err := bundleChunksVal.Append(chunkID); err != nil {
623+
return nil, err
624+
}
625+
}
626+
return bundleChunksVal, nil
627+
}
628+
594629
func (r *Registry) innerInsertStatementDiagnostics(
595630
ctx context.Context, diagnostic StmtDiagnostic, txn isql.Txn,
596631
) (CollectedInstanceID, error) {
@@ -621,34 +656,9 @@ func (r *Registry) innerInsertStatementDiagnostics(
621656
errorVal = tree.NewDString(diagnostic.collectionErr.Error())
622657
}
623658

624-
bundleChunksVal := tree.NewDArray(types.Int)
625-
bundleToUpload := diagnostic.bundle
626-
for len(bundleToUpload) > 0 {
627-
chunkSize := int(bundleChunkSize.Get(&r.st.SV))
628-
chunk := bundleToUpload
629-
if len(chunk) > chunkSize {
630-
chunk = chunk[:chunkSize]
631-
}
632-
bundleToUpload = bundleToUpload[len(chunk):]
633-
634-
// Insert the chunk into system.statement_bundle_chunks.
635-
row, err := txn.QueryRowEx(
636-
ctx, "stmt-bundle-chunks-insert", txn.KV(),
637-
sessiondata.NodeUserSessionDataOverride,
638-
"INSERT INTO system.statement_bundle_chunks(description, data) VALUES ($1, $2) RETURNING id",
639-
"statement diagnostics bundle",
640-
tree.NewDBytes(tree.DBytes(chunk)),
641-
)
642-
if err != nil {
643-
return diagID, err
644-
}
645-
if row == nil {
646-
return diagID, errors.New("failed to check statement bundle chunk")
647-
}
648-
chunkID := row[0].(*tree.DInt)
649-
if err := bundleChunksVal.Append(chunkID); err != nil {
650-
return diagID, err
651-
}
659+
bundleChunksVal, err := r.insertBundleChunks(ctx, diagnostic.bundle, "statement diagnostics bundle", txn)
660+
if err != nil {
661+
return diagID, err
652662
}
653663

654664
collectionTime := timeutil.Now()

pkg/sql/stmtdiagnostics/txn_diagnostics.go

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,40 @@ import (
2323
// bundle should be collected.
2424
type TxnRequest struct {
2525
txnFingerprintId uint64
26-
stmtFingerprintsId []uint64
26+
stmtFingerprintIds []uint64
2727
redacted bool
2828
username string
2929
expiresAt time.Time
3030
minExecutionLatency time.Duration
3131
samplingProbability float64
3232
}
3333

34+
func NewTxnRequest(
35+
txnFingerprintId uint64,
36+
stmtFingerprintIds []uint64,
37+
redacted bool,
38+
username string,
39+
expiresAt time.Time,
40+
minExecutionLatency time.Duration,
41+
samplingProbability float64,
42+
) TxnRequest {
43+
return TxnRequest{
44+
txnFingerprintId: txnFingerprintId,
45+
stmtFingerprintIds: stmtFingerprintIds,
46+
redacted: redacted,
47+
username: username,
48+
expiresAt: expiresAt,
49+
minExecutionLatency: minExecutionLatency,
50+
samplingProbability: samplingProbability,
51+
}
52+
}
53+
54+
func (t *TxnRequest) TxnFingerprintId() uint64 {
55+
return t.txnFingerprintId
56+
}
57+
3458
func (t *TxnRequest) StmtFingerprintIds() []uint64 {
35-
return t.stmtFingerprintsId
59+
return t.stmtFingerprintIds
3660
}
3761

3862
func (t *TxnRequest) IsRedacted() bool {
@@ -56,10 +80,11 @@ func (t *TxnRequest) isConditional() bool {
5680
// as a transaction diagnostic bundle
5781
type TxnDiagnostic struct {
5882
stmtDiagnostics []StmtDiagnostic
83+
bundle []byte
5984
}
6085

61-
func NewTxnDiagnostic(stmtDiagnostics []StmtDiagnostic) TxnDiagnostic {
62-
return TxnDiagnostic{stmtDiagnostics: stmtDiagnostics}
86+
func NewTxnDiagnostic(stmtDiagnostics []StmtDiagnostic, bundle []byte) TxnDiagnostic {
87+
return TxnDiagnostic{stmtDiagnostics: stmtDiagnostics, bundle: bundle}
6388
}
6489

6590
// TxnRegistry maintains a view on the transactions on which a diagnostic
@@ -118,7 +143,7 @@ func (r *TxnRegistry) ShouldStartTxnDiagnostic(
118143
}
119144

120145
for id, f := range r.mu.requests {
121-
if len(f.stmtFingerprintsId) > 0 && f.stmtFingerprintsId[0] == stmtFingerprintId {
146+
if len(f.stmtFingerprintIds) > 0 && f.stmtFingerprintIds[0] == stmtFingerprintId {
122147
if f.isExpired(timeutil.Now()) {
123148
delete(r.mu.requests, id)
124149
return false, 0, req
@@ -219,6 +244,13 @@ func (r *TxnRegistry) InsertTxnDiagnostic(
219244
var txnDiagnosticId CollectedInstanceID
220245
err := r.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
221246
txn.KV().SetDebugName("txn-diag-insert-bundle")
247+
248+
_, err := r.StmtRegistry.insertBundleChunks(ctx, diagnostic.bundle, "transaction diagnostics bundle", txn)
249+
// Insert the transaction diagnostic bundle
250+
if err != nil {
251+
return err
252+
}
253+
// Insert all the statement diagnostics
222254
stmtDiagnostics := tree.NewDArray(types.Int)
223255
for _, sd := range diagnostic.stmtDiagnostics {
224256
id, err := r.StmtRegistry.innerInsertStatementDiagnostics(ctx, sd, txn)
@@ -261,7 +293,7 @@ func (r *TxnRegistry) addTxnRequestInternalLocked(
261293
}
262294
request := TxnRequest{
263295
txnFingerprintId: txnFingerprintId,
264-
stmtFingerprintsId: stmtFingerprintsId,
296+
stmtFingerprintIds: stmtFingerprintsId,
265297
redacted: redacted,
266298
username: username,
267299
expiresAt: expiresAt,

0 commit comments

Comments
 (0)