Skip to content

Commit 73df2d9

Browse files
craig[bot]xinhaozdhartunianDedej-Bergin
committed
143855: sqlstats: move sql stats writing to be async r=dhartunian a=xinhaoz This commit removes the recording of sql stats at the end of statement and transaction execution. The per-node SQLStats container has been added as a sink to the async sql stats ingester. Instead of having sslocal.StatsCollector record sql stats at the end of execution, it will now record the stats asynchronously as part of event processing in sslocal.SQLStatsIngester's ingest routine. When creating the execution statistics structs, we now record the application name from the application container currently that is currenlty set in sslocal.StatsCollector. This is because the conn exec's app name may have been mutated if the current statement is a `set application_name` statement but we still want to record that statement in the previous application. Epic: none Closes: #141024 Release note: None 147920: cli: fix flake in TestDockerCLI_test_error_hints r=rafiss a=Dedej-Bergin Previously a process was not exiting properly and was causing a test flake, this change adds a wait loop which checks that we have exited properly to avoid the flake. Fixes: #147066 Fixes: #151413 Informs: #115626 Release note: None Co-authored-by: Xin Hao Zhang <[email protected]> Co-authored-by: David Hartunian <[email protected]> Co-authored-by: Bergin Dedej <[email protected]>
3 parents 1283b87 + 657ecb3 + 799f5ab commit 73df2d9

25 files changed

+501
-288
lines changed

docs/generated/metrics/metrics.yaml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9148,6 +9148,22 @@ layers:
91489148
unit: COUNT
91499149
aggregation: AVG
91509150
derivative: NON_NEGATIVE_DERIVATIVE
9151+
- name: sql.stats.ingester.num_processed
9152+
exported_name: sql_stats_ingester_num_processed
9153+
description: Number of items processed by the SQL stats ingester
9154+
y_axis_label: Items
9155+
type: COUNTER
9156+
unit: COUNT
9157+
aggregation: AVG
9158+
derivative: NON_NEGATIVE_DERIVATIVE
9159+
- name: sql.stats.ingester.queue_size
9160+
exported_name: sql_stats_ingester_queue_size
9161+
description: Current number of items queued in the SQL stats ingester
9162+
y_axis_label: Items
9163+
type: GAUGE
9164+
unit: COUNT
9165+
aggregation: AVG
9166+
derivative: NONE
91519167
- name: sql.stats.mem.current
91529168
exported_name: sql_stats_mem_current
91539169
description: Current memory usage for fingerprint storage

pkg/cli/interactive_tests/common.tcl

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ proc eexpect {text} {
7373
}
7474

