Skip to content

Commit 8441070

Browse files
committed
stmtdiagnostics: Add support for transaction diagnostics
Adds a new TxnRegistry and other supporting structs to support the collection of transaction diagnostic bundles. The TxnRegistry adds functionality to: - Register a TxnRequest - defines the criteria for collecting a transaction diagnostic bundle - Start collecting a transaction bundle - This is done by checking that a statement fingerprint id matches the first statement fingerprint id in a TxnRequest - Save a transaction diagnostic bundle upon completion to be downloaded in the future Since the system tables to persist transaction diagnostics and transaction diagnostics requests don't exist yet, this commit only registers requests in the local registry. A future commit will add request and diagnostic persistence, as well as add polling logic to register requests created in other gateway nodes. Part of: CRDB-5342 Epic: CRDB-53541 Release note: None
1 parent 79fb2a9 commit 8441070

File tree

3 files changed

+629
-1
lines changed

3 files changed

+629
-1
lines changed

pkg/sql/stmtdiagnostics/BUILD.bazel

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
22

33
go_library(
44
name = "stmtdiagnostics",
5-
srcs = ["statement_diagnostics.go"],
5+
srcs = [
6+
"statement_diagnostics.go",
7+
"txn_diagnostics.go",
8+
],
69
importpath = "github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics",
710
visibility = ["//visibility:public"],
811
deps = [
@@ -29,12 +32,14 @@ go_test(
2932
"main_test.go",
3033
"statement_diagnostics_helpers_test.go",
3134
"statement_diagnostics_test.go",
35+
"txn_diagnostics_test.go",
3236
],
3337
embed = [":stmtdiagnostics"],
3438
tags = ["no-remote-exec"],
3539
deps = [
3640
"//pkg/base",
3741
"//pkg/keys",
42+
"//pkg/kv",
3843
"//pkg/kv/kvpb",
3944
"//pkg/kv/kvserver",
4045
"//pkg/roachpb",
@@ -44,6 +49,7 @@ go_test(
4449
"//pkg/settings/cluster",
4550
"//pkg/sql",
4651
"//pkg/sql/catalog/systemschema",
52+
"//pkg/sql/isql",
4753
"//pkg/sql/sqlerrors",
4854
"//pkg/testutils",
4955
"//pkg/testutils/serverutils",
@@ -53,6 +59,7 @@ go_test(
5359
"//pkg/util/leaktest",
5460
"//pkg/util/log",
5561
"//pkg/util/syncutil",
62+
"//pkg/util/timeutil",
5663
"//pkg/util/uuid",
5764
"@com_github_cockroachdb_errors//:errors",
5865
"@com_github_stretchr_testify//require",
Lines changed: 284 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,284 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package stmtdiagnostics
7+
8+
import (
9+
"context"
10+
"math/rand"
11+
"time"
12+
13+
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
14+
"github.com/cockroachdb/cockroach/pkg/sql/isql"
15+
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
16+
"github.com/cockroachdb/cockroach/pkg/sql/types"
17+
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
18+
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
19+
"github.com/cockroachdb/errors"
20+
)
21+
22+
// TxnRequest describes a transaction diagnostic request for which a diagnostic
23+
// bundle should be collected.
24+
type TxnRequest struct {
25+
txnFingerprintId uint64
26+
stmtFingerprintsId []uint64
27+
redacted bool
28+
username string
29+
expiresAt time.Time
30+
minExecutionLatency time.Duration
31+
samplingProbability float64
32+
}
33+
34+
func (t *TxnRequest) StmtFingerprintIds() []uint64 {
35+
return t.stmtFingerprintsId
36+
}
37+
38+
func (t *TxnRequest) IsRedacted() bool {
39+
return t.redacted
40+
}
41+
42+
func (t *TxnRequest) Username() string {
43+
return t.username
44+
}
45+
46+
func (t *TxnRequest) isExpired(now time.Time) bool {
47+
return !t.expiresAt.IsZero() && t.expiresAt.Before(now)
48+
}
49+
50+
func (t *TxnRequest) isConditional() bool {
51+
return t.minExecutionLatency != 0
52+
}
53+
54+
// TxnDiagnostic is a container for all the diagnostic data that has been
55+
// collected and will be persisted for a transaction. This will be downloadable
56+
// as a transaction diagnostic bundle
57+
type TxnDiagnostic struct {
58+
stmtDiagnostics []StmtDiagnostic
59+
}
60+
61+
func NewTxnDiagnostic(stmtDiagnostics []StmtDiagnostic) TxnDiagnostic {
62+
return TxnDiagnostic{stmtDiagnostics: stmtDiagnostics}
63+
}
64+
65+
// TxnRegistry maintains a view on the transactions on which a diagnostic
66+
// bundle should be collected. It is responsible for saving new requests
67+
// to the transaction diagnostics requests table, deciding whether a
68+
// diagnostic bundle should be collected for a transaction, and persisting
69+
// recorded diagnostics to the transaction diagnostics table.
70+
type TxnRegistry struct {
71+
st *cluster.Settings
72+
db isql.DB
73+
StmtRegistry *Registry
74+
mu struct {
75+
// NOTE: This lock can't be held while the registry runs any statements
76+
// internally; it'd deadlock.
77+
syncutil.Mutex
78+
// requests is a map of all the transaction diagnostic requests that are
79+
// pending to be collected. Requests will be removed from this map once
80+
// it has been fulfilled, has expired, or if moved to the
81+
// unconditionalOngoingRequests map.
82+
requests map[RequestID]TxnRequest
83+
// unconditionalOngoingRequests contains requests that are currently being
84+
// recorded and expected to be recorded unconditionally. This means that
85+
// these requests should be recorded on their next execution, regardless
86+
// of the transaction's latency or other conditions.
87+
unconditionalOngoingRequests map[RequestID]TxnRequest
88+
rand *rand.Rand
89+
}
90+
}
91+
92+
func NewTxnRegistry(
93+
db isql.DB, st *cluster.Settings, stmtDiagnosticsRegistry *Registry,
94+
) *TxnRegistry {
95+
r := &TxnRegistry{
96+
db: db,
97+
st: st,
98+
StmtRegistry: stmtDiagnosticsRegistry,
99+
}
100+
r.mu.rand = rand.New(rand.NewSource(timeutil.Now().UnixNano()))
101+
r.mu.requests = make(map[RequestID]TxnRequest)
102+
r.mu.unconditionalOngoingRequests = make(map[RequestID]TxnRequest)
103+
return r
104+
}
105+
106+
// ShouldStartTxnDiagnostic returns the first txn request whose first
107+
// statement fingerprint id matches the provided stmtFingerprintId. There may
108+
// be multiple transaction diagnostic requests that the stmtFingerprintId
109+
// matches, in which case we will stop at the first one we find.
110+
func (r *TxnRegistry) ShouldStartTxnDiagnostic(
111+
ctx context.Context, stmtFingerprintId uint64,
112+
) (shouldCollect bool, reqID RequestID, req TxnRequest) {
113+
r.mu.Lock()
114+
defer r.mu.Unlock()
115+
116+
if len(r.mu.requests) == 0 {
117+
return false, 0, req
118+
}
119+
120+
for id, f := range r.mu.requests {
121+
if len(f.stmtFingerprintsId) > 0 && f.stmtFingerprintsId[0] == stmtFingerprintId {
122+
if f.isExpired(timeutil.Now()) {
123+
delete(r.mu.requests, id)
124+
return false, 0, req
125+
}
126+
req = f
127+
reqID = id
128+
break
129+
}
130+
}
131+
if reqID == 0 {
132+
return false, 0, TxnRequest{}
133+
}
134+
135+
// Unconditional requests are those that will be recorded on the next
136+
// execution. In this case, we move the request to the unconditional
137+
// ongoing requests map, so that it is not considered for future
138+
// transactions until it is reset.
139+
if !req.isConditional() {
140+
r.mu.unconditionalOngoingRequests[reqID] = req
141+
delete(r.mu.requests, reqID)
142+
}
143+
144+
if req.samplingProbability == 0 || r.mu.rand.Float64() < req.samplingProbability {
145+
return true, reqID, req
146+
}
147+
return false, 0, TxnRequest{}
148+
}
149+
150+
func (r *TxnRegistry) InsertTxnRequest(
151+
ctx context.Context,
152+
txnFingerprintId uint64,
153+
stmtFingerprintIds []uint64,
154+
username string,
155+
samplingProbability float64,
156+
minExecutionLatency time.Duration,
157+
expiresAfter time.Duration,
158+
redacted bool,
159+
) error {
160+
if samplingProbability != 0 {
161+
if samplingProbability < 0 || samplingProbability > 1 {
162+
return errors.Newf(
163+
"expected sampling probability in range [0.0, 1.0], got %f",
164+
samplingProbability)
165+
}
166+
if minExecutionLatency == 0 {
167+
return errors.Newf(
168+
"got non-zero sampling probability %f and empty min exec latency",
169+
samplingProbability)
170+
}
171+
}
172+
173+
var reqID RequestID = RequestID(rand.Int())
174+
var expiresAt time.Time
175+
if expiresAfter != 0 {
176+
expiresAt = timeutil.Now().Add(expiresAfter)
177+
}
178+
// TODO: insert the request into system.txn_diagnostics_requests once the table is made
179+
func() {
180+
r.mu.Lock()
181+
defer r.mu.Unlock()
182+
r.addTxnRequestInternalLocked(
183+
ctx, reqID, txnFingerprintId, stmtFingerprintIds, samplingProbability,
184+
minExecutionLatency, expiresAt, redacted, username,
185+
)
186+
}()
187+
188+
return nil
189+
}
190+
191+
// ResetTxnRequest moves the TxnRequest of the given requestID from the ongoing
192+
// requests map back to the requests map, which makes it available to be picked
193+
// up to be recorded again.
194+
func (r *TxnRegistry) ResetTxnRequest(requestID RequestID) (req TxnRequest, ok bool) {
195+
if requestID == 0 {
196+
return TxnRequest{}, false
197+
}
198+
199+
r.mu.Lock()
200+
defer r.mu.Unlock()
201+
202+
req, ok = r.mu.unconditionalOngoingRequests[requestID]
203+
if !ok {
204+
return TxnRequest{}, false
205+
}
206+
207+
delete(r.mu.unconditionalOngoingRequests, requestID)
208+
r.mu.requests[requestID] = req
209+
210+
return req, true
211+
}
212+
213+
// InsertTxnDiagnostic persists the collected transaction diagnostic bundle. It
214+
// will persist all the collected statement diagnostic bundles as well as the
215+
// transaction trace, and update the request as completed.
216+
func (r *TxnRegistry) InsertTxnDiagnostic(
217+
ctx context.Context, requestId RequestID, request TxnRequest, diagnostic TxnDiagnostic,
218+
) (CollectedInstanceID, error) {
219+
var txnDiagnosticId CollectedInstanceID
220+
err := r.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
221+
txn.KV().SetDebugName("txn-diag-insert-bundle")
222+
stmtDiagnostics := tree.NewDArray(types.Int)
223+
for _, sd := range diagnostic.stmtDiagnostics {
224+
id, err := r.StmtRegistry.innerInsertStatementDiagnostics(ctx, sd, txn)
225+
if err != nil {
226+
return err
227+
}
228+
if err = stmtDiagnostics.Append(tree.NewDInt(tree.DInt(id))); err != nil {
229+
return err
230+
}
231+
}
232+
233+
txnDiagnosticId = CollectedInstanceID(rand.Int())
234+
235+
// TODO: insert into txn_diagnostics once the table is made
236+
237+
// TODO: mark request complete in txn_diagnostics_requests once the table is made
238+
239+
return nil
240+
})
241+
242+
return txnDiagnosticId, err
243+
}
244+
245+
func (r *TxnRegistry) addTxnRequestInternalLocked(
246+
ctx context.Context,
247+
id RequestID,
248+
txnFingerprintId uint64,
249+
stmtFingerprintsId []uint64,
250+
samplingProbability float64,
251+
minExecutionLatency time.Duration,
252+
expiresAt time.Time,
253+
redacted bool,
254+
username string,
255+
) {
256+
if r.findTxnRequestLocked(id) {
257+
return
258+
}
259+
if r.mu.requests == nil {
260+
r.mu.requests = make(map[RequestID]TxnRequest)
261+
}
262+
request := TxnRequest{
263+
txnFingerprintId: txnFingerprintId,
264+
stmtFingerprintsId: stmtFingerprintsId,
265+
redacted: redacted,
266+
username: username,
267+
expiresAt: expiresAt,
268+
minExecutionLatency: minExecutionLatency,
269+
samplingProbability: samplingProbability,
270+
}
271+
r.mu.requests[id] = request
272+
}
273+
274+
func (r *TxnRegistry) findTxnRequestLocked(requestID RequestID) bool {
275+
f, ok := r.mu.requests[requestID]
276+
if ok {
277+
if f.isExpired(timeutil.Now()) {
278+
delete(r.mu.requests, requestID)
279+
}
280+
return true
281+
}
282+
_, ok = r.mu.unconditionalOngoingRequests[requestID]
283+
return ok
284+
}

0 commit comments

Comments
 (0)