Skip to content

Commit 3a62640

Browse files
sql: Fix idle latency tracking
Previously idle latencies were not being tracked accurately in two situations. First, there was massive over counting of idle latencies in transactions that had INSERTs with placeholders (more generally statement that executed a prepare/bind before running). See issue #146116. In these situations we had missed setting the QueryServiced time for the statements within the transaction, resulting in all the intermediate idle latencies calulating their `waitingSince` from the transaction start time. The control flow of the conn executor can be difficult to track and through adding print statements, the difference between a transaction with `INSERT INTO ... VALUES (1)` vs `INSERT INTO ... VALUES ($1)` highlighted that setting QueryServiced at the end of the `ExecStmt` case in `execCmd` meant that for the prepare/bind version of the INSERT statements, we never set QueryServiced. Simple statements: ``` execCmd ExecStmt execStmtInOpenState INSERT INTO test_idle VALUES (1) execCmd Setting QueryServiced execCmd ExecStmt execStmtInOpenState INSERT INTO test_idle VALUES (1) execCmd Setting QueryServiced execCmd ExecStmt execStmtInOpenState INSERT INTO test_idle VALUES (1) execCmd Setting QueryServiced ``` Notice how after every `execStmtInOpenState`, the flow returns to `execCmd`. Whereas in the case below, we just flow back into `execStmtInOpenState`. Prepare/bind statements: ``` prepare statsCollector.Reset bind statsCollector.Reset execStmtInOpenState INSERT INTO test_idle VALUES ($1) prepare statsCollector.Reset bind statsCollector.Reset execStmtInOpenState INSERT INTO test_idle VALUES ($1) prepare statsCollector.Reset bind statsCollector.Reset execStmtInOpenState INSERT INTO test_idle VALUES ($1) ``` The second situation in which idle latencies were not tracked correctly was when observer statements were run in between statements in the transaction. This is a situation that occurs when using the `cockroach sql` cli. In this case, idle latencies were severely undercounted because we'd reset the stats collector before running the observer statement, and then when the non observer statement ran and we recorded the statement summary, the idle latency would be calculated from the previous observer statement time. This commit fixes the first situation by additionally setting QueryServiced after `execStmt`. This commit fixes the second situation by accumulating idle latencies in between observer statements. Lastly this commit removes the existing TestSQLStatsIdleLatencies as it evidently had poor coverage and replaces it with TestTransactionLatencies which enforces invariants like: totalTxnTime >= txnServiceLatency >= txnIdleLatency p.s debugging session phase times is really tedious and we should consider refactoring. Fixes: #146116 Release note (bug fix): Fixes discrepencies between idle and service latencies.
1 parent 5154683 commit 3a62640

File tree

4 files changed

+120
-236
lines changed

4 files changed

+120
-236
lines changed

pkg/sql/conn_executor_exec.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,8 @@ func (ex *connExecutor) execStmt(
135135
if _, ok := ast.(tree.ObserverStatement); ok {
136136
ex.statsCollector.Reset(ex.applicationStats, ex.phaseTimes)
137137
err := ex.runObserverStatement(ctx, ast, res)
138+
ex.extraTxnState.idleLatency += ex.phaseTimes.GetIdleLatency(ex.statsCollector.PreviousPhaseTimes())
139+
ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionQueryServiced, crtime.NowMono())
138140
// Note that regardless of res.Err(), these observer statements don't
139141
// generate error events; transactions are always allowed to continue.
140142
return nil, nil, err
@@ -178,6 +180,7 @@ func (ex *connExecutor) execStmt(
178180
case eventNonRetryableErrPayload:
179181
ex.recordFailure(p)
180182
}
183+
ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionQueryServiced, crtime.NowMono())
181184

182185
case stateAborted:
183186
ev, payload = ex.execStmtInAbortedState(ctx, ast, res)

pkg/sql/sessionphase/session_phase.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,9 @@ func NewTimes() *Times {
114114
}
115115

116116
// SetSessionPhaseTime sets the time for a given SessionPhase.
117+
// TODO(alyshan): Session Phase Times are set throughout the conn executor, it is
118+
// a tedious process to track when and where these times are set.
119+
// Found a bug again? Consider refactoring.
117120
func (t *Times) SetSessionPhaseTime(sp SessionPhase, time crtime.Mono) {
118121
t.times[sp] = time
119122
}

