
Commit 5973095

committed
add 25.10 to CI/CD, add support for object_disk.VersionFullObjectKey=5, fix #1290
Signed-off-by: Slach <bloodjazman@gmail.com>
1 parent 1529d42 commit 5973095

5 files changed: +88 -33 lines changed

.github/workflows/build.yaml

Lines changed: 1 addition & 0 deletions
@@ -219,6 +219,7 @@ jobs:
         - '24.8'
         - '25.3'
         - '25.8'
+        - '25.10'
     steps:
       - name: Checkout project
         uses: actions/checkout@v4

pkg/backup/create.go

Lines changed: 13 additions & 6 deletions
@@ -1034,24 +1034,31 @@ func (b *Backuper) uploadObjectDiskParts(ctx context.Context, backupName string,
 				}
 			}
 			uploadObjectDiskPartsWorkingGroup.Go(func() error {
-				objPartFileMeta, readMetadataErr := object_disk.ReadMetadataFromFile(fPath)
+				objMeta, readMetadataErr := object_disk.ReadMetadataFromFile(fPath)
 				if readMetadataErr != nil {
 					return readMetadataErr
 				}
-				for _, storageObject := range objPartFileMeta.StorageObjects {
+				for _, storageObject := range objMeta.StorageObjects {
 					if storageObject.ObjectSize == 0 {
 						continue
 					}
 					var copyObjectErr error
-					srcKey := path.Join(srcDiskConnection.GetRemotePath(), storageObject.ObjectRelativePath)
+
+					// 25.10+ contains full path need make it relative, https://github.com/Altinity/clickhouse-backup/issues/1290
+					if storageObject.IsAbsolute && srcDiskConnection.GetRemotePath() != "" && srcDiskConnection.GetRemotePath() != "/" && strings.Contains(storageObject.ObjectPath, srcDiskConnection.GetRemotePath()) {
+						storageObject.ObjectPath = strings.TrimPrefix(storageObject.ObjectPath, srcDiskConnection.GetRemotePath())
+					}
+
+					srcKey := path.Join(srcDiskConnection.GetRemotePath(), storageObject.ObjectPath)
+
 					if b.resume {
 						isAlreadyProcesses := false
 						isAlreadyProcesses, objSize = b.resumableState.IsAlreadyProcessed(path.Join(srcBucket, srcKey))
 						if isAlreadyProcesses {
 							continue
 						}
 					}
-					dstKey := path.Join(backupName, disk.Name, storageObject.ObjectRelativePath)
+					dstKey := path.Join(backupName, disk.Name, storageObject.ObjectPath)
 					if !b.cfg.General.AllowObjectDiskStreaming {
 						retry := retrier.New(retrier.ExponentialBackoff(b.cfg.General.RetriesOnFailure, common.AddRandomJitter(b.cfg.General.RetriesDuration, b.cfg.General.RetriesJitter)), b)
 						copyObjectErr = retry.RunCtx(uploadCtx, func(ctx context.Context) error {
@@ -1087,10 +1094,10 @@ func (b *Backuper) uploadObjectDiskParts(ctx context.Context, backupName string,
 					}
 					realSize += objSize
 				}
-				if realSize > objPartFileMeta.TotalSize {
+				if realSize > objMeta.TotalSize {
 					atomic.AddInt64(&size, realSize)
 				} else {
-					atomic.AddInt64(&size, objPartFileMeta.TotalSize)
+					atomic.AddInt64(&size, objMeta.TotalSize)
 				}
 				return nil
 			})
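
For context, the normalization this hunk applies during upload can be sketched in isolation: when a version-5 ("full object key") metadata entry carries an absolute object path, the disk's remote path prefix is stripped before the key is re-joined for the backup destination. The helper below is a hypothetical standalone illustration, not part of the commit; the remotePath and objectPath values are made up.

package main

import (
	"fmt"
	"path"
	"strings"
)

// makeRelative mirrors the idea in uploadObjectDiskParts: if the stored object
// path is absolute (metadata version 5, ClickHouse 25.10+) and contains the
// disk's remote path, strip that prefix so path.Join does not duplicate it.
func makeRelative(objectPath, remotePath string, isAbsolute bool) string {
	if isAbsolute && remotePath != "" && remotePath != "/" && strings.Contains(objectPath, remotePath) {
		return strings.TrimPrefix(objectPath, remotePath)
	}
	return objectPath
}

func main() {
	// Hypothetical values: a disk remote path and an absolute object key as
	// 25.10+ may record it in part metadata.
	remotePath := "data/clickhouse/disk_s3"
	objectPath := "data/clickhouse/disk_s3/abc/xyzobjectkey"

	rel := makeRelative(objectPath, remotePath, true)
	srcKey := path.Join(remotePath, rel)
	dstKey := path.Join("backup_2025", "disk_s3", rel)
	fmt.Println(srcKey, dstKey)
}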

pkg/backup/delete.go

Lines changed: 2 additions & 2 deletions
@@ -235,7 +235,7 @@ func (b *Backuper) cleanLocalEmbedded(ctx context.Context, backup LocalBackup, d
 			return err
 		}
 		for _, o := range meta.StorageObjects {
-			err = object_disk.DeleteFile(ctx, b.cfg.ClickHouse.EmbeddedBackupDisk, o.ObjectRelativePath)
+			err = object_disk.DeleteFile(ctx, b.cfg.ClickHouse.EmbeddedBackupDisk, o.ObjectPath)
 			if err != nil {
 				return err
 			}
@@ -378,7 +378,7 @@ func (b *Backuper) cleanRemoteEmbedded(ctx context.Context, backup storage.Backu
 			return err
 		}
 		for _, o := range meta.StorageObjects {
-			if err = object_disk.DeleteFile(ctx, b.cfg.ClickHouse.EmbeddedBackupDisk, o.ObjectRelativePath); err != nil {
+			if err = object_disk.DeleteFile(ctx, b.cfg.ClickHouse.EmbeddedBackupDisk, o.ObjectPath); err != nil {
 				return err
 			}
 		}

pkg/backup/restore.go

Lines changed: 37 additions & 5 deletions
@@ -2077,14 +2077,38 @@ func (b *Backuper) downloadObjectDiskParts(ctx context.Context, backupName strin
 		if err != nil {
 			return err
 		}
-		if objMeta.StorageObjectCount < 1 && objMeta.Version < object_disk.VersionRelativePath {
+		if objMeta.StorageObjectCount < 1 && (objMeta.Version < object_disk.VersionRelativePath || objMeta.Version != object_disk.VersionFullObjectKey) {
 			return fmt.Errorf("%s: invalid object_disk.Metadata: %#v", fPath, objMeta)
 		}
+		needObjMetaRewrite := false
 		//to allow deleting Object Disk Data during DROP TABLE/DATABASE ...SYNC
 		if objMeta.RefCount > 0 || objMeta.ReadOnly {
 			objMeta.RefCount = 0
 			objMeta.ReadOnly = false
 			logger.Debug().Msgf("%s %#v set RefCount=0 and ReadOnly=0", fPath, objMeta.StorageObjects)
+			needObjMetaRewrite = true
+		}
+		// 25.10+ contains full path with old disk, need rewrite it with destination disk path, https://github.com/Altinity/clickhouse-backup/issues/1290
+		for storageObjIdx, storageObject := range objMeta.StorageObjects {
+			if storageObject.ObjectSize == 0 || !storageObject.IsAbsolute {
+				continue
+			}
+			dstConnection, ok := object_disk.DisksConnections.Load(dstDiskName)
+			if !ok {
+				return errors.WithStack(fmt.Errorf("can't find %s in object_disk.DiskConnections", diskName))
+			}
+			if strings.HasPrefix(storageObject.ObjectPath, dstConnection.GetRemotePath()) {
+				continue
+			}
+			objPathParts := strings.Split(storageObject.ObjectPath, "/")
+			// 25.10, full path for azblob we will write to container, so path will not contain /, https://github.com/Altinity/clickhouse-backup/issues/1290
+			if len(objPathParts) >= 2 {
+				objMeta.StorageObjects[storageObjIdx].ObjectPath = dstConnection.GetRemotePath() + "/" + strings.Join(objPathParts[len(objPathParts)-2:], "/")
+				needObjMetaRewrite = true
+			}
+		}
+
+		if needObjMetaRewrite {
 			if writeMetaErr := object_disk.WriteMetadataToFile(objMeta, fPath); writeMetaErr != nil {
 				return fmt.Errorf("%s: object_disk.WriteMetadataToFile return error: %v", fPath, writeMetaErr)
 			}
@@ -2099,7 +2123,15 @@ func (b *Backuper) downloadObjectDiskParts(ctx context.Context, backupName strin
 			if objectDiskPathErr != nil {
 				return objectDiskPathErr
 			}
-			srcKey = path.Join(objectDiskPath, srcBackupName, srcDiskName, storageObject.ObjectRelativePath)
+			// 25.10+ contains full path, need make it relative again, for properly copy, https://github.com/Altinity/clickhouse-backup/issues/1290
+			if storageObject.IsAbsolute {
+				objPathParts := strings.Split(storageObject.ObjectPath, "/")
+				if len(objPathParts) >= 2 {
+					storageObject.ObjectPath = strings.Join(objPathParts[len(objPathParts)-2:], "/")
+				}
+			}
+
+			srcKey = path.Join(objectDiskPath, srcBackupName, srcDiskName, storageObject.ObjectPath)
 			srcBucket = ""
 			if b.cfg.General.RemoteStorage == "s3" {
 				srcBucket = b.cfg.S3.Bucket
@@ -2114,7 +2146,7 @@ func (b *Backuper) downloadObjectDiskParts(ctx context.Context, backupName strin
 			retry := retrier.New(retrier.ExponentialBackoff(b.cfg.General.RetriesOnFailure, common.AddRandomJitter(b.cfg.General.RetriesDuration, b.cfg.General.RetriesJitter)), b)
 			copyObjectErr = retry.RunCtx(downloadCtx, func(ctx context.Context) error {
 				var retryErr error
-				copiedSize, retryErr = object_disk.CopyObject(downloadCtx, dstDiskName, storageObject.ObjectSize, srcBucket, srcKey, storageObject.ObjectRelativePath)
+				copiedSize, retryErr = object_disk.CopyObject(downloadCtx, dstDiskName, storageObject.ObjectSize, srcBucket, srcKey, storageObject.ObjectPath)
 				return retryErr
 			})
 			if copyObjectErr != nil {
@@ -2123,7 +2155,7 @@ func (b *Backuper) downloadObjectDiskParts(ctx context.Context, backupName strin
 		} else {
 			copyObjectErr = nil
 			if srcBucket != "" && !isCopyFailed.Load() {
-				copiedSize, copyObjectErr = object_disk.CopyObject(downloadCtx, dstDiskName, storageObject.ObjectSize, srcBucket, srcKey, storageObject.ObjectRelativePath)
+				copiedSize, copyObjectErr = object_disk.CopyObject(downloadCtx, dstDiskName, storageObject.ObjectSize, srcBucket, srcKey, storageObject.ObjectPath)
 				if copyObjectErr != nil {
 					isCopyFailed.Store(true)
 					log.Warn().Msgf("object_disk.CopyObject `%s`.`%s` error: %v, will try streaming via local memory (possible high network traffic)", backupTable.Database, backupTable.Table, copyObjectErr)
@@ -2137,7 +2169,7 @@ func (b *Backuper) downloadObjectDiskParts(ctx context.Context, backupName strin
 				return fmt.Errorf("unknown object_disk.DisksConnections %s", dstDiskName)
 			}
 			dstStorage := dstConnection.GetRemoteStorage()
-			dstKey := path.Join(dstConnection.GetRemoteObjectDiskPath(), storageObject.ObjectRelativePath)
+			dstKey := path.Join(dstConnection.GetRemoteObjectDiskPath(), storageObject.ObjectPath)
 			retry := retrier.New(retrier.ExponentialBackoff(b.cfg.General.RetriesOnFailure, common.AddRandomJitter(b.cfg.General.RetriesDuration, b.cfg.General.RetriesJitter)), b)
 			copyObjectErr = retry.RunCtx(downloadCtx, func(ctx context.Context) error {
 				return object_disk.CopyObjectStreaming(downloadCtx, srcStorage, dstStorage, srcKey, dstKey)
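
As a rough illustration of the rewrite performed on restore: an absolute object path recorded against the source disk is re-pointed at the destination disk by keeping only its last two path components and prefixing the destination's remote path. This is a minimal sketch with hypothetical source and destination remote paths, not the committed code (which operates on object_disk.Metadata loaded from disk).

package main

import (
	"fmt"
	"strings"
)

// rewriteForDestination sketches the restore-side fix-up: for an absolute
// (version 5) object path that does not already start with the destination
// disk's remote path, keep the trailing two components (prefix dir + object
// key) and re-anchor them under the destination remote path.
func rewriteForDestination(objectPath, dstRemotePath string) string {
	if strings.HasPrefix(objectPath, dstRemotePath) {
		return objectPath
	}
	parts := strings.Split(objectPath, "/")
	// Paths without at least two components (e.g. a bare key) are left untouched.
	if len(parts) < 2 {
		return objectPath
	}
	return dstRemotePath + "/" + strings.Join(parts[len(parts)-2:], "/")
}

func main() {
	// Hypothetical paths: the restored metadata still points at the old disk prefix.
	old := "old_cluster/disk_s3/abc/xyzobjectkey"
	fmt.Println(rewriteForDestination(old, "new_cluster/disk_s3"))
	// -> new_cluster/disk_s3/abc/xyzobjectkey
}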

pkg/storage/object_disk/object_disk.go

Lines changed: 35 additions & 20 deletions
@@ -19,6 +19,7 @@ import (
 	"github.com/Altinity/clickhouse-backup/v2/pkg/config"
 	"github.com/Altinity/clickhouse-backup/v2/pkg/storage"
 	"github.com/antchfx/xmlquery"
+	"github.com/pkg/errors"
 	"github.com/puzpuzpuz/xsync"
 	"github.com/rs/zerolog/log"
 )
@@ -31,11 +32,13 @@ const (
 	VersionRelativePath MetadataVersion = 2
 	VersionReadOnlyFlag MetadataVersion = 3
 	VersionInlineData   MetadataVersion = 4
+	VersionFullObjectKey MetadataVersion = 5
 )
 
 type StorageObject struct {
-	ObjectSize         int64
-	ObjectRelativePath string
+	IsAbsolute bool
+	ObjectSize int64
+	ObjectPath string
 }
 
 type Metadata struct {
@@ -92,7 +95,6 @@ func ReadBoolText(scanner *bufio.Scanner) (bool, error) {
 
 func (m *Metadata) readFromFile(file io.Reader) error {
 
-	objectStorageRootPath := ""
 	scanner := bufio.NewScanner(file)
 	// todo think about, resize scanner's capacity for lines over 64K
 	scanner.Split(bufio.ScanWords)
@@ -103,37 +105,44 @@ func (m *Metadata) readFromFile(file io.Reader) error {
 		return err
 	}
 
-	if version < int(VersionAbsolutePaths) || version > int(VersionInlineData) {
-		return fmt.Errorf("invalid metadata.Version=%v", m.Version)
+	if version < int(VersionAbsolutePaths) || version > int(VersionFullObjectKey) {
+		return fmt.Errorf("invalid metadata.Version=%v", version)
 	}
 
 	m.Version = MetadataVersion(version)
 
 	m.StorageObjectCount, err = ReadIntText(scanner)
+	if err != nil {
+		return errors.WithStack(errors.Wrap(err, "can't read StorageObjectCount"))
+	}
 
 	m.TotalSize, err = ReadInt64Text(scanner)
+	if err != nil {
+		return errors.WithStack(errors.Wrap(err, "can't read TotalSize"))
+	}
 
 	for i := 0; i < m.StorageObjectCount; i++ {
-
-		objectSize, _ := ReadInt64Text(scanner)
-		scanner.Scan()
-		objectRelativePath := scanner.Text()
-
-		if version == int(VersionAbsolutePaths) {
-			if !strings.HasPrefix(objectRelativePath, objectStorageRootPath) {
-				return fmt.Errorf("%s doesn't contains %s", objectRelativePath, objectStorageRootPath)
-			}
-			objectRelativePath = objectRelativePath[len(objectStorageRootPath):]
+		storageObject := StorageObject{IsAbsolute: m.Version == VersionAbsolutePaths || m.Version == VersionFullObjectKey}
+		storageObject.ObjectSize, err = ReadInt64Text(scanner)
+		if err != nil {
+			return errors.WithStack(errors.Wrap(err, "can't read ObjectSize"))
 		}
 
-		m.StorageObjects = append(m.StorageObjects, StorageObject{ObjectSize: objectSize, ObjectRelativePath: objectRelativePath})
-
+		scanner.Scan()
+		storageObject.ObjectPath = scanner.Text()
+		m.StorageObjects = append(m.StorageObjects, storageObject)
 	}
 
 	m.RefCount, err = ReadIntText(scanner)
+	if err != nil {
+		return errors.WithStack(errors.Wrap(err, "can't read TotalSize"))
+	}
 
 	if version >= int(VersionReadOnlyFlag) {
 		m.ReadOnly, err = ReadBoolText(scanner)
+		if err != nil {
+			return errors.WithStack(errors.Wrap(err, "can't read ReadOnly"))
+		}
 	}
 
 	if version >= int(VersionInlineData) {
@@ -156,7 +165,7 @@ func (m *Metadata) writeToFile(file *os.File) error {
 	}
 
 	for i := 0; i < m.StorageObjectCount; i++ {
-		if _, err = file.WriteString(strconv.FormatInt(m.StorageObjects[i].ObjectSize, 10) + "\t" + m.StorageObjects[i].ObjectRelativePath + "\n"); err != nil {
+		if _, err = file.WriteString(strconv.FormatInt(m.StorageObjects[i].ObjectSize, 10) + "\t" + m.StorageObjects[i].ObjectPath + "\n"); err != nil {
 			return err
 		}
 	}
@@ -322,6 +331,9 @@ func getObjectDisksCredentials(ctx context.Context, ch *clickhouse.ClickHouse) e
 		return err
 	}
 	root := xmlquery.FindOne(doc, "/")
+	if root == nil {
+		return fmt.Errorf("object_disk->getObjectDisksCredentials")
+	}
 	disks := xmlquery.Find(doc, fmt.Sprintf("/%s/storage_configuration/disks/*", root.Data))
 	for _, d := range disks {
 		diskName := d.Data
@@ -624,7 +636,7 @@ func ConvertLocalPathToRemote(diskName, localPath string) (string, error) {
 	if err != nil {
 		return "", err
 	}
-	return meta.StorageObjects[0].ObjectRelativePath, nil
+	return meta.StorageObjects[0].ObjectPath, nil
 }
 
 func GetFileReader(ctx context.Context, diskName, remotePath string) (io.ReadCloser, error) {
@@ -721,7 +733,10 @@ func GetFileSize(ctx context.Context, ch *clickhouse.ClickHouse, cfg *config.Con
 */
 
 func CopyObject(ctx context.Context, diskName string, srcSize int64, srcBucket, srcKey, dstPath string) (int64, error) {
-	connection, _ := DisksConnections.Load(diskName)
+	connection, ok := DisksConnections.Load(diskName)
+	if !ok {
+		return 0, errors.WithStack(fmt.Errorf("can't find %s in object_disk.DiskConnections", diskName))
+	}
 	remoteStorage := connection.GetRemoteStorage()
 	return remoteStorage.CopyObject(ctx, srcSize, srcBucket, srcKey, dstPath)
 }
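
The parser above implies the layout of an object disk metadata file: a version number, object count and total size, one "size<TAB>path" pair per object, then RefCount, a read-only flag (version >= 3) and optional inline data (version >= 4); with VersionFullObjectKey = 5 the path field holds a full object key and the object is flagged IsAbsolute. Below is a minimal sketch of reading such a file with the same whitespace-token scheme, using made-up field values; it stands in for, and does not replace, the package's readFromFile.

package main

import (
	"bufio"
	"fmt"
	"strconv"
	"strings"
)

// A trimmed-down view of the per-object record handled by readFromFile.
type storageObject struct {
	IsAbsolute bool
	ObjectSize int64
	ObjectPath string
}

func main() {
	// Hypothetical version-5 metadata: version, object count, total size,
	// "size\tpath" per object, then RefCount and the ReadOnly flag.
	raw := "5\n1\t22\n22\tdata/clickhouse/disk_s3/abc/xyzobjectkey\n0\n0\n"

	scanner := bufio.NewScanner(strings.NewReader(raw))
	scanner.Split(bufio.ScanWords) // same token-by-token scheme as readFromFile

	next := func() string { scanner.Scan(); return scanner.Text() }
	version, _ := strconv.Atoi(next())
	count, _ := strconv.Atoi(next())
	total, _ := strconv.ParseInt(next(), 10, 64)

	objects := make([]storageObject, 0, count)
	for i := 0; i < count; i++ {
		size, _ := strconv.ParseInt(next(), 10, 64)
		objects = append(objects, storageObject{
			IsAbsolute: version == 1 || version == 5, // VersionAbsolutePaths or VersionFullObjectKey
			ObjectSize: size,
			ObjectPath: next(),
		})
	}
	refCount, _ := strconv.Atoi(next())
	readOnly := next() != "0"

	fmt.Printf("version=%d total=%d refCount=%d readOnly=%v objects=%+v\n",
		version, total, refCount, readOnly, objects)
}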
