Commit ecaf20e

Merge pull request #178 from github/mhamza/backport-16577
VReplication: Estimate lag when workflow fully throttled (vitessio#16577)
2 parents 4fe826d + 0a4023d commit ecaf20e

File tree: 3 files changed (+89 / -47 lines changed)


go/test/endtoend/vreplication/vreplication_test.go

Lines changed: 52 additions & 19 deletions
@@ -58,7 +58,7 @@ var (
 	targetKsOpts           = make(map[string]string)
 	httpClient             = throttlebase.SetupHTTPClient(time.Second)
 	sourceThrottlerAppName = throttlerapp.VStreamerName
-	targetThrottlerAppName = throttlerapp.VReplicationName
+	targetThrottlerAppName = throttlerapp.VPlayerName
 )
 
 const (
@@ -1241,18 +1241,7 @@ func materializeProduct(t *testing.T, useVtctldClient bool) {
 		for _, tab := range customerTablets {
 			waitForRowCountInTablet(t, tab, keyspace, workflow, 5)
 			// Confirm that we updated the stats on the target tablets as expected.
-			jsVal, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCounts"})
-			require.NoError(t, err)
-			require.NotEqual(t, "{}", jsVal)
-			// The JSON value looks like this: {"cproduct.4.tablet.vstreamer": 2}
-			vstreamerThrottledCount := gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.tablet\.vstreamer`, workflow)).Int()
-			require.Greater(t, vstreamerThrottledCount, int64(0))
-			// We only need to do this stat check once.
-			val, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCountTotal"})
-			require.NoError(t, err)
-			throttledCount, err := strconv.ParseInt(val, 10, 64)
-			require.NoError(t, err)
-			require.GreaterOrEqual(t, throttledCount, vstreamerThrottledCount)
+			confirmVReplicationThrottling(t, tab, sourceKs, workflow, sourceThrottlerAppName)
 		}
 	})
 	t.Run("unthrottle-app-product", func(t *testing.T) {
@@ -1287,12 +1276,7 @@ func materializeProduct(t *testing.T, useVtctldClient bool) {
 		for _, tab := range customerTablets {
 			waitForRowCountInTablet(t, tab, keyspace, workflow, 8)
 			// Confirm that we updated the stats on the target tablets as expected.
-			jsVal, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCounts"})
-			require.NoError(t, err)
-			require.NotEqual(t, "{}", jsVal)
-			// The JSON value now looks like this: {"cproduct.4.tablet.vstreamer": 2, "cproduct.4.tablet.vplayer": 4}
-			vplayerThrottledCount := gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.tablet\.vplayer`, workflow)).Int()
-			require.Greater(t, vplayerThrottledCount, int64(0))
+			confirmVReplicationThrottling(t, tab, sourceKs, workflow, targetThrottlerAppName)
 		}
 	})
 	t.Run("unthrottle-app-customer", func(t *testing.T) {
@@ -1856,3 +1840,52 @@ func waitForInnoDBHistoryLength(t *testing.T, tablet *cluster.VttabletProcess, e
 func releaseInnoDBRowHistory(t *testing.T, dbConn *mysql.Conn) {
 	execQuery(t, dbConn, "rollback")
 }
+
+// confirmVReplicationThrottling confirms that the throttling related metrics reflect that
+// the workflow is being throttled as expected, via the expected app name, and that this
+// is impacting the lag as expected.
+// The tablet passed should be a target tablet for the given workflow while the keyspace
+// name provided should be the source keyspace as the target tablet stats note the stream's
+// source keyspace and shard.
+func confirmVReplicationThrottling(t *testing.T, tab *cluster.VttabletProcess, keyspace, workflow string, appname throttlerapp.Name) {
+	const (
+		sleepTime = 5 * time.Second
+		zv        = int64(0)
+	)
+	time.Sleep(sleepTime) // To be sure that we accrue some lag
+
+	jsVal, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCounts"})
+	require.NoError(t, err)
+	require.NotEqual(t, "{}", jsVal)
+	// The JSON value looks like this: {"cproduct.4.tablet.vstreamer": 2, "cproduct.4.tablet.vplayer": 4}
+	throttledCount := gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.tablet\.%s`, workflow, appname)).Int()
+	require.Greater(t, throttledCount, zv, "JSON value: %s", jsVal)
+
+	val, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCountTotal"})
+	require.NoError(t, err)
+	require.NotEqual(t, "", val)
+	throttledCountTotal, err := strconv.ParseInt(val, 10, 64)
+	require.NoError(t, err)
+	require.GreaterOrEqual(t, throttledCountTotal, throttledCount, "Value: %s", val)
+
+	// We do not calculate replication lag for the vcopier as it's not replicating
+	// events.
+	if appname != throttlerapp.VCopierName {
+		jsVal, err = getDebugVar(t, tab.Port, []string{"VReplicationLagSeconds"})
+		require.NoError(t, err)
+		require.NotEqual(t, "{}", jsVal)
+		// The JSON value looks like this: {"product.0.cproduct.4": 6}
+		vreplLagSeconds := gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.%s\.*`, keyspace, workflow)).Int()
+		require.NoError(t, err)
+		// Take off 1 second to deal with timing issues in the test.
+		minLagSecs := int64(int64(sleepTime.Seconds()) - 1)
+		require.GreaterOrEqual(t, vreplLagSeconds, minLagSecs, "JSON value: %s", jsVal)
+
+		val, err = getDebugVar(t, tab.Port, []string{"VReplicationLagSecondsMax"})
+		require.NoError(t, err)
+		require.NotEqual(t, "", val)
+		vreplLagSecondsMax, err := strconv.ParseInt(val, 10, 64)
+		require.NoError(t, err)
+		require.GreaterOrEqual(t, vreplLagSecondsMax, vreplLagSeconds, "Value: %s", val)
+	}
+}
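
For reference, a small self-contained sketch (not part of this PR) of how the gjson paths used by confirmVReplicationThrottling resolve against debug-var payloads of the shape shown in the comments above. The JSON literals and the `main` wrapper are illustrative only; in the test the payloads come from the test helper getDebugVar.

```go
package main

import (
	"fmt"

	"github.com/tidwall/gjson"
)

func main() {
	workflow, app := "cproduct", "vplayer"

	// Shape of the VReplicationThrottledCounts debug var: "<workflow>.<id>.tablet.<app>".
	throttledCounts := `{"cproduct.4.tablet.vstreamer": 2, "cproduct.4.tablet.vplayer": 4}`
	// Dots inside the key are escaped in the gjson path; the stream id is wildcarded with '*'.
	count := gjson.Get(throttledCounts, fmt.Sprintf(`%s\.*\.tablet\.%s`, workflow, app)).Int()
	fmt.Println("throttled count:", count) // 4

	// Shape of the VReplicationLagSeconds debug var: "<source keyspace>.<shard>.<workflow>.<id>".
	lagSeconds := `{"product.0.cproduct.4": 6}`
	lag := gjson.Get(lagSeconds, fmt.Sprintf(`%s\.*\.%s\.*`, "product", workflow)).Int()
	fmt.Println("reported lag seconds:", lag) // 6
}
```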

go/vt/vttablet/tabletmanager/vreplication/vplayer.go

Lines changed: 26 additions & 15 deletions
@@ -454,33 +454,34 @@ func (vp *vplayer) recordHeartbeat() error {
 func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error {
 	defer vp.vr.dbClient.Rollback()
 
+	estimateLag := func() {
+		behind := time.Now().UnixNano() - vp.lastTimestampNs - vp.timeOffsetNs
+		vp.vr.stats.ReplicationLagSeconds.Store(behind / 1e9)
+		vp.vr.stats.VReplicationLags.Add(strconv.Itoa(int(vp.vr.id)), time.Duration(behind/1e9)*time.Second)
+	}
+
 	// If we're not running, set ReplicationLagSeconds to be very high.
 	// TODO(sougou): if we also stored the time of the last event, we
 	// can estimate this value more accurately.
 	defer vp.vr.stats.ReplicationLagSeconds.Store(math.MaxInt64)
 	defer vp.vr.stats.VReplicationLags.Add(strconv.Itoa(int(vp.vr.id)), math.MaxInt64)
-	var sbm int64 = -1
+	var lagSecs int64
 	for {
 		if ctx.Err() != nil {
 			return ctx.Err()
 		}
 		// Check throttler.
 		if !vp.vr.vre.throttlerClient.ThrottleCheckOKOrWaitAppName(ctx, throttlerapp.Name(vp.throttlerAppName)) {
 			_ = vp.vr.updateTimeThrottled(throttlerapp.VPlayerName)
+			estimateLag()
 			continue
 		}
 
 		items, err := relay.Fetch()
 		if err != nil {
 			return err
 		}
-		// No events were received. This likely means that there's a network partition.
-		// So, we should assume we're falling behind.
-		if len(items) == 0 {
-			behind := time.Now().UnixNano() - vp.lastTimestampNs - vp.timeOffsetNs
-			vp.vr.stats.ReplicationLagSeconds.Store(behind / 1e9)
-			vp.vr.stats.VReplicationLags.Add(strconv.Itoa(int(vp.vr.id)), time.Duration(behind/1e9)*time.Second)
-		}
+
 		// Empty transactions are saved at most once every idleTimeout.
 		// This covers two situations:
 		// 1. Fetch was idle for idleTimeout.
@@ -497,12 +498,21 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error {
 				return nil
 			}
 		}
+
+		lagSecs = -1
 		for i, events := range items {
 			for j, event := range events {
 				if event.Timestamp != 0 {
-					vp.lastTimestampNs = event.Timestamp * 1e9
-					vp.timeOffsetNs = time.Now().UnixNano() - event.CurrentTime
-					sbm = event.CurrentTime/1e9 - event.Timestamp
+					// If the event is a heartbeat sent while throttled then do not update
+					// the lag based on it.
+					// If the batch consists only of throttled heartbeat events then we cannot
+					// determine the actual lag, as the vstreamer is fully throttled, and we
+					// will estimate it after processing the batch.
+					if !(event.Type == binlogdatapb.VEventType_HEARTBEAT && event.Throttled) {
+						vp.lastTimestampNs = event.Timestamp * 1e9
+						vp.timeOffsetNs = time.Now().UnixNano() - event.CurrentTime
+						lagSecs = event.CurrentTime/1e9 - event.Timestamp
+					}
 				}
 				mustSave := false
 				switch event.Type {
@@ -533,11 +543,12 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error {
 			}
 		}
 
-		if sbm >= 0 {
-			vp.vr.stats.ReplicationLagSeconds.Store(sbm)
-			vp.vr.stats.VReplicationLags.Add(strconv.Itoa(int(vp.vr.id)), time.Duration(sbm)*time.Second)
+		if lagSecs >= 0 {
+			vp.vr.stats.ReplicationLagSeconds.Store(lagSecs)
+			vp.vr.stats.VReplicationLags.Add(strconv.Itoa(int(vp.vr.id)), time.Duration(lagSecs)*time.Second)
+		} else { // We couldn't determine the lag, so we need to estimate it
+			estimateLag()
 		}
-
 	}
 }
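
To make the estimateLag arithmetic concrete, here is a simplified standalone sketch of the same calculation. The field and method names mirror the vplayer fields but are illustrative stand-ins, not the vplayer code itself: a usable event captures the source commit timestamp and the clock offset, and when a batch carries nothing usable (workflow fully throttled) the lag is estimated from the local clock.

```go
package main

import (
	"fmt"
	"time"
)

// lagEstimator mirrors the two vplayer fields read by the estimateLag closure.
type lagEstimator struct {
	lastTimestampNs int64 // source commit timestamp of the last applied event, in ns
	timeOffsetNs    int64 // local clock minus source clock, captured with that event
}

// observe records a usable (non-throttled-heartbeat) event: keep the event's commit
// timestamp and the offset between the local and source clocks.
func (e *lagEstimator) observe(commitTimestampSecs, sourceCurrentTimeNs int64) {
	e.lastTimestampNs = commitTimestampSecs * 1e9
	e.timeOffsetNs = time.Now().UnixNano() - sourceCurrentTimeNs
}

// estimate is the fallback when no usable timestamp arrives: translate "now" onto the
// source clock and measure how far it is past the last applied commit.
func (e *lagEstimator) estimate() int64 {
	behind := time.Now().UnixNano() - e.lastTimestampNs - e.timeOffsetNs
	return behind / 1e9
}

func main() {
	var e lagEstimator
	now := time.Now()
	// The last usable event was committed on the source 7s ago and arrived just now
	// (clocks assumed in sync), so the stream is roughly 7s behind.
	e.observe(now.Add(-7*time.Second).Unix(), now.UnixNano())
	fmt.Println("estimated lag (s):", e.estimate()) // ~7, and it keeps growing while throttled
}
```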

go/vt/vttablet/tabletserver/vstreamer/vstreamer.go

Lines changed: 11 additions & 13 deletions
@@ -31,7 +31,6 @@ import (
 	"vitess.io/vitess/go/mysql/collations"
 	"vitess.io/vitess/go/mysql/replication"
 	"vitess.io/vitess/go/sqltypes"
-	"vitess.io/vitess/go/timer"
 	"vitess.io/vitess/go/vt/binlog"
 	"vitess.io/vitess/go/vt/dbconfigs"
 	"vitess.io/vitess/go/vt/log"
@@ -288,11 +287,11 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
 	defer hbTimer.Stop()
 
 	injectHeartbeat := func(throttled bool) error {
-		now := time.Now().UnixNano()
 		select {
 		case <-ctx.Done():
 			return vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired")
 		default:
+			now := time.Now().UnixNano()
 			err := bufferAndTransmit(&binlogdatapb.VEvent{
 				Type:      binlogdatapb.VEventType_HEARTBEAT,
 				Timestamp: now / 1e9,
@@ -304,24 +303,22 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
 	}
 
 	logger := logutil.NewThrottledLogger(vs.vse.GetTabletInfo(), throttledLoggerInterval)
+	wfNameLog := ""
+	if vs.filter != nil && vs.filter.WorkflowName != "" {
+		wfNameLog = fmt.Sprintf(" in workflow %s", vs.filter.WorkflowName)
+	}
 	throttleEvents := func(throttledEvents chan mysql.BinlogEvent) {
-		throttledHeartbeatsRateLimiter := timer.NewRateLimiter(HeartbeatTime)
-		defer throttledHeartbeatsRateLimiter.Stop()
 		for {
-			// check throttler.
+			// Check throttler.
 			if !vs.vse.throttlerClient.ThrottleCheckOKOrWaitAppName(ctx, vs.throttlerApp) {
-				// make sure to leave if context is cancelled
+				// Make sure to leave if context is cancelled.
 				select {
 				case <-ctx.Done():
 					return
 				default:
-					// do nothing special
+					// Do nothing special.
 				}
-				throttledHeartbeatsRateLimiter.Do(func() error {
-					return injectHeartbeat(true)
-				})
-				// we won't process events, until we're no longer throttling
-				logger.Infof("throttled.")
+				logger.Infof("vstreamer throttled%s.", wfNameLog)
 				continue
 			}
 			select {
@@ -393,7 +390,8 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
 		case <-ctx.Done():
 			return nil
 		case <-hbTimer.C:
-			ok := vs.vse.throttlerClient.ThrottleCheckOK(ctx, vs.throttlerApp)
-			if err := injectHeartbeat(false); err != nil {
+			ok := vs.vse.throttlerClient.ThrottleCheckOK(ctx, vs.throttlerApp)
+			if err := injectHeartbeat(!ok); err != nil {
 				if err == io.EOF {
 					return nil
 				}
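
A minimal sketch of the producer/consumer contract these two files now share: the vstreamer keeps emitting heartbeats on its regular timer but flags them as Throttled when the throttler check fails, and the vplayer must not treat such heartbeats as evidence of progress when computing lag. The types below are simplified stand-ins for binlogdatapb.VEvent and the throttler client, not the Vitess API.

```go
package main

import (
	"fmt"
	"time"
)

type eventType int

const (
	heartbeatEvent eventType = iota
	rowEvent
)

// vEvent is a simplified stand-in for binlogdatapb.VEvent.
type vEvent struct {
	Type        eventType
	Timestamp   int64 // source commit timestamp, seconds
	CurrentTime int64 // source clock when the event was sent, nanoseconds
	Throttled   bool  // true if generated while the streamer was throttled
}

// makeHeartbeat mirrors the idea behind the injectHeartbeat change: the heartbeat is
// always emitted, and the current throttler verdict is recorded on the event.
func makeHeartbeat(throttlerOK bool) vEvent {
	now := time.Now().UnixNano()
	return vEvent{
		Type:        heartbeatEvent,
		Timestamp:   now / 1e9,
		CurrentTime: now,
		Throttled:   !throttlerOK,
	}
}

// usableForLag mirrors the vplayer-side condition: any event with a timestamp counts,
// except heartbeats that were sent while throttled.
func usableForLag(ev vEvent) bool {
	return ev.Timestamp != 0 && !(ev.Type == heartbeatEvent && ev.Throttled)
}

func main() {
	fmt.Println(usableForLag(makeHeartbeat(true)))  // true: normal heartbeat updates lag
	fmt.Println(usableForLag(makeHeartbeat(false))) // false: throttled heartbeat is ignored
}
```

Per the diff comments above, when a batch consists only of throttled heartbeats the vplayer falls back to estimateLag rather than deriving lag from those heartbeats, which is what the commit title describes.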
