Skip to content
This repository was archived by the owner on Aug 21, 2023. It is now read-only.

Commit 0d8ccce

Browse files
authored
*: update dumpling log and pd usage (#335) (#341)
1 parent 4b40fbc commit 0d8ccce

File tree

5 files changed

+42
-30
lines changed

5 files changed

+42
-30
lines changed

v4/export/dump.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -470,7 +470,7 @@ func (d *Dumper) concurrentDumpTable(tctx *tcontext.Context, conn *sql.Conn, met
470470
return err
471471
}
472472
tctx.L().Warn("fallback to concurrent dump tables using rows due to tidb error",
473-
zap.String("database", db), zap.String("table", tbl), zap.Error(err))
473+
zap.String("database", db), zap.String("table", tbl), log.ShortError(err))
474474
}
475475

476476
orderByClause, err := buildOrderByClause(conf, conn, db, tbl, meta.HasImplicitRowID())
@@ -482,7 +482,7 @@ func (d *Dumper) concurrentDumpTable(tctx *tcontext.Context, conn *sql.Conn, met
482482
if err != nil || field == "" {
483483
// skip split chunk logic if not found proper field
484484
tctx.L().Warn("fallback to sequential dump due to no proper field",
485-
zap.String("database", db), zap.String("table", tbl), zap.Error(err))
485+
zap.String("database", db), zap.String("table", tbl), log.ShortError(err))
486486
return d.dumpWholeTableDirectly(tctx, meta, taskChan, "", orderByClause, 0, 1)
487487
}
488488

