Commit 8ca4889

Merge pull request #1310 from xavierleune/master
fix S3 multipart operations to respect S3_MAX_PARTS_COUNT
2 parents bbf63b2 + 088e891 commit 8ca4889

File tree

4 files changed: 32 additions, 23 deletions

ChangeLog.md

Lines changed: 4 additions & 0 deletions
@@ -1,3 +1,7 @@
+# v2.6.42
+BUG FIXES
+- fix S3 multipart operations (upload, download, copy) to respect `S3_MAX_PARTS_COUNT` instead of hardcoded 10000 value, allows S3-compatible backends with stricter limits
+
 # v2.6.41
 BUG FIXES
 - improve restore long RBAC which have length more 64k, fix [1305](https://github.com/Altinity/clickhouse-backup/issues/1305)
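As background for the changelog entry above (an illustrative sketch, not code from this repository): the number of multipart parts is the object size divided by the chunk size, rounded up, so a hardcoded bound of 10000 let transfers through that an S3-compatible backend with a stricter per-upload limit would still reject.

```go
package main

import "fmt"

// partsNeeded is ceiling division: how many multipart parts an object of
// totalSize bytes needs when every part is chunkSize bytes.
func partsNeeded(totalSize, chunkSize int64) int64 {
	return (totalSize + chunkSize - 1) / chunkSize
}

func main() {
	const gib = int64(1024 * 1024 * 1024)
	parts := partsNeeded(200*gib, 100*1024*1024) // 200 GiB object, 100 MiB chunks

	// 2048 parts passes the old hardcoded "< 10000" check, but a backend that
	// only accepts 1000 parts per upload still rejects the transfer.
	fmt.Println(parts, parts < 10000, parts < 1000) // 2048 true false
}
```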

pkg/backup/backuper.go

Lines changed: 2 additions & 2 deletions
@@ -372,9 +372,9 @@ func (b *Backuper) getObjectDiskPath() (string, error) {
         return b.cfg.FTP.ObjectDiskPath, nil
     } else if b.cfg.General.RemoteStorage == "sftp" {
         return b.cfg.SFTP.ObjectDiskPath, nil
-    } else {
-        return "", fmt.Errorf("cleanBackupObjectDisks: requesst object disks path but have unsupported remote_storage: %s", b.cfg.General.RemoteStorage)
     }
+
+    return "", errors.WithStack(fmt.Errorf("cleanBackupObjectDisks: requesst object disks path but have unsupported remote_storage: %s", b.cfg.General.RemoteStorage))
 }

 func (b *Backuper) getTablesDiffFromLocal(ctx context.Context, diffFrom string, tablePattern string) (tablesForUploadFromDiff map[metadata.TableTitle]metadata.TableMetadata, err error) {
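The backuper.go hunk shows a pattern this commit applies repeatedly: drop the `else` after a branch that returns, and wrap the remaining error with `errors.WithStack` from `github.com/pkg/errors` so logging can include a stack trace. A minimal, self-contained sketch of that shape (the `pickPath` function and its paths are hypothetical, not taken from the project):

```go
package main

import (
	"fmt"

	"github.com/pkg/errors"
)

// pickPath mirrors the getObjectDiskPath shape after this commit: supported
// cases return early, the unsupported case falls through to a stack-wrapped error.
func pickPath(remoteStorage string) (string, error) {
	if remoteStorage == "ftp" {
		return "/ftp/object_disks", nil
	} else if remoteStorage == "sftp" {
		return "/sftp/object_disks", nil
	}

	return "", errors.WithStack(fmt.Errorf("unsupported remote_storage: %s", remoteStorage))
}

func main() {
	if _, err := pickPath("azblob"); err != nil {
		// %+v on an error wrapped with errors.WithStack also prints the stack trace.
		fmt.Printf("%+v\n", err)
	}
}
```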

pkg/backup/upload.go

Lines changed: 23 additions & 18 deletions
@@ -89,9 +89,9 @@ func (b *Backuper) Upload(backupName string, deleteSource bool, diffFrom, diffFr
         if backupName == remoteBackups[i].BackupName {
             if !b.resume {
                 return fmt.Errorf("'%s' already exists on remote storage", backupName)
-            } else {
-                log.Warn().Msgf("'%s' already exists on remote, will try to resume upload", backupName)
             }
+
+            log.Warn().Msgf("'%s' already exists on remote, will try to resume upload", backupName)
         }
     }
     backupMetadata, err := b.ReadBackupMetadataLocal(ctx, backupName)
@@ -582,14 +582,16 @@ func (b *Backuper) uploadTableData(ctx context.Context, backupName string, delet
         }
     }
     log.Debug().Msgf("start upload %d files to %s", len(partFiles), remotePath)
-    if uploadPathBytes, err := b.dst.UploadPath(ctx, backupPath, partFiles, remotePath, b.cfg.General.RetriesOnFailure, b.cfg.General.RetriesDuration, b.cfg.General.RetriesJitter, b, b.cfg.General.UploadMaxBytesPerSecond); err != nil {
+    var uploadPathBytes int64
+    var err error
+    if uploadPathBytes, err = b.dst.UploadPath(ctx, backupPath, partFiles, remotePath, b.cfg.General.RetriesOnFailure, b.cfg.General.RetriesDuration, b.cfg.General.RetriesJitter, b, b.cfg.General.UploadMaxBytesPerSecond); err != nil {
         log.Error().Msgf("UploadPath return error: %v", err)
         return errors.Wrap(err, "can't upload")
-    } else {
-        atomic.AddInt64(&uploadedBytes, uploadPathBytes)
-        if b.resume {
-            b.resumableState.AppendToState(remotePathFull, uploadPathBytes)
-        }
+    }
+
+    atomic.AddInt64(&uploadedBytes, uploadPathBytes)
+    if b.resume {
+        b.resumableState.AppendToState(remotePathFull, uploadPathBytes)
     }
     // https://github.com/Altinity/clickhouse-backup/issues/777
     if deleteSource {
@@ -659,12 +661,16 @@ func (b *Backuper) uploadTableData(ctx context.Context, backupName string, delet

 func (b *Backuper) uploadTableMetadata(ctx context.Context, backupName string, requiredBackupName string, tableMetadata *metadata.TableMetadata) (int64, error) {
     if b.isEmbedded {
-        if sqlSize, err := b.uploadTableMetadataEmbedded(ctx, backupName, requiredBackupName, *tableMetadata); err != nil {
-            return sqlSize, err
-        } else {
-            jsonSize, err := b.uploadTableMetadataRegular(ctx, backupName, *tableMetadata)
-            return sqlSize + jsonSize, err
+        var sqlSize, jsonSize int64
+        var err error
+        if sqlSize, err = b.uploadTableMetadataEmbedded(ctx, backupName, requiredBackupName, *tableMetadata); err != nil {
+            return sqlSize, errors.WithStack(err)
         }
+        jsonSize, err = b.uploadTableMetadataRegular(ctx, backupName, *tableMetadata)
+        if err != nil {
+            err = errors.WithStack(err)
+        }
+        return sqlSize + jsonSize, err
     }
     return b.uploadTableMetadataRegular(ctx, backupName, *tableMetadata)
 }
@@ -711,11 +717,11 @@ func (b *Backuper) uploadTableMetadataEmbedded(ctx context.Context, backupName s
     if err != nil {
         err = errors.Wrapf(err, "can't open %s", localTableMetaFile)
         if requiredBackupName != "" {
-            log.Warn().Err(err).Send()
+            log.Warn().Stack().Err(err).Send()
             return 0, nil
-        } else {
-            return 0, err
         }
+
+        return 0, err
     }
     defer func() {
         if closeErr := localReader.Close(); closeErr != nil {
@@ -812,9 +818,8 @@ bodyRead:
 func (b *Backuper) splitPartFiles(basePath string, parts []metadata.Part, database, table string, skipProjections []string) ([]metadata.SplitPartFiles, error) {
     if b.cfg.General.UploadByPart {
         return b.splitFilesByName(basePath, parts, database, table, skipProjections)
-    } else {
-        return b.splitFilesBySize(basePath, parts, database, table, skipProjections)
     }
+    return b.splitFilesBySize(basePath, parts, database, table, skipProjections)
 }

 func (b *Backuper) splitFilesByName(basePath string, parts []metadata.Part, database, table string, skipProjections []string) ([]metadata.SplitPartFiles, error) {
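One Go detail worth noting in the uploadTableData hunk above: a variable declared in an `if` initializer (`if uploadPathBytes, err := ...`) is scoped to that statement, so once the `else` branch is removed the result must be declared beforehand to remain visible after the error check. A small sketch of that pattern under stated assumptions; the `upload` helper is made up and only stands in for a call like `b.dst.UploadPath`:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// upload is a made-up stand-in for a call such as b.dst.UploadPath that
// reports how many bytes were transferred, or an error.
func upload(files []string) (int64, error) {
	return int64(len(files)) * 1024, nil
}

func main() {
	var uploadedBytes int64

	// Declared before the if, so the values stay in scope after the early-return
	// error check; an `if ... :=` initializer would limit them to that statement.
	var uploadPathBytes int64
	var err error
	if uploadPathBytes, err = upload([]string{"part_1.bin", "part_2.bin"}); err != nil {
		fmt.Println("upload failed:", err)
		return
	}

	atomic.AddInt64(&uploadedBytes, uploadPathBytes)
	fmt.Println("uploaded bytes:", uploadedBytes) // uploaded bytes: 2048
}
```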

pkg/storage/s3.go

Lines changed: 3 additions & 3 deletions
@@ -258,7 +258,7 @@ func (s *S3) GetFileReaderWithLocalPath(ctx context.Context, key, localPath stri
     downloader.Concurrency = s.Concurrency
     downloader.BufferProvider = s3manager.NewPooledBufferedWriterReadFromProvider(s.BufferSize)
     var partSize int64
-    if s.Config.ChunkSize > 0 && (remoteSize+s.Config.ChunkSize-1)/s.Config.ChunkSize < 10000 {
+    if s.Config.ChunkSize > 0 && (remoteSize+s.Config.ChunkSize-1)/s.Config.ChunkSize < s.Config.MaxPartsCount {
         // Use configured chunk size
         partSize = s.Config.ChunkSize
     } else {
@@ -334,7 +334,7 @@ func (s *S3) PutFileAbsolute(ctx context.Context, key string, r io.ReadCloser, l
     uploader.Concurrency = s.Concurrency
     uploader.BufferProvider = s3manager.NewBufferedReadSeekerWriteToPool(s.BufferSize)
     var partSize int64
-    if s.Config.ChunkSize > 0 && (localSize+s.Config.ChunkSize-1)/s.Config.ChunkSize < 10000 {
+    if s.Config.ChunkSize > 0 && (localSize+s.Config.ChunkSize-1)/s.Config.ChunkSize < s.Config.MaxPartsCount {
         partSize = s.Config.ChunkSize
     } else {
         partSize = localSize / s.Config.MaxPartsCount
@@ -546,7 +546,7 @@ func (s *S3) CopyObject(ctx context.Context, srcSize int64, srcBucket, srcKey, d

     // Set the part size (128 MB minimum for CopyObject, or use configured chunk size)
     var partSize int64
-    if s.Config.ChunkSize > 0 && (srcSize+s.Config.ChunkSize-1)/s.Config.ChunkSize < 10000 {
+    if s.Config.ChunkSize > 0 && (srcSize+s.Config.ChunkSize-1)/s.Config.ChunkSize < s.Config.MaxPartsCount {
         partSize = s.Config.ChunkSize
     } else {
         partSize = srcSize / s.Config.MaxPartsCount
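All three s3.go hunks change the same guard. Read together, the part-size selection now works roughly as sketched below (a simplified standalone reading of the diff, with plain parameters instead of the `S3` struct and its config): keep the configured chunk size while it produces fewer parts than `MaxPartsCount`, otherwise derive the part size from the object size and the limit.

```go
package main

import "fmt"

// choosePartSize reflects the guard changed in this commit: prefer the
// configured chunk size while it keeps the part count below maxPartsCount,
// otherwise derive the part size from the object size and the limit.
func choosePartSize(objectSize, chunkSize, maxPartsCount int64) int64 {
	if chunkSize > 0 && (objectSize+chunkSize-1)/chunkSize < maxPartsCount {
		return chunkSize
	}
	return objectSize / maxPartsCount
}

func main() {
	const gib = int64(1024 * 1024 * 1024)

	// Against a backend limited to 1000 parts, a 500 GiB object with 100 MiB
	// chunks would need 5120 parts, so the part size is recomputed from the limit.
	fmt.Println(choosePartSize(500*gib, 100*1024*1024, 1000)) // 536870912 (512 MiB parts)

	// Against the previous hardcoded 10000-part bound the 100 MiB chunks survive.
	fmt.Println(choosePartSize(500*gib, 100*1024*1024, 10000)) // 104857600
}
```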
