54 changes: 42 additions & 12 deletions br/pkg/utils/register.go
@@ -222,38 +222,47 @@ func (tr *taskRegister) keepaliveLoop(ctx context.Context, ch <-chan *clientv3.L
for {
timeGap := time.Since(lastUpdateTime)
if tr.ttl-timeGap <= timeLeftThreshold {
lease, err := tr.grant(ctx)
var (
lease *clientv3.LeaseGrantResponse
grantErr error
)
failpoint.Inject("brie-task-register-failed-to-grant", func(_ failpoint.Value) {
err = errors.New("failpoint-error")
grantErr = errors.New("failpoint-error")
})
if err != nil {
if grantErr == nil {
lease, grantErr = tr.grant(ctx)
}
if grantErr != nil {
select {
case <-ctx.Done():
return
default:
}
log.Warn("failed to grant lease", zap.Error(err))
time.Sleep(RegisterRetryInternal)
log.Warn("failed to grant lease", zap.Error(grantErr))
tr.sleepRetryInterval(RegisterRetryInternal)
continue
}
tr.curLeaseID = lease.ID
lastUpdateTime = time.Now()
needReputKV = true
}
if needReputKV {
// if the lease has expired, need to put the key again
_, err := tr.client.KV.Put(ctx, tr.key, "", clientv3.WithLease(tr.curLeaseID))
var reputErr error
failpoint.Inject("brie-task-register-failed-to-reput", func(_ failpoint.Value) {
err = errors.New("failpoint-error")
reputErr = errors.New("failpoint-error")
})
if err != nil {
if reputErr == nil {
// If the lease has expired, need to put the key again.
_, reputErr = tr.client.KV.Put(ctx, tr.key, "", clientv3.WithLease(tr.curLeaseID))
}
if reputErr != nil {
select {
case <-ctx.Done():
return
default:
}
log.Warn("failed to put new kv", zap.Error(err))
time.Sleep(RegisterRetryInternal)
log.Warn("failed to put new kv", zap.Error(reputErr))
tr.sleepRetryInterval(RegisterRetryInternal)
continue
}
needReputKV = false
@@ -267,7 +276,7 @@ func (tr *taskRegister) keepaliveLoop(ctx context.Context, ch <-chan *clientv3.L
default:
}
log.Warn("failed to create new kv", zap.Error(err))
time.Sleep(RegisterRetryInternal)
tr.sleepRetryInterval(RegisterRetryInternal)
continue
}

@@ -276,6 +285,27 @@ func (tr *taskRegister) keepaliveLoop(ctx context.Context, ch <-chan *clientv3.L
}
}

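// sleepRetryInterval sleeps for defaultInterval unless the
// brie-task-register-retry-interval failpoint overrides it with a
// positive value interpreted as milliseconds.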
func (tr *taskRegister) sleepRetryInterval(defaultInterval time.Duration) {
interval := defaultInterval
failpoint.Inject("brie-task-register-retry-interval", func(val failpoint.Value) {
switch v := val.(type) {
case int:
if v > 0 {
interval = time.Duration(v) * time.Millisecond
}
case int64:
if v > 0 {
interval = time.Duration(v) * time.Millisecond
}
case float64:
if v > 0 {
interval = time.Duration(v) * time.Millisecond
}
}
})
time.Sleep(interval)
}

// RegisterTask saves the task's information
type RegisterTask struct {
Key string
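Note on the pattern above: failpoint.Inject pre-assigns the error, and the real etcd call runs only when the failpoint left it nil, so an injected failure short-circuits the grant/re-put entirely. Below is a minimal compilable sketch of that shape; the failpoint name and the doGrant helper are hypothetical, not part of this PR, and failpoint.Inject only takes effect after the failpoint-ctl source rewrite used in TiDB's build.

package main

import (
	"errors"
	"fmt"

	"github.com/pingcap/failpoint"
)

// doGrant stands in for tr.grant(ctx); illustrative only.
func doGrant() error { return nil }

func grantWithFailpoint() error {
	var grantErr error
	// When enabled (and rewritten in by failpoint-ctl), the failpoint
	// pre-populates grantErr...
	failpoint.Inject("demo-grant-failed", func(_ failpoint.Value) {
		grantErr = errors.New("failpoint-error")
	})
	// ...so the real call is skipped on the injected-failure path.
	if grantErr == nil {
		grantErr = doGrant()
	}
	return grantErr
}

func main() {
	fmt.Println(grantWithFailpoint()) // <nil> unless the failpoint is active
}

The type switch in sleepRetryInterval presumably covers int, int64, and float64 because a failpoint term such as return(200) can decode to any of those numeric types.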
66 changes: 44 additions & 22 deletions br/pkg/utils/register_test.go
@@ -91,37 +91,48 @@ func TestTaskRegisterFailedGrant(t *testing.T) {
client := testEtcdCluster.RandClient()

register := NewTaskRegisterWithTTL(client, 3*time.Second, RegisterRestore, "test")
defer func() {
err := register.Close(ctx)
// For flaky tests: the lease may have already expired.
if err != nil && !strings.Contains(err.Error(), "requested lease not found") {
require.NoError(t, err)
}
}()
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/utils/brie-task-register-failed-to-grant", "return(true)"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/utils/brie-task-register-always-grant", "return(true)"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/utils/brie-task-register-keepalive-stop", "return(true)"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/utils/brie-task-register-retry-interval", "return(200)"))
defer func() {
failpoint.Disable("github.com/pingcap/tidb/br/pkg/utils/brie-task-register-failed-to-grant")
failpoint.Disable("github.com/pingcap/tidb/br/pkg/utils/brie-task-register-always-grant")
failpoint.Disable("github.com/pingcap/tidb/br/pkg/utils/brie-task-register-keepalive-stop")
failpoint.Disable("github.com/pingcap/tidb/br/pkg/utils/brie-task-register-retry-interval")
}()
err := register.RegisterTask(ctx)
require.NoError(t, err)

time.Sleep(RegisterRetryInternal)
list, err := GetImportTasksFrom(ctx, client)
require.NoError(t, err)
require.Equal(t, 0, len(list.Tasks), list)
require.Eventually(t, func() bool {
list, err := GetImportTasksFrom(ctx, client)
if err != nil {
// The task lease might be revoked during TTL query.
return strings.Contains(err.Error(), "requested lease not found")
}
return len(list.Tasks) == 0
}, 5*time.Second, 50*time.Millisecond)

failpoint.Disable("github.com/pingcap/tidb/br/pkg/utils/brie-task-register-keepalive-stop")
failpoint.Disable("github.com/pingcap/tidb/br/pkg/utils/brie-task-register-failed-to-grant")
time.Sleep(RegisterRetryInternal)
list, err = GetImportTasksFrom(ctx, client)
require.Eventually(t, func() bool {
list, err := GetImportTasksFrom(ctx, client)
return err == nil && len(list.Tasks) > 0
}, 5*time.Second, 50*time.Millisecond)
list, err := GetImportTasksFrom(ctx, client)
require.NoError(t, err)
for _, task := range list.Tasks {
t.Log(task.MessageToUser())
require.Equal(t, "/tidb/brie/import/restore/test", task.Key)
}
require.True(t, len(list.Tasks) > 0)
err = register.Close(ctx)
// for flaky test, the lease would expire
if err != nil && !strings.Contains(err.Error(), "requested lease not found") {
require.NoError(t, err)
}
}

func TestTaskRegisterFailedReput(t *testing.T) {
@@ -133,35 +144,46 @@ func TestTaskRegisterFailedReput(t *testing.T) {
client := testEtcdCluster.RandClient()

register := NewTaskRegisterWithTTL(client, 3*time.Second, RegisterRestore, "test")
defer func() {
err := register.Close(ctx)
// For flaky tests: the lease may have already expired.
if err != nil && !strings.Contains(err.Error(), "requested lease not found") {
require.NoError(t, err)
}
}()
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/utils/brie-task-register-failed-to-reput", "return(true)"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/utils/brie-task-register-always-grant", "return(true)"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/utils/brie-task-register-keepalive-stop", "return(true)"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/utils/brie-task-register-retry-interval", "return(200)"))
defer func() {
failpoint.Disable("github.com/pingcap/tidb/br/pkg/utils/brie-task-register-failed-to-reput")
failpoint.Disable("github.com/pingcap/tidb/br/pkg/utils/brie-task-register-always-grant")
failpoint.Disable("github.com/pingcap/tidb/br/pkg/utils/brie-task-register-keepalive-stop")
failpoint.Disable("github.com/pingcap/tidb/br/pkg/utils/brie-task-register-retry-interval")
}()
err := register.RegisterTask(ctx)
require.NoError(t, err)

time.Sleep(RegisterRetryInternal)
list, err := GetImportTasksFrom(ctx, client)
require.NoError(t, err)
require.Equal(t, 0, len(list.Tasks), list)
require.Eventually(t, func() bool {
list, err := GetImportTasksFrom(ctx, client)
if err != nil {
// The task lease might be revoked during TTL query.
return strings.Contains(err.Error(), "requested lease not found")
}
return len(list.Tasks) == 0
}, 5*time.Second, 50*time.Millisecond)

failpoint.Disable("github.com/pingcap/tidb/br/pkg/utils/brie-task-register-keepalive-stop")
failpoint.Disable("github.com/pingcap/tidb/br/pkg/utils/brie-task-register-failed-to-reput")
time.Sleep(RegisterRetryInternal)
list, err = GetImportTasksFrom(ctx, client)
require.Eventually(t, func() bool {
list, err := GetImportTasksFrom(ctx, client)
return err == nil && len(list.Tasks) > 0
}, 5*time.Second, 50*time.Millisecond)
list, err := GetImportTasksFrom(ctx, client)
require.NoError(t, err)
for _, task := range list.Tasks {
t.Log(task.MessageToUser())
require.Equal(t, "/tidb/brie/import/restore/test", task.Key)
}
require.True(t, len(list.Tasks) > 0)
err = register.Close(ctx)
// for flaky test, the lease would expire
if err != nil && !strings.Contains(err.Error(), "requested lease not found") {
require.NoError(t, err)
}
}
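Both tests now poll with require.Eventually instead of asserting once after a fixed sleep, which tolerates scheduling jitter from the shortened retry interval. A minimal self-contained sketch of the polling pattern; the atomic flag and goroutine are illustrative only:

package demo

import (
	"sync/atomic"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

func TestEventuallyPattern(t *testing.T) {
	var ready atomic.Bool
	go func() {
		time.Sleep(150 * time.Millisecond) // simulated async keepalive work
		ready.Store(true)
	}()
	// Poll every 50ms for up to 5s; the assertion passes as soon as the
	// condition holds and fails only if the deadline expires.
	require.Eventually(t, func() bool {
		return ready.Load()
	}, 5*time.Second, 50*time.Millisecond)
}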
44 changes: 37 additions & 7 deletions pkg/server/handler/optimizor/plan_replayer_test.go
@@ -93,6 +93,9 @@ func prepareServerAndClientForTest(t *testing.T, store kv.Storage, dom *domain.D
cfg.Status.StatusPort = client.StatusPort
cfg.Status.ReportStatus = true

// RunInGoTestChan is a global channel and will be closed after the first server starts.
// Recreate it to avoid racing on subsequent server starts in the same test binary.
server.RunInGoTestChan = make(chan struct{})
srv, err := server.NewServer(cfg, driver)
srv.SetDomain(dom)
require.NoError(t, err)
@@ -359,6 +362,8 @@ func TestPlanReplayerWithMultiForeignKey(t *testing.T) {
config.AllowAllFiles = true
}))
require.NoError(t, err, "Error connecting")
db.SetMaxOpenConns(1)
db.SetMaxIdleConns(1)
defer func() {
err := db.Close()
require.NoError(t, err)
@@ -374,15 +379,31 @@
tk.MustExec("drop table planReplayer.c")
tk.MustExec(`SET FOREIGN_KEY_CHECKS = 1;`)
tk.MustExec(fmt.Sprintf(`plan replayer load "%s"`, path))
tk.MustExec("use planReplayer")
tk.MustExec("set @@tidb_use_plan_baselines = 1")

rows := tk.MustQuery("select @@global.tidb_mem_quota_binding_cache")
require.True(t, rows.Next(), "unexpected data")
var originBindingCacheQuota int64
require.NoError(t, rows.Scan(&originBindingCacheQuota))
require.NoError(t, rows.Close())
tk.MustExec("set global tidb_mem_quota_binding_cache = 268435456") // 256MB
defer tk.MustExec(fmt.Sprintf("set global tidb_mem_quota_binding_cache = %d", originBindingCacheQuota))

tk.MustExec("admin reload bindings")
// 3-3. check whether binding takes effect
tk.MustExec(`select a, b from t where a in (1, 2, 3)`)
rows := tk.MustQuery("select @@last_plan_from_binding")
require.True(t, rows.Next(), "unexpected data")
var count int64
err = rows.Scan(&count)
require.NoError(t, err)
require.Equal(t, int64(1), count)
require.Eventually(t, func() bool {
tk.MustExec(`select a, b from t where a in (1, 2, 3)`)
rows := tk.MustQuery("select @@last_plan_from_binding")
if !rows.Next() {
_ = rows.Close()
return false
}
var count int64
err := rows.Scan(&count)
_ = rows.Close()
return err == nil && count == int64(1)
}, 10*time.Second, 100*time.Millisecond)
}

func TestIssue43192(t *testing.T) {
@@ -487,13 +508,22 @@ func prepareData4Issue43192(t *testing.T, client *testserverclient.TestServerCli
require.NoError(t, rows.Close())
rows = tk.MustQuery("select @@tidb_last_plan_replayer_token")
require.True(t, rows.Next(), "unexpected data")
var token string
require.NoError(t, rows.Scan(&token))
require.NoError(t, rows.Close())
require.Equal(t, filename, token)

// Clean up the binding created for dumping to avoid interference when the same server later loads the replayer file.
tk.MustExec("drop global binding for select a, b from t where a in (1, 2, 3)")
return filename
}

func prepareData4Issue56458(t *testing.T, client *testserverclient.TestServerClient, dom *domain.Domain) string {
h := dom.StatsHandle()
db, err := sql.Open("mysql", client.GetDSN())
require.NoError(t, err, "Error connecting")
db.SetMaxOpenConns(1)
db.SetMaxIdleConns(1)
defer func() {
err := db.Close()
require.NoError(t, err)
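The db.SetMaxOpenConns(1) / db.SetMaxIdleConns(1) calls above pin the database/sql pool to a single physical connection. Session-scoped variables such as @@last_plan_from_binding and @@tidb_last_plan_replayer_token are per-connection, so without pinning, consecutive statements can land on different sessions and read stale values. A minimal sketch of the pinning; the DSN is a placeholder, not from this PR:

package main

import (
	"database/sql"
	"log"

	_ "github.com/go-sql-driver/mysql"
)

func main() {
	db, err := sql.Open("mysql", "root@tcp(127.0.0.1:4000)/test") // placeholder DSN
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Pin the pool to one connection so every statement below runs in the
	// same session and session variables remain observable.
	db.SetMaxOpenConns(1)
	db.SetMaxIdleConns(1)

	var fromBinding int64
	if err := db.QueryRow("select @@last_plan_from_binding").Scan(&fromBinding); err != nil {
		log.Fatal(err)
	}
	log.Println("last_plan_from_binding =", fromBinding)
}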
3 changes: 3 additions & 0 deletions pkg/server/handler/optimizor/statistics_handler_test.go
@@ -52,6 +52,9 @@ func TestDumpStatsAPI(t *testing.T) {
cfg.Status.ReportStatus = true
cfg.Socket = filepath.Join(tmp, fmt.Sprintf("tidb-mock-%d.sock", time.Now().UnixNano()))

// RunInGoTestChan is a global channel and will be closed after the first server starts.
// Recreate it to avoid racing on subsequent server starts in the same test binary.
server2.RunInGoTestChan = make(chan struct{})
server, err := server2.NewServer(cfg, driver)
require.NoError(t, err)
defer server.Close()
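RunInGoTestChan is a close-once signal: closing an already-closed channel panics, and a receive on a closed channel returns immediately, so a second server start in the same test binary would either panic or observe a stale signal. Recreating the channel before each start, as both files above do, avoids both failure modes. A minimal sketch of the recreate-before-reuse pattern; the names here are illustrative:

package main

import "fmt"

var serverStarted = make(chan struct{}) // stand-in for server.RunInGoTestChan

func startServer() {
	close(serverStarted) // signal readiness exactly once
}

func main() {
	for i := 0; i < 2; i++ {
		serverStarted = make(chan struct{}) // recreate before each start
		go startServer()
		<-serverStarted // without the recreate, the second wait would return instantly (stale)
		fmt.Println("server", i, "ready")
	}
}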
12 changes: 8 additions & 4 deletions pkg/util/gctuner/memory_limit_tuner_test.go
@@ -155,7 +155,7 @@ func TestIssue48741(t *testing.T) {
func() bool {
return GlobalMemoryLimitTuner.adjustPercentageInProgress.Load() && gcNum < getMemoryLimitGCTotal()
},
500*time.Millisecond, 100*time.Millisecond)
3*time.Second, 100*time.Millisecond)

// update memoryLimit, and sleep 500ms, let t.UpdateMemoryLimit() be called.
memory.ServerMemoryLimit.Store(1500 << 20) // 1.5 GB
@@ -183,7 +183,9 @@
// Sleep 500ms, let t.UpdateMemoryLimit() be called.
time.Sleep(500 * time.Millisecond)
// The memory limit will be 1.5GB * 110% during tuning.
require.Equal(t, debug.SetMemoryLimit(-1), int64(1500<<20*110/100))
require.Eventually(t, func() bool {
return debug.SetMemoryLimit(-1) == int64(1500<<20*110/100)
}, 3*time.Second, 20*time.Millisecond)
require.True(t, GlobalMemoryLimitTuner.adjustPercentageInProgress.Load())

allocator.free(memory810mb)
Expand All @@ -200,10 +202,12 @@ func TestIssue48741(t *testing.T) {
func() bool {
return GlobalMemoryLimitTuner.adjustPercentageInProgress.Load() && gcNum < getMemoryLimitGCTotal()
},
500*time.Millisecond, 100*time.Millisecond)
3*time.Second, 100*time.Millisecond)

// During the process of adjusting the percentage, the memory limit will be set to 1GB * 110% = 1.1GB.
require.Equal(t, debug.SetMemoryLimit(-1), int64(1<<30*110/100))
require.Eventually(t, func() bool {
return debug.SetMemoryLimit(-1) == int64(1<<30*110/100)
}, 3*time.Second, 20*time.Millisecond)

gcNumAfterMemory810mb := getMemoryLimitGCTotal()
// After the GC triggered by memory810mb.
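The rewritten assertions rely on debug.SetMemoryLimit(-1), which per runtime/debug returns the current soft memory limit without changing it when given a negative argument; polling it with require.Eventually absorbs the tuner's asynchronous updates. A minimal sketch of the read semantics:

package main

import (
	"fmt"
	"runtime/debug"
)

func main() {
	// Set the soft limit to 1500 MiB * 110%; the previous limit is returned.
	prev := debug.SetMemoryLimit(1500 << 20 * 110 / 100)
	// A negative argument reads the current limit without modifying it.
	cur := debug.SetMemoryLimit(-1)
	fmt.Printf("previous=%d current=%d\n", prev, cur)
}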