Skip to content

Commit dfea431

Browse files
committed
Merge branch 'master' of github.com:Altinity/clickhouse-backup into xavierleune/master
2 parents 002ba34 + bbf63b2 commit dfea431

File tree

16 files changed

+189
-160
lines changed

16 files changed

+189
-160
lines changed

pkg/backup/backuper.go

Lines changed: 11 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -50,9 +50,7 @@ type Backuper struct {
5050
}
5151

5252
func NewBackuper(cfg *config.Config, opts ...BackuperOpt) *Backuper {
53-
ch := &clickhouse.ClickHouse{
54-
Config: &cfg.ClickHouse,
55-
}
53+
ch := clickhouse.NewClickHouse(&cfg.ClickHouse)
5654
b := &Backuper{
5755
cfg: cfg,
5856
ch: ch,
@@ -217,13 +215,13 @@ func (b *Backuper) getEmbeddedRestoreSettings(version int) []string {
217215
if (b.cfg.General.RemoteStorage == "s3" || b.cfg.General.RemoteStorage == "gcs") && version >= 23007000 {
218216
settings = append(settings, "allow_s3_native_copy=1")
219217
if err := b.ch.Query("SET s3_request_timeout_ms=600000"); err != nil {
220-
log.Fatal().Msgf("SET s3_request_timeout_ms=600000 error: %v", err)
218+
log.Fatal().Stack().Msgf("SET s3_request_timeout_ms=600000 error: %v", err)
221219
}
222220

223221
}
224222
if (b.cfg.General.RemoteStorage == "s3" || b.cfg.General.RemoteStorage == "gcs") && version >= 23011000 {
225223
if err := b.ch.Query("SET s3_use_adaptive_timeouts=0"); err != nil {
226-
log.Fatal().Msgf("SET s3_use_adaptive_timeouts=0 error: %v", err)
224+
log.Fatal().Stack().Msgf("SET s3_use_adaptive_timeouts=0 error: %v", err)
227225
}
228226
}
229227
return settings
@@ -234,13 +232,13 @@ func (b *Backuper) getEmbeddedBackupSettings(version int) []string {
234232
if (b.cfg.General.RemoteStorage == "s3" || b.cfg.General.RemoteStorage == "gcs") && version >= 23007000 {
235233
settings = append(settings, "allow_s3_native_copy=1")
236234
if err := b.ch.Query("SET s3_request_timeout_ms=600000"); err != nil {
237-
log.Fatal().Msgf("SET s3_request_timeout_ms=600000 error: %v", err)
235+
log.Fatal().Stack().Msgf("SET s3_request_timeout_ms=600000 error: %v", err)
238236
}
239237

240238
}
241239
if (b.cfg.General.RemoteStorage == "s3" || b.cfg.General.RemoteStorage == "gcs") && version >= 23011000 {
242240
if err := b.ch.Query("SET s3_use_adaptive_timeouts=0"); err != nil {
243-
log.Fatal().Msgf("SET s3_use_adaptive_timeouts=0 error: %v", err)
241+
log.Fatal().Stack().Msgf("SET s3_use_adaptive_timeouts=0 error: %v", err)
244242
}
245243
}
246244
if b.cfg.General.RemoteStorage == "azblob" && version >= 24005000 && b.cfg.ClickHouse.EmbeddedBackupDisk == "" {
@@ -436,6 +434,12 @@ func (b *Backuper) getTablesDiffFromRemote(ctx context.Context, diffFromRemote s
436434

437435
func (b *Backuper) GetLocalDataSize(ctx context.Context) (float64, error) {
438436
localDataSize := float64(0)
437+
if !b.ch.IsOpen {
438+
if connectErr := b.ch.Connect(); connectErr != nil {
439+
return 0, errors.WithStack(connectErr)
440+
}
441+
defer b.ch.Close()
442+
}
439443
err := b.ch.SelectSingleRow(ctx, &localDataSize, "SELECT value FROM system.asynchronous_metrics WHERE metric='TotalBytesOfMergeTreeTables'")
440444
return localDataSize, err
441445
}

pkg/backup/create.go

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -190,23 +190,23 @@ func (b *Backuper) createConfigsNamedCollectionsAndRBACIfNecessary(ctx context.C
190190
if createRBAC || rbacOnly {
191191
var createRBACErr error
192192
if backupRBACSize, createRBACErr = b.createBackupRBAC(ctx, backupPath, disks); createRBACErr != nil {
193-
log.Fatal().Msgf("error during do RBAC backup: %v", createRBACErr)
193+
log.Fatal().Stack().Msgf("error during do RBAC backup: %v", createRBACErr)
194194
} else {
195195
log.Info().Str("size", utils.FormatBytes(backupRBACSize)).Msg("done createBackupRBAC")
196196
}
197197
}
198198
if createConfigs || configsOnly {
199199
var createConfigsErr error
200200
if backupConfigSize, createConfigsErr = b.createBackupConfigs(ctx, backupPath); createConfigsErr != nil {
201-
log.Fatal().Msgf("error during do CONFIG backup: %v", createConfigsErr)
201+
log.Fatal().Stack().Msgf("error during do CONFIG backup: %v", createConfigsErr)
202202
} else {
203203
log.Info().Str("size", utils.FormatBytes(backupConfigSize)).Msg("done createBackupConfigs")
204204
}
205205
}
206206
if createNamedCollections || namedCollectionsOnly {
207207
var createNamedCollectionsErr error
208208
if backupNamedCollectionsSize, createNamedCollectionsErr = b.createBackupNamedCollections(ctx, backupPath); createNamedCollectionsErr != nil {
209-
log.Fatal().Msgf("error during do NamedCollections backup: %v", createNamedCollectionsErr)
209+
log.Fatal().Stack().Msgf("error during do NamedCollections backup: %v", createNamedCollectionsErr)
210210
} else {
211211
log.Info().Str("size", utils.FormatBytes(backupNamedCollectionsSize)).Msg("done createBackupNamedCollections")
212212
}

pkg/backup/download.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -68,7 +68,7 @@ func (b *Backuper) Download(backupName string, tablePattern string, partitions [
6868

6969
if backupName == "" {
7070
remoteBackups := b.CollectRemoteBackups(ctx, "all")
71-
_ = b.PrintBackup(remoteBackups, "all", "text")
71+
_ = b.PrintBackup(remoteBackups, "text")
7272
return fmt.Errorf("select backup for download")
7373
}
7474
localBackups, disks, err := b.GetLocalBackups(ctx, nil)

pkg/backup/list.go

Lines changed: 6 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -76,10 +76,10 @@ func (b *Backuper) List(what, ptype, format string) error {
7676
case "all", "":
7777
backupInfos = append(backupInfos, b.CollectAllBackups(ctx, ptype)...)
7878
}
79-
return b.PrintBackup(backupInfos, ptype, format)
79+
return b.PrintBackup(backupInfos, format)
8080
}
8181

82-
func (b *Backuper) PrintBackup(backupInfos []BackupInfo, ptype, format string) error {
82+
func (b *Backuper) PrintBackup(backupInfos []BackupInfo, format string) error {
8383
w := tabwriter.NewWriter(os.Stdout, 0, 0, 3, ' ', tabwriter.DiscardEmptyColumns)
8484
switch format {
8585
case "json":
@@ -329,14 +329,14 @@ func (b *Backuper) GetLocalBackups(ctx context.Context, disks []clickhouse.Disk)
329329
var err error
330330
if !b.ch.IsOpen {
331331
if err = b.ch.Connect(); err != nil {
332-
return nil, nil, err
332+
return nil, nil, errors.WithStack(err)
333333
}
334334
defer b.ch.Close()
335335
}
336336
if disks == nil {
337337
disks, err = b.ch.GetDisks(ctx, true)
338338
if err != nil {
339-
return nil, nil, err
339+
return nil, nil, errors.WithStack(err)
340340
}
341341
}
342342
if disks == nil {
@@ -393,11 +393,11 @@ func (b *Backuper) GetLocalBackups(ctx context.Context, disks []clickhouse.Disk)
393393
if os.IsNotExist(openErr) {
394394
return result, disks, nil
395395
}
396-
return nil, nil, openErr
396+
return nil, nil, errors.WithStack(openErr)
397397
}
398398
names, err := d.Readdirnames(-1)
399399
if err != nil {
400-
return nil, nil, err
400+
return nil, nil, errors.WithStack(err)
401401
}
402402
for _, name := range names {
403403
info, err := os.Stat(path.Join(backupPath, name))

pkg/backup/restore.go

Lines changed: 9 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -92,7 +92,7 @@ func (b *Backuper) Restore(backupName, tablePattern string, databaseMapping, tab
9292

9393
if backupName == "" {
9494
localBackups := b.CollectLocalBackups(ctx, "all")
95-
_ = b.PrintBackup(localBackups, "all", "text")
95+
_ = b.PrintBackup(localBackups, "text")
9696
return fmt.Errorf("select backup for restore")
9797
}
9898
disks, err := b.ch.GetDisks(ctx, true)
@@ -444,9 +444,12 @@ func (b *Backuper) restoreEmptyDatabase(ctx context.Context, targetDB, tablePatt
444444
if version > 20011000 {
445445
sync = "SYNC"
446446
}
447-
if _, err := os.Create(path.Join(b.DefaultDataPath, "/flags/force_drop_table")); err != nil {
448-
return err
447+
var f *os.File
448+
var createErr error
449+
if f, createErr = os.Create(path.Join(b.DefaultDataPath, "/flags/force_drop_table")); createErr != nil {
450+
return createErr
449451
}
452+
_ = f.Close()
450453
if err := b.ch.QueryContext(ctx, fmt.Sprintf("DROP DATABASE IF EXISTS `%s` %s %s %s", targetDB, onCluster, sync, settings)); err != nil {
451454
return err
452455
}
@@ -1713,17 +1716,17 @@ func (b *Backuper) checkReplicaAlreadyExistsAndChangeReplicationPath(ctx context
17131716
var settingsValues map[string]string
17141717
settingsValues, err = b.ch.GetSettingsValues(ctx, []interface{}{"default_replica_path", "default_replica_name"})
17151718
if err != nil {
1716-
log.Fatal().Msgf("can't get from `system.settings` -> `default_replica_path`, `default_replica_name` error: %v", err)
1719+
log.Fatal().Stack().Msgf("can't get from `system.settings` -> `default_replica_path`, `default_replica_name` error: %v", err)
17171720
}
17181721
replicaPath = settingsValues["default_replica_path"]
17191722
replicaName = settingsValues["default_replica_name"]
17201723
}
17211724
var resolvedReplicaPath, resolvedReplicaName string
17221725
if resolvedReplicaPath, err = b.ch.ApplyMacros(ctx, replicaPath); err != nil {
1723-
log.Fatal().Msgf("can't ApplyMacros to %s error: %v", replicaPath, err)
1726+
log.Fatal().Stack().Msgf("can't ApplyMacros to %s error: %v", replicaPath, err)
17241727
}
17251728
if resolvedReplicaName, err = b.ch.ApplyMacros(ctx, replicaName); err != nil {
1726-
log.Fatal().Msgf("can't ApplyMacros to %s error: %v", replicaPath, err)
1729+
log.Fatal().Stack().Msgf("can't ApplyMacros to %s error: %v", replicaPath, err)
17271730
}
17281731
if matches = replicatedUuidRE.FindAllStringSubmatch(schema.Query, 1); len(matches) > 0 {
17291732
resolvedReplicaPath = strings.Replace(resolvedReplicaPath, "{uuid}", matches[0][1], -1)

pkg/backup/upload.go

Lines changed: 6 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -371,7 +371,7 @@ func (b *Backuper) validateUploadParams(ctx context.Context, backupName string,
371371
}
372372
if backupName == "" {
373373
localBackups := b.CollectLocalBackups(ctx, "all")
374-
_ = b.PrintBackup(localBackups, "all", "text")
374+
_ = b.PrintBackup(localBackups, "text")
375375
return fmt.Errorf("select backup for upload")
376376
}
377377
if b.cfg.General.UploadConcurrency == 0 {
@@ -717,14 +717,14 @@ func (b *Backuper) uploadTableMetadataEmbedded(ctx context.Context, backupName s
717717
return 0, err
718718
}
719719
}
720-
if info, err = os.Stat(localTableMetaFile); err != nil {
721-
return 0, err
722-
}
723720
defer func() {
724-
if err := localReader.Close(); err != nil {
725-
log.Warn().Msgf("can't close %v: %v", localReader, err)
721+
if closeErr := localReader.Close(); closeErr != nil {
722+
log.Warn().Msgf("can't close %v: %v", localReader, closeErr)
726723
}
727724
}()
725+
if info, err = os.Stat(localTableMetaFile); err != nil {
726+
return 0, err
727+
}
728728
retry := retrier.New(retrier.ExponentialBackoff(b.cfg.General.RetriesOnFailure, common.AddRandomJitter(b.cfg.General.RetriesDuration, b.cfg.General.RetriesJitter)), b)
729729
err = retry.RunCtx(ctx, func(ctx context.Context) error {
730730
return b.dst.PutFile(ctx, remoteTableMetaFile, localReader, 0)

pkg/backup/watch.go

Lines changed: 8 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -71,11 +71,18 @@ func (b *Backuper) ValidateWatchParams(watchInterval, fullInterval, watchBackupN
7171
func (b *Backuper) Watch(watchInterval, fullInterval, watchBackupNameTemplate, tablePattern string, partitions, skipProjections []string, schemaOnly, backupRBAC, backupConfigs, backupNamedCollections, skipCheckPartsColumns, deleteSource bool, version string, commandId int, metrics *metrics.APIMetrics, cliCtx *cli.Context) error {
7272
ctx, cancel, err := status.Current.GetContextWithCancel(commandId)
7373
if err != nil {
74-
return err
74+
return errors.WithStack(err)
7575
}
7676
ctx, cancel = context.WithCancel(ctx)
7777
defer cancel()
7878

79+
if !b.ch.IsOpen {
80+
if err = b.ch.Connect(); err != nil {
81+
return errors.WithStack(err)
82+
}
83+
defer b.ch.Close()
84+
}
85+
7986
if err := b.ValidateWatchParams(watchInterval, fullInterval, watchBackupNameTemplate); err != nil {
8087
return err
8188
}

pkg/clickhouse/clickhouse.go

Lines changed: 39 additions & 27 deletions
Original file line number | Diff line number | Diff line change
@@ -12,6 +12,7 @@ import (
1212
"regexp"
1313
"strconv"
1414
"strings"
15+
"sync"
1516
"time"
1617

1718
"github.com/Altinity/clickhouse-backup/v2/pkg/common"
@@ -31,13 +32,24 @@ import (
3132
type ClickHouse struct {
3233
Config *config.ClickHouseConfig
3334
conn driver.Conn
35+
ConnMutex *sync.Mutex
3436
version int
3537
IsOpen bool
3638
BreakConnectOnError bool
3739
}
3840

41+
func NewClickHouse(cfg *config.ClickHouseConfig) *ClickHouse {
42+
return &ClickHouse{
43+
Config: cfg,
44+
ConnMutex: &sync.Mutex{},
45+
}
46+
}
47+
3948
// Connect - establish connection to ClickHouse
4049
func (ch *ClickHouse) Connect() error {
50+
ch.ConnMutex.Lock()
51+
defer ch.ConnMutex.Unlock()
52+
4153
if ch.IsOpen {
4254
if err := ch.conn.Close(); err != nil {
4355
log.Error().Msgf("close previous connection error: %v", err)
@@ -121,20 +133,19 @@ func (ch *ClickHouse) Connect() error {
121133
break
122134
}
123135
if ch.BreakConnectOnError {
124-
return err
136+
return errors.WithStack(err)
125137
}
126138
log.Warn().Msgf("clickhouse connection: %s, sql.Open return error: %v, will wait 5 second to reconnect", fmt.Sprintf("tcp://%v:%v", ch.Config.Host, ch.Config.Port), err)
127139
time.Sleep(5 * time.Second)
128140
}
129-
log.WithLevel(logLevel).Msgf("clickhouse connection prepared: %s run ping", fmt.Sprintf("tcp://%v:%v", ch.Config.Host, ch.Config.Port))
130141
err = ch.conn.Ping(context.Background())
131142
if err == nil {
132143
log.WithLevel(logLevel).Msgf("clickhouse connection success: %s", fmt.Sprintf("tcp://%v:%v", ch.Config.Host, ch.Config.Port))
133144
ch.IsOpen = true
134145
break
135146
}
136147
if ch.BreakConnectOnError {
137-
return err
148+
return errors.WithStack(err)
138149
}
139150
log.Warn().Msgf("clickhouse connection ping: %s return error: %v, will wait 5 second to reconnect", fmt.Sprintf("tcp://%v:%v", ch.Config.Host, ch.Config.Port), err)
140151
time.Sleep(5 * time.Second)
@@ -551,22 +562,19 @@ func (ch *ClickHouse) GetDatabases(ctx context.Context, cfg *config.Config, tabl
551562
if tablePattern != "" {
552563
bypassTablesPatterns = append(bypassTablesPatterns, strings.Split(tablePattern, ",")...)
553564
}
554-
for _, pattern := range skipTablesPatterns {
555-
pattern = strings.Trim(pattern, " \r\t\n")
556-
if strings.HasSuffix(pattern, ".*") {
557-
skipDatabases = common.AddStringToSliceIfNotExists(skipDatabases, strings.Trim(strings.TrimSuffix(pattern, ".*"), "\"` "))
558-
} else {
559-
skipDatabases = common.AddStringToSliceIfNotExists(skipDatabases, strings.Trim(tableNameSuffixRE.ReplaceAllString(pattern, ""), "\"` "))
560-
}
561-
}
562-
for _, pattern := range bypassTablesPatterns {
563-
pattern = strings.Trim(pattern, " \r\t\n")
564-
if strings.HasSuffix(pattern, ".*") {
565-
bypassDatabases = common.AddStringToSliceIfNotExists(bypassDatabases, strings.Trim(strings.TrimSuffix(pattern, ".*"), "\"` "))
566-
} else {
567-
bypassDatabases = common.AddStringToSliceIfNotExists(bypassDatabases, strings.Trim(tableNameSuffixRE.ReplaceAllString(pattern, ""), "\"` "))
565+
processDbPatterns := func(databases []string, patterns []string) []string {
566+
for _, pattern := range patterns {
567+
pattern = strings.Trim(pattern, " \r\t\n")
568+
if strings.HasSuffix(pattern, ".*") {
569+
databases = common.AddStringToSliceIfNotExists(databases, strings.Trim(strings.TrimSuffix(pattern, ".*"), "\"` "))
570+
} else {
571+
databases = common.AddStringToSliceIfNotExists(databases, strings.Trim(tableNameSuffixRE.ReplaceAllString(pattern, ""), "\"` "))
572+
}
568573
}
574+
return databases
569575
}
576+
skipDatabases = processDbPatterns(skipDatabases, skipTablesPatterns)
577+
bypassDatabases = processDbPatterns(bypassDatabases, bypassTablesPatterns)
570578
metadataPath, err := ch.getMetadataPath(ctx)
571579
if err != nil {
572580
return nil, err
@@ -608,12 +616,14 @@ func (ch *ClickHouse) GetDatabases(ctx context.Context, cfg *config.Config, tabl
608616
} else {
609617
// 23.3+ masked secrets https://github.com/Altinity/clickhouse-backup/issues/640
610618
if strings.Contains(result, "'[HIDDEN]'") {
611-
if attachSQL, err := os.ReadFile(path.Join(metadataPath, common.TablePathEncode(db.Name)+".sql")); err != nil {
612-
return nil, err
613-
} else {
614-
result = strings.Replace(string(attachSQL), "ATTACH", "CREATE", 1)
615-
result = strings.Replace(result, " _ ", " `"+db.Name+"` ", 1)
619+
var attachSQL []byte
620+
var readErr error
621+
if attachSQL, readErr = os.ReadFile(path.Join(metadataPath, common.TablePathEncode(db.Name)+".sql")); readErr != nil {
622+
return nil, errors.WithStack(readErr)
616623
}
624+
625+
result = strings.Replace(string(attachSQL), "ATTACH", "CREATE", 1)
626+
result = strings.Replace(result, " _ ", " `"+db.Name+"` ", 1)
617627
}
618628
allDatabases[i].Query = result
619629
}
@@ -951,9 +961,12 @@ func (ch *ClickHouse) DropOrDetachTable(table Table, query, onCluster string, ig
951961
dropQuery += " SETTINGS check_table_dependencies=0"
952962
}
953963
if defaultDataPath != "" && !useDetach {
954-
if _, err = os.Create(path.Join(defaultDataPath, "/flags/force_drop_table")); err != nil {
955-
return err
964+
var f *os.File
965+
var createErr error
966+
if f, createErr = os.Create(path.Join(defaultDataPath, "/flags/force_drop_table")); createErr != nil {
967+
return errors.WithStack(createErr)
956968
}
969+
_ = f.Close()
957970
}
958971
if err = ch.Query(dropQuery); err != nil {
959972
return err
@@ -1371,10 +1384,9 @@ func (ch *ClickHouse) CheckReplicationInProgress(table metadata.TableMetadata) (
13711384
}
13721385
// https://github.com/Altinity/clickhouse-backup/issues/967
13731386
if existsReplicas[0].LogPointer > 2 || existsReplicas[0].LogMaxIndex > 1 || existsReplicas[0].AbsoluteDelay > 0 || existsReplicas[0].QueueSize > 0 {
1374-
return false, fmt.Errorf("%s.%s can't restore cause system.replicas entries already exists and replication in progress from another replica, log_pointer=%d, log_max_index=%d, absolute_delay=%d, queue_size=%d", table.Database, table.Table, existsReplicas[0].LogPointer, existsReplicas[0].LogMaxIndex, existsReplicas[0].AbsoluteDelay, existsReplicas[0].QueueSize)
1375-
} else {
1376-
log.Info().Msgf("replication_in_progress status = %+v", existsReplicas)
1387+
return false, errors.WithStack(fmt.Errorf("%s.%s can't restore cause system.replicas entries already exists and replication in progress from another replica, log_pointer=%d, log_max_index=%d, absolute_delay=%d, queue_size=%d", table.Database, table.Table, existsReplicas[0].LogPointer, existsReplicas[0].LogMaxIndex, existsReplicas[0].AbsoluteDelay, existsReplicas[0].QueueSize))
13771388
}
1389+
log.Info().Msgf("replication_in_progress status = %+v", existsReplicas)
13781390
}
13791391
return true, nil
13801392
}

0 commit comments

Comments
 (0)