@@ -1023,7 +1023,7 @@ func startHTTPService(d *Dumper) error {
10231023
go func() {
10241024
err := startDumplingService(d.tctx, conf.StatusAddr)
10251025
if err != nil {
1026-
d.L().Warn("meet error when stopping dumpling http service", zap.Error(err))
1026+
d.L().Warn("meet error when stopping dumpling http service", log.ShortError(err))
10271027
}
10281028
}()
10291029
}
@@ -1080,16 +1080,17 @@ func tidbSetPDClientForGC(d *Dumper) error {
10801080
}
10811081
pdAddrs, err := GetPdAddrs(tctx, pool)
10821082
if err != nil {
1083-
return err
1083+
tctx.L().Warn("meet error while fetching pd addrs", log.ShortError(err))
1084+
return nil
10841085
}
10851086
if len(pdAddrs) > 0 {
10861087
doPdGC, err := checkSameCluster(tctx, pool, pdAddrs)
10871088
if err != nil {
1088-
tctx.L().Warn("meet error while check whether fetched pd addr and TiDB belong to one cluster", zap.Error(err), zap.Strings("pdAddrs", pdAddrs))
1089+
tctx.L().Warn("meet error while check whether fetched pd addr and TiDB belong to one cluster", log.ShortError(err), zap.Strings("pdAddrs", pdAddrs))
10891090
} else if doPdGC {
10901091
pdClient, err := pd.NewClientWithContext(tctx, pdAddrs, pd.SecurityOption{})
10911092
if err != nil {
1092-
tctx.L().Warn("create pd client to control GC failed", zap.Error(err), zap.Strings("pdAddrs", pdAddrs))
1093+
tctx.L().Warn("create pd client to control GC failed", log.ShortError(err), zap.Strings("pdAddrs", pdAddrs))
10931094
}
10941095
d.tidbPDClientForGC = pdClient
10951096
}
@@ -1105,13 +1106,13 @@ func tidbGetSnapshot(d *Dumper) error {
11051106
if conf.Snapshot == "" && (doPdGC || consistency == "snapshot") {
11061107
conn, err := pool.Conn(tctx)
11071108
if err != nil {
1108-
tctx.L().Warn("cannot get snapshot from TiDB", zap.Error(err))
1109+
tctx.L().Warn("cannot get snapshot from TiDB", log.ShortError(err))
11091110
return nil
11101111
}
11111112
snapshot, err := getSnapshot(conn)
11121113
_ = conn.Close()
11131114
if err != nil {
1114-
tctx.L().Warn("cannot get snapshot from TiDB", zap.Error(err))
1115+
tctx.L().Warn("cannot get snapshot from TiDB", log.ShortError(err))
11151116
return nil
11161117
}
11171118
conf.Snapshot = snapshot
@@ -1186,7 +1187,7 @@ func setSessionParam(d *Dumper) error {
11861187
if consistency == consistencyTypeSnapshot {
11871188
conf.ServerInfo.HasTiKV, err = CheckTiDBWithTiKV(pool)
11881189
if err != nil {
1189-
return err
1190+
d.L().Warn("fail to check whether TiDB has TiKV", log.ShortError(err))
11901191
}
11911192
if conf.ServerInfo.HasTiKV {
11921193
sessionParam["tidb_snapshot"] = snapshot

v4/export/http_handler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,11 @@ import (
1010
"time"
1111

1212
tcontext "github.com/pingcap/dumpling/v4/context"
13+
"github.com/pingcap/dumpling/v4/log"
1314

1415
"github.com/pingcap/errors"
1516
"github.com/prometheus/client_golang/prometheus/promhttp"
1617
"github.com/soheilhy/cmux"
17-
"go.uber.org/zap"
1818
)
1919

2020
var cmuxReadTimeout = 10 * time.Second
@@ -35,7 +35,7 @@ func startHTTPServer(tctx *tcontext.Context, lis net.Listener) {
3535
err := httpServer.Serve(lis)
3636
err = errors.Cause(err)
3737
if err != nil && !isErrNetClosing(err) && err != http.ErrServerClosed {
38-
tctx.L().Warn("http server return with error", zap.Error(err))
38+
tctx.L().Warn("dumpling http handler return with error", log.ShortError(err))
3939
}
4040
}
4141

v4/export/sql.go

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"strings"
1414

1515
tcontext "github.com/pingcap/dumpling/v4/context"
16+
"github.com/pingcap/dumpling/v4/log"
1617

1718
"github.com/go-sql-driver/mysql"
1819
"github.com/pingcap/errors"
@@ -585,23 +586,21 @@ func GetPdAddrs(tctx *tcontext.Context, db *sql.DB) ([]string, error) {
585586
query := "SELECT * FROM information_schema.cluster_info where type = 'pd';"
586587
rows, err := db.QueryContext(tctx, query)
587588
if err != nil {
588-
tctx.L().Warn("can't execute query from db",
589-
zap.String("query", query), zap.Error(err))
590589
return []string{}, errors.Annotatef(err, "sql: %s", query)
591590
}
592-
return GetSpecifiedColumnValueAndClose(rows, "STATUS_ADDRESS")
591+
pdAddrs, err := GetSpecifiedColumnValueAndClose(rows, "STATUS_ADDRESS")
592+
return pdAddrs, errors.Annotatef(err, "sql: %s", query)
593593
}
594594

595595
// GetTiDBDDLIDs gets DDL IDs from TiDB
596596
func GetTiDBDDLIDs(tctx *tcontext.Context, db *sql.DB) ([]string, error) {
597597
query := "SELECT * FROM information_schema.tidb_servers_info;"
598598
rows, err := db.QueryContext(tctx, query)
599599
if err != nil {
600-
tctx.L().Warn("can't execute query from db",
601-
zap.String("query", query), zap.Error(err))
602600
return []string{}, errors.Annotatef(err, "sql: %s", query)
603601
}
604-
return GetSpecifiedColumnValueAndClose(rows, "DDL_ID")
602+
ddlIDs, err := GetSpecifiedColumnValueAndClose(rows, "DDL_ID")
603+
return ddlIDs, errors.Annotatef(err, "sql: %s", query)
605604
}
606605

607606
// CheckTiDBWithTiKV use sql to check whether current TiDB has TiKV
@@ -611,7 +610,9 @@ func CheckTiDBWithTiKV(db *sql.DB) (bool, error) {
611610
row := db.QueryRow(query)
612611
err := row.Scan(&count)
613612
if err != nil {
614-
return false, errors.Annotatef(err, "sql: %s", query)
613+
// still return true here. Because sometimes users may not have privileges for MySQL.TiDB database
614+
// In most production cases TiDB has TiKV
615+
return true, errors.Annotatef(err, "sql: %s", query)
615616
}
616617
return count > 0, nil
617618
}
@@ -995,22 +996,22 @@ func estimateCount(tctx *tcontext.Context, dbName, tableName string, db *sql.Con
995996
func detectEstimateRows(tctx *tcontext.Context, db *sql.Conn, query string, fieldNames []string) uint64 {
996997
rows, err := db.QueryContext(tctx, query)
997998
if err != nil {
998-
tctx.L().Warn("can't execute query from db",
999-
zap.String("query", query), zap.Error(err))
999+
tctx.L().Warn("can't detect estimate rows from db",
1000+
zap.String("query", query), log.ShortError(err))
10001001
return 0
10011002
}
10021003
defer rows.Close()
10031004
rows.Next()
10041005
columns, err := rows.Columns()
10051006
if err != nil {
10061007
tctx.L().Warn("can't get columns from db",
1007-
zap.String("query", query), zap.Error(err))
1008+
zap.String("query", query), log.ShortError(err))
10081009
return 0
10091010
}
10101011
err = rows.Err()
10111012
if err != nil {
10121013
tctx.L().Warn("rows meet some error during the query",
1013-
zap.String("query", query), zap.Error(err))
1014+
zap.String("query", query), log.ShortError(err))
10141015
return 0
10151016
}
10161017
addr := make([]interface{}, len(columns))
@@ -1031,14 +1032,14 @@ found:
10311032
err = rows.Scan(addr...)
10321033
if err != nil || fieldIndex < 0 {
10331034
tctx.L().Warn("can't get estimate count from db",
1034-
zap.String("query", query), zap.Error(err))
1035+
zap.String("query", query), log.ShortError(err))
10351036
return 0
10361037
}
10371038

10381039
estRows, err := strconv.ParseFloat(oneRow[fieldIndex].String, 64)
10391040
if err != nil {
10401041
tctx.L().Warn("can't get parse rows from db",
1041-
zap.String("query", query), zap.Error(err))
1042+
zap.String("query", query), log.ShortError(err))
10421043
return 0
10431044
}
10441045
return uint64(estRows)

v4/export/writer_util.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"time"
1414

1515
tcontext "github.com/pingcap/dumpling/v4/context"
16+
"github.com/pingcap/dumpling/v4/log"
1617

1718
"github.com/pingcap/br/pkg/storage"
1819
"github.com/pingcap/br/pkg/summary"
@@ -177,12 +178,12 @@ func WriteInsert(pCtx *tcontext.Context, cfg *Config, meta TableMeta, tblIR Tabl
177178

178179
defer func() {
179180
if err != nil {
180-
pCtx.L().Warn("fail to dumping table(chunk), will revert some metrics now",
181-
zap.Error(err),
181+
pCtx.L().Warn("fail to dumping table(chunk), will revert some metrics and start a retry if possible",
182182
zap.String("database", meta.DatabaseName()),
183183
zap.String("table", meta.TableName()),
184184
zap.Uint64("finished rows", lastCounter),
185-
zap.Uint64("finished size", wp.finishedFileSize))
185+
zap.Uint64("finished size", wp.finishedFileSize),
186+
log.ShortError(err))
186187
SubGauge(finishedRowsGauge, cfg.Labels, float64(lastCounter))
187188
SubGauge(finishedSizeGauge, cfg.Labels, float64(wp.finishedFileSize))
188189
} else {
@@ -315,12 +316,12 @@ func WriteInsertInCsv(pCtx *tcontext.Context, cfg *Config, meta TableMeta, tblIR
315316

316317
defer func() {
317318
if err != nil {
318-
pCtx.L().Warn("fail to dumping table(chunk), will revert some metrics now",
319-
zap.Error(err),
319+
pCtx.L().Warn("fail to dumping table(chunk), will revert some metrics and start a retry if possible",
320320
zap.String("database", meta.DatabaseName()),
321321
zap.String("table", meta.TableName()),
322322
zap.Uint64("finished rows", lastCounter),
323-
zap.Uint64("finished size", wp.finishedFileSize))
323+
zap.Uint64("finished size", wp.finishedFileSize),
324+
log.ShortError(err))
324325
SubGauge(finishedRowsGauge, cfg.Labels, float64(lastCounter))
325326
SubGauge(finishedSizeGauge, cfg.Labels, float64(wp.finishedFileSize))
326327
} else {

v4/log/log.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,3 +59,12 @@ func InitAppLogger(cfg *Config) (Logger, *pclog.ZapProperties, error) {
5959
func NewAppLogger(logger *zap.Logger) Logger {
6060
return Logger{logger}
6161
}
62+
63+
// ShortError contructs a field which only records the error message without the
64+
// verbose text (i.e. excludes the stack trace).
65+
func ShortError(err error) zap.Field {
66+
if err == nil {
67+
return zap.Skip()
68+
}
69+
return zap.String("error", err.Error())
70+
}

0 commit comments

Comments
 (0)