Skip to content

Commit c4e9a45

Browse files
committed
sqlstats,insights: add struct conversion functions
This commit adds functions converting sqlstats.Recorded{Stmt,Txn}Stats to `insight.{Statement,Transaction}`. Next, we will change insights to consume sqlstats.Recorded* objects for analysis and only create insight objects if we are putting them into the cache. Epic: none Release note: None
1 parent 53cdc3d commit c4e9a45

File tree

4 files changed

+133
-124
lines changed

4 files changed

+133
-124
lines changed

pkg/sql/sqlstats/insights/BUILD.bazel

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ go_library(
1414
"registry.go",
1515
"store.go",
1616
"test_utils.go",
17+
"util.go",
1718
],
1819
embed = [":insights_go_proto"],
1920
importpath = "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/insights",
@@ -24,6 +25,8 @@ go_library(
2425
"//pkg/sql/appstatspb",
2526
"//pkg/sql/clusterunique",
2627
"//pkg/sql/contention/contentionutils",
28+
"//pkg/sql/pgwire/pgerror",
29+
"//pkg/sql/sqlstats",
2730
"//pkg/util/cache",
2831
"//pkg/util/intsets",
2932
"//pkg/util/metric",

pkg/sql/sqlstats/insights/util.go

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package insights
7+
8+
import (
9+
"time"
10+
11+
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
12+
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
13+
"github.com/cockroachdb/redact"
14+
)
15+
16+
func MakeTxnInsight(value sqlstats.RecordedTxnStats) *Transaction {
17+
var retryReason string
18+
if value.AutoRetryReason != nil {
19+
retryReason = value.AutoRetryReason.Error()
20+
}
21+
22+
var cpuSQLNanos int64
23+
if value.ExecStats.CPUTime.Nanoseconds() >= 0 {
24+
cpuSQLNanos = value.ExecStats.CPUTime.Nanoseconds()
25+
}
26+
27+
var errorCode string
28+
var errorMsg redact.RedactableString
29+
if value.TxnErr != nil {
30+
errorCode = pgerror.GetPGCode(value.TxnErr).String()
31+
errorMsg = redact.Sprint(value.TxnErr)
32+
}
33+
34+
status := Transaction_Failed
35+
if value.Committed {
36+
status = Transaction_Completed
37+
}
38+
39+
var user, appName string
40+
if value.SessionData != nil {
41+
user = value.SessionData.User().Normalized()
42+
appName = value.SessionData.ApplicationName
43+
}
44+
45+
insight := &Transaction{
46+
ID: value.TransactionID,
47+
FingerprintID: value.FingerprintID,
48+
UserPriority: value.Priority.String(),
49+
ImplicitTxn: value.ImplicitTxn,
50+
Contention: &value.ExecStats.ContentionTime,
51+
StartTime: value.StartTime,
52+
EndTime: value.EndTime,
53+
User: user,
54+
ApplicationName: appName,
55+
RowsRead: value.RowsRead,
56+
RowsWritten: value.RowsWritten,
57+
RetryCount: value.RetryCount,
58+
AutoRetryReason: retryReason,
59+
CPUSQLNanos: cpuSQLNanos,
60+
LastErrorCode: errorCode,
61+
LastErrorMsg: errorMsg,
62+
Status: status,
63+
}
64+
65+
return insight
66+
}
67+
68+
func MakeStmtInsight(value sqlstats.RecordedStmtStats) *Statement {
69+
var autoRetryReason string
70+
if value.AutoRetryReason != nil {
71+
autoRetryReason = value.AutoRetryReason.Error()
72+
}
73+
74+
var contention *time.Duration
75+
var cpuSQLNanos int64
76+
if value.ExecStats != nil {
77+
contention = &value.ExecStats.ContentionTime
78+
cpuSQLNanos = value.ExecStats.CPUTime.Nanoseconds()
79+
}
80+
81+
var errorCode string
82+
var errorMsg redact.RedactableString
83+
if value.StatementError != nil {
84+
errorCode = pgerror.GetPGCode(value.StatementError).String()
85+
errorMsg = redact.Sprint(value.StatementError)
86+
}
87+
88+
insight := &Statement{
89+
ID: value.StatementID,
90+
FingerprintID: value.FingerprintID,
91+
LatencyInSeconds: value.ServiceLatencySec,
92+
Query: value.Query,
93+
Status: getInsightStatus(value.StatementError),
94+
StartTime: value.StartTime,
95+
EndTime: value.EndTime,
96+
FullScan: value.FullScan,
97+
PlanGist: value.PlanGist,
98+
Retries: int64(value.AutoRetryCount),
99+
AutoRetryReason: autoRetryReason,
100+
RowsRead: value.RowsRead,
101+
RowsWritten: value.RowsWritten,
102+
Nodes: value.Nodes,
103+
KVNodeIDs: value.KVNodeIDs,
104+
Contention: contention,
105+
IndexRecommendations: value.IndexRecommendations,
106+
Database: value.Database,
107+
CPUSQLNanos: cpuSQLNanos,
108+
ErrorCode: errorCode,
109+
ErrorMsg: errorMsg,
110+
}
111+
112+
return insight
113+
}
114+
115+
func getInsightStatus(statementError error) Statement_Status {
116+
if statementError == nil {
117+
return Statement_Completed
118+
}
119+
120+
return Statement_Failed
121+
}

pkg/sql/sqlstats/sslocal/BUILD.bazel

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ go_library(
2020
"//pkg/settings/cluster",
2121
"//pkg/sql/appstatspb",
2222
"//pkg/sql/clusterunique",
23-
"//pkg/sql/pgwire/pgerror",
2423
"//pkg/sql/sessionphase",
2524
"//pkg/sql/sqlstats",
2625
"//pkg/sql/sqlstats/insights",
@@ -32,7 +31,6 @@ go_library(
3231
"//pkg/util/syncutil",
3332
"//pkg/util/timeutil",
3433
"@com_github_cockroachdb_errors//:errors",
35-
"@com_github_cockroachdb_redact//:redact",
3634
],
3735
)
3836

pkg/sql/sqlstats/sslocal/sslocal_stats_collector.go

Lines changed: 9 additions & 122 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,14 @@ package sslocal
77

88
import (
99
"context"
10-
"time"
1110

1211
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
1312
"github.com/cockroachdb/cockroach/pkg/sql/appstatspb"
1413
"github.com/cockroachdb/cockroach/pkg/sql/clusterunique"
15-
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
1614
"github.com/cockroachdb/cockroach/pkg/sql/sessionphase"
1715
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
1816
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats/insights"
1917
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats/ssmemstorage"
20-
"github.com/cockroachdb/redact"
2118
)
2219

2320
type bufferedStmtStats []sqlstats.RecordedStmtStats
@@ -39,8 +36,7 @@ type bufferedStmtStats []sqlstats.RecordedStmtStats
3936
//
4037
// 2. The insights subsystem (insightsWriter) which is used to
4138
// persist statement and transaction insights to an in-memory cache.
42-
// Events are sent to the insights subsystem for async processing in
43-
// observeStatement() and observeTransaction() respectively.
39+
// Events are sent to the insights subsystem for async processing.
4440
type StatsCollector struct {
4541

4642
// stmtBuf contains the current transaction's statement
@@ -216,130 +212,18 @@ func (s *StatsCollector) SetStatementSampled(
216212
s.flushTarget.TrySetStatementSampled(fingerprint, implicitTxn, database)
217213
}
218214

219-
func getInsightStatus(statementError error) insights.Statement_Status {
220-
if statementError == nil {
221-
return insights.Statement_Completed
222-
}
223-
224-
return insights.Statement_Failed
225-
}
226-
227215
func (s *StatsCollector) shouldObserveInsights() bool {
228216
return sqlstats.StmtStatsEnable.Get(&s.st.SV) && sqlstats.TxnStatsEnable.Get(&s.st.SV)
229217
}
230218

231-
// observeStatement sends the recorded statement stats to the insights system
232-
// for further processing.
233-
func (s *StatsCollector) observeStatement(value sqlstats.RecordedStmtStats) {
234-
if !s.sendInsights {
235-
return
236-
}
237-
238-
var autoRetryReason string
239-
if value.AutoRetryReason != nil {
240-
autoRetryReason = value.AutoRetryReason.Error()
241-
}
242-
243-
var contention *time.Duration
244-
var cpuSQLNanos int64
245-
if value.ExecStats != nil {
246-
contention = &value.ExecStats.ContentionTime
247-
cpuSQLNanos = value.ExecStats.CPUTime.Nanoseconds()
248-
}
249-
250-
var errorCode string
251-
var errorMsg redact.RedactableString
252-
if value.StatementError != nil {
253-
errorCode = pgerror.GetPGCode(value.StatementError).String()
254-
errorMsg = redact.Sprint(value.StatementError)
255-
}
256-
257-
insight := insights.Statement{
258-
ID: value.StatementID,
259-
FingerprintID: value.FingerprintID,
260-
LatencyInSeconds: value.ServiceLatencySec,
261-
Query: value.Query,
262-
Status: getInsightStatus(value.StatementError),
263-
StartTime: value.StartTime,
264-
EndTime: value.EndTime,
265-
FullScan: value.FullScan,
266-
PlanGist: value.PlanGist,
267-
Retries: int64(value.AutoRetryCount),
268-
AutoRetryReason: autoRetryReason,
269-
RowsRead: value.RowsRead,
270-
RowsWritten: value.RowsWritten,
271-
Nodes: value.Nodes,
272-
KVNodeIDs: value.KVNodeIDs,
273-
Contention: contention,
274-
IndexRecommendations: value.IndexRecommendations,
275-
Database: value.Database,
276-
CPUSQLNanos: cpuSQLNanos,
277-
ErrorCode: errorCode,
278-
ErrorMsg: errorMsg,
279-
}
280-
if s.insightsWriter != nil {
281-
s.insightsWriter.ObserveStatement(value.SessionID, &insight)
282-
}
283-
}
284-
285-
// observeTransaction sends the recorded transaction stats to the insights system
286-
// for further processing.
287-
func (s *StatsCollector) observeTransaction(value sqlstats.RecordedTxnStats) {
288-
if !s.sendInsights {
289-
return
290-
}
291-
292-
var retryReason string
293-
if value.AutoRetryReason != nil {
294-
retryReason = value.AutoRetryReason.Error()
295-
}
296-
297-
var cpuSQLNanos int64
298-
if value.ExecStats.CPUTime.Nanoseconds() >= 0 {
299-
cpuSQLNanos = value.ExecStats.CPUTime.Nanoseconds()
300-
}
301-
302-
var errorCode string
303-
var errorMsg redact.RedactableString
304-
if value.TxnErr != nil {
305-
errorCode = pgerror.GetPGCode(value.TxnErr).String()
306-
errorMsg = redact.Sprint(value.TxnErr)
307-
}
308-
309-
status := insights.Transaction_Failed
310-
if value.Committed {
311-
status = insights.Transaction_Completed
312-
}
313-
314-
insight := insights.Transaction{
315-
ID: value.TransactionID,
316-
FingerprintID: value.FingerprintID,
317-
UserPriority: value.Priority.String(),
318-
ImplicitTxn: value.ImplicitTxn,
319-
Contention: &value.ExecStats.ContentionTime,
320-
StartTime: value.StartTime,
321-
EndTime: value.EndTime,
322-
User: value.SessionData.User().Normalized(),
323-
ApplicationName: value.SessionData.ApplicationName,
324-
RowsRead: value.RowsRead,
325-
RowsWritten: value.RowsWritten,
326-
RetryCount: value.RetryCount,
327-
AutoRetryReason: retryReason,
328-
CPUSQLNanos: cpuSQLNanos,
329-
LastErrorCode: errorCode,
330-
LastErrorMsg: errorMsg,
331-
Status: status,
332-
}
333-
if s.insightsWriter != nil {
334-
s.insightsWriter.ObserveTransaction(value.SessionID, &insight)
335-
}
336-
}
337-
338219
// RecordStatement records the statistics of a statement.
339220
func (s *StatsCollector) RecordStatement(
340221
ctx context.Context, value sqlstats.RecordedStmtStats,
341222
) error {
342-
s.observeStatement(value)
223+
if s.sendInsights && s.insightsWriter != nil {
224+
insight := insights.MakeStmtInsight(value)
225+
s.insightsWriter.ObserveStatement(value.SessionID, insight)
226+
}
343227

344228
// TODO(xinhaoz): This isn't the best place to set this, but we'll clean this up
345229
// when we refactor the stats collection code to send the stats to an ingester.
@@ -359,7 +243,10 @@ func (s *StatsCollector) RecordStatement(
359243
func (s *StatsCollector) RecordTransaction(
360244
ctx context.Context, value sqlstats.RecordedTxnStats,
361245
) error {
362-
s.observeTransaction(value)
246+
if s.sendInsights && s.insightsWriter != nil {
247+
insight := insights.MakeTxnInsight(value)
248+
s.insightsWriter.ObserveTransaction(value.SessionID, insight)
249+
}
363250

364251
// TODO(117690): Unify StmtStatsEnable and TxnStatsEnable into a single cluster setting.
365252
if !sqlstats.TxnStatsEnable.Get(&s.st.SV) {

0 commit comments

Comments
 (0)