pkg/sql/sqlstats/sslocal/sql_stats_test.go

Lines changed: 111 additions & 236 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ package sslocal_test
77

88
import (
99
"context"
10-
gosql "database/sql"
1110
"encoding/json"
1211
"fmt"
1312
"net/url"
@@ -1249,241 +1248,6 @@ func TestEnhancedFingerprintCreation(t *testing.T) {
12491248
}
12501249
}
12511250

1252-
func TestSQLStatsIdleLatencies(t *testing.T) {
1253-
defer leaktest.AfterTest(t)()
1254-
defer log.Scope(t).Close(t)
1255-
1256-
ctx := context.Background()
1257-
srv, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
1258-
defer srv.Stopper().Stop(ctx)
1259-
s := srv.ApplicationLayer()
1260-
1261-
// Max limit safety check on the expected idle latency in seconds. Mostly a
1262-
// paranoia check.
1263-
const idleLatCap float64 = 30
1264-
1265-
testCases := []struct {
1266-
name string
1267-
stmtLats map[string]float64
1268-
txnLat float64
1269-
ops func(*testing.T, *gosql.DB)
1270-
}{
1271-
{
1272-
name: "no latency",
1273-
stmtLats: map[string]float64{"SELECT _": 0},
1274-
txnLat: 0,
1275-
ops: func(t *testing.T, db *gosql.DB) {
1276-
tx, err := db.Begin()
1277-
require.NoError(t, err)
1278-
_, err = tx.Exec("SELECT 1")
1279-
require.NoError(t, err)
1280-
err = tx.Commit()
1281-
require.NoError(t, err)
1282-
},
1283-
},
1284-
{
1285-
name: "no latency (implicit txn)",
1286-
stmtLats: map[string]float64{"SELECT _": 0},
1287-
txnLat: 0,
1288-
ops: func(t *testing.T, db *gosql.DB) {
1289-
// These 100ms don't count because we're not in an explicit
1290-
// transaction.
1291-
time.Sleep(100 * time.Millisecond)
1292-
_, err := db.Exec("SELECT 1")
1293-
require.NoError(t, err)
1294-
},
1295-
},
1296-
{
1297-
name: "no latency - prepared statement (implicit txn)",
1298-
stmtLats: map[string]float64{"SELECT _::INT8": 0},
1299-
txnLat: 0,
1300-
ops: func(t *testing.T, db *gosql.DB) {
1301-
stmt, err := db.Prepare("SELECT $1::INT")
1302-
require.NoError(t, err)
1303-
time.Sleep(100 * time.Millisecond)
1304-
_, err = stmt.Exec(1)
1305-
require.NoError(t, err)
1306-
},
1307-
},
1308-
{
1309-
name: "simple statement",
1310-
stmtLats: map[string]float64{"SELECT _": 0.1},
1311-
txnLat: 0.2,
1312-
ops: func(t *testing.T, db *gosql.DB) {
1313-
tx, err := db.Begin()
1314-
require.NoError(t, err)
1315-
time.Sleep(100 * time.Millisecond)
1316-
_, err = tx.Exec("SELECT 1")
1317-
require.NoError(t, err)
1318-
time.Sleep(100 * time.Millisecond)
1319-
err = tx.Commit()
1320-
require.NoError(t, err)
1321-
},
1322-
},
1323-
{
1324-
name: "simple statement with rollback",
1325-
stmtLats: map[string]float64{"SELECT _": 0.1},
1326-
txnLat: 0.2,
1327-
ops: func(t *testing.T, db *gosql.DB) {
1328-
tx, err := db.Begin()
1329-
require.NoError(t, err)
1330-
time.Sleep(100 * time.Millisecond)
1331-
_, err = tx.Exec("SELECT 1")
1332-
require.NoError(t, err)
1333-
time.Sleep(100 * time.Millisecond)
1334-
err = tx.Rollback()
1335-
require.NoError(t, err)
1336-
},
1337-
},
1338-
{
1339-
name: "compound statement",
1340-
stmtLats: map[string]float64{"SELECT _": 0.1, "SELECT count(*) FROM crdb_internal.statement_statistics": 0},
1341-
txnLat: 0.2,
1342-
ops: func(t *testing.T, db *gosql.DB) {
1343-
tx, err := db.Begin()
1344-
require.NoError(t, err)
1345-
time.Sleep(100 * time.Millisecond)
1346-
_, err = tx.Exec("SELECT 1; SELECT count(*) FROM crdb_internal.statement_statistics")
1347-
require.NoError(t, err)
1348-
time.Sleep(100 * time.Millisecond)
1349-
err = tx.Commit()
1350-
require.NoError(t, err)
1351-
},
1352-
},
1353-
{
1354-
name: "multiple statements - slow generation",
1355-
stmtLats: map[string]float64{"SELECT pg_sleep(_)": 0, "SELECT _": 0},
1356-
txnLat: 0,
1357-
ops: func(t *testing.T, db *gosql.DB) {
1358-
tx, err := db.Begin()
1359-
require.NoError(t, err)
1360-
_, err = tx.Exec("SELECT pg_sleep(1)")
1361-
require.NoError(t, err)
1362-
_, err = tx.Exec("SELECT 1")
1363-
require.NoError(t, err)
1364-
err = tx.Commit()
1365-
require.NoError(t, err)
1366-
},
1367-
},
1368-
{
1369-
name: "prepared statement",
1370-
stmtLats: map[string]float64{"SELECT _::INT8": 0.1},
1371-
txnLat: 0.2,
1372-
ops: func(t *testing.T, db *gosql.DB) {
1373-
stmt, err := db.Prepare("SELECT $1::INT")
1374-
require.NoError(t, err)
1375-
tx, err := db.Begin()
1376-
require.NoError(t, err)
1377-
time.Sleep(100 * time.Millisecond)
1378-
_, err = tx.Stmt(stmt).Exec(1)
1379-
require.NoError(t, err)
1380-
time.Sleep(100 * time.Millisecond)
1381-
err = tx.Commit()
1382-
require.NoError(t, err)
1383-
},
1384-
},
1385-
{
1386-
name: "prepared statement inside transaction",
1387-
stmtLats: map[string]float64{"SELECT _::INT8": 0.1},
1388-
txnLat: 0.2,
1389-
ops: func(t *testing.T, db *gosql.DB) {
1390-
tx, err := db.Begin()
1391-
require.NoError(t, err)
1392-
stmt, err := tx.Prepare("SELECT $1::INT")
1393-
require.NoError(t, err)
1394-
time.Sleep(100 * time.Millisecond)
1395-
_, err = stmt.Exec(1)
1396-
require.NoError(t, err)
1397-
time.Sleep(100 * time.Millisecond)
1398-
err = tx.Commit()
1399-
require.NoError(t, err)
1400-
},
1401-
},
1402-
{
1403-
name: "multiple transactions",
1404-
stmtLats: map[string]float64{"SELECT _": 0.1},
1405-
txnLat: 0.2,
1406-
ops: func(t *testing.T, db *gosql.DB) {
1407-
for i := 0; i < 3; i++ {
1408-
tx, err := db.Begin()
1409-
require.NoError(t, err)
1410-
time.Sleep(100 * time.Millisecond)
1411-
_, err = tx.Exec("SELECT 1")
1412-
require.NoError(t, err)
1413-
time.Sleep(100 * time.Millisecond)
1414-
err = tx.Commit()
1415-
require.NoError(t, err)
1416-
}
1417-
},
1418-
},
1419-
}
1420-
1421-
// Observer connection.
1422-
obsConn := sqlutils.MakeSQLRunner(s.SQLConn(t))
1423-
1424-
for _, tc := range testCases {
1425-
t.Run(tc.name, func(t *testing.T) {
1426-
// Make a separate connection to the database, to isolate the stats
1427-
// we'll observe.
1428-
// Note that we're not using pgx here because it *always* prepares
1429-
// statement, and we want to test our client latency measurements
1430-
// both with and without prepared statements.
1431-
opsDB := s.SQLConn(t)
1432-
1433-
// Set a unique application name for our session, so we can find our
1434-
// stats easily.
1435-
appName := redact.RedactableString(t.Name())
1436-
_, err := opsDB.Exec("SET application_name = $1", appName)
1437-
require.NoError(t, err)
1438-
1439-
// Run the test operations.
1440-
tc.ops(t, opsDB)
1441-
1442-
appFilter := sqlstatstestutil.StatementFilter{App: string(appName)}
1443-
sqlstatstestutil.WaitForStatementEntriesAtLeast(t, obsConn, len(tc.stmtLats), appFilter)
1444-
1445-
// Look for the latencies we expect.
1446-
t.Run("stmt", func(t *testing.T) {
1447-
actual := make(map[string]float64)
1448-
rows, err := db.Query(`
1449-
SELECT metadata->>'query', statistics->'statistics'->'idleLat'->'mean'
1450-
FROM crdb_internal.statement_statistics
1451-
WHERE app_name = $1`, appName)
1452-
require.NoError(t, err)
1453-
for rows.Next() {
1454-
var query string
1455-
var latency float64
1456-
err = rows.Scan(&query, &latency)
1457-
require.NoError(t, err)
1458-
actual[query] = latency
1459-
}
1460-
require.NoError(t, rows.Err())
1461-
// Ensure that all test case statements have at least the
1462-
// minimum expected idle latency and do not exceed the safety
1463-
// check cap.
1464-
for tc_stmt, tc_latency := range tc.stmtLats {
1465-
require.GreaterOrEqual(t, actual[tc_stmt], tc_latency)
1466-
require.Less(t, actual[tc_stmt], idleLatCap)
1467-
}
1468-
})
1469-
1470-
t.Run("txn", func(t *testing.T) {
1471-
var actual float64
1472-
row := db.QueryRow(`
1473-
SELECT statistics->'statistics'->'idleLat'->'mean'
1474-
FROM crdb_internal.transaction_statistics
1475-
WHERE app_name = $1`, appName)
1476-
err := row.Scan(&actual)
1477-
require.NoError(t, err)
1478-
// Ensure the test case transaction has at least the minimum
1479-
// expected idle latency and do not exceed the safety check cap.
1480-
require.GreaterOrEqual(t, actual, tc.txnLat)
1481-
require.Less(t, actual, idleLatCap)
1482-
})
1483-
})
1484-
}
1485-
}
1486-
14871251
type indexInfo struct {
14881252
name string
14891253
table string
@@ -2056,6 +1820,117 @@ func BenchmarkSqlStatsDrain(b *testing.B) {
20561820
}
20571821
}
20581822

