Skip to content

Commit 771d3f7

Browse files
craig[bot]xinhaozstevendanna
committed
107081: sql: add `StmtPosInTxn` to CommonSQLExecDetails r=xinhaoz a=xinhaoz This commit adds the `StmtPosInTxn` field to the `CommonSQLExecDetails` event. `StmtPosInTxn` represents the stmt's index in the transaction, starting at 1. Epic: none Release note (sql change): In `CommonSQLExecDetails`, which is emitted as part of the sql audit logs, sql exec logs and telemetry events, there is a new field: - `StmtPosInTxn`: represents the stmt's index in the transaction, starting at 1. 107437: testserver: reduce wait for nodelocal capability r=knz a=stevendanna This adds the ability to restart the rangefeedcache used by the tenant capability watcher and then issues such a restart before waiting for the nodelocal capability. The initial scan the restart forces typically takes less time than waiting out the closed timestamp. Epic: CRDB-18499 Release note: None Co-authored-by: Xin Hao Zhang <[email protected]> Co-authored-by: Steven Danna <[email protected]>
3 parents 4122999 + 8b12b72 + 132211f commit 771d3f7

File tree

11 files changed

+178
-54
lines changed

11 files changed

+178
-54
lines changed

docs/generated/eventlog.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -438,6 +438,7 @@ is directly or indirectly a member of the admin role) executes a query.
438438
| `FullIndexScan` | Whether the query contains a full secondary index scan of a non-partial index. | no |
439439
| `TxnCounter` | The sequence number of the SQL transaction inside its session. | no |
440440
| `BulkJobId` | The job id for bulk job (IMPORT/BACKUP/RESTORE). | no |
441+
| `StmtPosInTxn` | The statement's index in the transaction, starting at 1. | no |
441442

442443
### `role_based_audit_event`
443444

@@ -473,6 +474,7 @@ cluster setting.
473474
| `FullIndexScan` | Whether the query contains a full secondary index scan of a non-partial index. | no |
474475
| `TxnCounter` | The sequence number of the SQL transaction inside its session. | no |
475476
| `BulkJobId` | The job id for bulk job (IMPORT/BACKUP/RESTORE). | no |
477+
| `StmtPosInTxn` | The statement's index in the transaction, starting at 1. | no |
476478

477479
### `sensitive_table_access`
478480

@@ -508,6 +510,7 @@ a table marked as audited.
508510
| `FullIndexScan` | Whether the query contains a full secondary index scan of a non-partial index. | no |
509511
| `TxnCounter` | The sequence number of the SQL transaction inside its session. | no |
510512
| `BulkJobId` | The job id for bulk job (IMPORT/BACKUP/RESTORE). | no |
513+
| `StmtPosInTxn` | The statement's index in the transaction, starting at 1. | no |
511514

512515
## SQL Execution Log
513516

@@ -550,6 +553,7 @@ and the cluster setting `sql.trace.log_statement_execute` is set.
550553
| `FullIndexScan` | Whether the query contains a full secondary index scan of a non-partial index. | no |
551554
| `TxnCounter` | The sequence number of the SQL transaction inside its session. | no |
552555
| `BulkJobId` | The job id for bulk job (IMPORT/BACKUP/RESTORE). | no |
556+
| `StmtPosInTxn` | The statement's index in the transaction, starting at 1. | no |
553557

554558
## SQL Logical Schema Changes
555559

@@ -2350,6 +2354,7 @@ set to a non-zero value, AND
23502354
| `FullIndexScan` | Whether the query contains a full secondary index scan of a non-partial index. | no |
23512355
| `TxnCounter` | The sequence number of the SQL transaction inside its session. | no |
23522356
| `BulkJobId` | The job id for bulk job (IMPORT/BACKUP/RESTORE). | no |
2357+
| `StmtPosInTxn` | The statement's index in the transaction, starting at 1. | no |
23532358

23542359
### `txn_rows_read_limit`
23552360

@@ -2470,6 +2475,7 @@ the "slow query" condition.
24702475
| `FullIndexScan` | Whether the query contains a full secondary index scan of a non-partial index. | no |
24712476
| `TxnCounter` | The sequence number of the SQL transaction inside its session. | no |
24722477
| `BulkJobId` | The job id for bulk job (IMPORT/BACKUP/RESTORE). | no |
2478+
| `StmtPosInTxn` | The statement's index in the transaction, starting at 1. | no |
24732479

