Skip to content

Commit 6bd4970

Browse files
mattlordmhamza15
authored andcommitted
VReplication: Estimate lag when workflow fully throttled (vitessio#16577)
Signed-off-by: Matt Lord <[email protected]>
1 parent 61ef712 commit 6bd4970

File tree

3 files changed

+84
-46
lines changed

3 files changed

+84
-46
lines changed

go/test/endtoend/vreplication/vreplication_test.go

Lines changed: 52 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ var (
5858
targetKsOpts = make(map[string]string)
5959
httpClient = throttlebase.SetupHTTPClient(time.Second)
6060
sourceThrottlerAppName = throttlerapp.VStreamerName
61-
targetThrottlerAppName = throttlerapp.VReplicationName
61+
targetThrottlerAppName = throttlerapp.VPlayerName
6262
)
6363

6464
const (
@@ -1199,18 +1199,7 @@ func materializeProduct(t *testing.T, useVtctldClient bool) {
11991199
for _, tab := range customerTablets {
12001200
waitForRowCountInTablet(t, tab, keyspace, workflow, 5)
12011201
// Confirm that we updated the stats on the target tablets as expected.
1202-
jsVal, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCounts"})
1203-
require.NoError(t, err)
1204-
require.NotEqual(t, "{}", jsVal)
1205-
// The JSON value looks like this: {"cproduct.4.tablet.vstreamer": 2}
1206-
vstreamerThrottledCount := gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.tablet\.vstreamer`, workflow)).Int()
1207-
require.Greater(t, vstreamerThrottledCount, int64(0))
1208-
// We only need to do this stat check once.
1209-
val, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCountTotal"})
1210-
require.NoError(t, err)
1211-
throttledCount, err := strconv.ParseInt(val, 10, 64)
1212-
require.NoError(t, err)
1213-
require.GreaterOrEqual(t, throttledCount, vstreamerThrottledCount)
1202+
confirmVReplicationThrottling(t, tab, sourceKs, workflow, sourceThrottlerAppName)
12141203
}
12151204
})
12161205
t.Run("unthrottle-app-product", func(t *testing.T) {
@@ -1245,12 +1234,7 @@ func materializeProduct(t *testing.T, useVtctldClient bool) {
12451234
for _, tab := range customerTablets {
12461235
waitForRowCountInTablet(t, tab, keyspace, workflow, 8)
12471236
// Confirm that we updated the stats on the target tablets as expected.
1248-
jsVal, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCounts"})
1249-
require.NoError(t, err)
1250-
require.NotEqual(t, "{}", jsVal)
1251-
// The JSON value now looks like this: {"cproduct.4.tablet.vstreamer": 2, "cproduct.4.tablet.vplayer": 4}
1252-
vplayerThrottledCount := gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.tablet\.vplayer`, workflow)).Int()
1253-
require.Greater(t, vplayerThrottledCount, int64(0))
1237+
confirmVReplicationThrottling(t, tab, sourceKs, workflow, targetThrottlerAppName)
12541238
}
12551239
})
12561240
t.Run("unthrottle-app-customer", func(t *testing.T) {
@@ -1806,3 +1790,52 @@ func waitForInnoDBHistoryLength(t *testing.T, tablet *cluster.VttabletProcess, e
18061790
func releaseInnoDBRowHistory(t *testing.T, dbConn *mysql.Conn) {
18071791
execQuery(t, dbConn, "rollback")
18081792
}
1793+
1794+
// confirmVReplicationThrottling confirms that the throttling related metrics reflect that
1795+
// the workflow is being throttled as expected, via the expected app name, and that this
1796+
// is impacting the lag as expected.
1797+
// The tablet passed should be a target tablet for the given workflow while the keyspace
1798+
// name provided should be the source keyspace as the target tablet stats note the stream's
1799+
// source keyspace and shard.
1800+
func confirmVReplicationThrottling(t *testing.T, tab *cluster.VttabletProcess, keyspace, workflow string, appname throttlerapp.Name) {
1801+
const (
1802+
sleepTime = 5 * time.Second
1803+
zv = int64(0)
1804+
)
1805+
time.Sleep(sleepTime) // To be sure that we accrue some lag
1806+
1807+
jsVal, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCounts"})
1808+
require.NoError(t, err)
1809+
require.NotEqual(t, "{}", jsVal)
1810+
// The JSON value looks like this: {"cproduct.4.tablet.vstreamer": 2, "cproduct.4.tablet.vplayer": 4}
1811+
throttledCount := gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.tablet\.%s`, workflow, appname)).Int()
1812+
require.Greater(t, throttledCount, zv, "JSON value: %s", jsVal)
1813+
1814+
val, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCountTotal"})
1815+
require.NoError(t, err)
1816+
require.NotEqual(t, "", val)
1817+
throttledCountTotal, err := strconv.ParseInt(val, 10, 64)
1818+
require.NoError(t, err)
1819+
require.GreaterOrEqual(t, throttledCountTotal, throttledCount, "Value: %s", val)
1820+
1821+
// We do not calculate replication lag for the vcopier as it's not replicating
1822+
// events.
1823+
if appname != throttlerapp.VCopierName {
1824+
jsVal, err = getDebugVar(t, tab.Port, []string{"VReplicationLagSeconds"})
1825+
require.NoError(t, err)
1826+
require.NotEqual(t, "{}", jsVal)
1827+
// The JSON value looks like this: {"product.0.cproduct.4": 6}
1828+
vreplLagSeconds := gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.%s\.*`, keyspace, workflow)).Int()
1829+
require.NoError(t, err)
1830+
// Take off 1 second to deal with timing issues in the test.
1831+
minLagSecs := int64(int64(sleepTime.Seconds()) - 1)
1832+
require.GreaterOrEqual(t, vreplLagSeconds, minLagSecs, "JSON value: %s", jsVal)
1833+
1834+
val, err = getDebugVar(t, tab.Port, []string{"VReplicationLagSecondsMax"})
1835+
require.NoError(t, err)
1836+
require.NotEqual(t, "", val)
1837+
vreplLagSecondsMax, err := strconv.ParseInt(val, 10, 64)
1838+
require.NoError(t, err)
1839+
require.GreaterOrEqual(t, vreplLagSecondsMax, vreplLagSeconds, "Value: %s", val)
1840+
}
1841+
}

