|
58 | 58 | targetKsOpts = make(map[string]string)
|
59 | 59 | httpClient = throttlebase.SetupHTTPClient(time.Second)
|
60 | 60 | sourceThrottlerAppName = throttlerapp.VStreamerName
|
61 |
| - targetThrottlerAppName = throttlerapp.VReplicationName |
| 61 | + targetThrottlerAppName = throttlerapp.VPlayerName |
62 | 62 | )
|
63 | 63 |
|
64 | 64 | const (
|
@@ -1241,18 +1241,7 @@ func materializeProduct(t *testing.T, useVtctldClient bool) {
|
1241 | 1241 | for _, tab := range customerTablets {
|
1242 | 1242 | waitForRowCountInTablet(t, tab, keyspace, workflow, 5)
|
1243 | 1243 | // Confirm that we updated the stats on the target tablets as expected.
|
1244 |
| - jsVal, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCounts"}) |
1245 |
| - require.NoError(t, err) |
1246 |
| - require.NotEqual(t, "{}", jsVal) |
1247 |
| - // The JSON value looks like this: {"cproduct.4.tablet.vstreamer": 2} |
1248 |
| - vstreamerThrottledCount := gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.tablet\.vstreamer`, workflow)).Int() |
1249 |
| - require.Greater(t, vstreamerThrottledCount, int64(0)) |
1250 |
| - // We only need to do this stat check once. |
1251 |
| - val, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCountTotal"}) |
1252 |
| - require.NoError(t, err) |
1253 |
| - throttledCount, err := strconv.ParseInt(val, 10, 64) |
1254 |
| - require.NoError(t, err) |
1255 |
| - require.GreaterOrEqual(t, throttledCount, vstreamerThrottledCount) |
| 1244 | + confirmVReplicationThrottling(t, tab, sourceKs, workflow, sourceThrottlerAppName) |
1256 | 1245 | }
|
1257 | 1246 | })
|
1258 | 1247 | t.Run("unthrottle-app-product", func(t *testing.T) {
|
@@ -1287,12 +1276,7 @@ func materializeProduct(t *testing.T, useVtctldClient bool) {
|
1287 | 1276 | for _, tab := range customerTablets {
|
1288 | 1277 | waitForRowCountInTablet(t, tab, keyspace, workflow, 8)
|
1289 | 1278 | // Confirm that we updated the stats on the target tablets as expected.
|
1290 |
| - jsVal, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCounts"}) |
1291 |
| - require.NoError(t, err) |
1292 |
| - require.NotEqual(t, "{}", jsVal) |
1293 |
| - // The JSON value now looks like this: {"cproduct.4.tablet.vstreamer": 2, "cproduct.4.tablet.vplayer": 4} |
1294 |
| - vplayerThrottledCount := gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.tablet\.vplayer`, workflow)).Int() |
1295 |
| - require.Greater(t, vplayerThrottledCount, int64(0)) |
| 1279 | + confirmVReplicationThrottling(t, tab, sourceKs, workflow, targetThrottlerAppName) |
1296 | 1280 | }
|
1297 | 1281 | })
|
1298 | 1282 | t.Run("unthrottle-app-customer", func(t *testing.T) {
|
@@ -1856,3 +1840,52 @@ func waitForInnoDBHistoryLength(t *testing.T, tablet *cluster.VttabletProcess, e
|
1856 | 1840 | func releaseInnoDBRowHistory(t *testing.T, dbConn *mysql.Conn) {
|
1857 | 1841 | execQuery(t, dbConn, "rollback")
|
1858 | 1842 | }
|
| 1843 | + |
| 1844 | +// confirmVReplicationThrottling confirms that the throttling related metrics reflect that |
| 1845 | +// the workflow is being throttled as expected, via the expected app name, and that this |
| 1846 | +// is impacting the lag as expected. |
| 1847 | +// The tablet passed should be a target tablet for the given workflow while the keyspace |
| 1848 | +// name provided should be the source keyspace as the target tablet stats note the stream's |
| 1849 | +// source keyspace and shard. |
| 1850 | +func confirmVReplicationThrottling(t *testing.T, tab *cluster.VttabletProcess, keyspace, workflow string, appname throttlerapp.Name) { |
| 1851 | + const ( |
| 1852 | + sleepTime = 5 * time.Second |
| 1853 | + zv = int64(0) |
| 1854 | + ) |
| 1855 | + time.Sleep(sleepTime) // To be sure that we accrue some lag |
| 1856 | + |
| 1857 | + jsVal, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCounts"}) |
| 1858 | + require.NoError(t, err) |
| 1859 | + require.NotEqual(t, "{}", jsVal) |
| 1860 | + // The JSON value looks like this: {"cproduct.4.tablet.vstreamer": 2, "cproduct.4.tablet.vplayer": 4} |
| 1861 | + throttledCount := gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.tablet\.%s`, workflow, appname)).Int() |
| 1862 | + require.Greater(t, throttledCount, zv, "JSON value: %s", jsVal) |
| 1863 | + |
| 1864 | + val, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCountTotal"}) |
| 1865 | + require.NoError(t, err) |
| 1866 | + require.NotEqual(t, "", val) |
| 1867 | + throttledCountTotal, err := strconv.ParseInt(val, 10, 64) |
| 1868 | + require.NoError(t, err) |
| 1869 | + require.GreaterOrEqual(t, throttledCountTotal, throttledCount, "Value: %s", val) |
| 1870 | + |
| 1871 | + // We do not calculate replication lag for the vcopier as it's not replicating |
| 1872 | + // events. |
| 1873 | + if appname != throttlerapp.VCopierName { |
| 1874 | + jsVal, err = getDebugVar(t, tab.Port, []string{"VReplicationLagSeconds"}) |
| 1875 | + require.NoError(t, err) |
| 1876 | + require.NotEqual(t, "{}", jsVal) |
| 1877 | + // The JSON value looks like this: {"product.0.cproduct.4": 6} |
| 1878 | + vreplLagSeconds := gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.%s\.*`, keyspace, workflow)).Int() |
| 1879 | + require.NoError(t, err) |
| 1880 | + // Take off 1 second to deal with timing issues in the test. |
| 1881 | + minLagSecs := int64(int64(sleepTime.Seconds()) - 1) |
| 1882 | + require.GreaterOrEqual(t, vreplLagSeconds, minLagSecs, "JSON value: %s", jsVal) |
| 1883 | + |
| 1884 | + val, err = getDebugVar(t, tab.Port, []string{"VReplicationLagSecondsMax"}) |
| 1885 | + require.NoError(t, err) |
| 1886 | + require.NotEqual(t, "", val) |
| 1887 | + vreplLagSecondsMax, err := strconv.ParseInt(val, 10, 64) |
| 1888 | + require.NoError(t, err) |
| 1889 | + require.GreaterOrEqual(t, vreplLagSecondsMax, vreplLagSeconds, "Value: %s", val) |
| 1890 | + } |
| 1891 | +} |
0 commit comments