24742480
### `txn_rows_read_limit_internal`
24752481

@@ -3021,6 +3027,7 @@ contains common SQL event/execution details.
30213027
| `FullIndexScan` | Whether the query contains a full secondary index scan of a non-partial index. | no |
30223028
| `TxnCounter` | The sequence number of the SQL transaction inside its session. | no |
30233029
| `BulkJobId` | The job id for bulk job (IMPORT/BACKUP/RESTORE). | no |
3030+
| `StmtPosInTxn` | The statement's index in the transaction, starting at 1. | no |
30243031

30253032
### `schema_descriptor`
30263033

pkg/cli/interactive_tests/test_exec_log.tcl

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -116,12 +116,12 @@ grep 'SELECT ..*550..* +' $logfile;
116116
exit 1;"
117117

118118
# Two separate single-stmt txns.
119-
system "n=`grep 'SELECT ..*550..* +' $logfile | sed -e 's/.*TxnCounter.:\\(\[0-9\]*\\)/\\1/g' | uniq | wc -l`; if test \$n -ne 2; then echo unexpected \$n; exit 1; fi"
119+
system "n=`grep 'SELECT ..*550..* +' $logfile | sed -e 's/.*TxnCounter.:\\(\[0-9\]*\\).*/\\1/g' | uniq | wc -l`; if test \$n -ne 2; then echo unexpected \$n; exit 1; fi"
120120
# Same txns.
121-
system "n=`grep 'SELECT ..*660..* +' $logfile | sed -e 's/.*TxnCounter.:\\(\[0-9\]*\\)/\\1/g' | uniq | wc -l`; if test \$n -ne 1; then echo unexpected \$n; exit 1; fi"
122-
system "n=`grep 'SELECT ..*770..* +' $logfile | sed -e 's/.*TxnCounter.:\\(\[0-9\]*\\)/\\1/g' | uniq | wc -l`; if test \$n -ne 1; then echo unexpected \$n; exit 1; fi"
123-
system "n=`grep 'SELECT ..*880..* +' $logfile | sed -e 's/.*TxnCounter.:\\(\[0-9\]*\\)/\\1/g' | uniq | wc -l`; if test \$n -ne 1; then echo unexpected \$n; exit 1; fi"
124-
system "n=`grep 'SELECT ..*990..* +' $logfile | sed -e 's/.*TxnCounter.:\\(\[0-9\]*\\)/\\1/g' | uniq | wc -l`; if test \$n -ne 1; then echo unexpected \$n; exit 1; fi"
121+
system "n=`grep 'SELECT ..*660..* +' $logfile | sed -e 's/.*TxnCounter.:\\(\[0-9\]*\\).*/\\1/g' | uniq | wc -l`; if test \$n -ne 1; then echo unexpected \$n; exit 1; fi"
122+
system "n=`grep 'SELECT ..*770..* +' $logfile | sed -e 's/.*TxnCounter.:\\(\[0-9\]*\\).*/\\1/g' | uniq | wc -l`; if test \$n -ne 1; then echo unexpected \$n; exit 1; fi"
123+
system "n=`grep 'SELECT ..*880..* +' $logfile | sed -e 's/.*TxnCounter.:\\(\[0-9\]*\\).*/\\1/g' | uniq | wc -l`; if test \$n -ne 1; then echo unexpected \$n; exit 1; fi"
124+
system "n=`grep 'SELECT ..*990..* +' $logfile | sed -e 's/.*TxnCounter.:\\(\[0-9\]*\\).*/\\1/g' | uniq | wc -l`; if test \$n -ne 1; then echo unexpected \$n; exit 1; fi"
125125

126126
end_test
127127

pkg/kv/kvclient/rangefeed/rangefeedcache/watcher.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,9 @@ type Watcher struct {
7777

7878
lastFrontierTS hlc.Timestamp // used to assert monotonicity across rangefeed attempts
7979

80+
// Used to force a restart during testing.
81+
restartErrCh chan error
82+
8083
knobs TestingKnobs
8184
}
8285

