@@ -807,6 +807,13 @@ func (co churnOp) patchParams(w *workload) (realOp, error) {
807
807
return & co , nil
808
808
}
809
809
810
// SchedulingStage names the point in a pod's scheduling lifecycle that a
// barrierOp waits for (see barrierOp.StageRequirement).
type SchedulingStage string

// Supported SchedulingStage values. Any other non-empty value is rejected by
// barrierOp.isValid.
const (
	// Scheduled makes the barrier wait until the expected pods have been
	// scheduled. It is the stage used when StageRequirement is left empty.
	Scheduled SchedulingStage = "Scheduled"
	// Attempted makes the barrier wait until the expected pods have gone
	// through a scheduling cycle at least once; pods that are already
	// scheduled count as attempted.
	Attempted SchedulingStage = "Attempted"
)
816
+
810
817
// barrierOp defines an op that can be used to wait until all scheduled pods of
811
818
// one or many namespaces have been bound to nodes. This is useful when pods
812
819
// were scheduled with SkipWaitToCompletion set to true.
@@ -816,9 +823,16 @@ type barrierOp struct {
816
823
// Namespaces to block on. Empty array or not specifying this field signifies
817
824
// that the barrier should block on all namespaces.
818
825
Namespaces []string
826
+ // Determines what stage of pods scheduling the barrier should wait for.
827
+ // If empty, it is interpreted as "Scheduled".
828
+ // Optional
829
+ StageRequirement SchedulingStage
819
830
}
820
831
821
832
func (bo * barrierOp ) isValid (allowParameterization bool ) error {
833
+ if bo .StageRequirement != "" && bo .StageRequirement != Scheduled && bo .StageRequirement != Attempted {
834
+ return fmt .Errorf ("invalid StageRequirement %s" , bo .StageRequirement )
835
+ }
822
836
return nil
823
837
}
824
838
@@ -827,6 +841,9 @@ func (*barrierOp) collectsMetrics() bool {
827
841
}
828
842
829
843
func (bo barrierOp ) patchParams (w * workload ) (realOp , error ) {
844
+ if bo .StageRequirement == "" {
845
+ bo .StageRequirement = Scheduled
846
+ }
830
847
return & bo , nil
831
848
}
832
849
@@ -1561,16 +1578,26 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact
1561
1578
tCtx .Fatalf ("op %d: unknown namespace %s" , opIndex , namespace )
1562
1579
}
1563
1580
}
1564
- if err := waitUntilPodsScheduled (tCtx , podInformer , concreteOp .Namespaces , numPodsScheduledPerNamespace ); err != nil {
1565
- tCtx .Fatalf ("op %d: %v" , opIndex , err )
1566
- }
1567
- // At the end of the barrier, we can be sure that there are no pods
1568
- // pending scheduling in the namespaces that we just blocked on.
1569
- if len (concreteOp .Namespaces ) == 0 {
1570
- numPodsScheduledPerNamespace = make (map [string ]int )
1571
- } else {
1572
- for _ , namespace := range concreteOp .Namespaces {
1573
- delete (numPodsScheduledPerNamespace , namespace )
1581
+ switch concreteOp .StageRequirement {
1582
+ case Attempted :
1583
+ if err := waitUntilPodsAttempted (tCtx , podInformer , concreteOp .Namespaces , numPodsScheduledPerNamespace ); err != nil {
1584
+ tCtx .Fatalf ("op %d: %v" , opIndex , err )
1585
+ }
1586
+ case Scheduled :
1587
+ // Default should be treated like "Scheduled", so handling both in the same way.
1588
+ fallthrough
1589
+ default :
1590
+ if err := waitUntilPodsScheduled (tCtx , podInformer , concreteOp .Namespaces , numPodsScheduledPerNamespace ); err != nil {
1591
+ tCtx .Fatalf ("op %d: %v" , opIndex , err )
1592
+ }
1593
+ // At the end of the barrier, we can be sure that there are no pods
1594
+ // pending scheduling in the namespaces that we just blocked on.
1595
+ if len (concreteOp .Namespaces ) == 0 {
1596
+ numPodsScheduledPerNamespace = make (map [string ]int )
1597
+ } else {
1598
+ for _ , namespace := range concreteOp .Namespaces {
1599
+ delete (numPodsScheduledPerNamespace , namespace )
1600
+ }
1574
1601
}
1575
1602
}
1576
1603
@@ -1841,7 +1868,7 @@ func waitUntilPodsScheduledInNamespace(tCtx ktesting.TContext, podInformer corei
1841
1868
return true , ctx .Err ()
1842
1869
default :
1843
1870
}
1844
- scheduled , unscheduled , err := getScheduledPods (podInformer , namespace )
1871
+ scheduled , attempted , unattempted , err := getScheduledPods (podInformer , namespace )
1845
1872
if err != nil {
1846
1873
return false , err
1847
1874
}
@@ -1850,16 +1877,53 @@ func waitUntilPodsScheduledInNamespace(tCtx ktesting.TContext, podInformer corei
1850
1877
return true , nil
1851
1878
}
1852
1879
tCtx .Logf ("namespace: %s, pods: want %d, got %d" , namespace , wantCount , len (scheduled ))
1853
- if len (unscheduled ) > 0 {
1854
- pendingPod = unscheduled [0 ]
1880
+ if len (attempted ) > 0 {
1881
+ pendingPod = attempted [0 ]
1882
+ } else if len (unattempted ) > 0 {
1883
+ pendingPod = unattempted [0 ]
1855
1884
} else {
1856
1885
pendingPod = nil
1857
1886
}
1858
1887
return false , nil
1859
1888
})
1860
1889
1861
1890
if err != nil && pendingPod != nil {
1862
- err = fmt .Errorf ("at least pod %s is not scheduled: %v" , klog .KObj (pendingPod ), err )
1891
+ err = fmt .Errorf ("at least pod %s is not scheduled: %w" , klog .KObj (pendingPod ), err )
1892
+ }
1893
+ return err
1894
+ }
1895
+
1896
+ // waitUntilPodsAttemptedInNamespace blocks until all pods in the given
1897
+ // namespace at least once went through a schedyling cycle.
1898
+ // Times out after 10 minutes similarly to waitUntilPodsScheduledInNamespace.
1899
+ func waitUntilPodsAttemptedInNamespace (tCtx ktesting.TContext , podInformer coreinformers.PodInformer , namespace string , wantCount int ) error {
1900
+ var pendingPod * v1.Pod
1901
+
1902
+ err := wait .PollUntilContextTimeout (tCtx , 1 * time .Second , 10 * time .Minute , true , func (ctx context.Context ) (bool , error ) {
1903
+ select {
1904
+ case <- ctx .Done ():
1905
+ return true , ctx .Err ()
1906
+ default :
1907
+ }
1908
+ scheduled , attempted , unattempted , err := getScheduledPods (podInformer , namespace )
1909
+ if err != nil {
1910
+ return false , err
1911
+ }
1912
+ if len (scheduled )+ len (attempted ) >= wantCount {
1913
+ tCtx .Logf ("all pods attempted to be scheduled" )
1914
+ return true , nil
1915
+ }
1916
+ tCtx .Logf ("namespace: %s, attempted pods: want %d, got %d" , namespace , wantCount , len (scheduled )+ len (attempted ))
1917
+ if len (unattempted ) > 0 {
1918
+ pendingPod = unattempted [0 ]
1919
+ } else {
1920
+ pendingPod = nil
1921
+ }
1922
+ return false , nil
1923
+ })
1924
+
1925
+ if err != nil && pendingPod != nil {
1926
+ err = fmt .Errorf ("at least pod %s is not attempted: %w" , klog .KObj (pendingPod ), err )
1863
1927
}
1864
1928
return err
1865
1929
}
@@ -1890,6 +1954,32 @@ func waitUntilPodsScheduled(tCtx ktesting.TContext, podInformer coreinformers.Po
1890
1954
return nil
1891
1955
}
1892
1956
1957
+ // waitUntilPodsAttempted blocks until the all pods in the given namespaces are
1958
+ // attempted (at least once went through a schedyling cycle).
1959
+ func waitUntilPodsAttempted (tCtx ktesting.TContext , podInformer coreinformers.PodInformer , namespaces []string , numPodsScheduledPerNamespace map [string ]int ) error {
1960
+ // If unspecified, default to all known namespaces.
1961
+ if len (namespaces ) == 0 {
1962
+ for namespace := range numPodsScheduledPerNamespace {
1963
+ namespaces = append (namespaces , namespace )
1964
+ }
1965
+ }
1966
+ for _ , namespace := range namespaces {
1967
+ select {
1968
+ case <- tCtx .Done ():
1969
+ return context .Cause (tCtx )
1970
+ default :
1971
+ }
1972
+ wantCount , ok := numPodsScheduledPerNamespace [namespace ]
1973
+ if ! ok {
1974
+ return fmt .Errorf ("unknown namespace %s" , namespace )
1975
+ }
1976
+ if err := waitUntilPodsAttemptedInNamespace (tCtx , podInformer , namespace , wantCount ); err != nil {
1977
+ return fmt .Errorf ("error waiting for pods in namespace %q: %w" , namespace , err )
1978
+ }
1979
+ }
1980
+ return nil
1981
+ }
1982
+
1893
1983
func getSpecFromFile (path * string , spec interface {}) error {
1894
1984
bytes , err := os .ReadFile (* path )
1895
1985
if err != nil {
0 commit comments