Skip to content

Commit 9853048

Browse files
committed
fix ApplyMacros behavior for Embedded backup/restore
Signed-off-by: Slach <bloodjazman@gmail.com>
1 parent 88497d0 commit 9853048

File tree

2 files changed

+31
-19
lines changed

2 files changed

+31
-19
lines changed

pkg/backup/backuper.go

Lines changed: 28 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -254,10 +254,7 @@ func (b *Backuper) getEmbeddedBackupLocation(ctx context.Context, backupName str
254254
return fmt.Sprintf("Disk('%s','%s')", b.cfg.ClickHouse.EmbeddedBackupDisk, backupName), nil
255255
}
256256
if b.cfg.General.RemoteStorage == "s3" {
257-
s3Endpoint, err := b.ch.ApplyMacros(ctx, b.buildEmbeddedLocationS3())
258-
if err != nil {
259-
return "", err
260-
}
257+
s3Endpoint := b.buildEmbeddedLocationS3(ctx)
261258
if b.cfg.S3.AccessKey != "" {
262259
return fmt.Sprintf("S3('%s/%s/','%s','%s')", s3Endpoint, backupName, b.cfg.S3.AccessKey, b.cfg.S3.SecretKey), nil
263260
}
@@ -267,10 +264,7 @@ func (b *Backuper) getEmbeddedBackupLocation(ctx context.Context, backupName str
267264
return "", errors.WithStack(errors.New("provide s3->access_key and s3->secret_key in config to allow embedded backup without `clickhouse->embedded_backup_disk`"))
268265
}
269266
if b.cfg.General.RemoteStorage == "gcs" {
270-
gcsEndpoint, err := b.ch.ApplyMacros(ctx, b.buildEmbeddedLocationGCS())
271-
if err != nil {
272-
return "", err
273-
}
267+
gcsEndpoint := b.buildEmbeddedLocationGCS(ctx)
274268
if b.cfg.GCS.EmbeddedAccessKey != "" {
275269
return fmt.Sprintf("S3('%s/%s/','%s','%s')", gcsEndpoint, backupName, b.cfg.GCS.EmbeddedAccessKey, b.cfg.GCS.EmbeddedSecretKey), nil
276270
}
@@ -280,45 +274,50 @@ func (b *Backuper) getEmbeddedBackupLocation(ctx context.Context, backupName str
280274
return "", fmt.Errorf("provide gcs->embedded_access_key and gcs->embedded_secret_key in config to allow embedded backup without `clickhouse->embedded_backup_disk`")
281275
}
282276
if b.cfg.General.RemoteStorage == "azblob" {
283-
azblobEndpoint, err := b.ch.ApplyMacros(ctx, b.buildEmbeddedLocationAZBLOB())
277+
azblobEndpoint := b.buildEmbeddedLocationAZBLOB()
278+
azblobPath, err := b.ch.ApplyMacros(ctx, b.cfg.AzureBlob.ObjectDiskPath)
284279
if err != nil {
285280
return "", err
286281
}
287282
if b.cfg.AzureBlob.Container != "" {
288-
return fmt.Sprintf("AzureBlobStorage('%s','%s','%s/%s/')", azblobEndpoint, b.cfg.AzureBlob.Container, b.cfg.AzureBlob.ObjectDiskPath, backupName), nil
283+
return fmt.Sprintf("AzureBlobStorage('%s','%s','%s/%s/')", azblobEndpoint, b.cfg.AzureBlob.Container, azblobPath, backupName), nil
289284
}
290285
return "", fmt.Errorf("provide azblob->container and azblob->account_name, azblob->account_key in config to allow embedded backup without `clickhouse->embedded_backup_disk`")
291286
}
292287
return "", fmt.Errorf("empty clickhouse->embedded_backup_disk and invalid general->remote_storage: %s", b.cfg.General.RemoteStorage)
293288
}
294289

295-
296-
func (b *Backuper) buildEmbeddedLocationS3() string {
290+
func (b *Backuper) buildEmbeddedLocationS3(ctx context.Context) string {
297291
s3backupURL := url.URL{}
298292
s3backupURL.Scheme = "https"
293+
s3Path, err := b.ch.ApplyMacros(ctx, b.cfg.S3.ObjectDiskPath)
294+
if err != nil {
295+
log.Error().Stack().Err(err).Send()
296+
return ""
297+
}
299298
if strings.HasPrefix(b.cfg.S3.Endpoint, "http") {
300299
newUrl, _ := s3backupURL.Parse(b.cfg.S3.Endpoint)
301300
s3backupURL = *newUrl
302-
s3backupURL.Path = path.Join(b.cfg.S3.Bucket, b.cfg.S3.ObjectDiskPath)
301+
s3backupURL.Path = path.Join(b.cfg.S3.Bucket, s3Path)
303302
} else {
304303
s3backupURL.Host = b.cfg.S3.Endpoint
305-
s3backupURL.Path = path.Join(b.cfg.S3.Bucket, b.cfg.S3.ObjectDiskPath)
304+
s3backupURL.Path = path.Join(b.cfg.S3.Bucket, s3Path)
306305
}
307306
if b.cfg.S3.DisableSSL {
308307
s3backupURL.Scheme = "http"
309308
}
310309
if s3backupURL.Host == "" && b.cfg.S3.Region != "" && b.cfg.S3.ForcePathStyle {
311310
s3backupURL.Host = "s3." + b.cfg.S3.Region + ".amazonaws.com"
312-
s3backupURL.Path = path.Join(b.cfg.S3.Bucket, b.cfg.S3.ObjectDiskPath)
311+
s3backupURL.Path = path.Join(b.cfg.S3.Bucket, s3Path)
313312
}
314313
if s3backupURL.Host == "" && b.cfg.S3.Bucket != "" && !b.cfg.S3.ForcePathStyle {
315314
s3backupURL.Host = b.cfg.S3.Bucket + "." + "s3." + b.cfg.S3.Region + ".amazonaws.com"
316-
s3backupURL.Path = b.cfg.S3.ObjectDiskPath
315+
s3backupURL.Path = s3Path
317316
}
318317
return s3backupURL.String()
319318
}
320319

321-
func (b *Backuper) buildEmbeddedLocationGCS() string {
320+
func (b *Backuper) buildEmbeddedLocationGCS(ctx context.Context) string {
322321
gcsBackupURL := url.URL{}
323322
gcsBackupURL.Scheme = "https"
324323
if b.cfg.GCS.ForceHttp {
@@ -328,14 +327,24 @@ func (b *Backuper) buildEmbeddedLocationGCS() string {
328327
if !strings.HasPrefix(b.cfg.GCS.Endpoint, "http") {
329328
gcsBackupURL.Host = b.cfg.GCS.Endpoint
330329
} else {
331-
newUrl, _ := gcsBackupURL.Parse(b.cfg.GCS.Endpoint)
330+
newUrl, err := gcsBackupURL.Parse(b.cfg.GCS.Endpoint)
331+
if err != nil {
332+
log.Error().Err(err).Stack().Send()
333+
return ""
334+
}
332335
gcsBackupURL = *newUrl
333336
}
334337
}
335338
if gcsBackupURL.Host == "" {
336339
gcsBackupURL.Host = "storage.googleapis.com"
337340
}
338-
gcsBackupURL.Path = path.Join(b.cfg.GCS.Bucket, b.cfg.GCS.ObjectDiskPath)
341+
gcsPath, err := b.ch.ApplyMacros(ctx, b.cfg.GCS.ObjectDiskPath)
342+
if err != nil {
343+
log.Error().Err(err).Stack().Send()
344+
return ""
345+
}
346+
347+
gcsBackupURL.Path = path.Join(b.cfg.GCS.Bucket, gcsPath)
339348
return gcsBackupURL.String()
340349
}
341350

pkg/clickhouse/clickhouse.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1305,6 +1305,9 @@ func (ch *ClickHouse) GetInProgressMutations(ctx context.Context, database strin
13051305
}
13061306

13071307
func (ch *ClickHouse) ApplyMacros(ctx context.Context, s string) (string, error) {
1308+
if !strings.Contains(s, "{") {
1309+
return s, nil
1310+
}
13081311
var macrosExists uint64
13091312
err := ch.SelectSingleRow(ctx, &macrosExists, "SELECT count() AS is_macros_exists FROM system.tables WHERE database='system' AND name='macros' SETTINGS empty_result_for_aggregation_by_empty_set=0")
13101313
if err != nil || macrosExists == 0 {

0 commit comments

Comments
 (0)