Skip to content

Commit 926da9a

Browse files
zhyassbohutang
authored andcommitted
*: add cleanup handle #569
[summary] add cleanup admin to support clean up the old data after shifted. syntax: radon cleanup; [test case] src/ctl/v1/shard_test.go src/proxy/admin_cleanup_test.go src/proxy/proxy_test.go src/proxy/radon_test.go src/vendor/github.com/xelabs/go-mysqlstack/sqlparser/radon_test.go [patch codecov] src/ctl/v1/shard.go 90.9% src/proxy/admin_cleanup.go 100% src/proxy/proxy.go 85.6% src/proxy/radon.go 77.8% src/vendor/github.com/xelabs/go-mysqlstack/sqlparser/ast.go 85.9%
1 parent a23bb53 commit 926da9a

File tree

16 files changed

+2336
-1838
lines changed

16 files changed

+2336
-1838
lines changed

docs/api.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -342,7 +342,6 @@ This api is used to migrate the data from one backend to another.
342342
Path: /v1/shard/migrate
343343
Method: POST
344344
Request: {
345-
"to-flavor": "Destination db flavor, like mysql/mariadb/radondb, default mysql"
346345
"from": "Source MySQL backend(host:port)", [required]
347346
"from-user": "MySQL user, must have replication privilege", [required]
348347
"from-password": "MySQL user password", [required]

src/ctl/v1/shard.go

Lines changed: 33 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,16 @@
99
package v1
1010

1111
import (
12+
"fmt"
1213
"net/http"
1314
"regexp"
1415
"strconv"
1516
"strings"
1617

18+
"plugins/shiftmanager"
1719
"proxy"
1820

1921
"github.com/ant0ine/go-json-rest/rest"
20-
"github.com/radondb/shift/shift"
2122
shiftlog "github.com/radondb/shift/xlog"
2223
"github.com/xelabs/go-mysqlstack/xlog"
2324
)
@@ -360,8 +361,6 @@ func globalsHandler(log *xlog.Log, proxy *proxy.Proxy, w rest.ResponseWriter, r
360361
}
361362

362363
type migrateParams struct {
363-
ToFlavor string `json:"to-flavor"`
364-
365364
From string `json:"from"`
366365
FromUser string `json:"from-user"`
367366
FromPassword string `json:"from-password"`
@@ -388,7 +387,8 @@ type migrateParams struct {
388387
// Returns:
389388
// 1. Status:200
390389
// 2. Status:204
391-
// 3. Status:500
390+
// 3. Status:403
391+
// 4. Status:500
392392
func ShardMigrateHandler(log *xlog.Log, proxy *proxy.Proxy) rest.HandlerFunc {
393393
f := func(w rest.ResponseWriter, r *rest.Request) {
394394
shardMigrateHandler(proxy, w, r)
@@ -397,9 +397,9 @@ func ShardMigrateHandler(log *xlog.Log, proxy *proxy.Proxy) rest.HandlerFunc {
397397
}
398398

399399
func shardMigrateHandler(proxy *proxy.Proxy, w rest.ResponseWriter, r *rest.Request) {
400+
scatter := proxy.Scatter()
400401
log := shiftlog.NewStdLog(shiftlog.Level(shiftlog.INFO))
401402
p := &migrateParams{
402-
ToFlavor: shift.ToMySQLFlavor,
403403
RadonURL: "http://" + proxy.Config().Proxy.PeerAddress,
404404
Rebalance: false,
405405
Cleanup: false,
@@ -423,18 +423,30 @@ func shardMigrateHandler(proxy *proxy.Proxy, w rest.ResponseWriter, r *rest.Requ
423423
}
424424

425425
// check args.
426-
if len(p.From) == 0 || len(p.FromUser) == 0 || len(p.FromDatabase) == 0 || len(p.FromTable) == 0 ||
427-
len(p.To) == 0 || len(p.ToUser) == 0 || len(p.ToDatabase) == 0 || len(p.ToTable) == 0 {
426+
if len(p.FromUser) == 0 || len(p.FromDatabase) == 0 || len(p.FromTable) == 0 ||
427+
len(p.ToUser) == 0 || len(p.ToDatabase) == 0 || len(p.ToTable) == 0 {
428428
log.Error("api.v1.shard.migrate[%+v].error:some param is empty", p)
429429
rest.Error(w, "some args are empty", http.StatusNoContent)
430430
return
431431
}
432-
log.Warning(`
433-
IMPORTANT: Please check that the shift run completes successfully.
434-
At the end of a successful shift run prints "shift.completed.OK!".`)
435432

436-
cfg := &shift.Config{
437-
ToFlavor: p.ToFlavor,
433+
// Check the backend name.
434+
var fromBackend, toBackend string
435+
backends := scatter.BackendConfigsClone()
436+
for _, backend := range backends {
437+
if backend.Address == p.From {
438+
fromBackend = backend.Name
439+
} else if backend.Address == p.To {
440+
toBackend = backend.Name
441+
}
442+
}
443+
if fromBackend == "" || toBackend == "" {
444+
log.Error("api.v1.shard.migrate.fromBackend[%s].or.toBackend[%s].is.NULL", fromBackend, toBackend)
445+
rest.Error(w, "api.v1.shard.migrate.backend.NULL", http.StatusInternalServerError)
446+
return
447+
}
448+
449+
cfg := &shiftmanager.ShiftInfo{
438450
From: p.From,
439451
FromUser: p.FromUser,
440452
FromPassword: p.FromPassword,
@@ -447,23 +459,26 @@ func shardMigrateHandler(proxy *proxy.Proxy, w rest.ResponseWriter, r *rest.Requ
447459
ToTable: p.ToTable,
448460
Rebalance: p.Rebalance,
449461
Cleanup: p.Cleanup,
450-
MySQLDump: p.MySQLDump,
462+
MysqlDump: p.MySQLDump,
451463
Threads: p.Threads,
452-
Behinds: p.Behinds,
464+
PosBehinds: p.Behinds,
453465
RadonURL: p.RadonURL,
454466
Checksum: p.Checksum,
455467
WaitTimeBeforeChecksum: p.WaitTimeBeforeChecksum,
456468
}
457-
log.Info("shift.cfg:%+v", cfg)
458469

459-
shift := shift.NewShift(log, cfg)
460-
if err := shift.Start(); err != nil {
470+
shiftMgr := proxy.Plugins().PlugShiftMgr()
471+
shift, _ := shiftMgr.NewShiftInstance(cfg, shiftmanager.ShiftTypeRebalance)
472+
473+
key := fmt.Sprintf("`%s`.`%s`_%s", p.ToDatabase, p.ToTable, toBackend)
474+
err = shiftMgr.StartShiftInstance(key, shift, shiftmanager.ShiftTypeRebalance)
475+
if err != nil {
461476
log.Error("shift.start.error:%+v", err)
462477
rest.Error(w, err.Error(), http.StatusInternalServerError)
463478
return
464479
}
465480

466-
err = shift.WaitFinish()
481+
err = shiftMgr.WaitInstanceFinish(key)
467482
if err != nil {
468483
log.Error("shift.wait.finish.error:%+v", err)
469484
rest.Error(w, err.Error(), http.StatusInternalServerError)

src/ctl/v1/shard_test.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1402,13 +1402,12 @@ func TestCtlV1ShardMigrateErr(t *testing.T) {
14021402
}
14031403
}
14041404
p := &migrateParams{
1405-
ToFlavor: "mysql",
1406-
From: "127.0.0.1" + from,
1405+
From: from,
14071406
FromUser: fromUsr,
14081407
FromPassword: fromPasswd,
14091408
FromDatabase: "test",
14101409
FromTable: "a",
1411-
To: "127.0.0.1" + to,
1410+
To: to,
14121411
ToUser: toUsr,
14131412
ToPassword: toPasswd,
14141413
ToDatabase: "test",
@@ -1475,6 +1474,14 @@ func TestCtlV1ShardMigrateErr(t *testing.T) {
14751474
recorded.CodeIs(204)
14761475
}
14771476

1477+
// check backend null.
1478+
{
1479+
p.ToTable = "a"
1480+
p.To = "192.168.0.1:3306"
1481+
recorded := test.RunRequest(t, handler, test.MakeSimpleRequest("POST", "http://localhost/v1/shard/migrate", p))
1482+
recorded.CodeIs(500)
1483+
}
1484+
14781485
// Set readonly.
14791486
{
14801487
proxy.SetReadOnly(true)

src/plugins/shiftmanager/handler.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ type ShiftInfo struct {
6060
ToDatabase string
6161
ToTable string
6262

63+
Rebalance bool
6364
Cleanup bool
6465
Checksum bool
6566
MysqlDump string

src/plugins/shiftmanager/shiftmanager.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ func (shiftMgr *ShiftManager) NewShiftInstance(shiftInfo *ShiftInfo, typ ShiftTy
8888
ToPassword: shiftInfo.ToPassword,
8989
ToDatabase: shiftInfo.ToDatabase,
9090
ToTable: shiftInfo.ToTable,
91+
Rebalance: shiftInfo.Rebalance,
9192
Cleanup: shiftInfo.Cleanup,
9293
MySQLDump: shiftInfo.MysqlDump,
9394
Threads: shiftInfo.Threads,

src/proxy/admin_cleanup.go

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
package proxy
2+
3+
import (
4+
"fmt"
5+
"strings"
6+
7+
"backend"
8+
"plugins/shiftmanager"
9+
"router"
10+
11+
"github.com/xelabs/go-mysqlstack/sqlparser/depends/sqltypes"
12+
"github.com/xelabs/go-mysqlstack/xlog"
13+
)
14+
15+
// Cleanup ...
16+
type Cleanup struct {
17+
log *xlog.Log
18+
scatter *backend.Scatter
19+
router *router.Router
20+
spanner *Spanner
21+
}
22+
23+
// NewCleanup -- creates new Cleanup handler.
24+
func NewCleanup(log *xlog.Log, scatter *backend.Scatter, router *router.Router, spanner *Spanner) *Cleanup {
25+
return &Cleanup{
26+
log: log,
27+
scatter: scatter,
28+
router: router,
29+
spanner: spanner,
30+
}
31+
}
32+
33+
// Cleanup used to find and cleanup the old data.
34+
func (c *Cleanup) Cleanup() (*sqltypes.Result, error) {
35+
backends := c.scatter.AllBackends()
36+
for _, backend := range backends {
37+
dbQuery := "show databases"
38+
qr, err := c.spanner.ExecuteOnThisBackend(backend, dbQuery)
39+
if err != nil {
40+
return nil, err
41+
}
42+
43+
for _, r := range qr.Rows {
44+
db := string(r[0].Raw())
45+
err := c.cleanupHandler(db, backend)
46+
if err != nil {
47+
return nil, err
48+
}
49+
}
50+
}
51+
52+
return &sqltypes.Result{}, nil
53+
}
54+
55+
func (c *Cleanup) cleanupHandler(database string, backend string) error {
56+
log := c.log
57+
var err error
58+
if isSysDB := c.router.IsSystemDB(database); isSysDB {
59+
return nil
60+
}
61+
62+
// 1.Check if the database is in router. If not, drop database.
63+
if err = c.router.CheckDatabase(database); err != nil {
64+
sql := fmt.Sprintf("drop database if exists `%s`", database)
65+
if _, err = c.spanner.ExecuteOnThisBackend(backend, sql); err != nil {
66+
log.Error("cleanup.drop.database[%s].on.backend[%s].error:%v", database, backend, err)
67+
return err
68+
}
69+
log.Warning("cleanup.database[%s].on.backend[%s].has.been.cleaned", database, backend)
70+
return nil
71+
}
72+
73+
// 2. Find the table with suffix '_cleanup', check whether exist in radon. If not, drop table.
74+
sql := fmt.Sprintf("select table_name from information_schema.tables where table_schema = '%s' and table_name like '%%_cleanup'", database)
75+
qr, err := c.spanner.ExecuteOnThisBackend(backend, sql)
76+
if err != nil {
77+
return err
78+
}
79+
80+
for _, r := range qr.Rows {
81+
tb := string(r[0].Raw())
82+
// Because partition table suffix is a 4-digit number,
83+
// here can only be a global table or a single table.
84+
// Therefore, we can directly check in the router.
85+
if isExist, _ := c.router.CheckTable(database, tb); !isExist {
86+
sql := fmt.Sprintf("drop table if exists `%s`.`%s`", database, tb)
87+
_, err = c.spanner.ExecuteOnThisBackend(backend, sql)
88+
if err != nil {
89+
log.Error("cleanup.drop.table[%s.%s].on.backend[%s].error:%v", database, tb, backend, err)
90+
return err
91+
}
92+
log.Warning("cleanup.table[%s.%s].on.backend[%s].has.been.cleaned", database, tb, backend)
93+
}
94+
}
95+
96+
// 3. Find the table with suffix '_migrate', check whether migrating or exist in radon. If not, drop table.
97+
sql = fmt.Sprintf("select table_name from information_schema.tables where table_schema = '%s' and table_name like '%%_migrate'", database)
98+
qr, err = c.spanner.ExecuteOnThisBackend(backend, sql)
99+
if err != nil {
100+
return err
101+
}
102+
103+
for _, r := range qr.Rows {
104+
tb := string(r[0].Raw())
105+
if isExist, _ := c.router.CheckTable(database, tb); !isExist {
106+
// Check if the table is migrating.
107+
key := fmt.Sprintf("`%s`.`%s`_%s", database, strings.TrimSuffix(tb, "_migrate"), backend)
108+
shiftMgr := c.spanner.plugins.PlugShiftMgr()
109+
status := shiftMgr.GetStatus(key)
110+
if status == shiftmanager.ShiftStatusMigrating {
111+
continue
112+
}
113+
114+
sql := fmt.Sprintf("drop table if exists `%s`.`%s`", database, tb)
115+
_, err = c.spanner.ExecuteOnThisBackend(backend, sql)
116+
if err != nil {
117+
log.Error("cleanup.drop.table[%s.%s].on.backend[%s].error:%v", database, tb, backend, err)
118+
return err
119+
}
120+
log.Warning("cleanup.table[%s.%s].on.backend[%s].has.been.cleaned", database, tb, backend)
121+
}
122+
}
123+
return nil
124+
}

0 commit comments

Comments
 (0)