Skip to content

Commit d36b323

Browse files
committed
sql: fix transaction diagnostics for implicit txns
Fixes transaction diagnostic collection to work for implicit transactions. This is done by allowing the txnInstrumentationHelper to finalize a collection if the diagnostics collector is still in the "in progress" state. The only time it should still be in this state is if collecting a transaction bundle for an implicit transaction. This is because explicit transactions will have updated the state to the "ready to finalize" state if a terminal statement has been executed and the request has been fulfilled, or the "reset" state if terminal statement has been executed and the request hasnt been fulfilled. In either case, a terminal statement has been executed. This is different from an implicit transaction, where a statement is auto commited and no explicit terminal statement is ever executed. Epic: CRDB-53541 Release note: None
1 parent 29de882 commit d36b323

File tree

2 files changed

+90
-30
lines changed

2 files changed

+90
-30
lines changed

pkg/sql/stmtdiagnostics/statement_diagnostics_test.go

Lines changed: 87 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -812,7 +812,7 @@ func TestTxnBundleCollection(t *testing.T) {
812812
sqlConn := sqlutils.MakeSQLRunner(tc.ServerConn(0))
813813

814814
// Execute to transaction so that it is in transaction_statistics
815-
executeTransactions(t, sqlConn, txnStatements)
815+
executeTransaction(t, sqlConn, txnStatements)
816816

817817
// Get the fingerprint id and statement fingerprint ids for the transaction
818818
// to make the txn diagnostics request
@@ -928,9 +928,7 @@ LIMIT 1`)
928928
require.NoError(t, err)
929929

930930
// Ensure that the request shows up in the system.transaction_diagnostics_requests table
931-
var count int
932-
sqlConn.QueryRow(t, "SELECT count(*) FROM system.transaction_diagnostics_requests WHERE completed=false and id=$1", reqId).Scan(&count)
933-
require.Equal(t, 1, count)
931+
require.False(t, getRequestCompletedStatus(t, sqlConn, reqId))
934932

935933
// Execute the transaction until the request is marked as complete.
936934
// We use a connection to a different server than the diagnostics request
@@ -1016,18 +1014,15 @@ LIMIT 1`)
10161014
require.NoError(t, err)
10171015

10181016
// Ensure that the request shows up in the system.transaction_diagnostics_requests table
1019-
var count int
1020-
sqlConn.QueryRow(t, "SELECT count(*) FROM system.transaction_diagnostics_requests WHERE completed=false and id=$1", reqId).Scan(&count)
1021-
require.Equal(t, 1, count)
1017+
require.False(t, getRequestCompletedStatus(t, sqlConn, reqId))
10221018
runTxnUntilDiagnosticsCollected(t, tc, sqlConn, txnStatements, reqId)
1023-
// Ensure there are no more active transaction diagnostics requests
1024-
sqlConn.QueryRow(t, "SELECT count(*) FROM system.transaction_diagnostics_requests WHERE completed=false and id=$1", reqId).Scan(&count)
1025-
require.Equal(t, 0, count)
1019+
// Ensure the request is complete
1020+
require.True(t, getRequestCompletedStatus(t, sqlConn, reqId))
10261021
// make sure the statement diagnostics request is still incomplete
10271022
sqlConn.QueryRow(t, "SELECT count(*) FROM system.statement_diagnostics_requests WHERE completed=false and id=$1", stmtReqId).Scan(&stmtcount)
10281023
require.Equal(t, 1, stmtcount)
10291024

