5
5
"encoding/json"
6
6
"fmt"
7
7
"sort"
8
- "strconv"
9
8
"strings"
10
9
"sync"
11
10
"time"
@@ -457,6 +456,18 @@ func (sc *syncContext) Sync() {
457
456
return
458
457
}
459
458
459
+ // if pruned tasks pending deletion, then wait...
460
+ prunedTasksPendingDelete := tasks .Filter (func (t * syncTask ) bool {
461
+ if t .pruned () && t .liveObj != nil {
462
+ return t .liveObj .GetDeletionTimestamp () != nil
463
+ }
464
+ return false
465
+ })
466
+ if prunedTasksPendingDelete .Len () > 0 {
467
+ sc .setRunningPhase (prunedTasksPendingDelete , true )
468
+ return
469
+ }
470
+
460
471
// collect all completed hooks which have appropriate delete policy
461
472
hooksPendingDeletionSuccessful := tasks .Filter (func (task * syncTask ) bool {
462
473
return task .isHook () && task .liveObj != nil && ! task .running () && task .deleteOnPhaseSuccessful ()
@@ -747,11 +758,42 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) {
747
758
}
748
759
}
749
760
750
- // for pruneLast tasks, modify the wave to sync phase last wave of non prune task +1
761
+ // for prune tasks, modify the waves for proper cleanup i.e reverse of sync wave (creation order)
762
+ pruneTasks := make (map [int ][]* syncTask )
763
+ for _ , task := range tasks {
764
+ if task .isPrune () {
765
+ pruneTasks [task .wave ()] = append (pruneTasks [task .wave ()], task )
766
+ }
767
+ }
768
+
769
+ var uniquePruneWaves []int
770
+ for k := range pruneTasks {
771
+ uniquePruneWaves = append (uniquePruneWaves , k )
772
+ }
773
+ sort .Ints (uniquePruneWaves )
774
+
775
+ // reorder waves for pruning tasks using symmetric swap on prune waves
776
+ n := len (uniquePruneWaves )
777
+ for i := 0 ; i < n / 2 ; i ++ {
778
+ // waves to swap
779
+ startWave := uniquePruneWaves [i ]
780
+ endWave := uniquePruneWaves [n - 1 - i ]
781
+
782
+ for _ , task := range pruneTasks [startWave ] {
783
+ task .waveOverride = & endWave
784
+ }
785
+
786
+ for _ , task := range pruneTasks [endWave ] {
787
+ task .waveOverride = & startWave
788
+ }
789
+ }
790
+
791
+ // for pruneLast tasks, modify the wave to sync phase last wave of tasks + 1
792
+ // to ensure proper cleanup, syncPhaseLastWave should also consider prune tasks to determine last wave
751
793
syncPhaseLastWave := 0
752
794
for _ , task := range tasks {
753
795
if task .phase == common .SyncPhaseSync {
754
- if task .wave () > syncPhaseLastWave && ! task . isPrune () {
796
+ if task .wave () > syncPhaseLastWave {
755
797
syncPhaseLastWave = task .wave ()
756
798
}
757
799
}
@@ -761,12 +803,7 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) {
761
803
for _ , task := range tasks {
762
804
if task .isPrune () &&
763
805
(sc .pruneLast || resourceutil .HasAnnotationOption (task .liveObj , common .AnnotationSyncOptions , common .SyncOptionPruneLast )) {
764
- annotations := task .liveObj .GetAnnotations ()
765
- if annotations == nil {
766
- annotations = make (map [string ]string )
767
- }
768
- annotations [common .AnnotationSyncWave ] = strconv .Itoa (syncPhaseLastWave )
769
- task .liveObj .SetAnnotations (annotations )
806
+ task .waveOverride = & syncPhaseLastWave
770
807
}
771
808
}
772
809
0 commit comments