Skip to content

Commit 7b67580

Browse files
author
anton.voskresensky
committed
fix backfill
1 parent 3bcdb5d commit 7b67580

File tree

5 files changed

+334
-69
lines changed

5 files changed

+334
-69
lines changed

ARCHITECTURE.md

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -71,17 +71,26 @@ osctl/
7171
- Создаем группы по репозиториям: для каждой группы создается один снапшот
7272
5. **Dry run режим**:
7373
- Получаем существующие снапшоты за сегодня из целевого репозитория
74-
- Фильтруем снапшоты со статусом `SUCCESS` - они не показываются в плане
74+
- Для каждой группы проверяем наличие снапшота со статусом `SUCCESS`:
75+
- Если снапшот существует и содержит все индексы - пропускаем
76+
- Если снапшот существует, но некоторые индексы отсутствуют - показываем план создания дополнительного снапшота с именем `{baseName}-{randomSuffix}-{date}` только для отсутствующих индексов
7577
- Для снапшотов со статусом `IN_PROGRESS` показываем явное сообщение
7678
- Показываем план создания снапшотов с указанием репозитория для каждой группы
7779
6. **Создание снапшотов**:
7880
- Ждем завершения всех активных снапшотов через `WaitForSnapshotCompletion` и `WaitForSnapshotTasks`
7981
- Для каждой группы проверяем существующие снапшоты через `GET /_snapshot/{repo}/*{today}*`
8082
- Проверяем состояние снапшота через `CheckSnapshotStateInRepo`:
81-
- Если снапшот уже в состоянии `SUCCESS` - пропускаем с логированием
83+
- Если снапшот уже в состоянии `SUCCESS`:
84+
- Проверяем наличие всех индексов из группы в существующем снапшоте
85+
- Если все индексы присутствуют - пропускаем с логированием
86+
- Если некоторые индексы отсутствуют:
87+
- Создаем новый снапшот с именем `{baseName}-{randomSuffix}-{date}`, где `randomSuffix` - 6-символьная рандомная строка
88+
- В новый снапшот добавляем только отсутствующие индексы
89+
- Создаем через `CreateSnapshotWithRetry` с retry логикой
8290
- Если снапшот в состоянии `IN_PROGRESS` - пропускаем с логированием
83-
- Для каждой группы проверяем/удаляем ошибочный снапшот через `CheckAndCleanSnapshot`
84-
- Создаем снапшот через `CreateSnapshotWithRetry` с retry логикой (до 5 попыток)
91+
- Если снапшота нет или он не в состоянии `SUCCESS`:
92+
- Для каждой группы проверяем/удаляем ошибочный снапшот через `CheckAndCleanSnapshot`
93+
- Создаем снапшот через `CreateSnapshotWithRetry` с retry логикой (до 5 попыток)
8594
- **Алертинг**: При неудаче после всех попыток отправляется алерт в Madison через `SendSnapshotFailureAlert` (логируется попытка отправки и результат)
8695
- **Обработка ошибок**: При ошибке создания снапшота команда продолжает работу со следующими снапшотами (не прерывает выполнение)
8796
- **Repo-specific группы**: Для снапшотов в кастомных репозиториях применяется та же логика создания с retry
@@ -334,15 +343,25 @@ osctl/
334343
4. **Группировка для снапшотов**: Группируем индексы через `GroupIndicesForSnapshots`
335344
5. **Dry run режим**:
336345
- Получаем существующие снапшоты за дату из целевого репозитория
337-
- Фильтруем снапшоты со статусом `SUCCESS` и `IN_PROGRESS`
346+
- Для каждой группы проверяем наличие снапшота со статусом `SUCCESS`:
347+
- Если снапшот существует и содержит все индексы - пропускаем
348+
- Если снапшот существует, но некоторые индексы отсутствуют - показываем план создания дополнительного снапшота с именем `{baseName}-{randomSuffix}-{date}` только для отсутствующих индексов
349+
- Фильтруем снапшоты со статусом `IN_PROGRESS`
338350
- Показываем план создания снапшотов с указанием репозитория для каждой группы
339351
6. **Создание снапшотов**:
340352
- Для каждой группы проверяем существующие снапшоты через `GET /_snapshot/{repo}/*{date}*`
341353
- Проверяем состояние снапшота через `CheckSnapshotStateInRepo`:
342-
- Если снапшот уже в состоянии `SUCCESS` - пропускаем с логированием
354+
- Если снапшот уже в состоянии `SUCCESS`:
355+
- Проверяем наличие всех индексов из группы в существующем снапшоте
356+
- Если все индексы присутствуют - пропускаем с логированием
357+
- Если некоторые индексы отсутствуют:
358+
- Создаем новый снапшот с именем `{baseName}-{randomSuffix}-{date}`, где `randomSuffix` - 6-символьная рандомная строка
359+
- В новый снапшот добавляем только отсутствующие индексы
360+
- Создаем через `CreateSnapshotWithRetry` с retry логикой
343361
- Если снапшот в состоянии `IN_PROGRESS` - пропускаем с логированием
344-
- Для каждой группы проверяем/удаляем ошибочный снапшот через `CheckAndCleanSnapshot`
345-
- Создаем снапшот через `CreateSnapshotWithRetry` с retry логикой
362+
- Если снапшота нет или он не в состоянии `SUCCESS`:
363+
- Для каждой группы проверяем/удаляем ошибочный снапшот через `CheckAndCleanSnapshot`
364+
- Создаем снапшот через `CreateSnapshotWithRetry` с retry логикой
346365
- **Пауза**: После каждого создания снапшота делаем паузу 10 минут перед следующим
347366
- **Алертинг**: При неудаче после всех попыток отправляется алерт в Madison
348367
- **Обработка ошибок**: При ошибке создания снапшота команда продолжает работу со следующими снапшотами

