Skip to content

Commit 5308d41

Browse files
committed
sql, sqlstats: Refactor Statement Recording
- Adds a new RecordedStatementStatsBuilder struct to make building RecordedStmtStats easier. - Moves the recording, buffering, and synchronization of sql stats to the ingester and removes the logic from StatsCollecotr Resolves: CRDB-57024 Epic: CRDB-55081 Release note: None
1 parent 21eed7b commit 5308d41

File tree

8 files changed

+347
-111
lines changed

8 files changed

+347
-111
lines changed

pkg/sql/conn_executor.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1268,7 +1268,6 @@ func (s *Server) newConnExecutor(
12681268
s.sqlStatsIngester,
12691269
ex.phaseTimes,
12701270
s.localSqlStats.GetCounters(),
1271-
s.cfg.SQLStatsTestingKnobs,
12721271
)
12731272
ex.dataMutatorIterator.OnApplicationNameChange = func(newName string) {
12741273
ex.applicationName.Store(newName)

pkg/sql/executor_statement_metrics.go

Lines changed: 43 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import (
1313
"github.com/cockroachdb/cockroach/pkg/sql/contentionpb"
1414
"github.com/cockroachdb/cockroach/pkg/sql/idxrecommendations"
1515
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
16-
"github.com/cockroachdb/cockroach/pkg/sql/sessionphase"
1716
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
1817
"github.com/cockroachdb/cockroach/pkg/util/log"
1918
"github.com/cockroachdb/cockroach/pkg/util/metric"
@@ -167,47 +166,16 @@ func (ex *connExecutor) recordStatementSummary(
167166
stmtErr error,
168167
stats topLevelQueryStats,
169168
) appstatspb.StmtFingerprintID {
170-
phaseTimes := ex.statsCollector.PhaseTimes()
171-
172-
// Collect the statistics.
173-
idleLatRaw := phaseTimes.GetIdleLatency(ex.statsCollector.PreviousPhaseTimes())
174-
idleLatSec := idleLatRaw.Seconds()
175-
runLatRaw := phaseTimes.GetRunLatency()
176-
runLatSec := runLatRaw.Seconds()
177-
parseLatSec := phaseTimes.GetParsingLatency().Seconds()
178-
planLatSec := phaseTimes.GetPlanningLatency().Seconds()
179-
// We want to exclude any overhead to reduce possible confusion.
180-
svcLatRaw := phaseTimes.GetServiceLatencyNoOverhead()
181-
svcLatSec := svcLatRaw.Seconds()
182-
183-
// processing latency: contributing towards SQL results.
184-
processingLatSec := parseLatSec + planLatSec + runLatSec
185-
186-
// overhead latency: txn/retry management, error checking, etc
187-
execOverheadSec := svcLatSec - processingLatSec
188169

189170
stmt := &planner.stmt
190171
flags := planner.curPlan.flags
191172
ex.recordStatementLatencyMetrics(
192-
stmt, flags, automaticRetryTxnCount+automaticRetryStmtCount, runLatRaw, svcLatRaw,
173+
stmt, flags, automaticRetryTxnCount+automaticRetryStmtCount, ex.statsCollector.RunLatency(), ex.statsCollector.ServiceLatency(),
193174
)
194175

195-
fullScan := flags.IsSet(planFlagContainsFullIndexScan) || flags.IsSet(planFlagContainsFullTableScan)
196-
197176
idxRecommendations := idxrecommendations.FormatIdxRecommendations(planner.instrumentation.indexRecs)
198177
queryLevelStats, queryLevelStatsOk := planner.instrumentation.GetQueryLevelStats()
199178

200-
var sqlInstanceIDs []int64
201-
var kvNodeIDs []int32
202-
if queryLevelStatsOk {
203-
sqlInstanceIDs = make([]int64, 0, len(queryLevelStats.SQLInstanceIDs))
204-
for _, sqlInstanceID := range queryLevelStats.SQLInstanceIDs {
205-
sqlInstanceIDs = append(sqlInstanceIDs, int64(sqlInstanceID))
206-
}
207-
kvNodeIDs = queryLevelStats.KVNodeIDs
208-
}
209-
startTime := phaseTimes.GetSessionPhaseTime(sessionphase.PlannerStartExecStmt).ToUTC()
210-
implicitTxn := flags.IsSet(planFlagImplicitTxn)
211179
stmtFingerprintID := planner.instrumentation.fingerprintId
212180
autoRetryReason := ex.state.mu.autoRetryReason
213181
if automaticRetryStmtCount > 0 {
@@ -221,51 +189,45 @@ func (ex *connExecutor) recordStatementSummary(
221189
ex.metrics.EngineMetrics.StatementIndexBytesWritten.Inc(stats.indexBytesWritten)
222190

223191
if ex.statsCollector.EnabledForTransaction() {
224-
recordedStmtStats := &sqlstats.RecordedStmtStats{
225-
FingerprintID: stmtFingerprintID,
226-
QuerySummary: stmt.StmtSummary,
227-
Generic: flags.IsSet(planFlagGeneric),
228-
AppliedStmtHints: len(stmt.Hints) > 0,
229-
DistSQL: flags.ShouldBeDistributed(),
230-
Vec: flags.IsSet(planFlagVectorized),
231-
ImplicitTxn: implicitTxn,
232-
PlanHash: planner.instrumentation.planGist.Hash(),
233-
SessionID: ex.planner.extendedEvalCtx.SessionID,
234-
StatementID: stmt.QueryID,
235-
AutoRetryCount: automaticRetryTxnCount + automaticRetryStmtCount,
236-
Failed: stmtErr != nil,
237-
AutoRetryReason: autoRetryReason,
238-
RowsAffected: rowsAffected,
239-
IdleLatencySec: idleLatSec,
240-
ParseLatencySec: parseLatSec,
241-
PlanLatencySec: planLatSec,
242-
RunLatencySec: runLatSec,
243-
ServiceLatencySec: svcLatSec,
244-
OverheadLatencySec: execOverheadSec,
245-
BytesRead: stats.bytesRead,
246-
RowsRead: stats.rowsRead,
247-
RowsWritten: stats.rowsWritten,
248-
Nodes: sqlInstanceIDs,
249-
KVNodeIDs: kvNodeIDs,
250-
StatementType: stmt.AST.StatementType(),
251-
PlanGist: planner.instrumentation.planGist.String(),
252-
StatementError: stmtErr,
253-
IndexRecommendations: idxRecommendations,
254-
Query: stmt.StmtNoConstants,
255-
StartTime: startTime,
256-
EndTime: startTime.Add(svcLatRaw),
257-
FullScan: fullScan,
258-
ExecStats: queryLevelStats,
192+
b := sqlstats.NewRecordedStatementStatsBuilder(
193+
stmtFingerprintID,
194+
planner.SessionData().Database,
195+
stmt.StmtNoConstants,
196+
stmt.StmtSummary,
197+
stmt.AST.StatementType(),
198+
ex.statsCollector.CurrentApplicationName(),
199+
).
200+
QueryID(stmt.QueryID).
201+
SessionID(ex.planner.extendedEvalCtx.SessionID).
202+
PlanMetadata(
203+
flags.IsSet(planFlagGeneric),
204+
flags.ShouldBeDistributed(),
205+
flags.IsSet(planFlagVectorized),
206+
flags.IsSet(planFlagImplicitTxn),
207+
flags.IsSet(planFlagContainsFullIndexScan) || flags.IsSet(planFlagContainsFullTableScan),
208+
).
209+
PlanGist(planner.instrumentation.planGist.String(), planner.instrumentation.planGist.Hash()).
210+
LatencyRecorder(ex.statsCollector).
211+
QueryLevelStats(stats.bytesRead, stats.rowsRead, stats.rowsWritten).
212+
ExecStats(queryLevelStats).
259213
// TODO(mgartner): Use a slice of struct{uint64, uint64} instead of
260214
// converting to strings.
261-
Indexes: planner.instrumentation.indexesUsed.Strings(),
262-
Database: planner.SessionData().Database,
263-
QueryTags: stmt.QueryTags,
264-
App: ex.statsCollector.CurrentApplicationName(),
265-
UnderOuterTxn: ex.extraTxnState.underOuterTxn,
215+
Indexes(planner.instrumentation.indexesUsed.Strings()).
216+
AutoRetry(automaticRetryTxnCount+automaticRetryStmtCount, autoRetryReason).
217+
RowsAffected(rowsAffected).
218+
IndexRecommendations(idxRecommendations).
219+
QueryTags(stmt.QueryTags).
220+
StatementError(stmtErr)
221+
222+
if ex.extraTxnState.underOuterTxn {
223+
b.UnderOuterTxn()
224+
}
225+
226+
if len(stmt.Hints) > 0 {
227+
b.AppliedStatementHints()
266228
}
267229

268-
ex.statsCollector.RecordStatement(ctx, recordedStmtStats)
230+
ex.statsCollector.RecordStatement(ctx, b.Build())
269231
}
270232

271233
// Record statement execution statistics if span is recorded and no error was
@@ -314,11 +276,16 @@ func (ex *connExecutor) recordStatementSummary(
314276
ex.extraTxnState.transactionStatementsHash.Add(uint64(stmtFingerprintID))
315277
}
316278
ex.extraTxnState.numRows += rowsAffected
317-
ex.extraTxnState.idleLatency += idleLatRaw
279+
ex.extraTxnState.idleLatency += ex.statsCollector.IdleLatency()
318280

319281
if log.V(2) {
320282
// ages since significant epochs
321-
sessionAge := phaseTimes.GetSessionAge().Seconds()
283+
sessionAge := ex.statsCollector.PhaseTimes().GetSessionAge().Seconds()
284+
parseLatSec := ex.statsCollector.ParsingLatency().Seconds()
285+
planLatSec := ex.statsCollector.PlanningLatency().Seconds()
286+
runLatSec := ex.statsCollector.RunLatency().Seconds()
287+
svcLatSec := ex.statsCollector.ServiceLatency().Seconds()
288+
execOverheadSec := ex.statsCollector.ExecOverheadLatency().Seconds()
322289

323290
log.Dev.Infof(ctx,
324291
"query stats: %d rows, %d retries, "+

pkg/sql/sqlstats/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ go_library(
2222
"//pkg/sql/sem/tree",
2323
"//pkg/sql/sqlcommenter",
2424
"//pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil",
25+
"//pkg/util/buildutil",
2526
"//pkg/util/log",
2627
"//pkg/util/metric",
2728
"//pkg/util/stop",

pkg/sql/sqlstats/sslocal/sql_stats_ingester_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -412,7 +412,6 @@ func TestStatsCollectorIngester(t *testing.T) {
412412
ingester,
413413
phaseTimes,
414414
uniqueServerCounts,
415-
nil, // knobs
416415
)
417416

418417
sessionID := clusterunique.IDFromBytes([]byte("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"))

pkg/sql/sqlstats/sslocal/sql_stats_ingestor.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,25 @@ func (i *SQLStatsIngester) ingest(ctx context.Context, events *eventBuffer) {
265265
}
266266
}
267267

268+
func (i *SQLStatsIngester) RecordStatement(statement *sqlstats.RecordedStmtStats) {
269+
i.BufferStatement(statement)
270+
if i.testingKnobs != nil && i.testingKnobs.SynchronousSQLStats {
271+
// Flush buffer and wait for the stats ingester to finish writing.
272+
i.guard.ForceSync()
273+
<-i.syncStatsTestingCh
274+
}
275+
}
276+
277+
func (i *SQLStatsIngester) RecordTransaction(transaction *sqlstats.RecordedTxnStats) {
278+
i.BufferTransaction(transaction)
279+
280+
if i.testingKnobs != nil && i.testingKnobs.SynchronousSQLStats {
281+
// Flush buffer and wait for the stats ingester to finish writing.
282+
i.guard.ForceSync()
283+
<-i.syncStatsTestingCh
284+
}
285+
}
286+
268287
func (i *SQLStatsIngester) BufferStatement(statement *sqlstats.RecordedStmtStats) {
269288
if i.testingKnobs != nil && i.testingKnobs.IngesterStmtInterceptor != nil {
270289
i.testingKnobs.IngesterStmtInterceptor(statement.SessionID, statement)

pkg/sql/sqlstats/sslocal/sql_stats_test.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -471,7 +471,6 @@ func TestExplicitTxnFingerprintAccounting(t *testing.T) {
471471
ingester,
472472
sessionphase.NewTimes(),
473473
sqlStats.GetCounters(),
474-
nil, /* knobs */
475474
)
476475

477476
recordStats := func(testCase *tc) {
@@ -595,7 +594,6 @@ func TestAssociatingStmtStatsWithTxnFingerprint(t *testing.T) {
595594
ingester,
596595
sessionphase.NewTimes(),
597596
sqlStats.GetCounters(),
598-
nil, /* knobs */
599597
)
600598

601599
ingester.Start(ctx, stopper)

pkg/sql/sqlstats/sslocal/sslocal_stats_collector.go

Lines changed: 42 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@ package sslocal
77

88
import (
99
"context"
10+
"time"
1011

1112
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
12-
"github.com/cockroachdb/cockroach/pkg/sql/appstatspb"
1313
"github.com/cockroachdb/cockroach/pkg/sql/clusterunique"
1414
"github.com/cockroachdb/cockroach/pkg/sql/sessionphase"
1515
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
@@ -35,12 +35,6 @@ import (
3535
// persist statement and transaction insights to an in-memory cache.
3636
// Events are sent to the insights subsystem for async processing.
3737
type StatsCollector struct {
38-
// stmtFingerprintID is the fingerprint ID of the current statement we are
39-
// recording. Note that we don't observe sql stats for all statements (e.g. COMMIT).
40-
// If no stats have been attempted to be recorded yet for the current statement,
41-
// this value will be 0.
42-
stmtFingerprintID appstatspb.StmtFingerprintID
43-
4438
// phaseTimes tracks session-level phase times.
4539
phaseTimes sessionphase.Times
4640

@@ -67,8 +61,7 @@ type StatsCollector struct {
6761

6862
statsIngester *SQLStatsIngester
6963

70-
st *cluster.Settings
71-
knobs *sqlstats.TestingKnobs
64+
st *cluster.Settings
7265
}
7366

7467
// NewStatsCollector returns an instance of StatsCollector.
@@ -78,27 +71,20 @@ func NewStatsCollector(
7871
ingester *SQLStatsIngester,
7972
phaseTime *sessionphase.Times,
8073
uniqueServerCounts *ssmemstorage.SQLStatsAtomicCounters,
81-
knobs *sqlstats.TestingKnobs,
8274
) *StatsCollector {
8375
s := &StatsCollector{
8476
flushTarget: appStats,
8577
phaseTimes: *phaseTime,
8678
uniqueServerCounts: uniqueServerCounts,
8779
statsIngester: ingester,
8880
st: st,
89-
knobs: knobs,
9081
}
9182

9283
s.sendStats = s.enabled()
9384

9485
return s
9586
}
9687

97-
// StatementFingerprintID returns the fingerprint ID for the current statement.
98-
func (s *StatsCollector) StatementFingerprintID() appstatspb.StmtFingerprintID {
99-
return s.stmtFingerprintID
100-
}
101-
10288
// PhaseTimes returns the sessionphase.Times that this StatsCollector is
10389
// currently tracking.
10490
func (s *StatsCollector) PhaseTimes() *sessionphase.Times {
@@ -118,7 +104,6 @@ func (s *StatsCollector) PreviousPhaseTimes() *sessionphase.Times {
118104
// Found a bug again? Consider refactoring.
119105
func (s *StatsCollector) Reset(appStats *ssmemstorage.Container, phaseTime *sessionphase.Times) {
120106
s.flushTarget = appStats
121-
s.stmtFingerprintID = 0
122107
s.previousPhaseTimes = s.phaseTimes
123108
s.phaseTimes = *phaseTime
124109
}
@@ -166,14 +151,8 @@ func (s *StatsCollector) RecordStatement(_ctx context.Context, value *sqlstats.R
166151
if !s.sendStats {
167152
return
168153
}
169-
s.stmtFingerprintID = value.FingerprintID
170-
s.statsIngester.BufferStatement(value)
171154

172-
if s.knobs != nil && s.knobs.SynchronousSQLStats {
173-
// Flush buffer and wait for the stats ingester to finish writing.
174-
s.statsIngester.guard.ForceSync()
175-
<-s.statsIngester.syncStatsTestingCh
176-
}
155+
s.statsIngester.RecordStatement(value)
177156
}
178157

179158
// RecordTransaction sends the transaction statistics to the stats ingester.
@@ -182,13 +161,7 @@ func (s *StatsCollector) RecordTransaction(_ctx context.Context, value *sqlstats
182161
return
183162
}
184163

185-
s.statsIngester.BufferTransaction(value)
186-
187-
if s.knobs != nil && s.knobs.SynchronousSQLStats {
188-
// Flush buffer and wait for the stats ingester to finish writing.
189-
s.statsIngester.guard.ForceSync()
190-
<-s.statsIngester.syncStatsTestingCh
191-
}
164+
s.statsIngester.RecordTransaction(value)
192165
}
193166

194167
func (s *StatsCollector) EnabledForTransaction() bool {
@@ -205,3 +178,41 @@ func (s *StatsCollector) EnabledForTransaction() bool {
205178
func (s *StatsCollector) CurrentApplicationName() string {
206179
return s.flushTarget.ApplicationName()
207180
}
181+
182+
func (s *StatsCollector) RunLatency() time.Duration {
183+
return s.PhaseTimes().GetRunLatency()
184+
}
185+
186+
func (s *StatsCollector) IdleLatency() time.Duration {
187+
return s.PhaseTimes().GetIdleLatency(s.PreviousPhaseTimes())
188+
}
189+
190+
func (s *StatsCollector) ServiceLatency() time.Duration {
191+
return s.PhaseTimes().GetServiceLatencyNoOverhead()
192+
}
193+
194+
func (s *StatsCollector) ParsingLatency() time.Duration {
195+
return s.PhaseTimes().GetParsingLatency()
196+
}
197+
198+
func (s *StatsCollector) PlanningLatency() time.Duration {
199+
return s.PhaseTimes().GetPlanningLatency()
200+
}
201+
202+
func (s *StatsCollector) ProcessingLatency() time.Duration {
203+
return s.ParsingLatency() + s.PlanningLatency() + s.RunLatency()
204+
}
205+
206+
func (s *StatsCollector) ExecOverheadLatency() time.Duration {
207+
return s.ServiceLatency() - s.ProcessingLatency()
208+
}
209+
210+
func (s *StatsCollector) StartTime() time.Time {
211+
return s.PhaseTimes().GetSessionPhaseTime(sessionphase.PlannerStartExecStmt).ToUTC()
212+
}
213+
214+
func (s *StatsCollector) EndTime() time.Time {
215+
return s.StartTime().Add(s.ServiceLatency())
216+
}
217+
218+
var _ sqlstats.StatementLatencyRecorder = &StatsCollector{}

0 commit comments

Comments
 (0)