39 changes: 39 additions & 0 deletions br/pkg/logutil/logging.go
@@ -3,6 +3,7 @@
 package logutil

 import (
+	"bytes"
 	"encoding/hex"
 	"encoding/json"
 	"fmt"
@@ -222,6 +223,44 @@ func (m zapSSTMetasMarshaler) MarshalLogArray(encoder zapcore.ArrayEncoder) error {
 	return nil
 }

+// BriefSSTMetas describes the overall key range covered by the SST metas and their total size.
+func BriefSSTMetas(key string, sstMetas []*import_sstpb.SSTMeta) zap.Field {
+	var (
+		startKey, endKey []byte
+		total            int
+		totalSize        uint64
+		totalKv          uint64
+		totalKvSize      uint64
+	)
+
+	for _, meta := range sstMetas {
+		if total == 0 {
+			startKey = meta.GetRange().GetStart()
+			endKey = meta.GetRange().GetEnd()
+		}
+		if bytes.Compare(meta.GetRange().GetStart(), startKey) < 0 {
+			startKey = meta.GetRange().GetStart()
+		}
+		// NOTE: an SST meta shouldn't have an empty end key?
+		if bytes.Compare(meta.GetRange().GetEnd(), endKey) > 0 {
+			endKey = meta.GetRange().GetEnd()
+		}
+		totalSize += meta.GetLength()
+		totalKv += meta.GetTotalKvs()
+		totalKvSize += meta.GetTotalBytes()
+		total++
+	}
+	return zap.Object(key, zapcore.ObjectMarshalerFunc(func(enc zapcore.ObjectEncoder) error {
+		enc.AddInt("total", total)
+		enc.AddString("startKey", redact.Key(startKey))
+		enc.AddString("endKey", redact.Key(endKey))
+		enc.AddUint64("totalSize", totalSize)
+		enc.AddUint64("totalKvs", totalKv)
+		enc.AddUint64("totalKvSize", totalKvSize)
+		return nil
+	}))
+}
+
 // SSTMetas make the zap fields for SST metas.
 func SSTMetas(sstMetas []*import_sstpb.SSTMeta) zap.Field {
 	return zap.Array("sstMetas", zapSSTMetasMarshaler(sstMetas))
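For context, SSTMetas (shown as context just above) emits one array element per meta, while the new BriefSSTMetas collapses the whole slice into a single object, which keeps log lines short when a request carries hundreds of SSTs. A small usage sketch, not part of the diff — the logDownload helper and the message are made up:

    package main

    import (
        "github.com/pingcap/kvproto/pkg/import_sstpb"
        "github.com/pingcap/log"
        "github.com/pingcap/tidb/br/pkg/logutil"
    )

    // logDownload is a hypothetical helper showing the call shape: one structured
    // field summarizing every meta (count, merged key range, accumulated sizes).
    func logDownload(metas []*import_sstpb.SSTMeta) {
        log.Info("downloading SSTs", logutil.BriefSSTMetas("ssts", metas))
    }

    func main() {
        logDownload(nil) // with no metas: total=0 and empty, redacted keys
    }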
12 changes: 10 additions & 2 deletions br/pkg/restore/log_client/client.go
@@ -907,6 +907,7 @@ func (rc *LogClient) RestoreKVFiles(
 	var applyWg sync.WaitGroup
 	eg, ectx := errgroup.WithContext(ctx)
 	applyFunc := func(files []*LogDataFileInfo, kvCount int64, size uint64) {
+		cnt := 0
 		if len(files) == 0 {
 			return
 		}
@@ -920,6 +921,9 @@
 			skipFile += len(files)
 		} else {
 			applyWg.Add(1)
+			cnt += 1
+			i := cnt
+			ectx := logutil.ContextWithField(ectx, zap.Int("sn", i))
 			rc.logRestoreManager.workerPool.ApplyOnErrorGroup(eg, func() (err error) {
 				fileStart := time.Now()
 				defer applyWg.Done()
@@ -930,16 +934,20 @@

 				if err == nil {
 					filenames := make([]string, 0, len(files))
+					maxTs, minTs := uint64(0), uint64(math.MaxUint64)
 					for _, f := range files {
-						filenames = append(filenames, f.Path+", ")
+						maxTs = max(f.MaxTs, maxTs)
+						minTs = min(f.MinTs, minTs)
+						filenames = append(filenames, f.Path)
 						if rc.logRestoreManager.checkpointRunner != nil {
 							if e := checkpoint.AppendRangeForLogRestore(ectx, rc.logRestoreManager.checkpointRunner, f.MetaDataGroupName, rule.NewTableID, f.OffsetInMetaGroup, f.OffsetInMergedGroup); e != nil {
 								err = errors.Annotate(e, "failed to append checkpoint data")
 								break
 							}
 						}
 					}
-					log.Info("import files done", zap.Int("batch-count", len(files)), zap.Uint64("batch-size", size),
+					logutil.CL(ectx).Info("import files done", zap.Int("batch-count", len(files)), zap.Uint64("batch-size", size),
+						zap.Uint64("min-ts", minTs), zap.Uint64("max-ts", maxTs), zap.String("cf", files[0].Cf),
 						zap.Duration("take", time.Since(fileStart)), zap.Strings("files", filenames))
 				}
 			}()
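The pattern above threads logging context through the errgroup worker: logutil.ContextWithField attaches zap fields to a context, and logutil.CL(ctx) later recovers a logger carrying them, so every line a worker logs is tagged with its sequence number without plumbing fields by hand. A minimal standalone sketch of the same idea — the "sn" field matches the diff, the messages are made up:

    package main

    import (
        "context"

        "github.com/pingcap/tidb/br/pkg/logutil"
        "go.uber.org/zap"
    )

    func worker(ctx context.Context) {
        // No explicit fields here: CL pulls the logger bound to this context,
        // so the line below comes out tagged with this worker's "sn".
        logutil.CL(ctx).Info("import files done")
    }

    func main() {
        for sn := 1; sn <= 3; sn++ {
            worker(logutil.ContextWithField(context.Background(), zap.Int("sn", sn)))
        }
    }

The one-line change in br/pkg/restore/log_client/import.go below applies the same substitution to a Debug call.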
2 changes: 1 addition & 1 deletion br/pkg/restore/log_client/import.go
@@ -132,7 +132,7 @@ func (importer *LogFileImporter) ImportKVFiles(
 		}
 	}

-	log.Debug("rewrite file keys",
+	logutil.CL(ctx).Debug("rewrite file keys",
 		logutil.Key("startKey", startKey), logutil.Key("endKey", endKey))

 	// This RetryState will retry 45 time, about 10 min.
6 changes: 6 additions & 0 deletions br/pkg/restore/misc.go
@@ -497,6 +497,9 @@ func GroupOverlappedBackupFileSetsIter(ctx context.Context, regionClient split.S
 		}
 		if !inOneRegion && len(thisBatchBackupFileSet) > 0 {
 			// not in the same region, so this batch backup file set can be output
+			log.Info("generating one batch.", zap.Int("size", len(thisBatchBackupFileSet)),
+				zap.Binary("from", lastEndKey),
+				zap.Binary("to", file.startKey))
 			fn(thisBatchBackupFileSet)
 			thisBatchBackupFileSet = make([]BackupFileSet, 0)
 		}
@@ -527,6 +530,9 @@
 	}
 	// output the last batch backup file set
 	if len(thisBatchBackupFileSet) > 0 {
+		log.Info("generating one batch.", zap.Int("size", len(thisBatchBackupFileSet)),
+			zap.Binary("from", lastEndKey),
+			zap.Binary("to", []byte{}))
 		fn(thisBatchBackupFileSet)
 	}
 	return nil
6 changes: 5 additions & 1 deletion br/pkg/restore/restorer.go
@@ -258,7 +258,10 @@ func (s *BatchRestorer) WaitUntilFinish() error {

 func (s *BatchRestorer) GoRestore(onProgress func(int64), batchFileSets ...BatchBackupFileSet) error {
 	s.workerPool.ApplyOnErrorGroup(s.eg, func() error {
+		counter := 0
 		return GroupOverlappedBackupFileSetsIter(s.ectx, s.regionClient, slices.Concat(batchFileSets...), func(batchSet BatchBackupFileSet) {
+			i := counter
+			counter += 1
 			s.workerPool.ApplyOnErrorGroup(s.eg, func() (restoreErr error) {
 				fileStart := time.Now()
 				defer func() {
@@ -272,7 +275,8 @@ func (s *BatchRestorer) GoRestore(onProgress func(int64), batchFileSets ...BatchBackupFileSet) error {
 					}
 				}
 			}()
-			err := s.batchFileImporter.Import(s.ectx, batchSet...)
+			cx := logutil.ContextWithField(s.ectx, zap.Int("batch#", i))
+			err := s.batchFileImporter.Import(cx, batchSet...)
 			if err != nil {
 				return errors.Trace(err)
 			}
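Copying the counter into i before scheduling the closure is what makes the later zap.Int("batch#", i) stable: the callback runs asynchronously on the worker pool, and capturing counter itself would race and would likely log the final count for every batch. A standalone sketch of the pitfall and the fix, using only the standard library:

    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        var wg sync.WaitGroup
        counter := 0
        for n := 0; n < 3; n++ {
            i := counter // copy: each goroutine must see its own batch number
            counter += 1
            wg.Add(1)
            go func() {
                defer wg.Done()
                // With `i`, this prints 0, 1 and 2 (in some order); capturing
                // `counter` directly could print 3 three times.
                fmt.Println("batch#", i)
            }()
        }
        wg.Wait()
    }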
21 changes: 15 additions & 6 deletions br/pkg/restore/snap_client/import.go
@@ -433,9 +433,10 @@ func (importer *SnapFileImporter) Import(
 		logutil.CL(ctx).Debug("scan regions", logutil.Key("start key", startKey), logutil.Key("end key", endKey), zap.Int("count", len(regionInfos)), zap.Int("pool size", workerpoolsize))
 		start := time.Now()
 		// Try to download and ingest the file in every region
-		for _, regionInfo := range regionInfos {
+		for i, regionInfo := range regionInfos {
 			info := regionInfo
 			// Try to download file.
+			ectx := logutil.ContextWithField(ectx, zap.Int("region#", i), zap.Int("total#", len(regionInfos)))
 			workerpool.ApplyOnErrorGroup(eg, func() error {
 				downloadMetas, errDownload := importer.download(ectx, info, backupFileSets, importer.cipher, importer.apiVersion)
 				if errDownload != nil {
@@ -463,7 +464,7 @@
 			})
 		}
 		return eg.Wait()
-	}, utils.NewImportSSTBackoffer())
+	}, utils.VerboseRetry(utils.NewImportSSTBackoffer(), logutil.CL(ctx)))
 	if err != nil {
 		logutil.CL(ctx).Error("import sst file failed after retry, stop the whole progress", restore.ZapBatchBackupFileSet(backupFileSets), zap.Error(err))
 		return errors.Trace(err)
@@ -752,17 +753,24 @@ func (importer *SnapFileImporter) batchDownloadSST(
 	defer func() {
 		importer.releaseToken(tokenCh)
 	}()
-	for _, downloadReqMap := range downloadReqs {
-		for _, req := range downloadReqMap {
+	for i, downloadReqMap := range downloadReqs {
+		logger0 := logutil.CL(ectx).With(zap.Int("filegroup#", i), zap.Int("filegroup.total#", len(downloadReqs)))
+		for j, req := range downloadReqMap {
 			var err error
 			var resp *import_sstpb.DownloadResponse
-			resp, err = utils.WithRetryV2(ectx, utils.NewDownloadSSTBackoffer(), func(ctx context.Context) (*import_sstpb.DownloadResponse, error) {
+			logger := logger0.With(zap.String("reqName", j))
+			resp, err = utils.WithRetryV2(ectx, utils.VerboseRetry(utils.NewDownloadSSTBackoffer(), logger), func(ctx context.Context) (*import_sstpb.DownloadResponse, error) {
 				dctx, cancel := context.WithTimeout(ctx, gRPCTimeOut)
 				defer cancel()
 				if len(req.Ssts) == 0 {
 					// fallback to single download
 					return importer.importClient.DownloadSST(dctx, peer.GetStoreId(), req)
 				}
+				logger.Info("Sending batch download SST request.",
+					zap.Uint64("store_id", peer.GetStoreId()),
+					logutil.BriefSSTMetas("ssts", maps.Values(req.Ssts)),
+					logutil.Region(regionInfo.Region),
+				)
 				return importer.importClient.BatchDownloadSST(dctx, peer.GetStoreId(), req)
 			})
 			if err != nil {
@@ -772,7 +780,7 @@
 				return errors.Annotate(berrors.ErrKVDownloadFailed, resp.GetError().GetMessage())
 			}
 			if resp.GetIsEmpty() {
-				log.Warn("download file skipped", zap.String("filename", req.Name),
+				logger.Warn("download file skipped", zap.String("filename", req.Name),
 					logutil.Region(regionInfo.Region), zap.Error(berrors.ErrKVRangeIsEmpty))
 				continue
 			}
@@ -999,6 +1007,7 @@ func (importer *SnapFileImporter) ingest(
 	errPb := ingestResp.GetError()
 	switch {
 	case errPb == nil:
+		logutil.CL(ctx).Info("finish ingesting into a region.", logutil.Region(info.Region), zap.Int("sst", len(downloadMetas)))
 		return nil
 	case errPb.NotLeader != nil:
 		// If error is `NotLeader`, update the region info and retry
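utils.VerboseRetry is the other new ingredient: it wraps an existing backoffer so that every retry attempt is reported through the supplied logger instead of failing silently until the backoffer gives up. The PR does not show its body here; the following is only a rough sketch of what such a decorator could look like, with an illustrative Backoffer interface that merely approximates the one in br/pkg/utils:

    package retrysketch

    import (
        "time"

        "go.uber.org/zap"
    )

    // Backoffer approximates br's retry interface: how long to wait after an
    // error, and how many attempts remain. The real interface may differ.
    type Backoffer interface {
        NextBackoff(err error) time.Duration
        Attempt() int
    }

    // verboseBackoffer decorates another Backoffer, logging each retry decision.
    type verboseBackoffer struct {
        inner  Backoffer
        logger *zap.Logger
    }

    func (v *verboseBackoffer) NextBackoff(err error) time.Duration {
        d := v.inner.NextBackoff(err)
        v.logger.Warn("retrying after error", zap.Error(err),
            zap.Duration("backoff", d), zap.Int("attemptsLeft", v.inner.Attempt()))
        return d
    }

    func (v *verboseBackoffer) Attempt() int { return v.inner.Attempt() }

    // VerboseRetry mirrors the call shape used in the diff above.
    func VerboseRetry(inner Backoffer, logger *zap.Logger) Backoffer {
        return &verboseBackoffer{inner: inner, logger: logger}
    }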
6 changes: 6 additions & 0 deletions br/pkg/restore/split/splitter.go
@@ -11,6 +11,7 @@ import (

 	"github.com/pingcap/errors"
 	"github.com/pingcap/log"
+	"github.com/pingcap/tidb/br/pkg/logutil"
 	restoreutils "github.com/pingcap/tidb/br/pkg/restore/utils"
 	"github.com/pingcap/tidb/pkg/tablecodec"
 	"github.com/pingcap/tidb/pkg/util"
@@ -363,10 +364,15 @@ func (r *PipelineRegionsSplitterImpl) splitRegionByPoints(
 		number int64 = initialNumber
 	)
 	for _, v := range valueds {
+		log.Debug("[split-point] accumulating an item for splitting.", zap.Stringer("range", logutil.StringifyRangeOf(v.GetStartKey(), v.GetEndKey())),
+			zap.Uint64("size", v.Value.Size), zap.Int64("number", v.Value.Number))
 		// decode will discard ts behind the key, which results in the same key for consecutive ranges
 		if !bytes.Equal(lastKey, v.GetStartKey()) && (v.Value.Size+length > r.splitThresholdSize || v.Value.Number+number > r.splitThresholdKeys) {
 			_, rawKey, _ := codec.DecodeBytes(v.GetStartKey(), nil)
 			splitPoints = append(splitPoints, rawKey)
+			log.Info("[split-point] added split key for region.",
+				logutil.Region(region.Region), logutil.Key("split", rawKey),
+				zap.Uint64("accumulated-size", length), zap.Int64("accumulated-keys", number))
 			length = 0
 			number = 0
 		}
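The new logs make the splitter's accumulate-then-split decision visible: sizes and key counts pile up per range, and a split point is emitted only when admitting the next range would cross a threshold, at which point the accumulators reset. A toy illustration with made-up numbers (br's real thresholds and key encoding are more involved):

    package main

    import "fmt"

    func main() {
        // Split whenever the accumulated size plus the next range's size
        // would exceed the threshold; thresholds here are illustrative.
        const thresholdSize uint64 = 96
        var length uint64
        for i, size := range []uint64{40, 40, 40} {
            if size+length > thresholdSize {
                fmt.Printf("split point before range %d (accumulated %d)\n", i, length)
                length = 0
            }
            length += size
        }
        // Output: split point before range 2 (accumulated 80)
    }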