@@ -807,6 +807,13 @@ func (co churnOp) patchParams(w *workload) (realOp, error) {
807
807
return & co , nil
808
808
}
809
809
810
// SchedulingStage names the stage of pod scheduling that a barrierOp waits
// for before it lets the workload continue.
type SchedulingStage string

const (
	// Scheduled selects the barrier mode that waits with
	// waitUntilPodsScheduled. It is also the value substituted by
	// barrierOp.patchParams when no stage is specified.
	Scheduled = SchedulingStage("Scheduled")
	// Attempted selects the barrier mode that waits with
	// waitUntilPodsAttempted, i.e. until the pods went through a scheduling
	// cycle at least once.
	Attempted = SchedulingStage("Attempted")
)
816
+
810
817
// barrierOp defines an op that can be used to wait until all scheduled pods of
811
818
// one or many namespaces have been bound to nodes. This is useful when pods
812
819
// were scheduled with SkipWaitToCompletion set to true.
@@ -816,9 +823,16 @@ type barrierOp struct {
816
823
// Namespaces to block on. Empty array or not specifying this field signifies
817
824
// that the barrier should block on all namespaces.
818
825
Namespaces []string
826
+ // Determines which stage of pod scheduling the barrier should wait for.
827
+ // If empty, it is interpreted as "Scheduled".
828
+ // Optional
829
+ StageRequirement SchedulingStage
819
830
}
820
831
821
832
func (bo * barrierOp ) isValid (allowParameterization bool ) error {
833
+ if bo .StageRequirement != "" && bo .StageRequirement != Scheduled && bo .StageRequirement != Attempted {
834
+ return fmt .Errorf ("invalid StageRequirement %s" , bo .StageRequirement )
835
+ }
822
836
return nil
823
837
}
824
838
@@ -827,6 +841,9 @@ func (*barrierOp) collectsMetrics() bool {
827
841
}
828
842
829
843
func (bo barrierOp ) patchParams (w * workload ) (realOp , error ) {
844
+ if bo .StageRequirement == "" {
845
+ bo .StageRequirement = Scheduled
846
+ }
830
847
return & bo , nil
831
848
}
832
849
@@ -1565,16 +1582,26 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact
1565
1582
tCtx .Fatalf ("op %d: unknown namespace %s" , opIndex , namespace )
1566
1583
}
1567
1584
}
1568
- if err := waitUntilPodsScheduled (tCtx , podInformer , concreteOp .Namespaces , numPodsScheduledPerNamespace ); err != nil {
1569
- tCtx .Fatalf ("op %d: %v" , opIndex , err )
1570
- }
1571
- // At the end of the barrier, we can be sure that there are no pods
1572
- // pending scheduling in the namespaces that we just blocked on.
1573
- if len (concreteOp .Namespaces ) == 0 {
1574
- numPodsScheduledPerNamespace = make (map [string ]int )
1575
- } else {
1576
- for _ , namespace := range concreteOp .Namespaces {
1577
- delete (numPodsScheduledPerNamespace , namespace )
1585
+ switch concreteOp .StageRequirement {
1586
+ case Attempted :
1587
+ if err := waitUntilPodsAttempted (tCtx , podInformer , concreteOp .Namespaces , numPodsScheduledPerNamespace ); err != nil {
1588
+ tCtx .Fatalf ("op %d: %v" , opIndex , err )
1589
+ }
1590
+ case Scheduled :
1591
+ // Default should be treated like "Scheduled", so handling both in the same way.
1592
+ fallthrough
1593
+ default :
1594
+ if err := waitUntilPodsScheduled (tCtx , podInformer , concreteOp .Namespaces , numPodsScheduledPerNamespace ); err != nil {
1595
+ tCtx .Fatalf ("op %d: %v" , opIndex , err )
1596
+ }
1597
+ // At the end of the barrier, we can be sure that there are no pods
1598
+ // pending scheduling in the namespaces that we just blocked on.
1599
+ if len (concreteOp .Namespaces ) == 0 {
1600
+ numPodsScheduledPerNamespace = make (map [string ]int )
1601
+ } else {
1602
+ for _ , namespace := range concreteOp .Namespaces {
1603
+ delete (numPodsScheduledPerNamespace , namespace )
1604
+ }
1578
1605
}
1579
1606
}
1580
1607
@@ -1845,7 +1872,7 @@ func waitUntilPodsScheduledInNamespace(tCtx ktesting.TContext, podInformer corei
1845
1872
return true , ctx .Err ()
1846
1873
default :
1847
1874
}
1848
- scheduled , unscheduled , err := getScheduledPods (podInformer , namespace )
1875
+ scheduled , attempted , unattempted , err := getScheduledPods (podInformer , namespace )
1849
1876
if err != nil {
1850
1877
return false , err
1851
1878
}
@@ -1854,16 +1881,53 @@ func waitUntilPodsScheduledInNamespace(tCtx ktesting.TContext, podInformer corei
1854
1881
return true , nil
1855
1882
}
1856
1883
tCtx .Logf ("namespace: %s, pods: want %d, got %d" , namespace , wantCount , len (scheduled ))
1857
- if len (unscheduled ) > 0 {
1858
- pendingPod = unscheduled [0 ]
1884
+ if len (attempted ) > 0 {
1885
+ pendingPod = attempted [0 ]
1886
+ } else if len (unattempted ) > 0 {
1887
+ pendingPod = unattempted [0 ]
1859
1888
} else {
1860
1889
pendingPod = nil
1861
1890
}
1862
1891
return false , nil
1863
1892
})
1864
1893
1865
1894
if err != nil && pendingPod != nil {
1866
- err = fmt .Errorf ("at least pod %s is not scheduled: %v" , klog .KObj (pendingPod ), err )
1895
+ err = fmt .Errorf ("at least pod %s is not scheduled: %w" , klog .KObj (pendingPod ), err )
1896
+ }
1897
+ return err
1898
+ }
1899
+
1900
+ // waitUntilPodsAttemptedInNamespace blocks until all pods in the given
1901
+ // namespace at least once went through a schedyling cycle.
1902
+ // Times out after 10 minutes similarly to waitUntilPodsScheduledInNamespace.
1903
+ func waitUntilPodsAttemptedInNamespace (tCtx ktesting.TContext , podInformer coreinformers.PodInformer , namespace string , wantCount int ) error {
1904
+ var pendingPod * v1.Pod
1905
+
1906
+ err := wait .PollUntilContextTimeout (tCtx , 1 * time .Second , 10 * time .Minute , true , func (ctx context.Context ) (bool , error ) {
1907
+ select {
1908
+ case <- ctx .Done ():
1909
+ return true , ctx .Err ()
1910
+ default :
1911
+ }
1912
+ scheduled , attempted , unattempted , err := getScheduledPods (podInformer , namespace )
1913
+ if err != nil {
1914
+ return false , err
1915
+ }
1916
+ if len (scheduled )+ len (attempted ) >= wantCount {
1917
+ tCtx .Logf ("all pods attempted to be scheduled" )
1918
+ return true , nil
1919
+ }
1920
+ tCtx .Logf ("namespace: %s, attempted pods: want %d, got %d" , namespace , wantCount , len (scheduled )+ len (attempted ))
1921
+ if len (unattempted ) > 0 {
1922
+ pendingPod = unattempted [0 ]
1923
+ } else {
1924
+ pendingPod = nil
1925
+ }
1926
+ return false , nil
1927
+ })
1928
+
1929
+ if err != nil && pendingPod != nil {
1930
+ err = fmt .Errorf ("at least pod %s is not attempted: %w" , klog .KObj (pendingPod ), err )
1867
1931
}
1868
1932
return err
1869
1933
}
@@ -1894,6 +1958,32 @@ func waitUntilPodsScheduled(tCtx ktesting.TContext, podInformer coreinformers.Po
1894
1958
return nil
1895
1959
}
1896
1960
1961
+ // waitUntilPodsAttempted blocks until the all pods in the given namespaces are
1962
+ // attempted (at least once went through a schedyling cycle).
1963
+ func waitUntilPodsAttempted (tCtx ktesting.TContext , podInformer coreinformers.PodInformer , namespaces []string , numPodsScheduledPerNamespace map [string ]int ) error {
1964
+ // If unspecified, default to all known namespaces.
1965
+ if len (namespaces ) == 0 {
1966
+ for namespace := range numPodsScheduledPerNamespace {
1967
+ namespaces = append (namespaces , namespace )
1968
+ }
1969
+ }
1970
+ for _ , namespace := range namespaces {
1971
+ select {
1972
+ case <- tCtx .Done ():
1973
+ return context .Cause (tCtx )
1974
+ default :
1975
+ }
1976
+ wantCount , ok := numPodsScheduledPerNamespace [namespace ]
1977
+ if ! ok {
1978
+ return fmt .Errorf ("unknown namespace %s" , namespace )
1979
+ }
1980
+ if err := waitUntilPodsAttemptedInNamespace (tCtx , podInformer , namespace , wantCount ); err != nil {
1981
+ return fmt .Errorf ("error waiting for pods in namespace %q: %w" , namespace , err )
1982
+ }
1983
+ }
1984
+ return nil
1985
+ }
1986
+
1897
1987
func getSpecFromFile (path * string , spec interface {}) error {
1898
1988
bytes , err := os .ReadFile (* path )
1899
1989
if err != nil {
0 commit comments