@@ -177,6 +180,7 @@ func NewWatcher(
177180
withPrevValue: withPrevValue,
178181
translateEvent: translateEvent,
179182
onUpdate: onUpdate,
183+
restartErrCh: make(chan error),
180184
}
181185
if knobs != nil {
182186
w.knobs = *knobs
@@ -353,12 +357,24 @@ func (s *Watcher) Run(ctx context.Context) error {
353357

354358
case err := <-errCh:
355359
return err
360+
case err := <-s.restartErrCh:
361+
return err
356362
case err := <-s.knobs.ErrorInjectionCh:
357363
return err
358364
}
359365
}
360366
}
361367

368+
var restartErr = errors.New("testing restart requested")
369+
370+
// TestingRestart injects an error into the rangefeed cache, forcing
371+
// it to restart. This is separate from the testing knob so that we
372+
// can force a restart from test infrastructure without overriding the
373+
// user-provided testing knobs.
374+
func (s *Watcher) TestingRestart() {
375+
s.restartErrCh <- restartErr
376+
}
377+
362378
func (s *Watcher) handleUpdate(
363379
ctx context.Context, buffer *rangefeedbuffer.Buffer, ts hlc.Timestamp, updateType UpdateType,
364380
) {

pkg/multitenant/tenantcapabilities/tenantcapabilitieswatcher/watcher.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ type Watcher struct {
5454
startCh chan struct{}
5555
startErr error
5656

57+
// rfc provides access to the underlying
58+
// rangefeedcache.Watcher for testing.
59+
rfc *rangefeedcache.Watcher
60+
5761
// initialScan is used to synchronize the Start() method with the
5862
// reception of the initial batch of values from the rangefeed
5963
// (which happens asynchronously).
@@ -212,6 +216,7 @@ func (w *Watcher) startRangeFeed(ctx context.Context) error {
212216
if err := rangefeedcache.Start(ctx, w.stopper, rfc, w.onError); err != nil {
213217
return err
214218
}
219+
w.rfc = rfc
215220

216221
// Wait for the initial scan before returning.
217222
select {
@@ -346,6 +351,12 @@ func (w *Watcher) handleIncrementalUpdate(
346351
}
347352
}
348353

354+
func (w *Watcher) TestingRestart() {
355+
if w.rfc != nil {
356+
w.rfc.TestingRestart()
357+
}
358+
}
359+
349360
// TestingFlushCapabilitiesState flushes the underlying global tenant capability
350361
// state for testing purposes. The returned entries are sorted by tenant ID.
351362
func (w *Watcher) TestingFlushCapabilitiesState() (entries []tenantcapabilities.Entry) {

pkg/server/testserver.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1201,18 +1201,13 @@ func (ts *TestServer) StartTenant(
12011201
baseCfg.DisableTLSForHTTP = params.DisableTLSForHTTP
12021202
baseCfg.EnableDemoLoginEndpoint = params.EnableDemoLoginEndpoint
12031203

1204-
// Waiting for capabilities can take 3+ seconds since the
1205-
// rangefeedcache needs to wait for the closed timestamp
1206-
// before flushing updates. To avoid paying this cost in all
1207-
// cases, we only set the nodelocal storage capability if the
1208-
// caller has configured an ExternalIODir since nodelocal
1209-
// storage only works with that configured.
1204+
// Waiting for capabilities can time To avoid paying this cost in all
1205+
// cases, we only set the nodelocal storage capability if the caller has
1206+
// configured an ExternalIODir since nodelocal storage only works with
1207+
// that configured.
12101208
//
1211-
// TODO(ssd): We should do more here. We could have the caller
1212-
// pass in explicitly that they want these capabilities. Or,
1213-
// we could modify the system in some way to avoid waiting on
1214-
// capabilities for so long. Also, note that this doesn't
1215-
// apply to StartSharedProcessTenant.
1209+
// TODO(ssd): We do not set this capability in
1210+
// StartSharedProcessTenant.
12161211
shouldGrantNodelocalCap := ts.params.ExternalIODir != ""
12171212
canGrantNodelocalCap := ts.ClusterSettings().Version.IsActive(ctx, clusterversion.V23_1TenantCapabilities)
12181213
if canGrantNodelocalCap && shouldGrantNodelocalCap {
@@ -1225,6 +1220,11 @@ func (ts *TestServer) StartTenant(
12251220
return nil, err
12261221
}
12271222
} else {
1223+
// Restart the capabilities watcher. Restarting the
1224+
// watcher forces a new initial scan which is faster
1225+
// than waiting out the closed timestamp interval
1226+
// required to see new updates.
1227+
ts.tenantCapabilitiesWatcher.TestingRestart()
12281228
if err := testutils.SucceedsSoonError(func() error {
12291229
capabilities, found := ts.TenantCapabilitiesReader().GetCapabilities(params.TenantID)
12301230
if !found {

pkg/sql/conn_executor.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2792,6 +2792,7 @@ func (ex *connExecutor) execCopyOut(
27922792
int(ex.state.mu.autoRetryCounter),
27932793
ex.extraTxnState.txnCounter,
27942794
numOutputRows,
2795+
ex.state.mu.stmtCount,
27952796
0, /* bulkJobId */
27962797
copyErr,
27972798
ex.statsCollector.PhaseTimes().GetSessionPhaseTime(sessionphase.SessionQueryReceived),
@@ -3036,7 +3037,8 @@ func (ex *connExecutor) execCopyIn(
30363037
var stats topLevelQueryStats
30373038
ex.planner.maybeLogStatement(ctx, ex.executorType, true,
30383039
int(ex.state.mu.autoRetryCounter), ex.extraTxnState.txnCounter,
3039-
numInsertedRows, 0, /* bulkJobId */
3040+
numInsertedRows, ex.state.mu.stmtCount,
3041+
0, /* bulkJobId */
30403042
copyErr,
30413043
ex.statsCollector.PhaseTimes().GetSessionPhaseTime(sessionphase.SessionQueryReceived),
30423044
&ex.extraTxnState.hasAdminRoleCache,

pkg/sql/conn_executor_exec.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1512,6 +1512,7 @@ func (ex *connExecutor) dispatchToExecutionEngine(
15121512
int(ex.state.mu.autoRetryCounter),
15131513
ex.extraTxnState.txnCounter,
15141514
ppInfo.dispatchToExecutionEngine.rowsAffected,
1515+
ex.state.mu.stmtCount,
15151516
bulkJobId,
15161517
ppInfo.curRes.ErrAllowReleased(),
15171518
ex.statsCollector.PhaseTimes().GetSessionPhaseTime(sessionphase.SessionQueryReceived),
@@ -1539,6 +1540,7 @@ func (ex *connExecutor) dispatchToExecutionEngine(
15391540
int(ex.state.mu.autoRetryCounter),
15401541
ex.extraTxnState.txnCounter,
15411542
nonBulkJobNumRows,
1543+
ex.state.mu.stmtCount,
15421544
bulkJobId,
15431545
res.Err(),
15441546
ex.statsCollector.PhaseTimes().GetSessionPhaseTime(sessionphase.SessionQueryReceived),

pkg/sql/exec_log.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ func (p *planner) maybeLogStatement(
121121
ctx context.Context,
122122
execType executorType,
123123
isCopy bool,
124-
numRetries, txnCounter, rows int,
124+
numRetries, txnCounter, rows, stmtCount int,
125125
bulkJobId uint64,
126126
err error,
127127
queryReceived time.Time,
@@ -133,7 +133,7 @@ func (p *planner) maybeLogStatement(
133133
) {
134134
p.maybeAuditRoleBasedAuditEvent(ctx, execType)
135135
p.maybeLogStatementInternal(ctx, execType, isCopy, numRetries, txnCounter,
136-
rows, bulkJobId, err, queryReceived, hasAdminRoleCache,
136+
rows, stmtCount, bulkJobId, err, queryReceived, hasAdminRoleCache,
137137
telemetryLoggingMetrics, stmtFingerprintID, queryStats, statsCollector,
138138
)
139139
}
@@ -142,7 +142,7 @@ func (p *planner) maybeLogStatementInternal(
142142
ctx context.Context,
143143
execType executorType,
144144
isCopy bool,
145-
numRetries, txnCounter, rows int,
145+
numRetries, txnCounter, rows, stmtCount int,
146146
bulkJobId uint64,
147147
err error,
148148
startTime time.Time,
@@ -215,6 +215,7 @@ func (p *planner) maybeLogStatementInternal(
215215
FullTableScan: p.curPlan.flags.IsSet(planFlagContainsFullTableScan),
216216
FullIndexScan: p.curPlan.flags.IsSet(planFlagContainsFullIndexScan),
217217
TxnCounter: uint32(txnCounter),
218+
StmtPosInTxn: uint32(stmtCount),
218219
}
219220

220221
// Note that for bulk job query (IMPORT, BACKUP and RESTORE), we don't

0 commit comments

Comments
 (0)