1030-
executeTransactions(t, sqlConn, txnStatements)
1025+
executeTransaction(t, sqlConn, txnStatements)
10311026
// should be complete now that there is no transaction diagnostics request
10321027
sqlConn.QueryRow(t, "SELECT count(*) FROM system.statement_diagnostics_requests WHERE completed=true and id=$1", stmtReqId).Scan(&stmtcount)
10331028
require.Equal(t, 1, stmtcount)
@@ -1048,9 +1043,7 @@ LIMIT 1`)
10481043
require.NoError(t, err)
10491044

10501045
// Ensure that the request shows up in the system.transaction_diagnostics_requests table
1051-
var count int
1052-
sqlConn.QueryRow(t, "SELECT count(*) FROM system.transaction_diagnostics_requests WHERE completed=false and id=$1", reqId).Scan(&count)
1053-
require.Equal(t, 1, count)
1046+
require.False(t, getRequestCompletedStatus(t, sqlConn, reqId))
10541047

10551048
conn2 := sqlutils.MakeSQLRunner(tc.ServerConn(1))
10561049
conn3 := sqlutils.MakeSQLRunner(tc.ServerConn(2))
@@ -1078,16 +1071,79 @@ LIMIT 1`)
10781071
conn3.Exec(t, "COMMIT")
10791072
conn2.Exec(t, "COMMIT")
10801073

1081-
sqlConn.QueryRow(t, "SELECT count(*) FROM system.transaction_diagnostics_requests WHERE completed=true and id=$1", reqId).Scan(&count)
1082-
require.Equal(t, 1, count)
1074+
require.True(t, getRequestCompletedStatus(t, sqlConn, reqId))
10831075

1076+
var count int
10841077
sqlConn.QueryRow(t, ""+
10851078
"SELECT count(*) FROM system.statement_diagnostics sd "+
10861079
"JOIN system.transaction_diagnostics_requests tdr on tdr.transaction_diagnostics_id = sd.transaction_diagnostics_id "+
10871080
"WHERE tdr.id=$1", reqId).Scan(&count)
10881081

10891082
require.Equal(t, 3, count)
10901083
})
1084+
t.Run("partial match transaction", func(t *testing.T) {
1085+
reqId, err := registry.InsertTxnRequestInternal(
1086+
ctx,
1087+
txnFingerprintId,
1088+
statementFingerprintIds,
1089+
"testuser",
1090+
0,
1091+
0,
1092+
0,
1093+
false,
1094+
)
1095+
require.NoError(t, err)
1096+
1097+
// Ensure that the request shows up in the system.transaction_diagnostics_requests table
1098+
require.False(t, getRequestCompletedStatus(t, sqlConn, reqId))
1099+
1100+
executeTransaction(t, sqlConn, []string{txnStatements[0]})
1101+
require.False(t, getRequestCompletedStatus(t, sqlConn, reqId))
1102+
1103+
sqlConn.Exec(t, txnStatements[0])
1104+
require.False(t, getRequestCompletedStatus(t, sqlConn, reqId))
1105+
1106+
executeTransaction(t, sqlConn, txnStatements)
1107+
runTxnUntilDiagnosticsCollected(t, tc, sqlConn, txnStatements, reqId)
1108+
})
1109+
1110+
t.Run("implicit txn", func(t *testing.T) {
1111+
sqlConn.Exec(t, "CREATE DATABASE mydb")
1112+
sqlConn.Exec(t, "USE mydb")
1113+
sqlConn.Exec(t, "CREATE TABLE my_table(a int, b int)")
1114+
sqlConn.Exec(t, "INSERT INTO my_table VALUES (1,1)")
1115+
var fingerprintIdBytes []byte
1116+
var txnFingerprintIdBytes []byte
1117+
sqlConn.QueryRow(t, `
1118+
SELECT fingerprint_id, transaction_fingerprint_id
1119+
FROM crdb_internal.statement_statistics
1120+
WHERE metadata->>'db' = 'mydb'
1121+
AND metadata->>'query'
1122+
LIKE 'INSERT INTO my_table%'
1123+
LIMIT 1`).Scan(&fingerprintIdBytes, &txnFingerprintIdBytes)
1124+
_, fingerprintId, err := encoding.DecodeUint64Ascending(fingerprintIdBytes)
1125+
require.NoError(t, err)
1126+
_, txnFingerprintId, err := encoding.DecodeUint64Ascending(txnFingerprintIdBytes)
1127+
require.NoError(t, err)
1128+
1129+
// Insert a request for the transaction fingerprint
1130+
reqId, err := registry.InsertTxnRequestInternal(
1131+
ctx,
1132+
txnFingerprintId,
1133+
[]uint64{fingerprintId},
1134+
"testuser",
1135+
0,
1136+
0,
1137+
0,
1138+
false,
1139+
)
1140+
require.NoError(t, err)
1141+
require.False(t, getRequestCompletedStatus(t, sqlConn, reqId))
1142+
1143+
sqlConn.Exec(t, "INSERT INTO my_table VALUES (1,1)")
1144+
require.True(t, getRequestCompletedStatus(t, sqlConn, reqId))
1145+
})
1146+
10911147
}
10921148

