Skip to content

Commit 8275ab1

Browse files
feat(syncwaves) - use binary tree ordering for sync waves
Signed-off-by: SebastienFelix <[email protected]>
1 parent 093aef0 commit 8275ab1

File tree

10 files changed

+706
-57
lines changed

10 files changed

+706
-57
lines changed

pkg/sync/common/types.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ const (
1212
AnnotationSyncOptions = "argocd.argoproj.io/sync-options"
1313
// AnnotationSyncWave indicates which wave of the sync the resource or hook should be in
1414
AnnotationSyncWave = "argocd.argoproj.io/sync-wave"
15+
// AnnotationUseBinaryTreeWaveOrdering indicates if the resource or hook's wave should be ordered using a binary tree ordering
16+
AnnotationUseBinaryTreeWaveOrdering = "argocd.argoproj.io/use-binary-tree-wave-ordering"
1517
// AnnotationKeyHook contains the hook type of a resource
1618
AnnotationKeyHook = "argocd.argoproj.io/hook"
1719
// AnnotationKeyHookDeletePolicy is the policy of deleting a hook
@@ -58,7 +60,7 @@ type SyncPhase string
5860
// SyncWaveHook is a callback function which will be invoked after each sync wave is successfully
5961
// applied during a sync operation. The callback indicates which phase and wave it had just
6062
// executed, and whether or not that wave was the final one.
61-
type SyncWaveHook func(phase SyncPhase, wave int, final bool) error
63+
type SyncWaveHook func(phase SyncPhase, waves []int, final bool) error
6264

6365
const (
6466
SyncPhasePreSync = "PreSync"

pkg/sync/doc.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ Package implements Kubernetes resources synchronization and provides the followi
44
- resource pruning
55
- resource hooks
66
- sync waves
7+
- sync waves binary tree ordering
78
- sync options
89
910
# Basic Syncing
@@ -75,6 +76,31 @@ that runs before all other resources. The `argocd.argoproj.io/sync-wave` annotat
7576
annotations:
7677
argocd.argoproj.io/sync-wave: "5"
7778
79+
# Sync Waves Binary Tree Ordering
80+
81+
The wave ordering using a binary tree feature allows to run parallel waves of synchronisation where the sync-wave values
82+
correspond to a complete binary tree with root's label equal to 1. A sync-wave value X would be considered less than Y
83+
when using binary tree ordering if and only if there exists integers N and M such that :
84+
Y = X * 2**N + M where 0 <= M < N.
85+
86+
The `argocd.argoproj.io/use-binary-tree-wave-ordering` annotation define the type of wave's ordering used for a resource's wave:
87+
88+
metadata:
89+
annotations:
90+
argocd.argoproj.io/sync-wave: "5"
91+
argocd.argoproj.io/use-binary-tree-wave-ordering: "true"
92+
93+
example of waves ordering using binary tree:
94+
95+
1 -----> 2 -----> 4
96+
\ \----> 5
97+
\---> 3 -----> 6
98+
\----> 7
99+
100+
Note that a resource using a binary tree ordering for sync waves will always be synced after all resources using a normal ordering.
101+
Note also that all resources using a binary tree ordering and having a sync wave value inferior to 1 will behave like resources using
102+
a normal wave ordering.
103+
78104
# Sync Options
79105
80106
The sync options allows customizing the synchronization of selected resources. The options are specified using the

pkg/sync/sync_context.go

Lines changed: 112 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7+
"reflect"
8+
"slices"
79
"sort"
810
"strings"
911
"sync"
@@ -562,24 +564,27 @@ func (sc *syncContext) Sync() {
562564

563565
// remove any tasks not in this wave
564566
phase := tasks.phase()
565-
wave := tasks.wave()
566-
finalWave := phase == tasks.lastPhase() && wave == tasks.lastWave()
567+
waves, wavesUseBinaryTreeOrdering := tasks.waves()
568+
lastWaves, lastWavesUseBinaryTreeOrdering := tasks.lastWaves()
569+
finalWaves := phase == tasks.lastPhase() && reflect.DeepEqual(waves, lastWaves) && wavesUseBinaryTreeOrdering == lastWavesUseBinaryTreeOrdering
567570

568571
// if it is the last phase/wave and the only remaining tasks are non-hooks, the we are successful
569572
// EVEN if those objects subsequently degraded
570573
// This handles the common case where neither hooks or waves are used and a sync equates to simply an (asynchronous) kubectl apply of manifests, which succeeds immediately.
571-
remainingTasks := tasks.Filter(func(t *syncTask) bool { return t.phase != phase || wave != t.wave() || t.isHook() })
574+
remainingTasks := tasks.Filter(func(t *syncTask) bool { return t.phase != phase || !slices.Contains(waves, t.wave()) || t.isHook() })
572575

573-
sc.log.WithValues("phase", phase, "wave", wave, "tasks", tasks, "syncFailTasks", syncFailTasks).V(1).Info("Filtering tasks in correct phase and wave")
574-
tasks = tasks.Filter(func(t *syncTask) bool { return t.phase == phase && t.wave() == wave })
576+
sc.log.WithValues("phase", phase, "wave", waves, "tasks", tasks, "syncFailTasks", syncFailTasks).V(1).Info("Filtering tasks in correct phase and wave")
577+
tasks = tasks.Filter(func(t *syncTask) bool {
578+
return t.phase == phase && slices.Contains(waves, t.wave()) && t.waveUseBinaryTreeOrdering() == wavesUseBinaryTreeOrdering
579+
})
575580

576581
sc.setOperationPhase(common.OperationRunning, "one or more tasks are running")
577582

578583
sc.log.WithValues("tasks", tasks).V(1).Info("Wet-run")
579584
runState := sc.runTasks(tasks, false)
580585

581586
if sc.syncWaveHook != nil && runState != failed {
582-
err := sc.syncWaveHook(phase, wave, finalWave)
587+
err := sc.syncWaveHook(phase, waves, finalWaves)
583588
if err != nil {
584589
sc.deleteHooks(hooksPendingDeletionFailed)
585590
sc.setOperationPhase(common.OperationFailed, fmt.Sprintf("SyncWaveHook failed: %v", err))
@@ -900,51 +905,134 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) {
900905
}
901906

902907
// for prune tasks, modify the waves for proper cleanup i.e reverse of sync wave (creation order)
903-
pruneTasks := make(map[int][]*syncTask)
908+
// if all prune tasks have a normal syncWaveUseBinaryTreeOrdering, use the legacy method. Otherwise, change the
909+
// syncWaveUseBinaryTreeOrdering of all prune tasks to BTree and modify the waves to decreasing power of 2.
910+
// For prune tasks which already had a BTree syncWaveUseBinaryTreeOrdering, set an identical syncWave to tasks which
911+
// have the same level in a complete binary tree rooted at 1 where each node n has 2*n and 2*n+1 as children.
912+
913+
pruntTasksUsingNormalOrdering := make(map[int][]*syncTask)
904914
for _, task := range tasks {
905-
if task.isPrune() {
906-
pruneTasks[task.wave()] = append(pruneTasks[task.wave()], task)
915+
if task.isPrune() && task.waveUseBinaryTreeOrdering() == "false" {
916+
pruntTasksUsingNormalOrdering[task.wave()] = append(pruntTasksUsingNormalOrdering[task.wave()], task)
907917
}
908918
}
919+
var uniquePruneWavesUsingNormalOrdering []int
920+
for k := range pruntTasksUsingNormalOrdering {
921+
uniquePruneWavesUsingNormalOrdering = append(uniquePruneWavesUsingNormalOrdering, k)
922+
}
909923

910-
var uniquePruneWaves []int
911-
for k := range pruneTasks {
912-
uniquePruneWaves = append(uniquePruneWaves, k)
924+
sort.Ints(uniquePruneWavesUsingNormalOrdering)
925+
pruneTasksUsingBinaryTreeOrdering := make(map[int][]*syncTask)
926+
for _, task := range tasks {
927+
if task.isPrune() && task.waveUseBinaryTreeOrdering() == "true" {
928+
pruneTasksUsingBinaryTreeOrdering[task.wave()] = append(pruneTasksUsingBinaryTreeOrdering[task.wave()], task)
929+
}
913930
}
914-
sort.Ints(uniquePruneWaves)
915931

916-
// reorder waves for pruning tasks using symmetric swap on prune waves
917-
n := len(uniquePruneWaves)
918-
for i := 0; i < n/2; i++ {
919-
// waves to swap
920-
startWave := uniquePruneWaves[i]
921-
endWave := uniquePruneWaves[n-1-i]
932+
if len(pruneTasksUsingBinaryTreeOrdering) > 0 {
933+
var uniquePruneWavesUsingBinaryTreeOrdering []int
934+
for k := range pruneTasksUsingBinaryTreeOrdering {
935+
uniquePruneWavesUsingBinaryTreeOrdering = append(uniquePruneWavesUsingBinaryTreeOrdering, k)
936+
}
937+
sort.Ints(uniquePruneWavesUsingBinaryTreeOrdering)
938+
939+
pruneTasksWavesValues := []int{0}
940+
for i := 1; i < len(uniquePruneWavesUsingNormalOrdering); i++ {
941+
pruneTasksWavesValues = append(pruneTasksWavesValues, i)
942+
}
943+
nextPotentialWaveValue := len(uniquePruneWavesUsingNormalOrdering)
944+
if len(uniquePruneWavesUsingNormalOrdering) != 0 {
945+
pruneTasksWavesValues = append(pruneTasksWavesValues, nextPotentialWaveValue)
946+
}
947+
for i := 1; i < len(uniquePruneWavesUsingBinaryTreeOrdering); i++ {
948+
currentBTreeWaveLevel := biggestPowerOf2InferiorThan(uniquePruneWavesUsingBinaryTreeOrdering[i])
949+
previousBTreeWaveLevel := biggestPowerOf2InferiorThan(uniquePruneWavesUsingBinaryTreeOrdering[i-1])
950+
if currentBTreeWaveLevel == previousBTreeWaveLevel {
951+
pruneTasksWavesValues = append(pruneTasksWavesValues, nextPotentialWaveValue)
952+
} else {
953+
nextPotentialWaveValue++
954+
pruneTasksWavesValues = append(pruneTasksWavesValues, nextPotentialWaveValue)
955+
}
956+
}
957+
958+
pruneTasksWavesNewValues := PowInt(2, pruneTasksWavesValues[len(pruneTasksWavesValues)-1])
959+
newPruneWaves := []int{pruneTasksWavesNewValues}
960+
n := len(pruneTasksWavesValues)
961+
for i := 1; i < len(pruneTasksWavesValues); i++ {
962+
if pruneTasksWavesValues[n-i-1] == pruneTasksWavesValues[n-i] {
963+
newPruneWaves = append(newPruneWaves, pruneTasksWavesNewValues)
964+
} else {
965+
pruneTasksWavesNewValues /= 2
966+
newPruneWaves = append(newPruneWaves, pruneTasksWavesNewValues)
967+
}
968+
}
969+
970+
bTreeWaveUseBinaryTreeOrdering := "true"
971+
972+
for i := range uniquePruneWavesUsingNormalOrdering {
973+
// Normal waves to reorder
974+
iWave := uniquePruneWavesUsingNormalOrdering[i]
975+
976+
for _, task := range pruntTasksUsingNormalOrdering[iWave] {
977+
task.waveOverride = &newPruneWaves[i]
978+
task.waveUseBinaryTreeOrderingOverride = &bTreeWaveUseBinaryTreeOrdering
979+
}
980+
}
922981

923-
for _, task := range pruneTasks[startWave] {
924-
task.waveOverride = &endWave
982+
n = len(uniquePruneWavesUsingNormalOrdering)
983+
for i := range uniquePruneWavesUsingBinaryTreeOrdering {
984+
// BTree waves to reorder
985+
iWave := uniquePruneWavesUsingBinaryTreeOrdering[i]
986+
987+
for _, task := range pruneTasksUsingBinaryTreeOrdering[iWave] {
988+
task.waveOverride = &(newPruneWaves[n+i])
989+
task.waveUseBinaryTreeOrderingOverride = &bTreeWaveUseBinaryTreeOrdering
990+
}
925991
}
992+
} else {
993+
994+
// reorder waves for pruning tasks using symmetric swap on prune waves
995+
n := len(uniquePruneWavesUsingNormalOrdering)
996+
for i := 0; i < n/2; i++ {
997+
// waves to swap
998+
startWave := uniquePruneWavesUsingNormalOrdering[i]
999+
endWave := uniquePruneWavesUsingNormalOrdering[n-1-i]
1000+
1001+
for _, task := range pruntTasksUsingNormalOrdering[startWave] {
1002+
task.waveOverride = &endWave
1003+
}
9261004

927-
for _, task := range pruneTasks[endWave] {
928-
task.waveOverride = &startWave
1005+
for _, task := range pruntTasksUsingNormalOrdering[endWave] {
1006+
task.waveOverride = &startWave
1007+
}
9291008
}
9301009
}
9311010

9321011
// for pruneLast tasks, modify the wave to sync phase last wave of tasks + 1
9331012
// to ensure proper cleanup, syncPhaseLastWave should also consider prune tasks to determine last wave
9341013
syncPhaseLastWave := 0
1014+
syncPhaseLastWaveUseBinaryTreeOrdering := "false"
9351015
for _, task := range tasks {
9361016
if task.phase == common.SyncPhaseSync {
9371017
if task.wave() > syncPhaseLastWave {
9381018
syncPhaseLastWave = task.wave()
1019+
syncPhaseLastWaveUseBinaryTreeOrdering = task.waveUseBinaryTreeOrdering()
9391020
}
9401021
}
9411022
}
942-
syncPhaseLastWave = syncPhaseLastWave + 1
1023+
1024+
// if prune tasks contain BTree ordering syncWaves, then set the tasks with PruneLast
1025+
if syncPhaseLastWaveUseBinaryTreeOrdering == "false" {
1026+
syncPhaseLastWave++
1027+
} else {
1028+
syncPhaseLastWave *= 2
1029+
}
9431030

9441031
for _, task := range tasks {
9451032
if task.isPrune() &&
9461033
(sc.pruneLast || resourceutil.HasAnnotationOption(task.liveObj, common.AnnotationSyncOptions, common.SyncOptionPruneLast)) {
9471034
task.waveOverride = &syncPhaseLastWave
1035+
task.waveUseBinaryTreeOrderingOverride = &syncPhaseLastWaveUseBinaryTreeOrdering
9481036
}
9491037
}
9501038

0 commit comments

Comments
 (0)