commands/retention.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -95,20 +95,20 @@ func runRetention(cmd *cobra.Command, args []string) error {
9595
}
9696

9797
if cfg.GetDryRun() {
98-
fmt.Println("\nDRY RUN: Indices that would be deleted")
99-
fmt.Println("=" + strings.Repeat("=", 50))
98+
logger.Info("DRY RUN: Indices that would be deleted")
99+
logger.Info("=" + strings.Repeat("=", 50))
100100
count := 0
101101
for _, idx := range indicesToDelete {
102102
if count >= 5 {
103103
break
104104
}
105-
fmt.Printf("%d. %s (size: %s)\n", count+1, idx.Index, idx.Size)
105+
logger.Info(fmt.Sprintf("%d. %s (size: %s)", count+1, idx.Index, idx.Size))
106106
count++
107107
}
108108
if len(indicesToDelete) > 5 {
109-
fmt.Printf("... and %d more indices\n", len(indicesToDelete)-5)
109+
logger.Info(fmt.Sprintf("... and %d more indices", len(indicesToDelete)-5))
110110
}
111-
fmt.Printf("\nDRY RUN: Would delete %d indices\n", len(indicesToDelete))
111+
logger.Info(fmt.Sprintf("DRY RUN: Would delete %d indices", len(indicesToDelete)))
112112
return nil
113113
}
114114

commands/snapshot-manual.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -110,18 +110,20 @@ func runSnapshotManual(cmd *cobra.Command, args []string) error {
110110
logger.Info(fmt.Sprintf("Snapshot is currently IN_PROGRESS snapshot=%s repo=%s", snapshotName, repoToUse))
111111
return nil
112112
}
113-
fmt.Println("\nDRY RUN: Manual snapshot creation plan")
114-
fmt.Println("=" + strings.Repeat("=", 50))
113+
logger.Info("DRY RUN: Manual snapshot creation plan")
114+
logger.Info("=" + strings.Repeat("=", 50))
115115

116-
fmt.Printf("\nSnapshot (repo %s): %s\n", repoToUse, snapshotName)
117-
fmt.Printf("Pattern: %s (%s)\n", value, kind)
118-
fmt.Printf("Indices (%d):\n", len(matchingIndices))
116+
logger.Info("")
117+
logger.Info(fmt.Sprintf("Snapshot (repo %s): %s", repoToUse, snapshotName))
118+
logger.Info(fmt.Sprintf("Pattern: %s (%s)", value, kind))
119+
logger.Info(fmt.Sprintf("Indices (%d):", len(matchingIndices)))
119120

120121
for _, index := range matchingIndices {
121-
fmt.Printf(" %s\n", index)
122+
logger.Info(fmt.Sprintf(" %s", index))
122123
}
123124

124-
fmt.Printf("\nDRY RUN: Would create 1 manual snapshot\n")
125+
logger.Info("")
126+
logger.Info("DRY RUN: Would create 1 manual snapshot")
125127
return nil
126128
}
127129

commands/snapshots.go

Lines changed: 101 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -178,40 +178,43 @@ func runSnapshot(cmd *cobra.Command, args []string) error {
178178
}
179179
}
180180