10931149
func TestRequestTxnBundleBuiltin(t *testing.T) {
@@ -1111,7 +1167,7 @@ func TestRequestTxnBundleBuiltin(t *testing.T) {
11111167
// The built-in requires that the transaction fingerprint exists in
11121168
// the system.transaction_statistics table, so we execute the transaction
11131169
// once.
1114-
executeTransactions(t, runner, txnStatements)
1170+
executeTransaction(t, runner, txnStatements)
11151171

11161172
// Get the fingerprint id and statement fingerprint ids for the transaction
11171173
// to make the txn diagnostics request
@@ -1334,7 +1390,7 @@ LIMIT 1`)
13341390
})
13351391
}
13361392

1337-
func executeTransactions(t *testing.T, runner *sqlutils.SQLRunner, statements []string) {
1393+
func executeTransaction(t *testing.T, runner *sqlutils.SQLRunner, statements []string) {
13381394
t.Helper()
13391395
runner.Exec(t, "BEGIN")
13401396
for _, stmt := range statements {
@@ -1352,18 +1408,10 @@ func runTxnUntilDiagnosticsCollected(
13521408
) {
13531409
t.Helper()
13541410
testutils.SucceedsSoon(t, func() error {
1355-
executeTransactions(t, sqlConn, statements)
1356-
r := tc.ServerConn(0).QueryRow("SELECT count(*) FROM system.transaction_diagnostics_requests WHERE completed=true and id=$1", reqId)
1357-
if r.Err() != nil {
1358-
return r.Err()
1359-
}
1360-
var count int
1361-
err := r.Scan(&count)
1362-
if err != nil {
1363-
return err
1364-
}
1365-
if count != 1 {
1366-
return errors.Newf("expected 1 completed bundle, got %d", count)
1411+
executeTransaction(t, sqlConn, statements)
1412+
completed := getRequestCompletedStatus(t, sqlConn, reqId)
1413+
if !completed {
1414+
return errors.New("expected request to be completed")
13671415
}
13681416
return nil
13691417
})
@@ -1379,3 +1427,12 @@ func runTxnUntilDiagnosticsCollected(
13791427
return nil
13801428
})
13811429
}
1430+
1431+
func getRequestCompletedStatus(
1432+
t *testing.T, runner *sqlutils.SQLRunner, reqId stmtdiagnostics.RequestID,
1433+
) bool {
1434+
t.Helper()
1435+
var completed bool
1436+
runner.QueryRow(t, "SELECT completed FROM system.transaction_diagnostics_requests WHERE id=$1", reqId).Scan(&completed)
1437+
return completed
1438+
}

pkg/sql/txn_instrumentation.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,9 @@ func (h *txnInstrumentationHelper) ShouldRedact() bool {
280280
func (h *txnInstrumentationHelper) Finalize(ctx context.Context, executionDuration time.Duration) {
281281
defer h.Reset()
282282
collector := h.diagnosticsCollector
283+
if collector.InProgress() {
284+
collector.UpdateState(txnDiagnosticsReadyToFinalize)
285+
}
283286
if collector.ShouldCollect(executionDuration) {
284287
collector.collectTrace()
285288
buf, err := collector.z.Finalize()

0 commit comments

Comments
 (0)