Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions lightning/cmd/tidb-lightning-ctl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@ go_library(
deps = [
"//lightning/pkg/importer",
"//lightning/pkg/server",
"//pkg/lightning/backend",
"//pkg/lightning/backend/local",
"//pkg/lightning/checkpoints",
"//pkg/lightning/common",
"//pkg/lightning/config",
"//pkg/lightning/tikv",
Expand Down
203 changes: 38 additions & 165 deletions lightning/cmd/tidb-lightning-ctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,12 @@ import (
"flag"
"fmt"
"os"
"path/filepath"
"strings"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/lightning/pkg/importer"
"github.com/pingcap/tidb/lightning/pkg/server"
"github.com/pingcap/tidb/pkg/lightning/backend"
"github.com/pingcap/tidb/pkg/lightning/backend/local"
"github.com/pingcap/tidb/pkg/lightning/checkpoints"
"github.com/pingcap/tidb/pkg/lightning/common"
"github.com/pingcap/tidb/pkg/lightning/config"
"github.com/pingcap/tidb/pkg/lightning/tikv"
Expand Down Expand Up @@ -116,19 +112,52 @@ func run() error {
}

if len(*cpRemove) != 0 {
return errors.Trace(server.CheckpointRemove(ctx, cfg, *cpRemove))
ctl, err := server.NewCheckpointControl(cfg, tls)
if err != nil {
return errors.Trace(err)
}
return errors.Trace(ctl.Remove(ctx, *cpRemove))
}
if len(*cpErrIgnore) != 0 {
return errors.Trace(checkpointErrorIgnore(ctx, cfg, *cpErrIgnore))
ctl, err := server.NewCheckpointControl(cfg, tls)
if err != nil {
return errors.Trace(err)
}
return errors.Trace(ctl.IgnoreError(ctx, *cpErrIgnore))
}
if len(*cpErrDestroy) != 0 {
return errors.Trace(checkpointErrorDestroy(ctx, cfg, tls, *cpErrDestroy))
ctl, err := server.NewCheckpointControl(cfg, tls)
if err != nil {
return errors.Trace(err)
}
return errors.Trace(ctl.DestroyError(ctx, *cpErrDestroy))
}
if len(*cpDump) != 0 {
return errors.Trace(checkpointDump(ctx, cfg, *cpDump))
ctl, err := server.NewCheckpointControl(cfg, tls)
if err != nil {
return errors.Trace(err)
}
return errors.Trace(ctl.Dump(ctx, *cpDump))
}
if *localStoringTables {
return errors.Trace(getLocalStoringTables(ctx, cfg))
ctl, err := server.NewCheckpointControl(cfg, tls)
if err != nil {
return errors.Trace(err)
}
tables, err := ctl.GetLocalStoringTables(ctx)
if err != nil {
return errors.Trace(err)
}
if len(tables) == 0 {
fmt.Fprintln(os.Stderr, "No table has lost intermediate files according to given config")
} else {
tableNames := make([]string, 0, len(tables))
for name := range tables {
tableNames = append(tableNames, name)
}
fmt.Fprintln(os.Stderr, "These tables are missing intermediate files:", tableNames)
}
return nil
}

fsUsage()
Expand Down Expand Up @@ -162,159 +191,3 @@ func fetchMode(ctx context.Context, cli pdhttp.Client, tls *common.TLS) error {
},
)
}

func checkpointErrorIgnore(ctx context.Context, cfg *config.Config, tableName string) error {
cpdb, err := checkpoints.OpenCheckpointsDB(ctx, cfg)
if err != nil {
return errors.Trace(err)
}
//nolint: errcheck
defer cpdb.Close()

return errors.Trace(cpdb.IgnoreErrorCheckpoint(ctx, tableName))
}

func checkpointErrorDestroy(ctx context.Context, cfg *config.Config, tls *common.TLS, tableName string) error {
cpdb, err := checkpoints.OpenCheckpointsDB(ctx, cfg)
if err != nil {
return errors.Trace(err)
}
//nolint: errcheck
defer cpdb.Close()

target, err := importer.NewTiDBManager(ctx, cfg.TiDB, tls)
if err != nil {
return errors.Trace(err)
}
defer target.Close()

targetTables, err := cpdb.DestroyErrorCheckpoint(ctx, tableName)
if err != nil {
return errors.Trace(err)
}

var errs []error

for _, table := range targetTables {
fmt.Fprintln(os.Stderr, "Dropping table:", table.TableName)
err := target.DropTable(ctx, table.TableName)
if err != nil {
fmt.Fprintln(os.Stderr, "* Encountered error while dropping table:", err)
errs = append(errs, err)
}
}

// For importer backend, engine was stored in importer's memory, we can retrieve it from alive importer process.
// But in local backend, if we want to use common API `UnsafeCloseEngine` and `Cleanup`,
// we need either lightning process alive or engine map persistent.
// both of them seems unnecessary if we only need to do is cleanup specify engine directory.
// so we didn't choose to use common API.
if cfg.TikvImporter.Backend == config.BackendLocal {
for _, table := range targetTables {
for engineID := table.MinEngineID; engineID <= table.MaxEngineID; engineID++ {
fmt.Fprintln(os.Stderr, "Closing and cleaning up engine:", table.TableName, engineID)
_, eID := backend.MakeUUID(table.TableName, int64(engineID))
engine := local.Engine{UUID: eID}
err := engine.Cleanup(cfg.TikvImporter.SortedKVDir)
if err != nil {
fmt.Fprintln(os.Stderr, "* Encountered error while cleanup engine:", err)
errs = append(errs, err)
}
}
}
}

// try clean up metas
if len(errs) == 0 {
errs = append(errs, server.CleanupMetas(ctx, cfg, tableName))
}

return errors.Trace(errors.Join(errs...))
}

func checkpointDump(ctx context.Context, cfg *config.Config, dumpFolder string) error {
cpdb, err := checkpoints.OpenCheckpointsDB(ctx, cfg)
if err != nil {
return errors.Trace(err)
}
//nolint: errcheck
defer cpdb.Close()

if err := os.MkdirAll(dumpFolder, 0o750); err != nil {
return errors.Trace(err)
}

tablesFileName := filepath.Join(dumpFolder, "tables.csv")
tablesFile, err := os.Create(tablesFileName)
if err != nil {
return errors.Annotatef(err, "failed to create %s", tablesFileName)
}
defer tablesFile.Close()

enginesFileName := filepath.Join(dumpFolder, "engines.csv")
enginesFile, err := os.Create(enginesFileName)
if err != nil {
return errors.Annotatef(err, "failed to create %s", enginesFileName)
}
defer enginesFile.Close()

chunksFileName := filepath.Join(dumpFolder, "chunks.csv")
chunksFile, err := os.Create(chunksFileName)
if err != nil {
return errors.Annotatef(err, "failed to create %s", chunksFileName)
}
defer chunksFile.Close()

if err := cpdb.DumpTables(ctx, tablesFile); err != nil {
return errors.Trace(err)
}
if err := cpdb.DumpEngines(ctx, enginesFile); err != nil {
return errors.Trace(err)
}
if err := cpdb.DumpChunks(ctx, chunksFile); err != nil {
return errors.Trace(err)
}
return nil
}

func getLocalStoringTables(ctx context.Context, cfg *config.Config) (err2 error) {
//nolint: prealloc
var tables []string
defer func() {
if err2 == nil {
if len(tables) == 0 {
fmt.Fprintln(os.Stderr, "No table has lost intermediate files according to given config")
} else {
fmt.Fprintln(os.Stderr, "These tables are missing intermediate files:", tables)
}
}
}()

if cfg.TikvImporter.Backend != config.BackendLocal {
return nil
}
exist, err := checkpoints.IsCheckpointsDBExists(ctx, cfg)
if err != nil {
return errors.Trace(err)
}
if !exist {
return nil
}
cpdb, err := checkpoints.OpenCheckpointsDB(ctx, cfg)
if err != nil {
return errors.Trace(err)
}
//nolint: errcheck
defer cpdb.Close()

tableWithEngine, err := cpdb.GetLocalStoringTables(ctx)
if err != nil {
return errors.Trace(err)
}
tables = make([]string, 0, len(tableWithEngine))
for tableName := range tableWithEngine {
tables = append(tables, tableName)
}

return nil
}
8 changes: 7 additions & 1 deletion lightning/pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "server",
srcs = [
"checkpoint_control.go",
"lightning.go",
"run_options.go",
"sigusr1_other.go",
Expand All @@ -15,8 +16,10 @@ go_library(
"//br/pkg/storage",
"//br/pkg/version/build",
"//lightning/pkg/importer",
"//lightning/pkg/importinto",
"//lightning/pkg/web",
"//pkg/expression",
"//pkg/lightning/backend",
"//pkg/lightning/backend/local",
"//pkg/lightning/checkpoints",
"//pkg/lightning/common",
Expand Down Expand Up @@ -51,14 +54,16 @@ go_test(
name = "server_test",
timeout = "short",
srcs = [
"checkpoint_control_test.go",
"lightning_serial_test.go",
"lightning_server_serial_test.go",
"main_test.go",
],
embed = [":server"],
flaky = True,
shard_count = 7,
shard_count = 10,
deps = [
"//lightning/pkg/importinto/mock",
"//lightning/pkg/web",
"//pkg/lightning/checkpoints",
"//pkg/lightning/config",
Expand All @@ -69,5 +74,6 @@ go_test(
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//require",
"@org_uber_go_mock//gomock",
],
)
Loading