Skip to content

Commit 561e406

Browse files
craig[bot]dhartunian
andcommitted
Merge #147133
147133: sqlstats: fix data race between collector and ingester r=alyshanjahani-crl a=dhartunian I've admittedly not been able to reproduce the data race observed in the same `*sqlstats.RecordedStmtStats` object in two different locations. One of these is now redunant. The transactionID set in the `StatsCollector` happens before the data is processed by the `SQLStatsIngester` so the setting of the transactionID in the ingester is removed. If it happens to get statements without corresponding transactionIDs, that should be a bug. The reason this was introduced was likely #141767 which unified the types used by the two components. Previously, they were different structs and ownership was clear and required editing the transactionID twice. This PR also introduces a `doc.go` file to the `sslocal` package in an effort to guide the reviewer and provide context for future work. The diagram reflects the changes in this commit. Resolves: #146796 Release note: None Co-authored-by: David Hartunian <[email protected]>
2 parents d164968 + 5179c44 commit 561e406

File tree

5 files changed

+193
-9
lines changed

5 files changed

+193
-9
lines changed

pkg/sql/sqlstats/sslocal/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ go_library(
44
name = "sslocal",
55
srcs = [
66
"cluster_settings.go",
7+
"doc.go",
78
"sql_stats.go",
89
"sql_stats_ingestor.go",
910
"sslocal_iterator.go",

pkg/sql/sqlstats/sslocal/doc.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
/*
7+
This package is used to collect SQL Activity Statistics from the SQL subsystem.
8+
9+
# The recording of SQL Activity is managed through the following components:
10+
11+
1. Conn Executor: SQL Execution is managed here and stats are collected and sent
12+
to the Stats Collector.
13+
2. (sslocal) Stats Collector: Receives stats from the Conn Executor and buffers them into
14+
the `ssmemstorage.Container`
15+
3. (sslocal) SQL Stats Ingester: Receives stats from the Stats Collector and
16+
flushes them to the registry.
17+
4. Container: A per-application container that holds stats for a given
18+
application. The Ingester flushes
19+
20+
# Sequence of Operations
21+
22+
The diagram below illustrates how stats flow through the StatsCollector and
23+
StatsIngester as a transaction executes.
24+
25+
+---------------+ +-----------------+ +---------------+ +-----------+
26+
| ConnExecutor | | StatsCollector | | StatsIngester | | Container |
27+
+---------------+ +-----------------+ +---------------+ +-----------+
28+
| | | |
29+
| RecordStatement | | |
30+
|--------------------------->| | |
31+
| | -------------------------------------------\ | |
32+
| |-| *RecordedStmtStats accumulates in buffer | | |
33+
| | |------------------------------------------| | |
34+
| | | |
35+
| EndTransaction | | |
36+
|--------------------------->| | |
37+
| | ------------------------------------------\ | |
38+
| |-| set TransactionID on *RecordedStmtStats | | |
39+
| | |-----------------------------------------| | |
40+
| | | |
41+
| | IngestStatement | |
42+
| |--------------------------------------------------->| |
43+
| | | -------------------------------------------\ |
44+
| | |-| *RecordedStmtStats accumulates in buffer | |
45+
| | | |------------------------------------------| |
46+
| | | |
47+
| | RecordStatement | |
48+
| |---------------------------------------------------------------------------------------------------------->|
49+
| | | |
50+
| RecordTransaction | | |
51+
|--------------------------->| | |
52+
| | | |
53+
| | IngestTransaction | |
54+
| |--------------------------------------------------->| |
55+
| | | |
56+
| | RecordTransaction | |
57+
| |---------------------------------------------------------------------------------------------------------->|
58+
| | | -----------------------------------------------\ |
59+
| | |-| all *RecordedStmtStats in buffer are flushed | |
60+
| | | |----------------------------------------------| |
61+
| | | | ----------------------------------------\
62+
| | | |-| eventually flushed to persisted stats |
63+
| | | | |---------------------------------------|
64+
| | | |
65+
*/
66+
package sslocal
67+
68+
// Input to sequence diagram generator:
69+
/*
70+
object ConnExecutor StatsCollector StatsIngester Container
71+
ConnExecutor -> StatsCollector: RecordStatement
72+
note right of StatsCollector: *RecordedStmtStats accumulates in buffer
73+
ConnExecutor -> StatsCollector: EndTransaction
74+
note right of StatsCollector: set TransactionID on *RecordedStmtStats
75+
StatsCollector -> StatsIngester: IngestStatement
76+
note right of StatsIngester: *RecordedStmtStats accumulates in buffer
77+
StatsCollector -> Container: RecordStatement
78+
ConnExecutor -> StatsCollector: RecordTransaction
79+
StatsCollector -> StatsIngester: IngestTransaction
80+
StatsCollector -> Container: RecordTransaction
81+
note right of StatsIngester: all *RecordedStmtStats in buffer are flushed
82+
note right of Container: eventually flushed to persisted stats
83+
*/

pkg/sql/sqlstats/sslocal/sql_stats_ingester_test.go

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,12 @@ import (
1212
"testing"
1313
"time"
1414

15+
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
1516
"github.com/cockroachdb/cockroach/pkg/sql/appstatspb"
1617
"github.com/cockroachdb/cockroach/pkg/sql/clusterunique"
18+
"github.com/cockroachdb/cockroach/pkg/sql/sessionphase"
1719
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
20+
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats/ssmemstorage"
1821
"github.com/cockroachdb/cockroach/pkg/testutils"
1922
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
2023
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -349,3 +352,98 @@ func TestSQLIngester_ClearSession(t *testing.T) {
349352
require.Len(t, ingester.statementsBySessionID, 0)
350353
})
351354
}
355+
356+
type observedStmt struct {
357+
FingerprintID appstatspb.StmtFingerprintID
358+
TransactionFingerprintID appstatspb.TransactionFingerprintID
359+
}
360+
361+
type capturingSink struct {
362+
syncutil.Mutex
363+
observed []observedStmt
364+
}
365+
366+
var _ SQLStatsSink = &capturingSink{}
367+
368+
func (s *capturingSink) ObserveTransaction(
369+
ctx context.Context,
370+
transactionStats *sqlstats.RecordedTxnStats,
371+
statements []*sqlstats.RecordedStmtStats,
372+
) {
373+
s.Lock()
374+
defer s.Unlock()
375+
for _, stmt := range statements {
376+
s.observed = append(s.observed, observedStmt{
377+
FingerprintID: stmt.FingerprintID,
378+
TransactionFingerprintID: stmt.TransactionFingerprintID,
379+
})
380+
}
381+
}
382+
383+
// TestStatsCollectorIngester validates that all statements recorded as part of a
384+
// transaction through the StatsCollector are ingested into the SQLStatsIngester
385+
// with the correct TransactionFingerprintID.
386+
func TestStatsCollectorIngester(t *testing.T) {
387+
defer leaktest.AfterTest(t)()
388+
defer log.Scope(t).Close(t)
389+
390+
ctx := context.Background()
391+
stopper := stop.NewStopper()
392+
defer stopper.Stop(ctx)
393+
394+
fakeSink := &capturingSink{}
395+
ingester := NewSQLStatsIngester(nil, fakeSink)
396+
ingester.Start(ctx, stopper, WithFlushInterval(10))
397+
398+
// Set up a StatsCollector with the ingester.
399+
st := cluster.MakeTestingClusterSettings()
400+
appStats := ssmemstorage.New(st, nil, nil, "test", nil)
401+
uniqueServerCounts := &ssmemstorage.SQLStatsAtomicCounters{}
402+
phaseTimes := sessionphase.NewTimes()
403+
statsCollector := NewStatsCollector(
404+
st,
405+
appStats,
406+
ingester,
407+
phaseTimes,
408+
uniqueServerCounts,
409+
false, // underOuterTxn
410+
nil, // knobs
411+
)
412+
413+
sessionID := clusterunique.IDFromBytes([]byte("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"))
414+
415+
statsCollector.StartTransaction()
416+
for i := range 100 {
417+
err := statsCollector.RecordStatement(ctx, &sqlstats.RecordedStmtStats{
418+
FingerprintID: appstatspb.StmtFingerprintID(i),
419+
Query: fmt.Sprintf("SELECT %d", i),
420+
ImplicitTxn: false,
421+
SessionID: sessionID,
422+
})
423+
require.NoError(t, err)
424+
}
425+
txnFingerprintID := appstatspb.TransactionFingerprintID(999)
426+
427+
statsCollector.EndTransaction(ctx, txnFingerprintID)
428+
err := statsCollector.RecordTransaction(ctx, &sqlstats.RecordedTxnStats{
429+
SessionID: sessionID,
430+
FingerprintID: txnFingerprintID,
431+
})
432+
require.NoError(t, err)
433+
statsCollector.Close(ctx, sessionID)
434+
435+
// Wait for the ingester to process the events.
436+
testutils.SucceedsSoon(t, func() error {
437+
fakeSink.Lock()
438+
defer fakeSink.Unlock()
439+
if len(fakeSink.observed) != 100 {
440+
return fmt.Errorf("expected 100 statements, got %d", len(fakeSink.observed))
441+
}
442+
for _, obs := range fakeSink.observed {
443+
if obs.TransactionFingerprintID != txnFingerprintID && obs.TransactionFingerprintID != appstatspb.InvalidTransactionFingerprintID {
444+
return fmt.Errorf("unexpected TransactionFingerprintID: %d", obs.TransactionFingerprintID)
445+
}
446+
}
447+
return nil
448+
})
449+
}

pkg/sql/sqlstats/sslocal/sql_stats_ingestor.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -315,11 +315,6 @@ func (i *SQLStatsIngester) flushBuffer(
315315
return
316316
}
317317

318-
// Set the transaction fingerprint ID for each statement.
319-
for _, s := range *statements {
320-
s.TransactionFingerprintID = transaction.FingerprintID
321-
}
322-
323318
for _, sink := range i.sinks {
324319
sink.ObserveTransaction(ctx, transaction, *statements)
325320
}

pkg/sql/sqlstats/sslocal/sslocal_stats_collector.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,14 @@ func (s *StatsCollector) Reset(appStats *ssmemstorage.Container, phaseTime *sess
145145
// any memory allocated by underlying sql stats systems for the session
146146
// that owns this stats collector.
147147
func (s *StatsCollector) Close(_ctx context.Context, sessionID clusterunique.ID) {
148+
if s.statsIngester != nil {
149+
for _, stmt := range s.stmtBuf {
150+
stmt.TransactionFingerprintID = appstatspb.InvalidTransactionFingerprintID
151+
if s.sendInsights {
152+
s.statsIngester.IngestStatement(stmt)
153+
}
154+
}
155+
}
148156
s.stmtBuf = nil
149157
if s.statsIngester != nil {
150158
s.statsIngester.ClearSession(sessionID)
@@ -174,6 +182,9 @@ func (s *StatsCollector) EndTransaction(
174182

175183
for _, stmt := range s.stmtBuf {
176184
stmt.TransactionFingerprintID = transactionFingerprintID
185+
if s.sendInsights && s.statsIngester != nil {
186+
s.statsIngester.IngestStatement(stmt)
187+
}
177188
if err := s.flushTarget.RecordStatement(ctx, stmt); err != nil {
178189
discardedStats++
179190
}
@@ -217,10 +228,6 @@ func (s *StatsCollector) shouldObserveInsights() bool {
217228
func (s *StatsCollector) RecordStatement(
218229
ctx context.Context, value *sqlstats.RecordedStmtStats,
219230
) error {
220-
if s.sendInsights && s.statsIngester != nil {
221-
s.statsIngester.IngestStatement(value)
222-
}
223-
224231
// TODO(xinhaoz): This isn't the best place to set this, but we'll clean this up
225232
// when we refactor the stats collection code to send the stats to an ingester.
226233
s.stmtFingerprintID = value.FingerprintID

0 commit comments

Comments
 (0)