Skip to content

Commit 07fcc81

Browse files
committed
object_disk.CopyObjectStreaming can't copy files with size more 5Gb to S3, cause pass localSize 0 into PutFileAbsolute, fix #1176
Signed-off-by: Slach <[email protected]>
1 parent be086ef commit 07fcc81

File tree

9 files changed

+45
-13
lines changed

9 files changed

+45
-13
lines changed

ChangeLog.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
# v2.6.26
2+
BUG FIXES
3+
- `object_disk.CopyObjectStreaming` can't copy files with size more 5Gb to S3, cause pass localSize 0 into PutFileAbsolute, fix [1176](https://github.com/Altinity/clickhouse-backup/issues/1176)
4+
15
# v2.6.25
26
IMPROVEMENTS
37
- change retries from constant to exponential backoff and add RETRIES_JITTER configuration option, to avoid same time retries from parallel operation

pkg/storage/azblob.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -229,8 +229,12 @@ func (a *AzureBlob) DeleteFileFromObjectDiskBackup(ctx context.Context, key stri
229229
}
230230

231231
func (a *AzureBlob) StatFile(ctx context.Context, key string) (RemoteFile, error) {
232-
a.logf("AZBLOB->StatFile %s", key)
233-
blob := a.Container.NewBlockBlobURL(path.Join(a.Config.Path, key))
232+
return a.StatFileAbsolute(ctx, path.Join(a.Config.Path, key))
233+
}
234+
235+
func (a *AzureBlob) StatFileAbsolute(ctx context.Context, key string) (RemoteFile, error) {
236+
a.logf("AZBLOB->StatFileAbsolute %s", key)
237+
blob := a.Container.NewBlockBlobURL(key)
234238
r, err := blob.GetProperties(ctx, azblob.BlobAccessConditions{}, a.CPK)
235239
if err != nil {
236240
var se azblob.StorageError

pkg/storage/cos.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,12 @@ func (c *COS) Close(ctx context.Context) error {
6767
}
6868

6969
func (c *COS) StatFile(ctx context.Context, key string) (RemoteFile, error) {
70-
// file max size is 5Gb
71-
resp, err := c.client.Object.Get(ctx, path.Join(c.Config.Path, key), nil)
70+
return c.StatFileAbsolute(ctx, path.Join(c.Config.Path, key))
71+
}
72+
73+
func (c *COS) StatFileAbsolute(ctx context.Context, key string) (RemoteFile, error) {
74+
// @todo - COS Stat file max size is 5Gb
75+
resp, err := c.client.Object.Get(ctx, key, nil)
7276
if err != nil {
7377
var cosErr *cos.ErrorResponse
7478
ok := errors.As(err, &cosErr)

pkg/storage/ftp.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,12 @@ func (f *FTP) returnConnectionToPool(ctx context.Context, where string, client *
8383
}
8484

8585
func (f *FTP) StatFile(ctx context.Context, key string) (RemoteFile, error) {
86-
// cant list files, so check the dir
87-
dir := path.Dir(path.Join(f.Config.Path, key))
86+
return f.StatFileAbsolute(ctx, path.Join(f.Config.Path, key))
87+
}
88+
89+
func (f *FTP) StatFileAbsolute(ctx context.Context, key string) (RemoteFile, error) {
90+
// can't list files, so check the dir
91+
dir := path.Dir(key)
8892
client, err := f.getConnectionFromPool(ctx, fmt.Sprintf("StatFile, key=%s", key))
8993
if err != nil {
9094
return nil, err
@@ -98,7 +102,7 @@ func (f *FTP) StatFile(ctx context.Context, key string) (RemoteFile, error) {
98102
}
99103
return nil, err
100104
}
101-
file := path.Base(path.Join(f.Config.Path, key))
105+
file := path.Base(key)
102106
for i := range entries {
103107
if file == entries[i].Name {
104108
// file found, return it

pkg/storage/gcs.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,11 @@ func (gcs *GCS) PutFileAbsolute(ctx context.Context, key string, r io.ReadCloser
292292
}
293293

294294
func (gcs *GCS) StatFile(ctx context.Context, key string) (RemoteFile, error) {
295-
objAttr, err := gcs.client.Bucket(gcs.Config.Bucket).Object(path.Join(gcs.Config.Path, key)).Attrs(ctx)
295+
return gcs.StatFileAbsolute(ctx, path.Join(gcs.Config.Path, key))
296+
}
297+
298+
func (gcs *GCS) StatFileAbsolute(ctx context.Context, key string) (RemoteFile, error) {
299+
objAttr, err := gcs.client.Bucket(gcs.Config.Bucket).Object(key).Attrs(ctx)
296300
if err != nil {
297301
if errors.Is(err, storage.ErrObjectNotExist) {
298302
return nil, ErrNotFound

pkg/storage/object_disk/object_disk.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -727,6 +727,11 @@ func CopyObject(ctx context.Context, diskName string, srcSize int64, srcBucket,
727727
}
728728

729729
func CopyObjectStreaming(ctx context.Context, srcStorage storage.RemoteStorage, dstStorage storage.RemoteStorage, srcKey, dstKey string) error {
730+
srcInfo, statErr := srcStorage.StatFileAbsolute(ctx, srcKey)
731+
if statErr != nil {
732+
return fmt.Errorf("srcStorage.StatFileReaderAbsolute(%s) error: %v", srcKey, statErr)
733+
}
734+
730735
srcReader, srcErr := srcStorage.GetFileReaderAbsolute(ctx, srcKey)
731736
if srcErr != nil {
732737
return fmt.Errorf("srcStorage.GetFileReaderAbsolute(%s) error: %v", srcKey, srcErr)
@@ -736,7 +741,7 @@ func CopyObjectStreaming(ctx context.Context, srcStorage storage.RemoteStorage,
736741
log.Error().Msgf("srcReader.Close(%s) error: %v", srcKey, closeErr)
737742
}
738743
}()
739-
if putErr := dstStorage.PutFileAbsolute(ctx, dstKey, srcReader, 0); putErr != nil {
744+
if putErr := dstStorage.PutFileAbsolute(ctx, dstKey, srcReader, srcInfo.Size()); putErr != nil {
740745
return fmt.Errorf("dstStorage.PutFileAbsolute(%s) error: %v", dstKey, putErr)
741746
}
742747
return nil

pkg/storage/s3.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -405,9 +405,13 @@ func (s *S3) getObjectAllVersions(ctx context.Context, key string) ([]string, er
405405
}
406406

407407
func (s *S3) StatFile(ctx context.Context, key string) (RemoteFile, error) {
408+
return s.StatFileAbsolute(ctx, path.Join(s.Config.Path, key))
409+
}
410+
411+
func (s *S3) StatFileAbsolute(ctx context.Context, key string) (RemoteFile, error) {
408412
params := &s3.HeadObjectInput{
409413
Bucket: aws.String(s.Config.Bucket),
410-
Key: aws.String(path.Join(s.Config.Path, key)),
414+
Key: aws.String(key),
411415
}
412416
s.enrichHeadParams(params)
413417
head, err := s.client.HeadObject(ctx, params)

pkg/storage/sftp.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,11 +100,13 @@ func (sftp *SFTP) Close(ctx context.Context) error {
100100
}
101101

102102
func (sftp *SFTP) StatFile(ctx context.Context, key string) (RemoteFile, error) {
103-
filePath := path.Join(sftp.Config.Path, key)
103+
return sftp.StatFileAbsolute(ctx, path.Join(sftp.Config.Path, key))
104+
}
104105

105-
stat, err := sftp.sftpClient.Stat(filePath)
106+
func (sftp *SFTP) StatFileAbsolute(ctx context.Context, key string) (RemoteFile, error) {
107+
stat, err := sftp.sftpClient.Stat(key)
106108
if err != nil {
107-
sftp.Debug("[SFTP_DEBUG] StatFile::STAT %s return error %v", filePath, err)
109+
sftp.Debug("[SFTP_DEBUG] StatFile::STAT %s return error %v", key, err)
108110
if strings.Contains(err.Error(), "not exist") {
109111
return nil, ErrNotFound
110112
}

pkg/storage/structs.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ type RemoteStorage interface {
2525
Connect(ctx context.Context) error
2626
Close(ctx context.Context) error
2727
StatFile(ctx context.Context, key string) (RemoteFile, error)
28+
StatFileAbsolute(ctx context.Context, key string) (RemoteFile, error)
2829
DeleteFile(ctx context.Context, key string) error
2930
DeleteFileFromObjectDiskBackup(ctx context.Context, key string) error
3031
Walk(ctx context.Context, prefix string, recursive bool, fn func(context.Context, RemoteFile) error) error

0 commit comments

Comments
 (0)