Skip to content

Commit 070d34c

Browse files
author
anton.voskresensky
committed
add parallel snapshots
1 parent 1ca4473 commit 070d34c

File tree

6 files changed

+218
-137
lines changed

6 files changed

+218
-137
lines changed

commands/snapshot-manual.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -132,11 +132,6 @@ func runSnapshotManual(cmd *cobra.Command, args []string) error {
132132
return fmt.Errorf("failed to wait for snapshot completion: %v", err)
133133
}
134134

135-
err = utils.WaitForSnapshotTasks(client, logger, "", repoToUse)
136-
if err != nil {
137-
return fmt.Errorf("failed to wait for snapshot tasks: %v", err)
138-
}
139-
140135
allSnapshots, err := utils.GetSnapshotsIgnore404(client, repoToUse, "*"+today+"*")
141136
if err != nil {
142137
return fmt.Errorf("failed to get snapshots: %v", err)
@@ -166,7 +161,7 @@ func runSnapshotManual(cmd *cobra.Command, args []string) error {
166161
indicesStr := strings.Join(matchingIndices, ",")
167162
logger.Info(fmt.Sprintf("Creating snapshot %s", snapshotName))
168163
logger.Info(fmt.Sprintf("Snapshot indices %s", indicesStr))
169-
err = utils.CreateSnapshotWithRetry(client, snapshotName, indicesStr, repoToUse, cfg.GetKubeNamespace(), today, madisonClient, logger, 60*time.Second)
164+
err = utils.CreateSnapshotWithRetry(client, snapshotName, indicesStr, repoToUse, cfg.GetKubeNamespace(), today, madisonClient, logger, 60*time.Second, cfg.GetMaxConcurrentSnapshots())
170165
if err != nil {
171166
logger.Error(fmt.Sprintf("Failed to create snapshot after retries snapshot=%s error=%v", snapshotName, err))
172167
return err

commands/snapshots.go

Lines changed: 61 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,7 @@ func runSnapshot(cmd *cobra.Command, args []string) error {
278278
logger.Info("Existing snapshots today none")
279279
}
280280

281+
var snapshotTasks []utils.SnapshotTask
281282
for _, group := range snapshotGroups {
282283
if state, ok, err := utils.CheckSnapshotStateInRepo(client, defaultRepo, group.SnapshotName); err == nil && ok {
283284
if state == "SUCCESS" {
@@ -311,15 +312,19 @@ func runSnapshot(cmd *cobra.Command, args []string) error {
311312
newSnapshotName := baseName + "-" + randomSuffix + "-" + datePart
312313
logger.Info(fmt.Sprintf("Some indices missing in existing snapshot, creating additional snapshot original=%s new=%s missingIndicesCount=%d", group.SnapshotName, newSnapshotName, len(missingIndices)))
313314
indicesStr := strings.Join(missingIndices, ",")
314-
logger.Info(fmt.Sprintf("Creating snapshot %s", newSnapshotName))
315-
logger.Info(fmt.Sprintf("Snapshot indices %s", indicesStr))
316-
err = utils.CreateSnapshotWithRetry(client, newSnapshotName, indicesStr, defaultRepo, cfg.GetKubeNamespace(), today, madisonClient, logger, 60*time.Second)
317-
if err != nil {
318-
logger.Error(fmt.Sprintf("Failed to create snapshot after retries snapshot=%s error=%v", newSnapshotName, err))
319-
failedSnapshots = append(failedSnapshots, newSnapshotName)
320-
} else {
321-
successfulSnapshots = append(successfulSnapshots, newSnapshotName)
315+
var totalSize int64
316+
for _, idx := range missingIndices {
317+
totalSize += indexSizes[idx]
322318
}
319+
snapshotTasks = append(snapshotTasks, utils.SnapshotTask{
320+
SnapshotName: newSnapshotName,
321+
IndicesStr: indicesStr,
322+
Repo: defaultRepo,
323+
Namespace: cfg.GetKubeNamespace(),
324+
DateStr: today,
325+
PollInterval: 60 * time.Second,
326+
Size: totalSize,
327+
})
323328
}
324329
continue
325330
}
@@ -341,15 +346,25 @@ func runSnapshot(cmd *cobra.Command, args []string) error {
341346
}
342347

343348
indicesStr := strings.Join(group.Indices, ",")
344-
logger.Info(fmt.Sprintf("Creating snapshot %s", group.SnapshotName))
345-
logger.Info(fmt.Sprintf("Snapshot indices %s", indicesStr))
346-
err = utils.CreateSnapshotWithRetry(client, group.SnapshotName, indicesStr, defaultRepo, cfg.GetKubeNamespace(), today, madisonClient, logger, 60*time.Second)
347-
if err != nil {
348-
logger.Error(fmt.Sprintf("Failed to create snapshot after retries snapshot=%s error=%v", group.SnapshotName, err))
349-
failedSnapshots = append(failedSnapshots, group.SnapshotName)
350-
continue
349+
var totalSize int64
350+
for _, idx := range group.Indices {
351+
totalSize += indexSizes[idx]
351352
}
352-
successfulSnapshots = append(successfulSnapshots, group.SnapshotName)
353+
snapshotTasks = append(snapshotTasks, utils.SnapshotTask{
354+
SnapshotName: group.SnapshotName,
355+
IndicesStr: indicesStr,
356+
Repo: defaultRepo,
357+
Namespace: cfg.GetKubeNamespace(),
358+
DateStr: today,
359+
PollInterval: 60 * time.Second,
360+
Size: totalSize,
361+
})
362+
}
363+
364+
if len(snapshotTasks) > 0 {
365+
successful, failed := utils.CreateSnapshotsInParallel(client, snapshotTasks, cfg.GetMaxConcurrentSnapshots(), madisonClient, logger)
366+
successfulSnapshots = append(successfulSnapshots, successful...)
367+
failedSnapshots = append(failedSnapshots, failed...)
353368
}
354369

355370
if len(repoGroups) > 0 {
@@ -359,6 +374,7 @@ func runSnapshot(cmd *cobra.Command, args []string) error {
359374
repo := parts[0]
360375
perRepo[repo] = append(perRepo[repo], g)
361376
}
377+
var repoSnapshotTasks []utils.SnapshotTask
362378
for repo, groups := range perRepo {
363379
sort.Slice(groups, func(i, j int) bool {
364380
var sizeI, sizeJ int64
@@ -411,15 +427,19 @@ func runSnapshot(cmd *cobra.Command, args []string) error {
411427
newSnapshotName := baseName + "-" + randomSuffix + "-" + datePart
412428
logger.Info(fmt.Sprintf("Some indices missing in existing snapshot, creating additional snapshot repo=%s original=%s new=%s missingIndicesCount=%d", repo, g.SnapshotName, newSnapshotName, len(missingIndices)))
413429
indicesStr := strings.Join(missingIndices, ",")
414-
logger.Info(fmt.Sprintf("Creating snapshot repo=%s snapshot=%s", repo, newSnapshotName))
415-
logger.Info(fmt.Sprintf("Snapshot indices %s", indicesStr))
416-
err = utils.CreateSnapshotWithRetry(client, newSnapshotName, indicesStr, repo, cfg.GetKubeNamespace(), today, madisonClient, logger, 60*time.Second)
417-
if err != nil {
418-
logger.Error(fmt.Sprintf("Failed to create snapshot after retries repo=%s snapshot=%s error=%v", repo, newSnapshotName, err))
419-
failedSnapshots = append(failedSnapshots, fmt.Sprintf("%s (repo=%s)", newSnapshotName, repo))
420-
} else {
421-
successfulSnapshots = append(successfulSnapshots, fmt.Sprintf("%s (repo=%s)", newSnapshotName, repo))
430+
var totalSize int64
431+
for _, idx := range missingIndices {
432+
totalSize += indexSizes[idx]
422433
}
434+
repoSnapshotTasks = append(repoSnapshotTasks, utils.SnapshotTask{
435+
SnapshotName: newSnapshotName,
436+
IndicesStr: indicesStr,
437+
Repo: repo,
438+
Namespace: cfg.GetKubeNamespace(),
439+
DateStr: today,
440+
PollInterval: 60 * time.Second,
441+
Size: totalSize,
442+
})
423443
}
424444
continue
425445
}
@@ -438,17 +458,26 @@ func runSnapshot(cmd *cobra.Command, args []string) error {
438458
continue
439459
}
440460
indicesStr := strings.Join(g.Indices, ",")
441-
logger.Info(fmt.Sprintf("Creating snapshot repo=%s snapshot=%s", repo, g.SnapshotName))
442-
logger.Info(fmt.Sprintf("Snapshot indices %s", indicesStr))
443-
err = utils.CreateSnapshotWithRetry(client, g.SnapshotName, indicesStr, repo, cfg.GetKubeNamespace(), today, madisonClient, logger, 60*time.Second)
444-
if err != nil {
445-
logger.Error(fmt.Sprintf("Failed to create snapshot after retries repo=%s snapshot=%s error=%v", repo, g.SnapshotName, err))
446-
failedSnapshots = append(failedSnapshots, fmt.Sprintf("%s (repo=%s)", g.SnapshotName, repo))
447-
continue
461+
var totalSize int64
462+
for _, idx := range g.Indices {
463+
totalSize += indexSizes[idx]
448464
}
449-
successfulSnapshots = append(successfulSnapshots, fmt.Sprintf("%s (repo=%s)", g.SnapshotName, repo))
465+
repoSnapshotTasks = append(repoSnapshotTasks, utils.SnapshotTask{
466+
SnapshotName: g.SnapshotName,
467+
IndicesStr: indicesStr,
468+
Repo: repo,
469+
Namespace: cfg.GetKubeNamespace(),
470+
DateStr: today,
471+
PollInterval: 60 * time.Second,
472+
Size: totalSize,
473+
})
450474
}
451475
}
476+
if len(repoSnapshotTasks) > 0 {
477+
successful, failed := utils.CreateSnapshotsInParallel(client, repoSnapshotTasks, cfg.GetMaxConcurrentSnapshots(), madisonClient, logger)
478+
successfulSnapshots = append(successfulSnapshots, successful...)
479+
failedSnapshots = append(failedSnapshots, failed...)
480+
}
452481
}
453482
}
454483

commands/snapshotsbackfill.go

Lines changed: 60 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -537,6 +537,7 @@ func runSnapshotsBackfill(cmd *cobra.Command, args []string) error {
537537
allSnapshotsForDate = []opensearch.Snapshot{}
538538
}
539539

540+
var snapshotTasks []utils.SnapshotTask
540541
for _, group := range snapshotGroups {
541542
if state, ok, err := utils.CheckSnapshotStateInRepo(client, defaultRepo, group.SnapshotName); err == nil && ok {
542543
if state == "SUCCESS" {
@@ -570,17 +571,19 @@ func runSnapshotsBackfill(cmd *cobra.Command, args []string) error {
570571
newSnapshotName := baseName + "-" + randomSuffix + "-" + datePart
571572
logger.Info(fmt.Sprintf("Some indices missing in existing snapshot, creating additional snapshot original=%s new=%s missingIndicesCount=%d", group.SnapshotName, newSnapshotName, len(missingIndices)))
572573
indicesStr := strings.Join(missingIndices, ",")
573-
logger.Info(fmt.Sprintf("Creating snapshot %s", newSnapshotName))
574-
logger.Info(fmt.Sprintf("Snapshot indices %s", indicesStr))
575-
err = utils.CreateSnapshotWithRetry(client, newSnapshotName, indicesStr, defaultRepo, cfg.GetKubeNamespace(), today, madisonClient, logger, 10*time.Minute)
576-
if err != nil {
577-
logger.Error(fmt.Sprintf("Failed to create snapshot after retries snapshot=%s error=%v", newSnapshotName, err))
578-
failedSnapshots = append(failedSnapshots, newSnapshotName)
579-
} else {
580-
successfulSnapshots = append(successfulSnapshots, newSnapshotName)
574+
var totalSize int64
575+
for _, idx := range missingIndices {
576+
totalSize += indexSizes[idx]
581577
}
582-
logger.Info("Waiting 10 minutes before next snapshot creation")
583-
time.Sleep(10 * time.Minute)
578+
snapshotTasks = append(snapshotTasks, utils.SnapshotTask{
579+
SnapshotName: newSnapshotName,
580+
IndicesStr: indicesStr,
581+
Repo: defaultRepo,
582+
Namespace: cfg.GetKubeNamespace(),
583+
DateStr: today,
584+
PollInterval: 10 * time.Minute,
585+
Size: totalSize,
586+
})
584587
}
585588
continue
586589
}
@@ -602,18 +605,25 @@ func runSnapshotsBackfill(cmd *cobra.Command, args []string) error {
602605
}
603606

604607
indicesStr := strings.Join(group.Indices, ",")
605-
logger.Info(fmt.Sprintf("Creating snapshot snapshot=%s", group.SnapshotName))
606-
logger.Info(fmt.Sprintf("Snapshot indices %s", indicesStr))
607-
err = utils.CreateSnapshotWithRetry(client, group.SnapshotName, indicesStr, defaultRepo, cfg.GetKubeNamespace(), today, madisonClient, logger, 10*time.Minute)
608-
if err != nil {
609-
logger.Error(fmt.Sprintf("Failed to create snapshot after retries snapshot=%s error=%v", group.SnapshotName, err))
610-
failedSnapshots = append(failedSnapshots, group.SnapshotName)
611-
continue
608+
var totalSize int64
609+
for _, idx := range group.Indices {
610+
totalSize += indexSizes[idx]
612611
}
613-
successfulSnapshots = append(successfulSnapshots, group.SnapshotName)
612+
snapshotTasks = append(snapshotTasks, utils.SnapshotTask{
613+
SnapshotName: group.SnapshotName,
614+
IndicesStr: indicesStr,
615+
Repo: defaultRepo,
616+
Namespace: cfg.GetKubeNamespace(),
617+
DateStr: today,
618+
PollInterval: 10 * time.Minute,
619+
Size: totalSize,
620+
})
621+
}
614622

615-
logger.Info("Waiting 10 minutes before next snapshot creation")
616-
time.Sleep(10 * time.Minute)
623+
if len(snapshotTasks) > 0 {
624+
successful, failed := utils.CreateSnapshotsInParallel(client, snapshotTasks, cfg.GetMaxConcurrentSnapshots(), madisonClient, logger)
625+
successfulSnapshots = append(successfulSnapshots, successful...)
626+
failedSnapshots = append(failedSnapshots, failed...)
617627
}
618628

619629
if len(repoGroups) > 0 {
@@ -623,6 +633,7 @@ func runSnapshotsBackfill(cmd *cobra.Command, args []string) error {
623633
repo := parts[0]
624634
perRepo[repo] = append(perRepo[repo], g)
625635
}
636+
var repoSnapshotTasks []utils.SnapshotTask
626637
for repo, groups := range perRepo {
627638
sort.Slice(groups, func(i, j int) bool {
628639
var sizeI, sizeJ int64
@@ -675,17 +686,19 @@ func runSnapshotsBackfill(cmd *cobra.Command, args []string) error {
675686
newSnapshotName := baseName + "-" + randomSuffix + "-" + datePart
676687
logger.Info(fmt.Sprintf("Some indices missing in existing snapshot, creating additional snapshot repo=%s original=%s new=%s missingIndicesCount=%d", repo, g.SnapshotName, newSnapshotName, len(missingIndices)))
677688
indicesStr := strings.Join(missingIndices, ",")
678-
logger.Info(fmt.Sprintf("Creating snapshot repo=%s snapshot=%s", repo, newSnapshotName))
679-
logger.Info(fmt.Sprintf("Snapshot indices %s", indicesStr))
680-
err = utils.CreateSnapshotWithRetry(client, newSnapshotName, indicesStr, repo, cfg.GetKubeNamespace(), today, madisonClient, logger, 10*time.Minute)
681-
if err != nil {
682-
logger.Error(fmt.Sprintf("Failed to create snapshot after retries repo=%s snapshot=%s error=%v", repo, newSnapshotName, err))
683-
failedSnapshots = append(failedSnapshots, fmt.Sprintf("%s (repo=%s)", newSnapshotName, repo))
684-
} else {
685-
successfulSnapshots = append(successfulSnapshots, fmt.Sprintf("%s (repo=%s)", newSnapshotName, repo))
689+
var totalSize int64
690+
for _, idx := range missingIndices {
691+
totalSize += indexSizes[idx]
686692
}
687-
logger.Info("Waiting 10 minutes before next snapshot creation")
688-
time.Sleep(10 * time.Minute)
693+
repoSnapshotTasks = append(repoSnapshotTasks, utils.SnapshotTask{
694+
SnapshotName: newSnapshotName,
695+
IndicesStr: indicesStr,
696+
Repo: repo,
697+
Namespace: cfg.GetKubeNamespace(),
698+
DateStr: today,
699+
PollInterval: 10 * time.Minute,
700+
Size: totalSize,
701+
})
689702
}
690703
continue
691704
}
@@ -704,20 +717,26 @@ func runSnapshotsBackfill(cmd *cobra.Command, args []string) error {
704717
continue
705718
}
706719
indicesStr := strings.Join(g.Indices, ",")
707-
logger.Info(fmt.Sprintf("Creating snapshot repo=%s snapshot=%s", repo, g.SnapshotName))
708-
logger.Info(fmt.Sprintf("Snapshot indices %s", indicesStr))
709-
err = utils.CreateSnapshotWithRetry(client, g.SnapshotName, indicesStr, repo, cfg.GetKubeNamespace(), today, madisonClient, logger, 10*time.Minute)
710-
if err != nil {
711-
logger.Error(fmt.Sprintf("Failed to create snapshot after retries repo=%s snapshot=%s error=%v", repo, g.SnapshotName, err))
712-
failedSnapshots = append(failedSnapshots, fmt.Sprintf("%s (repo=%s)", g.SnapshotName, repo))
713-
continue
720+
var totalSize int64
721+
for _, idx := range g.Indices {
722+
totalSize += indexSizes[idx]
714723
}
715-
successfulSnapshots = append(successfulSnapshots, fmt.Sprintf("%s (repo=%s)", g.SnapshotName, repo))
716-
717-
logger.Info("Waiting 10 minutes before next snapshot creation")
718-
time.Sleep(10 * time.Minute)
724+
repoSnapshotTasks = append(repoSnapshotTasks, utils.SnapshotTask{
725+
SnapshotName: g.SnapshotName,
726+
IndicesStr: indicesStr,
727+
Repo: repo,
728+
Namespace: cfg.GetKubeNamespace(),
729+
DateStr: today,
730+
PollInterval: 10 * time.Minute,
731+
Size: totalSize,
732+
})
719733
}
720734
}
735+
if len(repoSnapshotTasks) > 0 {
736+
successful, failed := utils.CreateSnapshotsInParallel(client, repoSnapshotTasks, cfg.GetMaxConcurrentSnapshots(), madisonClient, logger)
737+
successfulSnapshots = append(successfulSnapshots, successful...)
738+
failedSnapshots = append(failedSnapshots, failed...)
739+
}
721740
}
722741
}
723742
}

config.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,8 @@ retention_check_nodes_down: true
7070
sharding_target_size_gib: 25
7171
exclude_sharding: ""
7272

73-
# snapshots:
73+
snapshots:
74+
max_concurrent_snapshots: 3
7475
# Uses osctl-indices-config for detailed configuration
7576

7677
# snapshotschecker:

pkg/config/config.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ type Config struct {
6666
IndexPatternsKibanaTenantsConfig string
6767
IndexPatternsRecovererEnabled string
6868
SnapshotsBackfillIndicesList string
69+
MaxConcurrentSnapshots string
6970
}
7071

7172
type CommandConfig = Config
@@ -181,6 +182,7 @@ func LoadConfig(cmd *cobra.Command, commandName string) error {
181182
IndexPatternsKibanaTenantsConfig: getValue(cmd, "indexpatterns-kibana-tenants-config", "INDEXPATTERNS_KIBANA_TENANTS_CONFIG", viper.GetString("indexpatterns_kibana_tenants_config")),
182183
IndexPatternsRecovererEnabled: getValue(cmd, "indexpatterns-recoverer-enabled", "INDEXPATTERNS_RECOVERER_ENABLED", viper.GetString("indexpatterns_recoverer_enabled")),
183184
SnapshotsBackfillIndicesList: getValue(cmd, "indices-list", "SNAPSHOTS_BACKFILL_INDICES_LIST", viper.GetString("snapshots_backfill_indices_list")),
185+
MaxConcurrentSnapshots: getValue(cmd, "max-concurrent-snapshots", "MAX_CONCURRENT_SNAPSHOTS", viper.GetString("max_concurrent_snapshots")),
184186
}
185187

186188
switch commandName {
@@ -248,6 +250,7 @@ func setDefaults() {
248250
viper.SetDefault("kibana_multidomain_enabled", false)
249251
viper.SetDefault("datasource_name", "recoverer")
250252
viper.SetDefault("datasource_endpoint", "https://opendistro-recoverer:9200")
253+
viper.SetDefault("max_concurrent_snapshots", 3)
251254
}
252255

253256
func GetAvailableActions() []string {
@@ -569,6 +572,10 @@ func (c *Config) GetSnapshotsBackfillIndicesList() string {
569572
return c.SnapshotsBackfillIndicesList
570573
}
571574

575+
// GetMaxConcurrentSnapshots returns the MaxConcurrentSnapshots setting as an int.
// The setting is stored on Config as a string; conversion is delegated to
// parseIntWithDefault together with the "max_concurrent_snapshots" key.
// NOTE(review): presumably that key selects the fallback default when the
// string is empty or not a valid integer — confirm against parseIntWithDefault.
func (c *Config) GetMaxConcurrentSnapshots() int {
576+
return parseIntWithDefault(c.MaxConcurrentSnapshots, "max_concurrent_snapshots")
577+
}
578+
572579
type FlagDefinition struct {
573580
Name string
574581
Type string
@@ -580,6 +587,7 @@ type FlagDefinition struct {
580587
var CommandFlags = map[string][]FlagDefinition{
581588
"common": {
582589
{"osctl-indices-config", "string", "", "Path to osctl indices configuration file", []string{}},
590+
{"max-concurrent-snapshots", "int", 3, "Maximum number of snapshots to create simultaneously", []string{"min:1", "max:10"}},
583591
},
584592
"snapshots": {
585593
{"dry-run", "bool", false, "Show what would be created without actually creating", []string{}},

0 commit comments

Comments
 (0)