Skip to content

Commit d443b76

Browse files
authored
Merge pull request #1408 from signal18/ChecksumRepairTables
Checksum repair tables
2 parents 0e9685f + 9c1ea9f commit d443b76

File tree

11 files changed

+346
-33
lines changed

11 files changed

+346
-33
lines changed

cluster/cluster_acl_rules.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,7 @@ var clusterACLRules = []ACLRule{
209209

210210
// Maintenance
211211
{"/actions/checksum-all-tables", nil, []string{config.GrantClusterChecksum}},
212+
{"/actions/checksum-repair-all-tables", nil, []string{config.GrantClusterChecksumRepair}},
212213
{"/actions/analyze-all-tables", nil, []string{config.GrantClusterAnalyze}},
213214

214215
// Provisioning

cluster/cluster_acl_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -943,6 +943,7 @@ func TestIsURLPassACLComprehensiveCoverage(t *testing.T) {
943943
config.GrantSalesValidate: true,
944944
config.GrantClusterDocker: true,
945945
config.GrantClusterChecksum: true,
946+
config.GrantClusterChecksumRepair: true,
946947
config.GrantClusterAnalyze: true,
947948
config.GrantDBConfigFlag: true,
948949
config.GrantProxyConfigFlag: true,
@@ -1002,6 +1003,7 @@ func TestIsURLPassACLComprehensiveCoverage(t *testing.T) {
10021003
// Maintenance
10031004
{"Maintenance", "/api/clusters/test/actions/checksum-all-tables", true},
10041005
{"Maintenance", "/api/clusters/test/actions/analyze-all-tables", true},
1006+
{"Maintenance", "/api/clusters/test/actions/checksum-repair-all-tables", true},
10051007
}
10061008

10071009
for _, tt := range tests {

cluster/cluster_chk.go

Lines changed: 105 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -623,6 +623,12 @@ func (cluster *Cluster) CheckAllTableChecksum() {
623623
}
624624
}
625625

626+
func (cluster *Cluster) RepairAllTableChecksum() {
627+
for _, t := range cluster.master.Tables {
628+
cluster.RepairTableChecksum(t.TableSchema, t.TableName)
629+
}
630+
}
631+
626632
func (cluster *Cluster) CheckAllTableChecksumSchema(name string) {
627633
for _, t := range cluster.master.Tables {
628634
if t.TableSchema == name {
@@ -684,32 +690,32 @@ func (cluster *Cluster) CheckTableChecksum(schema string, table string) {
684690
shardListPredicate := ""
685691
rangeCondition := ""
686692
if len(pks) == 0 {
687-
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlErr,"Table %s.%s has no primary key, cannot create chunk table", schema, table)
693+
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlErr, "Table %s.%s has no primary key, cannot create chunk table", schema, table)
688694
return
689695
}
690696

691697
// var ftype string
692698
for _, p := range pks {
693699
if columnDefPredicate != "" {
694-
columnDefPredicate += " , "
700+
columnDefPredicate += " , "
695701
wherePredicate += " AND "
696702
columnListPredicate += " , "
697703
bColumnListPredicate += " , "
698704
shardListPredicate += " , "
699-
rangeCondition += " , "
705+
rangeCondition += " ,' AND ', "
700706
}
701707
columnType := cluster.master.GetTableColumDef(schema, table, p)
702708
columnDefPredicate = columnDefPredicate + " Min_" + p + " " + columnType + ", Max_" + p + " " + columnType
703709
wherePredicate = wherePredicate + " A." + p + " >= B.Min_" + p + " AND A." + p + "<= B.Max_" + p + " "
704710
columnListPredicate = columnListPredicate + p + " "
705-
bColumnListPredicate = bColumnListPredicate + " B.Min_" + p +" , B.Max_" + p
706-
rangeCondition = rangeCondition + "'A." + p + " >=',B.Min_" + p + ",' AND A." + p + "<=', B.Max_" + p
711+
bColumnListPredicate = bColumnListPredicate + " B.Min_" + p + " , B.Max_" + p
712+
rangeCondition = rangeCondition + "'A." + p + " >=',B.Min_" + p + ",' AND A." + p + "<=', B.Max_" + p
707713
shardListPredicate = shardListPredicate + " MIN(" + p + ") AS Min_" + p + " , MAX(" + p + ") AS Max_" + p + " "
708714
}
709715

710716
_, err = Conn.Exec("/* replication-manager */ CREATE OR REPLACE TABLE replication_manager_schema.table_checksum(chunkId BIGINT,chunkRangeCondition varchar(8000) , " + columnDefPredicate + ",chunkCheckSum BIGINT UNSIGNED ) ENGINE=INNODB")
711717
if err != nil {
712-
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlErr,"Error creating checksum table: %w", err)
718+
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlErr, "Error creating checksum table: %w", err)
713719
return
714720
}
715721
chunkSize := 2000
@@ -719,7 +725,7 @@ func (cluster *Cluster) CheckTableChecksum(schema string, table string) {
719725

720726
_, err = Conn.Exec(query)
721727
if err != nil {
722-
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlErr,"Error creating chunk table: %w", err)
728+
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlErr, "Error creating chunk table: %w", err)
723729
return
724730
}
725731
rows, err := Conn.Queryx("SELECT chunkId FROM replication_manager_schema.table_chunk")
@@ -748,8 +754,7 @@ func (cluster *Cluster) CheckTableChecksum(schema string, table string) {
748754
return
749755
}
750756

751-
752-
query = "INSERT INTO replication_manager_schema.table_checksum SELECT chunkId, CONCAT(" + rangeCondition+ ") as chunkRangeCondition," + bColumnListPredicate + " ," + md5Sum + " as chunkCheckSum FROM " + schema + "." + table + " A inner join (select * from replication_manager_schema.table_chunk WHERE chunkId=? ) B on " + wherePredicate + " GROUP BY chunkId HAVING chunkId IS NOT NULL"
757+
query = "INSERT INTO replication_manager_schema.table_checksum SELECT chunkId, CONCAT(" + rangeCondition + ") as chunkRangeCondition," + bColumnListPredicate + " ," + md5Sum + " as chunkCheckSum FROM " + schema + "." + table + " A inner join (select * from replication_manager_schema.table_chunk WHERE chunkId=? ) B on " + wherePredicate + " GROUP BY chunkId HAVING chunkId IS NOT NULL"
753758
stmt, err := Conn.Prepare(query)
754759

755760
if err != nil {
@@ -764,18 +769,18 @@ func (cluster *Cluster) CheckTableChecksum(schema string, table string) {
764769
return
765770
}
766771
/*
767-
_, err2 := Conn.Exec("DELETE FROM replication_manager_schema.table_chunk WHERE chunkId=?",chunk.ChunkId)
768-
if err2 != nil {
769-
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlErr, "Checksum error deleting chunck %s", err)
770-
return
771-
}
772+
_, err2 := Conn.Exec("DELETE FROM replication_manager_schema.table_chunk WHERE chunkId=?",chunk.ChunkId)
773+
if err2 != nil {
774+
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlErr, "Checksum error deleting chunck %s", err)
775+
return
776+
}
772777
773-
slave := cluster.GetFirstWorkingSlave()
774-
if slave != nil {
775-
if slave.GetReplicationDelay() > 5 {
776-
time.Sleep(time.Duration(slave.GetReplicationDelay()) * time.Second)
777-
}
778-
}*/
778+
slave := cluster.GetFirstWorkingSlave()
779+
if slave != nil {
780+
if slave.GetReplicationDelay() > 5 {
781+
time.Sleep(time.Duration(slave.GetReplicationDelay()) * time.Second)
782+
}
783+
}*/
779784
}
780785
cluster.master.Refresh()
781786
masterSeq := cluster.master.CurrentGtid.GetSeqServerIdNos(uint64(cluster.master.ServerID))
@@ -804,16 +809,17 @@ func (cluster *Cluster) CheckTableChecksum(schema string, table string) {
804809
slaveChecksums, logs, err := dbhelper.GetTableChecksumResult(s.Conn)
805810
cluster.LogSQL(logs, err, s.URL, "CheckTableChecksum", config.LvlDbg, "GetTableChecksumResult")
806811
checkok := true
807-
808812
for _, chunk := range masterChecksums {
809813
if chunk.ChunkCheckSum != slaveChecksums[chunk.ChunkId].ChunkCheckSum {
810814
checkok = false
811-
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Checksum table failed chunks %d %s.%s %s", chunk.ChunkId, schema, table, s.URL)
812-
t := cluster.master.DictTables.Get(schema + "." + table)
813-
t.TableSync = "ER"
814-
cluster.master.DictTables.Set(schema+"."+table, t)
815+
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Checksum table failed chunks %d %s.%s %s", chunk.ChunkId, schema, table, s.URL)
816+
tm := cluster.master.DictTables.Get(schema + "." + table)
817+
tm.TableSync = "ER"
818+
ts := s.DictTables.Get(schema + "." + table)
819+
ts.TableChunksError = append(ts.TableChunksError, chunk)
820+
s.DictTables.Set(schema+"."+table, ts)
821+
cluster.master.DictTables.Set(schema+"."+table, tm)
815822
}
816-
817823
}
818824
if checkok {
819825
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Checksum table succeed %s.%s %s", schema, table, s.URL)
@@ -824,6 +830,79 @@ func (cluster *Cluster) CheckTableChecksum(schema string, table string) {
824830
}
825831
}
826832

833+
func (cluster *Cluster) RepairTableChecksum(schema string, table string) {
834+
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Repair table %s.%s %s", schema, table, cluster.master.URL)
835+
836+
master := cluster.GetMaster()
837+
if master == nil {
838+
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlErr, "Repair table %s.%s, master not discovered", schema, table)
839+
return
840+
}
841+
Conn, err := master.GetNewDBConn()
842+
if err != nil {
843+
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlErr, "Error connection in exec query no log %s", err)
844+
return
845+
}
846+
defer Conn.Close()
847+
848+
Conn.SetConnMaxLifetime(3595 * time.Second)
849+
Conn.Exec("SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ")
850+
Conn.Exec("SET SESSION binlog_format = 'ROW'")
851+
Conn.Exec("USE " + schema)
852+
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Repair table %s.%s", schema, table)
853+
854+
for _, s := range cluster.slaves {
855+
ts := s.DictTables.Get(schema + "." + table)
856+
// Reset the master table dictionary to reflect the new unkown sync status
857+
if len(ts.TableChunksError) > 0 {
858+
t := master.DictTables.Get(schema + "." + table)
859+
t.TableSync = ""
860+
cluster.master.DictTables.Set(schema+"."+table, t)
861+
for i, chunk := range ts.TableChunksError {
862+
query := "start transaction "
863+
_, err = Conn.Exec(query)
864+
if err != nil {
865+
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlErr, "ERROR: Could not process chunck %s %s", query, err)
866+
return
867+
}
868+
query = "CREATE OR REPLACE TEMPORARY TABLE tmp_repair AS SELECT * FROM " + schema + "." + table + " A WHERE " + chunk.ChunkRangeCondition + " FOR UPDATE"
869+
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Repair chunk %d/%d %s", i, len(ts.TableChunksError), query)
870+
_, err = Conn.Exec(query)
871+
if err != nil {
872+
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlErr, "ERROR: Could not process chunck %s %s", query, err)
873+
return
874+
}
875+
query = "DELETE FROM " + schema + "." + table + " A WHERE " + chunk.ChunkRangeCondition
876+
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Repair chunk %d/%d %s", i, len(ts.TableChunksError), query)
877+
_, err = Conn.Exec(query)
878+
if err != nil {
879+
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlErr, "ERROR: Could not process chunck %s %s", query, err)
880+
return
881+
}
882+
query = "INSERT INTO " + schema + "." + table + " SELECT * FROM tmp_repair"
883+
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Repair chunk %d/%d %s", i, len(ts.TableChunksError), query)
884+
_, err = Conn.Exec(query)
885+
if err != nil {
886+
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlErr, "ERROR: Could not process chunck %s %s", query, err)
887+
return
888+
}
889+
query = "COMMIT"
890+
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Repair chunk %d/%d %s", i, len(ts.TableChunksError), query)
891+
_, err = Conn.Exec(query)
892+
if err != nil {
893+
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlErr, "ERROR: Could not process chunck %s %s", query, err)
894+
return
895+
}
896+
897+
}
898+
// Empry the slice of chunk errors
899+
ts.TableChunksError = ts.TableChunksError[:0]
900+
s.DictTables.Set(schema+"."+table, ts)
901+
} // if chunks in error
902+
} // for all slaves
903+
cluster.CheckTableChecksum(schema, table)
904+
}
905+
827906
// CheckSameServerID Check against the servers that all server id are differents
828907
func (cluster *Cluster) CheckSameServerID() {
829908
for _, s := range cluster.Servers {

config/config.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1307,6 +1307,7 @@ const (
13071307
GrantClusterGrant string = "cluster-grant"
13081308
GrantClusterAnalyze string = "cluster-analyze"
13091309
GrantClusterChecksum string = "cluster-checksum"
1310+
GrantClusterChecksumRepair string = "cluster-checksum-repair"
13101311
GrantClusterSharding string = "cluster-sharding"
13111312
GrantClusterReplication string = "cluster-replication"
13121313
GrantClusterCertificatesRotate string = "cluster-certificates-rotate"
@@ -2426,6 +2427,7 @@ func GetGrantType() map[string]string {
24262427
GrantClusterReplication: GrantClusterReplication,
24272428
GrantClusterAnalyze: GrantClusterAnalyze,
24282429
GrantClusterChecksum: GrantClusterChecksum,
2430+
GrantClusterChecksumRepair: GrantClusterChecksumRepair,
24292431
GrantClusterSharding: GrantClusterSharding,
24302432
GrantClusterCertificatesRotate: GrantClusterCertificatesRotate,
24312433
GrantClusterCertificatesReload: GrantClusterCertificatesReload,
@@ -2534,6 +2536,7 @@ func GetGrantCluster() []string {
25342536
GrantClusterReplication,
25352537
GrantClusterAnalyze,
25362538
GrantClusterChecksum,
2539+
GrantClusterChecksumRepair,
25372540
GrantClusterSharding,
25382541
GrantClusterCertificatesRotate,
25392542
GrantClusterCertificatesReload,

server/api_cluster.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -510,6 +510,10 @@ func (repman *ReplicationManager) apiClusterProtectedHandler(router *mux.Router)
510510
negroni.HandlerFunc(repman.validateTokenMiddleware),
511511
negroni.Wrap(http.HandlerFunc(repman.handlerMuxClusterSchemaChecksumTable)),
512512
))
513+
router.Handle("/api/clusters/{clusterName}/schema/{schemaName}/{tableName}/actions/checksum-repair-table", negroni.New(
514+
negroni.HandlerFunc(repman.validateTokenMiddleware),
515+
negroni.Wrap(http.HandlerFunc(repman.handlerMuxClusterSchemaChecksumRepairTable)),
516+
))
513517
router.Handle("/api/clusters/{clusterName}/schema/{schemaName}/all/actions/checksum-schema", negroni.New(
514518
negroni.HandlerFunc(repman.validateTokenMiddleware),
515519
negroni.Wrap(http.HandlerFunc(repman.handlerMuxClusterChecksumSchema)),
@@ -522,6 +526,10 @@ func (repman *ReplicationManager) apiClusterProtectedHandler(router *mux.Router)
522526
negroni.HandlerFunc(repman.validateTokenMiddleware),
523527
negroni.Wrap(http.HandlerFunc(repman.handlerMuxClusterSchemaChecksumAllTable)),
524528
))
529+
router.Handle("/api/clusters/{clusterName}/actions/checksum-repair-all-tables", negroni.New(
530+
negroni.HandlerFunc(repman.validateTokenMiddleware),
531+
negroni.Wrap(http.HandlerFunc(repman.handlerMuxClusterSchemaChecksumRepairAllTable)),
532+
))
525533
router.Handle("/api/clusters/{clusterName}/schema/{schemaName}/{tableName}/actions/analyze-table/{persistent}", negroni.New(
526534
negroni.HandlerFunc(repman.validateTokenMiddleware),
527535
negroni.Wrap(http.HandlerFunc(repman.handlerMuxClusterSchemaAnalyzeTable)),
@@ -5967,6 +5975,76 @@ func (repman *ReplicationManager) handlerMuxClusterSchemaChecksumTable(w http.Re
59675975
}
59685976
}
59695977

5978+
// handlerMuxClusterSchemaChecksumRepairAllTable handles the checksum repair for all tables in a given cluster.
// @Summary Repair checksum mismatches for all tables in a specific cluster
// @Description This endpoint triggers checksum repair for all tables in the specified cluster. If the schema cache is empty, schema monitoring is triggered instead and 202 is returned.
// @Tags ClusterSchema
// @Accept json
// @Produce json
// @Param Authorization header string true "Insert your access token" default(Bearer <Add access token here>)
// @Param clusterName path string true "Cluster Name"
// @Success 200 {string} string "Successfully triggered checksum repair for all tables"
// @Success 202 {string} string "Schema cache empty; schema monitoring triggered"
// @Failure 403 {string} string "No valid ACL"
// @Failure 500 {string} string "No cluster"
// @Router /api/clusters/{clusterName}/actions/checksum-repair-all-tables [post]
func (repman *ReplicationManager) handlerMuxClusterSchemaChecksumRepairAllTable(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Access-Control-Allow-Origin", "*")

	vars := mux.Vars(r)
	mycluster := repman.getClusterByName(vars["clusterName"])
	if mycluster != nil {
		if valid, _ := repman.IsValidClusterACL(r, mycluster); !valid {
			http.Error(w, "No valid ACL", http.StatusForbidden)
			return
		}
		// Repair walks the master's cached table list; with an empty cache there
		// is nothing to repair, so kick schema monitoring and ask the caller to retry.
		master := mycluster.GetMaster()
		if master == nil || len(master.Tables) == 0 {
			mycluster.LogModulePrintf(mycluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo,
				"Repair Checksum all tables requested; schema cache empty. Triggered schema monitoring; re-run checksum after cache is ready.")
			mycluster.SetWaitMonitorSchema()
			w.Header().Set("Content-Type", "application/json")
			w.WriteHeader(http.StatusAccepted)
			json.NewEncoder(w).Encode(map[string]string{
				"message": "schema cache empty; schema monitoring triggered; retry checksum after cache is populated",
			})
			return
		}
		// Long-running repair runs in the background; the handler returns immediately.
		go mycluster.RepairAllTableChecksum()
	} else {
		http.Error(w, "No cluster", http.StatusInternalServerError)
	}
}
6017+
6018+
// handlerMuxClusterSchemaChecksumRepairTable handles repair after checksum calculation for a specific table in a given cluster.
// @Summary Repair checksum error for a specific table in a specific cluster
// @Description This endpoint triggers checksum repair for a specific table in the specified cluster.
// @Tags ClusterSchema
// @Accept json
// @Produce json
// @Param Authorization header string true "Insert your access token" default(Bearer <Add access token here>)
// @Param clusterName path string true "Cluster Name"
// @Param schemaName path string true "Schema Name"
// @Param tableName path string true "Table Name"
// @Success 200 {string} string "Successfully triggered checksum repair for the table"
// @Failure 403 {string} string "No valid ACL"
// @Failure 500 {string} string "No cluster"
// @Router /api/clusters/{clusterName}/schema/{schemaName}/{tableName}/actions/checksum-repair-table [post]
func (repman *ReplicationManager) handlerMuxClusterSchemaChecksumRepairTable(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Access-Control-Allow-Origin", "*")

	vars := mux.Vars(r)
	mycluster := repman.getClusterByName(vars["clusterName"])
	if mycluster != nil {
		if valid, _ := repman.IsValidClusterACL(r, mycluster); !valid {
			http.Error(w, "No valid ACL", http.StatusForbidden)
			return
		}
		// Long-running repair runs in the background; the handler returns immediately.
		go mycluster.RepairTableChecksum(vars["schemaName"], vars["tableName"])
	} else {
		http.Error(w, "No cluster", http.StatusInternalServerError)
	}
}
6047+
59706048
// handlerMuxClusterSchemaAnalyzeAllTables handles the analyze calculation for all tables in a given cluster.
59716049
// @Summary Calculate analyze for all tables in a specific cluster
59726050
// @Description This endpoint triggers the analyze calculation for all tables in the specified cluster.

0 commit comments

Comments
 (0)