go/vt/vttablet/tabletmanager/vreplication/vplayer.go

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -453,33 +453,34 @@ func (vp *vplayer) recordHeartbeat() error {
453453
func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error {
454454
defer vp.vr.dbClient.Rollback()
455455

456+
estimateLag := func() {
457+
behind := time.Now().UnixNano() - vp.lastTimestampNs - vp.timeOffsetNs
458+
vp.vr.stats.ReplicationLagSeconds.Store(behind / 1e9)
459+
vp.vr.stats.VReplicationLags.Add(strconv.Itoa(int(vp.vr.id)), time.Duration(behind/1e9)*time.Second)
460+
}
461+
456462
// If we're not running, set ReplicationLagSeconds to be very high.
457463
// TODO(sougou): if we also stored the time of the last event, we
458464
// can estimate this value more accurately.
459465
defer vp.vr.stats.ReplicationLagSeconds.Store(math.MaxInt64)
460466
defer vp.vr.stats.VReplicationLags.Add(strconv.Itoa(int(vp.vr.id)), math.MaxInt64)
461-
var sbm int64 = -1
467+
var lagSecs int64
462468
for {
463469
if ctx.Err() != nil {
464470
return ctx.Err()
465471
}
466472
// Check throttler.
467473
if !vp.vr.vre.throttlerClient.ThrottleCheckOKOrWaitAppName(ctx, throttlerapp.Name(vp.throttlerAppName)) {
468474
_ = vp.vr.updateTimeThrottled(throttlerapp.VPlayerName)
475+
estimateLag()
469476
continue
470477
}
471478

472479
items, err := relay.Fetch()
473480
if err != nil {
474481
return err
475482
}
476-
// No events were received. This likely means that there's a network partition.
477-
// So, we should assume we're falling behind.
478-
if len(items) == 0 {
479-
behind := time.Now().UnixNano() - vp.lastTimestampNs - vp.timeOffsetNs
480-
vp.vr.stats.ReplicationLagSeconds.Store(behind / 1e9)
481-
vp.vr.stats.VReplicationLags.Add(strconv.Itoa(int(vp.vr.id)), time.Duration(behind/1e9)*time.Second)
482-
}
483+
483484
// Empty transactions are saved at most once every idleTimeout.
484485
// This covers two situations:
485486
// 1. Fetch was idle for idleTimeout.
@@ -496,12 +497,21 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error {
496497
return nil
497498
}
498499
}
500+
501+
lagSecs = -1
499502
for i, events := range items {
500503
for j, event := range events {
501504
if event.Timestamp != 0 {
502-
vp.lastTimestampNs = event.Timestamp * 1e9
503-
vp.timeOffsetNs = time.Now().UnixNano() - event.CurrentTime
504-
sbm = event.CurrentTime/1e9 - event.Timestamp
505+
// If the event is a heartbeat sent while throttled then do not update
506+
// the lag based on it.
507+
// If the batch consists only of throttled heartbeat events then we cannot
508+
// determine the actual lag, as the vstreamer is fully throttled, and we
509+
// will estimate it after processing the batch.
510+
if !(event.Type == binlogdatapb.VEventType_HEARTBEAT && event.Throttled) {
511+
vp.lastTimestampNs = event.Timestamp * 1e9
512+
vp.timeOffsetNs = time.Now().UnixNano() - event.CurrentTime
513+
lagSecs = event.CurrentTime/1e9 - event.Timestamp
514+
}
505515
}
506516
mustSave := false
507517
switch event.Type {
@@ -532,11 +542,12 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error {
532542
}
533543
}
534544

535-
if sbm >= 0 {
536-
vp.vr.stats.ReplicationLagSeconds.Store(sbm)
537-
vp.vr.stats.VReplicationLags.Add(strconv.Itoa(int(vp.vr.id)), time.Duration(sbm)*time.Second)
545+
if lagSecs >= 0 {
546+
vp.vr.stats.ReplicationLagSeconds.Store(lagSecs)
547+
vp.vr.stats.VReplicationLags.Add(strconv.Itoa(int(vp.vr.id)), time.Duration(lagSecs)*time.Second)
548+
} else { // We couldn't determine the lag, so we need to estimate it
549+
estimateLag()
538550
}
539-
540551
}
541552
}
542553

go/vt/vttablet/tabletserver/vstreamer/vstreamer.go

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import (
3030
"vitess.io/vitess/go/mysql/collations"
3131
"vitess.io/vitess/go/mysql/replication"
3232
"vitess.io/vitess/go/sqltypes"
33-
"vitess.io/vitess/go/timer"
3433
"vitess.io/vitess/go/vt/binlog"
3534
"vitess.io/vitess/go/vt/dbconfigs"
3635
"vitess.io/vitess/go/vt/log"
@@ -283,11 +282,11 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
283282
defer hbTimer.Stop()
284283

285284
injectHeartbeat := func(throttled bool) error {
286-
now := time.Now().UnixNano()
287285
select {
288286
case <-ctx.Done():
289287
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired")
290288
default:
289+
now := time.Now().UnixNano()
291290
err := bufferAndTransmit(&binlogdatapb.VEvent{
292291
Type: binlogdatapb.VEventType_HEARTBEAT,
293292
Timestamp: now / 1e9,
@@ -299,22 +298,16 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
299298
}
300299

301300
throttleEvents := func(throttledEvents chan mysql.BinlogEvent) {
302-
throttledHeartbeatsRateLimiter := timer.NewRateLimiter(HeartbeatTime)
303-
defer throttledHeartbeatsRateLimiter.Stop()
304301
for {
305-
// check throttler.
302+
// Check throttler.
306303
if !vs.vse.throttlerClient.ThrottleCheckOKOrWaitAppName(ctx, vs.throttlerApp) {
307-
// make sure to leave if context is cancelled
304+
// Make sure to leave if context is cancelled.
308305
select {
309306
case <-ctx.Done():
310307
return
311308
default:
312-
// do nothing special
309+
// Do nothing special.
313310
}
314-
throttledHeartbeatsRateLimiter.Do(func() error {
315-
return injectHeartbeat(true)
316-
})
317-
// we won't process events, until we're no longer throttling
318311
continue
319312
}
320313
select {
@@ -386,7 +379,8 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
386379
case <-ctx.Done():
387380
return nil
388381
case <-hbTimer.C:
389-
if err := injectHeartbeat(false); err != nil {
382+
ok := vs.vse.throttlerClient.ThrottleCheckOK(ctx, vs.throttlerApp)
383+
if err := injectHeartbeat(!ok); err != nil {
390384
if err == io.EOF {
391385
return nil
392386
}

0 commit comments

Comments
 (0)