181-
fmt.Println("\nDRY RUN: Snapshot creation plan")
182-
fmt.Println("=" + strings.Repeat("=", 50))
181+
logger.Info("DRY RUN: Snapshot creation plan")
182+
logger.Info("=" + strings.Repeat("=", 50))
183183

184184
if len(inProgressMain)+len(inProgressPerRepo) > 0 {
185-
fmt.Println("\nCurrently IN_PROGRESS snapshots:")
185+
logger.Info("")
186+
logger.Info("Currently IN_PROGRESS snapshots:")
186187
for _, msg := range inProgressMain {
187-
fmt.Printf(" %s\n", msg)
188+
logger.Info(fmt.Sprintf(" %s", msg))
188189
}
189190
for _, msg := range inProgressPerRepo {
190-
fmt.Printf(" %s\n", msg)
191+
logger.Info(fmt.Sprintf(" %s", msg))
191192
}
192-
fmt.Println("=" + strings.Repeat("=", 30))
193+
logger.Info("=" + strings.Repeat("=", 30))
193194
}
194195

195196
for i, group := range filteredMain {
196-
fmt.Printf("\nSnapshot %d (repo %s): %s\n", i+1, defaultRepo, group.SnapshotName)
197-
fmt.Printf("Pattern: %s (%s)\n", group.Pattern, group.Kind)
198-
fmt.Printf("Indices (%d):\n", len(group.Indices))
197+
logger.Info("")
198+
logger.Info(fmt.Sprintf("Snapshot %d (repo %s): %s", i+1, defaultRepo, group.SnapshotName))
199+
logger.Info(fmt.Sprintf("Pattern: %s (%s)", group.Pattern, group.Kind))
200+
logger.Info(fmt.Sprintf("Indices (%d):", len(group.Indices)))
199201
for _, index := range group.Indices {
200-
fmt.Printf(" %s\n", index)
202+
logger.Info(fmt.Sprintf(" %s", index))
201203
}
202-
fmt.Println("=" + strings.Repeat("=", 30))
204+
logger.Info("=" + strings.Repeat("=", 30))
203205
}
204206

205207
if len(filteredPerRepo) > 0 {
206208
for repo, groups := range filteredPerRepo {
207209
for _, g := range groups {
208-
fmt.Printf("\nSnapshot (repo %s): %s\n", repo, g.SnapshotName)
209-
fmt.Printf("Pattern: %s (%s)\n", g.Pattern, g.Kind)
210-
fmt.Printf("Indices (%d):\n", len(g.Indices))
210+
logger.Info("")
211+
logger.Info(fmt.Sprintf("Snapshot (repo %s): %s", repo, g.SnapshotName))
212+
logger.Info(fmt.Sprintf("Pattern: %s (%s)", g.Pattern, g.Kind))
213+
logger.Info(fmt.Sprintf("Indices (%d):", len(g.Indices)))
211214
for _, index := range g.Indices {
212-
fmt.Printf(" %s\n", index)
215+
logger.Info(fmt.Sprintf(" %s", index))
213216
}
214-
fmt.Println("=" + strings.Repeat("=", 30))
217+
logger.Info("=" + strings.Repeat("=", 30))
215218
}
216219
}
217220
}
@@ -220,7 +223,8 @@ func runSnapshot(cmd *cobra.Command, args []string) error {
220223
for _, groups := range filteredPerRepo {
221224
total += len(groups)
222225
}
223-
fmt.Printf("\nDRY RUN: Would create %d snapshots\n", total)
226+
logger.Info("")
227+
logger.Info(fmt.Sprintf("DRY RUN: Would create %d snapshots", total))
224228
return nil
225229
}
226230

