Skip to content

Commit 1140923

Browse files
Add TransactionsProcessed metric to track transactions at VTGate (vitessio#18171)
Signed-off-by: Harshit Gangal <[email protected]>
1 parent b42c0df commit 1140923

File tree

4 files changed

+135
-17
lines changed

4 files changed

+135
-17
lines changed

changelog/23.0/23.0.0/summary.md

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
## Summary
22

33
### Table of Contents
4+
45
- **[Minor Changes](#minor-changes)**
56
- **[Deletions](#deletions)**
67
- [Metrics](#deleted-metrics)
7-
- **[VTTablet](#minor-changes-vttablet)**
8-
- [CLI Flags](#flags-vttablet)
9-
- [Managed MySQL configuration defaults to caching-sha2-password](#mysql-caching-sha2-password)
8+
- **[New Metrics](#new-metrics)**
9+
- [VTGate](#new-vtgate-metrics)
10+
- **[VTTablet](#minor-changes-vttablet)**
11+
- [CLI Flags](#flags-vttablet)
12+
- [Managed MySQL configuration defaults to caching-sha2-password](#mysql-caching-sha2-password)
1013

1114
## <a id="minor-changes"/>Minor Changes</a>
1215

@@ -21,6 +24,14 @@
2124
| `vtgate` | `QueriesProcessedByTable` | `v22.0.0` | [#17727](https://github.com/vitessio/vitess/pull/17727) |
2225
| `vtgate` | `QueriesRoutedByTable` | `v22.0.0` | [#17727](https://github.com/vitessio/vitess/pull/17727) |
2326

27+
### <a id="new-metrics"/>New Metrics
28+
29+
#### <a id="new-vtgate-metrics"/>VTGate
30+
31+
| Name | Dimensions | Description | PR |
32+
|:-----------------------:|:---------------:|:-----------------------------------------------------------------------------------:|:-------------------------------------------------------:|
33+
| `TransactionsProcessed` | `Shard`, `Type` | Counts transactions processed at VTGate by shard distribution and transaction type. | [#18171](https://github.com/vitessio/vitess/pull/18171) |
34+
2435
### <a id="minor-changes-vttablet"/>VTTablet</a>
2536

2637
#### <a id="flags-vttablet"/>CLI Flags</a>

go/vt/vtgate/executor.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ var (
7575
queryExecutions = stats.NewCountersWithMultiLabels("QueryExecutions", "Counts queries executed at VTGate by query type, plan type, and tablet type.", []string{"Query", "Plan", "Tablet"})
7676
queryRoutes = stats.NewCountersWithMultiLabels("QueryRoutes", "Counts queries routed from VTGate to VTTablet by query type, plan type, and tablet type.", []string{"Query", "Plan", "Tablet"})
7777
queryExecutionsByTable = stats.NewCountersWithMultiLabels("QueryExecutionsByTable", "Counts queries executed at VTGate per table by query type and table.", []string{"Query", "Table"})
78+
txProcessed = stats.NewCountersWithMultiLabels("TransactionsProcessed", "Counts transactions processed at VTGate by shard distribution (single or cross), transaction type (read write or read only)", []string{"Shard", "Type"})
7879

7980
// commitMode records the timing of the commit phase of a transaction.
8081
// It also tracks between different transaction mode i.e. Single, Multi and TwoPC

go/vt/vtgate/tx_conn.go

Lines changed: 57 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,29 @@ var txAccessModeToEOTxAccessMode = map[sqlparser.TxAccessMode]querypb.ExecuteOpt
6262
sqlparser.ReadOnly: querypb.ExecuteOptions_READ_ONLY,
6363
}
6464

65+
const (
66+
SingleShardTransaction = "Single"
67+
CrossShardTransaction = "Cross"
68+
)
69+
70+
type txType int
71+
72+
const (
73+
TXReadOnly txType = iota
74+
TXReadWrite
75+
)
76+
77+
func (tt txType) String() string {
78+
switch tt {
79+
case TXReadOnly:
80+
return "ReadOnly"
81+
case TXReadWrite:
82+
return "ReadWrite"
83+
default:
84+
return "Unknown"
85+
}
86+
}
87+
6588
type commitPhase int
6689

6790
const (
@@ -126,10 +149,12 @@ func (txc *TxConn) Commit(ctx context.Context, session *econtext.SafeSession) er
126149
return err
127150
}
128151

152+
shardDistribution := getShardDistribution(session.ShardSessions)
153+
var txnType txType
129154
if twopc {
130-
err = txc.commit2PC(ctx, session)
155+
txnType, err = txc.commit2PC(ctx, session)
131156
} else {
132-
err = txc.commitNormal(ctx, session)
157+
txnType, err = txc.commitNormal(ctx, session)
133158
}
134159

135160
if err != nil {
@@ -146,9 +171,17 @@ func (txc *TxConn) Commit(ctx context.Context, session *econtext.SafeSession) er
146171
_ = txc.Release(ctx, session)
147172
}
148173
}
174+
txProcessed.Add([]string{shardDistribution, txnType.String()}, 1)
149175
return nil
150176
}
151177

178+
func getShardDistribution(sessions []*vtgatepb.Session_ShardSession) string {
179+
if len(sessions) > 1 {
180+
return CrossShardTransaction
181+
}
182+
return SingleShardTransaction
183+
}
184+
152185
func recordCommitTime(session *econtext.SafeSession, twopc bool, startTime time.Time) {
153186
switch {
154187
case len(session.ShardSessions) == 0:
@@ -193,9 +226,13 @@ func (txc *TxConn) commitShard(ctx context.Context, s *vtgatepb.Session_ShardSes
193226
return nil
194227
}
195228

196-
func (txc *TxConn) commitNormal(ctx context.Context, session *econtext.SafeSession) error {
229+
func (txc *TxConn) commitNormal(ctx context.Context, session *econtext.SafeSession) (txType, error) {
230+
txnType := TXReadOnly
197231
// Retain backward compatibility on commit order for the normal session.
198232
for i, shardSession := range session.ShardSessions {
233+
if txnType == TXReadOnly && shardSession.RowsAffected {
234+
txnType = TXReadWrite
235+
}
199236
if err := txc.commitShard(ctx, shardSession, session.GetLogger()); err != nil {
200237
if i > 0 {
201238
nShards := i
@@ -217,14 +254,14 @@ func (txc *TxConn) commitNormal(ctx context.Context, session *econtext.SafeSessi
217254
})
218255
warnings.Add("NonAtomicCommit", 1)
219256
}
220-
return err
257+
return txnType, err
221258
}
222259
}
223-
return nil
260+
return txnType, nil
224261
}
225262

226263
// commit2PC will not used the pinned tablets - to make sure we use the current source, we need to use the gateway's queryservice
227-
func (txc *TxConn) commit2PC(ctx context.Context, session *econtext.SafeSession) (err error) {
264+
func (txc *TxConn) commit2PC(ctx context.Context, session *econtext.SafeSession) (txnType txType, err error) {
228265
// If the number of participants is one or less, then it's a normal commit.
229266
if len(session.ShardSessions) <= 1 {
230267
return txc.commitNormal(ctx, session)
@@ -233,8 +270,14 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *econtext.SafeSession)
233270
mmShard := session.ShardSessions[0]
234271
rmShards := session.ShardSessions[1:]
235272
dtid := dtids.New(mmShard)
273+
if mmShard.RowsAffected {
274+
txnType = TXReadWrite
275+
}
236276
participants := make([]*querypb.Target, len(rmShards))
237277
for i, s := range rmShards {
278+
if s.RowsAffected {
279+
txnType = TXReadWrite
280+
}
238281
participants[i] = s.Target
239282
}
240283

@@ -249,12 +292,12 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *econtext.SafeSession)
249292

250293
txPhase = Commit2pcCreateTransaction
251294
if err = txc.tabletGateway.CreateTransaction(ctx, mmShard.Target, dtid, participants); err != nil {
252-
return err
295+
return txnType, err
253296
}
254297

255298
if DebugTwoPc { // Test code to simulate a failure after RM prepare
256299
if terr := checkTestFailure(ctx, "TRCreated_FailNow", nil); terr != nil {
257-
return terr
300+
return txnType, terr
258301
}
259302
}
260303

@@ -268,24 +311,24 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *econtext.SafeSession)
268311
return txc.tabletGateway.Prepare(ctx, s.Target, s.TransactionId, dtid)
269312
}
270313
if err = txc.runSessions(ctx, rmShards, session.GetLogger(), prepareAction); err != nil {
271-
return err
314+
return txnType, err
272315
}
273316

274317
if DebugTwoPc { // Test code to simulate a failure after RM prepare
275318
if terr := checkTestFailure(ctx, "RMPrepared_FailNow", nil); terr != nil {
276-
return terr
319+
return txnType, terr
277320
}
278321
}
279322

280323
txPhase = Commit2pcStartCommit
281324
startCommitState, err = txc.tabletGateway.StartCommit(ctx, mmShard.Target, mmShard.TransactionId, dtid)
282325
if err != nil {
283-
return err
326+
return txnType, err
284327
}
285328

286329
if DebugTwoPc { // Test code to simulate a failure after MM commit
287330
if terr := checkTestFailure(ctx, "MMCommitted_FailNow", nil); terr != nil {
288-
return terr
331+
return txnType, terr
289332
}
290333
}
291334

@@ -299,15 +342,15 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *econtext.SafeSession)
299342
return txc.tabletGateway.CommitPrepared(ctx, s.Target, dtid)
300343
}
301344
if err = txc.runSessions(ctx, rmShards, session.GetLogger(), prepareCommitAction); err != nil {
302-
return err
345+
return txnType, err
303346
}
304347

305348
// At this point, application can continue forward.
306349
// The transaction is already committed.
307350
// This step is to clean up the transaction metadata.
308351
txPhase = Commit2pcConclude
309352
_ = txc.tabletGateway.ConcludeTransaction(ctx, mmShard.Target, dtid)
310-
return nil
353+
return txnType, nil
311354
}
312355

313356
func (txc *TxConn) errActionAndLogWarn(

go/vt/vtgate/tx_conn_test.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1554,6 +1554,69 @@ func TestTxConnAccessModeReset(t *testing.T) {
15541554
}
15551555
}
15561556

1557+
// TestTxConnMetrics tests the `TransactionProcessed` metrics.
1558+
func TestTxConnMetrics(t *testing.T) {
1559+
ctx := utils.LeakCheckContext(t)
1560+
1561+
sc, _, _, rss0, rss1, _ := newTestTxConnEnv(t, ctx, "TestTxConn")
1562+
session := &vtgatepb.Session{}
1563+
1564+
tcases := []struct {
1565+
name string
1566+
queries []*querypb.BoundQuery
1567+
rss []*srvtopo.ResolvedShard
1568+
expMetric string
1569+
expVal int
1570+
}{{
1571+
name: "oneReadQuery",
1572+
queries: []*querypb.BoundQuery{{Sql: "select 1"}},
1573+
rss: rss0,
1574+
expMetric: "Single.ReadOnly",
1575+
expVal: 1,
1576+
}, {
1577+
name: "twoReadQuery",
1578+
queries: []*querypb.BoundQuery{{Sql: "select 2"}, {Sql: "select 3"}},
1579+
rss: append(rss0, rss1...),
1580+
expMetric: "Cross.ReadOnly",
1581+
expVal: 1,
1582+
}, {
1583+
name: "oneWriteQuery",
1584+
queries: []*querypb.BoundQuery{{Sql: "update t set col = 1"}},
1585+
rss: rss0,
1586+
expMetric: "Single.ReadWrite",
1587+
expVal: 1,
1588+
}, {
1589+
name: "twoWriteQuery",
1590+
queries: []*querypb.BoundQuery{{Sql: "update t set col = 2"}, {Sql: "update t set col = 3"}},
1591+
rss: append(rss0, rss1...),
1592+
expMetric: "Cross.ReadWrite",
1593+
expVal: 1,
1594+
}, {
1595+
name: "oneReadOneWriteQuery",
1596+
queries: []*querypb.BoundQuery{{Sql: "select 4"}, {Sql: "update t set col = 4"}},
1597+
rss: append(rss0, rss1...),
1598+
expMetric: "Cross.ReadWrite",
1599+
expVal: 2,
1600+
}}
1601+
1602+
txProcessed.ResetAll()
1603+
for _, tc := range tcases {
1604+
t.Run(tc.name, func(t *testing.T) {
1605+
// begin
1606+
safeSession := econtext.NewAutocommitSession(session)
1607+
err := sc.txConn.Begin(ctx, safeSession, nil)
1608+
require.NoError(t, err)
1609+
_, errors := sc.ExecuteMultiShard(ctx, nil, tc.rss, tc.queries, safeSession, false, false, nullResultsObserver{}, false)
1610+
require.Empty(t, errors)
1611+
require.NoError(t,
1612+
sc.txConn.Commit(ctx, safeSession))
1613+
txCountMap := txProcessed.Counts()
1614+
fmt.Printf("%v", txCountMap)
1615+
assert.EqualValues(t, tc.expVal, txCountMap[tc.expMetric])
1616+
})
1617+
}
1618+
}
1619+
15571620
func newTestTxConnEnv(t *testing.T, ctx context.Context, name string) (sc *ScatterConn, sbc0, sbc1 *sandboxconn.SandboxConn, rss0, rss1, rss01 []*srvtopo.ResolvedShard) {
15581621
t.Helper()
15591622
createSandbox(name)

0 commit comments

Comments
 (0)