1823+
func TestTransactionLatencies(t *testing.T) {
1824+
defer leaktest.AfterTest(t)()
1825+
defer log.Scope(t).Close(t)
1826+
ctx := context.Background()
1827+
1828+
type capturedStats struct {
1829+
syncutil.Mutex
1830+
txnStats []*sqlstats.RecordedTxnStats
1831+
stmtCount int
1832+
}
1833+
stats := &capturedStats{}
1834+
var params base.TestServerArgs
1835+
params.Knobs.SQLExecutor = &sql.ExecutorTestingKnobs{
1836+
AfterExecute: func(ctx context.Context, stmt string, isInternal bool, err error) {
1837+
if isInternal {
1838+
return
1839+
}
1840+
stats.Lock()
1841+
defer stats.Unlock()
1842+
stats.stmtCount++
1843+
},
1844+
OnRecordTxnFinish: func(isInternal bool, phaseTimes *sessionphase.Times, stmt string, txnStats *sqlstats.RecordedTxnStats) {
1845+
if isInternal {
1846+
return
1847+
}
1848+
stats.Lock()
1849+
defer stats.Unlock()
1850+
stats.txnStats = append(stats.txnStats, txnStats)
1851+
},
1852+
}
1853+
1854+
s := serverutils.StartServerOnly(t, params)
1855+
defer s.Stopper().Stop(ctx)
1856+
1857+
verifyTxnStats := func(t *testing.T, lastTxnStats *sqlstats.RecordedTxnStats, expectedIdleLatency time.Duration, totalTxnTime time.Duration) {
1858+
// txn idle latency should be >= expectedIdleLatency.
1859+
require.GreaterOrEqual(t, lastTxnStats.IdleLatency, expectedIdleLatency)
1860+
// txn service latency should be >= txn idle latency.
1861+
require.GreaterOrEqual(t, lastTxnStats.ServiceLatency, lastTxnStats.IdleLatency)
1862+
// txn service latency should be <= txn time.
1863+
require.LessOrEqual(t, lastTxnStats.ServiceLatency, time.Duration(lastTxnStats.TransactionTimeSec*float64(time.Second)))
1864+
// txn time should be roughly equal to the time we tracked (within 2%).
1865+
// Note(alyshan): Most test runs the difference is < 1%, but under stress it can be > 1%.
1866+
require.InDelta(t, totalTxnTime.Seconds(), lastTxnStats.TransactionTimeSec, totalTxnTime.Seconds()*0.05)
1867+
}
1868+
1869+
db := sqlutils.MakeSQLRunner(s.SQLConn(t))
1870+
// Create a test table
1871+
db.Exec(t, "CREATE TABLE test_idle (id INT)")
1872+
t.Run("simple statements", func(t *testing.T) {
1873+
startTime := time.Now()
1874+
db.Exec(t, "BEGIN")
1875+
for i := 0; i < 10; i++ {
1876+
db.Exec(t, "INSERT INTO test_idle VALUES (1)")
1877+
time.Sleep(100 * time.Millisecond)
1878+
}
1879+
db.Exec(t, "COMMIT")
1880+
endTime := time.Now()
1881+
totalTxnTime := endTime.Sub(startTime)
1882+
1883+
stats.Lock()
1884+
defer stats.Unlock()
1885+
lastTxnStats := stats.txnStats[len(stats.txnStats)-1]
1886+
verifyTxnStats(t, lastTxnStats, time.Second, totalTxnTime)
1887+
})
1888+
1889+
t.Run("with intermediate observer statements", func(t *testing.T) {
1890+
startTime := time.Now()
1891+
db.Exec(t, "BEGIN")
1892+
// This mimics the behaviour of the cockroach sql cli.
1893+
for i := 0; i < 10; i++ {
1894+
db.Exec(t, "SHOW SYNTAX 'INSERT INTO test_idle VALUES (1)'")
1895+
db.Exec(t, "INSERT INTO test_idle VALUES (1)")
1896+
db.Exec(t, "SHOW LAST QUERY STATISTICS")
1897+
db.Exec(t, "SHOW TRANSACTION STATUS")
1898+
// SHOW DATABASE is not an observer statement, but we include it here since it mimics
1899+
// the behaviour of the cockroach sql cli.
1900+
// TODO(alyshan): Investigate why SHOW DATABASE is not an observer statement.
1901+
db.Exec(t, "SHOW DATABASE")
1902+
time.Sleep(100 * time.Millisecond)
1903+
}
1904+
db.Exec(t, "COMMIT")
1905+
endTime := time.Now()
1906+
totalTxnTime := endTime.Sub(startTime)
1907+
1908+
stats.Lock()
1909+
defer stats.Unlock()
1910+
lastTxnStats := stats.txnStats[len(stats.txnStats)-1]
1911+
verifyTxnStats(t, lastTxnStats, time.Second, totalTxnTime)
1912+
})
1913+
1914+
t.Run("prepare/bind statements", func(t *testing.T) {
1915+
startTime := time.Now()
1916+
db.Exec(t, "BEGIN")
1917+
for i := 0; i < 10; i++ {
1918+
// Placeholders invoke prepare/bind.
1919+
db.Exec(t, "INSERT INTO test_idle VALUES ($1)", i)
1920+
time.Sleep(100 * time.Millisecond)
1921+
}
1922+
db.Exec(t, "COMMIT")
1923+
endTime := time.Now()
1924+
totalTxnTime := endTime.Sub(startTime)
1925+
1926+
stats.Lock()
1927+
defer stats.Unlock()
1928+
lastTxnStats := stats.txnStats[len(stats.txnStats)-1]
1929+
verifyTxnStats(t, lastTxnStats, time.Second, totalTxnTime)
1930+
})
1931+
1932+
}
1933+
20591934
func createNewSqlStats() *sslocal.SQLStats {
20601935
st := cluster.MakeTestingClusterSettings()
20611936
monitor := mon.NewUnlimitedMonitor(context.Background(), mon.Options{

0 commit comments

Comments
 (0)