7575
# eexpect_re is like eexpect, but takes a regular expression argument
76-
# instead of a text string
76+
# instead of a text string.
7777
proc eexpect_re {text} {
7878
expect {
7979
-re $text {}
@@ -88,6 +88,44 @@ proc interrupt {} {
8888
sleep 0.4
8989
}
9090

91+
# Convenience function that sends Ctrl+C and waits more reliably for process to exit.
92+
# This uses a retry approach similar to stop_server but adapted for expect environment.
93+
proc interrupt_and_wait {} {
94+
report "INTERRUPT AND WAIT FOR FOREGROUND PROCESS"
95+
96+
# Send the interrupt signal
97+
send "\003"
98+
99+
# Use a retry loop similar to stop_server's wait mechanism.
100+
# Try up to 30 times with 1 second intervals, like stop_server does.
101+
try {
102+
# Set shorter timeout for the retry loop
103+
set timeout 1
104+
105+
for {set i 1} {$i <= 30} {incr i} {
106+
# Try to expect the shell prompt with a short timeout (1 second per attempt).
107+
expect {
108+
":/# " {
109+
report "FOREGROUND PROCESS EXITED SUCCESSFULLY (after $i attempt(s))"
110+
return
111+
}
112+
timeout {
113+
# Process might still be running, continue waiting.
114+
report "still waiting for process to exit (attempt $i/30)"
115+
# The sleep is implicit in the timeout, so we don't need an additional sleep here.
116+
}
117+
}
118+
}
119+
120+
# If we get here, the process didn't exit cleanly within 30 seconds.
121+
report "TIMEOUT: foreground process still running after 30 attempts"
122+
error "Failed to interrupt foreground process - it did not exit within 30 seconds"
123+
} finally {
124+
# Restore original timeout.
125+
set timeout 45
126+
}
127+
}
128+
91129
# Convenience function that sends Ctrl+D to the monitored process.
92130
# Leaves some upfront delay to let the readline process the time
93131
# to initialize the key binding.

pkg/cli/interactive_tests/test_error_hints.tcl

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,7 @@ end_test
6969
# server.
7070

7171
set spawn_id $shell_spawn_id
72-
interrupt
73-
interrupt
74-
eexpect ":/# "
72+
interrupt_and_wait
7573

7674
send "$argv start-single-node --listen-addr=localhost --certs-dir=$certs_dir --store=logs/db\r"
7775
eexpect "restarted pre-existing node"
@@ -90,13 +88,10 @@ eexpect "ERROR: node is running secure mode, SSL connection required"
9088
eexpect ":/# "
9189
end_test
9290

93-
9491
# Check what happens when attempting to connect to something
9592
# that is not a CockroachDB server.
9693
set spawn_id $shell_spawn_id
97-
interrupt
98-
interrupt
99-
eexpect ":/# "
94+
interrupt_and_wait
10095

10196
start_test "Connecting an insecure RPC client to a non-CockroachDB server"
10297
# In one shell, start a bogus server
@@ -110,8 +105,7 @@ eexpect "insecure\r\n"
110105
set spawn_id $shell_spawn_id
111106
eexpect "connected"
112107
eexpect "PRI * HTTP/2.0"
113-
interrupt
114-
eexpect ":/# "
108+
interrupt_and_wait
115109
# Check that cockroach node drain becomes suitably confused.
116110
set spawn_id $client_spawn_id
117111
eexpect "ERROR: server closed the connection."
@@ -133,8 +127,7 @@ eexpect "insecure\r\n"
133127
set spawn_id $shell_spawn_id
134128
eexpect "connected"
135129
eexpect "cockroach sql"
136-
interrupt
137-
eexpect ":/# "
130+
interrupt_and_wait
138131
# Check that cockroach sql becomes suitably confused.
139132
set spawn_id $client_spawn_id
140133
eexpect "ERROR: server closed the connection."

pkg/server/application_api/sql_stats_test.go

Lines changed: 49 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ func TestStatusAPICombinedTransactions(t *testing.T) {
5959

6060
var params base.TestServerArgs
6161
params.Knobs.SpanConfig = &spanconfig.TestingKnobs{ManagerDisableJobCreation: true} // TODO(irfansharif): #74919.
62+
params.Knobs.SQLStatsKnobs = &sqlstats.TestingKnobs{SynchronousSQLStats: true}
6263
testCluster := serverutils.StartCluster(t, 3, base.TestClusterArgs{
6364
ServerArgs: params,
6465
})
@@ -130,8 +131,6 @@ func TestStatusAPICombinedTransactions(t *testing.T) {
130131
}
131132
}
132133

133-
// Wait for last app name to be flushed to in memory stats.
134-
135134
// Hit query endpoint.
136135
var resp serverpb.StatementsResponse
137136
if err := srvtestutils.GetStatusJSONProtoWithAdminAndTimeoutOption(firstServerProto, "combinedstmts", &resp, true, additionalTimeout); err != nil {
@@ -199,7 +198,15 @@ func TestStatusAPITransactions(t *testing.T) {
199198
skip.UnderDeadlock(t, "test is very slow under deadlock")
200199
skip.UnderRace(t, "test is too slow to run under race")
201200

202-
testCluster := serverutils.StartCluster(t, 3, base.TestClusterArgs{})
201+
testCluster := serverutils.StartCluster(t, 3, base.TestClusterArgs{
202+
ServerArgs: base.TestServerArgs{
203+
Knobs: base.TestingKnobs{
204+
SQLStatsKnobs: &sqlstats.TestingKnobs{
205+
SynchronousSQLStats: true,
206+
},
207+
},
208+
},
209+
})
203210
ctx := context.Background()
204211
defer testCluster.Stopper().Stop(ctx)
205212

@@ -330,7 +337,15 @@ func TestStatusAPITransactionStatementFingerprintIDsTruncation(t *testing.T) {
330337
defer leaktest.AfterTest(t)()
331338
defer log.Scope(t).Close(t)
332339

333-
testCluster := serverutils.StartCluster(t, 3, base.TestClusterArgs{})
340+
testCluster := serverutils.StartCluster(t, 3, base.TestClusterArgs{
341+
ServerArgs: base.TestServerArgs{
342+
Knobs: base.TestingKnobs{
343+
SQLStatsKnobs: &sqlstats.TestingKnobs{
344+
SynchronousSQLStats: true,
345+
},
346+
},
347+
},
348+
})
334349
defer testCluster.Stopper().Stop(context.Background())
335350

336351
firstServerProto := testCluster.Server(0).ApplicationLayer()
@@ -398,6 +413,7 @@ func TestStatusAPIStatements(t *testing.T) {
398413
aggregatedTs := int64(1630353000)
399414
statsKnobs := sqlstats.CreateTestingKnobs()
400415
statsKnobs.StubTimeNow = func() time.Time { return timeutil.Unix(aggregatedTs, 0) }
416+
statsKnobs.SynchronousSQLStats = true
401417
testCluster := serverutils.StartCluster(t, 3, base.TestClusterArgs{
402418
ServerArgs: base.TestServerArgs{
403419
Knobs: base.TestingKnobs{
@@ -521,6 +537,7 @@ func TestStatusAPICombinedStatementsTotalLatency(t *testing.T) {
521537
}
522538

523539
sqlStatsKnobs := sqlstats.CreateTestingKnobs()
540+
sqlStatsKnobs.SynchronousSQLStats = true
524541
// Start the cluster.
525542
srv, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
526543
Insecure: true,
@@ -682,6 +699,7 @@ func TestStatusAPICombinedStatementsWithFullScans(t *testing.T) {
682699
oneMinAfterAggregatedTs := aggregatedTs + 60
683700
statsKnobs := sqlstats.CreateTestingKnobs()
684701
statsKnobs.StubTimeNow = func() time.Time { return timeutil.Unix(aggregatedTs, 0) }
702+
statsKnobs.SynchronousSQLStats = true
685703
testCluster := serverutils.StartCluster(t, 3, base.TestClusterArgs{
686704
ServerArgs: base.TestServerArgs{
687705
Knobs: base.TestingKnobs{
@@ -856,6 +874,7 @@ func TestStatusAPICombinedStatements(t *testing.T) {
856874
aggregatedTs := int64(1630353000)
857875
statsKnobs := sqlstats.CreateTestingKnobs()
858876
statsKnobs.StubTimeNow = func() time.Time { return timeutil.Unix(aggregatedTs, 0) }
877+
statsKnobs.SynchronousSQLStats = true
859878
testCluster := serverutils.StartCluster(t, 3, base.TestClusterArgs{
860879
ServerArgs: base.TestServerArgs{
861880
Knobs: base.TestingKnobs{
@@ -1031,6 +1050,7 @@ func TestStatusAPIStatementDetails(t *testing.T) {
10311050
aggregatedTs := int64(1630353000)
10321051
statsKnobs := sqlstats.CreateTestingKnobs()
10331052
statsKnobs.StubTimeNow = func() time.Time { return timeutil.Unix(aggregatedTs, 0) }
1053+
statsKnobs.SynchronousSQLStats = true
10341054
testCluster := serverutils.StartCluster(t, 3, base.TestClusterArgs{
10351055
ServerArgs: base.TestServerArgs{
10361056
Knobs: base.TestingKnobs{
@@ -1306,6 +1326,7 @@ func TestCombinedStatementUsesCorrectSourceTable(t *testing.T) {
13061326
defer leaktest.AfterTest(t)()
13071327
defer log.Scope(t).Close(t)
13081328

1329+
skip.UnderDuress(t)
13091330
ctx := context.Background()
13101331

13111332
// Disable flushing sql stats so we can manually set the table states
@@ -1314,6 +1335,7 @@ func TestCombinedStatementUsesCorrectSourceTable(t *testing.T) {
13141335
statsKnobs := sqlstats.CreateTestingKnobs()
13151336
defaultMockInsertedAggTs := timeutil.Unix(1696906800, 0)
13161337
statsKnobs.StubTimeNow = func() time.Time { return defaultMockInsertedAggTs }
1338+
statsKnobs.SynchronousSQLStats = true
13171339
persistedsqlstats.SQLStatsFlushEnabled.Override(ctx, &settings.SV, false)
13181340
srv := serverutils.StartServerOnly(t, base.TestServerArgs{
13191341
Settings: settings,
@@ -1550,7 +1572,15 @@ func TestDrainSqlStats(t *testing.T) {
15501572
defer leaktest.AfterTest(t)()
15511573
defer log.Scope(t).Close(t)
15521574
appName := "drain_stats_app"
1553-
testCluster := serverutils.StartCluster(t, 3, base.TestClusterArgs{})
1575+
testCluster := serverutils.StartCluster(t, 3, base.TestClusterArgs{
1576+
ServerArgs: base.TestServerArgs{
1577+
Knobs: base.TestingKnobs{
1578+
SQLStatsKnobs: &sqlstats.TestingKnobs{
1579+
SynchronousSQLStats: true,
1580+
},
1581+
},
1582+
},
1583+
})
15541584
ctx := context.Background()
15551585
defer testCluster.Stopper().Stop(ctx)
15561586

@@ -1587,7 +1617,15 @@ func TestDrainSqlStats_partialOutage(t *testing.T) {
15871617
defer leaktest.AfterTest(t)()
15881618
defer log.Scope(t).Close(t)
15891619
appName := "drain_stats_app"
1590-
testCluster := serverutils.StartCluster(t, 3, base.TestClusterArgs{})
1620+
testCluster := serverutils.StartCluster(t, 3, base.TestClusterArgs{
1621+
ServerArgs: base.TestServerArgs{
1622+
Knobs: base.TestingKnobs{
1623+
SQLStatsKnobs: &sqlstats.TestingKnobs{
1624+
SynchronousSQLStats: true,
1625+
},
1626+
},
1627+
},
1628+
})
15911629
ctx := context.Background()
15921630
defer testCluster.Stopper().Stop(ctx)
15931631

@@ -1636,15 +1674,18 @@ func TestClusterResetSQLStats(t *testing.T) {
16361674
defer leaktest.AfterTest(t)()
16371675
defer log.Scope(t).Close(t)
16381676

1639-
ctx := context.Background()
1677+
skip.UnderDuress(t)
16401678

1679+
ctx := context.Background()
1680+
knobs := sqlstats.CreateTestingKnobs()
1681+
knobs.SynchronousSQLStats = true
16411682
for _, flushed := range []bool{false, true} {
16421683
t.Run(fmt.Sprintf("flushed=%t", flushed), func(t *testing.T) {
16431684
testCluster := serverutils.StartCluster(t, 3 /* numNodes */, base.TestClusterArgs{
16441685
ServerArgs: base.TestServerArgs{
16451686
Insecure: true,
16461687
Knobs: base.TestingKnobs{
1647-
SQLStatsKnobs: sqlstats.CreateTestingKnobs(),
1688+
SQLStatsKnobs: knobs,
16481689
},
16491690
},
16501691
})

pkg/sql/conn_executor.go

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -430,6 +430,9 @@ type ServerMetrics struct {
430430

431431
// InsightsMetrics contains metrics related to outlier detection.
432432
InsightsMetrics insights.Metrics
433+
434+
// IngesterMetrics contains metrics related to SQL stats ingestion.
435+
IngesterMetrics sslocal.Metrics
433436
}
434437

435438
// NewServer creates a new Server. Start() needs to be called before the Server
@@ -444,6 +447,7 @@ func NewServer(cfg *ExecutorConfig, pool *mon.BytesMonitor) *Server {
444447
sqlstats.MaxMemReportedSQLStatsTxnFingerprints,
445448
serverMetrics.StatsMetrics.ReportedSQLStatsMemoryCurBytesCount,
446449
serverMetrics.StatsMetrics.ReportedSQLStatsMemoryMaxBytesHist,
450+
nil, /* discardedStatsCount */
447451
pool,
448452
nil, /* reportedProvider */
449453
cfg.SQLStatsTestingKnobs,
@@ -454,11 +458,13 @@ func NewServer(cfg *ExecutorConfig, pool *mon.BytesMonitor) *Server {
454458
sqlstats.MaxMemSQLStatsTxnFingerprints,
455459
serverMetrics.StatsMetrics.SQLStatsMemoryCurBytesCount,
456460
serverMetrics.StatsMetrics.SQLStatsMemoryMaxBytesHist,
461+
serverMetrics.StatsMetrics.DiscardedStatsCount,
457462
pool,
458463
reportedSQLStats,
459464
cfg.SQLStatsTestingKnobs,
460465
)
461-
sqlStatsIngester := sslocal.NewSQLStatsIngester(cfg.SQLStatsTestingKnobs, insightsProvider)
466+
sqlStatsIngester := sslocal.NewSQLStatsIngester(
467+
cfg.Settings, cfg.SQLStatsTestingKnobs, serverMetrics.IngesterMetrics, insightsProvider, localSQLStats)
462468
// TODO(117690): Unify StmtStatsEnable and TxnStatsEnable into a single cluster setting.
463469
sqlstats.TxnStatsEnable.SetOnChange(&cfg.Settings.SV, func(_ context.Context) {
464470
if !sqlstats.TxnStatsEnable.Get(&cfg.Settings.SV) {
@@ -672,6 +678,7 @@ func makeServerMetrics(cfg *ExecutorConfig) ServerMetrics {
672678
},
673679
ContentionSubsystemMetrics: txnidcache.NewMetrics(),
674680
InsightsMetrics: insights.NewMetrics(),
681+
IngesterMetrics: sslocal.NewIngesterMetrics(),
675682
}
676683
}
677684

@@ -1256,7 +1263,6 @@ func (s *Server) newConnExecutor(
12561263
s.sqlStatsIngester,
12571264
ex.phaseTimes,
12581265
s.localSqlStats.GetCounters(),
1259-
underOuterTxn,
12601266
s.cfg.SQLStatsTestingKnobs,
12611267
)
12621268
ex.dataMutatorIterator.onApplicationNameChange = func(newName string) {
@@ -4082,17 +4088,7 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(
40824088
return advanceInfo{}, err
40834089
}
40844090
if advInfo.txnEvent.eventType == txnUpgradeToExplicit {
4085-
// TODO (xinhaoz): This is a temporary hook until
4086-
// https://github.com/cockroachdb/cockroach/issues/141024
4087-
// is resolved. The reason this exists is because we
4088-
// need to recompute the statement fingerprint id for
4089-
// statements currently in the stats collector, which
4090-
// were computed once already for insights. There is an
4091-
// acknowledgement that this means the fingerprint id
4092-
// given to insights for upgraded transactions are
4093-
// currently incorrect.
40944091
ex.extraTxnState.txnFinishClosure.implicit = false
4095-
ex.statsCollector.UpgradeToExplicitTransaction()
40964092
}
40974093
}
40984094
case txnStart:

0 commit comments

Comments
 (0)