Skip to content

Commit 61ef712

Browse files
mattlord authored and mhamza15 committed
VReplication: Add throttler stats (vitessio#15221)
Signed-off-by: Matt Lord <[email protected]>
1 parent cf6878d commit 61ef712

File tree

10 files changed

+108
-20
lines changed

10 files changed

+108
-20
lines changed

go/test/endtoend/vreplication/fk_config_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,15 +67,15 @@ insert into t2 values(1, 1, 't21'), (2, 1, 't22'), (3, 2, 't23');
6767
}
6868
]
6969
},
70-
"t1": {
70+
"t1": {
7171
"column_vindexes": [
7272
{
7373
"column": "id",
7474
"name": "reverse_bits"
7575
}
7676
]
7777
},
78-
"t2": {
78+
"t2": {
7979
"column_vindexes": [
8080
{
8181
"column": "t1id",

go/test/endtoend/vreplication/fk_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -285,10 +285,10 @@ func (ls *fkLoadSimulator) exec(query string) *sqltypes.Result {
285285
// constraints, where the parent table is lexicographically sorted before the child table and
286286
// thus may be dropped first, can be successfully cancelled.
287287
func testFKCancel(t *testing.T, vc *VitessCluster) {
288-
var targetKeyspace = "fktarget"
289-
var sourceKeyspace = "fksource"
290-
var workflowName = "wf2"
291-
var ksWorkflow = fmt.Sprintf("%s.%s", targetKeyspace, workflowName)
288+
targetKeyspace := "fktarget"
289+
sourceKeyspace := "fksource"
290+
workflowName := "wf2"
291+
ksWorkflow := fmt.Sprintf("%s.%s", targetKeyspace, workflowName)
292292
mt := newMoveTables(vc, &moveTablesWorkflow{
293293
workflowInfo: &workflowInfo{
294294
vc: vc,

go/test/endtoend/vreplication/vreplication_test.go

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"io"
2323
"net/http"
2424
"runtime"
25+
"strconv"
2526
"strings"
2627
"sync"
2728
"testing"
@@ -1167,7 +1168,7 @@ func materialize(t *testing.T, spec string, useVtctldClient bool) {
11671168

11681169
func materializeProduct(t *testing.T, useVtctldClient bool) {
11691170
t.Run("materializeProduct", func(t *testing.T) {
1170-
// materializing from "product" keyspace to "customer" keyspace
1171+
// Materializing from "product" keyspace to "customer" keyspace.
11711172
workflow := "cproduct"
11721173
keyspace := "customer"
11731174
defaultCell := vc.Cells[vc.CellNames[0]]
@@ -1181,7 +1182,7 @@ func materializeProduct(t *testing.T, useVtctldClient bool) {
11811182

11821183
productTablets := vc.getVttabletsInKeyspace(t, defaultCell, "product", "primary")
11831184
t.Run("throttle-app-product", func(t *testing.T) {
1184-
// Now, throttle the streamer on source tablets, insert some rows
1185+
// Now, throttle the source side component (vstreamer), and insert some rows.
11851186
for _, tab := range productTablets {
11861187
body, err := throttleApp(tab, sourceThrottlerAppName)
11871188
assert.NoError(t, err)
@@ -1192,19 +1193,33 @@ func materializeProduct(t *testing.T, useVtctldClient bool) {
11921193
waitForTabletThrottlingStatus(t, tab, targetThrottlerAppName, throttlerStatusNotThrottled)
11931194
}
11941195
insertMoreProductsForSourceThrottler(t)
1195-
// To be fair to the test, we give the target time to apply the new changes. We expect it to NOT get them in the first place,
1196-
// we expect the additional rows to **not appear** in the materialized view
1196+
// To be fair to the test, we give the target time to apply the new changes. We
1197+
// expect it to NOT get them in the first place, we expect the additional rows
1198+
// to **not appear** in the materialized view.
11971199
for _, tab := range customerTablets {
11981200
waitForRowCountInTablet(t, tab, keyspace, workflow, 5)
1201+
// Confirm that we updated the stats on the target tablets as expected.
1202+
jsVal, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCounts"})
1203+
require.NoError(t, err)
1204+
require.NotEqual(t, "{}", jsVal)
1205+
// The JSON value looks like this: {"cproduct.4.tablet.vstreamer": 2}
1206+
vstreamerThrottledCount := gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.tablet\.vstreamer`, workflow)).Int()
1207+
require.Greater(t, vstreamerThrottledCount, int64(0))
1208+
// We only need to do this stat check once.
1209+
val, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCountTotal"})
1210+
require.NoError(t, err)
1211+
throttledCount, err := strconv.ParseInt(val, 10, 64)
1212+
require.NoError(t, err)
1213+
require.GreaterOrEqual(t, throttledCount, vstreamerThrottledCount)
11991214
}
12001215
})
12011216
t.Run("unthrottle-app-product", func(t *testing.T) {
1202-
// unthrottle on source tablets, and expect the rows to show up
1217+
// Unthrottle the vstreamer component, and expect the rows to show up.
12031218
for _, tab := range productTablets {
12041219
body, err := unthrottleApp(tab, sourceThrottlerAppName)
12051220
assert.NoError(t, err)
12061221
assert.Contains(t, body, sourceThrottlerAppName)
1207-
// give time for unthrottling to take effect and for target to fetch data
1222+
// Give time for unthrottling to take effect and for targets to fetch data.
12081223
waitForTabletThrottlingStatus(t, tab, sourceThrottlerAppName, throttlerStatusNotThrottled)
12091224
}
12101225
for _, tab := range customerTablets {
@@ -1213,8 +1228,8 @@ func materializeProduct(t *testing.T, useVtctldClient bool) {
12131228
})
12141229

12151230
t.Run("throttle-app-customer", func(t *testing.T) {
1216-
// Now, throttle vreplication (vcopier/vapplier) on target tablets, and
1217-
// insert some more rows.
1231+
// Now, throttle vreplication on the target side (vplayer), and insert some
1232+
// more rows.
12181233
for _, tab := range customerTablets {
12191234
body, err := throttleApp(tab, targetThrottlerAppName)
12201235
assert.NoError(t, err)
@@ -1229,6 +1244,13 @@ func materializeProduct(t *testing.T, useVtctldClient bool) {
12291244
// rows to **not appear** in the materialized view.
12301245
for _, tab := range customerTablets {
12311246
waitForRowCountInTablet(t, tab, keyspace, workflow, 8)
1247+
// Confirm that we updated the stats on the target tablets as expected.
1248+
jsVal, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCounts"})
1249+
require.NoError(t, err)
1250+
require.NotEqual(t, "{}", jsVal)
1251+
// The JSON value now looks like this: {"cproduct.4.tablet.vstreamer": 2, "cproduct.4.tablet.vplayer": 4}
1252+
vplayerThrottledCount := gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.tablet\.vplayer`, workflow)).Int()
1253+
require.Greater(t, vplayerThrottledCount, int64(0))
12321254
}
12331255
})
12341256
t.Run("unthrottle-app-customer", func(t *testing.T) {

go/vt/binlog/binlogplayer/binlog_player.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,8 @@ type Stats struct {
106106

107107
PartialQueryCount *stats.CountersWithMultiLabels
108108
PartialQueryCacheSize *stats.CountersWithMultiLabels
109+
110+
ThrottledCounts *stats.CountersWithMultiLabels // By throttler and component
109111
}
110112

111113
// RecordHeartbeat updates the time the last heartbeat from vstreamer was seen
@@ -175,6 +177,7 @@ func NewStats() *Stats {
175177
bps.TableCopyTimings = stats.NewTimings("", "", "Table")
176178
bps.PartialQueryCacheSize = stats.NewCountersWithMultiLabels("", "", []string{"type"})
177179
bps.PartialQueryCount = stats.NewCountersWithMultiLabels("", "", []string{"type"})
180+
bps.ThrottledCounts = stats.NewCountersWithMultiLabels("", "", []string{"throttler", "component"})
178181
return bps
179182
}
180183

@@ -370,13 +373,14 @@ func (blp *BinlogPlayer) applyEvents(ctx context.Context) error {
370373
if backoff == throttler.NotThrottled {
371374
break
372375
}
376+
blp.blplStats.ThrottledCounts.Add([]string{"trx", "binlogplayer"}, 1)
373377
// We don't bother checking for context cancellation here because the
374378
// sleep will block only up to 1 second. (Usually, backoff is 1s / rate
375379
// e.g. a rate of 1000 TPS results into a backoff of 1 ms.)
376380
time.Sleep(backoff)
377381
}
378382

379-
// get the response
383+
// Get the response.
380384
response, err := stream.Recv()
381385
// Check context before checking error, because canceled
382386
// contexts could be wrapped as regular errors.

go/vt/vtgate/sandbox_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,10 @@ func (sct *sandboxTopo) WatchSrvVSchema(ctx context.Context, cell string, callba
281281
}
282282

283283
sct.topoServer.UpdateSrvVSchema(ctx, cell, srvVSchema)
284-
current, updateChan, _ := sct.topoServer.WatchSrvVSchema(ctx, cell)
284+
current, updateChan, err := sct.topoServer.WatchSrvVSchema(ctx, cell)
285+
if err != nil {
286+
panic(fmt.Sprintf("sandboxTopo WatchSrvVSchema returned an error: %v", err))
287+
}
285288
if !callback(current.Value, nil) {
286289
panic("sandboxTopo callback returned false")
287290
}

go/vt/vttablet/tabletmanager/vreplication/stats.go

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,12 @@ type vrStats struct {
5959
mu sync.Mutex
6060
isOpen bool
6161
controllers map[int32]*controller
62+
63+
ThrottledCount *stats.Counter
6264
}
6365

6466
func (st *vrStats) register() {
65-
67+
st.ThrottledCount = stats.NewCounter("", "")
6668
stats.NewGaugeFunc("VReplicationStreamCount", "Number of vreplication streams", st.numControllers)
6769
stats.NewGaugeFunc("VReplicationLagSecondsMax", "Max vreplication seconds behind primary", st.maxReplicationLagSeconds)
6870
stats.NewStringMapFuncWithMultiLabels(
@@ -502,6 +504,29 @@ func (st *vrStats) register() {
502504
return result
503505
})
504506

507+
stats.NewCounterFunc(
508+
"VReplicationThrottledCountTotal",
509+
"The total number of times that vreplication has been throttled",
510+
func() int64 {
511+
st.mu.Lock()
512+
defer st.mu.Unlock()
513+
return st.ThrottledCount.Get()
514+
})
515+
stats.NewCountersFuncWithMultiLabels(
516+
"VReplicationThrottledCounts",
517+
"The number of times vreplication was throttled by workflow, id, throttler (trx or tablet), and the sub-component that was throttled",
518+
[]string{"workflow", "id", "throttler", "component"},
519+
func() map[string]int64 {
520+
st.mu.Lock()
521+
defer st.mu.Unlock()
522+
result := make(map[string]int64)
523+
for _, ct := range st.controllers {
524+
for key, val := range ct.blpStats.ThrottledCounts.Counts() {
525+
result[fmt.Sprintf("%s.%d.%s", ct.workflow, ct.id, key)] = val
526+
}
527+
}
528+
return result
529+
})
505530
}
506531

507532
func (st *vrStats) numControllers() int64 {

go/vt/vttablet/tabletmanager/vreplication/stats_test.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/stretchr/testify/require"
2727

2828
"vitess.io/vitess/go/mysql/replication"
29+
"vitess.io/vitess/go/stats"
2930

3031
"vitess.io/vitess/go/vt/binlog/binlogplayer"
3132
"vitess.io/vitess/go/vt/proto/binlogdata"
@@ -132,7 +133,9 @@ func TestStatusHtml(t *testing.T) {
132133
func TestVReplicationStats(t *testing.T) {
133134
blpStats := binlogplayer.NewStats()
134135
defer blpStats.Stop()
135-
testStats := &vrStats{}
136+
testStats := &vrStats{
137+
ThrottledCount: stats.NewCounter("", ""),
138+
}
136139
testStats.isOpen = true
137140
testStats.controllers = map[int32]*controller{
138141
1: {
@@ -184,6 +187,14 @@ func TestVReplicationStats(t *testing.T) {
184187
require.Equal(t, int64(100), testStats.status().Controllers[0].CopyLoopCount)
185188
require.Equal(t, int64(200), testStats.status().Controllers[0].CopyRowCount)
186189

190+
testStats.ThrottledCount.Add(99)
191+
require.Equal(t, int64(99), testStats.ThrottledCount.Get())
192+
193+
blpStats.ThrottledCounts.Add([]string{"tablet", "vcopier"}, 10)
194+
blpStats.ThrottledCounts.Add([]string{"tablet", "vplayer"}, 80)
195+
require.Equal(t, int64(10), testStats.controllers[1].blpStats.ThrottledCounts.Counts()["tablet.vcopier"])
196+
require.Equal(t, int64(80), testStats.controllers[1].blpStats.ThrottledCounts.Counts()["tablet.vplayer"])
197+
187198
var tm int64 = 1234567890
188199
blpStats.RecordHeartbeat(tm)
189200
require.Equal(t, tm, blpStats.Heartbeat())

go/vt/vttablet/tabletmanager/vreplication/vreplicator.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -578,10 +578,21 @@ func (vr *vreplicator) throttlerAppName() string {
578578
return throttlerapp.Concatenate(names...)
579579
}
580580

581+
// updateTimeThrottled updates the time_throttled field in the _vt.vreplication record
582+
// with a rate limit so that it's only saved in the database at most once per
583+
// throttleUpdatesRateLimiter.tickerTime.
584+
// It also increments the throttled count in the stats to keep track of how many
585+
// times a VReplication workflow, and the specific sub-component, is throttled by the
586+
// tablet throttler over time. It also increments the global throttled count to keep
587+
// track of how many times in total vreplication has been throttled across all workflows
588+
// (both ones that currently exist and ones that no longer do).
581589
func (vr *vreplicator) updateTimeThrottled(appThrottled throttlerapp.Name) error {
590+
appName := appThrottled.String()
591+
vr.stats.ThrottledCounts.Add([]string{"tablet", appName}, 1)
592+
globalStats.ThrottledCount.Add(1)
582593
err := vr.throttleUpdatesRateLimiter.Do(func() error {
583594
tm := time.Now().Unix()
584-
update, err := binlogplayer.GenerateUpdateTimeThrottled(vr.id, tm, appThrottled.String())
595+
update, err := binlogplayer.GenerateUpdateTimeThrottled(vr.id, tm, appName)
585596
if err != nil {
586597
return err
587598
}

go/vt/vttablet/tabletserver/vstreamer/helper_event_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ const (
6161
)
6262

6363
func getDefaultCollationID() int64 {
64-
return 45
64+
return 45 // utf8mb4_general_ci
6565
}
6666

6767
var (

go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,12 @@ func TestNoBlob(t *testing.T) {
125125
env = nil
126126
newEngine(t, ctx, "noblob")
127127
defer func() {
128+
if engine != nil {
129+
engine.Close()
130+
}
131+
if env != nil {
132+
env.Close()
133+
}
128134
engine = oldEngine
129135
env = oldEnv
130136
}()
@@ -1890,6 +1896,12 @@ func TestMinimalMode(t *testing.T) {
18901896
env = nil
18911897
newEngine(t, ctx, "minimal")
18921898
defer func() {
1899+
if engine != nil {
1900+
engine.Close()
1901+
}
1902+
if env != nil {
1903+
env.Close()
1904+
}
18931905
engine = oldEngine
18941906
env = oldEnv
18951907
}()

0 commit comments

Comments
 (0)