Skip to content

Commit a417cca

Browse files
authored
Merge branch 'main' into flant/refresh-index-patterns
2 parents e5bc208 + 38f3056 commit a417cca

File tree

12 files changed

+757
-230
lines changed

12 files changed

+757
-230
lines changed

ARCHITECTURE.md

Lines changed: 72 additions & 60 deletions
Large diffs are not rendered by default.

commands/snapshot-manual.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -132,11 +132,6 @@ func runSnapshotManual(cmd *cobra.Command, args []string) error {
132132
return fmt.Errorf("failed to wait for snapshot completion: %v", err)
133133
}
134134

135-
err = utils.WaitForSnapshotTasks(client, logger, "", repoToUse)
136-
if err != nil {
137-
return fmt.Errorf("failed to wait for snapshot tasks: %v", err)
138-
}
139-
140135
allSnapshots, err := utils.GetSnapshotsIgnore404(client, repoToUse, "*"+today+"*")
141136
if err != nil {
142137
return fmt.Errorf("failed to get snapshots: %v", err)
@@ -166,7 +161,7 @@ func runSnapshotManual(cmd *cobra.Command, args []string) error {
166161
indicesStr := strings.Join(matchingIndices, ",")
167162
logger.Info(fmt.Sprintf("Creating snapshot %s", snapshotName))
168163
logger.Info(fmt.Sprintf("Snapshot indices %s", indicesStr))
169-
err = utils.CreateSnapshotWithRetry(client, snapshotName, indicesStr, repoToUse, madisonClient, logger, 60*time.Second)
164+
err = utils.CreateSnapshotWithRetry(client, snapshotName, indicesStr, repoToUse, cfg.GetKubeNamespace(), today, madisonClient, logger, 60*time.Second, cfg.GetMaxConcurrentSnapshots(), 0)
170165
if err != nil {
171166
logger.Error(fmt.Sprintf("Failed to create snapshot after retries snapshot=%s error=%v", snapshotName, err))
172167
return err

commands/snapshots.go

Lines changed: 91 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88
"osctl/pkg/logging"
99
"osctl/pkg/opensearch"
1010
"osctl/pkg/utils"
11+
"sort"
12+
"strconv"
1113
"strings"
1214
"time"
1315

@@ -55,6 +57,7 @@ func runSnapshot(cmd *cobra.Command, args []string) error {
5557
var indicesToSnapshot []string
5658
repoGroups := map[string]utils.SnapshotGroup{}
5759
var unknownIndices []string
60+
indexSizes := make(map[string]int64)
5861

5962
systemConfigs := make([]config.IndexConfig, 0)
6063
regularConfigs := make([]config.IndexConfig, 0)
@@ -81,6 +84,9 @@ func runSnapshot(cmd *cobra.Command, args []string) error {
8184

8285
for _, idx := range allSystemIndices {
8386
indexName := idx.Index
87+
if size, err := strconv.ParseInt(idx.Size, 10, 64); err == nil {
88+
indexSizes[indexName] = size
89+
}
8490
indexConfig := utils.FindMatchingIndexConfig(indexName, systemConfigs)
8591
if indexConfig != nil && indexConfig.Snapshot && !indexConfig.ManualSnapshot {
8692
utils.AddIndexToSnapshotGroups(indexName, *indexConfig, today, repoGroups, &indicesToSnapshot)
@@ -102,6 +108,9 @@ func runSnapshot(cmd *cobra.Command, args []string) error {
102108

103109
for _, idx := range allRegularIndices {
104110
indexName := idx.Index
111+
if size, err := strconv.ParseInt(idx.Size, 10, 64); err == nil {
112+
indexSizes[indexName] = size
113+
}
105114
indexConfig := utils.FindMatchingIndexConfig(indexName, regularConfigs)
106115
if indexConfig != nil && indexConfig.Snapshot && !indexConfig.ManualSnapshot {
107116
utils.AddIndexToSnapshotGroups(indexName, *indexConfig, today, repoGroups, &indicesToSnapshot)
@@ -136,6 +145,17 @@ func runSnapshot(cmd *cobra.Command, args []string) error {
136145
})
137146
}
138147

148+
sort.Slice(snapshotGroups, func(i, j int) bool {
149+
var sizeI, sizeJ int64
150+
for _, idx := range snapshotGroups[i].Indices {
151+
sizeI += indexSizes[idx]
152+
}
153+
for _, idx := range snapshotGroups[j].Indices {
154+
sizeJ += indexSizes[idx]
155+
}
156+
return sizeI > sizeJ
157+
})
158+
139159
if cfg.GetDryRun() {
140160
existingMain, err := utils.GetSnapshotsIgnore404(client, defaultRepo, "*"+today+"*")
141161
if err != nil {
@@ -258,6 +278,7 @@ func runSnapshot(cmd *cobra.Command, args []string) error {
258278
logger.Info("Existing snapshots today none")
259279
}
260280

281+
var snapshotTasks []utils.SnapshotTask
261282
for _, group := range snapshotGroups {
262283
if state, ok, err := utils.CheckSnapshotStateInRepo(client, defaultRepo, group.SnapshotName); err == nil && ok {
263284
if state == "SUCCESS" {
@@ -291,15 +312,19 @@ func runSnapshot(cmd *cobra.Command, args []string) error {
291312
newSnapshotName := baseName + "-" + randomSuffix + "-" + datePart
292313
logger.Info(fmt.Sprintf("Some indices missing in existing snapshot, creating additional snapshot original=%s new=%s missingIndicesCount=%d", group.SnapshotName, newSnapshotName, len(missingIndices)))
293314
indicesStr := strings.Join(missingIndices, ",")
294-
logger.Info(fmt.Sprintf("Creating snapshot %s", newSnapshotName))
295-
logger.Info(fmt.Sprintf("Snapshot indices %s", indicesStr))
296-
err = utils.CreateSnapshotWithRetry(client, newSnapshotName, indicesStr, defaultRepo, madisonClient, logger, 60*time.Second)
297-
if err != nil {
298-
logger.Error(fmt.Sprintf("Failed to create snapshot after retries snapshot=%s error=%v", newSnapshotName, err))
299-
failedSnapshots = append(failedSnapshots, newSnapshotName)
300-
} else {
301-
successfulSnapshots = append(successfulSnapshots, newSnapshotName)
315+
var totalSize int64
316+
for _, idx := range missingIndices {
317+
totalSize += indexSizes[idx]
302318
}
319+
snapshotTasks = append(snapshotTasks, utils.SnapshotTask{
320+
SnapshotName: newSnapshotName,
321+
IndicesStr: indicesStr,
322+
Repo: defaultRepo,
323+
Namespace: cfg.GetKubeNamespace(),
324+
DateStr: today,
325+
PollInterval: 60 * time.Second,
326+
Size: totalSize,
327+
})
303328
}
304329
continue
305330
}
@@ -321,15 +346,25 @@ func runSnapshot(cmd *cobra.Command, args []string) error {
321346
}
322347

323348
indicesStr := strings.Join(group.Indices, ",")
324-
logger.Info(fmt.Sprintf("Creating snapshot %s", group.SnapshotName))
325-
logger.Info(fmt.Sprintf("Snapshot indices %s", indicesStr))
326-
err = utils.CreateSnapshotWithRetry(client, group.SnapshotName, indicesStr, defaultRepo, madisonClient, logger, 60*time.Second)
327-
if err != nil {
328-
logger.Error(fmt.Sprintf("Failed to create snapshot after retries snapshot=%s error=%v", group.SnapshotName, err))
329-
failedSnapshots = append(failedSnapshots, group.SnapshotName)
330-
continue
349+
var totalSize int64
350+
for _, idx := range group.Indices {
351+
totalSize += indexSizes[idx]
331352
}
332-
successfulSnapshots = append(successfulSnapshots, group.SnapshotName)
353+
snapshotTasks = append(snapshotTasks, utils.SnapshotTask{
354+
SnapshotName: group.SnapshotName,
355+
IndicesStr: indicesStr,
356+
Repo: defaultRepo,
357+
Namespace: cfg.GetKubeNamespace(),
358+
DateStr: today,
359+
PollInterval: 60 * time.Second,
360+
Size: totalSize,
361+
})
362+
}
363+
364+
if len(snapshotTasks) > 0 {
365+
successful, failed := utils.CreateSnapshotsInParallel(client, snapshotTasks, cfg.GetMaxConcurrentSnapshots(), madisonClient, logger, true)
366+
successfulSnapshots = append(successfulSnapshots, successful...)
367+
failedSnapshots = append(failedSnapshots, failed...)
333368
}
334369

335370
if len(repoGroups) > 0 {
@@ -339,7 +374,18 @@ func runSnapshot(cmd *cobra.Command, args []string) error {
339374
repo := parts[0]
340375
perRepo[repo] = append(perRepo[repo], g)
341376
}
377+
var repoSnapshotTasks []utils.SnapshotTask
342378
for repo, groups := range perRepo {
379+
sort.Slice(groups, func(i, j int) bool {
380+
var sizeI, sizeJ int64
381+
for _, idx := range groups[i].Indices {
382+
sizeI += indexSizes[idx]
383+
}
384+
for _, idx := range groups[j].Indices {
385+
sizeJ += indexSizes[idx]
386+
}
387+
return sizeI > sizeJ
388+
})
343389
existing, err := utils.GetSnapshotsIgnore404(client, repo, "*"+today+"*")
344390
if err != nil {
345391
logger.Error(fmt.Sprintf("Failed to get snapshots from repo repo=%s error=%v", repo, err))
@@ -381,15 +427,19 @@ func runSnapshot(cmd *cobra.Command, args []string) error {
381427
newSnapshotName := baseName + "-" + randomSuffix + "-" + datePart
382428
logger.Info(fmt.Sprintf("Some indices missing in existing snapshot, creating additional snapshot repo=%s original=%s new=%s missingIndicesCount=%d", repo, g.SnapshotName, newSnapshotName, len(missingIndices)))
383429
indicesStr := strings.Join(missingIndices, ",")
384-
logger.Info(fmt.Sprintf("Creating snapshot repo=%s snapshot=%s", repo, newSnapshotName))
385-
logger.Info(fmt.Sprintf("Snapshot indices %s", indicesStr))
386-
err = utils.CreateSnapshotWithRetry(client, newSnapshotName, indicesStr, repo, madisonClient, logger, 60*time.Second)
387-
if err != nil {
388-
logger.Error(fmt.Sprintf("Failed to create snapshot after retries repo=%s snapshot=%s error=%v", repo, newSnapshotName, err))
389-
failedSnapshots = append(failedSnapshots, fmt.Sprintf("%s (repo=%s)", newSnapshotName, repo))
390-
} else {
391-
successfulSnapshots = append(successfulSnapshots, fmt.Sprintf("%s (repo=%s)", newSnapshotName, repo))
430+
var totalSize int64
431+
for _, idx := range missingIndices {
432+
totalSize += indexSizes[idx]
392433
}
434+
repoSnapshotTasks = append(repoSnapshotTasks, utils.SnapshotTask{
435+
SnapshotName: newSnapshotName,
436+
IndicesStr: indicesStr,
437+
Repo: repo,
438+
Namespace: cfg.GetKubeNamespace(),
439+
DateStr: today,
440+
PollInterval: 60 * time.Second,
441+
Size: totalSize,
442+
})
393443
}
394444
continue
395445
}
@@ -408,17 +458,26 @@ func runSnapshot(cmd *cobra.Command, args []string) error {
408458
continue
409459
}
410460
indicesStr := strings.Join(g.Indices, ",")
411-
logger.Info(fmt.Sprintf("Creating snapshot repo=%s snapshot=%s", repo, g.SnapshotName))
412-
logger.Info(fmt.Sprintf("Snapshot indices %s", indicesStr))
413-
err = utils.CreateSnapshotWithRetry(client, g.SnapshotName, indicesStr, repo, madisonClient, logger, 60*time.Second)
414-
if err != nil {
415-
logger.Error(fmt.Sprintf("Failed to create snapshot after retries repo=%s snapshot=%s error=%v", repo, g.SnapshotName, err))
416-
failedSnapshots = append(failedSnapshots, fmt.Sprintf("%s (repo=%s)", g.SnapshotName, repo))
417-
continue
461+
var totalSize int64
462+
for _, idx := range g.Indices {
463+
totalSize += indexSizes[idx]
418464
}
419-
successfulSnapshots = append(successfulSnapshots, fmt.Sprintf("%s (repo=%s)", g.SnapshotName, repo))
465+
repoSnapshotTasks = append(repoSnapshotTasks, utils.SnapshotTask{
466+
SnapshotName: g.SnapshotName,
467+
IndicesStr: indicesStr,
468+
Repo: repo,
469+
Namespace: cfg.GetKubeNamespace(),
470+
DateStr: today,
471+
PollInterval: 60 * time.Second,
472+
Size: totalSize,
473+
})
420474
}
421475
}
476+
if len(repoSnapshotTasks) > 0 {
477+
successful, failed := utils.CreateSnapshotsInParallel(client, repoSnapshotTasks, cfg.GetMaxConcurrentSnapshots(), madisonClient, logger, true)
478+
successfulSnapshots = append(successfulSnapshots, successful...)
479+
failedSnapshots = append(failedSnapshots, failed...)
480+
}
422481
}
423482
}
424483

0 commit comments

Comments
 (0)