@@ -247,7 +251,46 @@ func runSnapshot(cmd *cobra.Command, args []string) error {
247251
for _, group := range snapshotGroups {
248252
if state, ok, err := utils.CheckSnapshotStateInRepo(client, defaultRepo, group.SnapshotName); err == nil && ok {
249253
if state == "SUCCESS" {
250-
logger.Info(fmt.Sprintf("Valid snapshot already exists snapshot=%s", group.SnapshotName))
254+
missingIndices := make([]string, 0)
255+
for _, snapshot := range allSnapshots {
256+
if snapshot.Snapshot == group.SnapshotName {
257+
for _, idx := range group.Indices {
258+
found := false
259+
for _, snapshotIndex := range snapshot.Indices {
260+
if snapshotIndex == idx {
261+
found = true
262+
break
263+
}
264+
}
265+
if !found {
266+
missingIndices = append(missingIndices, idx)
267+
}
268+
}
269+
break
270+
}
271+
}
272+
if len(missingIndices) == 0 {
273+
logger.Info(fmt.Sprintf("Valid snapshot already exists with all indices snapshot=%s", group.SnapshotName))
274+
continue
275+
}
276+
randomSuffix := utils.GenerateRandomAlphanumericString(6)
277+
parts := strings.Split(group.SnapshotName, "-")
278+
if len(parts) > 0 {
279+
datePart := parts[len(parts)-1]
280+
baseName := strings.Join(parts[:len(parts)-1], "-")
281+
newSnapshotName := baseName + "-" + randomSuffix + "-" + datePart
282+
logger.Info(fmt.Sprintf("Some indices missing in existing snapshot, creating additional snapshot original=%s new=%s missingIndicesCount=%d", group.SnapshotName, newSnapshotName, len(missingIndices)))
283+
indicesStr := strings.Join(missingIndices, ",")
284+
logger.Info(fmt.Sprintf("Creating snapshot %s", newSnapshotName))
285+
logger.Info(fmt.Sprintf("Snapshot indices %s", indicesStr))
286+
err = utils.CreateSnapshotWithRetry(client, newSnapshotName, indicesStr, defaultRepo, madisonClient, logger, 60*time.Second)
287+
if err != nil {
288+
logger.Error(fmt.Sprintf("Failed to create snapshot after retries snapshot=%s error=%v", newSnapshotName, err))
289+
failedSnapshots = append(failedSnapshots, newSnapshotName)
290+
} else {
291+
successfulSnapshots = append(successfulSnapshots, newSnapshotName)
292+
}
293+
}
251294
continue
252295
}
253296
if state == "IN_PROGRESS" {
@@ -295,7 +338,46 @@ func runSnapshot(cmd *cobra.Command, args []string) error {
295338
for _, g := range groups {
296339
if state, ok, err := utils.CheckSnapshotStateInRepo(client, repo, g.SnapshotName); err == nil && ok {
297340
if state == "SUCCESS" {
298-
logger.Info(fmt.Sprintf("Valid snapshot already exists repo=%s snapshot=%s", repo, g.SnapshotName))
341+
missingIndices := make([]string, 0)
342+
for _, snapshot := range existing {
343+
if snapshot.Snapshot == g.SnapshotName {
344+
for _, idx := range g.Indices {
345+
found := false
346+
for _, snapshotIndex := range snapshot.Indices {
347+
if snapshotIndex == idx {
348+
found = true
349+
break
350+
}
351+
}
352+
if !found {
353+
missingIndices = append(missingIndices, idx)
354+
}
355+
}
356+
break
357+
}
358+
}
359+
if len(missingIndices) == 0 {
360+
logger.Info(fmt.Sprintf("Valid snapshot already exists with all indices repo=%s snapshot=%s", repo, g.SnapshotName))
361+
continue
362+
}
363+
randomSuffix := utils.GenerateRandomAlphanumericString(6)
364+
parts := strings.Split(g.SnapshotName, "-")
365+
if len(parts) > 0 {
366+
datePart := parts[len(parts)-1]
367+
baseName := strings.Join(parts[:len(parts)-1], "-")
368+
newSnapshotName := baseName + "-" + randomSuffix + "-" + datePart
369+
logger.Info(fmt.Sprintf("Some indices missing in existing snapshot, creating additional snapshot repo=%s original=%s new=%s missingIndicesCount=%d", repo, g.SnapshotName, newSnapshotName, len(missingIndices)))
370+
indicesStr := strings.Join(missingIndices, ",")
371+
logger.Info(fmt.Sprintf("Creating snapshot repo=%s snapshot=%s", repo, newSnapshotName))
372+
logger.Info(fmt.Sprintf("Snapshot indices %s", indicesStr))
373+
err = utils.CreateSnapshotWithRetry(client, newSnapshotName, indicesStr, repo, madisonClient, logger, 60*time.Second)
374+
if err != nil {
375+
logger.Error(fmt.Sprintf("Failed to create snapshot after retries repo=%s snapshot=%s error=%v", repo, newSnapshotName, err))
376+
failedSnapshots = append(failedSnapshots, fmt.Sprintf("%s (repo=%s)", newSnapshotName, repo))
377+
} else {
378+
successfulSnapshots = append(successfulSnapshots, fmt.Sprintf("%s (repo=%s)", newSnapshotName, repo))
379+
}
380+
}
299381
continue
300382
}
301383
if state == "IN_PROGRESS" {

0 commit comments

Comments
 (0)