@@ -1194,7 +1194,10 @@ func RunBenchmarkPerfScheduling(b *testing.B, configFile string, topicName strin
1194
1194
b .Fatalf ("workload %s is not valid: %v" , w .Name , err )
1195
1195
}
1196
1196
1197
- results := runWorkload (tCtx , tc , w , informerFactory )
1197
+ results , err := runWorkload (tCtx , tc , w , informerFactory )
1198
+ if err != nil {
1199
+ tCtx .Fatalf ("%w: %s" , w .Name , err )
1200
+ }
1198
1201
dataItems .DataItems = append (dataItems .DataItems , results ... )
1199
1202
1200
1203
if len (results ) > 0 {
@@ -1410,7 +1413,7 @@ func checkEmptyInFlightEvents() error {
1410
1413
return nil
1411
1414
}
1412
1415
1413
- func startCollectingMetrics (tCtx ktesting.TContext , collectorWG * sync.WaitGroup , podInformer coreinformers.PodInformer , mcc * metricsCollectorConfig , throughputErrorMargin float64 , opIndex int , name string , namespaces []string , labelSelector map [string ]string ) (ktesting.TContext , []testDataCollector ) {
1416
+ func startCollectingMetrics (tCtx ktesting.TContext , collectorWG * sync.WaitGroup , podInformer coreinformers.PodInformer , mcc * metricsCollectorConfig , throughputErrorMargin float64 , opIndex int , name string , namespaces []string , labelSelector map [string ]string ) (ktesting.TContext , []testDataCollector , error ) {
1414
1417
collectorCtx := ktesting .WithCancel (tCtx )
1415
1418
workloadName := tCtx .Name ()
1416
1419
// The first part is the same for each workload, therefore we can strip it.
@@ -1421,20 +1424,20 @@ func startCollectingMetrics(tCtx ktesting.TContext, collectorWG *sync.WaitGroup,
1421
1424
collector := collector
1422
1425
err := collector .init ()
1423
1426
if err != nil {
1424
- tCtx . Fatalf ("op %d: Failed to initialize data collector: %v" , opIndex , err )
1427
+ return nil , nil , fmt . Errorf ("op %d: Failed to initialize data collector: %v" , opIndex , err )
1425
1428
}
1426
1429
collectorWG .Add (1 )
1427
1430
go func () {
1428
1431
defer collectorWG .Done ()
1429
1432
collector .run (collectorCtx )
1430
1433
}()
1431
1434
}
1432
- return collectorCtx , collectors
1435
+ return collectorCtx , collectors , nil
1433
1436
}
1434
1437
1435
- func stopCollectingMetrics (tCtx ktesting.TContext , collectorCtx ktesting.TContext , collectorWG * sync.WaitGroup , threshold float64 , tms thresholdMetricSelector , opIndex int , collectors []testDataCollector ) []DataItem {
1438
+ func stopCollectingMetrics (tCtx ktesting.TContext , collectorCtx ktesting.TContext , collectorWG * sync.WaitGroup , threshold float64 , tms thresholdMetricSelector , opIndex int , collectors []testDataCollector ) ( []DataItem , error ) {
1436
1439
if collectorCtx == nil {
1437
- tCtx . Fatalf ("op %d: Missing startCollectingMetrics operation before stopping" , opIndex )
1440
+ return nil , fmt . Errorf ("op %d: Missing startCollectingMetrics operation before stopping" , opIndex )
1438
1441
}
1439
1442
collectorCtx .Cancel ("collecting metrics, collector must stop first" )
1440
1443
collectorWG .Wait ()
@@ -1447,7 +1450,7 @@ func stopCollectingMetrics(tCtx ktesting.TContext, collectorCtx ktesting.TContex
1447
1450
tCtx .Errorf ("op %d: %s" , opIndex , err )
1448
1451
}
1449
1452
}
1450
- return dataItems
1453
+ return dataItems , nil
1451
1454
}
1452
1455
1453
1456
type WorkloadExecutor struct {
@@ -1465,7 +1468,7 @@ type WorkloadExecutor struct {
1465
1468
nextNodeIndex int
1466
1469
}
1467
1470
1468
- func runWorkload (tCtx ktesting.TContext , tc * testCase , w * workload , informerFactory informers.SharedInformerFactory ) []DataItem {
1471
+ func runWorkload (tCtx ktesting.TContext , tc * testCase , w * workload , informerFactory informers.SharedInformerFactory ) ( []DataItem , error ) {
1469
1472
b , benchmarking := tCtx .TB ().(* testing.B )
1470
1473
if benchmarking {
1471
1474
start := time .Now ()
@@ -1513,70 +1516,74 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact
1513
1516
for opIndex , op := range unrollWorkloadTemplate (tCtx , tc .WorkloadTemplate , w ) {
1514
1517
realOp , err := op .realOp .patchParams (w )
1515
1518
if err != nil {
1516
- tCtx . Fatalf ("op %d: %v" , opIndex , err )
1519
+ return nil , fmt . Errorf ("op %d: %v" , opIndex , err )
1517
1520
}
1518
1521
select {
1519
1522
case <- tCtx .Done ():
1520
- tCtx . Fatalf ("op %d: %v" , opIndex , context .Cause (tCtx ))
1523
+ return nil , fmt . Errorf ("op %d: %v" , opIndex , context .Cause (tCtx ))
1521
1524
default :
1522
1525
}
1523
1526
switch concreteOp := realOp .(type ) {
1524
1527
case * createNodesOp :
1525
- executor .runCreateNodesOp (opIndex , concreteOp )
1528
+ err = executor .runCreateNodesOp (opIndex , concreteOp )
1526
1529
case * createNamespacesOp :
1527
- executor .runCreateNamespaceOp (opIndex , concreteOp )
1530
+ err = executor .runCreateNamespaceOp (opIndex , concreteOp )
1528
1531
case * createPodsOp :
1529
- executor .runCreatePodsOp (opIndex , concreteOp )
1532
+ err = executor .runCreatePodsOp (opIndex , concreteOp )
1530
1533
case * deletePodsOp :
1531
- executor .runDeletePodsOp (opIndex , concreteOp )
1534
+ err = executor .runDeletePodsOp (opIndex , concreteOp )
1532
1535
case * churnOp :
1533
- executor .runChurnOp (opIndex , concreteOp )
1536
+ err = executor .runChurnOp (opIndex , concreteOp )
1534
1537
case * barrierOp :
1535
- executor .runBarrierOp (opIndex , concreteOp )
1538
+ err = executor .runBarrierOp (opIndex , concreteOp )
1536
1539
case * sleepOp :
1537
1540
executor .runSleepOp (concreteOp )
1538
1541
case * startCollectingMetricsOp :
1539
- executor .runStartCollectingMetricsOp (opIndex , concreteOp )
1542
+ err = executor .runStartCollectingMetricsOp (opIndex , concreteOp )
1540
1543
case * stopCollectingMetricsOp :
1541
- executor .runStopCollectingMetrics (opIndex )
1544
+ err = executor .runStopCollectingMetrics (opIndex )
1542
1545
default :
1543
- executor .runDefaultOp (opIndex , concreteOp )
1546
+ err = executor .runDefaultOp (opIndex , concreteOp )
1547
+ }
1548
+ if err != nil {
1549
+ return nil , err
1544
1550
}
1545
1551
}
1546
1552
1547
1553
// check unused params and inform users
1548
1554
unusedParams := w .unusedParams ()
1549
1555
if len (unusedParams ) != 0 {
1550
- tCtx . Fatalf ("the parameters %v are defined on workload %s, but unused.\n Please make sure there are no typos." , unusedParams , w .Name )
1556
+ return nil , fmt . Errorf ("the parameters %v are defined on workload %s, but unused.\n Please make sure there are no typos." , unusedParams , w .Name )
1551
1557
}
1552
1558
1553
1559
// Some tests have unschedulable pods. Do not add an implicit barrier at the
1554
1560
// end as we do not want to wait for them.
1555
- return executor .dataItems
1561
+ return executor .dataItems , nil
1556
1562
}
1557
1563
1558
- func (e * WorkloadExecutor ) runCreateNodesOp (opIndex int , op * createNodesOp ) {
1564
+ func (e * WorkloadExecutor ) runCreateNodesOp (opIndex int , op * createNodesOp ) error {
1559
1565
nodePreparer , err := getNodePreparer (fmt .Sprintf ("node-%d-" , opIndex ), op , e .tCtx .Client ())
1560
1566
if err != nil {
1561
- e . tCtx . Fatalf ("op %d: %v" , opIndex , err )
1567
+ return fmt . Errorf ("op %d: %v" , opIndex , err )
1562
1568
}
1563
1569
if err := nodePreparer .PrepareNodes (e .tCtx , e .nextNodeIndex ); err != nil {
1564
- e . tCtx . Fatalf ("op %d: %v" , opIndex , err )
1570
+ return fmt . Errorf ("op %d: %v" , opIndex , err )
1565
1571
}
1566
1572
e .nextNodeIndex += op .Count
1573
+ return nil
1567
1574
}
1568
1575
1569
- func (e * WorkloadExecutor ) runCreateNamespaceOp (opIndex int , op * createNamespacesOp ) {
1576
+ func (e * WorkloadExecutor ) runCreateNamespaceOp (opIndex int , op * createNamespacesOp ) error {
1570
1577
nsPreparer , err := newNamespacePreparer (e .tCtx , op )
1571
1578
if err != nil {
1572
- e . tCtx . Fatalf ("op %d: %v" , opIndex , err )
1579
+ return fmt . Errorf ("op %d: %v" , opIndex , err )
1573
1580
}
1574
1581
if err := nsPreparer .prepare (e .tCtx ); err != nil {
1575
1582
err2 := nsPreparer .cleanup (e .tCtx )
1576
1583
if err2 != nil {
1577
1584
err = fmt .Errorf ("prepare: %w; cleanup: %w" , err , err2 )
1578
1585
}
1579
- e . tCtx . Fatalf ("op %d: %v" , opIndex , err )
1586
+ return fmt . Errorf ("op %d: %v" , opIndex , err )
1580
1587
}
1581
1588
for _ , n := range nsPreparer .namespaces () {
1582
1589
if _ , ok := e .numPodsScheduledPerNamespace [n ]; ok {
@@ -1585,25 +1592,26 @@ func (e *WorkloadExecutor) runCreateNamespaceOp(opIndex int, op *createNamespace
1585
1592
}
1586
1593
e .numPodsScheduledPerNamespace [n ] = 0
1587
1594
}
1595
+ return nil
1588
1596
}
1589
1597
1590
- func (e * WorkloadExecutor ) runBarrierOp (opIndex int , op * barrierOp ) {
1598
+ func (e * WorkloadExecutor ) runBarrierOp (opIndex int , op * barrierOp ) error {
1591
1599
for _ , namespace := range op .Namespaces {
1592
1600
if _ , ok := e .numPodsScheduledPerNamespace [namespace ]; ! ok {
1593
- e . tCtx . Fatalf ("op %d: unknown namespace %s" , opIndex , namespace )
1601
+ return fmt . Errorf ("op %d: unknown namespace %s" , opIndex , namespace )
1594
1602
}
1595
1603
}
1596
1604
switch op .StageRequirement {
1597
1605
case Attempted :
1598
1606
if err := waitUntilPodsAttempted (e .tCtx , e .podInformer , op .LabelSelector , op .Namespaces , e .numPodsScheduledPerNamespace ); err != nil {
1599
- e . tCtx . Fatalf ("op %d: %v" , opIndex , err )
1607
+ return fmt . Errorf ("op %d: %v" , opIndex , err )
1600
1608
}
1601
1609
case Scheduled :
1602
1610
// Default should be treated like "Scheduled", so handling both in the same way.
1603
1611
fallthrough
1604
1612
default :
1605
1613
if err := waitUntilPodsScheduled (e .tCtx , e .podInformer , op .LabelSelector , op .Namespaces , e .numPodsScheduledPerNamespace ); err != nil {
1606
- e . tCtx . Fatalf ("op %d: %v" , opIndex , err )
1614
+ return fmt . Errorf ("op %d: %v" , opIndex , err )
1607
1615
}
1608
1616
// At the end of the barrier, we can be sure that there are no pods
1609
1617
// pending scheduling in the namespaces that we just blocked on.
@@ -1615,6 +1623,7 @@ func (e *WorkloadExecutor) runBarrierOp(opIndex int, op *barrierOp) {
1615
1623
}
1616
1624
}
1617
1625
}
1626
+ return nil
1618
1627
}
1619
1628
1620
1629
func (e * WorkloadExecutor ) runSleepOp (op * sleepOp ) {
@@ -1624,13 +1633,17 @@ func (e *WorkloadExecutor) runSleepOp(op *sleepOp) {
1624
1633
}
1625
1634
}
1626
1635
1627
- func (e * WorkloadExecutor ) runStopCollectingMetrics (opIndex int ) {
1628
- items := stopCollectingMetrics (e .tCtx , e .collectorCtx , & e .collectorWG , e .workload .Threshold , * e .workload .ThresholdMetricSelector , opIndex , e .collectors )
1636
+ func (e * WorkloadExecutor ) runStopCollectingMetrics (opIndex int ) error {
1637
+ items , err := stopCollectingMetrics (e .tCtx , e .collectorCtx , & e .collectorWG , e .workload .Threshold , * e .workload .ThresholdMetricSelector , opIndex , e .collectors )
1638
+ if err != nil {
1639
+ return err
1640
+ }
1629
1641
e .dataItems = append (e .dataItems , items ... )
1630
1642
e .collectorCtx = nil
1643
+ return nil
1631
1644
}
1632
1645
1633
- func (e * WorkloadExecutor ) runCreatePodsOp (opIndex int , op * createPodsOp ) {
1646
+ func (e * WorkloadExecutor ) runCreatePodsOp (opIndex int , op * createPodsOp ) error {
1634
1647
// define Pod's namespace automatically, and create that namespace.
1635
1648
namespace := fmt .Sprintf ("namespace-%d" , opIndex )
1636
1649
if op .Namespace != nil {
@@ -1643,17 +1656,21 @@ func (e *WorkloadExecutor) runCreatePodsOp(opIndex int, op *createPodsOp) {
1643
1656
1644
1657
if op .CollectMetrics {
1645
1658
if e .collectorCtx != nil {
1646
- e .tCtx .Fatalf ("op %d: Metrics collection is overlapping. Probably second collector was started before stopping a previous one" , opIndex )
1659
+ return fmt .Errorf ("op %d: Metrics collection is overlapping. Probably second collector was started before stopping a previous one" , opIndex )
1660
+ }
1661
+ var err error
1662
+ e .collectorCtx , e .collectors , err = startCollectingMetrics (e .tCtx , & e .collectorWG , e .podInformer , e .testCase .MetricsCollectorConfig , e .throughputErrorMargin , opIndex , namespace , []string {namespace }, nil )
1663
+ if err != nil {
1664
+ return err
1647
1665
}
1648
- e .collectorCtx , e .collectors = startCollectingMetrics (e .tCtx , & e .collectorWG , e .podInformer , e .testCase .MetricsCollectorConfig , e .throughputErrorMargin , opIndex , namespace , []string {namespace }, nil )
1649
1666
e .tCtx .TB ().Cleanup (func () {
1650
1667
if e .collectorCtx != nil {
1651
1668
e .collectorCtx .Cancel ("cleaning up" )
1652
1669
}
1653
1670
})
1654
1671
}
1655
1672
if err := createPodsRapidly (e .tCtx , namespace , op ); err != nil {
1656
- e . tCtx . Fatalf ("op %d: %v" , opIndex , err )
1673
+ return fmt . Errorf ("op %d: %v" , opIndex , err )
1657
1674
}
1658
1675
switch {
1659
1676
case op .SkipWaitToCompletion :
@@ -1662,29 +1679,33 @@ func (e *WorkloadExecutor) runCreatePodsOp(opIndex int, op *createPodsOp) {
1662
1679
e .numPodsScheduledPerNamespace [namespace ] += op .Count
1663
1680
case op .SteadyState :
1664
1681
if err := createPodsSteadily (e .tCtx , namespace , e .podInformer , op ); err != nil {
1665
- e . tCtx . Fatalf ("op %d: %v" , opIndex , err )
1682
+ return fmt . Errorf ("op %d: %v" , opIndex , err )
1666
1683
}
1667
1684
default :
1668
1685
if err := waitUntilPodsScheduledInNamespace (e .tCtx , e .podInformer , nil , namespace , op .Count ); err != nil {
1669
- e . tCtx . Fatalf ("op %d: error in waiting for pods to get scheduled: %v" , opIndex , err )
1686
+ return fmt . Errorf ("op %d: error in waiting for pods to get scheduled: %v" , opIndex , err )
1670
1687
}
1671
1688
}
1672
1689
if op .CollectMetrics {
1673
1690
// CollectMetrics and SkipWaitToCompletion can never be true at the
1674
1691
// same time, so if we're here, it means that all pods have been
1675
1692
// scheduled.
1676
- items := stopCollectingMetrics (e .tCtx , e .collectorCtx , & e .collectorWG , e .workload .Threshold , * e .workload .ThresholdMetricSelector , opIndex , e .collectors )
1693
+ items , err := stopCollectingMetrics (e .tCtx , e .collectorCtx , & e .collectorWG , e .workload .Threshold , * e .workload .ThresholdMetricSelector , opIndex , e .collectors )
1694
+ if err != nil {
1695
+ return err
1696
+ }
1677
1697
e .dataItems = append (e .dataItems , items ... )
1678
1698
e .collectorCtx = nil
1679
1699
}
1700
+ return nil
1680
1701
}
1681
1702
1682
- func (e * WorkloadExecutor ) runDeletePodsOp (opIndex int , op * deletePodsOp ) {
1703
+ func (e * WorkloadExecutor ) runDeletePodsOp (opIndex int , op * deletePodsOp ) error {
1683
1704
labelSelector := labels .ValidatedSetSelector (op .LabelSelector )
1684
1705
1685
1706
podsToDelete , err := e .podInformer .Lister ().Pods (op .Namespace ).List (labelSelector )
1686
1707
if err != nil {
1687
- e . tCtx . Fatalf ("op %d: error in listing pods in the namespace %s: %v" , opIndex , op .Namespace , err )
1708
+ return fmt . Errorf ("op %d: error in listing pods in the namespace %s: %v" , opIndex , op .Namespace , err )
1688
1709
}
1689
1710
1690
1711
deletePods := func (opIndex int ) {
@@ -1727,9 +1748,10 @@ func (e *WorkloadExecutor) runDeletePodsOp(opIndex int, op *deletePodsOp) {
1727
1748
} else {
1728
1749
deletePods (opIndex )
1729
1750
}
1751
+ return nil
1730
1752
}
1731
1753
1732
- func (e * WorkloadExecutor ) runChurnOp (opIndex int , op * churnOp ) {
1754
+ func (e * WorkloadExecutor ) runChurnOp (opIndex int , op * churnOp ) error {
1733
1755
var namespace string
1734
1756
if op .Namespace != nil {
1735
1757
namespace = * op .Namespace
@@ -1740,20 +1762,20 @@ func (e *WorkloadExecutor) runChurnOp(opIndex int, op *churnOp) {
1740
1762
// Ensure the namespace exists.
1741
1763
nsObj := & v1.Namespace {ObjectMeta : metav1.ObjectMeta {Name : namespace }}
1742
1764
if _ , err := e .tCtx .Client ().CoreV1 ().Namespaces ().Create (e .tCtx , nsObj , metav1.CreateOptions {}); err != nil && ! apierrors .IsAlreadyExists (err ) {
1743
- e . tCtx . Fatalf ("op %d: unable to create namespace %v: %v" , opIndex , namespace , err )
1765
+ return fmt . Errorf ("op %d: unable to create namespace %v: %v" , opIndex , namespace , err )
1744
1766
}
1745
1767
1746
1768
var churnFns []func (name string ) string
1747
1769
1748
1770
for i , path := range op .TemplatePaths {
1749
1771
unstructuredObj , gvk , err := getUnstructuredFromFile (path )
1750
1772
if err != nil {
1751
- e . tCtx . Fatalf ("op %d: unable to parse the %v-th template path: %v" , opIndex , i , err )
1773
+ return fmt . Errorf ("op %d: unable to parse the %v-th template path: %v" , opIndex , i , err )
1752
1774
}
1753
1775
// Obtain GVR.
1754
1776
mapping , err := restMapper .RESTMapping (gvk .GroupKind (), gvk .Version )
1755
1777
if err != nil {
1756
- e . tCtx . Fatalf ("op %d: unable to find GVR for %v: %v" , opIndex , gvk , err )
1778
+ return fmt . Errorf ("op %d: unable to find GVR for %v: %v" , opIndex , gvk , err )
1757
1779
}
1758
1780
gvr := mapping .Resource
1759
1781
// Distinguish cluster-scoped with namespaced API objects.
@@ -1833,41 +1855,49 @@ func (e *WorkloadExecutor) runChurnOp(opIndex int, op *churnOp) {
1833
1855
}
1834
1856
}()
1835
1857
}
1858
+ return nil
1836
1859
}
1837
1860
1838
- func (e * WorkloadExecutor ) runDefaultOp (opIndex int , op realOp ) {
1861
+ func (e * WorkloadExecutor ) runDefaultOp (opIndex int , op realOp ) error {
1839
1862
runable , ok := op .(runnableOp )
1840
1863
if ! ok {
1841
- e . tCtx . Fatalf ("op %d: invalid op %v" , opIndex , op )
1864
+ return fmt . Errorf ("op %d: invalid op %v" , opIndex , op )
1842
1865
}
1843
1866
for _ , namespace := range runable .requiredNamespaces () {
1844
1867
createNamespaceIfNotPresent (e .tCtx , namespace , & e .numPodsScheduledPerNamespace )
1845
1868
}
1846
1869
runable .run (e .tCtx )
1870
+ return nil
1847
1871
}
1848
1872
1849
- func (e * WorkloadExecutor ) runStartCollectingMetricsOp (opIndex int , op * startCollectingMetricsOp ) {
1873
+ func (e * WorkloadExecutor ) runStartCollectingMetricsOp (opIndex int , op * startCollectingMetricsOp ) error {
1850
1874
if e .collectorCtx != nil {
1851
- e .tCtx .Fatalf ("op %d: Metrics collection is overlapping. Probably second collector was started before stopping a previous one" , opIndex )
1875
+ return fmt .Errorf ("op %d: Metrics collection is overlapping. Probably second collector was started before stopping a previous one" , opIndex )
1876
+ }
1877
+ var err error
1878
+ e .collectorCtx , e .collectors , err = startCollectingMetrics (e .tCtx , & e .collectorWG , e .podInformer , e .testCase .MetricsCollectorConfig , e .throughputErrorMargin , opIndex , op .Name , op .Namespaces , op .LabelSelector )
1879
+ if err != nil {
1880
+ return err
1852
1881
}
1853
- e .collectorCtx , e .collectors = startCollectingMetrics (e .tCtx , & e .collectorWG , e .podInformer , e .testCase .MetricsCollectorConfig , e .throughputErrorMargin , opIndex , op .Name , op .Namespaces , op .LabelSelector )
1854
1882
e .tCtx .TB ().Cleanup (func () {
1855
1883
if e .collectorCtx != nil {
1856
1884
e .collectorCtx .Cancel ("cleaning up" )
1857
1885
}
1858
1886
})
1887
+ return nil
1859
1888
}
1860
1889
1861
- func createNamespaceIfNotPresent (tCtx ktesting.TContext , namespace string , podsPerNamespace * map [string ]int ) {
1890
+ func createNamespaceIfNotPresent (tCtx ktesting.TContext , namespace string , podsPerNamespace * map [string ]int ) error {
1862
1891
if _ , ok := (* podsPerNamespace )[namespace ]; ! ok {
1863
1892
// The namespace has not created yet.
1864
1893
// So, create that and register it.
1865
1894
_ , err := tCtx .Client ().CoreV1 ().Namespaces ().Create (tCtx , & v1.Namespace {ObjectMeta : metav1.ObjectMeta {Name : namespace }}, metav1.CreateOptions {})
1866
1895
if err != nil {
1867
- tCtx . Fatalf ("failed to create namespace for Pod: %v" , namespace )
1896
+ return fmt . Errorf ("failed to create namespace for Pod: %v" , namespace )
1868
1897
}
1869
1898
(* podsPerNamespace )[namespace ] = 0
1870
1899
}
1900
+ return nil
1871
1901
}
1872
1902
1873
1903
type testDataCollector interface {
0 commit comments